o
    .ia9                     @   sN  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ ej j!"e#Z$e
rmd dl%Z%d dl&Z%eG dd dej'Z(ddde)e* fddZ+	dddde)e* dee, fddZ-G dd deZ.G dd dej/Z0dS )    N)Iterable)	dataclass)islice)TYPE_CHECKINGOptionalUnion)ArrowWriterParquetWriter)MAX_SHARD_SIZE)is_remote_filesystemrename)_BaseExamplesIterable)experimental)convert_file_size_to_intc                       s4   e Zd ZU dZdZeej ed<  fddZ	  Z
S )SparkConfigzBuilderConfig for Spark.Nfeaturesc                    s   t    d S N)super__post_init__self	__class__ Y/home/ubuntu/.local/lib/python3.10/site-packages/datasets/packaged_modules/spark/spark.pyr   %   s   zSparkConfig.__post_init__)__name__
__module____qualname____doc__r   r   datasetsFeatures__annotations__r   __classcell__r   r   r   r   r      s   
 r   dfpyspark.sql.DataFramenew_partition_orderc                 C   sP   |  dd|d  }|dd  D ]}|  dd| }||}q|S )N*z
part_id = r      )selectwhereunion)r#   r%   df_combinedpartition_idpartition_dfr   r   r   _reorder_dataframe_by_partition)   s
   r.   partition_order
state_dictc                 c   s    dd l }| d|jj d}|r|d nd}t|||d  }|jdd}d }|r1|d nd}	t||	d D ];}
|
	 }|d }|
d ||kr^|rZ|d urZ|d  d7  < |}d}	|rf|	d |d< | d	|	 |fV  |	d7 }	q9d S )
Nr   r&   part_idpartition_idxT)prefetchPartitionspartition_example_idxr'   _)pysparkr(   sql	functionsspark_partition_idaliasr.   toLocalIteratorr   asDictpop)r#   r/   r0   r6   df_with_partition_idpartition_idx_startr-   rowscurr_partitionrow_idrowrow_as_dictr1   r   r   r   _generate_iterable_examples1   s,   

rE   c                       s   e Zd Z	d	d fddZdefddZed	edef fd
dZdd Zde	j
jdd fddZddededd fddZedefddZ  ZS )SparkExamplesIterableNr#   r$   c                    s*   t    || _|pt| jj | _d S r   )r   __init__r#   rangerddgetNumPartitionsr/   )r   r#   r/   r   r   r   rG   O   s   
zSparkExamplesIterable.__init__returnc                 C   s   ddd| _ | j S )Nr   )r2   r4   )_state_dictr   r   r   r   _init_state_dictX   s   z&SparkExamplesIterable._init_state_dictr0   c                    s   t  |S r   )r   load_state_dict)r   r0   r   r   r   rN   \   s   z%SparkExamplesIterable.load_state_dictc                 c   s    t | j| j| jE d H  d S r   )rE   r#   r/   rL   r   r   r   r   __iter__`   s   zSparkExamplesIterable.__iter__	generatorc                 C   s,   t t| jj }|| t| j|dS )Nr/   )listrH   r#   rI   rJ   shufflerF   )r   rP   r/   r   r   r   shuffle_data_sourcesc   s   
z*SparkExamplesIterable.shuffle_data_sourcesT
num_shardsindexc                 C   s   | j |||d}t| j|dS )N)rU   rV   
contiguousrQ   )split_shard_indices_by_workerrF   r#   )r   rU   rV   rW   r/   r   r   r   shard_data_sourcesh   s   z(SparkExamplesIterable.shard_data_sourcesc                 C   
   t | jS r   )lenr/   r   r   r   r   rU   l   s   
z SparkExamplesIterable.num_shardsr   )r#   r$   )T)r   r   r   rG   dictrM   r   rN   rO   nprandom	GeneratorrT   intrY   propertyrU   r"   r   r   r   r   rF   N   s    	rF   c                       s   e Zd ZeZ		ddddedef fddZdd	 Zd
d Zde	j
jjfddZdd Zdedededeeeeeeef f  fddZ			d dddedeeeef  dee fddZdddefddZ  ZS )!SparkNr#   r$   	cache_dirworking_dirc                    sJ   dd l }|jjj | _|| _|| _t j	d|t
| j d| d S )Nr   )rc   config_namer   )r6   r7   SparkSessionbuildergetOrCreate_sparkr#   _working_dirr   rG   strsemanticHash)r   r#   rc   rd   config_kwargsr6   r   r   r   rG   t   s   
zSpark.__init__c                    sl   | j   fdd}| jjdddrd S | j r2| jjtdd|	 }t
j|d r2d S td)	Nc                    s6   t j dd t j dt j }t|d |gS )NT)exist_okfs_testa)osmakedirspathjoinuuiduuid4hexopen)context
probe_filerc   r   r   create_cache_and_write_probe   s   
z?Spark._validate_cache_dir.<locals>.create_cache_and_write_probezspark.master localr'   r   ztWhen using Dataset.from_spark on a multi-node cluster, the driver and all workers should be able to access cache_dir)
_cache_dirri   confget
startswithsparkContextparallelizerH   mapPartitionscollectrq   rs   isfile
ValueError)r   r|   prober   r{   r   _validate_cache_dir   s   
zSpark._validate_cache_dirc                 C   s   t j| jjdS )N)r   )r   DatasetInfoconfigr   r   r   r   r   _info   s   zSpark._info
dl_managerc                 C   s   t jt jjdgS )N)name)r   SplitGeneratorSplitTRAIN)r   r   r   r   r   _split_generators   s   zSpark._split_generatorsc           	      C   s   dd l }dd }| j }|dkr|nd}| j|d|d|jj	d
d d j| }|| }||krNt|t|| }| j|| _d S d S )	Nr   c                 s   s&    | D ]}t jd|jgiV  qd S )Nbatch_bytes)paRecordBatchfrom_pydictnbytes)itbatchr   r   r   get_arrow_batch_size   s   z=Spark._repartition_df_if_needed.<locals>.get_arrow_batch_sized   r'   zbatch_bytes: longr   sample_bytes)r6   r#   countlimitrepartition
mapInArrowaggr7   r8   sumr:   r   r   minr`   )	r   max_shard_sizer6   r   df_num_rowssample_num_rowsapprox_bytes_per_rowapprox_total_sizenew_num_partitionsr   r   r   _repartition_df_if_needed   s&   

	zSpark._repartition_df_if_neededfpathfile_formatr   rK   c              	   #   s   dd l |dkrtnt| jrtj| jtjn|dk | jj	| j
| jj f	dd}| j|ddjjddjjd	d
jjd	djjdd }|D ]}|j|j|j|j|jffV  qod S )Nr   parquetc                 3   s      }t| d }|d u r tjj|gdgdggg ddS d}d|dd|d d}tj|g}|	| | D ]L}d ur|j
kr| \}}|  tjj|g|g|ggg ddV  |d7 }|jd|dd|d d}tj|g}|	| qE|j
dkr| \}}|  tjj|g|g|ggg ddV  krttjD ]}	tjtjtj|	}
t|	|
 qd S d S )	Nr   )task_idnum_examples	num_bytes)namesSSSSS05dTTTTT)r   rs   writer_batch_sizestorage_optionsembed_local_filesr'   )TaskContexttaskAttemptIdnextr   r   from_arraysreplaceTablefrom_batcheswrite_table
_num_bytesfinalizeclose	_featuresrq   listdirrs   dirnamert   basenameshutilmove)r   r   first_batchshard_idwritertabler   r   r   filedest	r   r   r   r   r6   r   working_fpathr   writer_classr   r   write_arrow   sb   


z0Spark._prepare_split_single.<locals>.write_arrowz2task_id: long, num_examples: long, num_bytes: longr   r   total_num_examplesr   total_num_bytesrU   shard_lengths)r6   r	   r   rj   rq   rs   rt   r   r   r   _writer_batch_size_fsr   r#   r   groupByr   r7   r8   r   r:   r   collect_listr   r   r   r   rU   r   )r   r   r   r   r   statsrC   r   r   r   _prepare_split_single   s,   "5zSpark._prepare_split_singlearrowsplit_generatorzdatasets.SplitGeneratornum_procc                    s  |    t|pt}| | t| j }|rtjjnt	j}d}| j
 d|j
 | d| }	|| j|	d}
d}dg }g }| ||D ]&\}}|\}}}}|dkrk|
|7 }
||7 }|7 |||f || qE|
|j_||j_td d dkr||j_| jdtd	td
tffdd g }d}tt|D ]}|| \}}t|D ]}||||g |d7 }qq| jj|t| fdd  d S d}|d d }| d|dd|d|d d S )Nz-TTTTT-SSSSS-of-NNNNN-.r   z	Renaming z shards.r'   r   r   global_shard_idc                    s@   t  d|dd| d d|ddd d S )Nr   r   r   zTTTTT-SSSSSNNNNN)r   r   )r   r   r   )r   fstotal_shardsr   r   _rename_shardO  s
   z+Spark._prepare_split.<locals>._rename_shardc                    s    |  S r   r   )args)r   r   r   <lambda>a  s    z&Spark._prepare_split.<locals>.<lambda>r   r   r   r}   )r   r   r
   r   r   r   rq   rs   rt   	posixpathr   _output_dirr   appendextend
split_infor   r   loggerdebugr   r`   rH   r[   ri   r   r   mapr   _renamer   )r   r   r   r   r   kwargsis_local	path_joinSUFFIXfnamer   r   task_id_and_num_shardsall_shard_lengthsr   contentr   r   rU   r   r   r   ir   r   )r   r   r   r   r   _prepare_split  sn   


*
zSpark._prepare_splitc                 C   rZ   r   )rF   r#   )r   r   r   r   r    _get_examples_iterable_for_splitk  s   
z&Spark._get_examples_iterable_for_split)NN)r   NN)r   r   r   r   BUILDER_CONFIG_CLASSrk   rG   r   r   r   downloaddownload_managerDownloadManagerr   r   r`   r   tupleboolr   r   r   r   rF   r   r"   r   r   r   r   rb   q   sR    !
W
Prb   r   )1rq   r   ru   collections.abcr   dataclassesr   	itertoolsr   typingr   r   r   numpyr]   pyarrowr   r   datasets.arrow_writerr   r	   datasets.configr
   datasets.filesystemsr   r   datasets.iterable_datasetr   datasets.utilsr   datasets.utils.py_utilsr   utilslogging
get_loggerr   r   r6   pyspark.sqlBuilderConfigr   rR   r`   r.   r\   rE   rF   DatasetBuilderrb   r   r   r   r   <module>   sB    	
#