o
    bi=                     @   s|   d dl Z d dlmZmZmZmZ d dlmZmZ d dl	m
Z
mZ er&d dlZe eZG dd de
Zdedefd	d
ZdS )    N)TYPE_CHECKINGDictListOptional)BlockBlockMetadata)
DatasourceReadTaskc                   @   s   e Zd ZdZ		ddedededeee  ded f
d	d
Zdee	 fddZ
dee defddZdd Zde	dee fddZdS )MongoDatasourcez3Datasource for reading from and writing to MongoDB.Nuridatabase
collectionpipelineschemapymongoarrow.api.Schemac                 K   sF   || _ || _|| _|| _|| _|| _|sddddiiig| _d | _d S )N$match_idz$existstrue)_uri	_database_collection	_pipeline_schema_mongo_args_client)selfr   r   r   r   r   
mongo_args r   b/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/mongo_datasource.py__init__   s   	
zMongoDatasource.__init__returnc                 C   s   d S Nr   )r   r   r   r   estimate_inmemory_data_size%   s   z+MongoDatasource.estimate_inmemory_data_sizec                 C   s(   t |dksd|d vri S |d d S )Nr   r   )len)r   r   r   r   r   _get_match_query)   s   z MongoDatasource._get_match_queryc                 C   sV   dd l }| jd u r)|| j| _t| j| j| j | j| j d| jd | _d S d S )Nr   	collstats
avgObjSize)	pymongor   MongoClientr   #_validate_database_collection_existr   r   command_avg_obj_size)r   r'   r   r   r   _get_or_create_client.   s   

z%MongoDatasource._get_or_create_clientparallelismc                    s"  ddl m} |   | j| j | j }| | j}t|j	d|idd|digdd}d	t
d
t
dt
dtt d|d|dtdddtdtfdd g }t|D ]B\}}t|d |d | j d d d}	| j| j| j| j|d d |d d |t|d k| j| jf	}
t|
f fdd	|	}|| qL|S )Nr   )ObjectIdr   z$bucketAutoz$_id)groupBybucketsT)allowDiskUser   r   r   r   min_idmax_idright_closedr   r   kwargsr    c	                 S   s^   dd l }	ddlm}
 ddd||rdnd|iiig}|	| }|
|| | || fd|i|S )	Nr   )aggregate_arrow_allr   r   z$gtez$ltez$ltr   )r'   pymongoarrow.apir6   r(   )r   r   r   r   r2   r3   r4   r   r5   r'   r6   matchclientr   r   r   
make_blockJ   s$   

z2MongoDatasource.get_read_tasks.<locals>.make_blockcount)num_rows
size_bytesinput_files
exec_statsr   minmax   c                    s
    |  gS r!   r   )argsr:   r   r   <lambda>}   s   
 z0MongoDatasource.get_read_tasks.<locals>.<lambda>)bson.objectidr.   r,   r   r   r   r$   r   list	aggregatestrr   r   booldictr   	enumerater   r+   r   r#   r   r   r	   append)r   r-   r.   collmatch_querypartitions_ids
read_tasksi	partitionmetadatamake_block_args	read_taskr   rD   r   get_read_tasks:   sr   
	



zMongoDatasource.get_read_tasks)NN)__name__
__module____qualname____doc__rI   r   r   r   r   intr"   r$   r,   r	   rW   r   r   r   r   r
      s&    

r
   r   r   c                 C   sH   |   }||vrtd| d| |  }||vr"td| dd S )NzThe destination database z doesn't exist.zThe destination collection )list_database_names
ValueErrorlist_collection_names)r9   r   r   db_namescollection_namesr   r   r   r)      s   r)   )loggingtypingr   r   r   r   ray.data.blockr   r   ray.data.datasource.datasourcer   r	   r7   pymongoarrow	getLoggerrX   loggerr
   rI   r)   r   r   r   r   <module>   s    
x