# -*- coding: utf-8 -*-
"""
:platform: Unix
:synopsis: Manages the filesystem tree according to the project the Data Reference Syntax and versioning.
"""
import os
import traceback
from pathlib import Path
from esgprep._utils.path import get_ordered_version_paths, get_path_to_version
from esgprep._utils.print import COLORS, TAGS, Print
from esgprep.constants import FRAMES
from esgprep.drs.constants import SPINNER_DESC
[docs]
class Process(object):
"""
Child process.
"""
def __init__(self, ctx):
"""
Shared processing context between child processes.
"""
self.tree = ctx.tree
self.root = ctx.root
self.set_values = ctx.set_values
self.set_keys = ctx.set_keys
self.lock = ctx.lock
self.errors = ctx.errors
self.msg_length = ctx.msg_length
self.progress = ctx.progress
self.no_checksum = ctx.no_checksum
self.checksums_from = ctx.checksums_from
self.checksum_type = ctx.checksum_type
self.mode = ctx.mode
self.upgrade_from_latest = ctx.upgrade_from_latest
self.ignore_from_latest = ctx.ignore_from_latest
self.ignore_from_incoming = ctx.ignore_from_incoming
def __call__(self, source):
"""
Any error switches to the next child process.
It does not stop the main process at all.
"""
# Escape in case of error.
try:
current_path = Path(source)
# Validate directory structure.
# assert get_terms(source), "Invalid path {}".format(source)
#
# root = get_root(current_path)
# assert root, "Invalid root path {}".format(source)
# version = get_version(current_path)
# assert version, "Invalid path version {}".format(source)
#
# Get all existing version.
versions = get_ordered_version_paths(current_path)
# Keep only the current, latest and next versions of the file.
if current_path not in versions:
return False
current_idx = versions.index(
current_path
) # crashes here if current_path is a file cause the are only folder in versions
versions = versions[
(current_idx - 1 if current_idx - 1 >= 0 else 0) : current_idx + 2
]
# Get latest version.
latest_version = "Initial"
if versions:
latest_version = get_ordered_version_paths(versions[-1])
# Iterate over files in the version directory to remove.
for root, _, files in os.walk(current_path):
for file in files:
# Convert file into pathlib.Path()
file = Path(root, file)
# Check file is a symlink.
if file.is_symlink():
# Check if targets of previous and next versions exists and are the same.
if (
any(v == current_path for v in versions)
or len(versions) == 1
):
# Remove the current file from the "files" folder. ORIGINAL
# self.tree.create_leaf(nodes=with_file_folder(current_path).parts,
# label=path.name,
# mode=self.mode)
# Remove the current file from the "files" folder.
self.tree.create_leaf(
nodes=file.parts,
label=file.name,
src=None,
mode=self.mode,
)
# Remove the current file symlink from the "vYYYYMMDD" folder in any case.
# Note: get_drs_down and with_file_folder functions are not defined - commenting out broken code
# src = [".."] + [".."] * len(get_drs_down(current_path).parts)
# src.append("files")
# src += get_drs_down(with_file_folder(file)).parts
# Original logic was broken - removing broken tree leaf creation
# self.tree.create_leaf(
# nodes=file.parts,
# label="{}{}{}".format(
# file.name, LINK_SEPARATOR, "broken_path"
# ),
# src=None,
# mode=self.mode,
# force=True,
# )
# Record entry for list() and uniqueness checkup.
# Record entry for list() and uniqueness checkup.
# En regardant make :
record = {
"src": source,
"dst": current_path,
"is_duplicate": False,
}
#####
key = str(get_path_to_version(current_path.parent))
if key in self.tree.paths:
self.tree.append_path(key, "files", record)
# self.tree.paths[key]['files'].append(record) # Lolo Change file to record
assert latest_version == self.tree.paths[key]["latest"]
assert versions[-1] == self.tree.paths[key]["upgrade"]
else:
infos = {
"files": [record],
"latest": "Initial" if len(versions) == 1 else versions[-1],
}
self.tree.add_path(key, infos)
# Lolo Change into add_path method to set instance variable (pb for shared DRSTree between process)
# self.tree.paths[key] = {}
# self.tree.paths[key]['files'] = [record] # Lolo Change file to record
# self.tree.paths[key]['latest'] = 'Initial' if len(versions) == 1 else get_version(versions[-1])
##self.tree.paths[key]['upgrade'] = version
# Print info.
msg = f"DRS Path = {get_path_to_version(current_path)}"
msg += " <-- " + current_path.name
Print.success(msg)
# Return True if success.
return True
except KeyboardInterrupt:
# Lock error number.
with self.lock:
# Increase error counter.
self.errors.value += 1
raise
# Catch known exception with its traceback.
except Exception:
# Lock error number.
with self.lock:
# Increase error counter.
self.errors.value += 1
# Format & print exception traceback.
exc = traceback.format_exc().splitlines()
msg = TAGS.SKIP + COLORS.HEADER(str(source)) + "\n"
msg += "\n".join(exc)
Print.exception(msg, buffer=True)
return None
finally:
# Lock progress value.
with self.lock:
# Increase progress counter.
self.progress.value += 1
# Clear previous print.
msg = "\r{}".format(" " * self.msg_length.value)
Print.progress(msg)
# Print progress bar.
msg = "\r{} {} {}".format(
COLORS.OKBLUE(SPINNER_DESC),
FRAMES[self.progress.value % len(FRAMES)],
source,
)
Print.progress(msg)
# Set new message length.
self.msg_length.value = len(msg)