o
    ॵi\1                     @   s  d dl Z d dlmZmZ d dlmZmZ d dlmZm	Z	m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZmZmZ d dl m!Z! d dl"m#Z# e! Z$G dd deZ%G dd de%Z&G dd de%Z'G dd de%Z(dS )    N)ABCabstractmethod)OptionalUnion)DatasetDatasetBuilderDatasetDictIterableDatasetIterableDatasetDict)load_dataset)ModelScopeConfig)OssAuthConfig)DatasetContextConfig)DataFilesManager)ExternalDataset)DataMetaManager)DatasetFormationsDatasetPathNameDownloadModeVirgoDatasetConfig)
get_logger)	valid_urlc                   @   sZ   e Zd ZdZdefddZedd Zedd Zed	d
 Z	edd Z
edd ZdS )BaseDownloaderz%Base dataset downloader to load data.dataset_context_configc                 C   s
   || _ d S Nr   selfr    r   a/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/msdatasets/data_loader/data_loader.py__init__   s   
zBaseDownloader.__init__c                 C      t dtj d)z6The entity processing pipeline for fetching the data. 'No default implementation provided for z	.process.NotImplementedErrorr   __name__r   r   r   r   process!   s   zBaseDownloader.processc                 C   r!   )Nr"   z._authorize.r#   r&   r   r   r   
_authorize(      zBaseDownloader._authorizec                 C   r!   )Nr"   z._build.r#   r&   r   r   r   _build.   r)   zBaseDownloader._buildc                 C   r!   )Nr"   z._prepare_and_download.r#   r&   r   r   r   _prepare_and_download4   r)   z$BaseDownloader._prepare_and_downloadc                 C   r!   )Nr"   z._post_process.r#   r&   r   r   r   _post_process:   r)   zBaseDownloader._post_processN)r%   
__module____qualname____doc__r   r    r   r'   r(   r*   r+   r,   r   r   r   r   r      s    



r   c                       sT   e Zd Zdef fddZdddZddd	Zdd
dZdddZdddZ	  Z
S )OssDownloaderr   c                    s(   t  | d | _d | _d | _d | _d S r   )superr    data_files_builderdatasetbuilderdata_files_managerr   	__class__r   r   r    C   s   
zOssDownloader.__init__returnNc                 C   $   |    |   |   |   dS )z Sequential data fetching process: authorize -> build -> prepare_and_download -> post_process,
        to keep dataset_context_config updated. Nr(   r*   r+   r,   r&   r   r   r   r'   M      zOssDownloader.processc                 C   sV   t  }t  }t  }| jjst|||d}n| jj}||_||_||_	|| j_dS )zs Authorization of target dataset.
        Get credentials from cache and send to the modelscope-hub in the future. cookies	git_token	user_infoN)
r   get_cookies	get_tokenget_user_infor   auth_configr   r=   r>   r?   )r   r=   r>   r?   rC   r   r   r   r(   V   s   zOssDownloader._authorizec                 C   s@   t | j}|  |  |j| _t| jd| _| j | _dS )zj Sequential data files building process: build_meta -> build_data_files , to keep context_config updated. r   N)r   r   fetch_meta_filesparse_dataset_structurer   r5   get_data_files_builderr4   )r   meta_managerr   r   r   r*   h   s   
zOssDownloader._buildc                 C   s   | j jj}| j jj}| j j}| j j}| j j}| j j}| j j}| j j	}| j j
}	| j j}
| j j}| jdu r;|s;d| d|rW|tjkrWt|f||||||	|
jdd|| _dS | j| j| _dS )z/ Fetch data-files from modelscope dataset-hub. Nzmeta-file: z$.py not found on the modelscope hub.T)namerevisionsplitdata_dir
data_files	cache_dirdownload_modeignore_verifications)r   data_meta_configdataset_py_scriptdataset_formationdataset_namesubset_nameversionrJ   rK   rL   cache_root_dirrN   config_kwargsr4   r   hf_compatiblehf_load_datasetvaluer3   r5   fetch_data_files)r   rQ   rR   rS   rT   rU   rJ   rK   rL   rM   rN   input_kwargsr   r   r   r+   u   s>   



z#OssDownloader._prepare_and_downloadc                 C   s"   t | jtr| jjj| j_d S d S r   )
isinstancer3   r   r   rP   meta_type_map
custom_mapr&   r   r   r   r,      s   zOssDownloader._post_process)r8   N)r%   r-   r.   r   r    r'   r(   r*   r+   r,   __classcell__r   r   r6   r   r0   A   s    


	

!r0   c                       N   e Zd ZdZdef fddZdd Zdd Zd	d
 Zdd Z	dd Z
  ZS )VirgoDownloaderz&Data downloader for Virgo data source.r   c                       t  | d | _d S r   r1   r    r3   r   r6   r   r   r          
zVirgoDownloader.__init__c                 C   r9   )z|
        Sequential data fetching virgo dataset process: authorize -> build -> prepare_and_download -> post_process
        Nr:   r&   r   r   r   r'      r;   zVirgoDownloader.processc                 C   sZ   ddl m} t }t }| jjs||d|d}n| jj}||_d|_||_	|| j_dS )zAuthorization of virgo dataset.r   )VirgoAuthConfig r<   N)
&modelscope.msdatasets.auth.auth_configrf   r   r@   rB   r   rC   r=   r>   r?   )r   rf   r=   r?   rC   r   r   r   r(      s   zVirgoDownloader._authorizec                 C   s   ddl m} ddl}t| j}|  |j| _|d
i | jj| _tj	
| jj| jj| jj| jj}tjtj	
|tjdd tj	
|tjd}t| jj|jrm| jj}|j|dd || j_|| j_td	|  dS dS )z;
        Fetch virgo meta and build virgo dataset.
        r   )VirgoDatasetNTexist_okzmeta_content.csvF)indexzVirgo meta content saved to r   ))modelscope.msdatasets.dataset_cls.datasetri   pandasr   r   fetch_virgo_metarW   r3   ospathjoinrV   	namespacerS   rU   makedirsr   	META_NAMEr]   meta	DataFrameto_csvmeta_content_cache_filevirgo_cache_dirloggerinfo)r   ri   pdrG   rz   ry   meta_content_dfr   r   r   r*      s>   
zVirgoDownloader._buildc                    s   | j jdd}| jjdkrp|rrddlddlddl}ddlm	 ddl
m fddd	| j_| j j}tj| jjtj |tjkrN|j d	d
 ddlm} |jdd | jjj fdddd| jjtj< dS dS dS )zK
        Fetch data-files from oss-urls in the virgo meta content.
        download_virgo_filesrg   r   N)urlparse)partialc              
      sp  g }g }zY  | } | dd}|r|| n| dd}|D ]}|dd}|r/|| q |D ])}t|}|rF|}	tj|	j}
ntd| tj||
}|||f q2W n t	yy } zt
d|  g }W Y d }~nd }~ww |D ]9\}}|rtj|st
d|  tj|dd t|d	}||j W d    n1 sw   Y  q||S )
Nurlrg   	inner_urlzUnsupported url: zparse virgo meta info error: zDownloading file to Trj   wb)loadsgetappendr   rp   rq   basename
ValueErrorrr   	Exceptionr{   errorexistsr|   rt   openwritecontent)meta_info_valrK   file_url_listfile_path_listfile_urltmp_inner_member_listitemone_file_urlis_urlurl_parse_res	file_name	file_pathefile_url_itemfile_path_itemf)jsonrequestsr   r   r   download_file   sL   


z<VirgoDownloader._prepare_and_download.<locals>.download_fileT)ignore_errors)tqdmzapply download_file)descc                    s    d| j S )N)rK   )	meta_info)row)data_files_dirr   r   r   r   <lambda>"  s
    z7VirgoDownloader._prepare_and_download.<locals>.<lambda>   )axis)r   rW   popr3   	data_typer   r   shutilurllib.parser   	functoolsr   r   rN   rp   rq   rr   rz   r   DATA_FILES_NAMEr   FORCE_REDOWNLOADrmtreer   rn   rv   progress_applyr   col_cache_file)r   r   r   rN   r   r   )r   r   r   r   r   r   r   r+      s8   (
z%VirgoDownloader._prepare_and_downloadc                 C      d S r   r   r&   r   r   r   r,   &     zVirgoDownloader._post_processr%   r-   r.   r/   r   r    r'   r(   r*   r+   r,   r`   r   r   r6   r   rb      s    	!Hrb   c                       ra   )MaxComputeDownloaderz+Data downloader for MaxCompute data source.r   c                    rc   r   rd   r   r6   r   r   r    .  re   zMaxComputeDownloader.__init__c                 C   r   r   r   r&   r   r   r   r'   2  r   zMaxComputeDownloader.processc                 C   r   r   r   r&   r   r   r   r(   5  r   zMaxComputeDownloader._authorizec                 C   r   r   r   r&   r   r   r   r*   8  r   zMaxComputeDownloader._buildc                 C   r   r   r   r&   r   r   r   r+   ;  r   z*MaxComputeDownloader._prepare_and_downloadc                 C   r   r   r   r&   r   r   r   r,   >  r   z"MaxComputeDownloader._post_processr   r   r   r6   r   r   *  s    r   ))rp   abcr   r   typingr   r   datasetsr   r   r   r	   r
   r   rY   modelscope.hub.apir   rh   r   4modelscope.msdatasets.context.dataset_context_configr   3modelscope.msdatasets.data_files.data_files_managerr   !modelscope.msdatasets.dataset_clsr   ,modelscope.msdatasets.meta.data_meta_managerr   modelscope.utils.constantr   r   r   r   modelscope.utils.loggerr   modelscope.utils.url_utilsr   r{   r   r0   rb   r   r   r   r   r   <module>   s(   &Z 