o
    wiD                     @   sT  d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	m
Z
mZmZmZ ddlZddlmZ d,de	defd	d
Zde	fddZde	fddZdejfddZdeeejf fddZdd Zdd Zdd Zdd Zdd Ze Zde	dedefd d!Z d"edefd#d$Z!d%ee"eee
f fd&d'Z#G d(d) d)Z$G d*d+ d+Z%dS )-zAClasses and functions for writing tar files and WebDataset files.    N)AnyCallableDictOptionalUnion   )gopenPNGimageformatc                 C   sj  ddl }ddl}t| |jjtjfsJ t| t| tjrs| jtdtdfv rXt| dks9t	| dkrIt
dt|  dt	|  t| d	d
} t| d d} | jdv s_J | jdkrm| jd dv smJ |j| } | dkr|d}n| dv rd}|dv rtdd}ni }t }| j|fd|i| | W  d   S 1 sw   Y  dS )aC  Compress an image using PIL and return it as a string.

    Can handle float or uint8 images.

    Args:
        image: ndarray representing an image
        format: compression format (PNG, JPEG, PPM)

    Returns:
        bytes: Compressed image data

    Raises:
        ValueError: If image values are out of range
    r   NfdgMbPgjt?zimage values out of range  g        g      ?g     o@uint8)      r   r   )r   r   JPGJPEG>   IMGIMAGEPPM>   r   tiffd   )qualityr   )PIL	PIL.Image
isinstanceImagenpndarraytypedtypeaminamax
ValueErrorcliparrayndimshape	fromarrayupperdictioBytesIOsavegetvalue)r
   r   r   optsresult r2   N/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/webdataset/writer.pyimageencoder   s0     

$r4   datac                 C   s0   t | tr| S t | tr| dS t| dS )zConvert data into a bytestring.

    Uses str and ASCII encoding for data that isn't already in string format.

    Args:
        data: Data to be converted

    Returns:
        bytes: Converted bytestring
    ascii)r   bytesstrencoder5   r2   r2   r3   bytestrB   s
   


r;   c                 C   s,   ddl }ddl}| }|| | | S )zDump data into a bytestring using torch.dumps.

    This delays importing torch until needed.

    Args:
        data: Data to be dumped

    Returns:
        bytes: Dumped data as bytestring
    r   N)r,   torchr-   r.   r/   )r5   r,   r<   streamr2   r2   r3   torch_dumpsT   s
   r>   c                 C   s0   ddl }ddl}| }|jj||  | S )zDump data into a bytestring using numpy npy format.

    Args:
        data: Data to be dumped

    Returns:
        bytes: Dumped data as bytestring
    r   N)r,   numpy.lib.formatr-   libr   write_arrayr/   )r5   r,   numpyr=   r2   r2   r3   numpy_dumpsh   s
   	rC   c                 C   sl   ddl }t| tsJ t|  D ]\}}t|tsJ t|tjs$J q| }tj	|fi |  |
 S )a  Dump data into a bytestring using numpy npz format.

    Args:
        data: Dictionary of numpy arrays to be dumped

    Returns:
        bytes: Dumped data as bytestring

    Raises:
        AssertionError: If input is not a dictionary of numpy arrays
    r   N)r,   r   r+   listitemsr8   r   r   r-   savez_compressedr/   )r5   r,   kvr=   r2   r2   r3   numpy_npz_dumpsz   s   rI   c                 C   s4   ddl m} t| trt|| S t|| gS )zDump data into a bytestring using tenbin format.

    Args:
        x: Data to be dumped (list or single item)

    Returns:
        memoryview: Dumped data as memoryview
    r   )tenbin) rJ   r   rD   
memoryviewencode_buffer)xrJ   r2   r2   r3   tenbin_dumps   s   	
rO   c                 C      ddl }|| S )zDump data into a bytestring using CBOR format.

    Args:
        x: Data to be dumped

    Returns:
        bytes: Dumped data as bytestring
    r   N)cbordumps)rN   rQ   r2   r2   r3   
cbor_dumps      	
rS   c                 C   rP   )zDump data into a bytestring using MessagePack format.

    Args:
        x: Data to be dumped

    Returns:
        bytes: Dumped data as bytestring
    r   N)msgpackpackb)rN   rU   r2   r2   r3   mp_dumps   rT   rW   c                 C   s(   t |tr	| }|D ]}|| |< qdS )zAdd handlers to a dictionary for given keys.

    Args:
        d: Dictionary to add handlers to
        keys: String of space-separated keys or list of keys
        value: Handler function to be added
    N)r   r8   split)r   keysvaluerG   r2   r2   r3   add_handlers   s
   

r[   c                  C   s   i } t | ddd  t | ddd  t | ddd  t | dtj t | d	t t | d
t t | dt t | dt t | ddd  t | dt t | dt t | ddd  t | ddd  t | ddd  t | ddd  t | ddd  t | ddd  | S )z}Create a list of handlers for encoding data.

    Returns:
        dict: Dictionary of handlers for different data types
    z!cls cls2 class count index inx idc                 S   s   t | dS )Nr6   )r8   r9   rN   r2   r2   r3   <lambda>   s    zmake_handlers.<locals>.<lambda>ztxt text transcriptc                 S   
   |  dS Nutf-8r9   r\   r2   r2   r3   r]         
 zhtml htmc                 S   r^   r_   ra   r\   r2   r2   r3   r]      rb   z
pyd picklepthnpynpzzten tenbin tbzjson jsnc                 S   s   t | dS r_   )jsonrR   r9   r\   r2   r2   r3   r]      s    zmp msgpack msgrQ   zjpg jpeg img imagec                 S   
   t | dS )Njpgr4   r:   r2   r2   r3   r]      rb   pngc                 S   rg   )Nrj   ri   r:   r2   r2   r3   r]      rb   pbmc                 S   rg   )Nrk   ri   r:   r2   r2   r3   r]      rb   pgmc                 S   rg   )Nrl   ri   r:   r2   r2   r3   r]      rb   ppmc                 S   rg   )Nrm   ri   r:   r2   r2   r3   r]      rb   ztiff tifc                 S   rg   )Nr   ri   r:   r2   r2   r3   r]      rb   )	r[   picklerR   r>   rC   rI   rO   rW   rS   handlersr2   r2   r3   make_handlers   s&   rq   tnamerp   c                 C   s   |d dkrt | tstd| S d}|dr d}|dd }td	d
| }t | tr7|r5t	| } | S t | trJ| 
d} |rHt	| } | S ||}|du rZtd| || }|ret	|}|S )aA  Encode data based on its extension and a dict of handlers.

    Args:
        data: Data to be encoded
        tname: File extension
        handlers: Dictionary of handlers for different data types

    Raises:
        ValueError: If no handler is found for the given extension or if metadata values are not strings
    r   _z-the values of metadata must be of string typeFz.gzTNz.*\.rK   r`   zno handler found for )r   r8   r$   endswithresublowerr7   gzipcompressr9   get)r5   rr   rp   rz   	extensionhandlerr1   r2   r2   r3   encode_based_on_extension1   s2   








r~   samplec                    s    fddt |  D S )zEncode an entire sample with a collection of handlers.

    Args:
        sample: Data sample (a dict)
        handlers: Handlers for encoding

    Returns:
        dict: Encoded sample
    c                    s   i | ]\}}|t || qS r2   )r~   ).0rG   rH   ro   r2   r3   
<dictcomp>  s    z-encode_based_on_extension.<locals>.<dictcomp>)rD   rE   )r   rp   r2   ro   r3   encode_based_on_extension  s   
r   specc                    s   du sdu rdd }n+t r}n$ttr"fdd}|}ndu r1t  fdd	}|}nt d
t |sCt d|S )a   Make an encoder function from a specification.

    Args:
        spec: Specification for the encoder

    Returns:
        Callable: Encoder function

    Raises:
        ValueError: If the specification is invalid or doesn't yield a callable encoder
    FNc                 S      | S )zDo not encode at all.r2   r\   r2   r2   r3   encoder,  s   zmake_encoder.<locals>.encoderc                    
   t |  S zEncode based on extension.r   r   )r   r2   r3   r   4     
zmake_encoder.<locals>.fTc                    r   r   r   r   ro   r2   r3   g=  r   zmake_encoder.<locals>.gz: unknown decoder specz! did not yield a callable encoder)callabler   r+   default_handlersr$   )r   r   r   r   r2   )rp   r   r3   make_encoder  s   

r   c                   @   s   e Zd ZdZ								ddeded	ed
eeeef  dedee	f dedee
 defddZdd Zdd Zdd Zdd Zedd
eeeef  fddZdS )	TarWritera4  A class for writing dictionaries to tar files.

    Args:
        fileobj: File name for tar file (.tgz/.tar) or open file descriptor.
        encoder: Sample encoding. Defaults to True.
        compress: Compression flag. Defaults to None.
        user: User for tar files. Defaults to "bigdata".
        group: Group for tar files. Defaults to "bigdata".
        mode: Mode for tar files. Defaults to 0o0444.
        keep_meta: Flag to keep metadata (entries starting with "_"). Defaults to False.
        mtime: Modification time. Defaults to None.
        format: Tar format. Defaults to None.

    Returns:
        TarWriter object.

    Raises:
        ValueError: If the encoder doesn't yield bytes for a key.

    `True` will use an encoder that behaves similar to the automatic
    decoder for `Dataset`. `False` disables encoding and expects byte strings
    (except for metadata, which must be strings). The `encoder` argument can
    also be a `callable`, or a dictionary mapping extensions to encoders.

    The following code will add two file to the tar archive: `a/b.png` and
    `a/b.output.png`.


        tarwriter = TarWriter(stream)
        image = imread("b.jpg")
        image2 = imread("b.out.jpg")
        sample = {"__key__": "a/b", "png": image, "output.png": image2}
        tarwriter.write(sample)

    bigdata$  NTFusergroupmoderz   r   	keep_metamtimer   c
                 C   s   |	rt t|	|	ntj}	|| _| ||}
t|tr"t|d}|| _nd| _t	|| _
|| _|| _tj||
d| _|| _|| _|| _|| _dS )a  Create a tar writer.

        Args:
            fileobj: Stream to write data to.
            user: User for tar files.
            group: Group for tar files.
            mode: Mode for tar files.
            compress: Desired compression.
            encoder: Encoder function.
            keep_meta: Keep metadata (entries starting with "_").
            mtime: Modification time (set this to some fixed value to get reproducible tar files).
            format: Tar format.
        wbN)fileobjr   )getattrtarfileUSTAR_FORMATr   tarmoder   r8   r   own_fileobjr   r   r   r=   open	tarstreamr   r   r   rz   )selfr   r   r   r   rz   r   r   r   r   r   r2   r2   r3   __init__o  s   



zTarWriter.__init__c                 C   r   )zQEnter context.

        Returns:
            self: The TarWriter object.
        r2   r   r2   r2   r3   	__enter__     zTarWriter.__enter__c                 C      |    dS zExit context.Nclose)r   exc_typeexc_valexc_tbr2   r2   r3   __exit__     zTarWriter.__exit__c                 C   s,   | j   | jdur| j  d| _dS dS )zClose the tar file.N)r   r   r   r   r2   r2   r3   r     s
   



zTarWriter.closec           	      C   s^  d}|  |}d|vrtdt| D ]\}}|d dkr qt|tttfs4t| dt| dq|d }t	|
 D ]m}|dkrFq?| jsP|d dkrPq?|| }t|tr^|d}t }t|d | }t||_| jd	urx| jn||_| j|_| j|_| j|_t|tttfstd
| dt| t|}| j|| ||j7 }q?|S )a(  Write a dictionary to the tar file.

        Args:
            obj: Dictionary of objects to be stored.

        Returns:
            int: Size of the entry.

        Raises:
            ValueError: If the object doesn't contain a __key__ or if a key doesn't map to bytes after encoding.
        r   __key__zobject must contain a __key__rs   z( doesn't map to a bytes after encoding ()r`   .Nzconverter didn't yield bytes: z, )r   r$   rD   rE   r   r7   	bytearrayrL   r    sortedrY   r   r8   r9   timer   TarInfolensizer   r   r   unamer   gnamer,   r-   r   addfile)	r   objtotalrG   rH   keynowtir=   r2   r2   r3   write  s@   




zTarWriter.writec                 C   sx   |du rdS |du s|dkst | tr| drdS |dks(t | tr*| dr*dS |dks8t | tr:| dr:d	S dS )
NFzw|Tgzzw|gzbz2zw|bz2xzzw|xz)r   r8   ru   )r   rz   r2   r2   r3   r     s   $zTarWriter.tarmode)r   r   r   NTFNN)N)__name__
__module____qualname____doc__r8   intr   r   boolr   floatr   r   r   r   r   r   staticmethodr   r2   r2   r2   r3   r   J  sB    '	

+-"r   c                   @   s|   e Zd ZdZ						ddeded	ed
ee dededee fddZ	dd Z
dd Zdd Zdd Zdd Zdd ZdS )ShardWritera3  Like TarWriter but splits into multiple shards.

    Args:
        pattern: Output file pattern.
        maxcount: Maximum number of records per shard. Defaults to 100000.
        maxsize: Maximum size of each shard. Defaults to 3e9.
        post: Optional callable to be executed after each shard is written. Defaults to None.
        start_shard: Starting shard number. Defaults to 0.
        verbose: Verbosity level. Defaults to 1.
        opener: Optional callable to open output files. Defaults to None.
        **kw: Other options passed to TarWriter.
    順    ZANr   r   patternmaxcountmaxsizepoststart_shardverboseopenerc           	      K   sZ   || _ || _|| _|| _|| _d| _|| _|| _d| _d| _	d| _
d| _|| _|   dS )a  Create a ShardWriter.

        Args:
            pattern: Output file pattern.
            maxcount: Maximum number of records per shard.
            maxsize: Maximum size of each shard.
            post: Optional callable to be executed after each shard is written.
            start_shard: Starting shard number.
            verbose: Verbosity level.
            opener: Optional callable to open output files.
            **kw: Other options passed to TarWriter.
        Nr   )r   kwr   r   r   r   shardr   r   countr   fnamer   next_stream)	r   r   r   r   r   r   r   r   r   r2   r2   r3   r     s   zShardWriter.__init__c                 C   s   |    | j| j | _| jrtd| j| jd| jd  | j |  jd7  _| j	r7t
| 	| jfi | j| _nt
| jfi | j| _d| _d| _dS )z.Close the current stream and move to the next.z	# writingz%.1f GBg    eAr   r   N)finishr   r   r   r   printr   r   r   r   r   r   r   r   r2   r2   r3   r     s    
zShardWriter.next_streamc                 C   sd   | j du s| j| jks| j| jkr|   | j |}|  jd7  _|  jd7  _|  j|7  _dS )zNWrite a sample.

        Args:
            obj: Sample to be written.
        Nr   )r   r   r   r   r   r   r   r   )r   r   r   r2   r2   r3   r   1  s   "zShardWriter.writec                 C   sF   | j dur!| j   | jdusJ t| jr| | j d| _ dS dS )z'Finish all writing (use close instead).N)r   r   r   r   r   r   r2   r2   r3   r   >  s   



zShardWriter.finishc                 C   s   |    | `| `| `| `dS )zClose the stream.N)r   r   r   r   r   r   r2   r2   r3   r   G  s
   zShardWriter.closec                 C   r   )zSEnter context.

        Returns:
            self: The ShardWriter object.
        r2   r   r2   r2   r3   r   O  r   zShardWriter.__enter__c                 O   r   r   r   )r   argsr   r2   r2   r3   r   W  r   zShardWriter.__exit__)r   r   Nr   r   N)r   r   r   r   r8   r   r   r   r   r   r   r   r   r   r   r   r2   r2   r2   r3   r     s:    
'	r   )r	   )&r   ry   r,   rf   rn   rv   r   r   typingr   r   r   r   r   rB   r   rK   r   r8   r4   r;   r>   r   rC   rI   rO   rS   rW   r[   rq   r   r+   r~   r   r   r   r   r   r2   r2   r2   r3   <module>   s8   +&, 