o
    ॵi                     @   s"   d dl Z d dlZG dd dZdS )    Nc                   @   s   e Zd ZdZdd Zdd Zdededejfd	d
Z	dedededdfddZ
edd Zedededededef
ddZededededededefddZddedefddZdS )MaxComputeUtila  
    MaxCompute util class.

    Args:
        access_id: your access id of MaxCompute
        access_key: access key of MaxCompute
        project_name: your project name of MaxCompute
        endpoint: endpoint of MaxCompute

    Attributes:
        _odps: ODPS object

    c                 C   s    ddl m} |||||| _d S )Nr   )ODPS)odpsr   _odps)self	access_id
access_keyproject_nameendpointr    r   `/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/msdatasets/utils/maxcompute_utils.py__init__   s   zMaxComputeUtil.__init__c                 C   s   | j |S )z.
        Get MaxCompute table object.
        )r   	get_table)r   
table_namer   r   r   
_get_table   s   zMaxComputeUtil._get_tabler   pt_conditionreturnc                 C   sF   |  |}|j|dd}| }W d   |S 1 sw   Y  |S )z
        Read data from MaxCompute table.
        :param table_name: table name
        :param pt_condition: partition condition,
            Example: pt_condition = 'dt=20230331'
        :return: pandas dataframe with all data
        F)	partitionlimitN)r   open_reader	to_pandas)r   r   r   treaderpd_dfr   r   r   
_read_data!   s   


zMaxComputeUtil._read_dataoutput_pathNc                 C   s.   |  ||}|j|dd td| d dS )a	  
        Fetch data from MaxCompute table to local file.
        :param table_name: table name
        :param pt_condition: partition condition,
            Example: pt_condition = 'dt=20230331'
        :param output_path: output path
        :return: None
        F)indexzFetch data to z successfully.N)r   to_csvprint)r   r   r   r   r   r   r   r   fetch_data_to_csv0   s   
z MaxComputeUtil.fetch_data_to_csvc                 C   s4   |s| j }|dkrtd| ||kr|}||fS )Nr   z%batch_size must be positive, but got )count
ValueError)r   
batch_sizer   r   r   r   _check_batch_args>   s   z MaxComputeUtil._check_batch_argsbatch_size_inlimit_indrop_last_in
partitionscolumnsc                 c   s    t | ||\}}t|| }t|d D ]R}||kr4|s4|| dkr4| || || ||   }n| || |d |  }g }	|D ]}
dd t|
D }|dt|t|  }|	| qDtj	|	|dV  qdS )a  
        Generate batch data from MaxCompute table.

        Args:
            reader: MaxCompute table reader
            batch_size_in: batch size
            limit_in: limit of data, None means fetch all data
            drop_last_in: whether drop last incomplete batch data
            partitions: table partitions
            columns: table columns

        Returns:
            batch data generator
           r   c                 S      g | ]\}}|qS r   r   .0_valr   r   r   
<listcomp>h       z3MaxComputeUtil.gen_reader_batch.<locals>.<listcomp>Nr(   )
r   r#   mathfloorrangelistlenappendpd	DataFrame)r   r$   r%   r&   r'   r(   	batch_numibatch_recordsbatch_data_listrecordtmp_valsr   r   r   gen_reader_batchI   s(   zMaxComputeUtil.gen_reader_batchr   c                 C   s   t | ||\}}|rt|| }nt|| }|dk r%td| ||kr3td| d| || }|d | }	|	|krC|}	| ||	 }
g }|
D ]}dd t|D }|dt|t|  }|| qMt	j
||d	S )
a  
        Get single batch data from MaxCompute table by indexing.

        Args:
            reader: MaxCompute table reader
            index: index of batch data
            batch_size_in: batch size
            limit_in: limit of data, None means fetch all data
            drop_last_in: whether drop last incomplete batch data
            partitions: table partitions
            columns: table columns

        Returns:
            single batch data (dataframe)
        r   z$index must be non-negative, but got z1index must be less than batch_num, but got index=z, batch_num=r)   c                 S   r*   r   r   r+   r   r   r   r/      r0   z2MaxComputeUtil.gen_reader_item.<locals>.<listcomp>Nr1   )r   r#   r2   r3   ceilr!   r5   r6   r7   r8   r9   )r   r   r$   r%   r&   r'   r(   r:   startend
batch_itemr=   r>   r?   r   r   r   gen_reader_itemm   s.   zMaxComputeUtil.gen_reader_itemc                 C   sB   |  |}|j|d}||fW  d    S 1 sw   Y  d S )N)r   )r   r   )r   r   r   	table_insr   r   r   r   get_table_reader_ins   s   
$z#MaxComputeUtil.get_table_reader_ins)N)__name__
__module____qualname____doc__r   r   strr8   r9   r   r   staticmethodr#   intboolr5   r@   rE   rG   r   r   r   r   r      s:    




#.r   )r2   pandasr8   r   r   r   r   r   <module>   s   