o
    i3N                     @  sD  d dl mZ d dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZm Z  ddl!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1 ddl2m3Z3 d	d
l4m5Z5 d	dl6m7Z7 d	dl8m9Z9 erd	dl:m;Z; e<e=Z>G dd dZ?dS )    )annotationsN)TYPE_CHECKINGAnyCallableDictListMappingOptional)assert_never)BroadcastCallbackBroadcastPayloadCallbackChannelEventsChannelStatesPostgresChangesCallbackPostgresChangesDataPresenceOnJoinCallbackPresenceOnLeaveCallbackRealtimeAcknowledgementStatusRealtimeChannelBroadcastConfigRealtimeChannelConfigRealtimeChannelOptionsRealtimeChannelPresenceConfig"RealtimePostgresChangesListenEventRealtimePresenceStateRealtimeSubscribeStates   )BroadcastMessageChannelCloseMessageChannelErrorMessageHeartbeatMessageMessagePostgresChangesMessagePostgresChangesPayloadPostgresRowChangePresenceDiffMessagePresenceStateMessageReplyMessageReplyPostgresChangesServerMessageSuccessReplyMessageSuccessSystemPayloadSystemMessage)http_endpoint_url   )AsyncRealtimePresence)	AsyncPush)
AsyncTimer)AsyncRealtimeClientc                   @  sH  e Zd ZdZ	d_d`ddZdd ZdaddZedd Zedd Z	edd Z
edd Zedd Z	d_dbdd Zdcd!d"Z	d_ddd(d)Zded*d+Zdfd-d.Z			dgdhd5d6Zdid8d9Zdjd;d<Zdcd=d>Zdkd@dAZdldCdDZdmdFdGZdndIdJZdodMdNZdcdOdPZdQdR ZdcdSdTZdUdV ZdodWdXZdpd[d\Z d]d^ Z!dS )qAsyncRealtimeChannela  
    Channel is an abstraction for a topic subscription on an existing socket connection.
    Each Channel has its own topic and a list of event-callbacks that respond to messages.
    Should only be instantiated through `AsyncRealtimeClient.channel(topic)`.
    Nsocketr2   topicstrparams Optional[RealtimeChannelOptions]returnNonec                   s   | _ |r|ndddddddddi _| _d _t  _tj _g  _	 j j
 _
t tj j _i  _g  _g  _g  _d _t jdd	  _   _d fdd} fdd} jtj|tj| dS )z
        Initialize the Channel object.

        :param socket: RealtimeClient object
        :param topic: Topic that it subscribes to on the realtime server
        :param params: Optional parameters for connection.
        configF)ackself keyenabled)	broadcastpresenceprivateNc                 S  s   d|  S )Nr    )triesrE   rE   U/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/realtime/_async/channel.py<lambda>n   s    z/AsyncRealtimeChannel.__init__.<locals>.<lambda>payloadr(   c                   s6   t j _ j   jD ]	}t|  qg  _d S N)	r   JOINEDstaterejoin_timerreset_push_bufferasynciocreate_tasksend)rI   pushr=   rE   rG   on_join_push_oks   s
   


z6AsyncRealtimeChannel.__init__.<locals>.on_join_push_okc                     s2    j sd S td j  tj _ j  d S )Nzjoin push timeout for channel )	
is_joiningloggererrorr5   r   ERROREDrL   rM   schedule_timeoutrE   rT   rE   rG   on_join_push_timeoutz   s
   z;AsyncRealtimeChannel.__init__.<locals>.on_join_push_timeoutrI   r(   )r4   r7   r5   _joined_oncer/   rC   r   CLOSEDrL   rO   timeoutr0   r   join	join_pushmessages_waiting_for_ackbroadcast_callbackssystem_callbackspostgres_changes_callbackssubscribe_callbackr1   _rejoin_until_connectedrM   _broadcast_endpoint_urlbroadcast_endpoint_urlreceiver   OkTimeout)r=   r4   r5   r7   rU   r[   rE   rT   rG   __init__D   sB   


zAsyncRealtimeChannel.__init__c                 C  s6   t d| j d | j  tj| _| j	|  d S )Nchannel z closed)
rW   infor5   rM   rN   r   r^   rL   r4   _remove_channelrT   rE   rE   rG   on_close   s   
zAsyncRealtimeChannel.on_closerI   dict[str, Any]c                 C  s>   | j s| jrd S td| j d|  tj| _| j	  d S )Nrn   z error: )

is_leaving	is_closedrW   ro   r5   r   rY   rL   rM   rZ   )r=   rI   rE   rE   rG   on_error   s
   zAsyncRealtimeChannel.on_errorc                 C     | j tjkS rJ   )rL   r   r^   rT   rE   rE   rG   rt         zAsyncRealtimeChannel.is_closedc                 C  rv   rJ   )rL   r   JOININGrT   rE   rE   rG   rV      rw   zAsyncRealtimeChannel.is_joiningc                 C  rv   rJ   )rL   r   LEAVINGrT   rE   rE   rG   rs      rw   zAsyncRealtimeChannel.is_leavingc                 C  rv   rJ   )rL   r   rY   rT   rE   rE   rG   
is_errored   rw   zAsyncRealtimeChannel.is_erroredc                 C  rv   rJ   )rL   r   rK   rT   rE   rE   rG   	is_joined   rw   zAsyncRealtimeChannel.is_joinedcallbackHOptional[Callable[[RealtimeSubscribeStates, Optional[Exception]], None]]c                   s  j jsj  I dH  jrtdjd }|d}|dp(tddd}|d	d}jj	p8|d
d}||d
< d|||dd j
D di}j jrVj j|d< j| d_d fdd}d fdd}	 fdd}
jtj|tj|	tj|
  I dH  S )a  
        Subscribe to the channel. Can only be called once per channel instance.

        :param callback: Optional callback function that receives subscription state updates
                        and any errors that occur during subscription
        :return: The Channel instance for method chaining
        :raises: Exception if called multiple times on the same channel instance
        NzdTried to subscribe multiple times. 'subscribe' can only be called a single time per channel instancer;   rB   rC   r>   Fr?   rD   rA   c                 S     g | ]}|j qS rE   binding_filter.0crE   rE   rG   
<listcomp>       z2AsyncRealtimeChannel.subscribe.<locals>.<listcomp>rB   rC   rD   postgres_changesaccess_tokenTrI   r(   c                   s   | j }g }|rbtjD ]U\}}|t|k r|| nd }t| d|  |rJ|j|jkrJ|j|jkrJ|j	|j	krJ|j
|j
krJ|j|_|| qt   o^ tjtd  d S   d S |_ oo tjd  d S  d S )Nz, z@mismatch between server and client bindings for postgres changes)r   	enumeratere   lenrW   debugeventschema_schematablefilteridappendrP   rQ   unsubscriber   CHANNEL_ERROR	Exception
SUBSCRIBED)rI   server_postgres_changesnew_postgres_bindingsipostgres_callbackserver_bindingr|   r=   rE   rG   rU      s>   z7AsyncRealtimeChannel.subscribe.<locals>.on_join_push_okDict[str, Any]c                   s$    o t jtt|  d S  d S rJ   )r   r   r   jsondumps)rI   r|   rE   rG   on_join_push_error  s   z:AsyncRealtimeChannel.subscribe.<locals>.on_join_push_errorc                    s    o
 t jd  d S  d S rJ   )r   	TIMED_OUTargsr   rE   rG   r[   	  s   z<AsyncRealtimeChannel.subscribe.<locals>.on_join_push_timeoutr\   )rI   r   )r4   is_connectedconnectr]   r   r7   getr   rC   _has_callback_attachedre   r   ra   update_payloadrj   r   rk   Errorrl   _rejoin)r=   r|   r;   rB   rC   rD   presence_enabledconfig_payloadrU   r   r[   rE   r   rG   	subscribe   sR   

&zAsyncRealtimeChannel.subscribec                   sd   t j _ j   j  d fdd}t tj	i }|
tj|
tj| | I dH  dS )z
        Unsubscribe from the channel and leave the topic.
        Sets channel state to LEAVING and cleans up timers and pushes.
        r9   r:   c                    s    t d j d    d S )Nrn   z leave)rW   ro   r5   rq   r   rT   rE   rG   _close   s   z0AsyncRealtimeChannel.unsubscribe.<locals>._closeNr9   r:   )r   ry   rL   rM   rN   ra   destroyr0   r   leaverj   r   rk   r   rR   )r=   r   
leave_pushrE   rT   rG   r     s   

z AsyncRealtimeChannel.unsubscriber   r   r_   Optional[int]r0   c                   s|   | j std| d| j d|p| j}t| |||}|  r2| I dH  |jdus0J d|S |  | j	
| |S )aN  
        Push a message to the channel.

        :param event: The event name to push
        :param payload: The payload to send
        :param timeout: Optional timeout in milliseconds
        :return: AsyncPush instance representing the push operation
        :raises: Exception if called before subscribing to the channel
        ztried to push 'z' to 'z?' before joining. Use channel.subscribe() before pushing eventsNz Sent AsyncPush should have a ref)r]   r   r5   r_   r0   	_can_pushrR   refstart_timeoutrO   r   )r=   r   rI   r_   rS   rE   rE   rG   rS   *  s   
zAsyncRealtimeChannel.pushc                   s~   | j d }|d}|d}|dd}d|||dd | jD di}t| jtjd|i| j d	}| j	|I d
H  | S )zx
        Coroutine that attempts to join Phoenix Realtime server via a certain topic.

        :return: Channel
        r;   rB   rC   rD   Fc                 S  r~   rE   r   r   rE   rE   rG   r   W  r   z-AsyncRealtimeChannel.join.<locals>.<listcomp>r   )r5   r   rI   r   N)
r7   r   re   r!   r5   r   r`   r4   	_make_refrR   )r=   r;   rB   rC   rD   r   messagerE   rE   rG   r`   G  s,   



zAsyncRealtimeChannel.join"Callable[[BroadcastPayload], None]c                 C  s   | j t||d | S )a"  
        Set up a listener for a specific broadcast event.

        :param event: The name of the broadcast event to listen for
        :param callback: Function called with the payload when a matching broadcast is received
        :return: The Channel instance for method chaining
        )r|   r   )rc   r   r   )r=   r   r|   rE   rE   rG   on_broadcaste  s   

z!AsyncRealtimeChannel.on_broadcastr   (Callable[[PostgresChangesPayload], None]r   Optional[str]r   r   c                 C  s"   t |||||d}| j| | S )a  
        Set up a listener for Postgres database changes.

        :param event: The type of database event to listen for (INSERT, UPDATE, DELETE, or *)
        :param callback: Function called with the payload when a matching change is detected
        :param table: The table name to monitor. Defaults to "*" for all tables
        :param schema: The database schema to monitor. Defaults to "public"
        :param filter: Optional filter string to apply
        :return: The Channel instance for method chaining
        )r|   r   r   r   r   )r   re   r   )r=   r   r|   r   r   r   rE   rE   rG   on_postgres_changest  s
   
z(AsyncRealtimeChannel.on_postgres_changes&Callable[[SuccessSystemPayload], None]c                 C  s   | j | | S )z
        Set up a listener for system events.

        :param callback: The callback function to execute when a system event is received.
        :return: The Channel instance for method chaining.
        )rd   r   r=   r|   rE   rE   rG   	on_system  s   	zAsyncRealtimeChannel.on_systemuser_statusc                   s   |  d|I dH  dS )z
        Track presence status for the current user.

        :param user_status: Dictionary containing the user's presence information
        trackNsend_presence)r=   r   rE   rE   rG   r     s   zAsyncRealtimeChannel.trackc                   s   |  di I dH  dS )z>
        Stop tracking presence for the current user.
        untrackNr   rT   rE   rE   rG   r     s   zAsyncRealtimeChannel.untrackr   c                 C  s   | j jS )z
        Get the current state of presence on this channel.

        :return: Dictionary mapping presence keys to lists of presence payloads
        )rC   rL   rT   rE   rE   rG   presence_state  s   z#AsyncRealtimeChannel.presence_stateCallable[[], None]c                 C  8   | j | | jrtd| j d t|   | S )z
        Register a callback for presence sync events.

        :param callback: The callback function to execute when a presence sync event occurs.
        :return: The Channel instance for method chaining.
        rn   B resubscribe due to change in presence callbacks on joined channel)	rC   on_syncr{   rW   ro   r5   rP   rQ   _resubscriber   rE   rE   rG   on_presence_sync  s   z%AsyncRealtimeChannel.on_presence_syncr   c                 C  r   )z
        Register a callback for presence join events.

        :param callback: The callback function to execute when a presence join event occurs.
        :return: The Channel instance for method chaining.
        rn   r   )	rC   on_joinr{   rW   ro   r5   rP   rQ   r   r   rE   rE   rG   on_presence_join  s   	z%AsyncRealtimeChannel.on_presence_joinr   c                 C  r   )z
        Register a callback for presence leave events.

        :param callback: The callback function to execute when a presence leave event occurs.
        :return: The Channel instance for method chaining.
        rn   r   )	rC   on_leaver{   rW   ro   r5   rP   rQ   r   r   rE   rE   rG   on_presence_leave  s   	z&AsyncRealtimeChannel.on_presence_leavedatar   c                   s"   |  tjd||dI dH  dS )z
        Send a broadcast message through this channel.

        :param event: The name of the broadcast event
        :param data: The payload to broadcast
        rB   )typer   rI   N)rS   r   rB   r=   r   r   rE   rE   rG   send_broadcast  s
   
z#AsyncRealtimeChannel.send_broadcastc                   s"   |   I d H  |  I d H  d S rJ   )r   r   rT   rE   rE   rG   r     s   z!AsyncRealtimeChannel._resubscribec                 C  s   t | jj dS )Nz/api/broadcast)r-   r4   http_endpointrT   rE   rE   rG   rh     s   z,AsyncRealtimeChannel._broadcast_endpoint_urlc                   s:   | j rd S td| j  tj| _| j I d H  d S )Nz&Rejoining channel after reconnection: )	rs   rW   r   r5   r   rx   rL   ra   resendrT   rE   rE   rG   r     s   zAsyncRealtimeChannel._rejoinc                 C  s   | j jo| jS rJ   )r4   r   r]   rT   rE   rE   rG   r      s   zAsyncRealtimeChannel._can_pushc                   s    |  tj||dI d H  d S )N)r   rI   )rS   r   rC   r   rE   rE   rG   r     s   z"AsyncRealtimeChannel.send_presencer   r)   c           	      C  s  t | j d| t|tr-t|jtr#| jD ]}||j qd S | t	|j d S t|t
r_|j}|jr[| j|jd  }r]|jdkrQ|tj|j d S |tj|j d S d S d S t|trs|j}| jD ]}|| qjd S t|tr| j|j d S t|tr| j|j d S t|tr|j}| jD ]}|| qd S t|tr| |j d S t|tr|   d S t|t rd S t!| d S )Nz : ok)"rW   r   r5   
isinstancer,   rI   r+   rd   ru   dictr'   r   rb   popstatustriggerr   rk   responser   r   rc   r&   rC   _on_state_eventr%   _on_diff_eventr"   re   r   r   rq   r    r
   )	r=   r   r|   reply_payloadrS   broadcast_payloadbroadcast_callbackrI   r   rE   rE   rG   _handle_message  sN   




	









z$AsyncRealtimeChannel._handle_messagec                   s*   | j   | jjr|  I d H  d S d S rJ   )rM   rZ   r4   r   r   rT   rE   rE   rG   rg   .  s
   
z,AsyncRealtimeChannel._rejoin_until_connectedrJ   )r4   r2   r5   r6   r7   r8   r9   r:   )rI   rr   )r|   r}   r9   r3   r   )r   r6   rI   r   r_   r   r9   r0   )r9   r3   )r   r6   r|   r   r9   r3   )NNN)r   r   r|   r   r   r   r   r   r   r   r9   r3   )r|   r   r9   r3   )r   r   r9   r:   )r9   r   )r|   r   r9   r3   )r|   r   r9   r3   )r|   r   r9   r3   )r   r6   r   r   r9   r:   )r   r)   )"__name__
__module____qualname____doc__rm   rq   ru   propertyrt   rV   rs   rz   r{   r   r   rS   r`   r   r   r   r   r   r   r   r   r   r   r   rh   r   r   r   r   rg   rE   rE   rE   rG   r3   =   sT    
B
	





l













(r3   )@
__future__r   rP   r   loggingtypingr   r   r   r   r   r   r	   typing_extensionsr
   realtime.typesr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   transformersr-   rC   r/   rS   r0   timerr1   clientr2   	getLoggerr   rW   r3   rE   rE   rE   rG   <module>   s     $LH
