o
    ;i>                    @   s  d dl Z d dlZd dlZd dlZd dlZd dl mZ d dlmZ d dlmZm	Z	m
Z
mZ d dlmZ d dlZd dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z%m&Z&m'Z'm(Z( d dl)m*Z*m+Z+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 ej5rd dl6Ze*ej7g Z8dZ9dZ:G dd dZ;ee;Z<eG dd dZ=dZ>dZ?dZ@ej5rd dlAZG dd dZBG dd dZCG dd deCZDG dd  d eCZEd!d"d#e;d$d%d&eFeGeHf fd'd(ZId!d"d#e;d$d%d)eJd*eJd+eJd,e
e	eHeHgdf  d-d.fd/d0ZKd!d"d#e;d$d%d)eJd*eJd+eJd,e
e	eHeHgdf  d&ejLedf fd1d2ZMi d3d4d3fd5d6d7ejLedf d)eJd*eJd+eJd&ejLedf fd8d9ZNd:eGd*eJd+eJfd;d<ZOd&eJfd=d>ZPed?d@i d3d4d3dAd5d6dBejejQe ejRe f d)eJd*eJd+eJd&ejLedf fdCdDZSedEd@i d3d4d3dAdFejejQejTe  ejRejTe  f d)eJd*eJd+eJd&ejRe f
dGdHZUi d4dIdJeJd&dfdKdLZVedMd@i d3d4d3dAdBejQe d)eJd*eJd+eJd&ef
dNdOZWi dPdidRdSZXi f	6	&	QdjdTdUZYi dPdidVdWZZi dPdkdXdYZ[i dPdkdZd[Z\i d4dIdJeJfd\d]Z]ed^d@i d3d4d3dAdFejQejTe  d)eJd*eJd+eJd&ef
d_d`Z^G dadb dbej_Z`G dcdd ddej_ZaG dedf dfZbG dgdh dhZcdS )l    N)FIRST_COMPLETED)	dataclass)AnyCallableOptionalUnion)Status)current_input_id)AsyncOrSyncIterableTimestampPriorityQueueaclosing	async_mapasync_map_orderedasync_merge	async_zipqueue_batch_iterator%run_coroutine_in_temporary_event_loopsync_or_async_itersynchronize_apisynchronizer!warn_if_generator_is_not_consumed)BLOB_MAX_PARALLELISMdeprecation_warning)ATTEMPT_TIMEOUT_GRACE_PERIODOUTPUTS_TIMEOUT_create_input_process_result)RETRYABLE_GRPC_STATUS_CODESRetryRetryWarningMessage)
DecodedJwt)logger)RetryManager)api_pb2      c                   @   s4   e Zd ZdZdd Zejdd Zejdd ZdS )	_SynchronizedQueuezmdmd:hiddenc                    s   t  | _d S N)asyncioQueueqself r.   F/home/ubuntu/.local/lib/python3.10/site-packages/modal/parallel_map.pyinit9   s   z_SynchronizedQueue.initc                    s   | j |I d H  d S r(   )r+   putr-   itemr.   r.   r/   r1   >   s   z_SynchronizedQueue.putc                    s   | j  I d H S r(   )r+   getr,   r.   r.   r/   r4   B   s   z_SynchronizedQueue.getN)	__name__
__module____qualname____doc__r0   r   no_io_translationr1   r4   r.   r.   r.   r/   r'   5   s    
r'   c                   @   s   e Zd ZU eed< dS )_OutputValuevalueN)r5   r6   r7   r   __annotations__r.   r.   r.   r/   r:   J   s   
 r:     1   i   c                   @   s^   e Zd ZdZdddedejdddeegd	f d
eg d	f fddZ	dd Z
dd Zdd Zd	S )InputPreprocessorzx
    Constructs FunctionPutInputsItem objects from the raw-input queue, and puts them in the processed-input queue.
    clientmodal.client._Clientraw_input_queueprocessed_input_queuefunctionmodal.functions._Functioncreated_callbackNdone_callbackc                C   s.   || _ || _d| _|| _|| _|| _|| _d S Nr   )r@   rD   inputs_createdrB   rC   rF   rG   )r-   r@   rB   rC   rD   rF   rG   r.   r.   r/   __init__`      

zInputPreprocessor.__init__c                 C  s(   	 | j  I d H }|d u rd S |V  qr(   )rB   r4   )r-   	raw_inputr.   r.   r/   
input_iterr   s   zInputPreprocessor.input_iterc                    s    fdd}|S )Nc                    sH    j }  j d7  _   j  | \}}t|| jj| jdI d H S )N   idxrD   )rI   rF   r   r@   stubrD   )
argskwargsrP   argskwargsr,   r.   r/   create_inputz   s   z<InputPreprocessor.create_input_factory.<locals>.create_inputr.   )r-   rU   r.   r,   r/   create_input_factoryy   s   z&InputPreprocessor.create_input_factoryc              	   C  s   t t|  |  td4 I d H }|2 z3 d H W }| j|I d H  q6 W d   I d H  n1 I d H s6w   Y  | jd I d H  |   d V  d S )Nconcurrency)r   r   rM   rV   r   rC   r1   rG   )r-   streamerr3   r.   r.   r/   drain_input_generator   s   (
z'InputPreprocessor.drain_input_generator)r5   r6   r7   r8   r'   r)   r*   r   intrJ   rM   rV   rZ   r.   r.   r.   r/   r?   [   s$    

r?   c                   @   sX   e Zd ZdZdddddejddd	ed
eded fddZ	dd Z
edefddZdS )InputPumperz\
    Reads inputs from a queue of FunctionPutInputsItems, and sends them to the server.
    N)map_items_managerr@   rA   input_queuerD   rE   function_call_idmax_batch_sizer]   _MapItemsManagerc                C   s.   || _ || _|| _|| _d| _|| _|| _d S rH   )r@   rD   r]   r^   inputs_sentr_   r`   )r-   r@   r^   rD   r_   r`   r]   r.   r.   r/   rJ      rK   zInputPumper.__init__c                 C  s   | j jsJ t| j| jd2 zc3 d H W }| jd ur#| j|I d H  tj| j	j
|| jd}tdt| d| j  d | j jj|| jdI d H }|  jt|7  _| jd ura| j|j tdt| d| j  d q6 d V  d S )	Nr`   )function_idinputsr_   zPushing z6 inputs to server. Num queued inputs awaiting push is z. retryzSuccessfully pushed .)r@   rQ   r   r^   r`   r]   	add_itemsr$   FunctionPutInputsRequestrD   	object_idr_   r"   debuglenqsizeFunctionPutInputs_function_inputs_retryrb   handle_put_inputs_responsere   )r-   itemsrequestrespr.   r.   r/   pump_inputs   s4   


zInputPumper.pump_inputsreturnc                 C   s2   t d| jj ddtjgd}td ttjg|dS )Nz#Warning: map progress for function zZ is limited. Common bottlenecks include slow iteration over results, or function backlogs.r%   )messagewarning_intervalerrors_to_warn_for)max_retries	max_delayadditional_status_codeswarning_message)r    rD   _function_namer   RESOURCE_EXHAUSTEDr   PUMP_INPUTS_MAX_RETRY_DELAY)r-   retry_warning_messager.   r.   r/   rp      s   z"InputPumper._function_inputs_retry)r5   r6   r7   r8   r)   r*   strr[   r   rJ   ru   propertyr   rp   r.   r.   r.   r/   r\      s&    
r\   c                       sD   e Zd Zdddejdedddeded	d
f fddZdd Z  Z	S )SyncInputPumperr@   rA   r^   retry_queuerD   rE   function_call_jwtr_   r]   ra   c                   s.   t  j||||t|d || _d| _|| _d S )N)r^   rD   r_   r`   r]   r   )superrJ   MAP_INVOCATION_CHUNK_SIZEr   inputs_retriedr   )r-   r@   r^   r   rD   r   r_   r]   	__class__r.   r/   rJ      s   
zSyncInputPumper.__init__c                 C  s   t | j| jd2 z?3 d H W }| j|I d H }tj| j|d}| jj	j
|| jdI d H }| j|j tdt| d |  jt|7  _q	6 d V  d S )Nrc   )r   re   rf   zSuccessfully pushed retry for z to server.)r   r   r`   r]   prepare_items_for_retryr$   FunctionRetryInputsRequestr   r@   rQ   FunctionRetryInputsrp   handle_retry_response
input_jwtsr"   rl   rm   r   )r-   retriable_idxsre   rs   rt   r.   r.   r/   retry_inputs   s   

zSyncInputPumper.retry_inputs)
r5   r6   r7   r)   r*   r   r   rJ   r   __classcell__r.   r.   r   r/   r      s"    	r   c                       s<   e Zd Zdddejdddef fddZ fd	d
Z  ZS )AsyncInputPumperr@   rA   r^   rD   rE   r_   c                   s   t  j||||td d S )N)r^   rD   r_   r`   )r   rJ   SPAWN_MAP_INVOCATION_CHUNK_SIZE)r-   r@   r^   rD   r_   r   r.   r/   rJ     s   
zAsyncInputPumper.__init__c                   s\   t   2 z3 d H W }q6 tj| jj| j| jd}| jj	j
|td ddI d H  d V  d S )N)rd   r_   
num_inputs)rz   rf   )r   ru   r$   FunctionFinishInputsRequestrD   rk   r_   rb   r@   rQ   FunctionFinishInputsr   )r-   _rs   r   r.   r/   ru     s   
zAsyncInputPumper.pump_inputs)	r5   r6   r7   r)   r*   r   rJ   ru   r   r.   r.   r   r/   r     s    r   rD   rE   rB   r@   rA   rv   c                    s   |j sJ tj| jt pdtjtjd}|j |I d H }|j}d dfdd} fdd}t	
 }t|||| ||d	}	t||| |d
 fddfdd}
dd }t	|
 }t	||	 | I d H  |  |I d H  |fS )N )rd   parent_input_idfunction_call_typefunction_call_invocation_typeFr   c                    s   | d u s
|  ks
J |  d S r(   r.   set_inputs_created)rI   r.   r/   r   3  s   z1_spawn_map_invocation.<locals>.set_inputs_createdc                      s   d d S )NTr.   r.   )have_all_inputsr.   r/   set_have_all_inputs8  s   z2_spawn_map_invocation.<locals>.set_have_all_inputsr@   rB   rC   rD   rF   rG   )r@   r^   rD   r_   c                	      s$   t d  d dj d d S )Nzhave_all_inputs= inputs_created=z inputs_sent= )r"   rl   rb   r.   )r   input_pumperrI   r.   r/   	log_statsM  s   z(_spawn_map_invocation.<locals>.log_statsc                      s>   	    z
t dI d H  W n t jy      Y d S w q)NT
   r)   sleepCancelledErrorr.   r   r.   r/   log_taskR  s   z'_spawn_map_invocation.<locals>.log_taskc                    s   | 2 z3 d H W }q6 d S r(   r.   )genr   r.   r.   r/   consume_generator\  s   z0_spawn_map_invocation.<locals>.consume_generator)rQ   r$   FunctionMapRequestrk   r	   FUNCTION_CALL_TYPE_MAP#FUNCTION_CALL_INVOCATION_TYPE_ASYNCFunctionMapr_   r)   r*   r?   r   create_taskgatherrZ   ru   cancel)rD   rB   r@   rs   responser_   r   r   r^   input_preprocessorr   r   log_debug_stats_taskr.   )r   r   rI   r   r/   _spawn_map_invocation#  sP   
	




r   order_outputsreturn_exceptionswrap_returned_exceptionscount_update_callbackr   ,api_pb2.FunctionCallInvocationType.ValueTypec                   s2  j sJ tj| jt pdtj|d}j |I d H }	|	j|	j}
|	j	}|	j
|	jp.t}d	t ddddddd ddt t t t|||t|| fddfddd}t| |
d	
d	fd
d	 fddfdddtjdtttf ffddfdd} 	
fdd}t| }tt| 
 
 | 4 I d H }|2 z3 d H W }	|	d ur|	jV  q6 W d   I d H  n1 I d H s	w   Y  |   |I d H  d S )Nr   )rd   r   r   r   r   Fr   c                    s
    | dS )Nr   r.   )xupdate_stater.   r/   <lambda>     
 z!_map_invocation.<locals>.<lambda>c                      s
    ddS )NTr   r.   r.   r   r.   r/   r     r   r   )r@   r^   r   rD   r]   r   r_   c                    s   | dusJ |d u s|ksJ |d u s|ksJ | d ur | |d ur&||d ur,| d ur5  rAkrC   d S d S d S )NFset)r   r   set_outputs_completedr   r   rI   map_done_eventoutputs_completedr.   r/   r     s   
z%_map_invocation.<locals>.update_statec                    s  j sJ d}  std	 d d dd  D }tjt| dt |d}t	
j j|td	tt d
d}t	
 }z6t	j||gtdI d H \}}||v rb|  | }n shJ W |  |  d S W |  |  n	|  |  w |j} tt }|jD ]d}	
d7 
|	|I d H }
|
tjkrd7 n1|
tjkrd7 n'|
tjkrd7 n|
tjkrɈd7 n|
tjkrӈ d7  n	|
tjkr܈d7 |
tjks|
tjkr|	j 	d d |	V  q rd S d S )N0-0zRequesting outputs. Have z
 outputs, z inputs.c                 S   s   g | ]}|qS r.   r.   ).0	input_jwtr.   r.   r/   
<listcomp>  s    z<_map_invocation.<locals>.get_all_outputs.<locals>.<listcomp>F)r_   timeoutlast_entry_idclear_on_successrequested_atr      rz   attempt_timeoutrf   return_whenrN   )r   )rQ   is_setr"   rl   !get_input_jwts_waiting_for_outputr$   FunctionGetOutputsRequestr   timer)   r   FunctionGetOutputsr   r   waitr   r   resultr   r[   outputshandle_get_outputs_response_OutputTypeSUCCESSFUL_COMPLETIONFAILED_COMPLETIONNO_CONTEXT_DUPLICATESTALE_RETRY_DUPLICATEALREADY_COMPLETE_DUPLICATERETRYINGaddinput_id)r   r   rs   get_response_taskmap_done_taskdonependingr   now_secondsr3   output_type)already_complete_duplicatesr@   completed_outputsfailed_completionsr_   rI   r   r]   no_context_duplicatesr   outputs_receivedretried_outputsstale_retry_duplicatessuccessful_completionsr   r.   r/   get_all_outputs  sz   

	














z(_map_invocation.<locals>.get_all_outputsc                    s   j sJ zjt 4 I d H } | 2 z	3 d H W }|V  q6 W d   I d H  n*1 I d H s.w   Y  W tjdddt d} j |I d H   I d H  d S W tjdddt d} j |I d H   I d H  d S tjdddt d} j |I d H   I d H  w )Nr   r   T)r_   r   r   r   r   )rQ   r   r$   r   r   r   close)output_itemsr3   rs   )r@   r_   r   r   r.   r/   get_all_outputs_and_clean_up  sH   
*z5_map_invocation.<locals>.get_all_outputs_and_clean_upr3   rv   c              
      sp   zt | j| j j I d H }W n" ty2 } zr&r#tj|}n|}n|W Y d }~nd }~ww | j|fS r(   )	r   r   data_formatrQ   	Exceptionmodal	exceptionUserCodeExceptionrP   r3   outpute)r@   r   r   r.   r/   fetch_output  s   
z%_map_invocation.<locals>.fetch_outputc               	     s   i } d}t t  td4 I d H 8}|2 z)3 d H W \}}s&t|V  q|| |< 	 || vr0n| |}t|V  |d7 }q+q6 W d   I d H  n1 I d H sQw   Y  t| dks^J d S )Nr   rW   TrN   r   r   r   r:   poprm   received_outputs
output_idxrY   rP   r  r  r   r   r.   r/   poll_outputs/  ,   

(z%_map_invocation.<locals>.poll_outputsc                     sd    	
fdd} 	 |   z
t dI d H  W n t jy0   |   Y d S w q)Nc                !      sx   t d d d dj dj d d d d	 d
 d  d	 d  d
  dt  d S )Nz'Map stats: sync_client_retries_enabled=z have_all_inputs=r   z input_sent=z inputs_retried=z outputs_received=z successful_completions= failed_completions= no_context_duplicates=z old_retry_duplicates= already_complete_duplicates= retried_outputs= input_queue_size=z retry_queue_size=z map_items_manager=)r"   rl   rb   r   rn   rm   r.   r   r   r   r   r^   rI   r]   r   r   r   r   r   r   sync_client_retries_enabledr.   r/   r   L  s>   		

z;_map_invocation.<locals>.log_debug_stats.<locals>.log_statsTr   r   r   r  r.   r/   log_debug_statsK  s   &z(_map_invocation.<locals>.log_debug_stats)NNN)!rQ   r$   r   rk   r	   r   r   r_   r   retry_policyr  max_inputs_outstandingMAX_INPUTS_OUTSTANDING_DEFAULTr)   Eventr   r   r*   ra   r?   r   FunctionGetOutputsItemtupler[   r   r   r   r   rZ   ru   r   r;   r   )rD   rB   r@   r   r   r   r   r   rs   r   r   r  r  r   r  r  r   rY   r.   )r   r@   r   r   r   r  r_   r   r   r   r   r^   rI   r   r]   r   r   r   r   r   r   r   r   r   r  r   r   r/   _map_invocationj  s   





	
(M$&*r  c                   sL  j sJ dj I dH jsJ dd	t ddddddd ddddt t ttjddddd	}t	|tj
d
td
d	d'dtdtdttdf f	fddfddfdd

fdd}fdd}	fdd}
 fddfdddtjd tttf ffd!d"fd#d$} f
d%d&}t| }tt| |	 | |
 4 I dH }|2 z3 dH W }|dur|jV  q6 W d  I dH  n1 I dH sw   Y  |  dS )(a<  Input-plane implementation of a function map invocation.

    This is analogous to `_map_invocation`, but instead of the control-plane
    `FunctionMap` / `FunctionPutInputs` / `FunctionGetOutputs` RPCs it speaks
    the input-plane protocol consisting of `MapStartOrContinue`, `MapAwait`, and `MapCheckInputs`.
    zO_map_invocation_inputplane should only be used for input-plane backed functionsNzBClient must be hydrated with a stub for _map_invocation_inputplaneFr   r   r=   g      ?)retriesinitial_delay_msmax_delay_msbackoff_coefficientT)r  r   r   r  r  is_input_plane_instancecreated_deltacompleted_deltar   c                    sV   | r| 7 |r|7 |d ur| d ur  r'kr)   d S d S d S r(   r   )r!  r"  r   r   r.   r/   update_counters  s   
z3_map_invocation_inputplane.<locals>.update_countersc                    sB   d }dd | \}}t || j|dI d H }tj|dS )NrN   )r!  rO   )input)r   rQ   r$   MapStartOrContinueItem)rR   rP   rS   rT   put_item)r@   rD   rI   r#  r.   r/   rU     s   
z0_map_invocation_inputplane.<locals>.create_inputc                    s&   	    I d H } | d u rd S | V  qr(   )r4   )rL   )rB   r.   r/   rM     s   z._map_invocation_inputplane.<locals>.input_iterc               	     s   t t  td4 I d H  } | 2 z3 d H W }t |I d H  q6 W d   I d H  n1 I d H s5w   Y  dd d V  d S )NrW   Tr   )r   r   r   r1   r   )rY   q_item)rU   rM   queuer#  r.   r/   rZ     s   (

z9_map_invocation_inputplane.<locals>.drain_input_generatorc                    s   t td2 zj3 d H W } dd | D   I d H  tjjt p%d d}jI d H }j	|t
tjgtd d|dI d H } fddt|jD }| d u rq|j  |jpdt|j |j q6 d V  d S )	Nrc   c                 S   s   g | ]}t j|j|jd qS )r$  attempt_token)r$   r%  r$  r*  )r   qir.   r.   r/   r     s    zC_map_invocation_inputplane.<locals>.pump_inputs.<locals>.<listcomp>r   )rd   	map_tokenr   rr   )r|   r{   rz   rg   metadatac                    s    g | ]\}} | j j|fqS r.   r$  rP   )r   rP   r*  request_itemsr.   r/   r     s    )r   r   add_items_inputplaner$   MapStartOrContinueRequestrk   r	   get_input_plane_metadata_input_plane_regionMapStartOrContinuer   r   r   r   	enumerateattempt_tokenshandle_put_continue_responser,  r   r  r  set_retry_policyr  update_items_retry_policy)batchrs   r.  r   response_items_idx_tuple)r@   rD   input_plane_stubr]   r,  map_token_receivedr  r(  r0  r/   ru     sF   



/z/_map_invocation_inputplane.<locals>.pump_inputsc                    s  zw  ssd u r	 I d H  qttd} t }tj| |gtdI d H \}}||v r4n?  dd  D }tjd|d}	j
I d H }j||dI d H  fddtjD }|I d H    rd V  W d S  tjy   Y d S w )	NrN   r   c                 S   s   g | ]\}}|qS r.   r.   )r   r   r*  r.   r.   r/   r   0      zI_map_invocation_inputplane.<locals>.check_lost_inputs.<locals>.<listcomp>r   )r   r   r8  )r.  c                    s&   g | ]\}} | d  j | fqS )r   )lost)r   resp_idxr   check_inputsr   r.   r/   r   ;  s    )r   r   r)   r   r   r   !get_input_idxs_waiting_for_outputr$   MapCheckInputsRequestr4  r5  MapCheckInputsr7  rA  handle_check_inputs_responser   )
sleep_taskr   r   r   r8  rs   r.  check_inputs_response)r@   rD   r>  r   r   r]   r,  r?  rC  r/   check_lost_inputs   s>   z5_map_invocation_inputplane.<locals>.check_lost_inputsc            	        s    sd u r	 I d H  qtjt td} jI d H }t	j
| tdtt d|d}t	 }z6tj||gtdI d H \}}||v rY|  | }n  s_J W |  |  d S W |  |  n	|  |  w |j|jD ]d}|tt I d H }|tjkrd7 n9|tjkrd7 n/|tjkrd7 n%|tjkr
d7 
n|tjkrĈd7 n|tjkrΈ d7  ntd| |tjks|tjkrdd |V  q  rd S d S )	N)r,  r   r   r   r   r   r-  r   rN   zUnknown output type: )r"  )r   r   r$   MapAwaitRequestr   r   r4  r5  r)   r   MapAwaitr   r   r   r   r   r   r   r   r[   r   r   r   r   r   r   r   r   )	rs   r.  r   r   r   r   r   output_itemr   )r   r@   r   rD   r>  r   r   r]   r,  r?  r   r   r   r   r#  r.   r/   r   D  st   


















z3_map_invocation_inputplane.<locals>.get_all_outputsc               	     s   z@t   4 I d H } | 2 z	3 d H W }|V  q6 W d   I d H  n1 I d H s)w   Y  W  I d H  d S W  I d H  d S  I d H  w r(   )r   r   )streamr3   )r   r(  r.   r/   r     s   *z@_map_invocation_inputplane.<locals>.get_all_outputs_and_clean_upr3   rv   c              
      sn   zt | j| j I d H }W n" ty1 } zr%r"tj|}n|}n|W Y d }~nd }~ww | j|fS r(   )r   r   r   r   r   r   r   rP   r  )r@   r>  r   r   r.   r/   r    s   
z0_map_invocation_inputplane.<locals>.fetch_outputc               	     s   i } d}t t  td4 I d H 8}|2 z)3 d H W \}}s&t|V  q|| |< 	 || vr0n| |}t|V  |d7 }q+q6 W d   I d H  n1 I d H sQw   Y  t| dks^J d S )NrN   rW   Tr   r  r  r
  r.   r/   r    r  z0_map_invocation_inputplane.<locals>.poll_outputsc               
      s\    	f
dd} 	 |   z
t dI d H  W n t jy,   |   Y d S w q)Nc                      sN   t d	 d d d d  d d d d	t d
  d S )Nz"Map stats:
successful_completions=r  r  z stale_retry_duplicates=r  r  z map_token=z max_inputs_outstanding=z map_items_manager_size=r  )r"   rl   rm   r.   
r   r   input_queue_sizer]   r,  r  r   r   r   r   r.   r/   r     s&   zF_map_invocation_inputplane.<locals>.log_debug_stats.<locals>.log_statsTr   r   r   rP  r.   r/   r    s   	z3_map_invocation_inputplane.<locals>.log_debug_stats)r   r   N)_input_plane_urlget_stubrQ   r)   r  r   r  r$   FunctionRetryPolicyra   "FUNCTION_CALL_INVOCATION_TYPE_SYNCr[   r   boolr  r  r   r   r   r   r;   r   )rD   rB   r@   r   r   r   r   r  rZ   ru   rK  r  r  r   mergedmaybe_outputr.   )r   r@   r   rU   r   r  rD   r   r   r   rM   r>  rQ  rI   r   r   r]   r,  r?  r  r   r   r   r(  rB   r   r   r   r   r#  r   r/   _map_invocation_inputplanet  s   

3($E&	
*rY  TFr-   modal.functions.Functionasync_input_genc           	   	     s   t  j I dH   fdd}tt| j|||| 4 I dH }|2 z	3 dH W }|V  q*6 W d  I dH  dS 1 I dH sFw   Y  dS )aF  Core implementation that supports `_map_async()`, `_starmap_async()` and `_for_each_async()`.

    Runs in an event loop on the main thread. Concurrently feeds new input to the input queue and yields available
    outputs to the caller.

    Note that since the iterator(s) can block, it's a bit opaque how often the event
    loop decides to get a new input vs how often it will emit a new output.

    We could make this explicit as an improvement or even let users decide what they
    prefer: throughput (prioritize queueing inputs) or latency (prioritize yielding results)
    Nc               	     s~   t  4 I d H  } | 2 z3 d H W }j|fI d H  q6 W d   I d H  n1 I d H s/w   Y  jd I d H  d S r(   r   r1   aiorY   rS   r[  rT   rB   r.   r/   
feed_queue  s   (z_map_helper.<locals>.feed_queue)SynchronizedQueuer0   r]  r   r   _map)	r-   r[  rT   r   r   r   r`  map_output_streamr  r.   r_  r/   _map_helper  s   .rd  	func_namec                 C   s,   |r|rt dd|  d|  d d S d S d S )N)i        z	Function.a@   currently leaks an internal exception wrapping type (modal.exceptions.UserCodeException) when `return_exceptions=True` is set. In the future, this will change, and the underlying exception will be returned directly.
To opt into the future behavior and silence this warning, add `wrap_returned_exceptions=False`:

    f.z=(..., return_exceptions=True, wrap_returned_exceptions=False)r   )re  r   r   r.   r.   r/   _maybe_warn_about_exceptions	  s   rh  c                  C   s4   zt  } | jjjj}|dkW S  ty   Y dS w )zBCheck whether the calling function was called from a sync wrapper.asendF)inspectcurrentframef_backf_codeco_namer   )framecaller_function_namer.   r.   r/   _invoked_from_sync_wrapper  s   
rq  zFunction.map.aio)function_namerT   r   r   r   input_iteratorsc                G  sT   t  s
td|| tdd |D  }t| |||||d2 z	3 d H W }|V  q6 d S )Nzmap.aioc                 S      g | ]}t |qS r.   r   r   itr.   r.   r/   r   1  r@  z_map_async.<locals>.<listcomp>rs  )rq  rh  r   rd  )r-   rT   r   r   r   rt  r[  r  r.   r.   r/   
_map_async$  s   ry  zFunction.starmap.aioinput_iteratorc                C  sF   t  s
td|| t| t|||||d2 z	3 d H W }|V  q6 d S )Nzstarmap.aiors  )rq  rh  rd  r   )r-   rz  rT   r   r   r   r  r.   r.   r/   _starmap_async=  s   
r{  rT   ignore_exceptionsr}  c                   s<   t dd |D  }t| ||d|dd2 z3 d H W }q6 d S )Nc                 S   ru  r.   rv  rw  r.   r.   r/   r   W  r@  z#_for_each_async.<locals>.<listcomp>Frs  )r   rd  )r-   rT   r}  rt  r[  r   r.   r.   r/   _for_each_asyncT  s   r~  zFunction.mapc                G   s0   t d|| tt| g|R ||||dddS )a  Parallel map over a set of inputs.

    Takes one iterator argument per argument in the function being mapped over.

    Example:
    ```python
    @app.function()
    def my_func(a):
        return a ** 2


    @app.local_entrypoint()
    def main():
        assert list(my_func.map([1, 2, 3, 4])) == [1, 4, 9, 16]
    ```

    If applied to a `app.function`, `map()` returns one result per input and the output order
    is guaranteed to be the same as the input order. Set `order_outputs=False` to return results
    in the order that they are completed instead.

    `return_exceptions` can be used to treat exceptions as successful results:

    ```python
    @app.function()
    def my_func(a):
        if a == 2:
            raise Exception("ohno")
        return a ** 2


    @app.local_entrypoint()
    def main():
        # [0, 1, UserCodeException(Exception('ohno'))]
        print(list(my_func.map(range(3), return_exceptions=True)))
    ```
    maprs  zgYou can't iter(Function.map()) from an async function. Use async for ... in Function.map.aio() instead.nested_async_message)rh  r
   ry  )r-   rT   r   r   r   rt  r.   r.   r/   	_map_syncc  s   -	r  )rT   modal.functions._FunctionCallc                   s&   t dd |D  }t| ||I d H S )Nc                 S   ru  r.   rv  rw  r.   r.   r/   r     r@  z1_experimental_spawn_map_async.<locals>.<listcomp>)r   _spawn_map_helper)r-   rT   rt  r[  r.   r.   r/   _experimental_spawn_map_async  s   r  c                    sL   t  j I d H   fdd}t| j| I d H \}}|S )Nc               	      s~   t  4 I d H  } | 2 z3 d H W }j|fI d H  q6 W d   I d H  n1 I d H s/w   Y  jd I d H  d S r(   r\  r^  r_  r.   r/   r`    s   (z%_spawn_map_helper.<locals>.feed_queue)ra  r0   r]  r)   r   
_spawn_map)r-   r[  rT   r`  fcr   r.   r_  r/   r    s    r  c                G      t t| g|R d|idS )aQ  mdmd:hidden
    Spawn parallel execution over a set of inputs, returning as soon as the inputs are created.

    Unlike `modal.Function.map`, this method does not block on completion of the remote execution but
    returns a `modal.FunctionCall` object that can be used to poll status and retrieve results later.

    Takes one iterator argument per argument in the function being mapped over.

    Example:
    ```python
    @app.function()
    def my_func(a, b):
        return a ** b


    @app.local_entrypoint()
    def main():
        fc = my_func.spawn_map([1, 2], [3, 4])
    ```

    rT   `You can't run Function.spawn_map() from an async function. Use Function.spawn_map.aio() instead.)r   r  r-   rT   rt  r.   r.   r/   _experimental_spawn_map_sync  s   r  c                   sD    fdd}t dd |D  }t||dd2 z3 dH W }q6 dS )zThis runs in an event loop on the main thread. It consumes inputs from the input iterators and creates async
    function calls for each.
    c                    s   j j| i  S )a  
        Returns co-routine that invokes a function with the given arguments.

        On RESOURCE_EXHAUSTED, it will retry indefinitely with exponential backoff up to 30 seconds. Every 10 retriable
        errors, log a warning that the function call is waiting to be created.
        )_spawn_map_innerr]  )rS   rT   r-   r.   r/   _call_with_args  s   z)_spawn_map_async.<locals>._call_with_argsc                 S   ru  r.   rv  rw  r.   r.   r/   r     r@  z$_spawn_map_async.<locals>.<listcomp>   rW   N)r   r   )r-   rT   rt  r  	input_genr   r.   r  r/   _spawn_map_async  s   
r  c                G   r  )a  Spawn parallel execution over a set of inputs, exiting as soon as the inputs are created (without waiting
    for the map to complete).

    Takes one iterator argument per argument in the function being mapped over.

    Example:
    ```python
    @app.function()
    def my_func(a):
        return a ** 2


    @app.local_entrypoint()
    def main():
        my_func.spawn_map([1, 2, 3, 4])
    ```

    Programmatic retrieval of results will be supported in a future update.
    rT   r  )r   r  r  r.   r.   r/   _spawn_map_sync  s   r  c                G   s    t t| g|R ||dddS )a  Execute function for all inputs, ignoring outputs. Waits for completion of the inputs.

    Convenient alias for `.map()` in cases where the function just needs to be called.
    as the caller doesn't have to consume the generator to process the inputs.
    r|  zhYou can't run `Function.for_each()` from an async function. Use `await Function.for_each.aio()` instead.r  )r   r~  )r-   rT   r}  rt  r.   r.   r/   _for_each_sync  s   r  zFunction.starmapc             	   C   s(   t d|| tt| |||||dddS )aR  Like `map`, but spreads arguments over multiple function arguments.

    Assumes every input is a sequence (e.g. a tuple).

    Example:
    ```python
    @app.function()
    def my_func(a, b):
        return a + b


    @app.local_entrypoint()
    def main():
        assert list(my_func.starmap([(1, 2), (3, 4)])) == [3, 7]
    ```
    starmaprs  zsYou can't `iter(Function.starmap())` from an async function. Use `async for ... in Function.starmap.aio()` instead.r  )rh  r
   r{  )r-   rz  rT   r   r   r   r.   r.   r/   _starmap_sync  s   	r  c                   @   s    e Zd ZdZdZdZdZdZdS )_MapItemStaterN               N)r5   r6   r7   SENDINGWAITING_FOR_OUTPUTWAITING_TO_RETRYr   COMPLETEr.   r.   r.   r/   r  >  s    r  c                   @   s$   e Zd ZdZdZdZdZdZdZdS )r   rN   r  r  r  r  rf  N)	r5   r6   r7   r   r   r   r   r   r   r.   r.   r.   r/   r   K  s    r   c                
   @   s   e Zd ZU eed< ejed< eed< eed< e	j
ed< e	j
ed< ee ed< e	jed< 		d%dejdeded
efddZdefddZdejfddZdejdedddedef
ddZdejfddZdejfddZdefdd Zd!edejfd"d#Zd$S )&_MapItemContextstater$  retry_managerr  r   r   previous_input_jwt_event_loopFr   c                 C   sF   t j| _|| _|| _|| _t | _| j	 | _
| j	 | _|| _d S r(   )r  r  r  r$  r  r  r)   get_event_loopr  create_futurer   r   _is_input_plane_instance)r-   r$  r  r  r   r.   r.   r/   rJ   a  s   

z_MapItemContext.__init__r*  c                 C   sJ   | j  s| j | nt | _ | j | | jtjkr#tj| _d S d S r(   )	r   r   
set_resultr)   Futurer  r  r  r  )r-   r*  r.   r.   r/   %handle_map_start_or_continue_responsev  s   

z5_MapItemContext.handle_map_start_or_continue_responser3   c                 C   s8   | j |j  | j|j | jtjkrtj| _d S d S r(   )r   r  r   r  r  r  r  r2   r.   r.   r/   rq     s
   z*_MapItemContext.handle_put_inputs_responser   r   r   r   rv   c              
      s`  | j tjkrtd|j d|j d|j  tj	S |j| j
jkr;td|j d|j d|j d| j
j  tjS |jjtjjksK|tjksK| js]tj| _ |jjtjjkrZtjS tjS | j
 }|jjtjjkrld}|du sx|jjtjjkrtj| _ tjS tj| _ | jr| |jI dH }|||d  |I dH  tjS |||d  |jI dH  tjS )	z
        Processes the output, and determines if it is complete or needs to be retried.

        Return True if input state was changed to COMPLETE, otherwise False.
        zRReceived output for input marked as complete. Must be duplicate, so ignoring. idx=
 input_id= retry_count=z9Received output with stale retry_count, so ignoring. idx=z expected_retry_count=r   Nr=   )r  r  r  r"   rl   rP   r   retry_countr   r   r  r   r   statusr$   GenericResultGENERIC_STATUS_SUCCESSrU  r  r   r   get_delay_msGENERIC_STATUS_INTERNAL_FAILUREGENERIC_STATUS_TERMINATEDr  r  !create_map_start_or_continue_itemr1   r   )r-   r3   r   r   r   delay_ms
retry_itemr.   r.   r/   r     s\   

z+_MapItemContext.handle_get_outputs_responsec                    s8   t j| _| jI d H }| j | _tj|| j| j	j
dS )N)r   r$  r  )r  r   r  r   r  r  r$   FunctionRetryInputsItemr$  r  r  r-   r   r.   r.   r/   prepare_item_for_retry  s   z&_MapItemContext.prepare_item_for_retryr  c                 C   s   t || _d S r(   )r#   r  r-   r  r.   r.   r/   r:    s   z _MapItemContext.set_retry_policyc                 C   s   | j | tj| _d S r(   )r   r  r  r  r  r  r.   r.   r/   r     s   z%_MapItemContext.handle_retry_responserP   c                    s(   | j I d H }tjtj| j|d|dS )Nr/  r)  )r   r$   r%  FunctionPutInputsItemr$  )r-   rP   r*  r.   r.   r/   r    s   z1_MapItemContext.create_map_start_or_continue_itemNF)r5   r6   r7   r  r<   r$   FunctionInputr#   rV  r)   r  r   r   AbstractEventLooprJ   r  FunctionPutInputsResponseItemrq   r  r[   r   r   r   r  r  rT  r:  r   r%  r  r.   r.   r.   r/   r  T  sH   
 





Ir  c                   @   s^  e Zd Z	d2dejdddedededefd	d
ZdejfddZ	de
ej fddZde
ej fddZde
e de
ej fddZdejfddZde
e fddZde
eeef  fddZdefddZdedefdd Zde
eeef  fd!d"Zde
ej fd#d$Zd%e
e fd&d'Zd(e
eeef  fd)d*Zd+ejd,edefd-d.Z d/d0 Z!d1S )3ra   Fr  r   r   r   r  r  r   c                 C   s4   || _ || _|| _t|| _i | _|| _|| _d S r(   )	_retry_policyr   _retry_queuer)   BoundedSemaphore_inputs_outstanding_item_context_sync_client_retries_enabledr  )r-   r  r   r   r  r  r   r.   r.   r/   rJ     s   	
z_MapItemsManager.__init__c                 C   s
   || _ d S r(   )r  r  r.   r.   r/   r:       
z!_MapItemsManager.set_retry_policyrr   c                    s@   |D ]}| j  I d H  t|jt| j| jd| j|j< qd S )N)r$  r  r  )	r  acquirer  r$  r#   r  r  r  rP   r-   rr   r3   r.   r.   r/   ri   	  s   z_MapItemsManager.add_itemsc                    sf   |D ]-}|j dkrtj| j|jj _q| j I d H  t	|jjt
| j| j| jd| j|jj< qd S )Nr   )r$  r  r  r   )r*  r  r  r  r$  rP   r  r  r  r  r#   r  r  r  r  r.   r.   r/   r2    s   
z%_MapItemsManager.add_items_inputplaner   rv   c                    s    fdd|D I d H S )Nc                    s"   g | ]} j |  I d H qS r(   )r  r  )r   rP   r,   r.   r/   r   $  s     z<_MapItemsManager.prepare_items_for_retry.<locals>.<listcomp>r.   )r-   r   r.   r,   r/   r   #  s   z(_MapItemsManager.prepare_items_for_retryc                 C   s   | j  D ]}|| qd S r(   )r  valuesr:  )r-   r  ctxr.   r.   r/   r;  &  s   z*_MapItemsManager.update_items_retry_policyc                 C      dd | j  D S )zV
        Returns a list of input_jwts for inputs that are waiting for output.
        c                 S   s,   g | ]}|j tjkr|j r|j qS r.   r  r  r  r   r   r   )r   r  r.   r.   r/   r   /  s
    zF_MapItemsManager.get_input_jwts_waiting_for_output.<locals>.<listcomp>)r  r  r,   r.   r.   r/   r   *     z2_MapItemsManager.get_input_jwts_waiting_for_outputc                 C   r  )zV
        Returns a list of input_idxs for inputs that are waiting for output.
        c                 S   s4   g | ]\}}|j tjkr|j r||j fqS r.   r  )r   rP   r  r.   r.   r/   r   :  s
    zF_MapItemsManager.get_input_idxs_waiting_for_output.<locals>.<listcomp>)r  rr   r,   r.   r.   r/   rE  5  r  z2_MapItemsManager.get_input_idxs_waiting_for_outputitem_idxc                 C   s   | j |= | j  d S r(   )r  r  releaser-   r  r.   r.   r/   _remove_item@  s   z_MapItemsManager._remove_itemc                 C   s   | j |S r(   )r  r4   r  r.   r.   r/   get_item_contextD  s   z!_MapItemsManager.get_item_contextc                 C   s2   |D ]\}}| j |d }|d ur|| qd S r(   )r  r4   r  )r-   rr   indexr3   r  r.   r.   r/   r9  G  s   
z-_MapItemsManager.handle_put_continue_responsec                 C   s0   |D ]}| j |jd }|d ur|| qd S r(   )r  r4   rP   rq   )r-   rr   r3   r  r.   r.   r/   rq   S  s   
z+_MapItemsManager.handle_put_inputs_responser   c                 C   s>   |D ]}t |}| j|jd d }|d ur|| qd S )NrP   )r!   decode_without_verificationr  r4   payloadr   )r-   r   r   decoded_jwtr  r.   r.   r/   r   \  s   

z&_MapItemsManager.handle_retry_responser   c                    sh   |D ].\}}| j |d }|d ur1|r1tj|_||I d H }|j }| j	t

 |I d H  qd S r(   )r  r4   r  r  r  r  r  r  r  r1   r   )r-   r   rP   rA  r  r  r   r.   r.   r/   rH  f  s   
z-_MapItemsManager.handle_check_inputs_responser3   r   c              	      s   | j |jd }|d u r"td|j d|j d|j d tjS |	||| j
| jI d H }|tjks9|tjkr?| |j |S )NzOReceived output that does not have entry in item_context map, so ignoring. idx=r  r  r   )r  r4   rP   r"   rl   r   r  r   r   r   r   r  r   r   r  )r-   r3   r   r  r   r.   r.   r/   r   p  s&   
z,_MapItemsManager.handle_get_outputs_responsec                 C   s
   t | jS r(   )rm   r  r,   r.   r.   r/   __len__  r  z_MapItemsManager.__len__Nr  )"r5   r6   r7   r$   rT  r   rV  r[   rJ   r:  listr  ri   r%  r2  r  r   r;  r   r   r  rE  r  r  r  r9  r  rq   r   rH  r  r   r   r  r.   r.   r.   r/   ra     s@    

	

ra   )rv   r  )r-   rZ  rv   r  )rv   N)dr)   enumrj  r   typingr   dataclassesr   r   r   r   r   grpclibr   modal.exceptionr    modal._runtime.execution_contextr	   modal._utils.async_utilsr
   r   r   r   r   r   r   r   r   r   r   r   r   modal._utils.blob_utilsr   modal._utils.deprecationr   modal._utils.function_utilsr   r   r   r   modal._utils.grpc_utilsr   r   r    modal._utils.jwt_utilsr!   modal.configr"   modal.retriesr#   modal_protor$   TYPE_CHECKINGmodal.clientr   'PUMP_INPUTS_RETRYABLE_GRPC_STATUS_CODESPUMP_INPUTS_MAX_RETRIESr   r'   ra  r:   r  r   r   modal.functionsr?   r\   r   r   r  r   r[   r   rV  r  AsyncGeneratorrY  rd  rh  rq  IterableAsyncIterablery  Sequencer{  r~  r  r  r  r  r  r  r  r  Enumr  r   r  ra   r.   r.   r.   r/   <module>   sz  <<C,

G
  
  n
,	$=
*	 