
import inspect
import os
import warnings
from concurrent.futures import Future
from dataclasses import dataclass
from enum import Enum
from typing import cast, Optional, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import STATE_DICT_TYPE
from torch.distributed.checkpoint._async_executor import _AsyncCheckpointExecutor
from torch.distributed.checkpoint._async_process_executor import (
    _ProcessBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._async_thread_executor import (
    _ThreadBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import (
    AsyncStager,
    DefaultStager,
    StagingOptions,
)
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter, WriteResult
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


__all__ = [
    "save_state_dict",
    "save",
    "async_save",
    "AsyncCheckpointerType",
    "AsyncSaveResponse",
]


class AsyncCheckpointerType(Enum):
    """Enum for async checkpointer type."""

    THREAD = "thread"
    PROCESS = "process"


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    # Keep the legacy positional signature and forward to the internal
    # implementation shared with `save`.
    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    no_dist: bool = False,
    use_collectives: bool = True,
) -> Metadata:
    """
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` and ``DTensor`` by having each rank only save their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save_state_dict` and that all data in state_dict belong to it.

    .. note::
        When saving checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard_group should be calling `save_state_dict` and the corresponding process
        group needs to be passed in.

    .. note::
        If no process group is available, this function assumes the intention is to save the
        state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, this function will assume the intent is to save
            a checkpoint without using cross-rank synchronization.
            (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        save_state_dict uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = no_dist or (not dist.is_available()) or (not dist.is_initialized())
    if no_dist:
        warnings.warn(
            "torch.distributed is disabled, unavailable or uninitialized, "
            "assuming the intent is to save in a single process."
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
            use_collectives=use_collectives,
        )


@dataclass
class AsyncSaveResponse:
    """This class contains futures for staging and upload completion.
    It is returned by async_save().
    staging_completion is a future that indicates when local copy
    of state_dict is complete.
    upload_completion is a future that indicates when a checkpoint
    completed saving.
    """

    staging_completion: Future[None]
    upload_completion: Future[None]


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    async_checkpointer_type: AsyncCheckpointerType = AsyncCheckpointerType.THREAD,
    async_stager: Optional[AsyncStager] = None,
    no_dist: bool = False,
    use_collectives: bool = True,
) -> Union[Future, AsyncSaveResponse]:
    """Asynchronous version of ``save``. This code first de-stages the state_dict on to the
    staging storage (defaults to CPU memory), and then calls the `save` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.
        MUST CALL CLOSE AFTER LAST CHECKPOINT IS SAVED

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and 'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        async_checkpointer_type (AsyncCheckpointerType):
            Whether to checkpoint in a separate thread or process.
            (Default: ``AsyncCheckpointerType.THREAD``)
        async_stager (AsyncStager):
            Provides the staging implementation. If ``storage_writer`` implements ``AsyncStager``
            and ``async_stager`` is provided, ``async_stager`` will be used for staging.
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, save the checkpoint without rank coordination. (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Future: A future holding the resultant Metadata object from `save`. If the
            configured stager stages asynchronously, an ``AsyncSaveResponse`` holding
            separate staging-completion and upload-completion futures is returned instead.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()

    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.async_save")

    if dist.is_available() and dist.is_initialized():
        pg = process_group or _get_default_group()
        assert torch.device("cpu") in pg._device_types, (
            "A CPU backend must be enabled for async save; try initializing "
            "process group with 'cpu:gloo,cuda:nccl'"
        )

    if async_stager is None:
        # Backward compatibility: storage writers may implement AsyncStager
        # themselves; otherwise fall back to the default CPU stager.
        async_stager = (
            storage_writer
            if isinstance(storage_writer, AsyncStager)
            else DefaultStager(StagingOptions(False, False, False, False))
        )

    storage_writer = cast(
        StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
    )

    state_dict = _stateful_to_state_dict(state_dict)

    @_dcp_method_logger(checkpoint_id=checkpoint_id)
    def stage_state_dict() -> Union[Future[STATE_DICT_TYPE], STATE_DICT_TYPE]:
        # Copy the state_dict to staging storage (CPU memory by default).
        return async_stager.stage(state_dict)

    staging_future_or_state_dict = stage_state_dict()

    upload_executor: _AsyncCheckpointExecutor = (
        _ProcessBasedAsyncCheckpointExecutor()
        if async_checkpointer_type == AsyncCheckpointerType.PROCESS
        else _ThreadBasedAsyncCheckpointExecutor()
    )
    upload_future: Future = upload_executor.execute_save(
        staging_future_or_state_dict,
        checkpoint_id=checkpoint_id,
        storage_writer=storage_writer,
        planner=planner,
        process_group=process_group,
        no_dist=no_dist,
        use_collectives=use_collectives,
    )

    if isinstance(staging_future_or_state_dict, Future):
        # Asynchronous staging: expose both staging and upload completion
        # to the caller via AsyncSaveResponse.
        staging_future = staging_future_or_state_dict
        return_staging_future: Future[None] = Future()

        def callback(
            original_staging_future: Future[STATE_DICT_TYPE],
            return_staging_future: Future[None] = return_staging_future,
        ) -> None:
            try:
                original_staging_future.result()
                return_staging_future.set_result(None)
            except Exception as e:
                return_staging_future.set_exception(e)

        if not staging_future.done():
            staging_future.add_done_callback(callback)
        else:
            return_staging_future.set_result(None)

        return AsyncSaveResponse(
            staging_completion=return_staging_future,
            upload_completion=upload_future,
        )

    @_dcp_method_logger(checkpoint_id=checkpoint_id)
    def maybe_synchronize_staging() -> None:
        if async_stager.should_synchronize_after_execute:
            async_stager.synchronize_staging()

    maybe_synchronize_staging()
    return upload_future


def _stateful_to_state_dict(state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
    """Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object."""
    stateful_state_dict = {}
    for key, elem in state_dict.items():
        stateful_state_dict[key] = (
            elem.state_dict() if isinstance(elem, Stateful) else elem
        )
    return stateful_state_dict


def _save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
    use_collectives: bool = True,
) -> Metadata:
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")

    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None

    global_metadata = None

    ckpt_kwargs = {}
    if (ckpt_id := getattr(storage_writer, "checkpoint_id", None)) is not None:
        ckpt_kwargs["checkpoint_id"] = ckpt_id
        ckpt_kwargs["process_group"] = distW.group

    @_dcp_method_logger(**ckpt_kwargs)
    def local_step():
        assert planner is not None
        storage_meta = storage_writer.storage_meta()
        if "storage_meta" not in inspect.signature(planner.set_up_planner).parameters:
            warnings.warn(
                "The function definition for SavePlanner.set_up_planner has been updated"
                " to include the storage_meta argument. Please update your implementation"
                " to include this parameter."
            )
            planner.set_up_planner(state_dict, distW.is_coordinator)
        else:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
            )
        storage_writer.set_up_storage_writer(distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    @_dcp_method_logger(**ckpt_kwargs)
    def global_step(all_local_plans):
        nonlocal global_metadata

        assert planner is not None
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    if use_collectives:
        central_plan: SavePlan = distW.reduce_scatter("plan", local_step, global_step)
    else:
        # Plan locally without cross-rank coordination.
        local_plan = local_step()
        global_plan = global_step([local_plan])
        central_plan = global_plan[0]

    @_dcp_method_logger(**ckpt_kwargs)
    def write_data():
        assert planner is not None
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    @_dcp_method_logger(**ckpt_kwargs)
    def finish_checkpoint(all_results):
        assert global_metadata is not None
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    if use_collectives:
        return distW.all_reduce("write", write_data, finish_checkpoint)

    write_results = write_data()
    return finish_checkpoint([write_results])