import contextlib
import os
import tempfile

import torch
import torch.distributed as dist

from nemo.utils import logging
from nemo.utils.get_rank import is_global_rank_zero

try:
    from megatron.core import parallel_state

    HAVE_MEGATRON_CORE = True
except (ImportError, ModuleNotFoundError):
    HAVE_MEGATRON_CORE = False


def initialize_distributed(args, backend='nccl'):
    """Initialize torch.distributed."""
    # Local rank may be provided on the argument namespace by the launcher.
    local_rank = args.local_rank

    # Rank and world size come from the environment set up by the launcher.
    rank = int(os.getenv('RANK', '0'))
    world_size = int(os.getenv('WORLD_SIZE', '1'))

    logging.info(
        f'Initializing torch.distributed with local_rank: {local_rank}, rank: {rank}, world_size: {world_size}'
    )

    # Select the device for this process; prefer the explicit local_rank when available.
    device = rank % torch.cuda.device_count()
    if local_rank is not None:
        device = local_rank
    torch.cuda.set_device(device)

    # Initialize the process group over TCP using the master address/port from the environment.
    init_method = 'tcp://'
    master_ip = os.getenv('MASTER_ADDR', 'localhost')
    master_port = os.getenv('MASTER_PORT', '6000')
    init_method += master_ip + ':' + master_port
    torch.distributed.init_process_group(
        backend=backend, world_size=world_size, rank=rank, init_method=init_method
    )

    return local_rank, rank, world_size
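

# Usage sketch (illustrative comment only, based on the behaviour above): the function
# expects an argparse-style namespace exposing `local_rank` and reads RANK, WORLD_SIZE,
# MASTER_ADDR and MASTER_PORT from the environment, typically set by the launcher, e.g.
#
#   local_rank, rank, world_size = initialize_distributed(args, backend='nccl')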


def gather_objects(partial_results_list, main_rank=None):
    """
    Collect objects (e.g., results) from all GPUs.
    Useful for inference over multiple GPUs with DDP.

    Use main_rank to specify which rank will be used to gather results.
    This allows to continue execution on the main_rank only after the gather.

    Args:
        partial_results_list: list of partial results from each GPU
        main_rank: rank of the main process to collect results from all GPUs (useful for collecting results in a target rank)


    Example:
        predictions = gather_objects(predictions, main_rank=0)
        # all but rank 0 will return None
        if predictions is None:
            return

        # from here only rank 0 should continue
        pickle.dump(predictions, open(output_fname, "wb"))
    """
    # Do not fail when DDP is not initialized.
    if not parallel_state.is_initialized():
        return partial_results_list

    rank = parallel_state.get_data_parallel_rank()
    world_size = parallel_state.get_data_parallel_world_size()
    # Return the input unchanged when only a single data-parallel rank is used.
    if world_size == 1:
        return partial_results_list

    # Gather the per-rank lists of partial results.
    gathered_results = [None for _ in range(world_size)]
    torch.distributed.all_gather_object(gathered_results, partial_results_list)

    # When a main rank is given, all other ranks return None.
    if main_rank is not None and rank != main_rank:
        return None

    # Flatten the gathered per-rank lists into a single list.
    results_list = []
    for r in gathered_results:
        results_list.extend(r)

    return results_list


@contextlib.contextmanager
def temporary_directory():
    """Create a shared temporary directory across ranks in distributed setup.

    This function assumes that the distributed setup has been already
    correctly initialized. It is intended to be used only in single-node
    setup so that all ranks can access the directory created.
    """
    if is_global_rank_zero():
        tmp_dir = [tempfile.TemporaryDirectory()]
    else:
        tmp_dir = [None]
    # Broadcast the directory from rank zero so that all ranks use the same path.
    dist.broadcast_object_list(tmp_dir)
    yield tmp_dir[0].name
    # The barrier makes sure that rank zero won't exit and delete the
    # directory while other ranks may still be using it.
    dist.barrier()
    if is_global_rank_zero():
        tmp_dir[0].cleanup()
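

# Usage sketch (illustrative comment only): every rank enters the context and sees the
# same path, which is created and later removed by global rank zero, e.g.
#
#   with temporary_directory() as tmp_dir:
#       save_partial_results(tmp_dir)  # hypothetical per-rank work inside the shared dir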


def webdataset_split_by_workers(src):
    """
    This is for latest webdataset>=0.2.6
    This function will make sure that each worker gets a different subset of the dataset.
    """
    worker_info = torch.utils.data.get_worker_info()
    num_workers = 1
    if worker_info is not None:
        worker = worker_info.id
        num_workers = worker_info.num_workers
    if num_workers > 1:
        # Give each DataLoader worker a disjoint, strided subset of the source items.
        yield from list(src)[worker::num_workers]
    else:
        yield from src
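

# Usage sketch (illustrative comment only, assuming webdataset>=0.2 is installed): the
# generator is meant to be placed in a webdataset pipeline so that each DataLoader worker
# reads a disjoint subset of shards, e.g.
#
#   import webdataset as wds
#   pipeline = wds.DataPipeline(
#       wds.SimpleShardList(shard_urls),
#       webdataset_split_by_workers,
#       wds.tarfile_to_samples(),
#   )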