o
    .i_                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	Z
ddlZddlmZ zddlmZ W n ey=   dZY nw dd	lmZ d
d Zdd Zdd Z	dddZdd ZG dd dZG dd dZdd ZdS )zTF-specific utils import.    N)partial)ceil)uuid4)get_context)SharedMemory   )configc                    s   t | tr| S tjrdd l}ntd| d }i }| D ]<\ }t |tjr6t	 fdd| D | < qt ||j
rK|	 fdd| D | < qt fdd| D | < q|S )Nr   FCalled a Tensorflow-specific function but Tensorflow is not installed.c                       g | ]}|  qS  r   .0fkr   K/home/ubuntu/.local/lib/python3.10/site-packages/datasets/utils/tf_utils.py
<listcomp>0       z)minimal_tf_collate_fn.<locals>.<listcomp>c                    r
   r   r   r   r   r   r   r   2   r   c                    r
   r   r   r   r   r   r   r   4   r   )
isinstancedictr   TF_AVAILABLE
tensorflowImportErroritemsnpndarraystackTensorarray)featurestffirstbatchvr   r   r   minimal_tf_collate_fn$   s   

r$   c                 C   s&   t | }d|v r|d |d< |d= |S )Nlabellabels)r$   )r   r"   r   r   r   #minimal_tf_collate_fn_with_renaming8   s
   r'   c                 C   s:   t j| rt| jS t j| pt j| pt j| S N)patypesis_listis_numeric_pa_type
value_type
is_integeris_floating
is_decimal)pa_typer   r   r   r,   @   s   
$r,   Fc                    sb  t | tjs
|  } d}t | tjr||    d}n+tt| dkr2|| d | d d   nt | tjr=||   n	tdt	|  d urUfdd 
 D  |rltt  d } fd	d
t|D  | fi | |ri }	|
 D ]\}
}t |
 }||}||	|
< q||	S g }	|
 D ]\}
}t |
 }||}|	| q|	S )NTF   r   zUnexpected type for indices: c                    s&   i | ]\}}| v s|d v r||qS ))r%   	label_idsr&   r   r   keyvalue)cols_to_retainr   r   
<dictcomp>Y   s
    z np_get_batch.<locals>.<dictcomp>c                    s"   g | ]  fd d  D qS )c                    s   i | ]	\}}||  qS r   r   r5   ir   r   r9   b       z+np_get_batch.<locals>.<listcomp>.<dictcomp>r   )r   )r"   r:   r   r   b   s   " z np_get_batch.<locals>.<listcomp>)r   r   r   numpyintegeritemalldiffRuntimeErrortyper   lenlistvaluesranger   astypeappend)indicesdatasetr8   
collate_fncollate_fn_argscolumns_to_np_typesreturn_dict
is_batchedactual_size	out_batchcol
cast_dtyper   r   )r"   r8   r   np_get_batchF   s@   




rV   c	              	      sl  t jrddlntdtdrjntjjdr"jjjnt	dkr-t
d dtt||| dd	fd
d  D jdjgd fdd}	jjt	}
|rdurjdjdjdd}fdd}|
||}
n	|r|
|
 }
|dur|
j||d}
|
|	}
|durfdd}nfdd}|
|S )a  Create a tf.data.Dataset from the underlying Dataset. This is a single-process method - the multiprocess
    equivalent is multiprocess_dataset_to_tf.

    Args:
        dataset (`Dataset`): Dataset to wrap with tf.data.Dataset.
        cols_to_retain (`List[str]`): Dataset column(s) to load in the
            tf.data.Dataset. It is acceptable to include column names that are created by the `collate_fn` and
            that do not exist in the original dataset.
        collate_fn(`Callable`): A function or callable object (such as a `DataCollator`) that will collate
            lists of samples into a batch.
        collate_fn_args (`Dict`): A  `dict` of keyword arguments to be passed to the
            `collate_fn`. Can be empty.
        columns_to_np_types (`Dict[str, np.dtype]`): A `dict` mapping column names to numpy dtypes.
        output_signature (`Dict[str, tf.TensorSpec]`): A `dict` mapping column names to
            `tf.TensorSpec` objects.
        shuffle(`bool`): Shuffle the dataset order when loading. Recommended True for training, False for
            validation/evaluation.
        batch_size (`int`, default `None`): Size of batches to load from the dataset. Defaults to `None`, which implies that
            the dataset won't be batched, but the returned dataset can be batched later with `tf_dataset.batch(batch_size)`.
        drop_remainder(`bool`, default `None`): Drop the last incomplete batch when loading. If not provided,
            defaults to the same setting as shuffle.

    Returns:
        `tf.data.Dataset`
    r   Nr	   random_index_shuffleindex_shufflei zto_tf_dataset() can be memory-inefficient on versions of TensorFlow older than 2.9. If you are iterating over a dataset with a very large number of samples, consider upgrading to TF >= 2.9.F)rL   r8   rM   rN   rO   rP   c                    s   g | ]} j |qS r   )dtypesas_dtype)r   dtype)r    r   r   r          z!dataset_to_tf.<locals>.<listcomp>)input_signaturec                    s,   j | gd  fddt D S )N)inpToutc                    s   i | ]	\}}| | qS r   r   )r   r;   r6   outputr   r   r9      r<   z9dataset_to_tf.<locals>.fetch_function.<locals>.<dictcomp>)py_function	enumeratekeys)rK   )rO   	getter_fnr    toutr`   r   fetch_function   s   z%dataset_to_tf.<locals>.fetch_function   r3   )r[   )r7   c                    s@    | dkrjjddjd} || t d d}| |fS )Nr3   rh   l            )shapemaxvalr[   r2   )indexseed	max_index)
reduce_allrandomuniformint64rE   )staterl   shuffled_index)rL   rW   r    r   r   scan_random_index   s   z(dataset_to_tf.<locals>.scan_random_index)drop_remainderc                        fdd|   D S )Nc                    s$   i | ]\}}| | | jqS r   ensure_shaperj   r   r6   valoutput_signaturer    r   r   r9      s   $ 8dataset_to_tf.<locals>.ensure_shapes.<locals>.<dictcomp>r=   
input_dictr|   r   r   ensure_shapes      z$dataset_to_tf.<locals>.ensure_shapesc                    rw   )Nc              	      s,   i | ]\}}| | | jd d qS )r2   Nrx   rz   r|   r   r   r9      s   , r~   r=   r   r|   r   r   r      r   )r   r   r   r   hasattrrW   rp   experimentalrX   rE   warningswarnr   rV   rG   function
TensorSpecrr   dataDatasetrH   fillcastscanshufflecardinalityr"   map)rL   r8   rM   rN   rO   r}   r   
batch_sizerv   rg   
tf_dataset	base_seedru   r   r   )rO   rL   re   r}   rW   r    rf   r   dataset_to_tfv   sL   $



r   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )SharedMemoryContextc                 C   s   g | _ g | _d S r(   )created_shmsopened_shmsselfr   r   r   __init__   s   
zSharedMemoryContext.__init__c                 C   s6   t t|||d}|r| j| |S | j| |S )N)sizenamecreate)r   intr   rJ   r   )r   r   r   r   shmr   r   r   get_shm   s   zSharedMemoryContext.get_shmc                 C   s4   | j |t|t|j |d}tj|||jdS )N)r   r   r   )r[   buffer)r   r   prodr[   itemsizer   buf)r   r   rj   r[   r   r   r   r   r   	get_array   s   "zSharedMemoryContext.get_arrayc                 C      | S r(   r   r   r   r   r   	__enter__      zSharedMemoryContext.__enter__c                 C   s4   | j D ]
}|  |  q| jD ]}|  qd S r(   )r   closeunlinkr   )r   exc_type	exc_value	tracebackr   r   r   r   __exit__   s   



zSharedMemoryContext.__exit__N)__name__
__module____qualname__r   r   r   r   r   r   r   r   r   r      s    
r   c                   @   s<   e Zd Zdd Zdd Zdd Zedd Zed	d
 ZdS )NumpyMultiprocessingGeneratorc                    s~   | _ | _| _| _dd | D  _ fdd| D  _| _| _| _	|	 _
|
 _ fdd| D  _d S )Nc                 S   s   g | ]\}}|t ju r|qS r   )r   str_r   rT   r[   r   r   r   r     s    z:NumpyMultiprocessingGenerator.__init__.<locals>.<listcomp>c                    s*   i | ]\}}|| j vr|ntd qS )U1)string_columnsr   r[   r   r   r   r   r9     s    z:NumpyMultiprocessingGenerator.__init__.<locals>.<dictcomp>c                    s8   i | ]\}}|| j vrt|jjnt|jjd  qS r2   )r   r   rj   rank)r   rT   specr   r   r   r9   %  s    &)rL   r8   rM   rN   r   r   rO   r}   r   r   rv   num_workerscolumns_to_ranks)r   rL   r8   rM   rN   rO   r}   r   r   rv   r   r   r   r   r     s    

z&NumpyMultiprocessingGenerator.__init__c              
   #   s   t jtttjj }jjj|j	\}}}t
dg g }g }fddt|D }fddt|D }jjjjjjjd}	t t|D ][tt }
d d|
 d d  fd	d
j D }|| | }|kr|d ur|}nd }||| | d|	}jj|dd}|  || qZd}|s8t|D ]v| jddstd|   | }tdd | D rd} nPt :  fdd
| D }dd
 | D }jD ]}|| d|| j d  !d||< qW d    n	1 s'w   Y  |V  | "  q|r|D ]}|#  q:W d    d S 1 sNw   Y  d S )Nspawnc                       g | ]}   qS r   Eventr   _ctxr   r   r   5  r   z:NumpyMultiprocessingGenerator.__iter__.<locals>.<listcomp>c                    r   r   r   r   r   r   r   r   6  r   )rL   r8   rM   rN   rO   r   r   dw_r   
   c              	      4   i | ]\}}| j  d | d|ftjddqS )r   _shapeTrj   r[   r   r   r   rr   r   rT   r   shm_ctxworker_namer   r   r9   G      "z:NumpyMultiprocessingGenerator.__iter__.<locals>.<dictcomp>)r   rK   extra_batcharray_ready_eventarray_loaded_eventT)targetkwargsdaemonF<   )timeoutzData loading worker timed out!c                 s   s    | ]
}t |d k V  qdS )r   N)r   any)r   rj   r   r   r   	<genexpr>e  s    z9NumpyMultiprocessingGenerator.__iter__.<locals>.<genexpr>c              	      s8   i | ]\}}| j   d | |j| ddqS )r   Fr   )r   rO   )r   rT   rj   )batch_shm_ctxr;   namesr   r   r   r9   s  s    c                 S   s   i | ]
\}}|t |qS r   )r   copy)r   rT   arrr   r   r   r9   ~  s    Ur3   )$minr   r   r   rE   rL   r   distribute_batchesrv   r   r   rH   r8   rM   rN   rO   r   r   r   strr   rJ   r   Processworker_loopstartwaitTimeoutErrorclearr   rG   viewrj   squeezesetjoin)r   r   per_worker_batchesfinal_batchfinal_batch_workershape_arraysworkersarray_ready_eventsarray_loaded_events	base_argsworker_random_idworker_shape_arraysworker_indicesfinal_batch_argworker_kwargsworkerend_signal_receivedarray_shapesarrays
string_colr   )r   r   r;   r   r   r   r   r   __iter__*  s    
	



"
)$z&NumpyMultiprocessingGenerator.__iter__c                 C   r   r(   r   r   r   r   r   __call__  r   z&NumpyMultiprocessingGenerator.__call__c              
      s   dt jd< tjrdd l}ntd|jg d  	
f
dd}t 9
fdd	| D |D ]}|| q9|d urH||  D ]
\}}d
|d d < qL	  W d    d S 1 sfw   Y  d S )N3TF_CPP_MIN_LOG_LEVELr   r	   GPUc              	      s   t | dd}i }t S} D ]9\}}|| }|v r,|d|jd }|j| d d < |j	 d| |j|dd||< ||| d d < q      	  W d    d S 1 sfw   Y  d S )NT)rK   rL   r8   rM   rN   rO   rP   r   )r3   r   r   )
rV   r   r   r   reshaperj   r   r   r   r   )rK   r"   
out_arraysr   rT   rU   r   )
r   r   rM   rN   r8   rO   rL   r   r   r   r   r   send_batch_to_parent  s0   

"zGNumpyMultiprocessingGenerator.worker_loop.<locals>.send_batch_to_parentc              	      r   )r   r   Fr   r   r   r   r   r   r9     r   z=NumpyMultiprocessingGenerator.worker_loop.<locals>.<dictcomp>r3   )
osenvironr   r   r   r   set_visible_devicesr   r   r   )rL   r8   rM   rN   rO   r   r   rK   r   r   r   r   r    r  r"   rT   r   r   )r   r   rM   rN   r8   rO   rL   r   r   r   r   r   r     s$   

!

"z)NumpyMultiprocessingGenerator.worker_loopc                 C   s  t t| }|rt j| t|}|||  }t ||g\}}|s*t|dkr,d }|d|}t|}	|	|	|  }
t ||
g\}}|d||}t j||jd dd}dd |D }tt|D ]}t j	|| || ddgdd||< qd|d urt|}nd }|||fS )Nr   r3   r2   )axisc                 S   s   g | ]}t |d qS r   )r   r   )r   r   r   r   r   r     r\   zDNumpyMultiprocessingGenerator.distribute_batches.<locals>.<listcomp>)
r   arangerE   rp   r   splitr  rj   rH   concatenate)rL   r   rv   r   r   rK   num_samplesincomplete_batch_cutofflast_incomplete_batchnum_batchesfinal_batches_cutofffinal_batchesper_worker_indicesr;   incomplete_batch_worker_idxr   r   r   r     s*   (

z0NumpyMultiprocessingGenerator.distribute_batchesN)	r   r   r   r   r   r   staticmethodr   r   r   r   r   r   r     s    "a
Gr   c
                 C   s   t jrddl}
ntdt| |||||||||	d
}|
jjj||d}|r.tt	| | }n
tt
t	| | }||
jj|S )ao  Create a tf.data.Dataset from the underlying Dataset. This is a multi-process method - the single-process
    equivalent is dataset_to_tf.

    Args:
        dataset (`Dataset`): Dataset to wrap with tf.data.Dataset.
        cols_to_retain (`List[str]`): Dataset column(s) to load in the
            tf.data.Dataset. It is acceptable to include column names that are created by the `collate_fn` and
            that do not exist in the original dataset.
        collate_fn(`Callable`): A function or callable object (such as a `DataCollator`) that will collate
            lists of samples into a batch.
        collate_fn_args (`Dict`): A  `dict` of keyword arguments to be passed to the
            `collate_fn`. Can be empty.
        columns_to_np_types (`Dict[str, np.dtype]`): A `dict` mapping column names to numpy dtypes.
        output_signature (`Dict[str, tf.TensorSpec]`): A `dict` mapping column names to
            `tf.TensorSpec` objects.
        shuffle(`bool`): Shuffle the dataset order when loading. Recommended True for training, False for
            validation/evaluation.
        batch_size (`int`, default `None`): Size of batches to load from the dataset. Defaults to `None`, which implies that
            the dataset won't be batched, but the returned dataset can be batched later with `tf_dataset.batch(batch_size)`.
        drop_remainder(`bool`, default `None`): Drop the last incomplete batch when loading. If not provided,
            defaults to the same setting as shuffle.
        num_workers (`int`): Number of workers to use for loading the dataset. Should be >= 1.

    Returns:
        `tf.data.Dataset`
    r   Nr	   )
rL   r8   rM   rN   rO   r}   r   r   rv   r   )r}   )r   r   r   r   r   r   r   from_generatorr   rE   r   applyr   assert_cardinality)rL   r8   rM   rN   rO   r}   r   r   rv   r   r    data_generatorr   dataset_lengthr   r   r   multiprocess_dataset_to_tf  s(   &
r  )F)__doc__r  r   	functoolsr   mathr   uuidr   r>   r   pyarrowr)   multiprocessr   multiprocess.shared_memoryr   r    r   r$   r'   r,   rV   r   r   r   r  r   r   r   r   <module>   s2   
0q  q