
    pi#                     d   U d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	 ddl
mZmZ ddlmZ ddlmc mc mc mZ  ej*                         Zi Zeeef   ed<   ded	efd
Ze G d d             Zded	ed   fdZ G d dej>                        Z e G d de              Z!e G d de              Z"y)a/  
Barrier implementations for synchronizing distributed checkpoint operations.

This module provides abstract and concrete barrier implementations that ensure
all ranks in a distributed training environment complete their checkpoint operations
before proceeding, which is essential for data consistency.
    N)Counter)	dataclassfield)	timedelta)AnyOptionalBARRIER_REGISTRYbarrier_classreturnc                 D    t        | d      r| t        | j                  <   | S )z0Register a barrier class in the global registry.barrier_type)hasattrr	   r   )r
   s    u/opt/services/ai/voice_agent/venv/lib/python3.12/site-packages/torch/distributed/checkpoint/_experimental/barriers.pyregister_barrierr      s#    }n-7D334    c                   L    e Zd ZU dZdZee   ed<    ee	      Z
e	eef   ed<   y)BarrierConfiga&  
    Configuration for barrier construction.

    This class provides a flexible way to configure different barrier implementations
    with their specific constructor arguments. The barrier type will be looked up
    from a registry and instantiated with rank_info and barrier_args.

    Attributes:
        barrier_type: A string identifying the barrier type (e.g., "tcp_store").
                     If None, no barrier will be used.
        barrier_args: Dictionary of arguments to pass to the barrier constructor.
                     rank_info will be automatically injected as the first argument.

    Examples:
        # No barrier
        BarrierConfig()

        # TCPStore barrier
        BarrierConfig(
            barrier_type="tcp_store",
            barrier_args={
                'timeout_barrier_init_secs': 30,
                'barrier_prefix_list': ['checkpoint'],
                'use_checkpoint_barrier_tcpstore_libuv': False,
                'tcpstore_port': 12345,
                'master_address': 'localhost'
            }
        )
    Nr   )default_factorybarrier_args)__name__
__module____qualname____doc__r   r   str__annotations__r   dictr   r    r   r   r   r   "   s.    < #'L(3-&#(#>L$sCx.>r   r   barrier_configBarrierc           	          | j                   y| j                   t        vr6t        d| j                    dt        t        j	                                      t        | j                      } |di | j
                  S )a&  
    Create a barrier instance from BarrierConfig.

    Args:
        barrier_config: Configuration for barrier construction.

    Returns:
        Barrier instance or None if no barrier type is configured.

    Raises:
        ValueError: If the barrier_type is not found in the registry.
    NzUnknown barrier type: z. Available types: r   )r   r	   
ValueErrorlistkeysr   )r   r
   s     r   create_barrier_from_configr$   F   s     ""*""*::$^%@%@$A B  $%5%:%:%< =>@
 	

 %^%@%@AM7>6677r   c                   j    e Zd ZdZej
                  deeef   fd       Z	ej
                  dd       Z
y)r   a(  
    Abstract base class for synchronization barriers.

    A barrier ensures that all ranks in a distributed environment reach a certain
    point in execution before any rank proceeds further, which is essential for
    coordinating operations like checkpointing across multiple processes.
    kwargsc                      y)a+  
        Initialize a barrier.

        Args:
            **kwargs: Keyword arguments for specific barrier implementations.
                     Common arguments may include rank information, barrier prefixes,
                     timeout settings, and other barrier-specific configuration.
        Nr   )selfr&   s     r   __init__zBarrier.__init__k       r   Nc                      y)z
        Execute a synchronization barrier.

        This method uses the barrier_prefix provided during initialization to
        coordinate synchronization across processes.
        Nr   r(   s    r   execute_barrierzBarrier.execute_barrierw   r*   r   r   N)r   r   r   r   abcabstractmethodr   r   r   r)   r-   r   r   r   r   r   b   sJ     	c3h   	 r   c                   (    e Zd ZdZdZ	 	 ddZddZy)DistBarriera2  
    A barrier implementation using PyTorch's distributed barrier for synchronization.

    This barrier uses the built-in torch.distributed.barrier() function to coordinate
    synchronization across multiple processes. It's simpler than TCPStoreBarrier but
    requires an initialized process group.
    dist_barrierNc                 :    t        j                         sJ d       y)a/  
        Initialize a DistBarrier.

        This barrier requires an initialized PyTorch distributed process group.
        No additional arguments are needed as it uses the current process group.

        Raises:
            AssertionError: If the distributed process group is not initialized.
        z2DistBarrier requires an initialized process group.N)distis_initializedr,   s    r   r)   zDistBarrier.__init__   s      ""$ 	
@	
$r   c                 ,    t        j                          y)zd
        Execute a synchronization barrier using the prefix provided during initialization.
        N)r5   barrierr,   s    r   r-   zDistBarrier.execute_barrier   s     	r   r.   )r   r   r   r   r   r)   r-   r   r   r   r2   r2      s      "L
	
 r   r2   c                   D    e Zd ZdZdZdedededededed	ed
efdZddZ	y)TCPStoreBarrieraS  
    A barrier implementation using PyTorch's TCPStore for synchronization.

    This barrier uses a TCP-based distributed key-value store to coordinate
    synchronization across multiple processes. It uses a single TCP store
    for all barrier operations, with different prefixes to distinguish between
    different barrier types.
    	tcp_storeglobal_rankglobal_world_sizebarrier_prefixtimeout_barrier_init_secs%use_checkpoint_barrier_tcpstore_libuvtcpstore_portmaster_addresstimeout_secsc	                 &   t         j                  d||||||||	       t               | _        || _        || _        || _        || _        t        j                  |t        |      | j                  t        |      | j
                  dk(        | _        y)a  
        Initialize a TCPStoreBarrier.

        Args:
            global_rank: The rank of the current process in the distributed environment.
            global_world_size: The total number of processes in the distributed environment.
            barrier_prefix: A string prefix to identify this specific barrier.
            timeout_barrier_init_secs: Timeout in seconds for initializing the TCPStore.
            use_checkpoint_barrier_tcpstore_libuv: Whether to use libuv for the TCPStore.
            tcpstore_port: Port number for the TCPStore.
            master_address: Address of the master node for the TCPStore.
            timeout_secs: Maximum time in seconds to wait for all ranks to reach the barrier.
        zInitializing TCPStore master_address=%s tcpstore_port=%s rank=%s world_size=%s barrier_prefix=%s timeout_barrier_init_secs=%s use_checkpoint_barrier_tcpstore_libuv=%s timeout_secs=%s)secondsr   )
world_sizetimeout	is_masterN)loggerinfor   _tcp_store_barrier_seq_barrier_prefix_global_rank_global_world_size_timeout_secsr5   TCPStoreintr   
_tcp_store)	r(   r<   r=   r>   r?   r@   rA   rB   rC   s	            r   r)   zTCPStoreBarrier.__init__   s    0 	G %1	
 07y#- ("3) --..&?@((A-
r   Nc           	         | j                   }t        j                  d|| j                         dt        dt
        fd}| j                  j                   || j                        t        | j                  |                t        j                  | j                  | j                  |t        | j                  |         z          | j                  |xx   dz  cc<   y)a;  
        Execute a synchronization barrier using the prefix provided during initialization.

        The implementation uses a sequence number that is incremented every time
        a barrier is reached. The sequence number is per barrier prefix to allow
        different barriers to operate concurrently.
        z3Executing barrier barrier_prefix=%s timeout_secs=%srankr   c                     d|  S )NrT   r   )rT   s    r   	_rank_keyz2TCPStoreBarrier.execute_barrier.<locals>._rank_key   s    $= r   )storerF   
key_prefix   N)rL   rI   rJ   rO   rQ   r   rR   setrM   rK   
store_utilr8   rN   )r(   r>   rV   s      r   r-   zTCPStoreBarrier.execute_barrier   s     --A	
	!C 	!C 	! 	d''(++N;<	
 	//..T%@%@%P!QQ		
 	##N3q83r   r.   )
r   r   r   r   r   rQ   r   boolr)   r-   r   r   r   r:   r:      sg     L6
6
 6
 	6

 $'6
 046
 6
 6
 6
p!9r   r:   )#r   r/   loggingcollectionsr   dataclassesr   r   datetimer   typingr   r   torch.distributeddistributedr5   %torch.distributed.elastic.utils.storeelasticutilsrW   r[   	getLoggerrI   r	   r   r   typer   r   r   r$   ABCr   r2   r:   r   r   r   <module>rj      s       (      : : 
			 %' $sDy/ &D T   ?  ?  ?F8!8i88cgg > !' ! !H e9g e9 e9r   