o
    ;i                     @   s`   d dl Z d dlZd dlmZ d dlmZmZmZ d dlm	Z	 d dl
mZ dZG dd de	ZdS )	    N)contextmanager)BinaryIOCallableOptional)Payload)AbstractStreamWriteri   c                       s   e Zd ZU dZeed< edfdededededee	 f
 fd	d
Z
d%dededefddZdd Zed&defddZedefddZdd Zd'ddZded ee fd!d"Zd#d$ Z  ZS )(BytesIOSegmentPayloadaT  Modified bytes payload for concurrent sends of chunks from the same file.

    Adds:
    * read limit using remaining_bytes, in order to split files across streams
    * larger read chunk (to prevent excessive read contention between parts)
    * calculates an md5 for the segment

    Feels like this should be in some standard lib...
    _valueNbytes_iosegment_startsegment_length
chunk_sizeprogress_report_cbc                    sl   t  | || _| | _|| _|| _| j| j|  | jt  j	ks&J || _
|p.dd | _|   d S )Nc                  _   s   d S N )___r   r   Y/home/ubuntu/.local/lib/python3.10/site-packages/modal/_utils/bytes_io_segment_payload.py<lambda>1   s    z0BytesIOSegmentPayload.__init__.<locals>.<lambda>)super__init___sizetellinitial_seek_posr   r   r	   seeksizer   r   reset_state)selfr
   r   r   r   r   	__class__r   r   r      s   	
zBytesIOSegmentPayload.__init__utf-8strictencodingerrorsreturnc                 C   s    | j | j | j  ||S r   )r	   r   r   readdecode)r   r"   r#   r   r   r   r&   4   s   zBytesIOSegmentPayload.decodec                 C   s"   t  | _d| _| j| j d S )Nr   )hashlibmd5_md5_checksumnum_bytes_readr	   r   r   r   r   r   r   r   8   s   
z!BytesIOSegmentPayload.reset_stateFsubtract_progressc                 c   s    zAzd V  W n4 t y; } z(z|r| j }| j|d W || jdd W | t y6 } z||d }~ww d }~ww W |   d S |   w )NadvanceT)reset)	Exceptionr*   r   r   )r   r,   excnegative_progresscb_excr   r   r   reset_on_error=   s&   
z$BytesIOSegmentPayload.reset_on_errorc                 C      | j S r   )r   r+   r   r   r   r   N   s   zBytesIOSegmentPayload.sizec                 C   r5   r   )r)   r+   r   r   r   md5_checksumR   s   z"BytesIOSegmentPayload.md5_checksumwriterr   c                    s   |  |d I d H  d S r   )write_with_length)r   r7   r   r   r   writeU   s   zBytesIOSegmentPayload.writecontent_lengthc                    s   t   fdd}| I d H }|r9 dkr9||I d H  jt|d | I d H }|r9 dks|rM||I d H  jt|d d S d S )Nc                     s   j j j } j|  tj } d ur!t| }d jj	|I d H }d j
j|I d H   jt|7  _|S r   )r   r   r*   r	   r   minr   remaining_bytesrun_in_executorr%   r)   updatelen)
read_start	num_byteschunkr:   loopr   r   r   	safe_read]   s   
z:BytesIOSegmentPayload.write_with_length.<locals>.safe_readr   r-   )asyncioget_event_loopr<   r9   r   r?   )r   r7   r:   rE   rB   r   rC   r   r8   Z   s   z'BytesIOSegmentPayload.write_with_lengthc                 C   s   | j | j S r   )r   r*   r+   r   r   r   r<   r   s   z%BytesIOSegmentPayload.remaining_bytes)r    r!   )F)r7   r   )__name__
__module____qualname____doc__r   __annotations__DEFAULT_SEGMENT_CHUNK_SIZEintr   r   r   strr&   r   r   boolr4   propertyr   r6   r9   r   r8   r<   __classcell__r   r   r   r   r      s4   
 

r   )rF   r'   
contextlibr   typingr   r   r   aiohttpr   aiohttp.abcr   rM   r   r   r   r   r   <module>   s   