o
    ;i                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZ d dlmZ d d	lm Z  d d
l!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' d dl(m)Z)m*Z*m+Z+m,Z,m-Z- d dl.m/Z/m0Z0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z:m;Z;m<Z< d dl=m>Z>m?Z? d dl@mAZAmBZBmCZC d dlDmEZE erd dlFZGdZHdZIdZJeKeLd< dZMeNeLd< G dd deOZPG dd dZQG dd  d ZRG d!d" d"ZSG d#d$ d$ZTe,eTZUd%eVd&dfd'd(ZWdS ))    N)AsyncGeneratorAsyncIterator)AsyncExitStack)Path)TYPE_CHECKINGAnyCallableClassVar	GeneratorOptionalcast)Empty)Status)asynccontextmanager)gpu_memory_snapshot)deserialize_data_formatpickle_exceptionpickle_tracebackserialize_data_format)print_exception)TaskContextaclosingasyncifysynchronize_apisynchronizer)MAX_OBJECT_SIZE_BYTESblob_downloadblob_uploadformat_blob_data)_stream_function_call_data)Retry)parse_major_minor_version)HEARTBEAT_INTERVALHEARTBEAT_TIMEOUT_Client)configlogger)ClientClosedInputCancellationInvalidError)api_pb2   
   1   MAX_OUTPUT_BATCH_SIZEg      ?RTT_Sc                   @      e Zd ZdZdS )UserExceptionz&Used to shut down the task gracefully.N__name__
__module____qualname____doc__ r7   r7   W/home/ubuntu/.local/lib/python3.10/site-packages/modal/_runtime/container_io_manager.pyr1   9       r1   c                   @   r0   )Sentinelz0Used to get type-stubs to work with this object.Nr2   r7   r7   r7   r8   r:   =   r9   r:   c                   @   s(  e Zd ZU dZee ed< ee ed< ee ed< ee ed< eej	 ed< ded< d	Z
eed
< dZeeg df  ed< dee dee dee dee dddeej	 dedefddZededeedf deeeeeeej	f  dedd f
ddZdeg df fddZdd Zdeeedf eeee f f fddZd;dd Zd!edee fd"d#Zdee fd$d%Zdee fd&d'Zdeeddf fd(d)Zdeedf fd*d+Z d,e!fd-d.Z"d<d0d1Z#d,e!d2ed3e$deej% fd4d5Z&d,e!d6edeej% fd7d8Z'd,e!d!ee deej% fd9d:Z(dS )=	IOContextzyContext object for managing input, function calls, and function executions
    in a batched or single input context.
    	input_idsretry_countsfunction_call_idsattempt_tokensfunction_inputs2modal._runtime.user_code_imports.FinalizedFunctionfinalized_functionF_cancel_issuedN_cancel_callback
is_batchedclientc	           	      C   s4   || _ || _|| _|| _|| _|| _|| _|| _d S N)r<   r=   r>   r?   rB   r@   _is_batched_client)	selfr<   r=   r>   r?   rB   r@   rE   rF   r7   r7   r8   __init__P   s   
zIOContext.__init__finalized_functionsinputsreturnc              	      s   |rt |dksJ t |dksJ t| \}}}}}	dtdtjdtjfdd tj fdd|	D  I d H }	|	d	 jtfd
d|	D sKJ | }
| t	t
t |t	t
t |t	t
t |t	t
t ||
t	t
tj |	|S )N   rF   inputrN   c                    s8   | ddkrt|j| jI d H }|d ||_|S )N
args_oneofargs_blob_id)
WhichOneofr   rR   stub
ClearFieldargs)rF   rP   rV   r7   r7   r8   _populate_input_blobso   s   
z/IOContext.create.<locals>._populate_input_blobsc                    s   g | ]} |qS r7   r7   .0rP   )rW   rF   r7   r8   
<listcomp>y   s    z$IOContext.create.<locals>.<listcomp>r   c                 3   s    | ]} |j kV  qd S rG   method_namerX   r[   r7   r8   	<genexpr>|   s    z#IOContext.create.<locals>.<genexpr>)lenzipr$   r*   FunctionInputasynciogatherr\   allr   liststrint)clsrF   rL   rM   rE   r<   r=   r>   r?   r@   rB   r7   )rW   rF   r\   r8   created   s$   $ 

zIOContext.createcbc                 C   s
   || _ d S rG   )rD   )rJ   ri   r7   r7   r8   set_cancel_callback   s   
zIOContext.set_cancel_callbackc                 C   sB   | j rd S | jrtd| j  d| _ |   d S td d S )Nz6Received a cancellation signal while processing input Tz"Unexpected: Could not cancel input)rC   rD   r&   warningr<   rJ   r7   r7   r8   cancel   s   zIOContext.cancel.c              	      s  g }| j D ]}|jr|j}|t|j|| j q|di f q| js(|d S | jjj	}g }t
| jjj D ]}||j q9dd tt| jD  t|D ]d\}\}}	t|t|	  }
t|krvtd| dt| d|
 dt|D ]\}}| | || < qz|	 D ]*\}}||vrtd| d	| d
| | v rtd| d| d
| | |< qqR fdd|D }d|fS )Nr7   r   c                 S   s   g | ]}i qS r7   r7   )rY   _r7   r7   r8   rZ      s    z.IOContext._args_and_kwargs.<locals>.<listcomp>zModal batched function z takes z; positional arguments, but one invocation in the batch has .z! got unexpected keyword argument z  in one invocation in the batch.z" got multiple values for argument c                    s    i | ]   fd dD qS )c                    s   g | ]}|  qS r7   r7   )rY   kwargs
param_namer7   r8   rZ      s    z9IOContext._args_and_kwargs.<locals>.<dictcomp>.<listcomp>r7   )rY   kwargs_by_inputsrq   r8   
<dictcomp>   s    z.IOContext._args_and_kwargs.<locals>.<dictcomp>)r@   rV   data_formatappendr   rI   rH   rB   callabler3   inspect	signature
parametersvaluesnameranger^   r<   	enumerater)   items)rJ   deserialized_argsrP   rv   	func_nameparam_namesparamirV   rp   
num_paramsjargkvformatted_kwargsr7   rs   r8   _args_and_kwargs   sF   


zIOContext._args_and_kwargsapi_pb2.DataFormat.ValueTypec                 C   s   |  | jd jS Nr   )_determine_output_formatr@   rv   rl   r7   r7   r8   _generator_output_format   s   z"IOContext._generator_output_formatdatac                 C   sV   | j r(| jjj}t|tstd| dt|t| jkr&td| d|S |gS )NzOutput of batched function z must be a list.z. must be a list of equal length as its inputs.)	rH   rB   rx   r3   
isinstancerd   r)   r^   r<   )rJ   r   function_namer7   r7   r8   _prepare_batch_output   s   


zIOContext._prepare_batch_outputc                 C   s~   t d| j  |  \}}| jj|i |}t|s't|s't	|r1t
dt| dt d| j  | |S )NStarting input z3Sync (non-generator) function return value of type z>. You might need to use @app.function(..., is_generator=True).Finished input r&   debugr<   r   rB   rx   ry   iscoroutineisgenerator
isasyncgenr)   typer   )rJ   rV   rp   expected_value_or_valuesr7   r7   r8   call_function_sync   s   
zIOContext.call_function_syncc                    s   t d| j  |  \}}| jj|i |}t|r(t|s(t	|r2t
dt| d|I d H }t d| j  | |S )Nr   z6Async (non-generator) function returned value of type z= You might need to use @app.function(..., is_generator=True).r   r   )rJ   rV   rp   expected_corovaluer7   r7   r8   call_function_async   s    

zIOContext.call_function_asyncc                 c   s~    | j rJ td| j  |  \}}| jj|i |}t|s,t	dt
| |D ]}|V  q.td| j  d S )NStarting generator input z*Generator function returned value of type Finished generator input )rH   r&   r   r<   r   rB   rx   ry   r   r)   r   )rJ   rV   rp   expected_genresultr7   r7   r8   call_generator_sync   s   

zIOContext.call_generator_syncc              	   C  s   | j rJ td| j  |  \}}| jj|i |}t|s,t	dt
| t|4 I d H }|2 z	3 d H W }|V  q76 W d   I d H  n1 I d H sRw   Y  td| j  d S )Nr   z0Async generator function returned value of type r   )rH   r&   r   r<   r   rB   rx   ry   r   r)   r   r   )rJ   rV   rp   expected_async_gengenr   r7   r7   r8   call_generator_async  s   

(zIOContext.call_generator_async
started_atc                    s(   t     fddt| j| jD S )Nc              	      s0   g | ]\}}t j| t jt jjd |dqS ))status)input_idinput_started_atoutput_created_atr   retry_count)r*   FunctionPutOutputsItemGenericResultGENERIC_STATUS_TERMINATED)rY   r   r   r   r   r7   r8   rZ     s    z7IOContext.output_items_cancellation.<locals>.<listcomp>)timer_   r<   r=   )rJ   r   r7   r   r8   output_items_cancellation  s
   z#IOContext.output_items_cancellationinput_formatc                 C   sV   || j jv r|S | j jrtd| d| j j  | j jd S td| d tjS )NzGot an input with format z,, but can only produce output using formats r   z0, but the function has no defined output formats)rB   supported_output_formatsr&   r   rk   r*   DATA_FORMAT_PICKLE)rJ   r   r7   r7   r8   r   &  s   z"IOContext._determine_output_formattask_idexcc                    s   t |ttkr#tt d }d td   d| dt|}t|jjI d H t||\dddtffdd t		  fd	d
t
jjjD S )Ni  z...
Trimmed z bytes from original exceptionr   r   rN   c              	      s\    | }|tjkr|tjdtjjt d dS |tjtjjt ddS )N)r   	exception	tracebackserialized_tbtb_line_cache)rv   r   )r   r   r   r7   )r   r*   r   r   GENERIC_STATUS_FAILUREr   
format_exc)r   output_format)data_result_partrepr_excrJ   r   r   r7   r8   data_format_specific_outputN  s(   

zEIOContext.output_items_exception.<locals>.data_format_specific_outputc              	      s2   g | ]\}}}t jd||d  |jqS ))r   r   r   r   r7   )r*   r   rv   )rY   r   r   function_input)r   r   r   r7   r8   rZ   h  s    z4IOContext.output_items_exception.<locals>.<listcomp>)reprr^   r   r   r   rI   rT   r   dictr   r_   r<   r=   r@   )rJ   r   r   r   trimmed_bytesr   r7   )r   r   r   r   rJ   r   r   r   r8   output_items_exception8  s   z IOContext.output_items_exceptionitems_totalc                 C   sr   | j rJ dt| jdksJ dttj|dtjd}tj| jd |t		 tj
tj
j|dtj| jd dgS )	Nz0generators are not supported with batched inputsrO   z'generators are expected to have 1 input)r   rv   r   )r   r   r   r   r   r   rv   r   )rH   r^   r@   r   r*   GeneratorDoneDATA_FORMAT_GENERATOR_DONEr   r<   r   r   GENERIC_STATUS_SUCCESSr=   )rJ   r   r   serialized_bytesr7   r7   r8   output_items_generator_dones  s"   z%IOContext.output_items_generator_donec              
      s`   t    dtdtdtdddtjf
 fddtjfd	d
t|j	j
jD  I d H S )Nitemr   r   r   r   rN   c                    sZ    |}t| |d}t|jjI d H }tjddtjji|}tj| |||dS )Nr   r   r   r7   )	r   r   r   rI   rT   r*   r   r   r   )r   r   r   r   r   r   	formattedr   )r   rJ   r   r7   r8   package_output  s"   
z.IOContext.output_items.<locals>.package_outputc                    s$   g | ]\}}}} ||||j qS r7   r   )rY   r   r   r   r   )r   r7   r8   rZ     s    
z*IOContext.output_items.<locals>.<listcomp>)r   r   re   rf   r*   r   ra   rb   r_   r<   r=   r@   )rJ   r   r   r7   )r   r   rJ   r   r8   output_items  s(   

zIOContext.output_items)rN   r   )r   r   rN   r   ))r3   r4   r5   r6   rd   re   __annotations__rf   r*   r`   rC   boolrD   r   r   r$   rK   classmethodr   tuplerh   rj   rm   r   r   r   r   r   r   r
   r   r   r   floatr   r   BaseExceptionr   r   r   r   r7   r7   r7   r8   r;   A   sz   
 	

%*
0

;$r;   c                   @   s   e Zd ZU dZeed< eed< eej ed< e	ed< deddfdd	Z
dd
dZdddZdddZdeddfddZdddZdS )
InputSlotsz>A semaphore that allows dynamically adjusting the concurrency.activer   waiterclosedrN   Nc                 C   s   d| _ || _d | _d| _d S )Nr   F)r   r   r   r   rJ   r   r7   r7   r8   rK     s   
zInputSlots.__init__c                    sP   | j | jk r|  j d7  _ d S | jd u r$t  | _| jI d H  d S td)NrO   z%Concurrent waiters are not supported.)r   r   r   ra   get_running_loopcreate_futureRuntimeErrorrl   r7   r7   r8   acquire  s   
zInputSlots.acquirec                 C   sL   | j | jk r"| jd ur$| j s| jd  d | _|  j d7  _ d S d S d S NrO   )r   r   r   	cancelled
set_resultrl   r7   r7   r8   _wake_waiter  s   
zInputSlots._wake_waiterc                 C   s   |  j d8  _ |   d S r   )r   r   rl   r7   r7   r8   release  s   zInputSlots.releasec                 C   s   | j rd S || _|   d S rG   )r   r   r   r   r7   r7   r8   	set_value  s   zInputSlots.set_valuec                    s*   d| _ t| jD ]	}|  I d H  q	d S )NT)r   r~   r   r   )rJ   rn   r7   r7   r8   close  s
   zInputSlots.closerN   N)r3   r4   r5   r6   rf   r   r   ra   Futurer   rK   r   r   r   r   r   r7   r7   r7   r8   r     s   
 

	
r   c                       s  e Zd ZU dZeed< eed< eed< ejed< ee ed< ee ed< e	ed< e
ed	< ee ed
< eeef ed< ee
 ed< eed< e	ed< e	ed< eej ed< eed< eed< eej ed< eej ed< eed< eed< eed< eed< dZeed   ed< dejdefddZedejfd d!Zdejdedd f fd"d#Zed$d% Zd&d' Zd(d) Zdefd*d+Zed,ede d- fd.d/Z!d0d1 Z"ede d- fd2d3Z#d4d5 Z$d6ed7ee de%e& fd8d9Z'd6ed7ed:e	d;d<d=e(e& ddfd>d?Z)ed6ed7ed;d<d@ej*de d- f
dAdBZ+dCe	dej*fdDdEZ,dFej*dGe&ddfdHdIZ-de
fdJdKZ.dLdM Z/e0j1dNe	dOe	de%e(e2ee	eeej3f   fdPdQZ4e0j1	R	Rd~dSeedTf dNe	dOe	de%e fdUdVZ5dWe
dXe(ej6 ddfdYdZZ7ede d- fd[d\Z8ed]edWe
de d- fd^d_Z9d`e(e fdadbZ:e0j1d]edWe
dce(e& ddfdddeZ;ddfdgZ<ddhdiZ=dje(e ddfdkdlZ>ddnefdodpZ?ede	fdqdrZ@ede	fdsdtZAede	fdudvZBede	fdwdxZCedye	fdzd{ZDed|d} ZE  ZFS )_ContainerIOManagerzSynchronizes all RPC calls and network operations for a running container.

    TODO: maybe we shouldn't synchronize the whole class.
    Then we could potentially move a bunch of the global functions onto it.
    r   function_idapp_idfunction_defcheckpoint_idinput_plane_server_urlcalls_completedtotal_user_timecurrent_input_idcurrent_inputscurrent_input_started_at_input_concurrency_enabled_target_concurrency_max_concurrency_concurrency_loop_input_slots_environment_name_heartbeat_loop_heartbeat_condition_waiting_for_memory_snapshot_is_interactivity_enabled_fetching_inputsrI   N
_singletoncontainer_argsrF   c                 C   s   |j | _ |j| _|j| _|j| _|jpd | _|j| _d| _d| _d | _i | _	d | _
|jjjtjjkr7d}d}n|jjp<d}|jjpB|}|| _|| _d | _d| _t|| _|j| _d | _d | _d| _d | _d| _d| _|| _ t!| j t"suJ d S )Nr           rO   FT)#r   r   r   r   r   r   r   r   r   r   r   pty_infopty_typer*   PTYInfoPTY_TYPE_SHELLmax_concurrent_inputstarget_concurrent_inputsr   r   r   _stop_concurrency_loopr   r   environment_namer   r   r   r  _cuda_checkpoint_sessionr  r  rI   r   r$   )rJ   r  rF   max_concurrencytarget_concurrencyr7   r7   r8   _init   s<   
z_ContainerIOManager._initrN   c                 C   s   | j d u r
t | _ | j S rG   )r   ra   	Conditionrl   r7   r7   r8   heartbeat_condition(  s   

z'_ContainerIOManager.heartbeat_conditionc                    s"   t  | | _| j|| | jS rG   )super__new__r  r  )rg   r  rF   	__class__r7   r8   r  0  s   z_ContainerIOManager.__new__c                 C   s
   d| _ dS )zOnly used for tests.N)r  rg   r7   r7   r8   _reset_singleton5  s   
z$_ContainerIOManager._reset_singletonc                    s   | j jt I d H  d S rG   )rI   rT   ContainerHellor   rl   r7   r7   r8   hello:  s   z_ContainerIOManager.helloc                    s  t  }	 t  }z|  I d H }t  }|r"tdI d H  W qW nS ty2   td Y d S  tyv } z9t  | }t  | }|}t	d|dd|dd|d |t
d	 krl|d
 }t	d|dd W Y d }~nd }~ww t  | }	tdt
|	 }
t|
I d H  q)NrO   g      ?z.Stopping heartbeat loop due to client shutdownuD   Modal Client → Modal Worker Heartbeat attempt failed (attempt_dur=z.2fz, time_since_heartbeat_success=z, error=)2   <   uL   Modal Client → Modal Worker heartbeat attempts have been failing for over z} minutes. Container will eventually be marked unhealthy. See https://modal.com/docs/guide/troubleshooting#heartbeat-timeout. r  )r   	monotonic_heartbeat_handle_cancellationsra   sleepr'   r&   info	Exceptionrk   r"   max)rJ   t_last_successt0got_cancellationr   attempt_durtime_since_heartbeat_successerrortrouble_minsheartbeat_durationtime_until_next_heartbeatr7   r7   r8   _run_heartbeat_loop=  sP   	
z'_ContainerIOManager._run_heartbeat_loopc              	      s   | j 4 I d H . | jr| j  I d H  | jstjdd}| jjj|tt	ddI d H }W d   I d H  n1 I d H s<w   Y  |
dr_|jj}|r]|D ]}|| jv r\| j|   qNdS dS )NT)!canceled_inputs_return_outputs_v2attempt_timeoutretrycancel_input_eventF)r  r  waitr*   ContainerHeartbeatRequestrI   rT   ContainerHeartbeatr    r#   HasFieldr5  r<   r   rm   )rJ   requestresponseinput_ids_to_cancelr   r7   r7   r8   r!  f  s&   
(

z3_ContainerIOManager._heartbeat_handle_cancellationswait_for_mem_snapNNc              	   C  s   t  4 I d H -}||   | _}|d || _z	d V  W |  n|  w W d   I d H  d S 1 I d H s;w   Y  d S )Nzheartbeat loop)r   create_taskr/  r   set_namer  rm   )rJ   r=  tctr7   r7   r8   
heartbeats  s   
.z_ContainerIOManager.heartbeatsc                 C   s   | j r
| j   d S d S rG   )r   rm   rl   r7   r7   r8   stop_heartbeat  s   z"_ContainerIOManager.stop_heartbeatc              	   C  s~   t  4 I d H *}||   | _}|d z	d V  W |  n|  w W d   I d H  d S 1 I d H s8w   Y  d S )Nzdynamic concurrency loop)r   r?  _dynamic_concurrency_loopr   r@  rm   )rJ   rA  rB  r7   r7   r8   dynamic_concurrency_manager  s   
.z/_ContainerIOManager.dynamic_concurrency_managerc              
      s   t d| j  | jstz;tj| j| j| jd}| j	j
j|ttddI d H }|j| jjkrG| jsGt d| jj d|j  | j|j W n tyf } zt d| j d|  W Y d }~nd }~ww ttI d H  | jrd S d S )	Nz+Starting dynamic concurrency loop for task )r   r  r  r1  r3  zDynamic concurrency set from z to z+Failed to get dynamic concurrency for task z, )r&   r   r   r  r*   $FunctionGetDynamicConcurrencyRequestr   r   r   rI   rT   FunctionGetDynamicConcurrencyr     DYNAMIC_CONCURRENCY_TIMEOUT_SECSconcurrencyr   r   r   r$  ra   r"  !DYNAMIC_CONCURRENCY_INTERVAL_SECS)rJ   r:  respr   r7   r7   r8   rE    s.   $z-_ContainerIOManager._dynamic_concurrency_loopfunction_call_idattempt_tokenc                 C  sR   | j j}| jr| j | jI dH }t| j ||d|d2 z	3 dH W }|V  q6 dS )z2Read from the `data_in` stream of a function call.Ndata_in)variantrN  )rI   rT   r   get_stubr   )rJ   rM  rN  rT   r   r7   r7   r8   get_data_in  s   z_ContainerIOManager.get_data_instart_indexrv   r   serialized_messagesc                    s   g }t |D ]'\}}tj||| d}	t|tkr&t|| jjI dH |	_n||	_	|
|	 qtj||d}
|r;||
_| jrR| j| jI dH }||
I dH  dS | jj|
I dH  dS )a]  Put data onto the `data_out` stream of a function call.

        This is used for generator outputs, which includes web endpoint responses. Note that this
        was introduced as a performance optimization in client version 0.57, so older clients will
        still use the previous Postgres-backed system based on `FunctionPutOutputs()`.
        )rv   indexN)rM  data_chunks)r   r*   	DataChunkr^   r   r   rI   rT   data_blob_idr   rw   FunctionCallPutDataRequestrN  r   rQ  FunctionCallPutDataOut)rJ   rM  rN  rS  rv   rT  rV  r   message_byteschunkreqrT   r7   r7   r8   put_data_out  s   z _ContainerIOManager.put_data_out
message_rxc              	     sn   t    fdd}t| }zdV  W  I dH  |I dH  dS  I dH  |I dH  w )z[Runs background task that feeds generator outputs into a function call's `data_out` stream.c                     s   d} d}|sv  I d H }| u rd S | dkr tdI d H  t|g}t|d d }|dk r`z }W n
 tjyB   Y nw | u rJd}n|t| |t|d d 7 }|dk s2| |I d H  | t|7 } |rd S d S )	NrO   FgMbP?r   i   i   T)	getra   r"  r   r^   
get_nowait
QueueEmptyrw   r^  )rU  received_sentinelmessagerT  
total_sizeGENERATOR_STOP_SENTINELrN  rv   rM  r_  rJ   r7   r8   generator_output_task  s4   zJ_ContainerIOManager.generator_output_sender.<locals>.generator_output_taskN)r:   ra   r?  put)rJ   rM  rN  rv   r_  ri  taskr7   rg  r8   generator_output_sender  s   	z+_ContainerIOManager.generator_output_sendersizec                    s   t |S )zOCreate a queue, on the synchronicity event loop (needed on Python 3.8 and 3.9).)ra   Queue)rJ   rm  r7   r7   r8   _queue_create	  s   
z!_ContainerIOManager._queue_createqueuer   c                    s   | |I dH  dS )z=Put a value onto a queue, using the synchronicity event loop.N)rj  )rJ   rp  r   r7   r7   r8   
_queue_put  s   z_ContainerIOManager._queue_putc                 C   s   | j dkrdS | j| j  S r   )r   r   rl   r7   r7   r8   get_average_call_time  s   
z)_ContainerIOManager.get_average_call_timec                 C   s&   | j dkrdS ttt|  d S )Nr   rO   gư>)r   mathceilr/   r%  rr  rl   r7   r7   r8   get_max_inputs_to_fetch  s   
z+_ContainerIOManager.get_max_inputs_to_fetchbatch_max_sizebatch_wait_msc                 C  s  t j| jd}d}| jr| j I d H  |  |_|  |_	| 
 |_|||_|_d}z|d7 }| jj|I d H }|jrQtd|j  t|jI d H  n|jrdt|j  k rftd|jksiJ  J g }d}|jD ]B}	|	jrtd| j d  W |s| j  d S d S ||	j|	j|	j |	j!|	j"f |	j"j#r|jdkrtd| j d d	} nqp|V  d	}| j$j%p| j$j&dk}
|s|
rW |s| j  d S d S W |s| j  n	|s| j  w w | jsd S d S )
N)r   r   FrO   zATask exceeded rate limit, sleeping for %.2fs before trying again.zTask z input kill signal input.z/ Final input not expected in batch input streamT)'r*   FunctionGetInputsRequestr   r  r   r   rr  average_call_timeru  
max_valuesget_input_concurrencyinput_concurrencyrv  batch_linger_msrI   rT   FunctionGetInputsrate_limit_sleep_durationr&   r#  ra   r"  rM   r^   r%  kill_switchr   r   r   rw   r   r   rM  rN  rP   final_inputr   single_use_containers
max_inputs)rJ   rv  rw  r:  	iterationyieldedr;  rM   final_input_receivedr   single_use_containerr7   r7   r8   _generate_inputs  sp   


*

 
z$_ContainerIOManager._generate_inputsr   rL   rA   c              	   C  s   | j | jkr|  nt }|4 I d H P | ||2 z43 d H W }t| j|||dkI d H }|jD ]}|| j	|< q2|jd t

 | _| _|V  d\| _| _q6 | j I d H  W d   I d H  d S 1 I d H sjw   Y  d S )Nr   r>  )r   r   rF  r   r  r;   rh   rI   r<   r   r   r   r   r   r   )rJ   rL   rv  rw  rF  rM   
io_contextr   r7   r7   r8   run_inputs_outputsX  s   
	.z&_ContainerIOManager.run_inputs_outputsr   outputsc                    sr   d}t dt||D ]}| jjjtj||||  dttj	gdddI dH  qdd |D }| 
|| dS )	z4Send pre-built output items with retry and chunking.   r   )r  N)additional_status_codesmax_retriesr3  c                 S   s   g | ]}|j qS r7   )r   )rY   outputr7   r7   r8   rZ     s    z5_ContainerIOManager._send_outputs.<locals>.<listcomp>)r~   r^   rI   rT   FunctionPutOutputsr*   FunctionPutOutputsRequestr    r   RESOURCE_EXHAUSTEDexit_context)rJ   r   r  output_batch_sizer   r<   r7   r7   r8   _send_outputsr  s   z!_ContainerIOManager._send_outputsc                 C  s   zdV  W dS  t y     tyw } z]t|trt| tt|||j t|| j	\}}t
t|| jjI dH }tjddtjji|t|dtt|||j|pXd|p[dd}tj|d}| jj|I dH  t d}~ww )zSets the task as failed in a way where it's not retried.

        Used for handling exceptions from container lifecycle methods at the moment, which should
        trigger a task failure state.
        Nr        )r   r   r   r   )r   r7   )KeyboardInterruptr   r   ImportError$check_fastapi_pydantic_compatibilityr   r   __traceback__r   r   r   r   rI   rT   r*   r   r   r   joinr   format_exceptionTaskResultRequest
TaskResultr1   )rJ   r   r   r   data_or_blobr   r]  r7   r7   r8   handle_user_exception  s4   

z)_ContainerIOManager.handle_user_exceptionr  c              
   C  s   zdV  W dS  t tfy     ttjfy7   ||I dH }| ||I dH  td|j	  Y dS  t
yl } z*t|trGt| tt   ||| j|I dH }| ||I dH  W Y d}~dS d}~ww )z6Handle an exception while processing a function input.NzSuccessfully canceled input )r  GeneratorExitr(   ra   CancelledErrorr   r  r&   rk   r<   r   r   r  r  r   sysexc_infor   r   )rJ   r  r   r  r   r7   r7   r8   handle_input_exception  s$   
 z*_ContainerIOManager.handle_input_exceptionr<   c                 C   sH   |  j t | 7  _ |  jd7  _|D ]}| j| q| j  d S r   )r   r   r   r   popr   r   )rJ   r   r<   r   r7   r7   r8   r    s
   z _ContainerIOManager.exit_contextoutput_datac                    s*   | ||I d H }| ||I d H  d S rG   )r   r  )rJ   r  r   r  r  r7   r7   r8   push_outputs  s   z _ContainerIOManager.push_outputsc                    sR  t td}td | stdI d H  qtd |d}t	
|}W d    n1 s4w   Y  t|ddrItd t  d	D ]}|| }rgtd
| d|  t| |||  qK| D ]\}}|dkrzt|| ql| jjr| jjjjrtd | jsJ d| j  d | _i | _d | _t I d H | _d S )Nrestore_state_pathzWaiting for restoreg{Gz?zContainer: restoredrsnapshot_debugr   zEntering snapshot debugger)r   r   zUpdating ContainerIOManager.z = r  z>GPU memory snapshot enabled. Attempting to restore GPU memory.zECudaCheckpointSession not found when attempting to restore GPU memory)r   r%   ra  r&   r   existsra   r"  openjsonloadrf   
breakpointsetattrr   override_locallyr   !_experimental_enable_gpu_snapshot	resources
gpu_configgpu_typer  restorer   r   r   r$   from_envrI   )rJ   restored_pathfilerestored_statekeyr   r7   r7   r8   memory_restore  s@   




z"_ContainerIOManager.memory_restorec              	      s  | j rtd| j  d ntd| j4 I dH Z | jjr4| jjjj	r4td t
 | _| j  d| _| j  | jjtj| j dI dH  | jjddI dH  td	 |  I dH  d
| _| j  W d  I dH  dS 1 I dH szw   Y  dS )zDMessage server indicating that function is ready to be checkpointed.zCheckpoint ID: z (Memory Snapshot ID)z-No checkpoint ID provided for memory snapshotNz?GPU memory snapshot enabled. Attempting to snapshot GPU memory.T)r   )prep_for_restorez0Memory snapshot request sent. Connection closed.F)r   r&   r   
ValueErrorr  r   r  r  r  r  r   CudaCheckpointSessionr  
checkpointr  
notify_allrI   rT   ContainerCheckpointr*   ContainerCheckpointRequest_closer  rl   r7   r7   r8   memory_snapshot  s(   





.z#_ContainerIOManager.memory_snapshot
volume_idsc                    s   |sdS t tj I dH  tj fdd|D ddiI dH }t||D ]\}}t|tr:t	d| d|  q%t
d| d	 q%dS )
z
        Perform volume commit for given `volume_ids`.
        Only used on container exit to persist uncommitted changes on behalf of user.
        Nc              
      s2   g | ]} j jjtj|d tddddddqS ))	volume_id	   g      ?      )r  
base_delay	max_delaydelay_factorr3  )rI   rT   VolumeCommitr*   VolumeCommitRequestr    )rY   v_idrl   r7   r8   rZ   9  s    

z5_ContainerIOManager.volume_commit.<locals>.<listcomp>return_exceptionsTz*modal.Volume background commit failed for z. Exception: z+modal.Volume background commit success for ro   )r   ossyncra   rb   r_   r   r$  r&   r+  r   )rJ   r  resultsr  resr7   rl   r8   volume_commit0  s    


z!_ContainerIOManager.volume_commitFfrom_breakpointc              
      s|   | j rd S d| _ | jjjs|rdnd}td| dz| jjt I d H  W d S  t	y= } zt
d |d }~ww )NTzbreakpoint()zmodal.interact()zCannot use z+ without running Modal in interactive mode.zFailed to start PTY shell.)r  r   r  r  r)   rI   rT   FunctionStartPtyShellr   r$  r&   r+  )rJ   r  triggerer7   r7   r8   interactM  s   

z_ContainerIOManager.interactc                 C      | j S rG   )r   rl   r7   r7   r8   r  ]     z&_ContainerIOManager.target_concurrencyc                 C   r  rG   )r   rl   r7   r7   r8   r  a  r  z#_ContainerIOManager.max_concurrencyc                 C   s   t | j| jdkS r   )r%  r   r   rl   r7   r7   r8   input_concurrency_enablede  s   z-_ContainerIOManager.input_concurrency_enabledc                 C   s    | j }|sJ t|jj|jjS )z
        Returns the number of usable input slots.

        If concurrency is reduced, active slots can exceed allotted slots. Returns the larger value
        in this case.
        )r  r%  r   r   r   )rg   
io_managerr7   r7   r8   r{  i  s   	z)_ContainerIOManager.get_input_concurrencyrJ  c                 C   s0   | j }|sJ d|_t||j}|j| dS )z
        Edit the number of input slots.

        This disables the background loop which automatically adjusts concurrency
        within [target_concurrency, max_concurrency].
        TN)r  r  minr   r   r   )rg   rJ  r  r7   r7   r8   set_input_concurrencyv  s
   z)_ContainerIOManager.set_input_concurrencyc                 C   s   | j stdd| j _d S )Nz-Must be called from within a Modal container.F)r  r   r  r  r7   r7   r8   stop_fetching_inputs  s   z(_ContainerIOManager.stop_fetching_inputs)r   r   r   )F)Gr3   r4   r5   r6   re   r   r*   Functionr   rf   r   r   r;   r   ra   Taskr   r  r$   r  r	   ContainerArgumentsr  propertyr  r  r   r  r  r/  r!  r   r   rC  rD  rF  rE  r   r   rR  rd   r^  rn  rl  ro  rq  rr  ru  r   no_io_translationr   r`   r  r  r   r  r  r  r  r  r  r  r  r  r  r  r  r{  r  r  __classcell__r7   r7   r  r8   r     s  
 
(
)
	
!-:
(



/"r   r   rN   c                 C   s   d}| j pd}|drIz2ttjd}ttjd}|dkr9|dk r<tjdk r1|g| _W dS | 	| W dS W dS W dS  t
yH   Y dS w dS )	a  Add a helpful note to an exception that is likely caused by a pydantic<>fastapi version incompatibility.

    We need this becasue the legacy set of container requirements (image_builder_version=2023.12) contains a
    version of fastapi that is not forwards-compatible with pydantic 2.0+, and users commonly run into issues
    building an image that specifies a more recent version only for pydantic.
    zPlease ensure that your Image contains compatible versions of fastapi and pydantic. If using pydantic>=2.0, you must also install fastapi>=0.100.r  pydanticfastapi)r  r   )r   d   )r+      N)r}   
startswithr!   	importlibmetadataversionr  version_info	__notes__add_noter$  )r   noter}   fastapi_versionpydantic_versionr7   r7   r8   r    s    


r  )Xra   importlib.metadatar  ry   r  rs  r  r  r   r   collections.abcr   r   
contextlibr   pathlibr   typingr   r   r   r	   r
   r   r   google.protobuf.empty_pb2r   grpclibr   synchronicity.async_wrapr   modal._runtimer   modal._serializationr   r   r   r   modal._tracebackr   modal._utils.async_utilsr   r   r   r   r   modal._utils.blob_utilsr   r   r   r   modal._utils.function_utilsr   modal._utils.grpc_utilsr    modal._utils.package_utilsr!   modal.clientr"   r#   r$   modal.configr%   r&   modal.exceptionr'   r(   r)   modal_protor*    modal._runtime.user_code_importsmodalrK  rI  r.   rf   r   r/   r   r$  r1   r:   r;   r   r   ContainerIOManagerr  r  r7   r7   r7   r8   <module>   s`   
$
  m.     6