o
    ̳i./                     @   s  d dl mZ d dlmZmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZ d d	lmZ e	d
edZ	d>dee dedee dee fddZde
ee f ddfddZ!de de ded deee ef  fddZ"dee defddZ#e			 	 	 	!d?d"e d#ed$ee d%e$d&ed'ed(ed) d*e$d+ee ef defd,d-Z%e	.	 d@d/ee ef d0ee e&f d1e d&edef
d2d3Z'e			 	 	4	!dAd5ed6ed7ed8eeegef  d9e$d'ed(ed) d:ee d;e$defd<d=Z(dS )B    )Path)AnyCallableDictListLiteralOptionalTypeVarUnion)request)load_dataset)split_dataset_by_node)default_collateDistributedSampler)DatasetTypeLoaderrequires_torchdata)	Transform)get_world_size_and_rankT)boundNtokensmax_seq_leneos_idreturnc                 C   s,   | d| }|dur|d |kr||d< |S )a  
    Truncate a list of tokens to a maximum length. If eos_id is provided, the last
    token will be replaced with eos_id.

    Args:
        tokens (List[Any]): list of tokens to truncate
        max_seq_len (int): maximum length of the list
        eos_id (Optional[Any]): token to replace the last token with. If None, the
            last token will not be replaced. Default is None.

    Returns:
        List[Any]: truncated list of tokens
    N )r   r   r   tokens_truncatedr   r   I/home/ubuntu/.local/lib/python3.10/site-packages/torchtune/data/_utils.pytruncate   s   r   	image_loczPIL.Image.Imagec              
   C   s   ddl m} t| tr,| dr,zt| } W n ty+ } ztd|  |d}~ww z|	| }W |S  tyH } ztd|  |d}~ww )a  
    Convenience method to load an image in PIL format from a local file path or remote source.

    Args:
        image_loc (Union[Path, str]): Local file path or remote source pointing to the image
            which will be loaded in PIL format.

    Note:
        If loading an image from a remote source, the function expects the URL provided in ``image_loc``
        to start with "http" or "https" e.g. "https://www.wikipedia.org/en/bird.jpg".

    Raises:
        ValueError:
            If the image cannot be loaded from remote source, **or**
            if the image cannot be opened as a :class:`~PIL.Image.Image`.

    Examples:
        >>> # Load from remote source
        >>> image = load_image("https://www.wikipedia.org/en/bird.jpg")

        >>> # Load from local file path
        >>> image = load_image(Path("/home/user/bird.jpg"))

    Returns:
        PIL.Image.Image: The loaded image.
    r   )ImagehttpzFailed to load image from Nz'Failed to open image as PIL Image from )
PILr!   
isinstancestr
startswithr   urlopen	Exception
ValueErroropen)r    r!   eimager   r   r   
load_image/   s   r-   content	image_tagimagesc                C   s   |  |}t||krtdt| d| d|  | |}g }t|D ]%\}}t|dkr7|d|d |t|d k rJ|d|dd q%|S )	a  
    Given a raw text string, split by the specified ``image_tag``
    and form into list of dictionaries to be used in the :class:`~torchtune.data.Message` content
    field::

        [
            {
                "role": "system" | "user" | "assistant",
                "content":
                    [
                        {"type": "image", "content": <PIL.Image.Image>},
                        {"type": "text", "content": "This is a sample image."},
                    ],
            },
            ...
        ]

    Args:
        content (str): raw message text
        image_tag (str): string to split the text by
        images (List["PIL.Image.Image"]): list of images to be used in the content

    Raises:
        ValueError: If the number of images does not match the number of image tags in the content

    Examples:
        >>> content = format_content_with_images(
        ...     "<|image|>hello <|image|>world",
        ...     image_tag="<|image|>",
        ...     images=[<PIL.Image.Image>, <PIL.Image.Image>]
        ... )
        >>> print(content)
        [
            {"type": "image", "content": <PIL.Image.Image>},
            {"type": "text", "content": "hello "},
            {"type": "image", "content": <PIL.Image.Image>},
            {"type": "text", "content": "world"}
        ]

    Returns:
        List[Dict[str, Any]]: list of dictionaries to be used in the :class:`~torchtune.data.Message` content field
    zNumber of images (z') does not match number of image tags (z) in content: r   text)typer.      r,   )countlenr)   split	enumerateappendpop)r.   r/   r0   num_image_tags_in_contentsplit_contentfinal_content_listisubstrr   r   r   format_content_with_images^   s$   
-
r?   funcsc                     s    fdd}|S )z
    Chain a list of functions together into a single function.

    Args:
        *funcs (List[Callable]): list of functions to chain together

    Returns:
        Callable: chained function
    c                    s    D ]}|| } q| S Nr   )xfnr@   r   r   
chained_fn   s   
zchain.<locals>.chained_fnr   )r@   rE   r   rD   r   chain   s   rF   TthreadFsource	transform	filter_fnshuffleseednum_workersparallel_method)processrG   	streamingload_dataset_kwargsc                 K   s   ddl m}	m}
m} d|v r d|vsJ d||d|d< t| fi |}|dur1||}t \}}|rLt|||d}|rG|j	|d}|	|}nt
|||||d	}||}t|j|}|
||||d
}|S )a  
    Load a HuggingFace dataset (Map or Streaming) and apply a Transform to it.

    Args:
        source (str): HuggingFace dataset source.
        transform (Transform): Transform to apply to the samples of the dataset.
        filter_fn (Optional[Callable]): Filter function to pass to HuggingFace dataset.
        shuffle (bool): Whether to shuffle the dataset. Default is True. For streaming datasets, this is passed to
            HuggingFace dataset as .shuffle(). For map datasets, a DistributedSampler is used.
        seed (int): Seed for the random number generator in the case of Map style dataset shuffling. Default is 0.
        num_workers (int): Number of workers to use for loading the dataset. Default is 0 (no parallelism). Setting this
            greater than 0 will create `parallel_method` workers to perform transforms to the dataset.
        parallel_method (Literal["process", "thread"]): Method to use for parallelism. Default is "thread". No effect if
            num_workers is 0.
        streaming (bool): whether to load a streaming vs map-style dataset. Default False.
        **load_dataset_kwargs (Dict[str, Any]): Additional Keyword arguments to pass to HuggingFace dataset. See Hugging Face's
            documentation.

    Returns:
        A ``torchdata.nodes`` iterator that can be passed directly to a Loader, or combined with other-datasets in a multi-dataset
        sampler.
    r   )IterableWrapperParallelMapperSamplerWrappersubsetnamezTfound both 'subset' and 'name' found, you may only specify one, load_dataset_kwargs=N)rank
world_size)rL   )num_replicasrW   rK   rL   map_fnrM   method)torchdata.nodesrR   rS   rT   r9   r   filterr   r   rK   r   rF   __getitem__)rH   rI   rJ   rK   rL   rM   rN   rP   rQ   rR   rS   rT   datasetrX   rW   nodesamplerr   r   r   load_hf_dataset   s8   "



	rc   !CYCLE_UNTIL_ALL_DATASETS_EXHASTEDdatasetsweightsstop_criteriac                 C   s   ddl m} || |||dS )a  
    Given a dictionary of datasets and their corresponding weights, return a dataset that
    samples from the given datasets according to the specified weights.

    Args:
        datasets (Dict[str, DatasetType]): dictionary of datasets
        weights (Dict[str, float]): dictionary of weights for each dataset. If not
        stop_criteria (str): stop criteria for the sampler. Default "CYCLE_UNTIL_ALL_DATASETS_EXHASTED".
            See also: torchdata.nodes.StopCriteria
        seed (int): seed for the random number generator. Default 0.

    Returns:
        A ``torchdata.nodes`` iterator which can be passed to Loader, or further composed with other Nodes.
    r   )MultiNodeWeightedSampler)source_nodesrf   rg   rL   )r]   rh   )re   rf   rg   rL   rh   r   r   r   get_multi_dataset   s   rj      r`   model_transform
batch_size
collate_fn	drop_lastprefetch_factor
pin_memoryc	                 C   sx   ddl m}	m}
m}m} |du rt}|
| |||d}|	|||d}|
||||d}|r/||}|dur8|||}t|S )ax  
    This will configure TorchData Nodes to approximate torch.utils.data.DataLoader.
    Given a dataset, apply model_transform (eg tokenization), batching, collation,
    memory pinning, and pre-fetching.

    Args:
        dataset (DatasetType): dataset to load. May be a MultiNodeWeightedSampler
        model_transform (Transform): model transform to apply to the samples of the dataset
        batch_size (int): batch size
        collate_fn (Optional[Callable[[Any], Any]]): collate function to apply to the samples of the dataset. If None, use
            torch.utils.data.default_collate. Default None.
        drop_last (bool): whether to drop the last batch. Default is True.
        num_workers (int): number of workers to use for loading the dataset. Default is 0 (no parallelism
        parallel_method (Literal["process", "thread"]): method to use for parallelism. Default is "thread".
        prefetch_factor (Optional[int]): number of batches to prefetch. Default is 4.
        pin_memory (bool): whether to pin memory. Default is False.

    Returns:
        A ``torchdata.nodes`` Loader, an Iterable that returns batches.
    r   )BatcherrS   	PinMemory
PrefetcherNrZ   )ro   )r]   rr   rS   rs   rt   r   r   )r`   rl   rm   rn   ro   rM   rN   rp   rq   rr   rS   rs   rt   ra   r   r   r   get_dataloader  s   !
ru   rA   )NTr   r   rG   F)rd   r   )NTr   rG   rk   F))pathlibr   typingr   r   r   r   r   r   r	   r
   urllibr   re   r   datasets.distributedr   torch.utils.datar   r   torchtune.data._torchdatar   r   r   torchtune.modules.transformsr   torchtune.utilsr   r2   r   intr   r%   r-   r?   rF   boolrc   floatrj   ru   r   r   r   r   <module>   s   (
/
?
	
F

	
