"""
Module ABC récupérer des données des stations de marée.
Ce module contient la classe abstraite `StationsHandlerABC` qui définit les méthodes pour récupérer les données stations de marée.
"""
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta, datetime
from itertools import repeat
from pathlib import Path
from typing import Optional, Collection
import geopandas as gpd
import pandas as pd
from loguru import logger
from shapely.geometry import Point
import i18n
from .cache_wrapper import cache_result, init_cache
from .exception_stations import StationsError
from .stations_models import TimeSeriesProtocol, ResponseProtocol, IWLSapiProtocol
import schema
from schema import model_ids as schema_ids, validate_schemas
LOGGER = logger.bind(name="CSB-Processing.Tide.Station.ABC")
[docs]
class StationsHandlerABC(ABC):
"""
Classe abstraite pour récupérer des données stations de marée.
"""
def __init__(
self,
api: IWLSapiProtocol,
ttl: int,
cache_path: Path,
):
"""
Initialisation de la classe abstraite `StationsHandlerABC`.
:param api: API pour récupérer les données des stations.
:type api: IWLSapiProtocol
:param ttl: Durée de vie du cache en secondes.
:type ttl: int
:param cache_path: Chemin du cache.
:type cache_path: Path
"""
LOGGER.debug(
i18n.t(
"tide.stations.stations_abc.init_object",
class_name=self.__class__.__name__,
)
)
self.api: IWLSapiProtocol = api
self.ttl: int = ttl
init_cache(cache_path=cache_path)
@property
def stations(self) -> list[dict]:
"""
Récupère la liste des stations.
:return: Liste des stations.
:rtype: list[dict]
"""
stations: ResponseProtocol = self.api.get_all_stations()
if not stations.is_ok:
LOGGER.error(
i18n.t(
"tide.stations.stations_abc.error_getting_stations",
message=stations.message,
error=stations.error,
)
)
raise StationsError(
message=stations.message,
error=stations.error,
status_code=stations.status_code,
)
return stations.data
[docs]
def get_station_id_by_code(
self,
station_code: str,
) -> str | None:
"""
Récupère l'identifiant d'une station en fonction de son code.
:param station_code: Code de la station.
:type station_code: str
:return: Identifiant de la station.
:rtype: str | None
"""
@cache_result(ttl=self.ttl)
def create_code_id_map() -> dict[str, str]:
"""
Crée une carte d'identifiant et de code pour les stations.
:return: Carte d'identifiant et de code pour les stations.
:rtype: dict[str, str]
"""
LOGGER.debug(i18n.t("tide.stations.stations_abc.creating_code_id_map"))
return {station["code"]: station["id"] for station in self.stations}
code_id_map: dict[str, str] = create_code_id_map()
return code_id_map.get(station_code)
[docs]
@staticmethod
def _create_index_map(
filter_time_series: Collection[TimeSeriesProtocol],
) -> dict[TimeSeriesProtocol, int]:
"""
Crée une carte d'index pour les séries temporelles.
:param filter_time_series: Liste des séries temporelles en ordre de priorité.
:type filter_time_series: Collection[TimeSeriesProtocol]
:return: Carte d'index pour les séries temporelles.
:rtype: dict[TimeSeriesProtocol, int]
"""
return {code: index for index, code in enumerate(filter_time_series)}
[docs]
@staticmethod
@abstractmethod
def _filter_stations(
stations: Collection[dict],
filter_time_series: Collection[TimeSeriesProtocol],
excluded_stations: Collection[str] | None,
) -> list[dict]:
"""
Filtre les stations en fonction des séries temporelles.
:param stations: Liste des stations.
:type stations: Collection[dict]
:param filter_time_series: Liste des séries temporelles pour filtrer les stations.
:type filter_time_series: Collection[TimeSeriesProtocol]
:param excluded_stations: Liste des stations à exclure.
:type excluded_stations: Collection[str] | None
:return: Liste des stations filtrées.
:rtype: list[dict]
"""
...
[docs]
@staticmethod
def _create_geometry(stations: Collection[dict]) -> list[Point]:
"""
Crée une liste de points à partir des données des stations.
:param stations: Liste des stations.
:type stations: Collection[dict]
:return: Liste des points.
:rtype: list[Point]
"""
LOGGER.debug(i18n.t("tide.stations.stations_abc.creating_geometry"))
return [
Point(station["longitude"], station["latitude"]) for station in stations
]
[docs]
@staticmethod
@abstractmethod
def _get_time_series(
station: dict, index_map: dict[TimeSeriesProtocol, int] | None
) -> list[str]:
"""
Récupère les séries temporelles de la station.
:param station: Données de la station.
:type station: dict
:param index_map: Carte d'index pour les séries temporelles.
:type index_map: dict[str, int] | None
:return: Liste des séries temporelles.
:rtype: list[str]
"""
...
[docs]
def _create_attributes(
self,
stations: Collection[dict],
index_map: dict[TimeSeriesProtocol, int] | None,
station_name_key: str,
) -> list[dict]:
"""
Crée une liste d'attributs pour les stations.
:param stations: Liste des stations.
:type stations: Collection[dict]
:param index_map: Carte d'index pour les séries temporelles.
:type index_map: dict[str, int] | None
:param station_name_key: Clé du nom de la station.
:type station_name_key: str
:return: Liste des attributs.
:rtype: list[dict]
"""
LOGGER.debug(i18n.t("tide.stations.stations_abc.creating_attributes"))
return [
{
schema_ids.ID: station["id"],
schema_ids.CODE: station["code"],
schema_ids.NAME: station[station_name_key].replace("/", "-"),
schema_ids.TIME_SERIES: (
["Unknown"]
if index_map is None
else sorted(
self._get_time_series(station=station, index_map=index_map),
key=lambda code: index_map.get(code, float("inf")),
)
),
schema_ids.IS_TIDAL: str(station["isTidal"]),
}
for station in stations
]
[docs]
def _fetch_is_tidal_station(
self, sation_id: str, api: str, column_name: str
) -> bool | None:
"""
Récupère l'information si la station est une station de marée.
:param sation_id: dentifiant de la station.
:type sation_id: str
:param api: Type de l'API.
:type api: str
:param column_name: Nom de la colonne.
:type sation_id: str
:return: True si la station est une station de marée, False sinon.
:rtype: bool | None
"""
@cache_result(ttl=self.ttl)
def _is_tidal_station(station_id_: str, **kwargs) -> bool | None:
metadata: dict = self.api.get_metadata_station( # type: ignore[arg-type]
station=station_id_
).data
if metadata is None:
return None
return metadata.get(column_name)
return _is_tidal_station(station_id_=sation_id, api=api)
[docs]
def _get_stations_tidal_info(
self, stations: list[dict], api: str, column_name: str
) -> list[bool | None]:
"""
Récupère les informations sur les stations de marée.
:param stations: Liste des stations.
:type stations: list[dict]
:param api: Type de l'API.
:type api: str
:param column_name: Nom de la colonne.
:type column_name: str
:return: Liste des informations sur les stations de marée.
:rtype: list[bool | None]
"""
with ThreadPoolExecutor(max_workers=10) as executor:
tidal_info_list = list(
executor.map(
self._fetch_is_tidal_station,
stations,
repeat(api),
repeat(column_name),
)
)
return tidal_info_list
[docs]
@schema.validate_schemas(return_schema=schema.StationsSchema)
def _get_stations_geodataframe(
self,
stations: Collection[dict],
filter_time_series: Collection[TimeSeriesProtocol],
excluded_stations: Collection[str] | None,
station_name_key: str,
) -> gpd.GeoDataFrame:
"""
Récupère les données des stations sous forme de GeoDataFrame.
:param stations: Liste des stations.
:type stations: Collection[dict]
:param filter_time_series: Liste des séries temporelles pour filtrer les stations.
:type filter_time_series: Collection[TimeSeriesProtocol]
:param excluded_stations: Liste des stations à exclure.
:type excluded_stations: Collection[str] | None
:param station_name_key: Clé du nom de la station.
:type station_name_key: str
:return: Données des stations sous forme de GeoDataFrame.
:rtype: gpd.GeoDataFrame[schema.StationsSchema]
"""
LOGGER.debug(i18n.t("tide.stations.stations_abc.creating_stations_gdf"))
filtered_stations: list[dict] = (
self._filter_stations(
stations=stations,
filter_time_series=filter_time_series,
excluded_stations=excluded_stations,
)
if filter_time_series or excluded_stations
else stations
)
geometry: list[Point] = self._create_geometry(stations=filtered_stations)
attributes: list[dict] = self._create_attributes(
stations=filtered_stations,
index_map=(
self._create_index_map(filter_time_series)
if filter_time_series
else None
),
station_name_key=station_name_key,
)
gdf_stations: gpd.GeoDataFrame[schema.StationsSchema] = gpd.GeoDataFrame(
attributes, geometry=geometry, crs="EPSG:4326"
).astype(
{
schema_ids.ID: pd.StringDtype(),
schema_ids.NAME: pd.StringDtype(),
schema_ids.CODE: pd.StringDtype(),
}
)
return gdf_stations
[docs]
@abstractmethod
def get_stations_geodataframe(
self,
filter_time_series: Collection[TimeSeriesProtocol],
excluded_stations: Collection[str] | None = None,
station_name_key: Optional[str] = "officialName",
) -> gpd.GeoDataFrame:
"""
Récupère les données des stations sous forme de GeoDataFrame.
:param filter_time_series: Liste des séries temporelles pour filtrer les stations.
:type filter_time_series: Collection[TimeSeriesProtocol]
:param excluded_stations: Liste des stations à exclure.
:type excluded_stations: Collection[str] | None
:param station_name_key: Clé du nom de la station.
:type station_name_key: str
:return: Données des stations sous forme de GeoDataFrame.
:rtype: gpd.GeoDataFrame[schema.StationsSchema]
"""
...
[docs]
def get_stations_geodataframe_from_codes(self, station_codes, filter_time_series):
"""
Récupère les données d'une station sous forme de GeoDataFrame.
:param station_codes: Liste des codes des stations.
:type station_codes: Collection[str]
:param filter_time_series: Liste des séries temporelles pour filtrer les stations.
:type filter_time_series: Collection[TimeSeriesProtocol]
:return: Données de la station sous forme de GeoDataFrame.
:rtype: gpd.GeoDataFrame[schema.StationsSchema]
"""
LOGGER.debug(
i18n.t(
"tide.stations.stations_abc.getting_stations_data", codes=station_codes
)
)
gdf_stations: gpd.GeoDataFrame = self.get_stations_geodataframe(
filter_time_series=filter_time_series
)
gdf_station: gpd.GeoDataFrame = gdf_stations[
gdf_stations[schema_ids.CODE].isin(station_codes)
]
if gdf_station.empty:
LOGGER.error(
i18n.t(
"tide.stations.stations_abc.no_data_for_stations",
codes=station_codes,
)
)
raise StationsError(
message=i18n.t(
"tide.stations.stations_abc.no_data_for_stations",
codes=station_codes,
),
error="StationNotFound",
status_code=404,
)
return gdf_station
[docs]
@staticmethod
@abstractmethod
def _get_event_date(event: dict) -> datetime:
"""
Récupère la date de l'événement.
:param event: Données de l'événement.
:type event: dict
:return: Date de l'événement.
:rtype: datetime
"""
...
[docs]
@staticmethod
@abstractmethod
def _get_qc_flag(event: dict) -> str:
"""
Récupère le type du flag de qualité.
:param event: Données de l'événement.
:type event: dict
:return: Type du flag de qualité.
:rtype: str
"""
...
[docs]
def create_data_list(
self, data: Collection[dict], time_serie_code: TimeSeriesProtocol
) -> list[dict]:
"""
Crée une liste de données pour les séries temporelles.
:param data: Données de la série temporelle.
:type data: Collection[dict]
:param time_serie_code: Le code de la série temporelle.
:type time_serie_code: TimeSeriesProtocol
:return: Liste des données.
:rtype: list[dict]
"""
return [
{
schema_ids.EVENT_DATE: self._get_event_date(event=event),
schema_ids.VALUE: event["value"],
schema_ids.TIME_SERIE_CODE: time_serie_code,
"qc_flag": self._get_qc_flag(event=event),
}
for event in data
]
[docs]
@staticmethod
def filter_wlo_qc_flag(
data_dataframe: pd.DataFrame,
time_serie_code: TimeSeriesProtocol,
wlo_qc_flag_filter: Optional[Collection[str] | None] = None,
) -> pd.DataFrame:
"""
Filtre les données de la série temporelle WLO en fonction des flags de qualité.
:param data_dataframe: Données des séries temporelles sous forme de DataFrame.
:type data_dataframe: pd.DataFrame
:param time_serie_code: e code de la série temporelle des données.
:type time_serie_code: TimeSeriesProtocol
:param wlo_qc_flag_filter: Liste des flags de qualité à filtrer pour la série temporelle WLO.
:type wlo_qc_flag_filter: Collection[str] | None
:return: Données des séries temporelles sous forme de DataFrame.
:rtype: pd.DataFrame
"""
if time_serie_code == TimeSeriesProtocol.WLO:
data_dataframe = (
data_dataframe[~data_dataframe["qc_flag"].isin(wlo_qc_flag_filter)]
if wlo_qc_flag_filter
else data_dataframe
)
return data_dataframe
[docs]
@validate_schemas(return_schema=schema.WaterLevelSerieDataSchema)
def get_time_series_dataframe(
self,
station: str,
from_time: str,
to_time: str,
time_serie_code: Optional[TimeSeriesProtocol],
time_delta: Optional[timedelta] = timedelta(days=7),
datetime_sorted: Optional[bool] = True,
wlo_qc_flag_filter: Optional[Collection[str] | None] = None,
**kwargs,
) -> pd.DataFrame:
"""
Récupère les séries temporelles sous forme de DataFrame.
:param station: Code de la station.
:type station: str
:param from_time: La date de début en format ISO 8601 (ex: 2019-11-13T19:18:00Z).
:type from_time: str
:param to_time: La date de fin en format ISO 8601 (ex: 2019-11-13T19:18:00Z).
:type to_time: str
:param time_serie_code: Le code de la série temporelle désirée.
:type time_serie_code: TimeSeriesProtocol
:param time_delta: L'intervalle de temps maximale pour chaque requête.
:type time_delta: timedelta
:param datetime_sorted: Si les données doivent être triées par date.
:type datetime_sorted: bool
:param wlo_qc_flag_filter: Liste des flags de qualité à filtrer pour la série temporelle WLO.
:type wlo_qc_flag_filter: Collection[str] | None
:return: Données des séries temporelles sous forme de DataFrame.
:rtype: pd.DataFrame[schema.WaterLevelSerieDataSchema]
"""
LOGGER.debug(
i18n.t(
"tide.stations.stations_abc.fetching_time_series_data",
time_serie=time_serie_code,
station=station,
from_time=from_time,
to_time=to_time,
time_delta=time_delta,
)
)
data: ResponseProtocol = self.api.get_time_serie_block_data(
station=station,
from_time=from_time,
to_time=to_time,
time_serie_code=time_serie_code,
time_delta=time_delta,
datetime_sorted=datetime_sorted,
**kwargs,
)
if not data.is_ok:
LOGGER.error(
i18n.t(
"tide.stations.stations_abc.error_fetching_data",
status_code=data.status_code,
station=station,
time_serie=time_serie_code,
from_time=from_time,
to_time=to_time,
message=data.message,
error=data.error,
)
)
return pd.DataFrame()
if not data.data:
LOGGER.warning(
i18n.t(
"tide.stations.stations_abc.no_time_serie_data",
time_serie=time_serie_code,
station=station,
from_time=from_time,
to_time=to_time,
)
)
return pd.DataFrame(
columns=list(schema.WaterLevelSerieDataSchema.__annotations__.keys())
)
data_list: list[dict] = self.create_data_list(
data=data.data, time_serie_code=time_serie_code # type: ignore
)
data_dataframe: pd.DataFrame[schema.WaterLevelSerieDataSchema] = pd.DataFrame(
data_list
).astype({schema_ids.TIME_SERIE_CODE: pd.StringDtype()})
data_dataframe = self.filter_wlo_qc_flag(
data_dataframe=data_dataframe,
time_serie_code=time_serie_code,
wlo_qc_flag_filter=wlo_qc_flag_filter,
)
data_dataframe.drop(columns=["qc_flag"], inplace=True)
return data_dataframe