# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib
import logging
import uuid
from abc import ABC, abstractmethod

import mmh3

from pyiceberg.partitioning import PartitionKey
from pyiceberg.typedef import Properties
from pyiceberg.utils.properties import property_as_bool

logger = logging.getLogger(__name__)


class LocationProvider(ABC):
    """A base class for location providers, that provide file locations for a table's write tasks.

    Args:
        table_location (str): The table's base storage location.
        table_properties (Properties): The table's properties.
    """

    table_location: str
    table_properties: Properties

    data_path: str
    metadata_path: str

    def __init__(self, table_location: str, table_properties: Properties):
        self.table_location = table_location
        self.table_properties = table_properties

        from pyiceberg.table import TableProperties

        if path := table_properties.get(TableProperties.WRITE_DATA_PATH):
            self.data_path = path.rstrip("/")
        else:
            self.data_path = f"{self.table_location.rstrip('/')}/data"

        if path := table_properties.get(TableProperties.WRITE_METADATA_PATH):
            self.metadata_path = path.rstrip("/")
        else:
            self.metadata_path = f"{self.table_location.rstrip('/')}/metadata"

    @abstractmethod
    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        """Return a fully-qualified data file location for the given filename.

        Args:
            data_file_name (str): The name of the data file.
            partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned.

        Returns:
            str: A fully-qualified location URI for the data file.
        """

    def new_table_metadata_file_location(self, new_version: int = 0) -> str:
        """Return a fully-qualified metadata file location for a new table version.

        Args:
            new_version (int): Version number of the metadata file.

        Returns:
            str: fully-qualified URI for the new table metadata file.

        Raises:
            ValueError: If the version is negative.
        """
        if new_version < 0:
            raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

        file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
        return self.new_metadata_location(file_name)

    def new_metadata_location(self, metadata_file_name: str) -> str:
        """Return a fully-qualified metadata file location for the given filename.

        Args:
            metadata_file_name (str): Name of the metadata file.

        Returns:
            str: A fully-qualified location URI for the metadata file.
        """
        return f"{self.metadata_path}/{metadata_file_name}"


class SimpleLocationProvider(LocationProvider):
    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)

    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        return (
            f"{self.data_path}/{partition_key.to_path()}/{data_file_name}"
            if partition_key
            else f"{self.data_path}/{data_file_name}"
        )


class ObjectStoreLocationProvider(LocationProvider):
    HASH_BINARY_STRING_BITS = 20
    ENTROPY_DIR_LENGTH = 4
    ENTROPY_DIR_DEPTH = 3

    _include_partition_paths: bool

    def __init__(self, table_location: str, table_properties: Properties):
        super().__init__(table_location, table_properties)
        from pyiceberg.table import TableProperties

        self._include_partition_paths = property_as_bool(
            self.table_properties,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS,
            TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT,
        )

    def new_data_location(self, data_file_name: str, partition_key: PartitionKey | None = None) -> str:
        if self._include_partition_paths and partition_key:
            return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

        hashed_path = self._compute_hash(data_file_name)

        return (
            f"{self.data_path}/{hashed_path}/{data_file_name}"
            if self._include_partition_paths
            else f"{self.data_path}/{hashed_path}-{data_file_name}"
        )

    @staticmethod
    def _compute_hash(data_file_name: str) -> str:
        # Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip.
        top_mask = 1 << ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS
        hash_code = mmh3.hash(data_file_name) & (top_mask - 1) | top_mask
        return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-ObjectStoreLocationProvider.HASH_BINARY_STRING_BITS :])

    @staticmethod
    def _dirs_from_hash(file_hash: str) -> str:
        """Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH."""
        total_entropy_length = ObjectStoreLocationProvider.ENTROPY_DIR_DEPTH * ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH

        hash_with_dirs = []
        for i in range(0, total_entropy_length, ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH):
            hash_with_dirs.append(file_hash[i : i + ObjectStoreLocationProvider.ENTROPY_DIR_LENGTH])

        if len(file_hash) > total_entropy_length:
            hash_with_dirs.append(file_hash[total_entropy_length:])

        return "/".join(hash_with_dirs)


def _import_location_provider(
    location_provider_impl: str, table_location: str, table_properties: Properties
) -> LocationProvider | None:
    try:
        path_parts = location_provider_impl.split(".")
        if len(path_parts) < 2:
            from pyiceberg.table import TableProperties

            raise ValueError(
                f"{TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL} should be full path "
                f"(module.CustomLocationProvider), got: {location_provider_impl}"
            )
        module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1]
        module = importlib.import_module(module_name)
        class_ = getattr(module, class_name)
        return class_(table_location, table_properties)
    except ModuleNotFoundError:
        logger.warning(
            f"Could not initialize LocationProvider: {location_provider_impl}",
            exc_info=logger.isEnabledFor(logging.DEBUG),
        )
        return None


def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider:
    from pyiceberg.table import TableProperties

    table_location = table_location.rstrip("/")

    if location_provider_impl := table_properties.get(TableProperties.WRITE_PY_LOCATION_PROVIDER_IMPL):
        if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties):
            logger.info("Loaded LocationProvider: %s", location_provider_impl)
            return location_provider
        else:
            raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}")

    if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT):
        return ObjectStoreLocationProvider(table_location, table_properties)
    else:
        return SimpleLocationProvider(table_location, table_properties)
