o
    $i^                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZ ddlmZm	Z	 zd dl
mZ d dlmZmZ W n ey=   dZY nw zd dlZdZW n	 eyO   Y nw edurfG d	d
 d
eZdd Zdd ZdS dd ZdS )    N)core)_concat)HighLevelGraph   )MultipleReturnFuncmultiple_return_get)optimize)SimpleShuffleLayershuffle_groupc                   @   s>   e Zd ZedefddZdd Zdd Zdd	 Zd
d Z	dS ) MultipleReturnSimpleShuffleLayerlayerc                 C   s,   | |j |j|j|j|j|j|j|j|jd	S )N	namecolumnnpartitionsnpartitions_inputignore_index
name_input
meta_input	parts_outannotationsr   )clsr    r   X/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/util/dask/optimizations.pyclone   s   z&MultipleReturnSimpleShuffleLayer.clonec                 C   s   d| j  d| j dS )Nz'MultipleReturnSimpleShuffleLayer<name='z', npartitions=>)r   r   selfr   r   r   __repr__,   s   
z)MultipleReturnSimpleShuffleLayer.__repr__c                    s"   g d}t t fdd|D fS )Nr   c                 3   s    | ]}t  |V  qd S )N)getattr).0attrr   r   r   	<genexpr>@   s    z>MultipleReturnSimpleShuffleLayer.__reduce__.<locals>.<genexpr>)r   tuple)r   attrsr   r   r   
__reduce__2   s   z+MultipleReturnSimpleShuffleLayer.__reduce__c              
   C   s&   t | j| j| j| j| j| j| j|dS )N)r   )r   r   r   r   r   r   r   r   )r   r   r   r   r   _cullC   s   z&MultipleReturnSimpleShuffleLayer._cullc              
      s   d| j  }d| j  i }t| j}| jD ]M  fddt| jD }t|| jf|| j  f< |D ]0\}}}t||f|f|||f< ||f|vr`tt	|| j
|f| jd| j| j| j| jf|||f< q0q|S )z/Construct graph for a simple shuffle operation.zgroup-zsplit-c                    s   g | ]} |fqS r   r   )r    part_inpart_outshuffle_split_namer   r   
<listcomp>Y   s    zEMultipleReturnSimpleShuffleLayer._construct_graph.<locals>.<listcomp>r   )r   lenr   ranger   r   r   r   r   r
   r   r   r   )r   shuffle_group_namedskn_parts_out_concat_list_	_part_out_part_inr   r(   r   _construct_graphO   s<   



z1MultipleReturnSimpleShuffleLayer._construct_graphN)
__name__
__module____qualname__classmethodr	   r   r   r%   r&   r5   r   r   r   r   r      s    r   c                 C   sd   t | tstjt| | dd} n|  } | j }| D ]\}}t|tu r/t	
|| j|< q| S )Nr   dependencies)
isinstancer   from_collectionsidcopylayersitemstyper	   r   r   )r/   keysr@   keyr   r   r   r   rewrite_simple_shuffle_layeru   s   

rE   c                 K   s^   t |ttfs
|g}tt|}t | ts tjt| | dd} t| |d} t	| |fi |S )Nr   r:   )rC   )
r<   listsetr   flattenr   r=   r>   rE   r   r/   rC   kwargsr   r   r   dataframe_optimize   s   
rK   c                 K   s   t dtj d d S )NzbCustom dataframe shuffle optimization only works on dask>=2024.11.0,<2025.1.0, you are on version z=.Doing no additional optimization aside from the default one.)warningswarndask__version__rI   r   r   r   rK      s   )rL   rN   r   dask.dataframe.corer   dask.highlevelgraphr   	schedulerr   r   dask.dataframe.optimizer   dask.dataframe.shuffler	   r
   ImportError	dask_exprr   rE   rK   r   r   r   r   <module>   s.    Z