Source code for ingestion.parser_lowrance

"""
Module permettant de définir un parser pour les données de type Lowrance.
"""

from pathlib import Path

import geopandas as gpd
import i18n
from loguru import logger
import pandas as pd

from .parser_abc import DataParserABC
from .parsing_exception import (
    ColumnException,
    ParsingDataframeTimeError,
    ParsingDataframeLongitudeError,
    ParsingDataframeLatitudeError,
    ParsingDataframeDepthError,
    ParsingError,
)
from . import parser_ids as ids
import schema
from schema import model_ids as schema_ids

LOGGER = logger.bind(name=f"CSB-Processing.Ingestion.Parser.{ids.LOWRANCE}")

DTYPE_DICT: dict[str, str] = {
    ids.LONGITUDE_LOWRANCE: ids.FLOAT64,
    ids.LATITUDE_LOWRANCE: ids.FLOAT64,
    ids.DEPTH_LOWRANCE: ids.FLOAT64,
    ids.SPEED_LOWRANCE: ids.FLOAT64,
}

MANDATORY_COLUMN_EXCEPTIONS: list[ColumnException] = [
    ColumnException(column_name=ids.TIME_LOWRANCE, error=ParsingDataframeTimeError),
    ColumnException(
        column_name=ids.LONGITUDE_LOWRANCE, error=ParsingDataframeLongitudeError
    ),
    ColumnException(
        column_name=ids.LATITUDE_LOWRANCE, error=ParsingDataframeLatitudeError
    ),
    ColumnException(column_name=ids.DEPTH_LOWRANCE, error=ParsingDataframeDepthError),
    ColumnException(column_name=ids.SURVEY_TYPE_LOWRANCE, error=ParsingError),
]


[docs] class DataParserLowrance(DataParserABC): """ Classe permettant de parser les données de type Lowrance. """
[docs] def read(self, file: Path, dtype_dict: dict[str, str] = None) -> gpd.GeoDataFrame: """ Méthode permettant de lire un fichier brut et retourne un geodataframe. :param file: Le fichier à lire. :type file: Path :param dtype_dict: Un dictionnaire de type de données. :type dtype_dict: dict[str, str] :return: Un GeoDataFrame. :rtype: gpd.GeoDataFrame """ LOGGER.debug( i18n.t("ingestion.parser_shared.loading_file", type=ids.LOWRANCE, file=file) ) if dtype_dict is None: dtype_dict = DTYPE_DICT dataframe: pd.DataFrame = pd.read_csv(file) self.validate_columns( dataframe=dataframe, file=file, column_exceptions=MANDATORY_COLUMN_EXCEPTIONS, ) dataframe = self.convert_dtype( dataframe=dataframe, dtype_dict=dtype_dict, time_column=ids.TIME_LOWRANCE, file=file, time_format="%Y-%m-%d %H:%M:%S.%fZ", ) # Arrondir les timestamps pour uniformiser la précision dataframe[ids.TIME_LOWRANCE] = dataframe[ids.TIME_LOWRANCE].dt.round("100ms") dataframe = dataframe.query( f"{ids.SURVEY_TYPE_LOWRANCE} == '{ids.PRIMARY_LOWRANCE}'" ) LOGGER.debug( i18n.t("ingestion.parser_shared.converting_to_geodataframe", file=file) ) gdf: gpd.GeoDataFrame = gpd.GeoDataFrame( data=dataframe, geometry=gpd.points_from_xy( x=dataframe[ids.LONGITUDE_LOWRANCE], y=dataframe[ids.LATITUDE_LOWRANCE], crs=ids.EPSG_WGS84, ), ) return gdf
[docs] @staticmethod def rename_columns(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """ Méthode permettant de renommer les colonnes du geodataframe. :param data: Le geodataframe à renommer. :type data: gpd.GeoDataFrame :return: Le geodataframe renommé. :rtype: gpd.GeoDataFrame """ columns = { ids.TIME_LOWRANCE: schema_ids.TIME_UTC, ids.DEPTH_LOWRANCE: schema_ids.DEPTH_RAW_METER, ids.LONGITUDE_LOWRANCE: schema_ids.LONGITUDE_WGS84, ids.LATITUDE_LOWRANCE: schema_ids.LATITUDE_WGS84, ids.SPEED_LOWRANCE: schema_ids.SPEED_KN, } LOGGER.debug( i18n.t("ingestion.parser_lowrance.renaming_columns_detail", columns=columns) ) data: gpd.GeoDataFrame[schema.DataLoggerSchema] = data.rename(columns=columns) return data
[docs] @staticmethod def remove_special_characters_from_columns( data: gpd.GeoDataFrame, ) -> gpd.GeoDataFrame: """ Méthode permettant de supprimer les caractères spéciaux des noms de colonnes. :param data: Le geodataframe à transformer. :type data: gpd.GeoDataFrame :return: Le geodataframe transformé. :rtype: gpd.GeoDataFrame """ LOGGER.debug(i18n.t("ingestion.parser_lowrance.removing_special_chars")) data.columns = ( data.columns.str.replace("[", "_") .str.replace("]", "") .str.replace("/", "-") ) return data
[docs] @staticmethod def convert_depth_to_meters(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """ Méthode permettant de convertir les profondeurs en mètres. :param data: Le geodataframe à transformer. :type data: gpd.GeoDataFrame :return: Le geodataframe transformé. :rtype: gpd.GeoDataFrame """ LOGGER.debug(i18n.t("ingestion.parser_lowrance.converting_depth_feet")) data[schema_ids.DEPTH_RAW_METER] = round(data[ids.DEPTH_LOWRANCE] * 0.3048, 3) data = data.drop(columns=[ids.DEPTH_LOWRANCE]) return data
[docs] @staticmethod def convert_speed_to_knots(data: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """ Méthode permettant de convertir les vitesses en mètres par seconde en noeuds. :param data: Le geodataframe à transformer. :type data: gpd.GeoDataFrame :return: Le geodataframe transformé. :rtype: gpd.GeoDataFrame """ if ids.SPEED_LOWRANCE not in data.columns: LOGGER.warning( i18n.t( "ingestion.parser_shared.missing_speed_column", column=ids.SPEED_LOWRANCE, ) ) return data LOGGER.debug(i18n.t("ingestion.parser_lowrance.converting_speed_ms")) data[schema_ids.SPEED_KN] = round(data[ids.SPEED_LOWRANCE] * 1.94384, 3) data = data.drop(columns=[ids.SPEED_LOWRANCE]) return data
[docs] def transform(self, data: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """ Méthode permettant de transformer le geodataframe pour respecter le schéma de données. :param data: Le geodataframe à transformer. :type data: gpd.GeoDataFrame :return: Le geodataframe transformé respectant le schéma de données DataLoggerSchema. :rtype: gpd.GeoDataFrame[schema.DataLoggerSchema] """ LOGGER.debug(i18n.t("ingestion.parser_shared.transforming_geodataframe")) data = self.convert_depth_to_meters(data) data = self.convert_speed_to_knots(data) data = self.rename_columns(data) data = self.remove_special_characters_from_columns(data) return data