o
    Ti                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlm	Z	 da
dag adefddZdd	 Zd
d ZdddZdddZdd ZdejdefddZdedejfddZdd ZdS )    N)commrequired_torch_version)get_acceleratorreturnc                   C   s
   t ddS )Ng?)min_versionr    r   r   N/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/runtime/pipe/p2p.pycan_send_recv   s   
r
   c                 C   s4   | a t jdksJ dt sdd t jD ad S d S )N   z There is no pipeline parallelismc                 S   s   g | ]}t j|d qS ))ranks)dist	new_group).0groupr   r   r	   
<listcomp>"   s    z'init_process_groups.<locals>.<listcomp>)_gridpipe_parallel_sizer
   
p2p_groups_groups)gridr   r   r	   init_process_groups   s
   r   c                 C   sR   d}t jd }t| | dks#| |kr||ks%| |kr||ks'J dd S d S d S )Nr   r   zOFunctionality currently limited to send and receive between adjacent ranks only)r   r   abs)	src_stage
dest_stagefirst_stage
last_stager   r   r	   _is_valid_send_recv%   s   
r   Fc                 C   s   |dksJ dt  }t|| t j|d}|r&t| |}t| d S t r/t	| |S t
||}t j|d}tj| |||dS NFzDoesn't support async_op truestage_id)r   async_op)r   get_stage_idr   stage_to_globalr   isend_asyncappendr
   send_get_send_recv_group	broadcast)tensorr   r!   r   	dest_rankopr   src_rankr   r   r	   r'   .   s   

r'   c                 C   sz   |dksJ dt  }t|| t j|d}|r&t| |}t| d S t r/t	| |S t
||}tj| |||dS r   )r   r"   r   r#   r   irecvr%   r&   r
   recvr(   r)   )r*   r   r!   r   r-   r,   r   r   r   r	   r/   C   s   

r/   c                  C   s$   t D ]} |   qg a t   d S )N)r%   waitr   synchronize)r,   r   r   r	   r0   W   s   
r0   msgdestc                 C   sj   t | } ttj| t  } tj	t
| gtjdt  }tj||d tj| |d dS )a+  Send an arbitrary python object to ``dest``.

    Note: ``msg`` must be serializable by msgpack.

    WARN: This incurs a CPU -> GPU transfer and should be used sparingly
    for performance reasons.

    Args:
        msg (typing.Any): The object to send.
        dest (int): Destination rank.
    dtype)dstN)msgpackpackbtorch
ByteTensorByteStoragefrom_buffertor   device_namer*   lenlongr   r'   )r2   r3   length_tensorr   r   r	   send_obj`   s
   
"rB   senderc                    s   t jdgt jdt  }tj|| d t j|	 t j
dt  }tj|| d t|   } fdd  |}|S )zReceive an arbitrary python object from ``sender``.

    WARN: This incur a CPU <-> GPU transfers and should be used sparingly
    for performance reasons.

    Args:
        sender (int): The rank sending the message.
    r   r4   )srcc                    s   t | r| t  S t| ttfr( fdd| D }t| tr&t|}|S t| trCt }| 	 D ]\}} || |< q4|S | S )z'Recursively move to the current device.c                    s   g | ]} |qS r   r   )r   x__tor   r	   r      s    z)recv_obj.<locals>._to.<locals>.<listcomp>)
r9   	is_tensorr=   r   r>   
isinstancetuplelistdictitems)xretkeyvalrF   r   r	   rG      s   


zrecv_obj.<locals>._to)r9   r*   r@   r=   r   r>   r   r/   emptyitemuint8r7   unpackbcpunumpytobytes)rC   lengthr2   r   rF   r	   recv_objw   s   
 rZ   c                 C   s`   d}d}t jd }| |kr||ks||kr| |kr|}n	| |kr#|}n| }	 t j|d}t| S )z@the group id is always the smaller rank unless its a wrap aroundNr   r   r   )r   r   r#   r   )r   r   r    r   r   group_idr   r   r	   r(      s   
r(   )F)r7   typingr9   	deepspeedr   r   deepspeed.utils.torchr   deepspeed.acceleratorr   r   r   r%   boolr
   r   r   r'   r/   r0   AnyintrB   rZ   r(   r   r   r   r	   <module>   s$   

	
	*