Source code for esgprep.drs.make

# -*- coding: utf-8 -*-

"""
:platform: Unix
:synopsis: Manages the filesystem tree according to the project's Data Reference Syntax and versioning.

"""

import os
import traceback
from pathlib import Path

from esgvoc.apps.drs.generator import DrsGenerator

from esgprep._exceptions import DuplicatedFile, OlderUpgrade, UnchangedTrackingID
from esgprep._handlers.constants import LINK_SEPARATOR
from esgprep._utils.checksum import get_checksum
from esgprep._utils.ncfile import get_ncattrs, get_tracking_id
from esgprep._utils.path import (
    extract_version,
    get_ordered_version_paths,
    get_path_to_version,
    get_version_and_subpath,
)
from esgprep._utils.print import COLORS, TAGS, Print
from esgprep.constants import FRAMES
from esgprep.drs.constants import SPINNER_DESC


class Process(object):
    """
    Child process.

    Callable applied to one incoming netCDF file at a time. It builds the DRS
    path of the file from its global attributes, compares the file with the
    one found under the dataset "latest" folder and registers the leaves to
    create in the shared DRS tree.

    """

    def __init__(self, ctx):
        """
        Shared processing context between child processes.

        :param ctx: The processing context; the attributes read below are copied onto the instance

        """
        self.tree = ctx.tree
        self.root = ctx.root
        self.set_values = ctx.set_values
        self.set_keys = ctx.set_keys
        self.lock = ctx.lock
        self.errors = ctx.errors
        self.version = ctx.version
        self.msg_length = ctx.msg_length
        self.progress = ctx.progress
        self.no_checksum = ctx.no_checksum
        self.checksums_from = ctx.checksums_from
        self.checksum_type = ctx.checksum_type
        self.mode = ctx.mode
        self.upgrade_from_latest = ctx.upgrade_from_latest
        self.ignore_from_latest = ctx.ignore_from_latest
        self.ignore_from_incoming = ctx.ignore_from_incoming
        self.project = ctx.project

    @staticmethod
    def _first_token(value):
        """
        Returns the first whitespace-separated item of an attribute value.

        An empty or blank value yields an empty string instead of raising an
        ``IndexError``, so that one empty attribute does not skip the file.

        :param value: The attribute value (converted with ``str``)
        :returns: The first item of the value, or an empty string
        :rtype: *str*

        """
        tokens = str(value).split()
        return tokens[0] if tokens else ""

    @staticmethod
    def _drs_version(version):
        """
        Returns the dataset version with the 'v' prefix expected by the DRS generator.

        :param str version: The dataset version, with or without the 'v' prefix
        :returns: The version starting with 'v'
        :rtype: *str*

        """
        return version if version.startswith("v") else f"v{version}"

    def _check_project(self, current_attrs):
        """
        Validates that the project detected from the file matches the submitted one.

        A failure of the detection itself is only logged, so that the DRS
        generation can report its own error messages.

        :param dict current_attrs: The attributes of the incoming file
        :raises Exception: If the detected project differs from the submitted one (case-insensitive)

        """
        try:
            from esgprep._utils.ncfile import get_project

            file_project = get_project(current_attrs)
        except Exception as e:
            Print.debug(f"Project detection failed: {e}")
            return
        if file_project and file_project.lower() != self.project.lower():
            raise Exception(
                f"Project mismatch: CLI specified '{self.project}' but file contains '{file_project}' "
                f"(detected from file attributes). Please use '--project {file_project}' or verify the correct project.\n"
                f"\nTo see all available projects, run: esgvoc status"
            )

    def _is_duplicate(self, source, current_path, latest_path, current_attrs):
        """
        Tells whether the incoming file duplicates the file of the latest version.

        :param pathlib.Path source: The incoming file
        :param pathlib.Path current_path: The DRS destination of the incoming file
        :param pathlib.Path latest_path: The existing file with the same name under "latest"
        :param dict current_attrs: The attributes of the incoming file
        :returns: True if tracking IDs, sizes and checksums are all identical
        :rtype: *boolean*
        :raises UnchangedTrackingID: If the content differs (or checksums are disabled) \
        while both files carry the same non-empty tracking ID

        """
        latest_attrs = get_ncattrs(str(latest_path))
        # 3. Check tracking IDs are different.
        current_tracking_id = get_tracking_id(current_attrs)
        latest_tracking_id = get_tracking_id(latest_attrs)
        if current_tracking_id != latest_tracking_id:
            return False
        # 4. Check if file sizes are different.
        same_size = source.stat().st_size == latest_path.stat().st_size
        if same_size and not self.no_checksum:
            # 5. Check if file checksums are different.
            current_checksum = get_checksum(
                source, self.checksum_type, self.checksums_from
            )
            latest_checksum = get_checksum(
                latest_path, self.checksum_type, self.checksums_from
            )
            if current_checksum == latest_checksum:
                return True
        # Different sizes/checksums (or checksums disabled): tracking IDs must be different too if exist.
        if current_tracking_id and latest_tracking_id:
            raise UnchangedTrackingID(
                latest_path,
                latest_tracking_id,
                current_path,
                current_tracking_id,
            )
        return False

    def _link_new_version(self, source, current_path, version_for_drs):
        """
        Registers the tree leaves of a new (non-duplicated) file.

        Three leaves are created: the symlink in the version folder pointing
        to "files/d<version>/<filename>", the "latest" symlink pointing to the
        version folder, and the file itself in the "files" folder.

        :param pathlib.Path source: The incoming file
        :param pathlib.Path current_path: The DRS destination of the incoming file
        :param str version_for_drs: The dataset version with its 'v' prefix

        """
        parts_from_version = get_version_and_subpath(current_path)
        # "vYYYYMMDD" -> "dYYYYMMDD"
        version_dir = "d" + parts_from_version[0][1:]
        # Relative path from the version folder to the file in the "files" folder.
        target = os.path.join(
            *[".."] * (len(parts_from_version) - 1),
            "files",
            version_dir,
            current_path.name,
        )
        # Add the current file to the "vYYYYMMDD" folder.
        self.tree.create_leaf(
            nodes=current_path.parts,
            label=f"{current_path.name}{LINK_SEPARATOR}{target}",
            src=target,
            mode="symlink",
            force=True,
        )
        dataset_nodes = list(current_path.parts)[: -len(parts_from_version)]
        # Add the "latest" symlink node.
        self.tree.create_leaf(
            nodes=dataset_nodes + ["latest"],
            label=f"latest{LINK_SEPARATOR}{version_for_drs}",
            src=version_for_drs,
            mode="symlink",
        )
        # Add the current file to the "files" folder.
        self.tree.create_leaf(
            nodes=dataset_nodes + ["files", version_dir, current_path.name],
            label=current_path.name,
            src=source,
            mode=self.mode,
        )

    def _link_from_latest(self, current_path, latest_version_path):
        """
        Copies the leaves of the latest dataset version into the version to upgrade.

        A symlink leaf is created for each file of the latest version, except
        the file with the same name as the incoming one and the files ignored
        from the latest version. No ``force`` is submitted to ``create_leaf``.

        :param pathlib.Path current_path: The DRS destination of the incoming file
        :param pathlib.Path latest_version_path: The folder of the latest dataset version

        """
        for root, _, filenames in os.walk(latest_version_path):
            for latest_name in filenames:
                if (
                    latest_name == current_path.name
                    or latest_name in self.ignore_from_latest
                ):
                    continue
                target = os.readlink(os.path.join(root, latest_name))
                self.tree.create_leaf(
                    nodes=[*current_path.parent.parts, latest_name],
                    label=f"{latest_name}{LINK_SEPARATOR}{target}",
                    src=target,
                    mode="symlink",
                )

    def __call__(self, source):
        """
        Any error switches to the next child process.
        It does not stop the main process at all.

        :param pathlib.Path source: The incoming file to process
        :returns: True on success, False if the directory structure cannot be built, \
        None if the file is ignored or an error occurred
        :rtype: *boolean* or *None*

        """
        # Escape in case of error.
        try:
            # Ignore files from incoming
            if source.name in self.ignore_from_incoming:
                msg = TAGS.SKIP + COLORS.HEADER(str(source))
                with self.lock:
                    Print.exception(msg, buffer=True)
                return None

            # Print info.
            Print.debug(f"Scanning {source}")

            # Get current netcdf file attributes.
            current_attrs = get_ncattrs(source)

            # If attribute value is a separated list, pick up the first item as facet value
            # (mainly for activity_id).
            for k, v in current_attrs.items():
                current_attrs[k] = self._first_token(v)

            # Add filename to attributes.
            current_attrs["filename"] = source.name

            # Add dataset-version to attributes with 'v' prefix for DRS generator.
            version_for_drs = self._drs_version(self.version)
            current_attrs["version"] = version_for_drs

            # Validate project compatibility before DRS generation
            self._check_project(current_attrs)

            # Build directory structure.
            # DRS terms are validated during this step.
            try:
                dg = DrsGenerator(self.project)
                mapping = dict(current_attrs)
                if self.project == "cmip6":
                    # CMIP6 files carry "variant_label"; it is submitted as "member_id" too.
                    mapping["member_id"] = current_attrs["variant_label"]
                # NOTE(review): other projects now submit their attributes unchanged
                # (previously "drs_path" was left unbound) -- confirm against esgvoc.
                drs_path = dg.generate_directory_from_mapping(mapping)
                if len(drs_path.errors) != 0:
                    # Build detailed error message with all DRS errors
                    error_details = "\n".join(
                        [f"  - {error}" for error in drs_path.errors]
                    )
                    raise Exception(
                        f"DRS generation failed with {len(drs_path.errors)} error(s):\n{error_details}"
                    )
                # example : CMIP6/CMIP/CCCma/CanESM5/historical/r1i1p2f1/Amon/tas/gn/v20190429/<filename>
                current_path: Path = (
                    Path(self.root)
                    / Path(drs_path.generated_drs_expression)
                    / source.name
                )
            except TypeError:
                Print.debug("Directory structure is None")
                return False

            dataset_dir = current_path.parent.parent

            # Instantiate latest version to "Initial"
            latest_version = "Initial"

            # Get latest existing version of the file.
            all_versions = get_ordered_version_paths(dataset_dir)
            if len(all_versions) > 0:
                latest_version = extract_version(all_versions[-1])

            # Instantiate file as no duplicate.
            is_duplicate = False

            # 1. Check if a latest file version exists (i.e. with the same filename).
            latest_path = dataset_dir / "latest" / source.name
            if latest_path.exists():
                # 2. Check latest version is older than current version.
                current_version = extract_version(current_path)
                latest_version = extract_version(latest_path)
                if latest_version > current_version:
                    raise OlderUpgrade(current_version, latest_version)
                # 3. to 5. Compare tracking IDs, sizes and checksums.
                is_duplicate = self._is_duplicate(
                    source, current_path, latest_path, current_attrs
                )

            # Print info.
            Print.debug(f"Processing {source}")

            # Start DRS tree generation.
            if not is_duplicate:
                self._link_new_version(source, current_path, version_for_drs)

                # If latest dataset version exist and --upgrade-from-latest submitted.
                latest_version_path = (
                    all_versions[-1] if len(all_versions) >= 1 else None
                )
                if (
                    latest_version_path
                    and latest_version_path.exists()
                    and self.upgrade_from_latest
                ):
                    self._link_from_latest(current_path, latest_version_path)

            # In the case of the file is duplicated.
            # i.e., incoming file already exists in the latest version folder.
            elif self.upgrade_from_latest:
                # If upgrade from latest is activated, raise the error, no duplicated files allowed.
                # Incoming must only contain modified/corrected files.
                raise DuplicatedFile(latest_path, source)

            else:
                # If default behavior, the incoming contains all data for a new version
                # In the case of a duplicated file, just pass to the expected symlink creation
                # and records duplicated file for further removal only if migration mode is the
                # default (i.e., moving files). In the case of --copy or --link, keep duplicates
                # in place into the incoming directory.
                src = os.readlink(latest_path)
                self.tree.create_leaf(
                    nodes=current_path.parts,
                    label=f"{current_path.name}{LINK_SEPARATOR}{src}",
                    src=src,
                    mode="symlink",
                )
                if self.mode == "move":
                    self.tree.duplicates.append(source)

            # Record entry for list() and uniqueness checkup.
            record = {"src": source, "dst": current_path, "is_duplicate": is_duplicate}
            key = str(Path(drs_path.generated_drs_expression).parent)
            if self.tree.has_path(key):
                # Dataset already seen.
                self.tree.append_path(key, "files", record)
            else:
                infos = {
                    "files": [record],
                    "latest": latest_version,
                    "upgrade": self.version,
                }
                self.tree.add_path(key, infos)

            # Print info.
            msg = f"DRS Path = {get_path_to_version(current_path)}"
            msg += " <-- " + current_path.name
            Print.success(msg)

            # Return True if success.
            return True

        except KeyboardInterrupt:
            # Lock error number.
            with self.lock:
                # Increase error counter.
                self.errors.value += 1
            raise

        # Catch known exception with its traceback.
        except Exception:
            # Lock error number.
            with self.lock:
                # Increase error counter.
                self.errors.value += 1
                # Format & print exception traceback.
                exc = traceback.format_exc().splitlines()
                msg = TAGS.SKIP + COLORS.HEADER(str(source)) + "\n"
                msg += "\n".join(exc)
                Print.exception(msg, buffer=True)
            return None

        finally:
            # Lock progress value.
            with self.lock:
                # Increase progress counter.
                self.progress.value += 1
                # Clear previous print.
                msg = f"\r{' ' * self.msg_length.value}"
                Print.progress(msg)
                # Print progress bar.
                msg = f"\r{COLORS.OKBLUE(SPINNER_DESC)} {FRAMES[self.progress.value % len(FRAMES)]} {source}"
                Print.progress(msg)
                # Set new message length.
                self.msg_length.value = len(msg)