
    pi.                     
   d dl Z d dlZd dlmZ d dlmZmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ  ej(                  e      ZdZ ed      Z G d de j2                        Z G d de      Z G d de      Zy)    N)Future)AnyOptionalTypeVar   )CheckpointProcess)CheckpointReader)CheckpointWriter)CheckpointStager)
STATE_DICT)wrap_future<   Tc                       e Zd ZdZej
                  dededeee	f   de
eeef      fd       Zej
                  	 dddd	dede
e   d
e	dedeee	f   defd       Zej
                  dd       Zy)CheckpointeraT  
    WARNING: This class is experimental, and is created to validate certain ideas,
    and is subjected to change or deprecation and we strong discourage any usages at
    this time.

    Abstract base class that defines the API for checkpointing.

    This class defines the interface for coordinating the writing and loading of model
    state dictionaries to and from storage. It provides abstract methods to save and load model states
    with support for both synchronous and asynchronous operations.

    Concrete implementations of this class must implement all the abstract methods.
    path
state_dictkwargsreturnc                      y)a  
        Save a state dictionary to storage.

        Args:
            path: The path where the checkpoint should be saved.
            state_dict: The state dictionary to save.
            **kwargs: Additional keyword arguments to pass to the writer.

        Returns:
            For synchronous implementations: None
            For asynchronous implementations: tuple of (stage_future, write_future)
                                            representing the staging and writing operations.
        N selfr   r   r   s       y/opt/services/ai/voice_agent/venv/lib/python3.12/site-packages/torch/distributed/checkpoint/_experimental/checkpointer.pysavezCheckpointer.save#           NFdefault_map_locationstrictr   r    c                     y)ah  
        Load a state dictionary from storage.

        Args:
            path: The path from which to load the checkpoint.
            state_dict: Optional state dictionary to update with loaded values.
                        If provided, only keys in this dictionary will be loaded.
            default_map_location: Device mapping function or device name for relocating tensors.
            strict: If True, raises an error when there are missing keys in the checkpoint.
            **kwargs: Additional keyword arguments to pass to the reader.

        Returns:
            The loaded state dictionary.
        Nr   )r   r   r   r   r    r   s         r   loadzCheckpointer.load8   r   r   c                      y)
        Close the checkpointer and release any resources.

        This method should be called when the checkpointer is no longer needed to ensure
        proper cleanup of resources.
        Nr   r   s    r   closezCheckpointer.closeQ   r   r   Nr   N)__name__
__module____qualname____doc__abcabstractmethodstrr   dictr   r   tupler   r   boolr"   r&   r   r   r   r   r      s     	  sCx.	
 
%'	( ( 	 ,0
 %) Z(
 "  sCx. 
 0 	 r   r   c                       e Zd ZdZdedefdZdedede	ee
f   deeeef      fd	Z	 dd
dddedee   de
dede	ee
f   defdZddZy
)SyncCheckpointera  
    Synchronous implementation of Checkpointer.

    This class coordinates the writing and loading of model state dictionaries to and from storage
    using only synchronous operations. It provides a simple, efficient interface for checkpoint
    operations without async overhead.

    Attributes:
        _writer: CheckpointWriter for writing state dictionaries to storage.
        _reader: CheckpointReader for reading state dictionaries from storage.

    Example:
        checkpointer = SyncCheckpointer(writer=writer, reader=reader)
        checkpointer.save(state_dict, path)
        loaded_state_dict = checkpointer.load(path)
    writerreaderc                      || _         || _        y)z
        Initialize a synchronous checkpointer.

        Args:
            writer: CheckpointWriter for writing checkpoints to storage.
            reader: CheckpointReader for reading checkpoints from storage.
        N)_writer_reader)r   r5   r6   s      r   __init__zSyncCheckpointer.__init__m   s     r   r   r   r   r   c                 l    t         j                  d|        | j                  j                  ||fi | y)a  
        Save a state dictionary to storage synchronously.

        Args:
            path: The path where the checkpoint should be saved.
            state_dict: The state dictionary to save.
            **kwargs: Additional keyword arguments to pass to the writer.

        Returns:
            Always returns None as operations are synchronous.

        Example:
            checkpointer.save("/path/to/checkpoint", state_dict)
        z%Saving checkpoint synchronously to %sN)loggerdebugr8   writer   s       r   r   zSyncCheckpointer.save|   s1    ( 	<dC46v6r   NFr   r   r    c                    t         j                  d|        | j                  j                  d|||d|\  }}|r||g k7  rt	        d| d|       |S )a  
        Load a state dictionary from storage.

        Args:
            path: The path from which to load the checkpoint.
            state_dict: Optional state dictionary to update with loaded values.
                        If provided, only keys in this dictionary will be loaded.
            default_map_location: Device mapping function or device name for relocating tensors.
            strict: If True, raises an error when there are missing keys in the checkpoint.
            **kwargs: Additional keyword arguments to pass to the reader.

        Returns:
            The loaded state dictionary.

        Raises:
            RuntimeError: If strict=True and there are missing keys in the checkpoint.
            FileNotFoundError: If the checkpoint file is not found.
        Loading checkpoint from %sr   r   map_locationCheckpoint at  is missing keys: r   r<   infor9   readRuntimeErrorr   r   r   r   r    r   loaded_state_dictmissing_keyss           r   r"   zSyncCheckpointer.load   sw    6 	0$7*;$,,*;*; +
!-+
 	+
'< l.<23Ev5G~VWW  r   c                 b    | j                   j                          t        j                  d       y)r$   zSyncCheckpointer closedN)r8   r&   r<   rF   r%   s    r   r&   zSyncCheckpointer.close   s!     	-.r   r'   r(   )r)   r*   r+   r,   r
   r	   r:   r/   r   r0   r   r   r1   r   r   r2   r"   r&   r   r   r   r4   r4   [   s    "  !  sCx.	
 
%'	(6 ,0%!
 %)%!%! Z(%!
 "%! %! sCx.%! 
%!N/r   r4   c                       e Zd ZdZdededefdZdede	de
d	eeeef      fd
Z	 dddddedee	   de
dede
d	e	fdZddZy)AsyncCheckpointera  
    Asynchronous implementation of Checkpointer.

    This class coordinates the writing and loading of model state dictionaries to and from storage
    using asynchronous operations for saving. It provides efficient async checkpoint operations
    with staging and background writing capabilities.

    Attributes:
        _reader: CheckpointReader for reading state dictionaries from storage.
        _checkpoint_stager: Stager for async operations.
        _checkpoint_process: Process for async operations.
        _write_future: Future representing the ongoing async write operation.

    Example:
        checkpointer = AsyncCheckpointer(
            reader=reader,
            checkpoint_stager=stager,
            checkpoint_process=process
        )
        stage_future, write_future = checkpointer.save(state_dict, path)
        # ... do other work ...
        write_future.result()  # Wait for completion
    checkpoint_stagercheckpoint_processr6   c                 <    || _         || _        || _        d| _        y)a  
        Initialize an asynchronous checkpointer.

        Args:
            checkpoint_stager: Stager for async operations.
            checkpoint_process: Process for async operations.
            reader: CheckpointReader for reading checkpoints from storage.
        N)r9   _checkpoint_stager_checkpoint_process_write_future)r   rO   rP   r6   s       r   r:   zAsyncCheckpointer.__init__   s#     "3#5 48r   r   r   r   r   c                    t         j                  d|       | j                  | j                  j                          t         j	                  d        | j
                  j                  dd|i|}t         j	                  d|        | j                  j                  ||fi || _        t         j                  d|       | j                  t        |      | j                  fS t        d      )ac  
        Save a state dictionary to storage asynchronously.

        Args:
            path: The path where the checkpoint should be saved.
            state_dict: The state dictionary to save.
            **kwargs: Additional keyword arguments to pass to the stager and writer.

        Returns:
            A tuple of (stage_future, write_future) representing the staging and writing operations.

        Example:
            stage_future, write_future = checkpointer.save("/path/to/checkpoint", state_dict)
            # ... do other work ...
            write_future.result()  # Wait for completion
        zMInitiating checkpoint save to %s. Will wait for prev checkpoints to complete.z!Starting state dictionary stagingr   zStarting checkpoint write to %szCheckpoint save to %s initiatedz!Write future is unexpectedly Noner   )r<   rF   rT   resultr=   rR   stagerS   r>   r   rH   )r   r   r   r   staging_results        r   r   zAsyncCheckpointer.save   s    , 	[	

 )%%'8960066 
!


 	6=;T55;;D
$*
 	5t< )~.0B0BBB BCCr   NFr   r   r    c                    t         j                  d|        | j                  j                  d|||d|\  }}|r||g k7  rt	        d| d|       |S )ac  
        Load a state dictionary from storage.

        Loading is always performed synchronously, even in AsyncCheckpointer.

        Args:
            path: The path from which to load the checkpoint.
            state_dict: Optional state dictionary to update with loaded values.
                        If provided, only keys in this dictionary will be loaded.
            default_map_location: Device mapping function or device name for relocating tensors.
            strict: If True, raises an error when there are missing keys in the checkpoint.
            **kwargs: Additional keyword arguments to pass to the reader.

        Returns:
            The loaded state dictionary.

        Raises:
            RuntimeError: If strict=True and there are missing keys in the checkpoint.
            FileNotFoundError: If the checkpoint file is not found.
        r@   rA   rC   rD   r   rE   rI   s           r   r"   zAsyncCheckpointer.load#  sw    : 	0$7*;$,,*;*; +
!-+
 	+
'< l.<23Ev5G~VWW  r   c                     | j                   j                          | j                  j                          t        j	                  d       y)z
        Close the checkpointer and release any resources.

        This method should be called when the checkpointer is no longer needed to ensure
        proper cleanup of async resources.
        zAsyncCheckpointer closedN)rR   r&   rS   r<   rF   r%   s    r   r&   zAsyncCheckpointer.closeL  s5     	%%'  &&(./r   r'   r(   )r)   r*   r+   r,   r   r   r	   r:   r/   r   r   r   r1   r   r   r2   r"   r&   r   r   r   rN   rN      s    09+9 .9 !	9&/D/D /D 	/D
 
%'	(/Dh ,0'!
 %)'!'! Z('!
 "'! '! '! 
'!R	0r   rN   )r-   loggingconcurrent.futuresr   typingr   r   r   rP   r   checkpoint_readerr	   checkpoint_writerr
   stagingr   typesr   utilsr   	getLoggerr)   r<   LOG_INTERVALr   ABCr   r4   rN   r   r   r   <module>rf      sx    
  % ) ) 1 / / %   
		8	$CLD377 DNh/| h/VO0 O0r   