Source code for esgprep.drs.main

# -*- coding: utf-8 -*-

"""
    :platform: Unix
    :synopsis: Manages the filesystem tree according to the project's Data Reference Syntax (DRS) and versioning.

"""

import itertools
import os
import sys
import traceback
from multiprocessing import Pool

from constants import *
from context import ProcessingContext
from custom_exceptions import *
from esgprep.utils.custom_print import *
from esgprep.utils.misc import load, store, evaluate, ProcessContext, get_tracking_id, get_checksum
from handler import File, DRSPath, DRSTree


def process(source):
    """
    process(collector_input)

    File process that:

     * Handles files,
     * Deduces facet key, value pairs from file attributes,
     * Checks facet values against the CV,
     * Applies the versioning,
     * Populates the DRS tree creating the appropriate leaves,
     * Stores dataset statistics.

    :param str source: The file full path to process

    """
    # Get process content from process global env
    assert 'pctx' in globals().keys()
    pctx = globals()['pctx']
    # Block to avoid program stop if a thread fails
    try:
        # Instantiate file handler
        fh = File(source)
        # Ignore files from incoming
        if fh.filename in pctx.ignore_from_incoming:
            msg = TAGS.SKIP + COLORS.HEADER(source)
            with pctx.lock:
                Print.exception(msg, buffer=True)
            return None
        # Load attributes from filename, netCDF attributes, command-line
        fh.load_attributes(root=pctx.root,
                           pattern=pctx.pattern,
                           set_values=pctx.set_values)
        # Check the facet values provided by the loaded attributes
        fh.check_facets(facets=pctx.facets,
                        config=pctx.cfg,
                        set_keys=pctx.set_keys)
        # Get parts of DRS path
        parts = fh.get_drs_parts(pctx.facets)
        # Instantiate file DRS path handler
        fh.drs = DRSPath(parts)
        # Ensure that the called project section is ALWAYS part of the DRS path elements (case insensitive)
        if not fh.drs.path().lower().startswith(pctx.project.lower()):
            raise InconsistentDRSPath(pctx.project, fh.drs.path())
        # Evaluate whether the processed file already exists in the latest existing dataset version (i.e., "is duplicated")
        # Default: fh.is_duplicate = False
        # 1. If a latest dataset version exists
        if fh.drs.v_latest:
            # Build corresponding latest file path
            latest_file = os.path.join(fh.drs.path(latest=True, root=True), fh.filename)
            # 2. Test if a file with the same filename exists in the latest version
            if os.path.exists(latest_file):
                # Get tracking ID (None if not recorded into the file)
                fh.tracking_id = get_tracking_id(fh.ffp, pctx.project)
                latest_tracking_id = get_tracking_id(latest_file, pctx.project)
                # 3. Test if tracking IDs are different (i.e., keep is_duplicate = False)
                if fh.tracking_id == latest_tracking_id:
                    latest_size = os.stat(latest_file).st_size
                    # 4. Test if file sizes are different (i.e., keep is_duplicate = False)
                    if fh.size == latest_size and not pctx.no_checksum:
                        # Read or compute the checksums
                        fh.checksum = get_checksum(fh.ffp, pctx.checksum_type, pctx.checksums_from)
                        latest_checksum = get_checksum(latest_file, pctx.checksum_type, pctx.checksums_from)
                        # 5. Compare checksums: identical checksums mean a duplicate
                        if fh.checksum == latest_checksum:
                            fh.is_duplicate = True
                        elif fh.tracking_id and latest_tracking_id:
                            # If the checksums are different, the tracking IDs must not be identical (if they exist).
                            # If there are no tracking IDs, keep is_duplicate = False
                            raise UnchangedTrackingID(latest_file, latest_tracking_id, fh.ffp, fh.tracking_id)
                    elif fh.tracking_id and latest_tracking_id:
                        # If the sizes are different, the tracking IDs must not be identical (if they exist).
                        # If there are no tracking IDs, keep is_duplicate = False
                        raise UnchangedTrackingID(latest_file, latest_tracking_id, fh.ffp, fh.tracking_id)
        msg = TAGS.SUCCESS + 'Processing {}'.format(COLORS.HEADER(fh.ffp))
        Print.info(msg)
        return fh
    except KeyboardInterrupt:
        raise
    except Exception:
        exc = traceback.format_exc().splitlines()
        msg = TAGS.SKIP + COLORS.HEADER(source) + '\n'
        msg += '\n'.join(exc)
        with pctx.lock:
            Print.exception(msg, buffer=True)
        return None
    finally:
        with pctx.lock:
            pctx.progress.value += 1
            percentage = int(pctx.progress.value * 100 / pctx.nbsources)
            msg = COLORS.OKBLUE('\rScanning incoming file(s): ')
            msg += '{}% | {}/{} file(s)'.format(percentage, pctx.progress.value, pctx.nbsources)
            Print.progress(msg)
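
# Illustrative sketch (not part of the original module): the duplicate detection
# in `process` cascades from cheap to expensive checks -- same filename in the
# latest version, then tracking ID, then file size, then checksum -- and only
# flags a duplicate when all of them match. A minimal standalone equivalent is
# shown below; `read_tracking_id` and `checksum_of` are assumed helpers standing
# in for `get_tracking_id`/`get_checksum`, not this module's API.
def _sketch_is_duplicate(incoming, latest, read_tracking_id, checksum_of):
    """Return True only if `incoming` matches `latest` on tracking ID, size and checksum."""
    if not os.path.exists(latest):
        # No file with the same name in the latest version: not a duplicate
        return False
    if read_tracking_id(incoming) != read_tracking_id(latest):
        # Different tracking IDs: treated as a new file
        return False
    if os.stat(incoming).st_size != os.stat(latest).st_size:
        # Sizes differ: not a duplicate (the module above raises UnchangedTrackingID
        # instead when both tracking IDs exist and are identical)
        return False
    # All cheap checks passed: fall back to the expensive checksum comparison
    return checksum_of(incoming) == checksum_of(latest)
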
def tree_builder(fh):
    """
    Builds the DRS tree according to a source.

    :param esgprep.drs.handler.File fh: The file handler object

    """
    # Get process content from process global env
    assert 'pctx' in globals().keys()
    pctx = globals()['pctx']
    try:
        # If a latest version already exists it should be older than the upgrade version
        if fh.drs.v_latest and int(DRSPath.TREE_VERSION[1:]) <= int(fh.drs.v_latest[1:]):
            raise OlderUpgrade(DRSPath.TREE_VERSION, fh.drs.v_latest)
        # Start the tree generation
        if not fh.is_duplicate:
            # Add the processed file to the "vYYYYMMDD" node
            src = ['..'] * len(fh.drs.items(d_part=False))
            src.extend(fh.drs.items(d_part=False, file_folder=True))
            src.append(fh.filename)
            tree.create_leaf(nodes=fh.drs.items(root=True),
                             leaf=fh.filename,
                             label='{}{}{}'.format(fh.filename, LINK_SEPARATOR, os.path.join(*src)),
                             src=os.path.join(*src),
                             mode='symlink',
                             origin=fh.ffp,
                             force=True)
            # Add the "latest" node for symlink
            tree.create_leaf(nodes=fh.drs.items(f_part=False, version=False, root=True),
                             leaf='latest',
                             label='{}{}{}'.format('latest', LINK_SEPARATOR, fh.drs.v_upgrade),
                             src=fh.drs.v_upgrade,
                             mode='symlink')
            # Add the processed file to the "files" node
            tree.create_leaf(nodes=fh.drs.items(file_folder=True, root=True),
                             leaf=fh.filename,
                             label=fh.filename,
                             src=fh.ffp,
                             mode=pctx.mode)
            if fh.drs.v_latest and pctx.upgrade_from_latest:
                # Walk through the latest dataset version and create a symlink for each file with a different
                # filename than the processed one
                for root, _, filenames in os.walk(fh.drs.path(f_part=False, latest=True, root=True)):
                    for filename in filenames:
                        # Add latest files as tree leaves with the version to upgrade instead of the latest version
                        # (i.e., copy the latest dataset leaves to the tree),
                        # except if the file has been ignored from the latest version (i.e., with a known issue),
                        # and except if the file leaf has already been created, to avoid overwriting the new version
                        # (the leaf is not created if it already exists)
                        if filename != fh.filename and filename not in pctx.ignore_from_latest:
                            src = os.path.join(root, filename)
                            tree.create_leaf(nodes=fh.drs.items(root=True),
                                             leaf=filename,
                                             label='{}{}{}'.format(filename, LINK_SEPARATOR, os.readlink(src)),
                                             src=os.readlink(src),
                                             mode='symlink',
                                             origin=os.path.realpath(src))
        else:
            # Pick up the latest file version
            latest_file = os.path.join(fh.drs.path(latest=True, root=True), fh.filename)
            if pctx.upgrade_from_latest:
                # If upgrade from latest is activated, raise the error: no duplicated files are allowed
                # because incoming must only contain modified/corrected files
                raise DuplicatedFile(latest_file, fh.ffp)
            else:
                # With the default behavior, the incoming directory contains all data for a new version.
                # In the case of a duplicated file, just proceed to the expected symlink creation
                # and record the duplicated file for further removal, only if the migration mode is the
                # default (i.e., moving files). In the case of --copy or --link, keep duplicates
                # in place in the incoming directory
                src = os.readlink(latest_file)
                tree.create_leaf(nodes=fh.drs.items(root=True),
                                 leaf=fh.filename,
                                 label='{}{}{}'.format(fh.filename, LINK_SEPARATOR, src),
                                 src=src,
                                 mode='symlink',
                                 origin=fh.ffp)
                if pctx.mode == 'move':
                    tree.duplicates.append(fh.ffp)
        # Record entry for list()
        record = {'src': fh.ffp,
                  'dst': fh.drs.path(root=True),
                  'dset_root': os.path.dirname(fh.drs.path(f_part=False, root=True)),
                  'filename': fh.filename,
                  'latest': fh.drs.v_latest or 'Initial',
                  'size': fh.size,
                  'is_duplicate': fh.is_duplicate}
        if fh.drs.path(f_part=False) in tree.paths.keys():
            tree.paths[fh.drs.path(f_part=False)].append(record)
        else:
            tree.paths[fh.drs.path(f_part=False)] = [record]
        msg = TAGS.SUCCESS + 'DRS Path = {}'.format(COLORS.HEADER(fh.drs.path(f_part=False)))
        msg += ' <-- ' + fh.filename
        Print.info(msg)
        return True
    except KeyboardInterrupt:
        raise
    except Exception:
        exc = traceback.format_exc().splitlines()
        msg = TAGS.FAIL + 'Build {}'.format(COLORS.HEADER(fh.drs.path())) + '\n'
        msg += '\n'.join(exc)
        Print.exception(msg, buffer=True)
        return None
    finally:
        pctx.progress.value += 1
        percentage = int(pctx.progress.value * 100 / pctx.nbsources)
        msg = COLORS.OKBLUE('\rBuilding DRS tree: ')
        msg += '{}% | {}/{} file(s)'.format(percentage, pctx.progress.value, pctx.nbsources)
        Print.progress(msg)
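
# Illustrative sketch (not part of the original module): the ['..'] * len(...)
# construction in `tree_builder` builds the *relative* target of the symlinks
# stored inside a "vYYYYMMDD" directory, climbing out of the version sub-tree and
# walking down into the sibling "files" sub-tree. The helper below shows the same
# assembly with made-up directory names; the real components come from
# `fh.drs.items()`.
def _sketch_symlink_target(version_subdirs, files_subdirs, filename):
    """Build a relative symlink target such as '../../files/d20200101/tas.nc'."""
    parts = ['..'] * len(version_subdirs)  # climb out of the version sub-tree
    parts.extend(files_subdirs)            # walk down into the "files" sub-tree
    parts.append(filename)                 # final leaf is the data file itself
    return os.path.join(*parts)

# For example, _sketch_symlink_target(['v20200101', 'tas'], ['files', 'd20200101'], 'tas.nc')
# returns '../../files/d20200101/tas.nc' (illustrative names only).
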
def initializer(keys, values):
    """
    Initialize process context by setting particular variables as global variables.

    :param list keys: Argument name
    :param list values: Argument value

    """
    assert len(keys) == len(values)
    global pctx
    pctx = ProcessContext({key: values[i] for i, key in enumerate(keys)})
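
# Illustrative sketch (not part of the original module): `initializer` follows the
# standard multiprocessing pattern of seeding each worker process once with a
# read-only context, so that the mapped functions (`process`, `tree_builder`) read
# the global `pctx` instead of receiving it with every task. Below is a
# self-contained version of the same pattern, using a plain dict instead of
# `ProcessContext`:
def _sketch_init(keys, values):
    """Store the shared context as a per-process global (illustrative only)."""
    global _sketch_ctx
    _sketch_ctx = dict(zip(keys, values))

def _sketch_worker(item):
    """Read the per-process global seeded by `_sketch_init`."""
    return (_sketch_ctx['project'], item)

def _sketch_pool():
    pool = Pool(processes=2, initializer=_sketch_init, initargs=(['project'], ['demo']))
    try:
        # Each worker has its own `_sketch_ctx`, built once by `_sketch_init`
        return list(pool.imap(_sketch_worker, range(3)))
    finally:
        pool.close()
        pool.join()
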
def do_scanning(ctx):
    """
    Returns True if file scanning is necessary regarding command-line arguments.

    :param esgprep.drs.context.ProcessingContext ctx: New processing context to evaluate
    :returns: True if file scanning is necessary
    :rtype: *boolean*

    """
    if ctx.rescan:
        return True
    elif ctx.action == 'list':
        return True
    elif os.path.isfile(TREE_FILE):
        reader = load(TREE_FILE)
        old_args = reader.next()
        # Ensure that processing context is similar to previous step
        for k in CONTROLLED_ARGS:
            if getattr(ctx, k) != old_args[k]:
                msg = '"{}" argument has changed: "{}" instead of "{}" -- '.format(k, getattr(ctx, k), old_args[k])
                msg += 'Rescanning files.'
                Print.warning(msg)
                return True
        return False
    else:
        return True
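
# Illustrative sketch (not part of the original module): `do_scanning` and `run`
# read the cached tree file back with successive `reader.next()` calls, in the same
# order as the list handed to `store()` (controlled arguments, tree, handlers,
# results). Assuming `load`/`store` from `esgprep.utils.misc` are pickle-based --
# an assumption, not something this file shows -- a minimal equivalent of that
# round trip could look like this:
def _sketch_store(path, data):
    """Pickle each item of `data`, in order, into `path` (illustrative only)."""
    import pickle
    with open(path, 'wb') as f:
        for item in data:
            pickle.dump(item, f)

def _sketch_load(path):
    """Yield the pickled items back in the same order (illustrative only)."""
    import pickle
    with open(path, 'rb') as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return
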
def run(args):
    """
    Main process that:

     * Instantiates the processing context,
     * Loads the previous program instance,
     * Parallelizes file processing with a pool of processes,
     * Applies the command-line action to the whole DRS tree,
     * Evaluates the exit status.

    :param ArgumentParser args: The command-line arguments parser

    """
    # Instantiate processing context
    with ProcessingContext(args) as ctx:
        # Init global variable
        global tree
        # Init DRS tree
        tree = DRSTree(ctx.root, ctx.version, ctx.mode, ctx.commands_file)
        # Init process context
        cctx = {name: getattr(ctx, name) for name in PROCESS_VARS}
        # Disable file scan if a previous DRS tree has been generated using the same context and no "list" action
        if do_scanning(ctx):
            if ctx.use_pool:
                # Init processes pool
                pool = Pool(processes=ctx.processes, initializer=initializer, initargs=(cctx.keys(), cctx.values()))
                processes = pool.imap(process, ctx.sources)
            else:
                initializer(cctx.keys(), cctx.values())
                processes = itertools.imap(process, ctx.sources)
            # Process supplied sources
            handlers = [x for x in processes]
            # Close pool of workers if it exists
            if 'pool' in locals().keys():
                locals()['pool'].close()
                locals()['pool'].join()
            Print.progress('\n')
            # Build DRS tree
            cctx['progress'].value = 0
            initializer(cctx.keys(), cctx.values())
            handlers = [h for h in handlers if h is not None]
            results = [x for x in itertools.imap(tree_builder, handlers)]
            Print.progress('\n')
        else:
            reader = load(TREE_FILE)
            msg = 'Skip incoming files scan (use "--rescan" to force it) -- '
            msg += 'Using cached DRS tree from {}'.format(TREE_FILE)
            Print.warning(msg)
            _ = reader.next()
            tree = reader.next()
            handlers = reader.next()
            results = reader.next()
        # Flush buffer
        Print.flush()
        # Rollback --commands-file value to command-line argument in any case
        tree.commands_file = ctx.commands_file
        # Get number of files scanned (including errors/skipped files)
        ctx.scan_data = len(results)
        # Get number of scan errors
        ctx.scan_errors = results.count(None)
        # Backup tree context for later usage with other command lines
        store(TREE_FILE, data=[{key: ctx.__getattribute__(key) for key in CONTROLLED_ARGS},
                               tree,
                               handlers,
                               results])
        Print.info(TAGS.INFO + 'DRS tree recorded for next usage onto {}.'.format(COLORS.HEADER(TREE_FILE)))
        # Evaluate the scan results to trigger the DRS tree action
        if evaluate(results):
            # Check upgrade uniqueness
            tree.check_uniqueness()
            # Apply tree action
            tree.get_display_lengths()
            getattr(tree, ctx.action)()
    # Evaluate errors and exit with the appropriate return code
    if ctx.scan_errors > 0:
        sys.exit(ctx.scan_errors)
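
# Illustrative sketch (not part of the original module): a failed tree build makes
# `tree_builder` return None, so `results.count(None)` is the number of errors and
# also becomes the process exit status (0 means a fully clean run). For example,
# with three sources of which one failed:
def _sketch_exit_status(results):
    """Mirror of the error tally in `run`: non-zero when any build failed."""
    return results.count(None)

# _sketch_exit_status([True, None, True]) -> 1, so the program would exit with
# status 1 after processing these three sources.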