"""
Module permettant de définir une classe abstraite pour les parsers de données.
"""
from abc import ABC, abstractmethod
import concurrent.futures
from dataclasses import dataclass
from pathlib import Path
from typing import Collection, Optional
import geopandas as gpd
import i18n
from loguru import logger
import pandas as pd
from .parsing_exception import ColumnException
from .warning_capture import WarningCapture
import schema
from schema import model_ids as schema_ids
LOGGER = logger.bind(name="CSB-Processing.Ingestion.Parser.ABC")
MANDATORY_COLUNMS: list[str] = [
schema_ids.TIME_UTC,
schema_ids.LATITUDE_WGS84,
schema_ids.LONGITUDE_WGS84,
schema_ids.DEPTH_RAW_METER,
]
[docs]
@dataclass
class DataParserABC(ABC):
"""
Classe abstraite pour les parsers de données.
"""
[docs]
@staticmethod
def validate_columns(
dataframe: pd.DataFrame,
file: Path,
column_exceptions: Collection[ColumnException],
) -> None:
"""
Méthode permettant de valider les colonnes du dataframe.
:param dataframe: Le dataframe à valider.
:type dataframe: pd.DataFrame
:param file: Le fichier source.
:type file: Path
:param column_exceptions: Les noms et les exceptions de colonnes.
:type column_exceptions: Collection[ColumnException]
:raises ParsingDataframeLongitudeError: Erreur si la colonne de longitude est absente.
:raises ParsingDataframeLatitudeError: Erreur si la colonne de latitude est absente.
:raises ParsingDataframeDepthError: Erreur si la colonne de profondeur est absente.
:raises ParsingDataframeTimeError: Erreur si la colonne de temps est absente.
"""
LOGGER.debug(
i18n.t(
"ingestion.parser_abc.validating_columns",
columns=[c.column_name for c in column_exceptions],
file=file,
)
)
for column_ in column_exceptions:
if column_.column_name not in dataframe.columns:
raise column_.error(file=file, column=column_.column_name) # type: ignore[arg-type]
[docs]
@staticmethod
def convert_dtype(
dataframe: pd.DataFrame | gpd.GeoDataFrame,
dtype_dict: dict[str, str],
file: Path,
time_column: Optional[str] = None,
time_format: Optional[str] = None,
) -> pd.DataFrame | gpd.GeoDataFrame:
"""
Méthode permettant de convertir et nettoyer le dataframe.
:param dataframe: Le dataframe à convertir.
:type dataframe: pd.DataFrame | gpd.GeoDataFrame
:param dtype_dict: Un dictionnaire de type de données.
:type dtype_dict: dict[str, str]
:param time_column: Le nom de la colonne de temps.
:type time_column: str | None
:param file: Le fichier source.
:type file: Path
:param time_format: Le format de la colonne de temps.
:type time_format: str | None
:return: Le dataframe converti et nettoyé.
:rtype: pd.DataFrame | gpd.GeoDataFrame
"""
LOGGER.debug(
i18n.t(
"ingestion.parser_abc.converting_dtype",
columns=list(dtype_dict.keys())
+ ([time_column] if time_column else []),
file=file,
)
)
with WarningCapture() as warnings_list:
if time_column is not None:
LOGGER.debug(
i18n.t(
"ingestion.parser_abc.converting_time_column",
column=time_column,
file=file,
)
)
series = pd.to_datetime(
dataframe[time_column],
errors="coerce",
utc=True,
format=time_format,
)
dataframe[time_column] = series
for column_ in dtype_dict.keys():
if column_ in dataframe.columns:
dataframe[column_] = pd.to_numeric(
dataframe[column_], errors="coerce"
)
if warnings_list.captured_warnings:
LOGGER.warning(
i18n.t(
"ingestion.parser_abc.conversion_warnings",
file=file,
warnings=warnings_list.captured_warnings,
)
)
return dataframe
[docs]
@abstractmethod
def read(self, file: Path, **kwargs) -> gpd.GeoDataFrame:
"""
Méthode permettant de lire un fichier brut et retourne un geodataframe.
:param file: Le fichier à lire.
:type file: Path
:return: Un GeoDataFrame.
:rtype: gpd.GeoDataFrame
"""
pass
[docs]
def read_files(self, files: Collection[Path]) -> gpd.GeoDataFrame:
"""
Méthode permettant de lire les fichiers brutes et retourne un geodataframe.
:param files: Les fichiers à lire.
:type files: Collection[Path]
:return: Un GeoDataFrame.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(i18n.t("ingestion.parser_abc.converting_files", files=files))
geodataframe_list = []
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [executor.submit(self.read, file) for file in files]
for future in futures:
geodataframe_list.append(future.result())
return gpd.GeoDataFrame(pd.concat(geodataframe_list, ignore_index=True))
[docs]
@staticmethod
def drop_na(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Méthode permettant de supprimer les lignes contenant des valeurs manquantes.
:param data: Le geodataframe à nettoyer.
:type data: gpd.GeoDataFrame
:return: Le geodataframe nettoyé.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(
i18n.t("ingestion.parser_abc.dropping_na", columns=MANDATORY_COLUNMS)
)
initial_count: int = len(data)
data = data.dropna(subset=MANDATORY_COLUNMS)
missing_values_count: int = initial_count - len(data)
if missing_values_count > 0:
LOGGER.warning(
i18n.t(
"ingestion.parser_abc.missing_values_removed",
count=f"{missing_values_count:,}",
columns=MANDATORY_COLUNMS,
)
)
return data
[docs]
@staticmethod
def remove_duplicates(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Méthode permettant de supprimer les doublons du geodataframe.
:param data: Le geodataframe à nettoyer.
:type data: gpd.GeoDataFrame
:return: Le geodataframe nettoyé.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(i18n.t("ingestion.parser_abc.removing_duplicates"))
initial_count: int = len(data)
data: gpd.GeoDataFrame = data.drop_duplicates(subset=MANDATORY_COLUNMS)
duplicates_count: int = initial_count - len(data)
if duplicates_count > 0:
LOGGER.warning(
i18n.t(
"ingestion.parser_abc.duplicates_removed",
count=f"{duplicates_count:,}",
columns=MANDATORY_COLUNMS,
)
)
return data
[docs]
@staticmethod
def sort_geodataframe_by_datetime(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Méthode permettant de trier le geodataframe par datetime.
:param data: Le geodataframe à trier.
:type data: gpd.GeoDataFrame
:return: Le geodataframe trié.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(i18n.t("ingestion.parser_abc.sorting_by_datetime"))
data = data.reset_index(drop=True)
data = data.sort_values(by=[schema_ids.TIME_UTC])
return data
[docs]
@staticmethod
def add_empty_columns_to_geodataframe(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Ajoute des colonnes vides à un GeoDataFrame.
:param data: Données brutes.
:type data: gpd.GeoDataFrame[schema.DataLoggerSchema]
:return: Données avec des colonnes vides.
:rtype: gpd.GeoDataFrame[schema.DataLoggerWithTideZoneSchema]
"""
columns: dict[str, pd.Series] = {
schema_ids.SPEED_KN: pd.Series(dtype="float64"),
schema_ids.DEPTH_PROCESSED_METER: pd.Series(dtype="float64"),
schema_ids.WATER_LEVEL_INFO: pd.Series(dtype="object"),
schema_ids.UNCERTAINTY: pd.Series(dtype="float64"),
schema_ids.THU: pd.Series(dtype="float64"),
schema_ids.IHO_ORDER: pd.Series(dtype="string"),
schema_ids.OUTLIER: pd.Series(dtype="object"),
schema_ids.WATER_LEVEL_METER: pd.Series(dtype="float64"),
schema_ids.UNCERTAINTY_STATION_METER: pd.Series(dtype="float64"),
schema_ids.SSP_UNCERTAINTY_PERCENT: pd.Series(dtype="float64"),
schema_ids.TIME_SERIE: pd.Series(dtype="string"),
schema_ids.TIDE_ZONE_ID: pd.Series(dtype="string"),
schema_ids.TIDE_ZONE_CODE: pd.Series(dtype="string"),
schema_ids.TIDE_ZONE_NAME: pd.Series(dtype="string"),
}
for column_name, empty_column in columns.items():
if column_name not in data.columns:
LOGGER.debug(
i18n.t(
"ingestion.parser_abc.adding_empty_column", column=column_name
)
)
data[column_name] = empty_column
if column_name == schema_ids.OUTLIER:
data[column_name] = data[column_name].apply(
lambda _: schema.OutlierInfo()
)
return data
[docs]
@classmethod
@schema.validate_schemas(return_schema=schema.DataLoggerWithTideZoneSchema)
def from_files(cls, files: Collection[Path]) -> gpd.GeoDataFrame:
"""
Méthode permettant de lire les fichiers brutes et retourne un geodataframe.
:param files: Les fichiers à lire.
:type files: Collection[Path]
:return: Un GeoDataFrame respectant le schéma de données DataLoggerSchema.
:rtype: gpd.GeoDataFrame[DataLoggerWithTideZoneSchema]
"""
parser = cls()
data_geodataframe: gpd.GeoDataFrame = parser.read_files(files=files)
if data_geodataframe.empty:
LOGGER.warning(i18n.t("ingestion.parser_abc.no_data_found"))
return data_geodataframe
data_geodataframe: gpd.GeoDataFrame[schema.DataLoggerSchema] = parser.transform(
data=data_geodataframe
)
LOGGER.debug(
i18n.t(
"ingestion.parser_abc.raw_soundings_count",
count=f"{len(data_geodataframe):,}",
)
)
data_geodataframe = parser.drop_na(data=data_geodataframe)
data_geodataframe = parser.remove_duplicates(data=data_geodataframe)
data_geodataframe: gpd.GeoDataFrame[schema.DataLoggerWithTideZoneSchema] = (
parser.add_empty_columns_to_geodataframe(data=data_geodataframe)
)
data_geodataframe = parser.sort_geodataframe_by_datetime(data=data_geodataframe)
return data_geodataframe