o
    iB*                     @  s  d Z ddlmZ ddlZddlZddlZddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZ dd	lmZmZmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZm Z m!Z!m"Z" ddl#m$Z$ ddl%m&Z&m'Z'm(Z( ddl)m*Z* ddl+m,Z,m-Z-m.Z. ddl/m0Z0m1Z1m2Z2m3Z3m4Z4 ddl5m6Z6 dZ7e8de9e7g Z:e;e:Z<dZ=e4e2dde0e<ddde2dde1de3 d e3 dd!dd"e2d#d$e0e=ddd"Z>d%Z?G d&d' d'e-Z@ed(e.d)ZAe	G d*d+ d+eeA ZBG d,d- d-eeA ZCG d.d/ d/eeA ZDdS )0z#Avro reader for reading Avro files.    )annotationsN)Callable)	dataclass)Enum)TracebackType)GenericTypeVar)AVRO_CODEC_KEYCODEC_MAPPING_ICEBERG_TO_AVROKNOWN_CODECS)Codec)BinaryDecodernew_decoder)BinaryEncoder)Reader)construct_readerconstruct_writerresolve_readerresolve_writer)Writer)	InputFile
OutputFileOutputStream)Schema)
EMPTY_DICTRecordStructProtocol)	FixedTypeMapTypeNestedField
StringType
StructType)AvroSchemaConversion   s   Obj   magicd   )lengthT)namefield_id
field_typerequired   meta      )key_idkey_typevalue_id
value_typevalue_required)r)   r(   r*   r+   i,  synczavro.schemac                   @  sJ   e Zd ZedddZedddZeddd	ZdddZdddZdS )AvroFileHeaderreturnbytesc                 C  
   | j d S )Nr   _dataself r>   Q/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/pyiceberg/avro/file.pyr%   H      
zAvroFileHeader.magicdict[str, str]c                 C  r9   )Nr#   r:   r<   r>   r>   r?   r-   L   r@   zAvroFileHeader.metac                 C  r9   )N   r:   r<   r>   r>   r?   r5   P   r@   zAvroFileHeader.synctype[Codec] | Nonec                 C  s:   ddl m} | jt|j}|tvrtd| t| S zGet the file's compression codec algorithm from the file's metadata.

        In the case of a null codec, we return a None indicating that we
        don't need to compress/decompress.
        r   TablePropertieszUnsupported codec: )pyiceberg.tablerF   r-   getr	   WRITE_AVRO_COMPRESSION_DEFAULTr   
ValueError)r=   rF   
codec_namer>   r>   r?   compression_codecT   s
   z AvroFileHeader.compression_codecr   c                 C  s2   t | jv r| jt  }t|}t |S td)Nz$No schema found in Avro file headers)_SCHEMA_KEYr-   jsonloadsr"   avro_to_icebergrJ   )r=   avro_schema_stringavro_schemar>   r>   r?   
get_schemab   s
   


zAvroFileHeader.get_schemaN)r7   r8   )r7   rA   r7   rC   )r7   r   )	__name__
__module____qualname__propertyr%   r-   r5   rL   rS   r>   r>   r>   r?   r6   G   s    
r6   D)boundc                   @  sP   e Zd ZU ded< ded< ded< dZded< dddZdddZdddZdS )Blockr   readerintblock_recordsr   block_decoderr   positionr7   Block[D]c                 C     | S )z'Return an iterator for the Block class.r>   r<   r>   r>   r?   __iter__u      zBlock.__iter__boolc                 C  s   | j | jk S N)r`   r^   r<   r>   r>   r?   has_nexty   s   zBlock.has_nextrY   c                 C  s(   |   r|  jd7  _| j| jS t)z9Return the next item when iterating over the Block class.r#   )rg   r`   r\   readr_   StopIterationr<   r>   r>   r?   __next__|   s   zBlock.__next__N)r7   ra   )r7   re   r7   rY   )rU   rV   rW   __annotations__r`   rc   rg   rj   r>   r>   r>   r?   r[   n   s   
 

r[   c                   @  s   e Zd ZU dZded< ded< ded< ded	< d
ed< ded< ded< ded< ded< deefd.ddZd/ddZd0d"d#Zd/d$d%Z	d1d'd(Z
d2d*d+Zd3d,d-ZdS )4AvroFile)	
input_fileread_schema
read_types
read_enumsheaderschemar\   decoderblockr   rn   Schema | Nonero   (dict[int, Callable[..., StructProtocol]]rp   dict[int, Callable[..., Enum]]rq   r6   rr   r   rs   r   r\   r   rt   zBlock[D] | Noneru   Nr7   Nonec                 C  s"   || _ || _|| _|| _d | _d S rf   )rn   ro   rp   rq   ru   )r=   rn   ro   rp   rq   r>   r>   r?   __init__   s
   
zAvroFile.__init__AvroFile[D]c                 C  sx   | j  }t| | _W d   n1 sw   Y  |  | _| j | _| j	s.| j| _	t
| j| j	| j| j| _| S )zGenerate a reader tree for the payload within an avro file.

        Return:
            A generator returning the AvroStructs.
        N)rn   openr   rh   rt   _read_headerrr   rS   rs   ro   r   rp   rq   r\   )r=   fr>   r>   r?   	__enter__   s   
zAvroFile.__enter__exctypetype[BaseException] | NoneexcinstBaseException | NoneexctbTracebackType | Nonec                 C  s   dS z=Perform cleanup when exiting the scope of a 'with' statement.Nr>   r=   r   r   r   r>   r>   r?   __exit__   s    zAvroFile.__exit__c                 C  rb   )z*Return an iterator for the AvroFile class.r>   r<   r>   r>   r?   rc      rd   zAvroFile.__iter__r]   c                 C  s|   | j r| jt}|| jjkrtd| jjd|| j }| j }| j	  }r1|
|}t| j|t|d| _ |S )NzExpected sync bytes z
, but got )r\   r^   r_   )ru   rt   rh   	SYNC_SIZErr   r5   rJ   read_int
read_bytesrL   
decompressr[   r\   r   )r=   sync_markerr^   block_bytescodecr>   r>   r?   _read_block   s   


zAvroFile._read_blockrY   c              
   C  sZ   | j r| j  rt| j S z|  }W n ty" } zt|d}~ww |dkr+|  S t)z<Return the next item when iterating over the AvroFile class.Nr   )ru   rg   nextr   EOFErrorri   rj   )r=   	new_blockexcr>   r>   r?   rj      s   
zAvroFile.__next__c                 C  s   t tdti| jS )N)r   META_SCHEMAr6   rh   rt   r<   r>   r>   r?   r}      s   zAvroFile._read_header)
rn   r   ro   rv   rp   rw   rq   rx   r7   ry   )r7   r{   r   r   r   r   r   r   r7   ry   )r7   r]   rk   )r7   r6   )rU   rV   rW   	__slots__rl   r   rz   r   r   rc   r   rj   r}   r>   r>   r>   r?   rm      s*   
 




rm   c                   @  s   e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< defd,ddZd-ddZd.d!d"Zd/d#d$Zd0d&d'Z	d1d*d+Z
dS )2AvroOutputFiler   output_filer   output_streamr   file_schemastrschema_namer   encoderr8   
sync_bytesr   writerNrecord_schemarv   metadatarA   r7   ry   c                 C  sL   || _ || _|| _tt| _|d u rt| jdnt|| jd| _	|| _
d S )N)r   )r   r   )r   r   r   osurandomr   r   r   r   r   r   )r=   r   r   r   r   r   r>   r>   r?   rz      s   
zAvroOutputFile.__init__AvroOutputFile[D]c                 C  s(   | j jdd| _t| j| _|   | S )zx
        Open the file and writes the header.

        Returns:
            The file object to write records to
        T)	overwrite)r   creater   r   r   _write_headerr<   r>   r>   r?   r      s   zAvroOutputFile.__enter__r   r   r   r   r   r   c                 C  s   | j   dS r   )r   closer   r>   r>   r?   r     s   zAvroOutputFile.__exit__c                 C  s   ddl m} | jt|j}t| }r|}tt	 j
| j| jd}i | jt|t|i}tt|| j}tt| j| d S )Nr   rE   )r   )rG   rF   r   rH   r	   rI   r
   rN   dumpsr"   iceberg_to_avror   r   rM   r6   MAGICr   r   r   writer   )r=   rF   rK   avro_codec_namejson_schemar-   rr   r>   r>   r?   r     s   zAvroOutputFile._write_headerrC   c                 C  sL   ddl m} | jt|j}t| }r|}|tvr"td| t| S rD   )	rG   rF   r   rH   r	   rI   r
   r   rJ   )r=   rF   rK   r   r>   r>   r?   rL     s   z AvroOutputFile.compression_codecobjectslist[D]c           	      C  s   t  }t|d}|D ]	}| j|| q| }| jt| | 	  }r;|
|\}}| j| | j| n| jt| | j| | j| j d S )N)r   )ioBytesIOr   r   r   getvaluer   	write_intlenrL   compressr   )	r=   r   	in_memoryblock_content_encoderobjblock_contentr   contentcontent_lengthr>   r>   r?   write_block.  s   
zAvroOutputFile.write_block)r   r   r   r   r   r   r   rv   r   rA   r7   ry   )r7   r   r   )r7   ry   rT   )r   r   r7   ry   )rU   rV   rW   rl   r   rz   r   r   r   rL   r   r>   r>   r>   r?   r      s    
 



r   )E__doc__
__future__r   r   rN   r   collections.abcr   dataclassesr   enumr   typesr   typingr   r   pyiceberg.avro.codecsr	   r
   r   pyiceberg.avro.codecs.codecr   pyiceberg.avro.decoderr   r   pyiceberg.avro.encoderr   pyiceberg.avro.readerr   pyiceberg.avro.resolverr   r   r   r   pyiceberg.avro.writerr   pyiceberg.ior   r   r   pyiceberg.schemar   pyiceberg.typedefr   r   r   pyiceberg.typesr   r   r   r    r!   !pyiceberg.utils.schema_conversionr"   VERSIONr8   	bytearrayr   r   
MAGIC_SIZEr   r   rM   r6   rY   r[   rm   r   r>   r>   r>   r?   <module>   sV   $]