o
    ;iZ                     @   sb  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZ d d	lmZ d
dl m!Z! ddl"m#Z# ddl$m%Z%m&Z& de'de(fddZ)de'dee* fddZ+dd
ddde*de*dee, fddZ-de'dej.fdd Z/d!d" Z0G d#d$ d$Z1dS )%    N)suppress)AsyncGeneratorOptional)	GRPCErrorStatus)StreamTerminatedError)logger)ConflictErrorExecTimeoutError)api_pb2task_command_router_pb2)TaskCommandRouterStub   )grpc_error_converter   )aclosing)RETRYABLE_GRPC_STATUS_CODESconnect_channeldatareturnc                 C   s    dt |  d  }t| | S )z9Decode a base64url string with missing padding tolerated.=   )lenbase64urlsafe_b64decode)r   padding r   [/home/ubuntu/.local/lib/python3.10/site-packages/modal/_utils/task_command_router_client.py_b64url_decode   s   r   	jwt_tokenc                 C   sz   z-|  d}t|dkrW dS t|d }t|}|d}t|ttfr+t|W S W dS  t	y<   t
d Y dS w )zParse exp from a JWT without verification. Returns UNIX time seconds or None.

    This is best-effort; if parsing fails or claim missing, returns None.
    .   Nr   expzFailed to parse JWT expiration)splitr   r   jsonloadsget
isinstanceintfloat	Exceptionr   warning)r   parts	payload_bpayloadr"   r   r   r   _parse_jwt_expiration"   s   




r/   {Gz?
   base_delay_secsdelay_factormax_retriesr3   r4   r5   c             
      sr  |ddt f fdd}	 z|  I dH W S  tyA } z|du s(|k r5|jtv r5||I dH  n|W Y d}~n{d}~w tyj } z|du sP|k r^dt|v r^||I dH  n|W Y d}~nRd}~w ty } z|du sy|k r||I dH  n|W Y d}~n/d}~w ttj	fy } z|du s|k r||I dH  nt
t|W Y d}~nd}~ww q)zCall func() with transient error retries and exponential backoff.

    Authentication retries are expected to be handled by the caller.
    r   ec                    s<   t d d|   tI d H   9 d7 d S )NzRetrying RPC with delay s due to error: r   )r   debugasynciosleepr6   r4   
delay_secsnum_retriesr   r   0sleep_and_update_delay_and_num_retries_remainingE   s
   z_call_with_retries_on_transient_errors.<locals>.sleep_and_update_delay_and_num_retries_remainingTN_write_appdata)r*   r   statusr   AttributeErrorstrr   OSErrorr9   TimeoutErrorConnectionError)funcr3   r4   r5   r?   r6   r   r<   r   %call_with_retries_on_transient_errors7   sB   rH   task_idc                    s   | j tj|dI dH S )z:Fetch direct command router access info from Modal server.)rI   N)stubTaskGetCommandRouterAccessr   !TaskGetCommandRouterAccessRequest)server_clientrI   r   r   r   fetch_command_router_accessh   s   

rN   c                 C   sF   |   s!tt | |j W d    d S 1 sw   Y  d S d S N)	is_closedr   r*   call_soon_threadsafeclose)loopchannelr   r   r   _finalize_channelo   s
   
"rU   c                   @   s  e Zd ZdZededed  fddZdddd	ded
ededej	j
dejdejdedededdfddZdd Zd6ddZdejdejfddZ	d7dededddee deejdf f
d d!Zdeded"ed#ed$edejfd%d&Z	d7dededee dejfd'd(Z	d7dededee dej fd)d*Z!d6d+d,Z"d-d. Z#	d7dededd/dee deejdf f
d0d1Z$dej%fd2d3Z&dej'dej(fd4d5Z)dS )8TaskCommandRouterClientz
    Client used to talk directly to TaskCommandRouter service on worker hosts.

    A new instance should be created per task.
    rI   r   c                    s  z
t ||I dH }W n ty   td|  Y dS w td|  tj|j}|jdkr:t	d|j |j
d\}}}|rIt|nd}t }	|jr^td d	|	_tj|	_tjj|||	tjjd
d
dd}
t|
I dH  t }t }| |||j|j|
||S )zAttempt to initialize a TaskCommandRouterClient by fetching direct access.

        Returns None if command router access is not enabled (FAILED_PRECONDITION).
        Nz.Command router access is not enabled for task z%Using command router access for task httpsz$Task router URL must be https, got: :i  zTUsing insecure TLS for task command router because server client points to localhostFi   )http2_connection_window_sizehttp2_stream_window_size)sslconfig)rN   r	   r   r8   urllibparseurlparseurlscheme
ValueErrornetloc	partitionr(   r[   create_default_context_is_localhostr+   check_hostname	CERT_NONEverify_modegrpclibclientChannelr\   Configurationr   r9   get_running_loopLockjwt)clsrM   rI   respohost_port_strportssl_contextrT   rS   jwt_refresh_lockr   r   r   try_init   s>   



z TaskCommandRouterClient.try_initr0   r   r1   )stream_stdio_retry_delay_secsstream_stdio_retry_delay_factorstream_stdio_max_retries
server_urlrp   rT   rS   ry   r{   r|   r}   Nc                C   sn   || _ || _|| _|| _|| _|| _|| _|	| _|
| _t	|| _
|| _d| _t| t||| _t| j| _dS )zUCallers should not use this directly. Use TaskCommandRouterClient.try_init() instead.FN)_loop_server_client_task_id_server_url_jwt_channelr{   r|   r}   r/   _jwt_exp_jwt_refresh_lock_closedweakreffinalizerU   _channel_finalizerr   _stub)selfrM   rI   r~   rp   rT   rS   ry   r{   r|   r}   r   r   r   __init__   s&   
z TaskCommandRouterClient.__init__c                 C   s   dd| j  iS )NauthorizationzBearer )r   r   r   r   r   _get_metadata   s   z%TaskCommandRouterClient._get_metadatac                    s6   | j rdS d| _ | j  | jjr| j  dS dS )zClose the client.NT)r   r   rR   r   alivedetachr   r   r   r   rR      s   
zTaskCommandRouterClient.closerequestc                    sD   t   t fddI dH W  d   S 1 sw   Y  dS )z?Start an exec'd command, properly retrying on transient errors.c                          jj S rO   )_call_with_auth_retryr   TaskExecStartr   r   r   r   r   <lambda>       z4TaskCommandRouterClient.exec_start.<locals>.<lambda>Nr   rH   r   r   r   r   r   
exec_start   s   $z"TaskCommandRouterClient.exec_startexec_idfile_descriptorz api_pb2.FileDescriptor.ValueTypedeadlinec              
   C  s   |t jkr
tj}n!|t jkrtj}n|t jks|t jkr$td| td| t	 B t
| ||||4 I dH }|2 z	3 dH W }|V  q@6 W d  I dH  n1 I dH s[w   Y  W d   dS W d   dS 1 ssw   Y  dS )a.  Stream stdout/stderr batches from the task, properly retrying on transient errors.

        Args:
            task_id: The task ID of the task running the exec'd command.
            exec_id: The execution ID of the command to read from.
            file_descriptor: The file descriptor to read from.
            deadline: The deadline by which all output must be streamed. If
              None, wait forever. If the deadline is exceeded, raises an
              ExecTimeoutError.
        Returns:
            AsyncGenerator[sr_pb2.TaskExecStdioReadResponse, None]: A stream of stdout/stderr batches.
        Raises:
            ExecTimeoutError: If the deadline is exceeded.
            Other errors: If retries are exhausted on transient errors or if there's an error
              from the RPC itself.
        zUnsupported file descriptor: zInvalid file descriptor: N)r   FILE_DESCRIPTOR_STDOUTsr_pb2&TASK_EXEC_STDIO_FILE_DESCRIPTOR_STDOUTFILE_DESCRIPTOR_STDERR&TASK_EXEC_STDIO_FILE_DESCRIPTOR_STDERRFILE_DESCRIPTOR_INFOFILE_DESCRIPTOR_UNSPECIFIEDrb   r   r   _stream_stdio)r   rI   r   r   r   sr_fdstreamitemr   r   r   exec_stdio_read   s"   

*"z'TaskCommandRouterClient.exec_stdio_readoffsetr   eofc                    sX   t j|||||d t  t fddI dH W  d   S 1 s%w   Y  dS )aB  Write to the stdin stream of an exec'd command, properly retrying on transient errors.

        Args:
            task_id: The task ID of the task running the exec'd command.
            exec_id: The execution ID of the command to write to.
            offset: The offset to start writing to.
            data: The data to write to the stdin stream.
            eof: Whether to close the stdin stream after writing the data.
        Raises:
            Other errors: If retries are exhausted on transient errors or if there's an error
              from the RPC itself.
        )rI   r   r   r   r   c                      r   rO   )r   r   TaskExecStdinWriter   r   r   r   r   +  r   z:TaskCommandRouterClient.exec_stdin_write.<locals>.<lambda>N)r   TaskExecStdinWriteRequestr   rH   )r   rI   r   r   r   r   r   r   r   exec_stdin_write  s   $z(TaskCommandRouterClient.exec_stdin_writec              	      s   t j||d |dur|t  nd}|dur#|dkr#td| t * ztjt fdd|dI dH W W  d   S  tj	yO   td| w 1 sSw   Y  dS )a2  Poll for the exit status of an exec'd command, properly retrying on transient errors.

        Args:
            task_id: The task ID of the task running the exec'd command.
            exec_id: The execution ID of the command to poll on.
        Returns:
            sr_pb2.TaskExecPollResponse: The exit status of the command if it has completed.

        Raises:
            ExecTimeoutError: If the deadline is exceeded.
            Other errors: If retries are exhausted on transient errors or if there's an error
              from the RPC itself.
        rI   r   Nr   z)Deadline exceeded while polling for exec c                      r   rO   )r   r   TaskExecPollr   r   r   r   r   I  r   z3TaskCommandRouterClient.exec_poll.<locals>.<lambda>timeout)
r   TaskExecPollRequesttime	monotonicr
   r   r9   wait_forrH   rE   r   rI   r   r   r   r   r   r   	exec_poll.  s$   z!TaskCommandRouterClient.exec_pollc              	      s   t j||d |dur|t  nd}|dur#|dkr#td| t . ztjt fdddddd|d	I dH W W  d   S  tj	yS   td| w 1 sWw   Y  dS )
a  Wait for an exec'd command to exit and return the exit code, properly retrying on transient errors.

        Args:
            task_id: The task ID of the task running the exec'd command.
            exec_id: The execution ID of the command to wait on.
        Returns:
            Optional[sr_pb2.TaskExecWaitResponse]: The exit code of the command.
        Raises:
            ExecTimeoutError: If the deadline is exceeded.
            Other errors: If there's an error from the RPC itself.
        r   Nr   z)Deadline exceeded while waiting for exec c                      s   j jj ddS )N<   r   )r   r   TaskExecWaitr   r   r   r   r   t  s    z3TaskCommandRouterClient.exec_wait.<locals>.<lambda>r   r2   r   )
r   TaskExecWaitRequestr   r   r
   r   r9   r   rH   rE   r   r   r   r   	exec_waitP  s*   z!TaskCommandRouterClient.exec_waitc              	      s  | j 4 I dH o | jr	 W d  I dH  dS | jdur<| jt  dkr<td| j d 	 W d  I dH  dS td| j  t| j| jI dH }td| j  |j	| j
ksbJ d|j| _t|j| _W d  I dH  dS 1 I dH s}w   Y  dS )z6Refresh JWT from the server and update internal state.N   z+Skipping JWT refresh for exec with task ID z; because its expiration is already far enough in the futurez%Refreshing JWT for exec with task ID z.Finished refreshing JWT for exec with task ID z&Task router URL changed during session)r   r   r   r   r   r8   r   rN   r   r`   r   rp   r   r/   )r   rr   r   r   r   _refresh_jwt~  s$   .z$TaskCommandRouterClient._refresh_jwtc              
      s   z||i |d|   iI d H W S  tyB } z$|jtjkr=|  I d H  ||i |d|   iI d H W  Y d }~S  d }~ww )Nmetadata)r   r   rA   r   UNAUTHENTICATEDr   )r   rG   argskwargsexcr   r   r   r     s   ",z-TaskCommandRouterClient._call_with_auth_retryz,sr_pb2.TaskExecStdioFileDescriptor.ValueTypec                   s  d}| j | j| jd}dtf fdd}	  dur)td t  nd}z| jjj	|| 
 d}	|	4 I dH l}
tj|||d	}z'|
j|dd
I dH  |
2 z3 dH W }|r^d}| j |t|j7 }|V  qT6 W n. ty } z"|jtjkr|s|  I dH  d}W Y d}~W d  I dH  W q d}~ww W d  I dH  W dS 1 I dH sw   Y  W dS  ty } zdkr|jtv r||I dH  n|W Y d}~nd}~w ty } zdkrdt|v r||I dH  n|W Y d}~nqd}~w ty" } zdkr||I dH  n|W Y d}~nPd}~w tjyH } zdkr8||I dH  ntt|W Y d}~n*d}~w tym } zdkr]||I dH  ntt|W Y d}~nd}~ww q)zStream stdio from the task, properly updating the offset and retrying on transient errors.
        Raises ExecTimeoutError if the deadline is exceeded.
        r   Fr6   c                    sb   t d d|    d ur t  krtd tI d H  9 d8 d S )NzRetrying stdio read with delay r7   z1Deadline exceeded while streaming stdio for exec r   )r   r8   r   r   r
   r9   r:   r;   r   r4   r=   r   num_retries_remainingr   r   r?     s   z_TaskCommandRouterClient._stream_stdio.<locals>.sleep_and_update_delay_and_num_retries_remainingTN)r   r   )rI   r   r   r   )endr@   )r{   r|   r}   r*   maxr   r   r   TaskExecStdioReadopenr   r   TaskExecStdioReadRequestsend_messager   r   r   rA   r   r   r   r   rB   rC   r   r9   rE   rF   rD   )r   rI   r   r   r   r   did_auth_retryr?   r   r   sreqr   r   r6   r   r   r   r     s   





z%TaskCommandRouterClient._stream_stdioc                    D   t   t fddI d H W  d    S 1 sw   Y  d S )Nc                      r   rO   )r   r   TaskMountDirectoryr   r   r   r   r     r   z5TaskCommandRouterClient.mount_image.<locals>.<lambda>r   r   r   r   r   mount_image   s   $z#TaskCommandRouterClient.mount_imagec                    r   )Nc                      r   rO   )r   r   TaskSnapshotDirectoryr   r   r   r   r     r   z<TaskCommandRouterClient.snapshot_directory.<locals>.<lambda>r   r   r   r   r   snapshot_directory  s   $z*TaskCommandRouterClient.snapshot_directory)r   NrO   )*__name__
__module____qualname____doc__classmethodrC   r   rz   rj   rk   rl   r9   AbstractEventLoopro   r)   r(   r   r   rR   r   TaskExecStartRequestTaskExecStartResponser   r   TaskExecStdioReadResponser   bytesboolTaskExecStdinWriteResponser   TaskExecPollResponser   TaskExecWaitResponser   r   r   r   TaskMountDirectoryRequestr   TaskSnapshotDirectoryRequestTaskSnapshotDirectoryResponser   r   r   r   r   rV   x   s    :

.

&

&

.
^rV   )2r9   r   r$   r[   r   urllib.parser]   r   
contextlibr   typingr   r   grpclib.clientrj   grpclib.configgrpclib.eventsr   r   grpclib.exceptionsr   modal.configr   modal.exceptionr	   r
   modal_protor   r   r   $modal_proto.task_command_router_grpcr   _grpc_clientr   async_utilsr   
grpc_utilsr   r   rC   r   r   r)   r/   r(   rH   "TaskGetCommandRouterAccessResponserN   rU   rV   r   r   r   r   <module>   sH   
1	