# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import math
import threading
from abc import ABC, abstractmethod
from collections.abc import Iterator
from copy import copy
from enum import Enum
from types import TracebackType
from typing import (
    Any,
    Literal,
)

from cachetools import LRUCache
from pydantic_core import to_json

from pyiceberg.avro.codecs import AVRO_CODEC_KEY, AvroCompressionCodec
from pyiceberg.avro.file import AvroFile, AvroOutputFile
from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ValidationError
from pyiceberg.io import FileIO, InputFile, OutputFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record, TableVersion
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    PrimitiveType,
    StringType,
    StructType,
)

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864  # 64 * 1024 * 1024
DEFAULT_READ_VERSION: Literal[2] = 2

INITIAL_SEQUENCE_NUMBER = 0


class DataFileContent(int, Enum):
    DATA = 0
    POSITION_DELETES = 1
    EQUALITY_DELETES = 2

    def __repr__(self) -> str:
        """Return the string representation of the DataFileContent class."""
        return f"DataFileContent.{self.name}"

    @staticmethod
    def from_rest_type(content_type: str) -> DataFileContent:
        """Convert REST API content type string to DataFileContent.

        Args:
            content_type: REST API content type.

        Returns:
            The corresponding DataFileContent enum value.

        Raises:
            ValueError: If the content type is unknown.
        """
        mapping = {
            "data": DataFileContent.DATA,
            "position-deletes": DataFileContent.POSITION_DELETES,
            "equality-deletes": DataFileContent.EQUALITY_DELETES,
        }
        if content_type not in mapping:
            raise ValueError(f"Invalid file content value: {content_type}")
        return mapping[content_type]


class ManifestContent(int, Enum):
    DATA = 0
    DELETES = 1

    def __repr__(self) -> str:
        """Return the string representation of the ManifestContent class."""
        return f"ManifestContent.{self.name}"


class ManifestEntryStatus(int, Enum):
    EXISTING = 0
    ADDED = 1
    DELETED = 2

    def __repr__(self) -> str:
        """Return the string representation of the ManifestEntryStatus class."""
        return f"ManifestEntryStatus.{self.name}"


class FileFormat(str, Enum):
    AVRO = "AVRO"
    PARQUET = "PARQUET"
    ORC = "ORC"
    PUFFIN = "PUFFIN"

    @classmethod
    def _missing_(cls, value: object) -> None | str:
        for member in cls:
            if member.value == str(value).upper():
                return member
        return None

    def __repr__(self) -> str:
        """Return the string representation of the FileFormat class."""
        return f"FileFormat.{self.name}"


DATA_FILE_TYPE: dict[int, StructType] = {
    1: StructType(
        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(
            field_id=101,
            name="file_format",
            field_type=StringType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
        ),
        NestedField(
            field_id=102,
            name="partition",
            field_type=StructType(),
            required=True,
            doc="Partition data tuple, schema based on the partition spec",
        ),
        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
        NestedField(
            field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
        ),
        NestedField(
            field_id=105,
            name="block_size_in_bytes",
            field_type=LongType(),
            required=True,
            doc="Deprecated. Always write a default in v1. Do not write in v2.",
            write_default=DEFAULT_BLOCK_SIZE,
        ),
        NestedField(
            field_id=108,
            name="column_sizes",
            field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
            required=False,
            doc="Map of column id to total size on disk",
        ),
        NestedField(
            field_id=109,
            name="value_counts",
            field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
            required=False,
            doc="Map of column id to total count, including null and NaN",
        ),
        NestedField(
            field_id=110,
            name="null_value_counts",
            field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
            required=False,
            doc="Map of column id to null value count",
        ),
        NestedField(
            field_id=137,
            name="nan_value_counts",
            field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
            required=False,
            doc="Map of column id to number of NaN values in the column",
        ),
        NestedField(
            field_id=125,
            name="lower_bounds",
            field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
            required=False,
            doc="Map of column id to lower bound",
        ),
        NestedField(
            field_id=128,
            name="upper_bounds",
            field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
            required=False,
            doc="Map of column id to upper bound",
        ),
        NestedField(
            field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
        ),
        NestedField(
            field_id=132,
            name="split_offsets",
            field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
            required=False,
            doc="Splittable offsets",
        ),
        NestedField(field_id=140, name="sort_order_id", field_type=IntegerType(), required=False, doc="Sort order ID"),
    ),
    2: StructType(
        NestedField(
            field_id=134,
            name="content",
            field_type=IntegerType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
            initial_default=DataFileContent.DATA,
        ),
        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(
            field_id=101,
            name="file_format",
            field_type=StringType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
        ),
        NestedField(
            field_id=102,
            name="partition",
            field_type=StructType(),
            required=True,
            doc="Partition data tuple, schema based on the partition spec",
        ),
        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
        NestedField(
            field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
        ),
        NestedField(
            field_id=108,
            name="column_sizes",
            field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
            required=False,
            doc="Map of column id to total size on disk",
        ),
        NestedField(
            field_id=109,
            name="value_counts",
            field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
            required=False,
            doc="Map of column id to total count, including null and NaN",
        ),
        NestedField(
            field_id=110,
            name="null_value_counts",
            field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
            required=False,
            doc="Map of column id to null value count",
        ),
        NestedField(
            field_id=137,
            name="nan_value_counts",
            field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
            required=False,
            doc="Map of column id to number of NaN values in the column",
        ),
        NestedField(
            field_id=125,
            name="lower_bounds",
            field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
            required=False,
            doc="Map of column id to lower bound",
        ),
        NestedField(
            field_id=128,
            name="upper_bounds",
            field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
            required=False,
            doc="Map of column id to upper bound",
        ),
        NestedField(
            field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
        ),
        NestedField(
            field_id=132,
            name="split_offsets",
            field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
            required=False,
            doc="Splittable offsets",
        ),
        NestedField(
            field_id=135,
            name="equality_ids",
            field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
            required=False,
            doc="Field ids used to determine row equality in equality delete files.",
        ),
        NestedField(
            field_id=140,
            name="sort_order_id",
            field_type=IntegerType(),
            required=False,
            doc="ID representing sort order for this file",
        ),
    ),
    3: StructType(
        NestedField(
            field_id=134,
            name="content",
            field_type=IntegerType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
            initial_default=DataFileContent.DATA,
        ),
        NestedField(field_id=100, name="file_path", field_type=StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(
            field_id=101,
            name="file_format",
            field_type=StringType(),
            required=True,
            doc="File format name: avro, orc, or parquet",
        ),
        NestedField(
            field_id=102,
            name="partition",
            field_type=StructType(),
            required=True,
            doc="Partition data tuple, schema based on the partition spec",
        ),
        NestedField(field_id=103, name="record_count", field_type=LongType(), required=True, doc="Number of records in the file"),
        NestedField(
            field_id=104, name="file_size_in_bytes", field_type=LongType(), required=True, doc="Total file size in bytes"
        ),
        NestedField(
            field_id=108,
            name="column_sizes",
            field_type=MapType(key_id=117, key_type=IntegerType(), value_id=118, value_type=LongType()),
            required=False,
            doc="Map of column id to total size on disk",
        ),
        NestedField(
            field_id=109,
            name="value_counts",
            field_type=MapType(key_id=119, key_type=IntegerType(), value_id=120, value_type=LongType()),
            required=False,
            doc="Map of column id to total count, including null and NaN",
        ),
        NestedField(
            field_id=110,
            name="null_value_counts",
            field_type=MapType(key_id=121, key_type=IntegerType(), value_id=122, value_type=LongType()),
            required=False,
            doc="Map of column id to null value count",
        ),
        NestedField(
            field_id=137,
            name="nan_value_counts",
            field_type=MapType(key_id=138, key_type=IntegerType(), value_id=139, value_type=LongType()),
            required=False,
            doc="Map of column id to number of NaN values in the column",
        ),
        NestedField(
            field_id=125,
            name="lower_bounds",
            field_type=MapType(key_id=126, key_type=IntegerType(), value_id=127, value_type=BinaryType()),
            required=False,
            doc="Map of column id to lower bound",
        ),
        NestedField(
            field_id=128,
            name="upper_bounds",
            field_type=MapType(key_id=129, key_type=IntegerType(), value_id=130, value_type=BinaryType()),
            required=False,
            doc="Map of column id to upper bound",
        ),
        NestedField(
            field_id=131, name="key_metadata", field_type=BinaryType(), required=False, doc="Encryption key metadata blob"
        ),
        NestedField(
            field_id=132,
            name="split_offsets",
            field_type=ListType(element_id=133, element_type=LongType(), element_required=True),
            required=False,
            doc="Splittable offsets",
        ),
        NestedField(
            field_id=135,
            name="equality_ids",
            field_type=ListType(element_id=136, element_type=LongType(), element_required=True),
            required=False,
            doc="Field ids used to determine row equality in equality delete files.",
        ),
        NestedField(
            field_id=140,
            name="sort_order_id",
            field_type=IntegerType(),
            required=False,
            doc="ID representing sort order for this file",
        ),
        NestedField(
            field_id=142,
            name="first_row_id",
            field_type=LongType(),
            required=False,
            doc="The _row_id for the first row in the data file.",
        ),
        NestedField(
            field_id=143,
            name="referenced_data_file",
            field_type=StringType(),
            required=False,
            doc="Fully qualified location (URI with FS scheme) of a data file that all deletes reference",
        ),
        NestedField(
            field_id=144,
            name="content_offset",
            field_type=LongType(),
            required=False,
            doc="The offset in the file where the content starts.",
        ),
        NestedField(
            field_id=145,
            name="content_size_in_bytes",
            field_type=LongType(),
            required=False,
            doc="The length of a referenced content stored in the file; required if content_offset is present",
        ),
    ),
}


def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
    data_file_partition_type = StructType(
        *[
            NestedField(
                field_id=field.field_id,
                name=field.name,
                field_type=field.field_type,
                required=field.required,
            )
            for field in partition_type.fields
        ]
    )

    return StructType(
        *[
            NestedField(
                field_id=102,
                name="partition",
                field_type=data_file_partition_type,
                required=True,
                doc="Partition data tuple, schema based on the partition spec",
            )
            if field.field_id == 102
            else field
            for field in DATA_FILE_TYPE[format_version].fields
        ]
    )


class DataFile(Record):
    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> DataFile:
        struct = DATA_FILE_TYPE[_table_format_version]
        return super()._bind(struct, **arguments)

    @property
    def content(self) -> DataFileContent:
        return self._data[0]

    @property
    def file_path(self) -> str:
        return self._data[1]

    @property
    def file_format(self) -> FileFormat:
        return self._data[2]

    @property
    def partition(self) -> Record:
        return self._data[3]

    @property
    def record_count(self) -> int:
        return self._data[4]

    @property
    def file_size_in_bytes(self) -> int:
        return self._data[5]

    @property
    def column_sizes(self) -> dict[int, int]:
        return self._data[6]

    @property
    def value_counts(self) -> dict[int, int]:
        return self._data[7]

    @property
    def null_value_counts(self) -> dict[int, int]:
        return self._data[8]

    @property
    def nan_value_counts(self) -> dict[int, int]:
        return self._data[9]

    @property
    def lower_bounds(self) -> dict[int, bytes]:
        return self._data[10]

    @property
    def upper_bounds(self) -> dict[int, bytes]:
        return self._data[11]

    @property
    def key_metadata(self) -> bytes | None:
        return self._data[12]

    @property
    def split_offsets(self) -> list[int] | None:
        return self._data[13]

    @property
    def equality_ids(self) -> list[int] | None:
        return self._data[14]

    @property
    def sort_order_id(self) -> int | None:
        return self._data[15]

    # Spec ID should not be stored in the file
    _spec_id: int

    @property
    def spec_id(self) -> int:
        return self._spec_id

    @spec_id.setter
    def spec_id(self, value: int) -> None:
        self._spec_id = value

    def __setattr__(self, name: str, value: Any) -> None:
        """Assign a key/value to a DataFile."""
        # The file_format is written as a string, so we need to cast it to the Enum
        if name == "file_format":
            value = FileFormat[value]
        super().__setattr__(name, value)

    def __hash__(self) -> int:
        """Return the hash of the file path."""
        return hash(self.file_path)

    def __eq__(self, other: Any) -> bool:
        """Compare the datafile with another object.

        If it is a datafile, it will compare based on the file_path.
        """
        return self.file_path == other.file_path if isinstance(other, DataFile) else False


MANIFEST_ENTRY_SCHEMAS = {
    1: Schema(
        NestedField(0, "status", IntegerType(), required=True),
        NestedField(1, "snapshot_id", LongType(), required=True),
        NestedField(2, "data_file", DATA_FILE_TYPE[1], required=True),
    ),
    2: Schema(
        NestedField(0, "status", IntegerType(), required=True),
        NestedField(1, "snapshot_id", LongType(), required=False),
        NestedField(3, "sequence_number", LongType(), required=False),
        NestedField(4, "file_sequence_number", LongType(), required=False),
        NestedField(2, "data_file", DATA_FILE_TYPE[2], required=True),
    ),
    3: Schema(
        NestedField(0, "status", IntegerType(), required=True),
        NestedField(1, "snapshot_id", LongType(), required=False),
        NestedField(3, "sequence_number", LongType(), required=False),
        NestedField(4, "file_sequence_number", LongType(), required=False),
        NestedField(2, "data_file", DATA_FILE_TYPE[3], required=True),
    ),
}

MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()}


def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
    return Schema(
        *[
            NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
            for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
        ]
    )


class ManifestEntry(Record):
    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestEntry:
        return super()._bind(**arguments, struct=MANIFEST_ENTRY_SCHEMAS_STRUCT[_table_format_version])

    @property
    def status(self) -> ManifestEntryStatus:
        return self._data[0]

    @status.setter
    def status(self, value: ManifestEntryStatus) -> None:
        self._data[0] = value

    @property
    def snapshot_id(self) -> int | None:
        return self._data[1]

    @snapshot_id.setter
    def snapshot_id(self, value: int) -> None:
        self._data[0] = value

    @property
    def sequence_number(self) -> int | None:
        return self._data[2]

    @sequence_number.setter
    def sequence_number(self, value: int) -> None:
        self._data[2] = value

    @property
    def file_sequence_number(self) -> int | None:
        return self._data[3]

    @file_sequence_number.setter
    def file_sequence_number(self, value: int) -> None:
        self._data[3] = value

    @property
    def data_file(self) -> DataFile:
        return self._data[4]

    @data_file.setter
    def data_file(self, value: DataFile) -> None:
        self._data[4] = value


PARTITION_FIELD_SUMMARY_TYPE = StructType(
    NestedField(509, "contains_null", BooleanType(), required=True),
    NestedField(518, "contains_nan", BooleanType(), required=False),
    NestedField(510, "lower_bound", BinaryType(), required=False),
    NestedField(511, "upper_bound", BinaryType(), required=False),
)


class PartitionFieldSummary(Record):
    @classmethod
    def from_args(cls, **arguments: Any) -> PartitionFieldSummary:
        return super()._bind(**arguments, struct=PARTITION_FIELD_SUMMARY_TYPE)

    @property
    def contains_null(self) -> bool:
        return self._data[0]

    @property
    def contains_nan(self) -> bool | None:
        return self._data[1]

    @property
    def lower_bound(self) -> bytes | None:
        return self._data[2]

    @property
    def upper_bound(self) -> bytes | None:
        return self._data[3]


class PartitionFieldStats:
    _type: PrimitiveType
    _contains_null: bool
    _contains_nan: bool
    _min: Any | None
    _max: Any | None

    def __init__(self, iceberg_type: PrimitiveType) -> None:
        self._type = iceberg_type
        self._contains_null = False
        self._contains_nan = False
        self._min = None
        self._max = None

    def to_summary(self) -> PartitionFieldSummary:
        return PartitionFieldSummary(
            self._contains_null,
            self._contains_nan,
            to_bytes(self._type, self._min) if self._min is not None else None,
            to_bytes(self._type, self._max) if self._max is not None else None,
        )

    def update(self, value: Any) -> None:
        if value is None:
            self._contains_null = True
        elif isinstance(value, float) and math.isnan(value):
            self._contains_nan = True
        else:
            if self._min is None:
                self._min = value
                self._max = value
            else:
                self._max = max(self._max, value)
                self._min = min(self._min, value)


def construct_partition_summaries(spec: PartitionSpec, schema: Schema, partitions: list[Record]) -> list[PartitionFieldSummary]:
    types = [field.field_type for field in spec.partition_type(schema).fields]
    field_stats = [PartitionFieldStats(field_type) for field_type in types]
    for partition_keys in partitions:
        for i, field_type in enumerate(types):
            if not isinstance(field_type, PrimitiveType):
                raise ValueError(f"Expected a primitive type for the partition field, got {field_type}")
            partition_key = partition_keys[i]
            field_stats[i].update(partition_key)
    return [field.to_summary() for field in field_stats]


MANIFEST_LIST_FILE_SCHEMAS: dict[int, Schema] = {
    1: Schema(
        NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(501, "manifest_length", LongType(), required=True),
        NestedField(502, "partition_spec_id", IntegerType(), required=True),
        NestedField(503, "added_snapshot_id", LongType(), required=True),
        NestedField(504, "added_files_count", IntegerType(), required=False),
        NestedField(505, "existing_files_count", IntegerType(), required=False),
        NestedField(506, "deleted_files_count", IntegerType(), required=False),
        NestedField(512, "added_rows_count", LongType(), required=False),
        NestedField(513, "existing_rows_count", LongType(), required=False),
        NestedField(514, "deleted_rows_count", LongType(), required=False),
        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
        NestedField(519, "key_metadata", BinaryType(), required=False),
    ),
    2: Schema(
        NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(501, "manifest_length", LongType(), required=True),
        NestedField(502, "partition_spec_id", IntegerType(), required=True),
        NestedField(517, "content", IntegerType(), required=True, initial_default=ManifestContent.DATA),
        NestedField(515, "sequence_number", LongType(), required=True, initial_default=0),
        NestedField(516, "min_sequence_number", LongType(), required=True, initial_default=0),
        NestedField(503, "added_snapshot_id", LongType(), required=True),
        NestedField(504, "added_files_count", IntegerType(), required=True),
        NestedField(505, "existing_files_count", IntegerType(), required=True),
        NestedField(506, "deleted_files_count", IntegerType(), required=True),
        NestedField(512, "added_rows_count", LongType(), required=True),
        NestedField(513, "existing_rows_count", LongType(), required=True),
        NestedField(514, "deleted_rows_count", LongType(), required=True),
        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
        NestedField(519, "key_metadata", BinaryType(), required=False),
    ),
    3: Schema(
        NestedField(500, "manifest_path", StringType(), required=True, doc="Location URI with FS scheme"),
        NestedField(501, "manifest_length", LongType(), required=True),
        NestedField(502, "partition_spec_id", IntegerType(), required=True),
        NestedField(517, "content", IntegerType(), required=True, initial_default=ManifestContent.DATA),
        NestedField(515, "sequence_number", LongType(), required=True, initial_default=0),
        NestedField(516, "min_sequence_number", LongType(), required=True, initial_default=0),
        NestedField(503, "added_snapshot_id", LongType(), required=True),
        NestedField(504, "added_files_count", IntegerType(), required=True),
        NestedField(505, "existing_files_count", IntegerType(), required=True),
        NestedField(506, "deleted_files_count", IntegerType(), required=True),
        NestedField(512, "added_rows_count", LongType(), required=True),
        NestedField(513, "existing_rows_count", LongType(), required=True),
        NestedField(514, "deleted_rows_count", LongType(), required=True),
        NestedField(507, "partitions", ListType(508, PARTITION_FIELD_SUMMARY_TYPE, element_required=True), required=False),
        NestedField(519, "key_metadata", BinaryType(), required=False),
        NestedField(520, "first_row_id", LongType(), required=False),
    ),
}

MANIFEST_LIST_FILE_STRUCTS = {format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()}


POSITIONAL_DELETE_SCHEMA = Schema(
    NestedField(2147483546, "file_path", StringType()), NestedField(2147483545, "pos", IntegerType())
)


class ManifestFile(Record):
    @classmethod
    def from_args(cls, _table_format_version: TableVersion = DEFAULT_READ_VERSION, **arguments: Any) -> ManifestFile:
        return super()._bind(**arguments, struct=MANIFEST_LIST_FILE_SCHEMAS[_table_format_version])

    @property
    def manifest_path(self) -> str:
        return self._data[0]

    @property
    def manifest_length(self) -> int:
        return self._data[1]

    @property
    def partition_spec_id(self) -> int:
        return self._data[2]

    @property
    def content(self) -> ManifestContent:
        return self._data[3]

    @property
    def sequence_number(self) -> int:
        return self._data[4]

    @sequence_number.setter
    def sequence_number(self, value: int) -> None:
        self._data[4] = value

    @property
    def min_sequence_number(self) -> int:
        return self._data[5]

    @min_sequence_number.setter
    def min_sequence_number(self, value: int) -> None:
        self._data[5] = value

    @property
    def added_snapshot_id(self) -> int | None:
        return self._data[6]

    @property
    def added_files_count(self) -> int | None:
        return self._data[7]

    @property
    def existing_files_count(self) -> int | None:
        return self._data[8]

    @property
    def deleted_files_count(self) -> int | None:
        return self._data[9]

    @property
    def added_rows_count(self) -> int | None:
        return self._data[10]

    @property
    def existing_rows_count(self) -> int | None:
        return self._data[11]

    @property
    def deleted_rows_count(self) -> int | None:
        return self._data[12]

    @property
    def partitions(self) -> list[PartitionFieldSummary] | None:
        return self._data[13]

    @property
    def key_metadata(self) -> bytes | None:
        return self._data[14]

    def has_added_files(self) -> bool:
        return self.added_files_count is None or self.added_files_count > 0

    def has_existing_files(self) -> bool:
        return self.existing_files_count is None or self.existing_files_count > 0

    def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> list[ManifestEntry]:
        """
        Read the manifest entries from the manifest file.

        Args:
            io: The FileIO to fetch the file.
            discard_deleted: Filter on live entries.

        Returns:
            An Iterator of manifest entries.
        """
        input_file = io.new_input(self.manifest_path)
        with AvroFile[ManifestEntry](
            input_file,
            MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
            read_types={-1: ManifestEntry, 2: DataFile},
            read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
        ) as reader:
            return [
                _inherit_from_manifest(entry, self)
                for entry in reader
                if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
            ]

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the ManifestFile class."""
        return self.manifest_path == other.manifest_path if isinstance(other, ManifestFile) else False

    def __hash__(self) -> int:
        """Return the hash of manifest_path."""
        return hash(self.manifest_path)


# Global cache for ManifestFile objects, keyed by manifest_path.
# This deduplicates ManifestFile objects across manifest lists, which commonly
# share manifests after append operations.
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)

# Lock for thread-safe cache access
_manifest_cache_lock = threading.RLock()


def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
    """Read manifests from a manifest list, deduplicating ManifestFile objects via cache.

    Caches individual ManifestFile objects by manifest_path. This is memory-efficient
    because consecutive manifest lists typically share most of their manifests:

        ManifestList1: [ManifestFile1]
        ManifestList2: [ManifestFile1, ManifestFile2]
        ManifestList3: [ManifestFile1, ManifestFile2, ManifestFile3]

    With per-ManifestFile caching, each ManifestFile is stored once and reused.

    Note: The manifest list file is re-read on each call. This is intentional to
    keep the implementation simple and avoid O(N²) memory growth from caching
    overlapping manifest list tuples. Re-reading is cheap since manifest lists
    are small metadata files.

    Args:
        io: FileIO instance for reading the manifest list.
        manifest_list: Path to the manifest list file.

    Returns:
        A tuple of ManifestFile objects.
    """
    file = io.new_input(manifest_list)
    manifest_files = list(read_manifest_list(file))

    result = []
    with _manifest_cache_lock:
        for manifest_file in manifest_files:
            manifest_path = manifest_file.manifest_path
            if manifest_path in _manifest_cache:
                result.append(_manifest_cache[manifest_path])
            else:
                _manifest_cache[manifest_path] = manifest_file
                result.append(manifest_file)

    return tuple(result)


def read_manifest_list(input_file: InputFile) -> Iterator[ManifestFile]:
    """
    Read the manifests from the manifest list.

    Args:
        input_file: The input file where the stream can be read from.

    Returns:
        An iterator of ManifestFiles that are part of the list.
    """
    with AvroFile[ManifestFile](
        input_file,
        MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
        read_types={-1: ManifestFile, 508: PartitionFieldSummary},
        read_enums={517: ManifestContent},
    ) as reader:
        yield from reader


def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> ManifestEntry:
    """
    Inherits properties from manifest file.

    The properties that will be inherited are:
    - sequence numbers
    - partition spec id.

    More information about inheriting sequence numbers: https://iceberg.apache.org/spec/#sequence-number-inheritance

    Args:
        entry: The manifest entry.
        manifest: The manifest file.

    Returns:
        The manifest entry with properties inherited.
    """
    # Inherit sequence numbers.
    # The snapshot_id is required in V1, inherit with V2 when null
    if entry.snapshot_id is None and manifest.added_snapshot_id is not None:
        entry.snapshot_id = manifest.added_snapshot_id

    # in v1 tables, the sequence number is not persisted and can be safely defaulted to 0
    # in v2 tables, the sequence number should be inherited iff the entry status is ADDED
    if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
        entry.sequence_number = manifest.sequence_number

    # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
    # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
    if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
        # Only available in V2, always 0 in V1
        entry.file_sequence_number = manifest.sequence_number

    # Inherit partition spec id.
    entry.data_file.spec_id = manifest.partition_spec_id

    return entry


class ManifestWriter(ABC):
    closed: bool
    _spec: PartitionSpec
    _schema: Schema
    _output_file: OutputFile
    _writer: AvroOutputFile[ManifestEntry]
    _snapshot_id: int
    _added_files: int
    _added_rows: int
    _existing_files: int
    _existing_rows: int
    _deleted_files: int
    _deleted_rows: int
    _min_sequence_number: int | None
    _partitions: list[Record]
    _compression: AvroCompressionCodec

    def __init__(
        self,
        spec: PartitionSpec,
        schema: Schema,
        output_file: OutputFile,
        snapshot_id: int,
        avro_compression: AvroCompressionCodec,
    ) -> None:
        self.closed = False
        self._spec = spec
        self._schema = schema
        self._output_file = output_file
        self._snapshot_id = snapshot_id

        self._added_files = 0
        self._added_rows = 0
        self._existing_files = 0
        self._existing_rows = 0
        self._deleted_files = 0
        self._deleted_rows = 0
        self._min_sequence_number = None
        self._partitions = []
        self._compression = avro_compression

    def __enter__(self) -> ManifestWriter:
        """Open the writer."""
        self._writer = self.new_writer()
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the writer."""
        if (self._added_files + self._existing_files + self._deleted_files) == 0:
            # This is just a guard to ensure that we don't write empty manifest files
            raise ValueError("An empty manifest file has been written")

        self.closed = True
        self._writer.__exit__(exc_type, exc_value, traceback)

    @abstractmethod
    def content(self) -> ManifestContent: ...

    @property
    @abstractmethod
    def version(self) -> TableVersion: ...

    @property
    def _meta(self) -> dict[str, str]:
        return {
            "schema": self._schema.model_dump_json(),
            "partition-spec": to_json(self._spec.fields).decode("utf-8"),
            "partition-spec-id": str(self._spec.spec_id),
            "format-version": str(self.version),
            AVRO_CODEC_KEY: self._compression,
        }

    def _with_partition(self, format_version: TableVersion) -> Schema:
        data_file_type = data_file_with_partition(
            format_version=format_version, partition_type=self._spec.partition_type(self._schema)
        )
        return manifest_entry_schema_with_data_file(format_version=format_version, data_file=data_file_type)

    def new_writer(self) -> AvroOutputFile[ManifestEntry]:
        return AvroOutputFile[ManifestEntry](
            output_file=self._output_file,
            file_schema=self._with_partition(self.version),
            record_schema=self._with_partition(DEFAULT_READ_VERSION),
            schema_name="manifest_entry",
            metadata=self._meta,
        )

    @abstractmethod
    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry: ...

    def to_manifest_file(self) -> ManifestFile:
        """Return the manifest file."""
        # once the manifest file is generated, no more entries can be added
        self.closed = True
        min_sequence_number = self._min_sequence_number or UNASSIGNED_SEQ
        return ManifestFile.from_args(
            manifest_path=self._output_file.location,
            manifest_length=len(self._writer.output_file),
            partition_spec_id=self._spec.spec_id,
            content=self.content(),
            sequence_number=UNASSIGNED_SEQ,
            min_sequence_number=min_sequence_number,
            added_snapshot_id=self._snapshot_id,
            added_files_count=self._added_files,
            existing_files_count=self._existing_files,
            deleted_files_count=self._deleted_files,
            added_rows_count=self._added_rows,
            existing_rows_count=self._existing_rows,
            deleted_rows_count=self._deleted_rows,
            partitions=construct_partition_summaries(self._spec, self._schema, self._partitions),
            key_metadata=None,
        )

    def add_entry(self, entry: ManifestEntry) -> ManifestWriter:
        if self.closed:
            raise RuntimeError("Cannot add entry to closed manifest writer")
        if entry.status == ManifestEntryStatus.ADDED:
            self._added_files += 1
            self._added_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.EXISTING:
            self._existing_files += 1
            self._existing_rows += entry.data_file.record_count
        elif entry.status == ManifestEntryStatus.DELETED:
            self._deleted_files += 1
            self._deleted_rows += entry.data_file.record_count
        else:
            raise ValueError(f"Unknown entry: {entry.status}")

        self._partitions.append(entry.data_file.partition)

        if (
            (entry.status == ManifestEntryStatus.ADDED or entry.status == ManifestEntryStatus.EXISTING)
            and entry.sequence_number is not None
            and (self._min_sequence_number is None or entry.sequence_number < self._min_sequence_number)
        ):
            self._min_sequence_number = entry.sequence_number

        self._writer.write_block([self.prepare_entry(entry)])
        return self

    def add(self, entry: ManifestEntry) -> ManifestWriter:
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.ADDED,
                snapshot_id=self._snapshot_id,
                sequence_number=entry.sequence_number if entry.sequence_number != UNASSIGNED_SEQ else None,
                data_file=entry.data_file,
            )
        )

        return self

    def delete(self, entry: ManifestEntry) -> ManifestWriter:
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.DELETED,
                snapshot_id=self._snapshot_id,
                sequence_number=entry.sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
        return self

    def existing(self, entry: ManifestEntry) -> ManifestWriter:
        self.add_entry(
            ManifestEntry.from_args(
                status=ManifestEntryStatus.EXISTING,
                snapshot_id=entry.snapshot_id,
                sequence_number=entry.sequence_number,
                file_sequence_number=entry.file_sequence_number,
                data_file=entry.data_file,
            )
        )
        return self


class ManifestWriterV1(ManifestWriter):
    def __init__(
        self,
        spec: PartitionSpec,
        schema: Schema,
        output_file: OutputFile,
        snapshot_id: int,
        avro_compression: AvroCompressionCodec,
    ):
        super().__init__(spec, schema, output_file, snapshot_id, avro_compression)

    def content(self) -> ManifestContent:
        return ManifestContent.DATA

    @property
    def version(self) -> TableVersion:
        return 1

    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        return entry


class ManifestWriterV2(ManifestWriter):
    def __init__(
        self,
        spec: PartitionSpec,
        schema: Schema,
        output_file: OutputFile,
        snapshot_id: int,
        avro_compression: AvroCompressionCodec,
    ):
        super().__init__(spec, schema, output_file, snapshot_id, avro_compression)

    def content(self) -> ManifestContent:
        return ManifestContent.DATA

    @property
    def version(self) -> TableVersion:
        return 2

    @property
    def _meta(self) -> dict[str, str]:
        return {
            **super()._meta,
            "content": "data",
        }

    def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
        if entry.sequence_number is None:
            if entry.snapshot_id is not None and entry.snapshot_id != self._snapshot_id:
                raise ValueError(f"Found unassigned sequence number for an entry from snapshot: {entry.snapshot_id}")
            if entry.status != ManifestEntryStatus.ADDED:
                raise ValueError("Only entries with status ADDED can have null sequence number")
        return entry


def write_manifest(
    format_version: TableVersion,
    spec: PartitionSpec,
    schema: Schema,
    output_file: OutputFile,
    snapshot_id: int,
    avro_compression: AvroCompressionCodec,
) -> ManifestWriter:
    if format_version == 1:
        return ManifestWriterV1(spec, schema, output_file, snapshot_id, avro_compression)
    elif format_version == 2:
        return ManifestWriterV2(spec, schema, output_file, snapshot_id, avro_compression)
    else:
        raise ValueError(f"Cannot write manifest for table version: {format_version}")


class ManifestListWriter(ABC):
    _format_version: TableVersion
    _output_file: OutputFile
    _meta: dict[str, str]
    _manifest_files: list[ManifestFile]
    _commit_snapshot_id: int
    _writer: AvroOutputFile[ManifestFile]

    def __init__(self, format_version: TableVersion, output_file: OutputFile, meta: dict[str, Any]):
        self._format_version = format_version
        self._output_file = output_file
        self._meta = meta
        self._manifest_files = []

    def __enter__(self) -> ManifestListWriter:
        """Open the writer for writing."""
        self._writer = AvroOutputFile[ManifestFile](
            output_file=self._output_file,
            record_schema=MANIFEST_LIST_FILE_SCHEMAS[DEFAULT_READ_VERSION],
            file_schema=MANIFEST_LIST_FILE_SCHEMAS[self._format_version],
            schema_name="manifest_file",
            metadata=self._meta,
        )
        self._writer.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Close the writer."""
        self._writer.__exit__(exc_type, exc_value, traceback)
        return

    @abstractmethod
    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile: ...

    def add_manifests(self, manifest_files: list[ManifestFile]) -> ManifestListWriter:
        self._writer.write_block([self.prepare_manifest(manifest_file) for manifest_file in manifest_files])
        return self


class ManifestListWriterV1(ManifestListWriter):
    def __init__(
        self,
        output_file: OutputFile,
        snapshot_id: int,
        parent_snapshot_id: int | None,
        compression: AvroCompressionCodec,
    ):
        super().__init__(
            format_version=1,
            output_file=output_file,
            meta={
                "snapshot-id": str(snapshot_id),
                "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                "format-version": "1",
                AVRO_CODEC_KEY: compression,
            },
        )

    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        if manifest_file.content != ManifestContent.DATA:
            raise ValidationError("Cannot store delete manifests in a v1 table")
        return manifest_file


class ManifestListWriterV2(ManifestListWriter):
    _commit_snapshot_id: int
    _sequence_number: int

    def __init__(
        self,
        output_file: OutputFile,
        snapshot_id: int,
        parent_snapshot_id: int | None,
        sequence_number: int,
        compression: AvroCompressionCodec,
    ):
        super().__init__(
            format_version=2,
            output_file=output_file,
            meta={
                "snapshot-id": str(snapshot_id),
                "parent-snapshot-id": str(parent_snapshot_id) if parent_snapshot_id is not None else "null",
                "sequence-number": str(sequence_number),
                "format-version": "2",
                AVRO_CODEC_KEY: compression,
            },
        )
        self._commit_snapshot_id = snapshot_id
        self._sequence_number = sequence_number

    def prepare_manifest(self, manifest_file: ManifestFile) -> ManifestFile:
        wrapped_manifest_file = copy(manifest_file)

        if wrapped_manifest_file.sequence_number == UNASSIGNED_SEQ:
            # if the sequence number is being assigned here, then the manifest must be created by the current operation.
            # To validate this, check that the snapshot id matches the current commit
            if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
                raise ValueError(
                    f"Found unassigned sequence number for a manifest from snapshot: "
                    f"{self._commit_snapshot_id} != {wrapped_manifest_file.added_snapshot_id}"
                )
            wrapped_manifest_file.sequence_number = self._sequence_number

        if wrapped_manifest_file.min_sequence_number == UNASSIGNED_SEQ:
            if self._commit_snapshot_id != wrapped_manifest_file.added_snapshot_id:
                raise ValueError(
                    f"Found unassigned sequence number for a manifest from snapshot: {wrapped_manifest_file.added_snapshot_id}"
                )
            # if the min sequence number is not determined, then there was no assigned sequence number for any file
            # written to the wrapped manifest. Replace the unassigned sequence number with the one for this commit
            wrapped_manifest_file.min_sequence_number = self._sequence_number
        return wrapped_manifest_file


def write_manifest_list(
    format_version: TableVersion,
    output_file: OutputFile,
    snapshot_id: int,
    parent_snapshot_id: int | None,
    sequence_number: int | None,
    avro_compression: AvroCompressionCodec,
) -> ManifestListWriter:
    if format_version == 1:
        return ManifestListWriterV1(output_file, snapshot_id, parent_snapshot_id, avro_compression)
    elif format_version == 2:
        if sequence_number is None:
            raise ValueError(f"Sequence-number is required for V2 tables: {sequence_number}")
        return ManifestListWriterV2(output_file, snapshot_id, parent_snapshot_id, sequence_number, avro_compression)
    else:
        raise ValueError(f"Cannot write manifest list for table version: {format_version}")
