o
    $iS
                     @   s   d dl mZ d dlmZmZmZ d dlZd dlmZm	Z	 er$d dl
mZ deee  ded fd	d
Zg dZdedefddZdedefddZde	de	fddZdS )    )ThreadPoolExecutor)TYPE_CHECKINGAnyListN)BlockAccessorCallableClass	RefBundlesimple_datareturnr	   c              
   C   st   ddl }ddl}ddlm} g }| D ]%}|d|i}||t|t	|
 fgd|jjj|ddd q|S )	zcCreate ref bundles from a list of block data.

    One bundle is created for each input block.
    r   Nr   idTF)preserve_index)owns_blocksschema)pandaspyarrow'ray.data._internal.execution.interfacesr	   	DataFrameappendrayputr   	for_blockget_metadatalibSchemafrom_pandas)r
   pdpar	   outputblock r    ^/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/execution/util.pymake_ref_bundles   s"   r"   )BKiBMiBGiBTiBPiB	num_bytesc                 C   sV   d}| dkr"|t td k r"| d } |d7 }| dkr"|t td k s| dt|  S )zDReturn a human-readable memory string for the given amount of bytes.r   i      z.1f)lenmemory_units)r)   kr    r    r!   memory_string*   s   r.   locality_hitsc                 C   s   |sdS d|  d| |  dS )z9Return a human-readable string for object locality stats.z[all objects local][/z objects local]r    )r/   locality_missesr    r    r!   locality_string3   s   r3   callable_clsc                 C   s   G dd d| }|S )aB  Returns a thread-safe CallableClass with the same logic as the provided
    `callable_cls`.

    This function allows the usage of concurrent actors by safeguarding user logic
    behind a separate thread.

    This allows batch slicing and formatting to occur concurrently, to overlap with the
    user provided UDF.
    c                       s4   e Zd Z fddZ fddZ fddZ  ZS )zCmake_callable_class_single_threaded.<locals>._SingleThreadedWrapperc                    s"   t dd| _t j|i | d S )Nr*   )max_workers)r   thread_pool_executorsuper__init__)selfargskwargs	__class__r    r!   r8   F   s   zLmake_callable_class_single_threaded.<locals>._SingleThreadedWrapper.__init__c                    s
   t   S N)r7   __repr__)r9   r<   r    r!   r?   J   s   
zLmake_callable_class_single_threaded.<locals>._SingleThreadedWrapper.__repr__c                    s&   | j jt jg|R i |}| S r>   )r6   submitr7   __call__result)r9   r:   r;   futurer<   r    r!   rA   M   s   zLmake_callable_class_single_threaded.<locals>._SingleThreadedWrapper.__call__)__name__
__module____qualname__r8   r?   rA   __classcell__r    r    r<   r!   _SingleThreadedWrapperE   s    rH   r    )r4   rH   r    r    r!   #make_callable_class_single_threaded:   s   rI   )concurrent.futuresr   typingr   r   r   r   ray.data.blockr   r   r   r	   r"   r,   floatstrr.   intr3   rI   r    r    r    r!   <module>   s    	