Source code for esgprep.mapfile.main

# -*- coding: utf-8 -*-

"""
    :platform: Unix
    :synopsis: Generates ESGF mapfiles, whether or not running on a local ESGF node.

"""

import itertools
import os
import re
import sys
import traceback
from datetime import datetime
from multiprocessing import Pool

from ESGConfigParser import interpolate, MissingPatternKey, BadInterpolation, InterpolationDepthError
from lockfile import LockFile

from constants import *
from context import ProcessingContext
from custom_exceptions import *
from esgprep.utils.custom_print import *
from esgprep.utils.misc import evaluate, remove, get_checksum, ProcessContext
from esgprep.utils.output_control import OutputControl
from handler import File, Dataset


def get_output_mapfile(outdir, attributes, mapfile_name, dataset_id, dataset_version,
                       mapfile_drs=None, basename=False):
    """
    Builds the mapfile full path depending on:

     * the --mapfile name using tokens,
     * an optional mapfile tree declared in the configuration file with ``mapfile_drs``,
     * the --outdir output directory.

    :param str outdir: The output directory (default is the current working directory)
    :param dict attributes: The facet values deduced from the file full path
    :param str mapfile_name: An optional mapfile name from the command-line
    :param str dataset_id: The dataset id
    :param str dataset_version: The dataset version
    :param str mapfile_drs: The optional mapfile tree
    :param boolean basename: True to only get the mapfile name without the root directory
    :returns: The mapfile full path
    :rtype: *str*

    """
    # Deduce the output directory from --outdir and 'mapfile_drs'
    if not basename:
        if mapfile_drs:
            try:
                outdir = os.path.join(outdir, interpolate(mapfile_drs, attributes))
            except (BadInterpolation, InterpolationDepthError):
                raise MissingPatternKey(attributes.keys(), mapfile_drs)
        else:
            outdir = os.path.realpath(outdir)
        # Create the output directory if it does not exist; catch OSError if it does
        try:
            os.makedirs(outdir)
        except OSError:
            pass
    # Deduce the mapfile name from the --mapfile argument
    if re.compile(r'{dataset_id}').search(mapfile_name):
        mapfile_name = re.sub(r'{dataset_id}', dataset_id, mapfile_name)
    if re.compile(r'{version}').search(mapfile_name):
        if dataset_version:
            mapfile_name = re.sub(r'{version}', dataset_version, mapfile_name)
        else:
            mapfile_name = re.sub(r'\.{version}', '', mapfile_name)
    if re.compile(r'{date}').search(mapfile_name):
        mapfile_name = re.sub(r'{date}', datetime.now().strftime("%Y%d%m"), mapfile_name)
    if re.compile(r'{job_id}').search(mapfile_name):
        mapfile_name = re.sub(r'{job_id}', str(os.getpid()), mapfile_name)
    # Add a "working extension" pending the end of the process
    if basename:
        return mapfile_name + WORKING_EXTENSION
    else:
        return os.path.join(outdir, mapfile_name) + WORKING_EXTENSION
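
# Illustrative sketch (not part of the original module): with basename=True no
# directory is created, so the token substitution can be checked in isolation.
# The dataset identifier and version below are invented for the example;
# WORKING_EXTENSION is defined in the module constants (typically '.part').
name = get_output_mapfile(outdir='.',
                          attributes={},
                          mapfile_name='{dataset_id}.{version}',
                          dataset_id='cmip5.output1.IPSL.IPSL-CM5A-LR.historical',
                          dataset_version='v20110406',
                          basename=True)
# name == 'cmip5.output1.IPSL.IPSL-CM5A-LR.historical.v20110406' + WORKING_EXTENSION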
def mapfile_entry(dataset_id, dataset_version, ffp, size, optional_attrs):
    """
    Builds the mapfile entry corresponding to a processed file.

    :param str dataset_id: The dataset id
    :param str dataset_version: The dataset version
    :param str ffp: The file full path
    :param str size: The file size
    :param dict optional_attrs: Optional attributes to append to the mapfile lines
    :returns: The mapfile line/entry
    :rtype: *str*

    """
    line = [dataset_id]
    # Add the version number to the dataset identifier if the --no-version flag is disabled
    if dataset_version:
        line = ['{}#{}'.format(dataset_id, dataset_version[1:])]
    line.append(ffp)
    line.append(str(size))
    for k, v in optional_attrs.items():
        if v:
            line.append('{}={}'.format(k, v))
    return ' | '.join(line) + '\n'
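
# Illustrative example (all values invented): a mapfile line is pipe-separated,
# the dataset version is appended to the identifier with '#' minus its leading
# 'v', and falsy optional attributes are dropped.
entry = mapfile_entry(dataset_id='cmip5.output1.IPSL.IPSL-CM5A-LR.historical',
                      dataset_version='v20110406',
                      ffp='/data/tas_day_19500101-19591231.nc',
                      size=123456,
                      optional_attrs={'mod_time': 1425400000.0, 'checksum': None})
# entry == 'cmip5.output1.IPSL.IPSL-CM5A-LR.historical#20110406 | '
#          '/data/tas_day_19500101-19591231.nc | 123456 | mod_time=1425400000.0\n'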
def write(outfile, entry):
    """
    Inserts a mapfile entry. A lockfile prevents several processes from writing
    to the same mapfile at the same time: the LockFile is acquired before writing
    and released afterwards, and acquisition waits if the file is locked by
    another process. Each process adds one line to the appropriate mapfile.

    :param str outfile: The output mapfile full path
    :param str entry: The mapfile entry to write

    """
    lock = LockFile(outfile)
    with lock:
        with open(outfile, 'a+') as mapfile:
            mapfile.write(entry)
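
# Sketch of the locking pattern above (path is illustrative): the 'with lock:'
# context is shorthand for the explicit acquire/release below, where
# lockfile.LockFile serialises concurrent writers on a sibling lock file.
lock = LockFile('/tmp/example.map')
lock.acquire()
try:
    with open('/tmp/example.map', 'a+') as mapfile:
        mapfile.write('some entry\n')
finally:
    lock.release()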
def process(source):
    """
    File process that:

     * Handles the file,
     * Harvests the directory attributes,
     * Checks the DRS attributes against the CV,
     * Builds the dataset ID,
     * Retrieves the file size,
     * Does the checksums,
     * Deduces the mapfile name,
     * Writes the corresponding mapfile entry.

    Any error leads to the file being skipped. It does not stop the process.

    :param str source: The source to process, either a file path or a dataset ID
    :returns: The output mapfile full path
    :rtype: *str*

    """
    # Get the process context from the process global environment
    assert 'pctx' in globals().keys()
    pctx = globals()['pctx']
    # Block to avoid stopping the program if a worker fails
    try:
        if pctx.source_type == 'file':
            # Instantiate the source handler as a file
            sh = File(source)
        else:
            # Instantiate the source handler as a dataset
            sh = Dataset(source)
        # Matching between directory_format and the file full path
        sh.load_attributes(pattern=pctx.pattern)
        # Deduce dataset_id
        dataset_id = pctx.dataset_name
        if not pctx.dataset_name:
            sh.check_facets(facets=pctx.facets, config=pctx.cfg)
            dataset_id = sh.get_dataset_id(pctx.cfg.get('dataset_id', raw=True))
        # Ensure that the first facet is ALWAYS the same as the called project section (case-insensitive)
        if not dataset_id.lower().startswith(pctx.project.lower()):
            raise InconsistentDatasetID(pctx.project, dataset_id.lower())
        # Deduce dataset_version
        dataset_version = sh.get_dataset_version(pctx.no_version)
        # Build the mapfile name depending on the --mapfile flag and the appropriate tokens
        outfile = get_output_mapfile(outdir=pctx.outdir,
                                     attributes=sh.attributes,
                                     mapfile_name=pctx.mapfile_name,
                                     dataset_id=dataset_id,
                                     dataset_version=dataset_version,
                                     mapfile_drs=pctx.mapfile_drs,
                                     basename=pctx.basename)
        # Only the 'make' action writes the mapfile; 'show' is a dry run that
        # only reports the mapfile paths
        if pctx.action == 'make':
            # Generate the corresponding mapfile entry/line
            optional_attrs = dict()
            optional_attrs['mod_time'] = sh.mtime
            if not pctx.no_checksum:
                optional_attrs['checksum'] = get_checksum(sh.source, pctx.checksum_type, pctx.checksums_from)
                optional_attrs['checksum_type'] = pctx.checksum_type.upper()
            optional_attrs['dataset_tech_notes'] = pctx.notes_url
            optional_attrs['dataset_tech_notes_title'] = pctx.notes_title
            line = mapfile_entry(dataset_id=dataset_id,
                                 dataset_version=dataset_version,
                                 ffp=source,
                                 size=sh.size,
                                 optional_attrs=optional_attrs)
            write(outfile, line)
        msg = TAGS.SUCCESS
        msg += '{}'.format(os.path.splitext(os.path.basename(outfile))[0])
        msg += ' <-- ' + COLORS.HEADER(source)
        with pctx.lock:
            Print.info(msg)
        # Return the mapfile name
        return outfile
    # Catch any exception into the error log instead of stopping the run
    except KeyboardInterrupt:
        raise
    except Exception:
        exc = traceback.format_exc().splitlines()
        msg = TAGS.SKIP + COLORS.HEADER(source) + '\n'
        msg += '\n'.join(exc)
        with pctx.lock:
            Print.exception(msg, buffer=True)
        return None
    finally:
        with pctx.lock:
            pctx.progress.value += 1
            percentage = int(pctx.progress.value * 100 / pctx.nbsources)
            msg = COLORS.OKBLUE('\rMapfile(s) generation: ')
            msg += '{}% | {}/{} {}'.format(percentage,
                                           pctx.progress.value,
                                           pctx.nbsources,
                                           SOURCE_TYPE[pctx.source_type])
            Print.progress(msg)
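
# Minimal sketch of the error-isolation pattern used by process() (names are
# illustrative): failures are mapped to None so one bad source never aborts
# the whole pool, while KeyboardInterrupt still propagates to stop the run.
def isolate(task, source):
    try:
        return task(source)
    except KeyboardInterrupt:
        raise
    except Exception:
        return None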
def initializer(keys, values):
    """
    Initializes the process context by setting particular variables as global variables.

    :param list keys: Argument name list
    :param list values: Argument value list

    """
    assert len(keys) == len(values)
    global pctx
    pctx = ProcessContext({key: values[i] for i, key in enumerate(keys)})
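
# Sketch of the worker-initialisation pattern this function supports (the keys
# below are illustrative; the real run() passes PROCESS_VARS): Pool's
# initializer runs once per worker process, so 'pctx' becomes a per-process
# global that process() can read without pickling the context for every task.
example = {'outdir': '/tmp/maps', 'no_version': False}
pool = Pool(processes=2,
            initializer=initializer,
            initargs=(example.keys(), example.values()))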
def run(args):
    """
    Main process that:

     * Instantiates the processing context,
     * Parallelizes file processing with a pool of processes,
     * Copies the mapfile(s) to the output directory,
     * Evaluates the exit status.

    :param ArgumentParser args: The command-line arguments parser

    """
    # Deal with the 'quiet' option separately. If set, turn off all output
    # before creating the ProcessingContext, and turn it on only when needed
    quiet = args.quiet if hasattr(args, 'quiet') else False
    if quiet:
        output_control = OutputControl()
        output_control.stdout_off()
    # Instantiate the processing context
    with ProcessingContext(args) as ctx:
        # Init the process context
        cctx = {name: getattr(ctx, name) for name in PROCESS_VARS}
        if ctx.use_pool:
            # Init the pool of processes
            pool = Pool(processes=ctx.processes,
                        initializer=initializer,
                        initargs=(cctx.keys(), cctx.values()))
            processes = pool.imap(process, ctx.sources)
        else:
            initializer(cctx.keys(), cctx.values())
            processes = itertools.imap(process, ctx.sources)
        # Process the supplied sources
        results = [x for x in processes]
        # Close the pool of workers if it exists
        if 'pool' in locals().keys():
            locals()['pool'].close()
            locals()['pool'].join()
        Print.progress('\n')
        # Flush the print buffer
        Print.flush()
        # Get the number of files scanned (excluding errors/skipped files)
        ctx.scan_data = len(filter(None, results))
        # Get the number of scan errors
        ctx.scan_errors = results.count(None)
        # Get the number of generated mapfiles
        ctx.nbmap = len(filter(None, set(results)))
        # Evaluate the scan results to finalize the mapfile writing
        if evaluate(results):
            for mapfile in filter(None, set(results)):
                # Remove the mapfile working extension
                if ctx.action == 'show':
                    # Print the mapfiles to be generated
                    result = remove(WORKING_EXTENSION, mapfile)
                    if quiet:
                        output_control.stdout_on()
                        print result
                        output_control.stdout_off()
                    else:
                        Print.result(result)
                elif ctx.action == 'make':
                    # A final mapfile silently overwrites an existing one
                    os.rename(mapfile, remove(WORKING_EXTENSION, mapfile))
    # Evaluate the errors and exit with the appropriate return code
    if ctx.scan_errors > 0:
        sys.exit(ctx.scan_errors)