o
    ॵi<0                     @   s   d dl Z d dlZd dlZd dlmZ d dlZd dlZd dlmZ d dl	m	Z	 d dl
mZ d dlmZmZmZmZ d dlmZ d dlmZmZ e ZG d	d
 d
eZG dd deZG dd deZdS )    N)islice)IterableDataset)tqdm)MaxComputeUtil)DEFAULT_MAXCOMPUTE_ENDPOINTEXTENSIONS_TO_LOADMaxComputeEnvsVirgoDatasetConfig)
get_logger)fetch_csv_with_url	valid_urlc                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )ExternalDatasetz"Dataset class for custom datasets.c           	         s0  || _ t|| _| jd| j i d | _dd | j  D | _i | _d}| j  D ]U\} t	 t
rtj rt }tdd |D }d|v rLq*t|dkrfdt }td	| d
| d q*t|d }|tvrqq* fdd|D }|| j|< q*|rt|}tj|fd| ji|| _d S d S )Nsplit_configc                 S   s   i | ]\}}|g qS  r   ).0k_r   r   ]/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/msdatasets/dataset_cls/dataset.py
<dictcomp>    s    z,ExternalDataset.__init__.<locals>.<dictcomp> c                 S   s"   g | ]}t j|d  dqS ).)ospathsplitextstripr   	file_namer   r   r   
<listcomp>+   s    z,ExternalDataset.__init__.<locals>.<listcomp>   ,zSplit-z has been ignored, please flatten your folder structure, and make sure these files have same extensions. Supported extensions: z .r   c                    s   g | ]	}t j |qS r   )r   r   joinr   	split_dirr   r   r   =   s    
data_files)split_path_dictcopydeepcopyconfig_kwargsupdatespec_extension_datasetitemssplit_data_files
custom_map
isinstancestrr   r   isdirlistdirsetlenr!   r   keysloggererrorlistgetdatasetsload_dataset)	selfr%   r(   file_ext
split_namesplit_file_namesset_files_extssupported_extssplit_file_pathsr   r"   r   __init__   sV   



zExternalDataset.__init__c                 C   s   | j st| jS | j  S N)r*   r3   r%   __len__r;   r   r   r   rD   H   s   zExternalDataset.__len__c                 C   s   | j s	| j|S | j |S rC   )r*   r%   r8   __getitem__)r;   itemr   r   r   rF   N   s   zExternalDataset.__getitem__c                 c   sL    | j s| j D ]	\}}||fV  q	d S | j  D ]	\}}||fV  qd S rC   )r*   r%   r+   )r;   r   vr   r   r   __iter__T   s   zExternalDataset.__iter__N)__name__
__module____qualname____doc__rB   rD   rF   rI   r   r   r   r   r      s    .r   c                       sL   e Zd ZdZd fdd	Zdd Zdd Zd	d
 Zdd ZdddZ	  Z
S )NativeIterableDatasetz&The modelscope iterable dataset class.r   c                    s   t  j|||d || _d S )N)ex_iterableinfosplit)superrB   stream_batch_size)r;   rO   rP   rQ   rS   	__class__r   r   rB   `   s   
zNativeIterableDataset.__init__c                 c   s:    t | j| jddd| jddD ]
}| |}|V  qd S )NF
batch_sizedrop_last_batchzOverall progressT)desctotaldynamic_ncols)r   iterrS   n_shards_download_item)r;   rG   retr   r   r   rI   d   s   

zNativeIterableDataset.__iter__c                 C      | j S rC   )r]   rE   r   r   r   rD   o   s   zNativeIterableDataset.__len__c                 c   s    t |tr|}|d }d}n	|j}|j}|j}|dur$|dkr$tdtt| jddd|||ddd	D ]
}| 	|}|V  q5dS )
z`
        Returns the item at index `index` in the dataset. Slice indexing is supported.
        r   Nr   zstep must be positiveFrV   zSlicing progressT)rY   r[   )
r.   intstartstopstep
ValueErrorr   r   r\   r^   )r;   indexrb   rc   rd   rG   r_   r   r   r   rF   r   s*   


z!NativeIterableDataset.__getitem__c              
   C   s   i }t |trVz4| D ],\}}|||< |dr8| jjd}||}t |tr-|g}|||< |||	d< qW |S  t
yU } zt| |}W Y d }~|S d }~ww |}|S )Nz:FILE
dl_manager)r.   dictr+   endswith_ex_iterablekwargsr8   download_and_extractr/   r   	Exceptionr5   r6   )r;   rG   r_   r   rH   rg   ex_cache_pather   r   r   r^      s.   




z$NativeIterableDataset._download_item   c                 C   sF   g }|dkr|S d}|   D ]}||kr |S || |d7 }q|S )z
        Returns the first n rows of the dataset.

        Args:
            n (int): Number of rows to return.

        Returns:
            list: The list of results, e.g. [{'id': 'abc123', 'text': 'hello world'}, ...]
        r   r   )rI   append)r;   nresiter_numrG   r   r   r   head   s   

zNativeIterableDataset.head)r   )rp   )rJ   rK   rL   rM   rB   rI   rD   rF   r^   ru   __classcell__r   r   rT   r   rN   ]   s    rN   c                   @   sf   e Zd ZdZdd Zdd Zdd Zdd	 Zed
e	j
fddZdd Zed
efddZdd ZdS )VirgoDataseta  Dataset class for Virgo.

    Attributes:
        _meta_content (str): Virgo meta data content, could be a url that contains csv file.
        _data_type (int): Virgo dataset type, 0-Standard virgo dataset; Others-User define dataset (to be supported)

    Examples:
        >>> from modelscope.msdatasets.dataset_cls.dataset import VirgoDataset
        >>> input_kwargs = {'metaContent': 'http://xxx-xxx/xxx.csv', 'samplingType': 0}
        >>> virgo_dataset = VirgoDataset(**input_kwargs)
        >>> print(virgo_dataset[1])
        >>> print(len(virgo_dataset))
        >>> for line in virgo_dataset:
        >>>     print(line)

        Note: If you set `download_virgo_files` to True by using
            MsDataset.load(dataset_name='your-virgo-dataset-id', hub=Hubs.virgo, download_virgo_files=True),
            you can get the cache file path of the virgo dataset, the column name is `cache_file`.
        >>> if virgo_dataset.download_virgo_files:
        >>>     print(virgo_dataset[1].get('cache_file'))
    c                 K   s   d| _ d| _d| _d | _d | _|| _t | _| j	t
jd| _ | j	t
jd| _|   |   d| _d| _d| _d | _d | _| j	dd| _| j	dd | _| j	dd| _| jrj| j| j| j\| _| _d S d S )Nr   r   Fodps_batch_sized   
odps_limitodps_drop_last)_meta_content	data_typeodps_table_nameodps_table_partition_odps_utilsr(   pd	DataFrame_metapopr	   meta_contentsampling_type_check_variables_parse_metameta_content_cache_filevirgo_cache_dirdownload_virgo_filesodps_table_insodps_reader_insrx   rz   r{   get_table_reader_ins)r;   rk   r   r   r   rB      s8   
zVirgoDataset.__init__c              	   C   sB   | j rtj| j || j| j| j| jjj| jjj	dS | j
j|  S )N)readerrf   batch_size_inlimit_indrop_last_in
partitionscolumns)r   r   gen_reader_itemrx   rz   r{   r   table_schemar   namesr   ilocto_dict)r;   rf   r   r   r   rF      s   zVirgoDataset.__getitem__c                 C   s$   t | jtr| jddS t| jS )N	odpsCountr   )r.   r   rh   r8   r3   rE   r   r   r   rD      s   
zVirgoDataset.__len__c                 c   sh    | j r#tj| j | j| j| j| jjj| jjj	d}|D ]}|V  qd S | j
 D ]	\}}| V  q(d S )N)r   r   r   r   r   r   )r   r   gen_reader_batchrx   rz   r{   r   r   r   r   r   iterrowsr   )r;   odps_batch_databatchr   rowr   r   r   rI     s    zVirgoDataset.__iter__returnc                 C   r`   )z
        Virgo meta data. Contains columns: id, meta_info, analysis_result, external_info and
            cache_file (if download_virgo_files is True).
        )r   rE   r   r   r   meta  s   zVirgoDataset.metac                 C   sp   t | jtrt| jrt| j}|| _d S t | jtr6| j| _| jdd| _| jdd | _	| 
 | _d S d)NodpsTableNamer   odpsTablePartitionz%The meta content must be url or dict.)r.   r|   r/   r   r   r   rh   r8   r~   r   _get_odps_infor   )r;   meta_content_dfr   r   r   r     s   

zVirgoDataset._parse_metac               
   C   s   t jtjd} t jtjd}t jtjd}t jtjt}| r&|r&|s;t	dtj dtj dtj dtj d	t
| |||S )z
        Get MaxComputeUtil instance.

        Args:
            None

        Returns:
            MaxComputeUtil instance.
        r   z&Please set MaxCompute envs for Virgo: z, z6(default: http://service-corp.odps.aliyun-inc.com/api))r   environr8   r   	ACCESS_IDACCESS_SECRET_KEYPROJECT_NAMEENDPOINTr   re   r   )	access_id
access_key	proj_nameendpointr   r   r   r   '  s"   

zVirgoDataset._get_odps_infoc                 C   sR   | j sd| jdvrd| jdkrt| j sd| jdkr%t| j ts'ddS dS )	zCheck member variables in this class.
            1. Condition-1: self._meta_content cannot be empty
            2. Condition-2: self._meta_content must be url when self._data_type is 0
        z"Them meta content cannot be empty.)r   r   zFSupported samplingType should be 0 or 1, others are not supported yet.r   z1The meta content must be url when data type is 0.r   z2The meta content must be dict when data type is 1.N)r|   r}   r   r.   rh   rE   r   r   r   r   A  s   
zVirgoDataset._check_variablesN)rJ   rK   rL   rM   rB   rF   rD   rI   propertyr   r   r   r   staticmethodr   r   r   r   r   r   r   rw      s     rw   )r&   mathr   	itertoolsr   r9   pandasr   r   r   ,modelscope.msdatasets.utils.maxcompute_utilsr   modelscope.utils.constantr   r   r   r	   modelscope.utils.loggerr
   modelscope.utils.url_utilsr   r   r5   objectr   rN   rw   r   r   r   r   <module>   s    F\