o
    ;iqj                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
mZmZmZmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlZd dlZd dlmZ ddlmZmZ dd	l m!Z!m"Z" dd
l#m$Z$ ddl%m&Z&m'Z'm(Z( ddl%m)Z)m*Z* ddl+m,Z, ddl-m.Z.m/Z/m0Z0 ddl-m1Z1m2Z2m3Z3 ddl-m4Z4m5Z5 ddl6m7Z7m8Z8 ddl9m:Z:m;Z;m<Z< ddl=m>Z>m?Z?m@Z@ ddlAmBZBmCZC ddlAmDZD ddlEmFZG e	rd dlHZIddlJm Z  ddlJm6Z6 ddlKmLZL eMeNZOeeePePf  ZQG dd de(e) ee)e*f ZR		d4dddeSde
e! d e
eP d!df
d"d#ZTd$eePd%f d&dd'eQd(e?d)e
e@ d*e,d+eg ef d!dfd,d-ZUG d.d/ d/e jVZWG d0d1 d1eWe8ZXG d2d3 d3eWZYdS )5    N)TracebackType)TYPE_CHECKINGOptional
CollectionGenericTypecast)ListTupleDictAnyCallableContextManagerSet)nullcontext)	MultiDict   )DeadlineWrapperWrapper)StatusCardinality)Configuration)send_messagerecv_messageStreamIterator)	_RecvType	_SendType)_DispatchServerEvents)Deadlineencode_grpc_message	_Metadata)encode_metadatadecode_metadata_MetadataLike)_STATUS_DETAILS_KEYencode_bin_value)
H2ProtocolAbstractHandler)	GRPCErrorProtocolErrorStreamTerminatedError)GRPC_CONTENT_TYPE	CodecBaseStatusDetailsCodecBase)
ProtoCodecProtoStatusDetailsCodec)_googleapis_available)servers)const)protocol)	IServablec                   @   s>  e Zd ZdZdZdZdZdZdZdZ	dddddde
d	ed
ee dee dedee dedee dee
 fddZede
fddZdee fddZdddee ddfddZdeddfddZejddddd ed!ee
 d"edee ddf
d#d$Zd/d%d&Zd0d(d)Z d*eee!  d+ee! d,ee" dee# fd-d.Z$dS )1Streamu  
    Represents gRPC method call – HTTP/2 request/stream, and everything you
    need to communicate with client in order to handle this request.

    As you can see, every method handler accepts single positional argument -
    stream:

    .. code-block:: python3

        async def MakeLatte(self, stream: grpclib.server.Stream):
            task: cafe_pb2.LatteOrder = await stream.recv_message()
            ...
            await stream.send_message(empty_pb2.Empty())

    This is true for every gRPC method type.
    Fr   N)deadline
user_agentstreamprotocol.Streammethod_namecardinality	recv_type	send_typecodecstatus_details_codecdispatchr6   r7   c                C   sT   || _ || _|| _|| _|| _|| _|| _|| _|	| _d | _	|
| _
| j j | _d S N)_stream_method_name_cardinality
_recv_type
_send_type_codec_status_details_codec	_dispatchr6   metadatar7   
connectionget_peerpeer)selfr8   r:   r;   r<   r=   r>   r?   r@   r6   r7    rO   B/home/ubuntu/.local/lib/python3.10/site-packages/grpclib/server.py__init__J   s   zStream.__init__returnc                 C   s   t d | jj S )N+)r+   rG   __content_subtype__rN   rO   rO   rP   _content_typej   s   zStream._content_typec                    sl   t | j| j| jI dH }|dur4| j |I dH \}|  jd7  _| jj jd7  _t	 | jj_
|S dS )ab  Coroutine to receive incoming message from the client.

        If client sends UNARY request, then you can call this coroutine
        only once. If client sends STREAM request, then you should call this
        coroutine several times, until it returns None when the client has
        ended the stream. To simplify your code in this case,
        :py:class:`Stream` class implements async iteration protocol, so
        you can use it like this:

        .. code-block:: python3

            async for message in stream:
                do_smth_with(message)

        or even like this:

        .. code-block:: python3

            messages = [msg async for msg in stream]

        HTTP/2 has flow control mechanism, so server will acknowledge received
        DATA frames as a message only after user consumes this coroutine.

        :returns: message
        Nr   )r   rB   rG   rE   rI   _messages_receivedrK   messages_receivedtime	monotoniclast_message_receivedrN   messagerO   rO   rP   r   n   s   zStream.recv_message)rJ   rJ   c                   sn   | j rtddd| jfg}t|pd}| j|I dH \}|ttt	| | j
|I dH  d| _ dS )a  Coroutine to send headers with initial metadata to the client.

        In gRPC you can send initial metadata as soon as possible, because
        gRPC doesn't use `:status` pseudo header to indicate success or failure
        of the current request. gRPC uses trailers for this purpose, and
        trailers are sent during :py:meth:`send_trailing_metadata` call, which
        should be called in the end.

        .. note:: This coroutine will be called implicitly during first
            :py:meth:`send_message` coroutine call, if not called before
            explicitly.

        :param metadata: custom initial metadata, dict or list of pairs
        z!Initial metadata was already sent:status200content-typerO   NT)_send_initial_metadata_doner)   rV   r   rI   send_initial_metadataextendr!   r   r    rB   send_headers)rN   rJ   headersrO   rO   rP   rc      s   
zStream.send_initial_metadatar]   c                    s   | j s|  I dH  | jjs| jrtd| j|I dH \}t| j| j	|| j
I dH  d| _|  jd7  _| jj jd7  _t | jj_dS )a  Coroutine to send message to the client.

        If server sends UNARY response, then you should call this coroutine only
        once. If server sends STREAM response, then you can call this coroutine
        as many times as you need.

        :param message: message object
        NzMessage was already sentTr   )rb   rc   rD   server_streaming_send_message_doner)   rI   r   rB   rG   rF   _messages_sentrK   messages_sentrY   rZ   last_message_sentr\   rO   rO   rP   r      s   	zStream.send_message)statusstatus_messagestatus_detailsrJ   rl   rm   rn   c                   s2  | j rtd| jjs| js|tju rtd| jrg }ndd| jfg}|	dt
|jf |dur<|	dt|f |durY| jdurYt| j|||d}|	t|f t|p]d	}| jj||||d
I dH \}|ttt| | jj|ddI dH  d| _ |tjkr| jjr| j  dS dS dS )a  Coroutine to send trailers with trailing metadata to the client.

        This coroutine allows sending trailers-only responses, in case of some
        failure conditions during handling current request, i.e. when
        ``status is not OK``.

        .. note:: This coroutine will be called implicitly at exit from
            request handler, with appropriate status code, if not called
            explicitly during handler execution.

        :param status: resulting status of this coroutine call
        :param status_message: description for a status
        :param metadata: custom trailing metadata, dict or list of pairs
        z"Trailing metadata was already sentzBUnary response with OK status requires a single message to be sentr^   ra   grpc-statusNgrpc-messageasciirO   rl   rm   rn   T
end_stream)_send_trailing_metadata_doner)   rD   rg   rh   r   OKrb   rV   appendstrvaluer   rH   r%   encodedecoder$   r   rI   send_trailing_metadatard   r!   r   r    rB   re   closablereset_nowait)rN   rl   rm   rn   rJ   rf   status_details_binrO   rO   rP   r|      sP   

zStream.send_trailing_metadatac                    s*   | j rtd| j I dH  d| _ dS )zCoroutine to cancel this request/stream.

        Server will send RST_STREAM frame to the client, so it will be
        explicitly informed that there is nothing to expect from the server
        regarding this request/stream.
        zStream was already cancelledNT)_cancel_doner)   rB   resetrU   rO   rO   rP   cancel  s
   
zStream.cancelStream[_RecvType, _SendType]c                       | S rA   rO   rU   rO   rO   rP   
__aenter__     zStream.__aenter__exc_typeexc_valexc_tbc                    s   | j s| js| jj rdS d }|d ur3t|tr$|j}|j}|j	}n+t|t
r1tj}d}d }nd S | jjsH| jsHtj}d}d }d| j}ntj}d }d }z| j|||dI d H  W n tjjyg   Y nw |d urpt|dS )NTzInternal Server ErrorzHUnary response with OK status requires a single message to be sent: {!r}rr   )ru   r   rB   
_transport
is_closing
isinstancer(   rl   r]   details	Exceptionr   UNKNOWNrD   rg   rh   formatrC   rv   r|   h2
exceptionsStreamClosedErrorr)   )rN   r   r   r   protocol_errorrl   rm   rn   rO   rO   rP   	__aexit__"  sV   


zStream.__aexit__rR   N)rR   r   )%__name__
__module____qualname____doc__rb   rh   ru   r   ri   rW   rx   r   r   r   r   r,   r   r-   r   r   rQ   propertyrV   r   r#   rc   r   r   rv   r   r|   r   r   BaseExceptionr   boolr   rO   rO   rO   rP   r5   /   s    	

 '
!

H

r5   	h2_streamr9   	h2_statusgrpc_statusgrpc_messagerR   c                    sl   dt |fg}|d ur|dt |jf |d ur!|d|f | j|ddI d H  | jr4|   d S d S )Nr_   ro   rp   Trs   )rx   rw   ry   re   r}   r~   )r   r   r   r   rf   rO   rO   rP   _abort`  s   r   mappingconst.HandlerrB   rf   r>   r?   r@   release_streamc                    s  zސzt |}|d dkrt|dI d H  W W |  d S |d}|d u r9t|dtjdI d H  W W |  d S |d\}	}
}|pEtj}|	tksO||jkrat|dtjdI d H  W W |  d S |d	d
krzt|dtjdI d H  W W |  d S |d }| |}|d u rt|dtj	dI d H  W W |  d S zt
|}W n ty   t|dtjdI d H  Y W W |  d S w t|}|d}t|||j|j|j|||||d
4 I d H }|d u rt  }|_t }nt  }|_||}zC|6 |# |j||j|||||jdI d H \|_}||I d H  W d    n	1 s w   Y  W d    n	1 s0w   Y  W nc ty?     tjyi   |jrUt d ttj!|j"rct#d ttj!t d   t$y } z|jr{t d  |j"sJ t#d|  d }~w t%y   t d  w W d   I d H  n1 I d H sw   Y  W n t&y   t d Y n t%y   t d Y nw W |  d S W |  d S W |  d S |  w )Nz:methodPOSTi  ra   i  zMissing content-type headerrS   z Unacceptable content-type headertetrailersi  z)Required "te: trailers" header is missingz:path   zMethod not foundzInvalid grpc-timeout headerz
user-agent)r>   r?   r@   r6   r7   )r:   r6   content_typer7   rM   zFailed to handle cancellationzDeadline exceededzTimeout occurredzRequest was cancelled: %szApplication errorzServer error)'dictr   getr   r   	partitionr.   rT   r+   UNIMPLEMENTEDr   from_headers
ValueErrorr"   r5   r;   request_type
reply_typer   wrapperr   r   startrecv_requestfuncrM   rJ   r(   asyncioTimeoutErrorcancel_failedlog	exceptionDEADLINE_EXCEEDED	cancelledinfor*   r   r)   )r   rB   rf   r>   r?   r@   r   headers_mapr   base_content_type_sub_typer:   methodr6   rJ   r7   r8   r   deadline_wrappermethod_funcerrrO   rO   rP   request_handlerp  s   	
]



W




M


H



A


:

	 






*0

r   c                   @   sB   e Zd ZdZeejdefddZejd
ddZ	d
dd	Z
dS )_GCr   rR   c                 C   s   t rA   )NotImplementedErrorrU   rO   rO   rP   __gc_interval__  s   z_GC.__gc_interval__Nc                 C   s   d S rA   rO   rU   rO   rO   rP   __gc_collect__  s   z_GC.__gc_collect__c                 C   s*   |  j d7  _ | j | j s|   d S d S )Nr   )_gc_counterr   r   rU   rO   rO   rP   __gc_step__  s   z_GC.__gc_step__r   )r   r   r   r   r   abcabstractmethodintr   r   r   rO   rO   rO   rP   r     s    r   c                
   @   s   e Zd ZdZdZdeedf dedee	 de
dd	f
d
dZdddZdddedeg ef dd	fddZdddZdddZdddZdefddZd	S )Handler
   Fr   r   r>   r?   r@   rR   Nc                 C   s4   || _ || _|| _|| _t | _i | _t | _	d S rA   )
r   r>   r?   r@   r   get_event_looploop_tasksset
_cancelled)rN   r   r>   r?   r@   rO   rO   rP   rQ     s   
zHandler.__init__c                 C   s,   dd | j  D | _ dd | jD | _d S )Nc                 S   s   i | ]\}}|  s||qS rO   done).0strO   rO   rP   
<dictcomp>  s    z*Handler.__gc_collect__.<locals>.<dictcomp>c                 S   s   h | ]}|  s|qS rO   r   )r   r   rO   rO   rP   	<setcomp>  s    
z)Handler.__gc_collect__.<locals>.<setcomp>)r   itemsr   rU   rO   rO   rP   r     s   zHandler.__gc_collect__r8   r9   rf   r   c              
   C   s6   |    | jt| j||| j| j| j|| j|< d S rA   )	r   r   create_taskr   r   r>   r?   r@   r   )rN   r8   rf   r   rO   rO   rP   accept
  s
   
zHandler.acceptc                 C   s$   | j |}|  | j| d S rA   )r   popr   r   add)rN   r8   taskrO   rO   rP   r     s   zHandler.cancelc                 C   s4   | j  D ]}|  q| j| j   d| _d S )NT)r   valuesr   r   updateclosing)rN   r   rO   rO   rP   close  s   

zHandler.closec                    s"   | j rt| j I d H  d S d S rA   )r   r   waitrU   rO   rO   rP   wait_closed!  s   zHandler.wait_closedc                 C   s   |    | j o| j S rA   )r   r   r   rU   rO   rO   rP   check_closed%  s   zHandler.check_closedr   )r8   r9   rR   N)r   r   r   r   r   r   rx   r,   r   r-   r   rQ   r   _Headersr   r   r   r   r   r   r   r   rO   rO   rO   rP   r     s8    







r   c                   @   s*  e Zd ZdZdZdddddded deej dee	 d	ee
 d
ee ddfddZd.ddZdefddZ		d/dejejdddddddee dee dee dddddeej deded dee d ee ddfd!d"Zd.d#d$Zd.d%d&Zd0d'd(Zd)eee  d*ee d+ee ddfd,d-ZdS )1Servera  
    HTTP/2 server, which uses gRPC service handlers to handle requests.

    Handler is a subclass of the abstract base class, which was generated
    from .proto file:

    .. code-block:: python3

        class CoffeeMachine(cafe_grpc.CoffeeMachineBase):

            async def MakeLatte(self, stream):
                task: cafe_pb2.LatteOrder = await stream.recv_message()
                ...
                await stream.send_message(empty_pb2.Empty())

        server = Server([CoffeeMachine()])
    r   N)r   r>   r?   confighandlersr4   r   r>   r?   r   rR   c                C   s   |r
t jdtdd i }|D ]	}||  q|| _|p t | _|du r3t	 }|du r3t
 r3t }|| _|| _tjjddddddd| _|du rMt n|}| | _d| _d| _t | _t | _t|  dS )a  
        :param handlers: list of handlers

        :param loop: (deprecated) asyncio-compatible event loop

        :param codec: instance of a codec to encode and decode messages,
            if omitted ``ProtoCodec`` is used by default

        :param status_details_codec: instance of a status details codec to
            encode error details in a trailing metadata, if omitted
            ``ProtoStatusDetailsCodec`` is used by default
        zHThe loop argument is deprecated and scheduled for removal in grpclib 0.5   )
stacklevelNFrq   )client_sideheader_encodingvalidate_inbound_headersvalidate_outbound_headersnormalize_inbound_headersnormalize_outbound_headers)warningswarnDeprecationWarningr   __mapping___mappingr   r   _loopr.   r0   r/   rG   rH   r   r   H2Configuration
_h2_configr   __for_server___config_server_server_closed_futr   	_handlersr   __dispatch___serversr   )rN   r   r   r>   r?   r   r   handlerrO   rO   rP   rQ   >  s<   	
zServer.__init__c                 C   s   dd | j D | _ d S )Nc                 S   s   h | ]}|j r| s|qS rO   )r   r   r   hrO   rO   rP   r   {  s
    
z(Server.__gc_collect__.<locals>.<setcomp>)r  rU   rO   rO   rP   r   z  s   zServer.__gc_collect__c                 C   s:   |    t| j| j| j| j}| j| t|| j	| j
S rA   )r   r   r   rG   rH   r  r  r   r&   r  r   rN   r  rO   rO   rP   _protocol_factory~  s   zServer._protocol_factoryd   )pathfamilyflagssockbacklogsslreuse_address
reuse_porthostportr  r  zsocket.AddressFamilyr  zsocket.AddressInfor  r  r  z_ssl.SSLContextr  r  c                   s   |dur|dus|durt d| jdurtd|dur/| jj| j||||dI dH | _n| jj| j||||||||	|
d
I dH | _| j | _dS )a  Coroutine to start the server.

        :param host: can be a string, containing IPv4/v6 address or domain name.
            If host is None, server will be bound to all available interfaces.

        :param port: port number.

        :param path: UNIX domain socket path. If specified, host and port should
            be omitted (must be None).

        :param family: can be set to either :py:data:`python:socket.AF_INET` or
            :py:data:`python:socket.AF_INET6` to force the socket to use IPv4 or
            IPv6. If not set it will be determined from host.

        :param flags: is a bitmask for
            :py:meth:`~python:asyncio.AbstractEventLoop.getaddrinfo`.

        :param sock: sock can optionally be specified in order to use a
            preexisting socket object. If specified, host and port should be
            omitted (must be None).

        :param backlog: is the maximum number of queued connections passed to
            listen().

        :param ssl: can be set to an :py:class:`~python:ssl.SSLContext`
            to enable SSL over the accepted connections.

        :param reuse_address: tells the kernel to reuse a local socket in
            TIME_WAIT state, without waiting for its natural timeout to expire.

        :param reuse_port: tells the kernel to allow this endpoint to be bound
            to the same port as other existing endpoints are bound to,
            so long as they all set this flag when being created.
        NzJThe 'path' parameter can not be used with the 'host' or 'port' parameters.zServer is already started)r  r  r  )r  r  r  r  r  r  r  )	r   r  RuntimeErrorr   create_unix_serverr  create_servercreate_futurer  )rN   r  r  r  r  r  r  r  r  r  r  rO   rO   rP   r     s&   0

zServer.startc                 C   sT   | j du s
| jdu rtd| j   | j s| jd | jD ]}|  q!dS )zStops accepting new connections, cancels all currently running
        requests. Request handlers are able to handle `CancelledError` and
        exit properly.
        NServer is not started)r  r  r  r   r   
set_resultr  r
  rO   rO   rP   r     s   



zServer.closec                    sh    j du s jdu rtd jI dH   j  I dH   jr2t fdd jD I dH  dS dS )zZCoroutine to wait until all existing request handlers will exit
        properly.
        Nr  c                    s   h | ]
} j | qS rO   )r   r   r   r  rU   rO   rP   r     s    z%Server.wait_closed.<locals>.<setcomp>)r  r  r  r   r  r   r   rU   rO   rU   rP   r     s   zServer.wait_closedc                    r   rA   rO   rU   rO   rO   rP   r     r   zServer.__aenter__r   r   r   c                    s   |    |  I d H  d S rA   )r   r   )rN   r   r   r   rO   rO   rP   r     s   zServer.__aexit__r   NN)rR   r   )r   r   r   r   r   r   r   r   AbstractEventLoopr,   r-   r   rQ   r   r&   r  socket	AF_UNSPEC
AI_PASSIVErx   r   r   r   r   r   r   r   r   r   r   rO   rO   rO   rP   r   *  s    

<	


I


r   r  )Zr   rY   r  loggingr   r   typesr   typingr   r   r   r   r   r   r	   r
   r   r   r   r   r   
contextlibr   	h2.configr   h2.exceptions	multidictr   utilsr   r   r2   r   r   r   r   r8   r   r   r   r   r   eventsr   rJ   r   r   r    r!   r"   r#   r$   r%   r3   r&   r'   r   r(   r)   r*   encoding.baser+   r,   r-   encoding.protor.   r/   r0   	_registryr1   r  r  _ssl _typingr4   	getLoggerr   r   rx   r   r5   r   r   r   ABCr   r   r   rO   rO   rO   rP   <module>   s     $
  6



n: