"""
Module qui contient les schémas des dataframes.
Ce module contient les schémas des dataframes des DataLoggers, des stations, des séries temporelles et des zones de marées.
"""
from dataclasses import dataclass, field
from enum import StrEnum
import functools
from typing import Optional, Callable, Type, Any
import geopandas as gpd
import i18n
import numpy as np
import pandas as pd
import pandera.pandas as pa
from loguru import logger
from pandas import DataFrame
from pandera.typing import Series
from pandera.typing.geopandas import GeoSeries
from . import model_ids as schema_ids
LOGGER = logger.bind(name="CSB-Processing.Schema")
[docs]
@dataclass
class WaterLevelInfo:
"""
Classe pour les informations sur les niveaux d'eau.
"""
water_level_meter: float = np.nan
"""Le niveau d'eau en mètre."""
time_series: str = None
"""Le code de la série temporelle."""
id: str = None
"""L'identifiant de la station."""
name: str = None
"""Le nom de la station."""
code: str = None
"""Le code de la station."""
def __str__(self) -> str:
water_level: str = (
f"{self.water_level_meter} m"
if not np.isnan(self.water_level_meter)
else None
)
return (
f"WaterLevelInfo("
f"{self.time_series if water_level else '<NA>'} - "
f"{self.id if water_level else '<NA>'} - "
f"{self.name if water_level else '<NA>'} - "
f"{self.code if water_level else '<NA>'})"
)
[docs]
class Status(StrEnum):
"""
Enum pour les statuts de filtrage des données.
"""
ACCEPTED = "accepted"
REJECTED_BY_SPEED_FILTER = "rejected by speed filter"
REJECTED_BY_LATITUDE_FILTER = "rejected by latitude filter"
REJECTED_BY_LONGITUDE_FILTER = "rejected by longitude filter"
REJECTED_BY_TIME_FILTER = "rejected by time filter"
REJECTED_BY_DEPTH_FILTER = "rejected by depth filter"
[docs]
@dataclass
class OutlierInfo:
tags: list[Status] = field(default_factory=list)
def __str__(self) -> str:
return " | ".join(map(str, self.tags)) if self.tags else ""
[docs]
class DataLoggerSchema(pa.DataFrameModel):
"""
Schéma des données des DataLoggers.
"""
Latitude_WGS84: Series[pd.Float64Dtype()]
Longitude_WGS84: Series[pd.Float64Dtype()]
Time_UTC: Series[pd.DatetimeTZDtype("ns", tz="UTC")]
Depth_raw_meter: Series[pd.Float64Dtype()]
Depth_processed_meter: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
Speed_kn: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
Water_level_meter: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
Water_level_info: Series[object] = pa.Field(nullable=True)
Uncertainty_station_meter: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
SSP_uncertainty_percent: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
Uncertainty: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
THU: Series[pd.Float64Dtype()] = pa.Field(nullable=True)
IHO_order: Series[str] = pa.Field(nullable=True)
Outlier: Series[object] = pa.Field(nullable=True)
geometry: GeoSeries
[docs]
class Config:
coerce = True
[docs]
class DataLoggerWithTideZoneSchema(DataLoggerSchema):
"""
Schéma des données des DataLoggers avec les zones de marées.
"""
Time_serie: Series[str] = pa.Field(nullable=True)
Tide_zone_id: Series[str] = pa.Field(nullable=True)
Tide_zone_code: Series[str] = pa.Field(nullable=True)
Tide_zone_name: Series[str] = pa.Field(nullable=True)
[docs]
class Config:
coerce = True
[docs]
class DataLoggerWithVoronoiSchema(DataLoggerSchema):
"""
Schéma des données des DataLoggers avec les zones de Voronoi.
"""
id: Series[str] = pa.Field(nullable=True)
code: Series[str] = pa.Field(nullable=True)
name: Series[str] = pa.Field(nullable=True)
Time_serie: Series[object] = pa.Field(nullable=True)
[docs]
class WaterLevelSerieDataSchema(pa.DataFrameModel):
"""
Schéma des séries temporelles.
"""
event_date: Series[pd.DatetimeTZDtype("ns", tz="UTC")]
value: Series[pd.Float64Dtype()]
time_serie_code: Series[str]
[docs]
class Config:
coerce = True
[docs]
class StationsSchema(pa.DataFrameModel):
"""
Schéma des stations.
"""
id: Series[str]
code: Series[str]
name: Series[str]
time_series: Series[object]
is_tidal: Series[object] = pa.Field(
nullable=True
) # On utilise object pour accepter les booléens et les None
geometry: GeoSeries
[docs]
class Config:
coerce = True
[docs]
class TideZoneProtocolSchema(pa.DataFrameModel):
"""
Schéma des protocoles des zones de marées.
"""
id: Series[str]
time_series: Series[object]
[docs]
class Config:
coerce = True
[docs]
class TideZoneStationSchema(TideZoneProtocolSchema):
"""
Schéma des zones de marées extraite des stations.
"""
code: Series[str]
name: Series[str]
is_tidal: Series[object] = pa.Field(
nullable=True
) # On utilise object pour accepter les booléens et les None
geometry: GeoSeries
station_position: GeoSeries
[docs]
class TideZoneInfoSchema(pa.DataFrameModel):
"""
Schéma des informations des zones de marées.
"""
Tide_zone_id: Series[str]
time_series: Series[object]
min_time: Series[pd.DatetimeTZDtype("ns", tz="UTC")]
max_time: Series[pd.DatetimeTZDtype("ns", tz="UTC")]
[docs]
class Config:
coerce = True
[docs]
def validate_schema(
data: gpd.GeoDataFrame | DataFrame, schema: Type[pa.DataFrameModel]
) -> None:
"""
Valide le schéma des stations.
:param data: Les stations.
:type data: gpd.GeoDataFrame | DataFrame
:param schema: Le schéma.
:type schema: Type[pa.DataFrameModel]
"""
try:
LOGGER.debug(i18n.t("schema.model.validating_schema", schema=schema))
schema.validate(data)
except pa.errors.SchemaError as error:
LOGGER.error(
i18n.t("schema.model.schema_validation_error", schema=schema, error=error)
)
LOGGER.error(
i18n.t(
"schema.model.schema_expected_attrs",
schema=schema,
attrs=schema.__annotations__,
)
)
raise error
[docs]
def validate_schemas(
return_schema: Optional[Type[pa.DataFrameModel]] = None,
**schemas: Type[pa.DataFrameModel],
) -> Callable:
"""
Valide les schémas des dataframes.
:param return_schema: Le schéma de retour.
:type return_schema: Optional[Type[pa.DataFrameModel]]
:param schemas: Les schémas.
:type schemas: : Type[pa.DataFrameModel]]
:return: La fonction décorée.
:rtype: Callable
"""
def decorator_validate(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper_validate(*args, **kwargs):
# Valider les arguments d'entrée
for arg_name, arg_schema in schemas.items():
if arg_name in kwargs:
validate_schema(kwargs[arg_name], arg_schema)
else:
raise ValueError(
i18n.t("schema.model.error_param_not_found", param=arg_name)
)
# Appeler la fonction originale
result = func(*args, **kwargs)
# Valider le résultat de la fonction
if return_schema is not None and not result.empty:
validate_schema(result, return_schema)
return result
return wrapper_validate
return decorator_validate