o
    #iA?                     @   s4  d dl Z d dlmZmZmZmZmZ d dlZd dlm	Z	 d dl
mZmZ g dZe eZ	 dZG dd dZG d	d
 d
eZeeddd Zd ZG dd dZG dd dZdd Z		ddeedf deeeef  dedeeedf  deeeef  deee ee f fddZdee fddZdS )     N)AnyDictListOptionalTuplemap_aggregate)tree_flattentree_unflatten)TensorChunkSpecsplit_args_kwargs_into_chunksmerge_chunksFc                   @   s   e Zd ZdZdd ZdS )_CustomReducera$  
    Custom reducer class that can be used to specify a custom operation that
    reduces losses of multiple microbatches into one value.

    Example:
    >>> # xdoctest: +SKIP
    >>> sum_reducer = _CustomReducer(
    >>>     torch.tensor(0.0),
    >>>     lambda a, b: a + b
    >>> )
    c                 C   s   || _ || _d S N)
init_value	reduce_fn)selfr   r    r   e/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/torch/distributed/pipelining/microbatch.py__init__(   s   
z_CustomReducer.__init__N)__name__
__module____qualname____doc__r   r   r   r   r   r      s    r   c                   @      e Zd ZdS )_LossReducerNr   r   r   r   r   r   r   r   -       r   g        c                 C   s   | | S r   r   )abr   r   r   <lambda>1       r    c                   @   sf   e Zd ZU dZdd Zeed< dd Zdd Ze	d	e
ed
f fddZe	d	eeef fddZdS )r   z2
    Class used to specify chunking of inputs
    c                 C   s
   || _ d S r   	split_dim)r   r#   r   r   r   r   =   s   
zTensorChunkSpec.__init__r#   c                 C   s    | j j d| j j d| j dS )N.())	__class__r   r   r#   r   r   r   r   __repr__B   s   zTensorChunkSpec.__repr__c                 C   s   d| j  dS )NzTensorChunkSpec(r&   r"   r(   r   r   r   __str__G   s   zTensorChunkSpec.__str__
chunk_dims.c                 C      t | dd }|S )a  
        A helper for creating a tuple of `TensorChunkSpec` from a tuple of chunk
        dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # There are three positional arguments to the model, and
            >>> # we are chunking them along dimension 0, 0 and 1, respectively
            >>> args_chunk_spec = TensorChunkSpec.from_tuple((0, 0, 1))
        c                 S      t | S r   r   dimr   r   r   r    Y   r!   z,TensorChunkSpec.from_tuple.<locals>.<lambda>r   )r+   args_chunk_specr   r   r   
from_tupleJ   s
   zTensorChunkSpec.from_tuplec                 C   r,   )a\  
        A helper for creating a dictionary of `TensorChunkSpec` from a
        dictionary of chunk dimensions (int's).
        Example:
            >>> # xdoctest: +SKIP
            >>> # Chunk dimension 0 for the "id" argument, 1 for the "mask" argument
            >>> kwargs_chunk_spec = TensorChunkSpec.from_dict({"id": 0, "mask": 1})
        c                 S   r-   r   r.   r/   r   r   r   r    k   r!   z+TensorChunkSpec.from_dict.<locals>.<lambda>r   )r+   kwargs_chunk_specr   r   r   	from_dict]   s
   zTensorChunkSpec.from_dictN)r   r   r   r   r   int__annotations__r)   r*   staticmethodr   r2   r   strr4   r   r   r   r   r   8   s   
 

r   c                   @   r   )
_ReplicateNr   r   r   r   r   r9   q   r   r9   c           #      C   s  i }g }|}d}t | t |ks"J dt|   dt|  |  D ]\}}t|\}	}
||
 || }|dus?J t|\}}t |	t |krWtd| d| g }t|	|D ]\}}|tu slt	|t
jsu||g|  q^t	|trt	|t
jsJ | d||j}||k r|rtd| d	| d
| d |}ntd| d| d| dt
|||j}trg }d}|D ]2}t
|}|||j }tdddg|j }t||||j< |||< || |||j7 }q|| n|| d}q^td| |||< q&g }t|D ]'}i }| D ]\}}g }|D ]
}|||  q'|||< q|| qg }|D ]+} i }!t |t | ksRJ t|  |D ]\\}}}"t||"|!|< qY||! qC|S )aW  
    Given a dictionary of args, and a dictionary of chunking specs, shard the
    args according to the chunking specs.

    Args:
        args_dict: Dictionary of args
        args_chunk_spec: Dictionary of chunking specs
        num_chunks: Number of chunks to shard the args into

    Returns:
        args_split: List of sharded args
    Tzargs_dict.keys() = z args_chunk_spec.keys() = NzArgument value z9 did not have the same number of values as as chunk spec z is not a tensorz%Tensor size on chunking dimension is z', downsizing the number of chunks from z to r$   zArg z% on chunking dimension has a size of z$, smaller than the number of chunks z. PiPPy cannot reduce the number of chunks because other arguments have bigger chunk-dimension sizes. Please adjust your num_chunks setting.r   FzUnrecognized chunk spec: )lenlistkeysitemsr	   append
ValueErrorzipr9   
isinstancetorchTensorr   sizer#   loggerwarningRuntimeErrortensor_split_debug_mask_minibatches
zeros_likeslicendim	TypeErrorranger
   )#	args_dictr1   
num_chunksargs_sharded_replicated	arg_specsreal_num_chunksfirst_tensorarg_keyargflatspec
chunk_specchunk_spec_flat_sharded_arg_flatvchunk_vv_split_dim_sizechunk_tensorsexpanded_chunkssplit_dim_idxchunk_tensornew_val	upper_idxslice_indiceschunks_flat	chunk_idx
chunk_argskeyarg_single_chunkv_flat
args_splitchunkper_chunk_argsarg_specr   r   r   _shard_dict_of_argsu   s   





rq   args.kwargschunksr1   r3   returnc           	         s  |du ri }|du rt tft|  }|du rt|t t}ttt| tt||}t|}t|||}t||k rOt|}ttt| tt||}t|t|kretdt| dt| g }|D ] |t	 fddt
t D  qi||fS )a  
    Given a sequence of args and kwargs, split them into a number of chunks
    according to  their respective chunking specs.

    Args:
        args: Tuple of args
        kwargs: Dict of kwargs
        chunks: Number of chunks to split the args and kwargs into
        args_chunk_spec: chunking specs for args, in same shape as args
        kwargs_chunk_spec: chunking specs for kwargs, in same shape as kwargs

    Returns:
        args_split: List of sharded args
        kwargs_split: List of sharded kwargs
    Nz;args and kwargs are split into different number of chunks: z, c                 3   s    | ]} | V  qd S r   r   ).0iri   r   r   	<genexpr>Y  s    z0split_args_kwargs_into_chunks.<locals>.<genexpr>)r   DEFAULT_CHUNK_DIMr:   dictfromkeysrq   	enumeraterG   r>   tuplerN   )	rr   rs   rt   r1   r3   args_split_dictrS   kwargs_splitrm   r   rx   r   r      sH   8



&r   c                    s0  |durt |\}}nt | d \}}ttgt| }g | D ]}t |\}}t|t|kr:td| d| | q g }t|D ]\ }	t|	trч fddttD }
t	r|
d j
}|
dd D ]	}|j
|kssJ qjtjtj|dd	it|
|	jd
}g }d}t|
t|ksJ t|
|D ])\}}|||	j }tdddg|j }t||||	j< || }|| |}qn|
}|tj||	jd qFt|	tr|	j}ttD ]}|	||   }q|| qFd   }tdtD ]}|   |ksJ q|| qFt||S )z
    Given a list of chunks, merge them into a single value according to
    the chunk spec.

    Args:
        chunks: list of chunks
        chunk_spec: Chunking spec for the chunks

    Returns:
        value: Merged value
    Nr   zChunk z did not match chunk spec c                    s   g | ]}|   qS r   r   )rv   rh   arg_idxchunks_flattenedr   r   
<listcomp>  s    
z merge_chunks.<locals>.<listcomp>   devicemeta)sectionsr0   r/   )r	   r   rz   r:   r?   r>   r}   rA   rN   rI   shaperB   rH   emptyr#   r@   rD   rK   rL   catr   r   r   r
   )rt   rY   spec_flattenedflatten_specchunk0_flatrn   chunk_flattenedr[   args_flattenedrV   partial_valuesoverall_shapevalmeta_chunksvalues_to_catchunk_start_idxpartial_value
meta_chunkchunk_end_idxrf   slicedreduced_valrh   valuer   r   r   r   ^  sd   -





r   )NN)loggingtypingr   r   r   r   r   rB   torch.fx.noder   torch.utils._pytreer	   r
   __all__	getLoggerr   rE   rI   r   r   tensorsum_reducerrz   r   r9   rq   r8   r5   r   r   r   r   r   r   <module>   sF   
9 

h