Source code for pavise.polars

"""Polars backend for type-parameterized DataFrame with Protocol-based schema validation."""

from typing import Any, Generic, Literal, TypeVar, get_args, get_origin, get_type_hints

try:
    import polars as pl
except ImportError:
    raise ImportError("Polars is not installed. Install it with: pip install pavise[polars]")

from pavise._polars.validation import validate_dataframe, validate_lazyframe_schema
from pavise.types import NotRequiredColumn

__all__ = ["DataFrame", "LazyFrame", "NotRequiredColumn"]

SchemaT_co = TypeVar("SchemaT_co", covariant=True)


def _build_empty_columns(schema: type) -> dict[str, pl.Series]:
    """
    Build empty column dict from a Protocol schema.

    Used by both DataFrame.make_empty() and LazyFrame.make_empty().
    """
    from pavise._polars.validation import _extract_type_and_validators

    type_hints = get_type_hints(schema, include_extras=True)
    columns = {}

    for col_name, col_type in type_hints.items():
        base_type, _, _, _ = _extract_type_and_validators(col_type)

        # Handle Union types (represented as tuple) - use first type
        if isinstance(base_type, tuple):
            base_type = base_type[0]

        if get_origin(base_type) is Literal:
            literal_values = get_args(base_type)
            if literal_values:
                base_type = type(literal_values[0])

        dtype = _get_dtype_for_type(base_type)
        columns[col_name] = pl.Series([], dtype=dtype)

    return columns


def _get_dtype_for_type(base_type: type) -> pl.DataType:
    """
    Get polars dtype for a given Python type.

    Args:
        base_type: Python type (int, str, float, bool, datetime, date, timedelta)

    Returns:
        Polars DataType
    """
    from pavise._polars.validation import TYPE_TO_DTYPE

    if isinstance(base_type, type) and issubclass(base_type, pl.DataType):
        return base_type()

    return TYPE_TO_DTYPE.get(base_type, pl.Utf8())


[docs] class DataFrame(pl.DataFrame, Generic[SchemaT_co]): """ Type-parameterized DataFrame with runtime validation for Polars. Usage:: # Static type checking only def process(df: DataFrame[UserSchema]) -> DataFrame[UserSchema]: return df # Runtime validation validated = DataFrame[UserSchema](raw_df) The type parameter is covariant, allowing structural subtyping. DataFrame[ChildSchema] is compatible with DataFrame[ParentSchema] when ChildSchema has all columns of ParentSchema. """ _schema: type | None = None
[docs] def __class_getitem__(cls, schema: type): """Create a new DataFrame class with schema validation.""" class TypedDataFrame(DataFrame): _schema = schema return TypedDataFrame
[docs] def __init__(self, data: Any, *args: Any, strict: bool = False, **kwargs: Any): """ Initialize DataFrame with optional schema validation. Args: data: Data to create DataFrame from (pl.DataFrame or dict/list) *args: Additional arguments passed to pl.DataFrame strict: If True, raise error on extra columns not in schema **kwargs: Additional keyword arguments passed to pl.DataFrame Raises: ValueError: If required column is missing TypeError: If column has wrong type """ pl.DataFrame.__init__(self, data, *args, **kwargs) # type: ignore[misc] if self._schema is not None: validate_dataframe(self, self._schema, strict=strict)
[docs] @classmethod def make_empty(cls) -> "DataFrame[SchemaT_co]": """ Create an empty DataFrame with columns from the schema. Returns: DataFrame: Empty DataFrame with correct column types """ if cls._schema is None: return cls({}) return cls(_build_empty_columns(cls._schema))
[docs] class LazyFrame(pl.LazyFrame, Generic[SchemaT_co]): """ Type-parameterized LazyFrame with runtime schema validation for Polars. Schema validation happens at construction time using collect_schema(). Value-based validators (Range, Unique, etc.) are only checked on collect(). """ _schema: type | None = None
[docs] def __class_getitem__(cls, schema: type): """Create a new LazyFrame class with schema validation.""" class TypedLazyFrame(LazyFrame): _schema = schema return TypedLazyFrame
[docs] def __new__(cls, data: pl.LazyFrame, strict: bool = False): # noqa: ARG003 """Create LazyFrame instance by copying internal state from source.""" instance = object.__new__(cls) instance._ldf = data._ldf return instance
[docs] def __init__(self, _data: pl.LazyFrame, strict: bool = False): """ Initialize LazyFrame with schema validation. Args: data: Polars LazyFrame to wrap strict: If True, raise error on extra columns not in schema """ if self._schema is not None: validate_lazyframe_schema(self, self._schema, strict=strict)
[docs] def collect(self) -> "DataFrame[SchemaT_co]": # type: ignore[override] """ Collect LazyFrame into DataFrame with full validation. Returns: DataFrame[Schema] with all validators applied """ df = pl.LazyFrame.collect(self) if self._schema is not None: return DataFrame[self._schema](df) # type: ignore[valid-type] return DataFrame(df)
[docs] @classmethod def make_empty(cls) -> "LazyFrame[SchemaT_co]": """ Create an empty LazyFrame with columns from the schema. Returns: LazyFrame: Empty LazyFrame with correct column types """ if cls._schema is None: return cls(pl.LazyFrame({})) return cls(pl.DataFrame(_build_empty_columns(cls._schema)).lazy())