Code source de app.processing_handler

"""Handles the processing workflow for CSB data files."""

import asyncio
from pathlib import Path
from typing import Any, Iterable

import i18n
from loguru import logger

from csb_processing import run_processing_workflow
from vessel import UNKNOWN_VESSEL_CONFIG, UNKNOWN_DATE, Waterline

from .component.log_display import LogDisplay
from .component.notifications import show_notification
from .component.status_display import StatusDisplay
from .config_manager import ConfigManager
from .file_manager import FileManager
from .ui_validation import Validator

# Module-level loguru logger used throughout this file; every record it emits
# carries the bound value name="CSB-Processing.ProcessingHandler".
LOGGER = logger.bind(name="CSB-Processing.ProcessingHandler")


def is_valid_file(file: Path) -> bool:
    """
    Tell whether a path has an extension accepted for processing.

    Only the file name is inspected; the file system is never touched.

    :param file: Path of the candidate file.
    :type file: Path
    :return: True when the extension is accepted, False otherwise.
    :rtype: bool
    """
    suffix = file.suffix.lower()

    # Recognised text-based extensions (compared case-insensitively).
    has_known_extension = suffix in (".csv", ".txt", ".xyz", ".geojson")

    # Purely numeric extensions such as ".1", ".2", ".3".
    has_numeric_extension = suffix[:1] == "." and suffix[1:].isdigit()

    return has_known_extension or has_numeric_extension


def get_files(paths: Iterable[Path]) -> list[Path]:
    """
    Collect the files to process from a mix of files and directories.

    A path that is a regular file is kept when ``is_valid_file`` accepts it.
    A path that is a directory is searched recursively and every regular file
    accepted by ``is_valid_file`` is kept. Paths that are neither (for example
    paths that do not exist) are ignored.

    :param paths: Paths of files or directories.
    :type paths: Iterable[Path]
    :return: List of files to process, in the order they were discovered.
    :rtype: list[Path]
    """
    files: list[Path] = []

    for path in paths:
        path = Path(path)

        if path.is_file() and is_valid_file(path):
            files.append(path)

        elif path.is_dir():
            # The recursive glob yields directories as well as files, and
            # is_valid_file only looks at the suffix. Without the is_file()
            # check a sub-directory named e.g. "survey.1" or "export.csv"
            # would be handed to the processing step as if it were a file.
            files.extend(
                file
                for file in path.glob("**/*")
                if file.is_file() and is_valid_file(file)
            )

    return files


class ProcessingHandler:
    """Handles the file processing workflow.

    Coordinates input validation, status/notification updates, preparation of
    the processing parameters and the execution of ``run_processing_workflow``
    in an executor thread.
    """

    def __init__(
        self,
        config_manager: ConfigManager,
        file_manager: FileManager,
        validator: Validator,
        status_display: StatusDisplay,
        log_display: LogDisplay,
        log_settings: dict | None = None,
    ):
        """Store the collaborators used by the workflow.

        :param config_manager: Source of the processing settings (output path,
            vessel, waterline, merge option, ...).
        :param file_manager: Provides the selected file paths.
        :param validator: Validates the user inputs before processing.
        :param status_display: Component updated with the current status.
        :param log_display: Component showing the logs; may be falsy, in which
            case it is simply not used.
        :param log_settings: Optional settings forwarded to
            ``run_processing_workflow`` through its ``extra_logger`` argument.
        """
        self.config_manager = config_manager
        self.file_manager = file_manager
        self.validator = validator
        self.status_display = status_display
        self.log_display = log_display
        self.log_settings = log_settings

    async def process_files(self) -> None:
        """Process the uploaded files.

        Validates the inputs, then runs the workflow. Any ``Exception`` raised
        by the workflow is reported through ``_handle_error`` instead of being
        propagated to the caller.
        """
        # Show logs during processing
        if self.log_display:
            self.log_display.show()
            self.log_display.clear_logs()

        LOGGER.info("Starting CSB Processing Workflow")

        # Validate inputs using the validator
        validation_errors = self.validator.validate_inputs()

        if validation_errors:
            await self._handle_validation_errors(validation_errors)
            return

        # Update status and start processing
        self.status_display.set_status("🔄 Processing in progress...", "processing")
        show_notification("Processing started...", type="info")
        # NOTE(review): the short pause presumably lets the UI refresh before
        # the heavy work starts - confirm.
        await asyncio.sleep(0.5)

        try:
            await self._execute_processing_workflow()
            await self._handle_success()
        except Exception as e:
            # Top-level boundary of the workflow: report the failure to the
            # user rather than letting it escape.
            await self._handle_error(e)

    def _perform_validation(self) -> list[str]:
        """Perform comprehensive validation and return list of errors.

        The individual checks are only used to log which ones failed; the
        returned list always comes from ``validator.validate_inputs()``.

        :return: Validation error messages reported by the validator.
        """
        # Log validation status for debugging
        validations = {
            "File selection": self.validator.validate_file_selection(),
            "Output path": self.validator.validate_output_path(),
            "Mutual exclusivity": self.validator.validate_mutual_exclusivity(),
            "Vessel configuration": self.validator.validate_vessel_configuration(),
            "Waterline configuration": self.validator.validate_waterline_configuration(),
        }

        failed_validations = [
            name for name, passed in validations.items() if not passed
        ]

        if failed_validations:
            LOGGER.warning(
                f"Validation issues detected: {', '.join(failed_validations)}"
            )

        # Return the comprehensive validation from validator
        return self.validator.validate_inputs()

    async def _handle_validation_errors(self, validation_errors: list[str]) -> None:
        """Handle validation errors.

        Shows one negative notification per error and makes the log display
        visible when there is one.

        :param validation_errors: Error messages to show to the user.
        """
        for error in validation_errors:
            show_notification(error, type="negative")

        if self.log_display:
            self.log_display.show()

    async def _execute_processing_workflow(self) -> None:
        """Execute the main processing workflow.

        Resolves the files to process, gathers the processing parameters,
        creates the output directory and runs the processing.

        :raises ValueError: If no valid file is found among the selected paths.
        """
        # Get and validate files
        files_paths = self.file_manager.get_file_paths()
        LOGGER.info(f"Processing files: {[str(p) for p in files_paths]}")

        self.status_display.set_status("🔍 Validating files...", "processing")
        await asyncio.sleep(1)

        valid_files = get_files(files_paths)

        if not valid_files:
            raise ValueError("No valid files found for processing")

        LOGGER.info(f"Found {len(valid_files)} valid files for processing")

        # Prepare processing parameters
        self.status_display.set_status(
            f"📁 Preparing {len(valid_files)} valid file(s)...", "processing"
        )
        await asyncio.sleep(1)

        output = self.config_manager.output_path
        config = self.config_manager.get_effective_config_path()
        vessel_config = self._prepare_vessel_config()
        processing_config = self.config_manager.build_processing_config()

        # Create output directory
        output.mkdir(parents=True, exist_ok=True)
        LOGGER.info(f"Created output directory: {output}")

        # Execute processing
        self.status_display.set_status(
            "⚙️ Executing processing workflow...", "processing"
        )
        await asyncio.sleep(1)

        await self._run_processing_workflow(
            valid_files, vessel_config, output, config, processing_config
        )

    def _prepare_vessel_config(self) -> Any:
        """Prepare vessel configuration based on current settings.

        :return: The configured vessel id when a vessel is in use; a copy of
            ``UNKNOWN_VESSEL_CONFIG`` carrying the configured waterline when a
            waterline is in use; ``UNKNOWN_VESSEL_CONFIG`` itself otherwise.
        """
        # Function-scope import: keeps this class self-contained without
        # touching the file-level import block.
        import copy

        vessel_config = (
            self.config_manager.vessel_id
            if self.config_manager.use_vessel and self.config_manager.vessel_id
            else UNKNOWN_VESSEL_CONFIG
        )

        # Handle waterline configuration
        if (
            self.config_manager.use_waterline
            and self.config_manager.waterline_value is not None
        ):
            LOGGER.info(f"Using waterline: {self.config_manager.waterline_value}m")
            # Work on a copy: assigning the waterline on the module-level
            # UNKNOWN_VESSEL_CONFIG would leak it into every later run that
            # falls back to the unknown vessel without a waterline.
            # NOTE(review): assumes the vessel config object supports
            # copy.deepcopy and is not compared by identity downstream.
            vessel_config = copy.deepcopy(UNKNOWN_VESSEL_CONFIG)
            vessel_config.waterline = [
                Waterline(
                    time_stamp=UNKNOWN_DATE, z=-self.config_manager.waterline_value
                )
            ]

        return vessel_config

    async def _run_processing_workflow(
        self,
        valid_files: list[Path],
        vessel_config: Any,
        output: Path,
        config: Path,
        processing_config: Any,
    ) -> None:
        """Run the processing workflow in a separate thread.

        :param valid_files: Files to process.
        :param vessel_config: Value returned by ``_prepare_vessel_config``.
        :param output: Output directory.
        :param config: Configuration path passed as ``config_path``.
        :param processing_config: Value returned by
            ``config_manager.build_processing_config()``.
        """
        # Read every setting here, on the event-loop thread, so the worker
        # thread never touches the config manager.
        merge_files: bool = self.config_manager.merge_files
        apply_water_level = self.config_manager.apply_water_level
        vessel_name = self.config_manager.vessel_name or None
        already_at_chart_datum = self.config_manager.already_at_chart_datum
        # NOTE(review): log_settings may be None, in which case this is
        # (None,) - confirm run_processing_workflow accepts that.
        extra_logger = (self.log_settings,)

        def run_processing() -> None:
            LOGGER.info("Starting bathymetric data processing...")
            run_processing_workflow(
                files=valid_files,
                merge_files=merge_files,
                vessel=vessel_config,
                output=output,
                config_path=config,
                apply_water_level=apply_water_level,
                extra_logger=extra_logger,
                processing_config=processing_config,
                vessel_name=vessel_name,
                already_at_chart_datum=already_at_chart_datum,
            )
            LOGGER.info("Processing workflow completed successfully.")

        LOGGER.info("Launching processing in background thread...")
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; None selects the loop's default executor.
        await asyncio.get_running_loop().run_in_executor(None, run_processing)

    async def _handle_success(self) -> None:
        """Handle successful processing completion.

        Sets the success status (with the output path), shows a positive
        notification and logs the completion message.
        """
        success_msg = i18n.t(
            "app.processing_handler.status_success",
            output_path=str(self.config_manager.output_path),
        )
        self.status_display.set_status(success_msg, "success")
        show_notification(
            i18n.t("app.processing_handler.notif_completed"), type="positive"
        )
        LOGGER.info(i18n.t("app.processing_handler.workflow_done"))

    async def _handle_error(self, error: Exception) -> None:
        """Handle processing errors.

        Sets the error status, shows a negative notification, logs the failure
        and makes the log display visible when there is one.

        :param error: Exception raised by the processing workflow.
        """
        error_msg = i18n.t("app.processing_handler.status_error", error=str(error))
        self.status_display.set_status(error_msg, "error")
        show_notification(
            i18n.t("app.processing_handler.notif_error", error=str(error)),
            type="negative",
        )
        LOGGER.error(
            i18n.t("app.processing_handler.processing_failed", error=str(error))
        )
        LOGGER.error(i18n.t("app.processing_handler.workflow_failed"))

        if self.log_display:
            self.log_display.show()