"""
Module permettant de définir un parser pour les données de type Lowrance.
"""
from pathlib import Path
import geopandas as gpd
import i18n
from loguru import logger
import pandas as pd
from .parser_abc import DataParserABC
from .parsing_exception import (
ColumnException,
ParsingDataframeTimeError,
ParsingDataframeLongitudeError,
ParsingDataframeLatitudeError,
ParsingDataframeDepthError,
ParsingError,
)
from . import parser_ids as ids
import schema
from schema import model_ids as schema_ids
LOGGER = logger.bind(name=f"CSB-Processing.Ingestion.Parser.{ids.LOWRANCE}")
DTYPE_DICT: dict[str, str] = {
ids.LONGITUDE_LOWRANCE: ids.FLOAT64,
ids.LATITUDE_LOWRANCE: ids.FLOAT64,
ids.DEPTH_LOWRANCE: ids.FLOAT64,
ids.SPEED_LOWRANCE: ids.FLOAT64,
}
MANDATORY_COLUMN_EXCEPTIONS: list[ColumnException] = [
ColumnException(column_name=ids.TIME_LOWRANCE, error=ParsingDataframeTimeError),
ColumnException(
column_name=ids.LONGITUDE_LOWRANCE, error=ParsingDataframeLongitudeError
),
ColumnException(
column_name=ids.LATITUDE_LOWRANCE, error=ParsingDataframeLatitudeError
),
ColumnException(column_name=ids.DEPTH_LOWRANCE, error=ParsingDataframeDepthError),
ColumnException(column_name=ids.SURVEY_TYPE_LOWRANCE, error=ParsingError),
]
[docs]
class DataParserLowrance(DataParserABC):
"""
Classe permettant de parser les données de type Lowrance.
"""
[docs]
def read(self, file: Path, dtype_dict: dict[str, str] = None) -> gpd.GeoDataFrame:
"""
Méthode permettant de lire un fichier brut et retourne un geodataframe.
:param file: Le fichier à lire.
:type file: Path
:param dtype_dict: Un dictionnaire de type de données.
:type dtype_dict: dict[str, str]
:return: Un GeoDataFrame.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(
i18n.t("ingestion.parser_shared.loading_file", type=ids.LOWRANCE, file=file)
)
if dtype_dict is None:
dtype_dict = DTYPE_DICT
dataframe: pd.DataFrame = pd.read_csv(file)
self.validate_columns(
dataframe=dataframe,
file=file,
column_exceptions=MANDATORY_COLUMN_EXCEPTIONS,
)
dataframe = self.convert_dtype(
dataframe=dataframe,
dtype_dict=dtype_dict,
time_column=ids.TIME_LOWRANCE,
file=file,
time_format="%Y-%m-%d %H:%M:%S.%fZ",
)
# Arrondir les timestamps pour uniformiser la précision
dataframe[ids.TIME_LOWRANCE] = dataframe[ids.TIME_LOWRANCE].dt.round("100ms")
dataframe = dataframe.query(
f"{ids.SURVEY_TYPE_LOWRANCE} == '{ids.PRIMARY_LOWRANCE}'"
)
LOGGER.debug(
i18n.t("ingestion.parser_shared.converting_to_geodataframe", file=file)
)
gdf: gpd.GeoDataFrame = gpd.GeoDataFrame(
data=dataframe,
geometry=gpd.points_from_xy(
x=dataframe[ids.LONGITUDE_LOWRANCE],
y=dataframe[ids.LATITUDE_LOWRANCE],
crs=ids.EPSG_WGS84,
),
)
return gdf
[docs]
@staticmethod
def rename_columns(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Méthode permettant de renommer les colonnes du geodataframe.
:param data: Le geodataframe à renommer.
:type data: gpd.GeoDataFrame
:return: Le geodataframe renommé.
:rtype: gpd.GeoDataFrame
"""
columns = {
ids.TIME_LOWRANCE: schema_ids.TIME_UTC,
ids.DEPTH_LOWRANCE: schema_ids.DEPTH_RAW_METER,
ids.LONGITUDE_LOWRANCE: schema_ids.LONGITUDE_WGS84,
ids.LATITUDE_LOWRANCE: schema_ids.LATITUDE_WGS84,
ids.SPEED_LOWRANCE: schema_ids.SPEED_KN,
}
LOGGER.debug(
i18n.t("ingestion.parser_lowrance.renaming_columns_detail", columns=columns)
)
data: gpd.GeoDataFrame[schema.DataLoggerSchema] = data.rename(columns=columns)
return data
[docs]
@staticmethod
def remove_special_characters_from_columns(
data: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
"""
Méthode permettant de supprimer les caractères spéciaux des noms de colonnes.
:param data: Le geodataframe à transformer.
:type data: gpd.GeoDataFrame
:return: Le geodataframe transformé.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(i18n.t("ingestion.parser_lowrance.removing_special_chars"))
data.columns = (
data.columns.str.replace("[", "_")
.str.replace("]", "")
.str.replace("/", "-")
)
return data
[docs]
@staticmethod
def convert_depth_to_meters(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Méthode permettant de convertir les profondeurs en mètres.
:param data: Le geodataframe à transformer.
:type data: gpd.GeoDataFrame
:return: Le geodataframe transformé.
:rtype: gpd.GeoDataFrame
"""
LOGGER.debug(i18n.t("ingestion.parser_lowrance.converting_depth_feet"))
data[schema_ids.DEPTH_RAW_METER] = round(data[ids.DEPTH_LOWRANCE] * 0.3048, 3)
data = data.drop(columns=[ids.DEPTH_LOWRANCE])
return data
[docs]
@staticmethod
def convert_speed_to_knots(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""
Méthode permettant de convertir les vitesses en mètres par seconde en noeuds.
:param data: Le geodataframe à transformer.
:type data: gpd.GeoDataFrame
:return: Le geodataframe transformé.
:rtype: gpd.GeoDataFrame
"""
if ids.SPEED_LOWRANCE not in data.columns:
LOGGER.warning(
i18n.t(
"ingestion.parser_shared.missing_speed_column",
column=ids.SPEED_LOWRANCE,
)
)
return data
LOGGER.debug(i18n.t("ingestion.parser_lowrance.converting_speed_ms"))
data[schema_ids.SPEED_KN] = round(data[ids.SPEED_LOWRANCE] * 1.94384, 3)
data = data.drop(columns=[ids.SPEED_LOWRANCE])
return data