
    pi;                     `   U d dl Z d dlZd dlZd dlZd dlZd dlmZmZ d dlZd dl	m
Z
mZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZmZmZmZmZmZ d dlm Z m!Z!m"Z"m#Z#m$Z$m%Z% d d	l&m'Z' d d
l(m)Z)  ejT                  e+      Z,ejZ                  e.d<   ddgZ/ G d de      Z0 G d de
      Z1y)    N)AnyOptional)FileSystemReaderFileSystemWriter)consolidate_safetensors_files)_gen_file_name_HFStorageInfo_metadata_fnCUSTOM_METADATA_KEYSAVED_OFFSETS_KEYSHARDED_DIR_NAMESUFFIX)SerializationFormat)ChunkStorageMetadataMetadataMetadataIndexStorageMetaTensorPropertiesTensorStorageMetadata)LoadPlanLoadPlannerReadItemSavePlanSavePlanner	WriteItem)WriteResult)FutureloggerHuggingFaceStorageWriterHuggingFaceStorageReaderc                       e Zd ZdZ	 	 	 	 	 ddedeeeef      dedededed	df fd
Z	de
e   d	e
e   fdZdeded	ee
e      f fdZdede
e
e      d	dfdZdeeeef      de
e   d	eee
e   f   fdZed	efd       Z xZS )r   zP
    A writer that writes to storage in the huggingface safetensors format.
    Npathfqn_to_index_mappingthread_countsave_distributedenable_consolidationthread_count_consolidationreturnc                 6   t         |   |t        j                  |       || _        || _        || _        d| _        | j                  rIt        | j                        | _        | j                  j                  | j                  t              | _	        || _        y)a  
        Initialize the huggingface writer pointing to path.

        Args:
            path: directory where the checkpoint will be read from.
            fqn_to_index_mapping: A mapping from tensor FQN to the index of the file that the tensor should be written to.
                              Indices are from 1 to N, where N is the number of files. If not provided,
                              the tensors will be written to a single file. If none, then all the tensors on the
                              same rank will be written to the same file.
            thread_count: Number of threads to use to write distributed checkpoint. Default to 1.
            save_distributed: If True, save the checkpoint using distributed APIs where every rank saves its own shard.
                        Default is False which assumes rank-0 checkpointing of the full state_dict.
            enable_consolidation: If True, consolidate the sharded checkpoint after saving. The sharded tensors will be
                                saved to path/sharded and the full tensors will be saved to path. Default to False.
            thread_count_consolidation: Number of threads to use for parallel processing of saving data
                                to consolidated output files. Default to 1.
        )r"   serialization_formatr$   N)super__init__r   SAFETENSORSr#   r%   r&   consolidated_output_pathstrr"   fsconcat_pathr   r'   )selfr"   r#   r$   r%   r&   r'   	__class__s          i/opt/services/ai/voice_agent/venv/lib/python3.12/site-packages/torch/distributed/checkpoint/hf_storage.pyr,   z!HuggingFaceStorageWriter.__init__6   s    6 	!4!@!@% 	 	

 ?S!&6*>!7;%$$,/		ND)++DII7GHDI*D'    plansc                     g }t        |d      D ]Y  \  }}i }| j                  | j                  |d<   | j                  r||d<   |j                  t	        j
                  ||             [ |S )N   )startr#   shard_index)storage_data)	enumerater#   r%   appenddataclassesreplace)r2   r6   	new_plansiplanr;   s         r4   prepare_global_planz,HuggingFaceStorageWriter.prepare_global_plan_   s{    	 a0 	SGAt+-L((47;7P7P34$$./]+[00LQR	S r5   rB   plannerc                 &   t        |j                        dk(  rt               }|j                  g        |S |j                  }d }d }d|v r|d   }d|v r|d   }| j                  ||j                        }|t        |j                               nd}t        j                         }	|j                         D ]J  \  }
}t        |
||      }|	j                  | j                  j                  | j                  |      ||f       L t        | A  ||	      S )Nr   r#   r:   r8   )lenitemsr   
set_resultr;   _split_by_storage_planmaxvaluesqueueQueuer   putr0   r1   r"   r+   _write_data)r2   rB   rD   futr;   storage_planr:   bucketshighest_index
file_queue
file_indexwrite_items	file_namer3   s                r4   
write_dataz#HuggingFaceStorageWriter.write_datal   s   
 tzz?a (CNN2J (,'8'815%)!\1'(>?LL(&}5K--lDJJG6B6NL//12TU"'++-
'.}} 	#J&z=+NINN$$TYY	:I{S	 w"7J77r5   metadataresultsc                    | j                   r"| j                  st        j                  d       y | j                   r|| j                  | j                  n-t
        j                  |j                  j                         d      }t        t        | j                        | j                  | j                  |      S i }i }d}|D ]z  }|j                  |D ci c]-  }|j                  j                   |j"                  j$                  / c}       |t'        |D cg c]  }|j"                  j(                   c}      z  }| d|i|d<   ||d<   | j*                  j-                  | j                  t.               }	| j*                  j1                  |	d      5 }
t3        j4                  ||
d	
       d d d        y c c}w c c}w # 1 sw Y   y xY w)Nz4Not consolidating sharded checkpoint in finish step.r8   )	input_dir
output_dirnum_threadsr#   r   
total_sizerY   
weight_mapw   )indent)r%   r&   r   infor#   dictfromkeysstate_dict_metadatakeysr   r/   r"   r.   r'   updateindexfqnr;   relative_pathsumlengthr0   r1   r
   create_streamjsondump)r2   rY   rZ   r#   metadata_to_write
storage_mdr_   wr_listwrmetadata_pathmetadata_files              r4   finishzHuggingFaceStorageWriter.finish   s     )B)B
 KKNO   ,,8 ))]]8#?#?#D#D#FJ ! 1dii.88 ;;%9	  

 	JGGNOr<<<O #H"r55HIIJ		J
 *6z(B*%*4,'++DII,IWW""=#6 	B-II'qA	B 	B PH
	B 	Bs   2F1
F6F;;GrQ   rG   c                     |d|iS i }|D ]<  }|j                   j                  }||   }||vr|g||<   )||   j                  |       > |S )Nr8   )rj   rk   r=   )r2   rQ   rG   rR   itemkeyidxs          r4   rI   z/HuggingFaceStorageWriter._split_by_storage_plan   sk     u: 	*D**..Cs#C'! $v##D)	* r5   c                     t         S N)r
   )r2   s    r4   rv   z&HuggingFaceStorageWriter.metadata_path   s    r5   )Nr8   FFr8   )__name__
__module____qualname____doc__r/   r   re   intboolr,   listr   rC   r   r   r   rX   r   rx   r   rI   propertyrv   __classcell__r3   s   @r4   r   r   1   s>    :>!&%**+'E'E 'tCH~6'E 	'E
 'E #'E %('E 
'ERh DN 88 8 
[!	"	8>%Bx %B$tK7H2I %Bd %BN$T#s(^4=A)_	c4	?"	#& s  r5   c                        e Zd ZdZddededdf fdZdededdfd	Z	d
e
j                  de
j                  deddfdZdededed   fdZdefdZ xZS )r    zQ
    A reader that reads a checkpoint in the huggingface safetensors format.
    r"   r$   r(   Nc                 4    t         |   |       || _        y)z
        Initialize the huggingface reader pointing to path.

        Args:
            path: directory where the checkpoint will be read from.
            thread_count: Number of threads to use to read distributed checkpoint. Default to 1.
        )r"   N)r+   r,   r$   )r2   r"   r$   r3   s      r4   r,   z!HuggingFaceStorageReader.__init__   s     	d#(r5   reqrD   c                    t        d t        |j                  |j                        D              }|j	                  |j
                  j                        |   }|j                  |      j                         }|j                         |j                         k(  s6J d|j
                   d|j                          d|j                                 |j                  |       |j                  ||       y)z1Helper function to process a single read request.c              3   @   K   | ]  \  }}t        |||z           y wr~   )slice).0offsetrn   s      r4   	<genexpr>zAHuggingFaceStorageReader._process_read_request.<locals>.<genexpr>   s'      
 &&6/*
s   zreq z mismatch sizes z vs N)tuplezipstorage_offsetslengths	get_slicestorage_indexrk   resolve_tensordetachsizecopy_commit_tensor)r2   fr   rD   slicestensortarget_tensors          r4   _process_read_requestz.HuggingFaceStorageReader._process_read_request   s      
"%c&9&93;;"G
 
 S..223F;..s3::<!!#v{{}4 	
3$$%%5m6H6H6J5K4PVP[P[P]_	
4 	F#c=1r5   rT   result_queuec                    ddl m} 	 	 |j                         \  }} ||d      5 }|D ]  }| j                  |||        	 d d d        |j	                  d       S# 1 sw Y   xY w# t
        j                  $ r Y y w xY w)Nr   	safe_openTptfilename	framework)safetensorsr   
get_nowaitr   rN   rL   Empty)	r2   rT   r   rD   r   rW   reqsr   r   s	            r4   _read_files_from_queuez/HuggingFaceStorageReader._read_files_from_queue   s     	*	","7"7"9	4	TB Da# D221c7CDD   & D D {{ 		s(   A( AA( A%!A( (A>=A>rB   c                 L   ddl m} i }|j                  D ]H  }| j                  |j                     }|j
                  }|j                  |g       j                  |       J | j                  dk  st        |      dk  rG|j                         D ]3  \  }} ||d      5 }	|D ]  }
| j                  |	|
|        	 d d d        5 nt        j                         }t        j                         }|j                         D ]  \  }}|j                  ||f        g }t        | j                  t        |            }t        |      D ]G  }t!        j"                  | j$                  |||f      }|j'                          |j                  |       I |D ]  }|j)                           d}	 	 |j+                          |dz  }t/               }|j1                  d        |S # 1 sw Y   XxY w# t        j,                  $ r Y nw xY w|t        |      k(  rRJ d| dt        |              )	Nr   r   r8   r   r   )targetargszNot all files were processed: z out of )r   r   rG   r;   r   rl   
setdefaultr=   r$   rF   r   rL   rM   rN   minrange	threadingThreadr   r9   joinr   r   r   rH   )r2   rB   rD   r   per_file	read_itemitem_mdrW   r   r   r   rT   r   threadsr^   _tprocessed_countrP   s                      r4   	read_dataz"HuggingFaceStorageReader.read_data   s7   ).0 	AI&*&7&7	8O8O&PG--I	2.55i@	A
 !S]a%7#+>>#3 D	4	TB Da# D221c7CDD DD ',kkmJ(-L $,>>#3 2	4	4012 Gd//X?K;' "$$66$lG< 	q!"    O ++-#q(O  ht
UD DB ;;  #c(m3 00A#h-Y3s   G'G( G%	(G>=G>c                    ddl m} ddlm} i }i }g }| j                  j                  | j                        D ])  }|j                  t              s|j                  |       + |D ]~  } ||d      5 }|j                         }	|j                         }
d }|
r=|
j                  t              r(t        j                  |
j                  t                    }|	D ]   }|j!                  |      j#                         }|j!                  |      j%                         }|||   t&           }ndgt)        |      z  }||vrt+        t-         ||            t/        j0                  t3        ||      D cg c]
  \  }}||z    c}}      t5        t/        j0                        t/        j0                  |            g      ||<   n||   j6                  j                  t5        t/        j0                  |      t/        j0                  |      	             t9        ||   j:                        }t=        t)        |            D ]  }t?        ||   ||   ||   z         ||<    t/        j0                  |      ||   _        |tA        |||   t&           
      }ntA        |dgt)        |      z  
      }tC        |t/        j0                  |       ||            ||<    	 d d d         tE        ||      }tG        |dd       tI               |_%        | jL                  |jJ                  _&        |S c c}}w # 1 sw Y   xY w)Nr   r   )	_getdtyper   )r   )dtype)offsetssizes)
propertiesr   chunks)r   )rk   r   )rl   shaper   )rg   r;   storage_meta)'r   r   safetensors.torchr   r0   lsr"   endswithr   r=   rh   rY   getr   rp   loadsr   	get_shape	get_dtyper   rF   r   r   torchSizer   r   r   r   r   r   rJ   r   r	   r   getattrr   r   load_id)r2   r   r   rg   r;   safetensors_filesfilesafetensor_filer   rh   extra_metadatadcp_sharding_infor{   r   r   r   savedr   rA   metadata_indexrY   s                        r4   read_metadataz&HuggingFaceStorageReader.read_metadata6  s   )/@B<>GGJJtyy) 	/D}}V$!((.	/  1 7	O?d; 6qvvx!"$(!!n&8&89L&M(,

&**+>?)%   ,CKK,668EKK,668E(4!23!78I!J"#s5z!1"553H'7i>N'O!&EHPVEW XME6 X" !5,1JJv,>*/**U*;!"$4+C0 ,C077>>0 %

6 2%**U:K
  $$7$<$A$AB!&s4y!1 IA&)$q'58fQi3G&HDGI8=

48H+C05 )4)6 #,=c,BCT,U* *73sSQVZGW)X3A&5#jj/'.4L0Q,6 67	r  3%

 8^T2:$/MH!(,%U !Y/6 6s    4C<L40L.?EL4.L44L>	)r8   )r   r   r   r   r/   r   r,   r   r   r   rL   rM   r   r   r   r   r   r   r   r   s   @r4   r    r       s    
)S 
) 
)D 
)2H 2{ 2t 2"KK kk 	
 
$6h 6 6 6pNx Nr5   )2r>   rp   loggingrL   r   typingr   r   r   torch.distributed.checkpointr   r   8torch.distributed.checkpoint._consolidate_hf_safetensorsr   &torch.distributed.checkpoint._hf_utilsr   r	   r
   r   r   r   r   'torch.distributed.checkpoint.filesystemr   %torch.distributed.checkpoint.metadatar   r   r   r   r   r   $torch.distributed.checkpoint.plannerr   r   r   r   r   r   $torch.distributed.checkpoint.storager   torch.futuresr   	getLoggerr   r   Logger__annotations____all__r   r     r5   r4   <module>r      s            K   H   =   +**84 4%'A
BV/ Vrz/ zr5   