o
    ˳iz^                  
   @   sF  U d Z ddlZddlmZ ddlmZmZmZmZm	Z	m
Z
 ddlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZm Z  ddl!m"Z" e#e$Z%e e e e e e e e e d	Z&e'e(ef e)d< e e e e  e  d	Z*e'e+e(e(f ef e)d
< e(eB Z,G dd dZ-G dd dee, Z.dS )z>Utility class for converting between Avro and Iceberg schemas.    N)Any)FIELD_ID_PROPICEBERG_FIELD_NAME_PROPSchemaSchemaVisitorPerPrimitiveTypemake_compatible_namevisit)
BinaryTypeBooleanTypeDateTypeDecimalType
DoubleType	FixedType	FloatTypeIcebergTypeIntegerTypeListTypeLongTypeMapTypeNestedFieldPrimitiveType
StringType
StructTypeTimestampTypeTimestamptzTypeTimeTypeUnknownTypeUUIDType)decimal_required_bytes)	booleanbytesdoublefloatintlongstringenumnullPRIMITIVE_FIELD_TYPE_MAPPING))dater#   )time-microsr$   )timestamp-microsr$   )uuidfixed)r,   r%   LOGICAL_FIELD_TYPE_MAPPINGc                   @   sv  e Zd Zdeeef defddZd%dededB defdd	Z	d
eeef e
eeeef B  B eB deeeeef B ef fddZdeeeef B defddZdeeef defddZdeeef defddZdeeef defddZdeeef defddZdeeef defddZdeeef defdd Zdeeef defd!d"Zdeeef defd#d$ZdS )&AvroSchemaConversionavro_schemareturnc                    s    t  fdd|d D ddiS )a  Convert an Apache Avro into an Apache Iceberg schema equivalent.

        This expects to have field id's to be encoded in the Avro schema:

            {
                "type": "record",
                "name": "manifest_file",
                "fields": [
                    {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
                    {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}
                ]
            }

        Example:
            This converts an Avro schema into an Iceberg schema:

            >>> avro_schema = AvroSchemaConversion().avro_to_iceberg({
            ...     "type": "record",
            ...     "name": "manifest_file",
            ...     "fields": [
            ...         {"name": "manifest_path", "type": "string", "doc": "Location URI with FS scheme", "field-id": 500},
            ...         {"name": "manifest_length", "type": "long", "doc": "Total file size in bytes", "field-id": 501}
            ...     ]
            ... })
            >>> iceberg_schema = Schema(
            ...     NestedField(
            ...         field_id=500, name="manifest_path", field_type=StringType(),
            ...         required=False, doc="Location URI with FS scheme"
            ...     ),
            ...     NestedField(
            ...         field_id=501, name="manifest_length", field_type=LongType(),
            ...         required=False, doc="Total file size in bytes"
            ...     ),
            ...     schema_id=1
            ... )
            >>> avro_schema == iceberg_schema
            True

        Args:
            avro_schema (Dict[str, Any]): The JSON decoded Avro schema.

        Returns:
            Equivalent Iceberg schema.
        c                       g | ]}  |qS  _convert_field.0fieldselfr3   U/home/ubuntu/.local/lib/python3.10/site-packages/pyiceberg/utils/schema_conversion.py
<listcomp>       z8AvroSchemaConversion.avro_to_iceberg.<locals>.<listcomp>fields	schema_id   )r   )r:   r0   r3   r9   r;   avro_to_icebergS   s    -z$AvroSchemaConversion.avro_to_icebergNschemaschema_namec                 C   s   t |t|S )zQConvert an Iceberg schema into an Avro dictionary that can be serialized to JSON.)r   ConvertSchemaToAvro)r:   rB   rC   r3   r3   r;   iceberg_to_avro   s   z$AvroSchemaConversion.iceberg_to_avro
type_unionc                 C   sp   t |tr	|dfS t |tr|dfS |}t|dkr!td| d|d kr+tdttdd |d d	fS )
a'  
        Convert Unions into their type and resolves if the field is required.

        Examples:
            >>> AvroSchemaConversion()._resolve_union('str')
            ('str', True)
            >>> AvroSchemaConversion()._resolve_union(['null', 'str'])
            ('str', False)
            >>> AvroSchemaConversion()._resolve_union([{'type': 'str'}])
            ({'type': 'str'}, True)
            >>> AvroSchemaConversion()._resolve_union(['null', {'type': 'str'}])
            ({'type': 'str'}, False)

        Args:
            type_union: The field, can be a string 'str', list ['null', 'str'], or dict {"type": 'str'}.

        Returns:
            A tuple containing the type and if required.

        Raises:
            TypeError: In the case non-optional union types are encountered.
        T   z=Non-optional types aren't part of the Iceberg specification: r'   r   zOnly null-unions are supportedc                 S   s   | dkS Nr'   r3   )tr3   r3   r;   <lambda>   s    z5AvroSchemaConversion._resolve_union.<locals>.<lambda>F)
isinstancestrdictlen	TypeErrorlistfilter)r:   rF   
avro_typesr3   r3   r;   _resolve_union   s   

	z#AvroSchemaConversion._resolve_union	avro_typec                 C   s   t |tr|tv rt| S t |trqd|v r| |S d|v r5t |d tr5|d }d|v r5t |d ts&|d }|dkrB| |S |dkrK| |S |dkrT| |S |dkr]| |S t |trj|tv rjt| S t	d| t	d| )z
        Resolve the Avro type.

        Args:
            avro_type: The Avro type, can be simple or complex.

        Returns:
            The equivalent IcebergType.

        Raises:
            ValueError: When there are unknown types
        logicalTypetyperecordarraymapr-   zType not recognized: )
rK   rL   r(   rM   _convert_logical_type_convert_record_type_convert_array_type_convert_map_type_convert_fixed_typerO   )r:   rT   type_identifierr3   r3   r;   _convert_schema   s*   





z$AvroSchemaConversion._convert_schemar8   c                 C   sT   t |vrtdt  d| | |d \}}t|t  |d | |||ddS )zConvert an Avro field into an Iceberg equivalent field.

        Args:
            field: The Avro field.

        Returns:
            The Iceberg equivalent field.
        zCannot convert field, missing z: rV   namedoc)field_idra   
field_typerequiredrb   )r   
ValueErrorrS   r   r`   get)r:   r8   
plain_typere   r3   r3   r;   r5      s   	z#AvroSchemaConversion._convert_fieldrecord_typec                    s4   |d dkrt d| t fdd|d D  S )aE  
        Convert the fields from a record into an Iceberg struct.

        Examples:
            >>> from pyiceberg.utils.schema_conversion import AvroSchemaConversion
            >>> record_type = {
            ...     "type": "record",
            ...     "name": "r508",
            ...     "fields": [{
            ...         "name": "contains_null",
            ...         "type": "boolean",
            ...         "doc": "True if any file has a null partition value",
            ...         "field-id": 509,
            ...      }, {
            ...          "name": "contains_nan",
            ...          "type": ["null", "boolean"],
            ...          "doc": "True if any file has a nan partition value",
            ...          "default": None,
            ...          "field-id": 518,
            ...      }],
            ... }
            >>> actual = AvroSchemaConversion()._convert_record_type(record_type)
            >>> expected = StructType(
            ...     fields=(
            ...         NestedField(
            ...             field_id=509,
            ...             name="contains_null",
            ...             field_type=BooleanType(),
            ...             required=False,
            ...             doc="True if any file has a null partition value",
            ...         ),
            ...         NestedField(
            ...             field_id=518,
            ...             name="contains_nan",
            ...             field_type=BooleanType(),
            ...             required=True,
            ...             doc="True if any file has a nan partition value",
            ...         ),
            ...     )
            ... )
            >>> expected == actual
            True

        Args:
            record_type: The record type itself.

        Returns: A StructType.
        rV   rW   zExpected record type, got: c                    r2   r3   r4   r6   r9   r3   r;   r<   '  r=   z=AvroSchemaConversion._convert_record_type.<locals>.<listcomp>r>   )rf   r   )r:   ri   r3   r9   r;   r[      s   1z)AvroSchemaConversion._convert_record_type
array_typec                 C   s@   d|vrt d| | |d \}}t|d | ||dS )N
element-idz/Cannot convert array-type, missing element-id: items)
element_idelement_typeelement_required)rf   rS   r   r`   )r:   rj   rh   ro   r3   r3   r;   r\   )  s   z(AvroSchemaConversion._convert_array_typemap_typec                 C   s4   |  |d \}}t|d t |d | ||dS )aQ  Convert an avro map type into an Iceberg MapType.

        Args:
            map_type: The dict that describes the Avro map type.

        Examples:
            >>> from pyiceberg.utils.schema_conversion import AvroSchemaConversion
            >>> avro_field = {
            ...     "type": "map",
            ...     "values": ["null", "long"],
            ...     "key-id": 101,
            ...     "value-id": 102,
            ... }
            >>> actual = AvroSchemaConversion()._convert_map_type(avro_field)
            >>> expected = MapType(
            ...     key_id=101,
            ...     key_type=StringType(),
            ...     value_id=102,
            ...     value_type=LongType(),
            ...     value_required=True
            ... )
            >>> actual == expected
            True

        Returns: A MapType.
        valueskey-idvalue-idkey_idkey_typevalue_id
value_typevalue_required)rS   r   r   r`   )r:   rp   rx   ry   r3   r3   r;   r]   5  s   z&AvroSchemaConversion._convert_map_typeavro_logical_typec                 C   s~   |d }|d }|dkr|  |S |dkr| |S |dkr,|dddu r)t S t S ||ftv r8t||f S td	| )
a  Convert a schema with a logical type annotation into an IcebergType.

        For the decimal and map we need to fetch more keys from the dict, and for
        the simple ones we can just look it up in the mapping.

        Examples:
            >>> from pyiceberg.utils.schema_conversion import AvroSchemaConversion
            >>> avro_logical_type = {
            ...     "type": "int",
            ...     "logicalType": "date"
            ... }
            >>> actual = AvroSchemaConversion()._convert_logical_type(avro_logical_type)
            >>> actual == DateType()
            True

        Args:
            avro_logical_type: The logical type.

        Returns:
            The converted logical type.

        Raises:
            ValueError: When the logical type is unknown.
        rU   rV   decimalrY   r+   adjust-to-utcFTz+Unknown logical/physical type combination: )_convert_logical_decimal_type_convert_logical_map_typerg   r   r   r.   rf   )r:   rz   logical_typephysical_typer3   r3   r;   rZ   Z  s   

z*AvroSchemaConversion._convert_logical_typec                 C   s   t |d |d dS )a  Convert an avro type to an Iceberg DecimalType.

        Args:
            avro_type: The Avro type.

        Examples:
            >>> from pyiceberg.utils.schema_conversion import AvroSchemaConversion
            >>> avro_decimal_type = {
            ...     "type": "bytes",
            ...     "logicalType": "decimal",
            ...     "precision": 19,
            ...     "scale": 25
            ... }
            >>> actual = AvroSchemaConversion()._convert_logical_decimal_type(avro_decimal_type)
            >>> expected = DecimalType(
            ...     precision=19,
            ...     scale=25
            ... )
            >>> actual == expected
            True

        Returns:
            A Iceberg DecimalType.
        	precisionscale)r   r   )r   r:   rT   r3   r3   r;   r}     s   z2AvroSchemaConversion._convert_logical_decimal_typec                 C   s~   |d d }t |dkrtd|d  | ttdd |d }| ttdd |d }t|j|j|j|j|jd	S )
as  Convert an avro map type to an Iceberg MapType.

        In the case where a map hasn't a key as a type you can use a logical map to still encode this in Avro.

        Args:
            avro_type: The Avro Type.

        Examples:
            >>> from pyiceberg.utils.schema_conversion import AvroSchemaConversion
            >>> avro_type = {
            ...     "type": "array",
            ...     "logicalType": "map",
            ...     "items": {
            ...         "type": "record",
            ...         "name": "k101_v102",
            ...         "fields": [
            ...             {"name": "key", "type": "int", "field-id": 101},
            ...             {"name": "value", "type": "string", "field-id": 102},
            ...         ],
            ...     },
            ... }
            >>> actual = AvroSchemaConversion()._convert_logical_map_type(avro_type)
            >>> expected = MapType(
            ...         key_id=101,
            ...         key_type=IntegerType(),
            ...         value_id=102,
            ...         value_type=StringType(),
            ...         value_required=False
            ... )
            >>> actual == expected
            True

        .. _Apache Iceberg specification:
            https://iceberg.apache.org/spec/#appendix-a-format-specific-requirements

        Returns:
            The logical map.
        rl   r>   rG   zInvalid key-value pair schema: c                 S      | d dkS )Nra   keyr3   fr3   r3   r;   rJ         z@AvroSchemaConversion._convert_logical_map_type.<locals>.<lambda>r   c                 S   r   )Nra   valuer3   r   r3   r3   r;   rJ     r   rt   )	rN   rf   r5   rP   rQ   r   rc   rd   re   )r:   rT   r>   r   r   r3   r3   r;   r~     s   'z.AvroSchemaConversion._convert_logical_map_typec                 C   s   t |d dS )av  
        Convert Avro Type to the equivalent Iceberg fixed type.

        - https://avro.apache.org/docs/current/spec.html#Fixed

        Args:
            avro_type: The Avro type.

        Examples:
            >>> from pyiceberg.utils.schema_conversion import AvroSchemaConversion
            >>> avro_fixed_type = {
            ...     "name": "md5",
            ...     "type": "fixed",
            ...     "size": 16
            ... }
            >>> FixedType(length=16) == AvroSchemaConversion()._convert_fixed_type(avro_fixed_type)
            True

        Returns:
            An Iceberg equivalent fixed type.
        size)length)r   r   r3   r3   r;   r^     s   z(AvroSchemaConversion._convert_fixed_typeN)__name__
__module____qualname__rM   rL   r   r   rA   AvroTyperE   rP   tupleboolrS   r   r`   r   r5   r   r[   r   r\   r   r]   rZ   r   r}   r~   r   r^   r3   r3   r3   r;   r/   R   s    /F1&6%)4r/   c                   @   s*  e Zd ZU dZedB ed< eed< eed< eed< dedB ddfdd	Zd
ede	de	fddZ
deddfddZdeddfddZdeddfddZdedee	 de	fddZdede	de	fddZded e	de	fd!d"Zd#ed$e	d%e	de	fd&d'Zd(ede	fd)d*Zd+ede	fd,d-Zd.ede	fd/d0Zd1ede	fd2d3Zd4ede	fd5d6Zd7e de	fd8d9Z!d:e"de	fd;d<Z#d=e$de	fd>d?Z%d@e&de	fdAdBZ'dCe(de	fdDdEZ)dCe(de	fdFdGZ*dHe+de	fdIdJZ,dHe+de	fdKdLZ-dMe.de	fdNdOZ/dPe0de	fdQdRZ1dSe2de	fdTdUZ3dVe4de	fdWdXZ5dS )YrD   z,Convert an Iceberg schema to an Avro schema.NrC   last_list_field_idlast_map_key_field_idlast_map_value_field_idr1   c                 C   s
   || _ dS )zzConvert an Iceberg schema to an Avro schema.

        Args:
            schema_name: The name of the root record.
        N)rC   )r:   rC   r3   r3   r;   __init__  s   
zConvertSchemaToAvro.__init__rB   struct_resultc                 C   s"   t |tr| jd ur| j|d< |S )Nra   )rK   rM   rC   )r:   rB   r   r3   r3   r;   rB     s   
zConvertSchemaToAvro.schemaelementc                 C      |j | _d S r   )rc   r   )r:   r   r3   r3   r;   before_list_element      z'ConvertSchemaToAvro.before_list_elementr   c                 C   r   r   )rc   r   )r:   r   r3   r3   r;   before_map_key  r   z"ConvertSchemaToAvro.before_map_keyr   c                 C   r   r   )rc   r   )r:   r   r3   r3   r;   before_map_value  r   z$ConvertSchemaToAvro.before_map_valuestructfield_resultsc                 C   s
   d|dS )NrW   )rV   r>   r3   )r:   r   r   r3   r3   r;   r   	     
zConvertSchemaToAvro.structr8   field_resultc                 C   s   t |tr|ddkrd|j |d< |j}t|}d|t|jd|jr&|nd|gi}||kr3||t< |j	d ur>|j	|d< n|j
rEd |d< |jd urO|j|d< |S )NrV   rW   rra   r'   defaultrb   )rK   rM   rg   rc   ra   r   r   re   r   write_defaultoptionalrb   )r:   r8   r   original_namesanitized_nameresultr3   r3   r;   r8     s"   


zConvertSchemaToAvro.field	list_typeelement_resultc                 C   s6   t |tr|ddkrd| j |d< d| j|dS )NrV   rW   r   ra   rX   )rV   rk   rl   )rK   rM   rg   r   )r:   r   r   r3   r3   r;   rP   '  s   zConvertSchemaToAvro.listrp   
key_resultvalue_resultc              
   C   s`   t |trd|| j| jdS ddd| j d| j ddd	|t| jidd
d	|t| jigdddS )NrY   )rV   rq   rr   rs   rX   rW   k_vra   r   rV   r   )rV   ra   r>   )rV   rl   rU   )rK   r   r   r   r   )r:   rp   r   r   r3   r3   r;   rY   -  s   

zConvertSchemaToAvro.map
fixed_typec                 C   s   dt |dt | dS )Nr-   fixed_)rV   r   ra   )rN   )r:   r   r3   r3   r;   visit_fixedF  s   zConvertSchemaToAvro.visit_fixeddecimal_typec              	   C   s,   dt |jd|j|jd|j d|j dS )Nr-   r{   decimal__)rV   r   rU   r   r   ra   )r   r   r   )r:   r   r3   r3   r;   visit_decimalI  s   z!ConvertSchemaToAvro.visit_decimalboolean_typec                 C      dS )Nr   r3   )r:   r   r3   r3   r;   visit_booleanS     z!ConvertSchemaToAvro.visit_booleaninteger_typec                 C   r   )Nr#   r3   )r:   r   r3   r3   r;   visit_integerV  r   z!ConvertSchemaToAvro.visit_integer	long_typec                 C   r   )Nr$   r3   )r:   r   r3   r3   r;   
visit_longY  r   zConvertSchemaToAvro.visit_long
float_typec                 C   r   )Nr"   r3   )r:   r   r3   r3   r;   visit_float\  r   zConvertSchemaToAvro.visit_floatdouble_typec                 C   r   )Nr!   r3   )r:   r   r3   r3   r;   visit_double_  r   z ConvertSchemaToAvro.visit_double	date_typec                 C   
   dddS )Nr#   r)   rV   rU   r3   )r:   r   r3   r3   r;   
visit_dateb  r   zConvertSchemaToAvro.visit_date	time_typec                 C   r   )Nr$   r*   r   r3   )r:   r   r3   r3   r;   
visit_timee  r   zConvertSchemaToAvro.visit_timetimestamp_typec                 C      ddddS )Nr$   r+   FrV   rU   r|   r3   r:   r   r3   r3   r;   visit_timestamph  r   z#ConvertSchemaToAvro.visit_timestampc                 C   r   )Nr$   timestamp-nanosFr   r3   r   r3   r3   r;   visit_timestamp_nsk  r   z&ConvertSchemaToAvro.visit_timestamp_nstimestamptz_typec                 C   r   )Nr$   r+   Tr   r3   r:   r   r3   r3   r;   visit_timestamptzn  r   z%ConvertSchemaToAvro.visit_timestamptzc                 C   r   )Nr$   r   Tr   r3   r   r3   r3   r;   visit_timestamptz_nsq  r   z(ConvertSchemaToAvro.visit_timestamptz_nsstring_typec                 C   r   )Nr%   r3   )r:   r   r3   r3   r;   visit_stringt  r   z ConvertSchemaToAvro.visit_string	uuid_typec                 C   s   dddddS )Nr-      r,   
uuid_fixed)rV   r   rU   ra   r3   )r:   r   r3   r3   r;   
visit_uuidw  s   zConvertSchemaToAvro.visit_uuidbinary_typec                 C   r   )Nr    r3   )r:   r   r3   r3   r;   visit_binaryz  r   z ConvertSchemaToAvro.visit_binaryunknown_typec                 C   r   rH   r3   )r:   r   r3   r3   r;   visit_unknown}  r   z!ConvertSchemaToAvro.visit_unknown)6r   r   r   __doc__rL   __annotations__r#   r   r   r   rB   r   r   r   r   r   rP   r   r8   r   r   rY   r   r   r   r   r
   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r	   r   r   r   r3   r3   r3   r;   rD     s@   
 
rD   )/r   loggingtypingr   pyiceberg.schemar   r   r   r   r   r   pyiceberg.typesr	   r
   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   pyiceberg.utils.decimalr   	getLoggerr   loggerr(   rM   rL   r   r.   r   r   r/   rD   r3   r3   r3   r;   <module>   s:    \
   