import subprocess
import os
import sys
import random
import threading
import collections
import logging
import time

_logger = logging.getLogger("ray.util.spark.utils")


def is_in_databricks_runtime():
    return "DATABRICKS_RUNTIME_VERSION" in os.environ


def gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque):
    cmd_str = " ".join(cmd)
    tail_output = "".join(tail_output_deque)
    return (
        f"Command {cmd_str} failed with return code {return_code}, "
        f"tail output is included below.\n{tail_output}\n"
    )


def get_configured_spark_executor_memory_bytes(spark):
    # Parse a value such as "512m" or "8g" from the "spark.executor.memory"
    # conf (defaulting to "1g") into a byte count.
    value_str = spark.conf.get("spark.executor.memory", "1g").lower()
    value_num = int(value_str[:-1])
    value_unit = value_str[-1]
    unit_map = {
        "k": 1024,
        "m": 1024**2,
        "g": 1024**3,
        "t": 1024**4,
    }
    return value_num * unit_map[value_unit]


def exec_cmd(
    cmd,
    *,
    extra_env=None,
    synchronous=True,
    **kwargs,
):
    """
    A convenience wrapper of `subprocess.Popen` for running a command from a Python
    script.
    If `synchronous` is True, wait until the process terminates; if the
    subprocess return code is not 0, raise an error that contains the last
    100 lines of output.
    If `synchronous` is False, return the `Popen` instance together with a
    deque instance holding the tail output.
    The subprocess stdout / stderr output is streamed to the current
    process's stdout as it is produced.
    """
    illegal_kwargs = set(kwargs.keys()).intersection({"text", "stdout", "stderr"})
    if illegal_kwargs:
        raise ValueError(f"`kwargs` cannot contain {list(illegal_kwargs)}")

    env = kwargs.pop("env", None)
    if extra_env is not None and env is not None:
        raise ValueError("`extra_env` and `env` cannot be used at the same time")

    env = env if extra_env is None else {**os.environ, **extra_env}

    process = subprocess.Popen(
        cmd,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs,
    )

    # Keep only the last 100 output lines; they are embedded in the error
    # message if the command fails.
    tail_output_deque = collections.deque(maxlen=100)

    def redirect_log_thread_fn():
        for line in process.stdout:
            # Collect the tail output and mirror every line to the current
            # process's stdout.
            tail_output_deque.append(line)
            sys.stdout.write(line)

    threading.Thread(target=redirect_log_thread_fn, args=()).start()

    if not synchronous:
        return process, tail_output_deque

    return_code = process.wait()
    if return_code != 0:
        raise RuntimeError(
            gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque)
        )
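

# Illustrative usage of `exec_cmd` (not part of the original module):
#
#   exec_cmd(["echo", "hello"])  # waits for exit; raises RuntimeError on failure
#   proc, tail = exec_cmd(["python", "-m", "http.server"], synchronous=False)
#   # `tail` is a deque holding up to the last 100 lines `proc` has printed.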
rH   c                 C   s^   dd l }ddlm} || |j|j}|| |fdkW  d    S 1 s(w   Y  d S )Nr   )closing)socket
contextlibrI   AF_INETSOCK_STREAM
connect_ex)hostportrJ   rI   sockr   r   r   is_port_in_usec   s
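

# Illustrative check (not part of the original module): `connect_ex` returns 0
# exactly when the TCP connection succeeds, i.e. when something is listening.
#
#   is_port_in_use("127.0.0.1", 6379)  # True if e.g. a Redis server is up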


def _wait_service_up(host, port, timeout):
    beg_time = time.time()
    while time.time() - beg_time < timeout:
        if is_port_in_use(host, port):
            return True
        time.sleep(1)
    return False


def get_random_unused_port(
    host, min_port=1024, max_port=65535, max_retries=100, exclude_list=None
):
    """
    Get random unused port.
    """
    rng = random.SystemRandom()
    exclude_list = exclude_list or []
    for _ in range(max_retries):
        port = rng.randint(min_port, max_port)
        if port in exclude_list:
            continue
        if not is_port_in_use(host, port):
            return port
    raise RuntimeError(
        f"Get available port between range {min_port} and {max_port} failed."
    )


def get_spark_session():
    from pyspark.sql import SparkSession

    spark_session = SparkSession.getActiveSession()
    if spark_session is None:
        raise RuntimeError(
            "Spark session hasn't been initiated yet. Please use "
            "`SparkSession.builder` to create a spark session and connect to a "
            "spark cluster."
        )
    return spark_session


def get_spark_application_driver_host(spark):
    return spark.conf.get("spark.driver.host")


def get_max_num_concurrent_tasks(spark_context, resource_profile):
    """Gets the current max number of concurrent tasks."""
    # pylint: disable=protected-access
    ssc = spark_context._jsc.sc()
    if resource_profile is not None:

        def dummpy_mapper(_):
            pass

        # Run a dummy spark job so that the resource profile is registered
        # before its max concurrent task number is queried.
        spark_context.parallelize([1], 1).withResources(resource_profile).map(
            dummpy_mapper
        ).collect()
        return ssc.maxNumConcurrentTasks(resource_profile._java_resource_profile)
    return ssc.maxNumConcurrentTasks(
        ssc.resourceProfileManager().defaultResourceProfile()
    )


def _get_spark_worker_total_physical_memory():
    import psutil

    if RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES])
    return psutil.virtual_memory().total


def _get_spark_worker_total_shared_memory():
    import shutil

    if RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES])
    return shutil.disk_usage("/dev/shm").total


# Fraction of a machine's memory treated as available when deriving
# per-Ray-node memory budgets.
_RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET = 0.8

# Upper bound on the object store size, as a proportion of the per-Ray-node
# allocated memory (the "80%" mentioned in the warning message below).
_RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTION = 0.8


def calc_mem_ray_head_node(configured_heap_memory_bytes, configured_object_store_bytes):
    import psutil
    import shutil

    if RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES in os.environ:
        available_physical_mem = int(
            os.environ[RAY_ON_SPARK_DRIVER_PHYSICAL_MEMORY_BYTES]
        )
    else:
        available_physical_mem = psutil.virtual_memory().total
    available_physical_mem = int(
        available_physical_mem * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET
    )

    if RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES in os.environ:
        available_shared_mem = int(os.environ[RAY_ON_SPARK_DRIVER_SHARED_MEMORY_BYTES])
    else:
        available_shared_mem = shutil.disk_usage("/dev/shm").total
    available_shared_mem = int(
        available_shared_mem * _RAY_ON_SPARK_NODE_MEMORY_BUFFER_OFFSET
    )

    heap_mem_bytes, object_store_bytes, warning_msg = _calc_mem_per_ray_node(
        available_physical_mem,
        available_shared_mem,
        configured_heap_memory_bytes,
        configured_object_store_bytes,
    )
    if warning_msg is not None:
        _logger.warning(warning_msg)
    return heap_mem_bytes, object_store_bytes
r   c                 C   s.   t ||  t }t ||  t }t||||S r+   )r   r   r   )num_task_slotsphysical_mem_bytesshared_mem_bytesr   r   available_physical_mem_per_nodeavailable_shared_mem_per_noder   r   r   _calc_mem_per_ray_worker_node   s   

r   c           
      C   s   ddl m}m} d }|p| | }tjds||kr|}| t }||kr(|}d}||k r?||kr7d| d}nd| d}|}t|}|d u rNt| | }	nt|}	|	||fS )	Nr   )&DEFAULT_OBJECT_STORE_MEMORY_PROPORTION!OBJECT_STORE_MINIMUM_MEMORY_BYTES#RAY_OBJECT_STORE_ALLOW_SLOW_STORAGEzzYour configured `object_store_memory_per_node` value is too high and it is capped by 80% of per-Ray node allocated memory.zYour operating system is configured with too small /dev/shm size, so `object_store_memory_worker_node` value is configured to minimal size (z- bytes),Please increase system /dev/shm size.zYou configured too small Ray node object store memory size, so `object_store_memory_worker_node` value is configured to minimal size (zI bytes),Please increase 'object_store_memory_worker_node' argument value.)ray._private.ray_constantsr   r   r   r   r   0_RAY_ON_SPARK_MAX_OBJECT_STORE_MEMORY_PROPORTIONr   )
r   r   r   r   r   r   r   r   object_store_bytes_upper_boundr   r   r   r   r     s@   
r   RAY_ON_SPARK_WORKER_CPU_CORESRAY_ON_SPARK_WORKER_GPU_NUMr|   r   r   r   c                  C   s(   dd l } ttjv rttjt S |  S rz   )multiprocessingr   r   r   r   	cpu_count)r   r   r   r   _get_cpu_coresS  s   
r   c               


def _get_num_physical_gpus():
    import shutil

    if RAY_ON_SPARK_WORKER_GPU_NUM in os.environ:
        return int(os.environ[RAY_ON_SPARK_WORKER_GPU_NUM])
    if shutil.which("nvidia-smi") is None:
        return 0
    try:
        completed_proc = subprocess.run(
            "nvidia-smi --query-gpu=name --format=csv,noheader",
            shell=True,
            check=True,
            text=True,
            capture_output=True,
        )
        # One line is printed per installed GPU.
        return len(completed_proc.stdout.strip().split("\n"))
    except Exception as e:
        _logger.info(
            "'nvidia-smi --query-gpu=name --format=csv,noheader' command "
            f"execution failed, error: {repr(e)}"
        )
        return 0


def _get_local_ray_node_slots(num_cpus, num_gpus, num_cpus_per_node, num_gpus_per_node):
    if num_cpus_per_node > num_cpus:
        raise ValueError(
            "cpu number per Ray worker node should be <= spark worker node CPU "
            f"cores, you set cpu number per Ray worker node to {num_cpus_per_node} "
            f"but spark worker node CPU core number is {num_cpus}."
        )
    num_ray_node_slots = num_cpus // num_cpus_per_node
    if num_gpus_per_node > 0:
        if num_gpus_per_node > num_gpus:
            raise ValueError(
                "gpu number per Ray worker node should be <= spark worker node GPU "
                "number, you set GPU devices number per Ray worker node to "
                f"{num_gpus_per_node} but spark worker node GPU devices number is "
                f"{num_gpus}."
            )
        if num_ray_node_slots > num_gpus // num_gpus_per_node:
            num_ray_node_slots = num_gpus // num_gpus_per_node
    return num_ray_node_slots


def _get_avail_mem_per_ray_worker_node(
    num_cpus_per_node,
    num_gpus_per_node,
    heap_memory_per_node,
    object_store_memory_per_node,
):
    """
    Returns tuple of (
        ray_worker_node_heap_mem_bytes,
        ray_worker_node_object_store_bytes,
        error_message, # always None
        warning_message,
    )
    """
    num_cpus = _get_cpu_cores()
    if num_gpus_per_node > 0:
        num_gpus = _get_num_physical_gpus()
    else:
        num_gpus = 0
    num_task_slots = _get_local_ray_node_slots(
        num_cpus, num_gpus, num_cpus_per_node, num_gpus_per_node
    )
    physical_mem_bytes = _get_spark_worker_total_physical_memory()
    shared_mem_bytes = _get_spark_worker_total_shared_memory()
    (
        ray_worker_node_heap_mem_bytes,
        ray_worker_node_object_store_bytes,
        warning_msg,
    ) = _calc_mem_per_ray_worker_node(
        num_task_slots,
        physical_mem_bytes,
        shared_mem_bytes,
        heap_memory_per_node,
        object_store_memory_per_node,
    )
    return (
        ray_worker_node_heap_mem_bytes,
        ray_worker_node_object_store_bytes,
        None,
        warning_msg,
    )


def get_avail_mem_per_ray_worker_node(
    spark,
    num_cpus_per_node,
    num_gpus_per_node,
    heap_memory_per_node,
    object_store_memory_per_node,
):
    """
    Return the available heap memory and object store memory for each ray worker,
    and the error / warning message if there is one.
    Return value is a tuple of
    (ray_worker_node_heap_mem_bytes, ray_worker_node_object_store_bytes,
     error_message, warning_message)
    NB: We have one ray node per spark task.
    """

    def mapper(_):
        try:
            return _get_avail_mem_per_ray_worker_node(
                num_cpus_per_node,
                num_gpus_per_node,
                heap_memory_per_node,
                object_store_memory_per_node,
            )
        except Exception as e:
            import traceback

            trace_msg = "\n".join(traceback.format_tb(e.__traceback__))
            return None, None, repr(e) + trace_msg, None

    # Running the inference routine inside a spark task measures the spark
    # worker machine rather than the driver machine.
    (
        inferred_ray_worker_node_heap_mem_bytes,
        inferred_ray_worker_node_object_store_bytes,
        err,
        warning_msg,
    ) = spark.sparkContext.parallelize([1], 1).map(mapper).collect()[0]

    if err is not None:
        raise RuntimeError(
            f"Inferring ray worker node available memory failed, error: {err}. "
            "You can bypass this error by setting the following spark configs: "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_CPU_CORES, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_GPU_NUM, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_PHYSICAL_MEMORY_BYTES, "
            "spark.executorEnv.RAY_ON_SPARK_WORKER_SHARED_MEMORY_BYTES."
        )
    if warning_msg is not None:
        _logger.warning(warning_msg)
    return (
        inferred_ray_worker_node_heap_mem_bytes,
        inferred_ray_worker_node_object_store_bytes,
    )
r   c                    s:   dt jv rdd t jd dD   fdd| D S | S )NCUDA_VISIBLE_DEVICESc                 S   s   g | ]}t | qS r   )r   r   ).0devr   r   r   
<listcomp>  s    z9get_spark_task_assigned_physical_gpus.<locals>.<listcomp>,c                    s   g | ]} | qS r   r   )r   addrvisible_cuda_dev_listr   r   r     s    )r   r   r   )gpu_addr_listr   r   r   %get_spark_task_assigned_physical_gpus  s   
r   )r   rY   r)   N)(r;   r   r-   rZ   rA   r?   loggingr   rT   	getLoggerr   r   r   r"   rH   rR   rX   rd   ri   rj   ry   r   r   r   r   r   r   r   r   r   r|   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   sT    
	:

%E.;