o
    ci                     @   s   d dl Z d dlmZmZ d dlZd dlmZ d dlm	Z	 d dl
mZ dZedee dZe eZedd	d
edefddZedd	dddZdS )    N)OptionalTypeVar)get_train_context)	PublicAPIi  T)boundalpha)	stabilitydatareturnc                 C   sf   | durt t| }|tkrtd| dt d t }| }t	|j
j| | | ddS )aw  Broadcast small (<1kb) data from the rank 0 worker to all other workers.

    Serves as a barrier, meaning that all workers must call this method before
    the training function can continue.

    Example:

        .. testcode:
            :skipif: True

            from ray.train import get_context
            from ray.train.collective import broadcast_from_rank_zero
            from ray.train.torch import TorchTrainer

            def train_func():
                ...
                if get_context().get_world_rank() == 0:
                    data = {"some_key": "some_value"}
                else:
                    data = None
                data = broadcast_from_rank_zero(data)
                ...

            trainer = TorchTrainer(train_func)
            trainer.fit()

    Args:
        data: The small (1kb) data to broadcast from the rank 0 worker to all
            other workers.

    Returns:
        The data broadcasted from the rank 0 worker.

    Raises:
        ValueError: If the data is too big.
        pickle.PicklingError: If the data is not pickleable.
        TypeError: If the data is not pickleable.
    Nz
Data size z- bytes exceeds the maximum broadcast size of z bytesz-ray.train.collective.broadcast_from_rank_zero
world_rank
world_sizer
   caller_method_name)lenpickledumps_MAX_BROADCAST_SIZE_BYTESloggerwarningr   get_synchronization_actorraygetbroadcast_from_rank_zeroremoteget_world_rankget_world_size)r
   
data_bytestrain_context
sync_actor r    T/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/collective/collectives.pyr      s$   )r   c                  C   s0   t  } |  }t|jj|  |  dddS )a  Create a barrier across all workers.

    All workers must call this method before the training function can continue.

    Example:

        .. testcode:
            :skipif: True

            from ray.train import get_context
            from ray.train.collective import barrier
            from ray.train.torch import TorchTrainer

            def train_func():
                ...
                print(f"Rank {get_context().get_world_rank()} is waiting at the barrier.")
                barrier()
                print(f"Rank {get_context().get_world_rank()} has passed the barrier.")
                ...

            trainer = TorchTrainer(train_func)
            trainer.fit()
    Nzray.train.collective.barrierr   )r   r   r   r   r   r   r   r   )r   r   r    r    r!   barrierQ   s   r"   )r   N)loggingtypingr   r   r   ray.cloudpicklecloudpickler   (ray.train.v2._internal.execution.contextr   ray.util.annotationsr   r   objectr   	getLogger__file__r   r   r"   r    r    r    r!   <module>   s    
=