o
    ;ik                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m	Z	 d dl
mZmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ ddlmZ ddlmZmZmZmZmZ ddl m!Z! ddl"m#Z#m$Z$ ddl%m&Z&m'Z'm(Z( ddl)m*Z*m+Z+ ddl,m-Z- ddl.m/Z/ ddl0m1Z1m2Z2 ddl3m4Z4 ddl5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z; eG dd dZ<G dd dZ=e'e=Z>G dd deddZ?e'e?Z@dS )    N)AsyncGeneratorAsyncIterator)	dataclass)datetime)AnyOptionalUnion)Message)Status)classproperty)asynccontextmanager)api_pb2   )LoadContext) EPHEMERAL_OBJECT_HEARTBEAT_SLEEP_get_environment_name_Objectlive_methodlive_method_gen)Resolver)deserialize	serialize)TaskContextsynchronize_api!warn_if_generator_is_not_consumed)deprecation_warningwarn_if_passing_namespace)Retry)check_object_name)as_timestamptimestamp_to_localized_dt)_Client)AlreadyExistsErrorErrorInvalidErrorNotFoundErrorRequestSizeErrorResourceExhaustedErrorc                   @   s2   e Zd ZU dZee ed< eed< ee ed< dS )	QueueInfoz#Information about the Queue object.name
created_at
created_byN)__name__
__module____qualname____doc__r   str__annotations__r    r2   r2   ?/home/ubuntu/.local/lib/python3.10/site-packages/modal/queue.pyr(   %   s
   
 r(   c                   @   s   e Zd ZdZedddddededee dee d	df
d
dZ	eddddddee
 deeeef  dedee d	ejd f
ddZedddddededee dee fddZdS )_QueueManagerz8Namespace with methods for managing named Queue objects.FN)allow_existingenvironment_nameclientr)   r5   r6   r7   returnc                   s~   t | d |du rt I dH n|}|rtjntj}tj| t||d}z|j	|I dH  W dS  t
y>   |s; Y dS w )a$  Create a new Queue object.

        **Examples:**

        ```python notest
        modal.Queue.objects.create("my-queue")
        ```

        Queues will be created in the active environment, or another one can be specified:

        ```python notest
        modal.Queue.objects.create("my-queue", environment_name="dev")
        ```

        By default, an error will be raised if the Queue already exists, but passing
        `allow_existing=True` will make the creation attempt a no-op in this case.

        ```python notest
        modal.Queue.objects.create("my-queue", allow_existing=True)
        ```

        Note that this method does not return a local instance of the Queue. You can use
        `modal.Queue.from_name` to perform a lookup after creation.

        Added in v1.1.2.

        QueueNdeployment_namer6   object_creation_type)r   r!   from_envr   &OBJECT_CREATION_TYPE_CREATE_IF_MISSING*OBJECT_CREATION_TYPE_CREATE_FAIL_IF_EXISTSQueueGetOrCreateRequestr   stubQueueGetOrCreater"   )r)   r5   r6   r7   r<   reqr2   r2   r3   create4   s&   
#z_QueueManager.create )max_objectscreated_beforer6   r7   rF   rG   _Queuec                    s    du rt  I dH n  durdk rtdg dtdtf fdd}|t|I dH }	 |r7n|d	 jjjI dH }q4 fd
dD }durX|d S |S )a  Return a list of hydrated Queue objects.

        **Examples:**

        ```python
        queues = modal.Queue.objects.list()
        print([q.name for q in queues])
        ```

        Queues will be retreived from the active environment, or another one can be specified:

        ```python notest
        dev_queues = modal.Queue.objects.list(environment_name="dev")
        ```

        By default, all named Queues are returned, newest to oldest. It's also possible to limit the
        number of results and to filter by creation date:

        ```python
        queues = modal.Queue.objects.list(max_objects=10, created_before="2025-01-01")
        ```

        Added in v1.1.2.

        Nr   zmax_objects cannot be negativerG   r8   c                    s   d u rdnt dt }tj|| d}tjt|d} j|I d H }|j	 t|j	|k p?d uo?tk}|S )Nd   )rF   rG   )r6   
pagination)
minlenr   ListPaginationQueueListRequestr   rA   	QueueListextendqueues)rG   max_page_sizerJ   rC   respfinishedr7   r6   itemsrF   r2   r3   retrieve_page   s   "z)_QueueManager.list.<locals>.retrieve_pageTc                    s.   g | ]}t j|j |jd t |jdqS )T)is_another_apprep)rH   _new_hydratedqueue_idmetadata_reprr)   ).0itemr7   r6   r2   r3   
<listcomp>   s    z&_QueueManager.list.<locals>.<listcomp>)	r!   r=   r$   floatboolr   r]   creation_infor*   )rF   rG   r6   r7   rW   rT   rQ   r2   rU   r3   listi   s    !
z_QueueManager.list)allow_missingr6   r7   rg   c                   sb   zt j| |d|I dH }W n ty   |s Y dS w tj|jd}|jj	|I dH  dS )a+  Delete a named Queue.

        Warning: This deletes an *entire Queue*, not just a specific entry or partition.
        Deletion is irreversible and will affect any Apps currently using the Queue.

        **Examples:**

        ```python notest
        await modal.Queue.objects.delete("my-queue")
        ```

        Queues will be deleted from the active environment, or another one can be specified:

        ```python notest
        await modal.Queue.objects.delete("my-queue", environment_name="dev")
        ```

        Added in v1.1.2.

        )r6   Nr\   )
rH   	from_namehydrater%   r   QueueDeleteRequest	object_id_clientrA   QueueDelete)r)   rg   r6   r7   objrC   r2   r2   r3   delete   s   z_QueueManager.delete)r,   r-   r.   r/   staticmethodr0   rd   r   r!   rD   intr   r   builtinsrf   rp   r2   r2   r2   r3   r4   1   s\    4Cr4   c                   @   sV  e Zd ZU dZdZeej ed< dd Z	e
defddZedee fd	d
Zdee fddZdejfddZedee defddZeeddefded  dee dee deded  f
ddZeddddddedee dedee dd f
ddZe	dNdedee dd fd d!Z eddd"dedee dee fd#d$Z!e"de#fd%d&Z$dee d'e%de&e' fd(d)Z(dee d*ee d'e%de&e' fd+d,Z)e"ddd-dee d.eddfd/d0Z*e"	dOdd2d3ed*ee dee dee' fd4d5Z+e"	dOdd2d'e%d3ed*ee dee de&e' f
d6d7Z,e"	1	dOdd8d9d:e'd3ed*ee dee d;e%ddfd<d=Z-e"	1	dOdd8d9d>e&e' d3ed*ee dee d;e%ddfd?d@Z.	dNdee d;e%d>e&e' d*ee fdAdBZ/dee d;e%d>e&e' fdCdDZ0e"dddEdee dFede%fdGdHZ1e2 e3ddIdJdee dKede4e'df fdLdMZ5dS )PrH   a
  Distributed, FIFO queue for data flow in Modal apps.

    The queue can contain any object serializable by `cloudpickle`, including Modal objects.

    By default, the `Queue` object acts as a single FIFO queue which supports puts and gets (blocking and non-blocking).

    **Usage**

    ```python
    from modal import Queue

    # Create an ephemeral queue which is anonymous and garbage collected
    with Queue.ephemeral() as my_queue:
        # Putting values
        my_queue.put("some value")
        my_queue.put(123)

        # Getting values
        assert my_queue.get() == "some value"
        assert my_queue.get() == 123

        # Using partitions
        my_queue.put(0)
        my_queue.put(1, partition="foo")
        my_queue.put(2, partition="bar")

        # Default and "foo" partition are ignored by the get operation.
        assert my_queue.get(partition="bar") == 2

        # Set custom 10s expiration time on "foo" partition.
        my_queue.put(3, partition="foo", partition_ttl=10)

        # (beta feature) Iterate through items in place (read immutably)
        my_queue.put(1)
        assert [v for v in my_queue.iterate()] == [0, 1]

    # You can also create persistent queues that can be used across apps
    queue = Queue.from_name("my-persisted-queue", create_if_missing=True)
    queue.put(42)
    assert queue.get() == 42
    ```

    For more examples, see the [guide](https://modal.com/docs/guide/dicts-and-queues#modal-queues).

    **Queue partitions (beta)**

    Specifying partition keys gives access to other independent FIFO partitions within the same `Queue` object.
    Across any two partitions, puts and gets are completely independent.
    For example, a put in one partition does not affect a get in any other partition.

    When no partition key is specified (by default), puts and gets will operate on a default partition.
    This default partition is also isolated from all other partitions.
    Please see the Usage section below for an example using partitions.

    **Lifetime of a queue and its partitions**

    By default, each partition is cleared 24 hours after the last `put` operation.
    A lower TTL can be specified by the `partition_ttl` argument in the `put` or `put_many` methods.
    Each partition's expiry is handled independently.

    As such, `Queue`s are best used for communication between active functions and not relied on for persistent storage.

    On app completion or after stopping an app any associated `Queue` objects are cleaned up.
    All its partitions will be cleared.

    **Limits**

    A single `Queue` can contain up to 100,000 partitions, each with up to 5,000 items. Each item can be up to 1 MiB.

    Partition keys must be non-empty and must not exceed 64 bytes.
    N	_metadatac                 C   s   t d)zmdmd:hiddenzYQueue() is not allowed. Please use `Queue.from_name(...)` or `Queue.ephemeral()` instead.)RuntimeErrorselfr2   r2   r3   __init__!  s   z_Queue.__init__r8   c                 C   s   t S N)r4   )clsr2   r2   r3   objects%  s   z_Queue.objectsc                 C   s   | j S ry   )_namerv   r2   r2   r3   r)   )  s   z_Queue.namer]   c                 C   s*   |rt |tjs
J || _|j| _d S d S ry   )
isinstancer   QueueMetadatart   r)   r|   )rw   r]   r2   r2   r3   _hydrate_metadata-  s
   z_Queue._hydrate_metadatac                 C   s   | j sJ | j S ry   )rt   rv   r2   r2   r3   _get_metadata3  s   
z_Queue._get_metadata	partitionc                 C   s>   | d ur|  d}t|dkst|dkrtd|S d}|S )Nzutf-8r   @   z8Queue partition key must be between 1 and 64 characters.    )encoderL   r$   )r   partition_keyr2   r2   r3   validate_partition_key7  s   
z_Queue.validate_partition_keyrz   r7   r6   _heartbeat_sleepc              	     s    du rt  I dH  tjtjt|d jI dH }t 4 I dH +}tj	|j
d|j fdd|d | j|j
 |jddV  W d  I dH  dS 1 I dH sWw   Y  dS )	a=  Creates a new ephemeral queue within a context manager:

        Usage:
        ```python
        from modal import Queue

        with Queue.ephemeral() as q:
            q.put(123)
        ```

        ```python notest
        async with Queue.ephemeral() as q:
            await q.put.aio(123)
        ```
        N)r<   r6   rh   c                      s    j S ry   )rA   QueueHeartbeatr2   r7   requestr2   r3   <lambda>b  s    z"_Queue.ephemeral.<locals>.<lambda>)sleepT)rY   )r!   r=   r   r@   OBJECT_CREATION_TYPE_EPHEMERALr   rA   rB   r   QueueHeartbeatRequestr\   infinite_loopr[   r]   )rz   r7   r6   r   responsetcr2   r   r3   	ephemeralB  s   .z_Queue.ephemeralF)	namespacer6   create_if_missingr7   r)   r   c             
      sb   t d t|d dtdtdtdtt f fdd}t|}tj||d	d	t||d
dS )aP  Reference a named Queue, creating if necessary.

        This is a lazy method the defers hydrating the local
        object with metadata from Modal servers until the first
        time it is actually used.

        ```python
        q = modal.Queue.from_name("my-queue", create_if_missing=True)
        q.put(123)
        ```
        r9   zmodal.Queue.from_namerw   resolverload_contextexisting_object_idc                    sJ   t j|j rt jnd d}|jj|I d H }| |j|j|j	 d S )Nr:   )
r   r@   r6   r>   r7   rA   rB   _hydrater\   r]   rw   r   r   r   rC   r   r   r)   r2   r3   _load|  s   z_Queue.from_name.<locals>._loadTr6   r7   )rY   hydrate_lazilyr)   load_context_overrides)	r   r   rH   r   r   r   r0   r^   _from_loader)r)   r   r6   r   r7   r   rZ   r2   r   r3   ri   e  s   

$	
z_Queue.from_namer\   c              	      sH   dt dtdtdtt f fdd}d d}t j||d	d	t|d
dS )a  Construct a Queue from an id and look up the Queue metadata.

        This is a lazy method that defers hydrating the local
        object with metadata from Modal servers until the first
        time it is actually used.

        The ID of a Queue object can be accessed using `.object_id`.

        **Example:**

        ```python notest
        @app.function()
        def my_consumer(queue_id: str):
            queue = modal.Queue.from_id(queue_id)
            queue.put("Hello from remote function!")

        with modal.Queue.ephemeral() as q:
            # Pass the queue ID to a remote function
            my_consumer.remote(q.object_id)
            print(q.get())  # "Hello from remote function!"
        ```
        rw   r   r   r   c                    s:   t j d}|jj|I d H }| |j|j|j d S )Nrh   )r   QueueGetByIdRequestr7   rA   QueueGetByIdr   r\   r]   r   rh   r2   r3   r     s   z_Queue.from_id.<locals>._loadzQueue.from_id()T)r7   )rY   r   r   )rH   r   r   r   r0   r   )r\   r7   r   rZ   r2   rh   r3   from_id  s   "z_Queue.from_idra   c                   s(   t dd tjj| ||dI dH  dS )aK  mdmd:hidden
        Delete a named Queue.

        Warning: This deletes an *entire Queue*, not just a specific entry or partition.
        Deletion is irreversible and will affect any Apps currently using the Queue.

        DEPRECATED: This method is deprecated; we recommend using `modal.Queue.objects.delete` instead.

        )i        z\`modal.Queue.delete` is deprecated; we recommend using `modal.Queue.objects.delete` instead.r   N)r   rH   r{   rp   )r)   r7   r6   r2   r2   r3   rp     s   z_Queue.deletec                    s0   |   }|j}t|jpdt|j|jpddS )z*Return information about the Queue object.N)r)   r*   r+   )r   re   r(   r)   r    r*   r+   )rw   r]   re   r2   r2   r3   info  s   z_Queue.infon_valuesc                    sN   t j j |d|d} jj|I d H }|jr% fdd|jD S g S )Nr   r\   r   timeoutr   c                       g | ]}t | jqS r2   r   rm   r_   valuerv   r2   r3   rb         z+_Queue._get_nonblocking.<locals>.<listcomp>)r   QueueGetRequestrl   r   rm   rA   QueueGetvalues)rw   r   r   r   r   r2   rv   r3   _get_nonblocking  s   z_Queue._get_nonblockingr   c                    s   |d urt   | }nd }	 d}|d urt||t    }tj j |||d} jj|I d H }|j	rB fdd|j	D S |d urQt   |krQ	 t
 q)NTg      I@r   c                    r   r2   r   r   rv   r2   r3   rb     r   z(_Queue._get_blocking.<locals>.<listcomp>)timerK   r   r   rl   r   rm   rA   r   r   queueEmpty)rw   r   r   r   deadlinerequest_timeoutr   r   r2   rv   r3   _get_blocking  s*   z_Queue._get_blocking)r   allr   c                   sB   |r	|r	t dtj| j| ||d}| jj|I dH  dS )a&  Clear the contents of a single partition or all partitions.

        Warning: this is a destructive operation and will irrevocably delete data.

        **Examples:**

        ```python
        q = modal.Queue.from_name("my-queue", create_if_missing=True)
        q.clear()
        ```
        z4Partition must be null when requesting to clear all.)r\   r   all_partitionsN)r$   r   QueueClearRequestrl   r   rm   rA   
QueueClear)rw   r   r   r   r2   r2   r3   clear   s   z_Queue.clearT)r   blockc                   sP   |r|  ||dI dH }n|durtd | |dI dH }|r&|d S dS )a  Remove and return the next object in the queue.

        If `block` is `True` (the default) and the queue is empty, `get` will wait indefinitely for
        an object, or until `timeout` if specified. Raises a native `queue.Empty` exception
        if the `timeout` is reached.

        If `block` is `False`, `get` returns `None` immediately if the queue is empty. The `timeout` is
        ignored in this case.
        r   N(Timeout is ignored for non-blocking get.r   r   warningswarnr   )rw   r   r   r   r   r2   r2   r3   get  s   
z
_Queue.getc                   s>   |r|  |||I dH S |durtd | ||I dH S )a  Remove and return up to `n_values` objects from the queue.

        If there are fewer than `n_values` items in the queue, return all of them.

        If `block` is `True` (the default) and the queue is empty, `get` will wait indefinitely for
        at least 1 object to be present, or until `timeout` if specified. Raises the stdlib's `queue.Empty`
        exception if the `timeout` is reached.

        If `block` is `False`, `get` returns `None` immediately if the queue is empty. The `timeout` is
        ignored in this case.
        Nr   r   )rw   r   r   r   r   r2   r2   r3   get_many0  s   
z_Queue.get_manyiQ r   partition_ttlvr   c                   s"   | j |g||||dI dH  dS )a  Add an object to the end of the queue.

        If `block` is `True` and the queue is full, this method will retry indefinitely or
        until `timeout` if specified. Raises the stdlib's `queue.Full` exception if the `timeout` is reached.
        If blocking it is not recommended to omit the `timeout`, as the operation could wait indefinitely.

        If `block` is `False`, this method raises `queue.Full` immediately if the queue is full. The `timeout` is
        ignored in this case.r   N)put_many)rw   r   r   r   r   r   r2   r2   r3   putG  s    z
_Queue.putvsc                   sJ   |r|  ||||I dH  dS |durtd | |||I dH  dS )a  Add several objects to the end of the queue.

        If `block` is `True` and the queue is full, this method will retry indefinitely or
        until `timeout` if specified. Raises the stdlib's `queue.Full` exception if the `timeout` is reached.
        If blocking it is not recommended to omit the `timeout`, as the operation could wait indefinitely.

        If `block` is `False`, this method raises `queue.Full` immediately if the queue is full. The `timeout` is
        ignored in this case.
        Nz3`timeout` argument is ignored for non-blocking put.)_put_many_blockingr   r   _put_many_nonblocking)rw   r   r   r   r   r   r2   r2   r3   r   [  s   
z_Queue.put_manyc           	   
      s   dd |D }t j| j| |||d}z| jjj|ttj	gdd |ddI d H  W d S  t
y_ } z'dt|v rMt|dkrBd	nd
}td| d|t|trYtt||d }~ww )Nc                 S      g | ]}t |qS r2   r   r_   r   r2   r2   r3   rb   x      z-_Queue._put_many_blocking.<locals>.<listcomp>r\   r   r   partition_ttl_seconds      >@)additional_status_codes	max_delaymax_retriestotal_timeout)retrystatus = '413'r   r   r   Queue. request is too large)r   QueuePutRequestrl   r   rm   rA   QueuePutr   r
   RESOURCE_EXHAUSTEDr#   r0   rL   r&   r}   r'   r   Full)	rw   r   r   r   r   
vs_encodedr   excmethodr2   r2   r3   r   u  s6   

z_Queue._put_many_blockingc              
      s   dd |D }t j| j| |||d}z| jj|I d H  W d S  tyU } z'dt|v rCt	|dkr8dnd}t
d| d	|t|trOtt||d }~ww )
Nc                 S   r   r2   r   r   r2   r2   r3   rb     r   z0_Queue._put_many_nonblocking.<locals>.<listcomp>r   r   r   r   r   r   r   )r   r   rl   r   rm   rA   r   r#   r0   rL   r&   r}   r'   r   r   )rw   r   r   r   r   r   r   r   r2   r2   r3   r     s&   
z_Queue._put_many_nonblocking)r   totalr   c                   sD   |r	|r	t dtj| j| ||d}| jj|I dH }|jS )z4Return the number of objects in the queue partition.z4Partition must be null when requesting total length.)r\   r   r   N)	r$   r   QueueLenRequestrl   r   rm   rA   QueueLenrL   )rw   r   r   r   r   r2   r2   r3   rL     s   z
_Queue.len        )r   item_poll_timeoutr   c                C  s   d}|  |}t | }d}	 tdt||t  }tj| j|||d}| jj	|I dH }	|	j
rL|	j
D ]}
t|
j| jV  |
j}q7t | }nt |krTdS q)z(Beta feature) Iterate through items in the queue without mutation.

        Specify `item_poll_timeout` to control how long the iterator should wait for the next time before giving up.
        Nr   Tr   )r\   r   last_entry_idr   )r   r   maxrK   r   QueueNextItemsRequestrl   rm   rA   QueueNextItemsrV   r   r   entry_id)rw   r   r   r   validated_partition_keyfetch_deadlineMAX_POLL_DURATIONpoll_durationr   r   r`   r2   r2   r3   iterate  s,   	

z_Queue.iteratery   )TN)6r,   r-   r.   r/   rt   r   r   r~   r1   rx   r   r4   r{   propertyr0   r)   r	   r   r   rq   bytesr   classmethodr   r   typer!   rc   r   r   rd   ri   r   rp   r   r(   r   rr   rf   r   r   r   r   r   r   r   r   r   r   rL   r   r   r   r   r2   r2   r2   r3   rH      s,  
 H
!))(
&$
$
rH   qu)type_prefix)Ars   r   r   r   collections.abcr   r   dataclassesr   r   typingr   r   r   google.protobuf.messager	   grpclibr
   synchronicityr   synchronicity.async_wrapr   modal_protor   _load_contextr   _objectr   r   r   r   r   	_resolverr   _serializationr   r   _utils.async_utilsr   r   r   _utils.deprecationr   r   _utils.grpc_utilsr   _utils.name_utilsr   _utils.time_utilsr   r    r7   r!   	exceptionr"   r#   r$   r%   r&   r'   r(   r4   QueueManagerrH   r9   r2   r2   r2   r3   <module>   sF     #    