Source code for esgprep.checkvocab.main

# -*- coding: utf-8 -*-

"""
    :platform: Unix
    :synopsis: Checks DRS vocabulary against configuration files.

"""

import itertools
import traceback
from multiprocessing import Pool

from ESGConfigParser import split_map_header
from ESGConfigParser.custom_exceptions import ExpressionNotMatch, NoConfigOptions
from fuzzywuzzy.fuzz import partial_ratio
from fuzzywuzzy.process import extractOne

from constants import *
from context import ProcessingContext
from esgprep.utils.custom_print import *
from esgprep.utils.misc import ProcessContext, ncopen


[docs]def process(source): """ process(collector_input) Data process that: * Retrieve facet key, values pairs from file or directory attributes :param str source: The file full path to process or the dataset ID """ # Get process content from process global env assert 'pctx' in globals().keys() pctx = globals()['pctx'] # Block to avoid program stop if a thread fails try: if pctx.directory or pctx.dataset_id or pctx.dataset_list: # Get attributes from directory format or dataset_id format attributes = re.match(pctx.pattern, source).groupdict() else: # Get attributes from NetCDF global attributes attributes = dict() with ncopen(source) as nc: for attr in nc.ncattrs(): attributes[attr] = nc.getncattr(attr) # Get attributes from filename, overwriting existing ones match = re.search(pctx.pattern, source) if not match: raise ExpressionNotMatch(source, pctx.pattern) attributes.update(match.groupdict()) # Get source values from attributes for facet in pctx.facets: if facet in pctx.set_keys.keys(): try: # Rename attribute key attributes[facet] = attributes.pop(pctx.set_keys[facet]) except KeyError: raise NoNetCDFAttribute(pctx.set_keys[facet], source) elif facet in attributes.keys(): # Facet exists in attribute keys pass else: # Find closest NetCDF attributes in terms of partial string comparison key, score = extractOne(facet, attributes.keys(), scorer=partial_ratio) if score >= 80: # Rename attribute key attributes[facet] = attributes.pop(key) Print.debug('Consider "{}" attribute instead of "{}" facet'.format(key, facet)) else: raise NoNetCDFAttribute(pctx.set_keys[facet], source) with pctx.lock: s = pctx.source_values[0] s[facet].add(attributes[facet]) pctx.source_values[0] = s msg = TAGS.SUCCESS + 'Deserialize {}'.format(COLORS.HEADER(source)) with pctx.lock: Print.info(msg) return 1 except KeyboardInterrupt: raise except Exception: exc = traceback.format_exc().splitlines() msg = TAGS.FAIL + COLORS.HEADER(source) + '\n' msg += '\n'.join(exc) with pctx.lock: Print.exception(msg, buffer=True) return 0 finally: with pctx.lock: pctx.progress.value += 1 percentage = int(pctx.progress.value * 100 / pctx.nbsources) msg = COLORS.OKBLUE('\rHarvesting facets values from data: ') msg += '{}% | {}/{} {}'.format(percentage, pctx.progress.value, pctx.nbsources, SOURCE_TYPE[pctx.source_type]) Print.progress(msg)
[docs]def initializer(keys, values): """ Initialize process context by setting particular variables as global variables. :param list keys: Argument name :param list values: Argument value """ assert len(keys) == len(values) global pctx pctx = ProcessContext({key: values[i] for i, key in enumerate(keys)})
[docs]def run(args): """ Main process that: * Instantiates processing context * Parses the configuration files options and values, * Deduces facets and values from directories or dataset lists, * Compares the values of each facet between both, * Print or log the checking. :param ArgumentParser args: The command-line arguments parser """ # Instantiate processing context manager with ProcessingContext(args) as ctx: # Init process context cctx = {name: getattr(ctx, name) for name in PROCESS_VARS} cctx['source_values'][0] = dict((facet, set()) for facet in ctx.facets) if ctx.use_pool: # Init processes pool pool = Pool(processes=ctx.processes, initializer=initializer, initargs=(cctx.keys(), cctx.values())) processes = pool.imap(process, ctx.sources) else: initializer(cctx.keys(), cctx.values()) processes = itertools.imap(process, ctx.sources) # Process supplied sources results = [x for x in processes] # Close pool of workers if exists if 'pool' in locals().keys(): locals()['pool'].close() locals()['pool'].join() Print.progress('\n') # Flush buffer Print.flush() ctx.scan_data = sum(results) ctx.scan_errors = results.count(0) # Get source values source_values = cctx['source_values'][0] # Get facets values declared in configuration file config_values = {} progress = 0 nfacets = len(ctx.facets) for facet in ctx.facets: try: try: config_values[facet], _ = ctx.cfg.get_options(facet) if not isinstance(config_values[facet], type(re.compile(""))): config_values[facet] = set(config_values[facet]) except NoConfigOptions: for option in ctx.cfg.get_options_from_list('maps'): maptable = ctx.cfg.get(option) from_keys, _ = split_map_header(maptable.split('\n')[0]) if facet in from_keys: config_values[facet] = set(ctx.cfg.get_options_from_map(option, facet)) finally: if facet not in config_values.keys(): raise NoConfigOptions(facet) msg = TAGS.SUCCESS msg += 'Get values from {} for {}'.format(COLORS.HEADER(ctx.cfg.file), COLORS.HEADER(facet)) Print.info(msg) except KeyboardInterrupt: raise except Exception: exc = traceback.format_exc().splitlines() msg = TAGS.FAIL msg += 'Get values from {} for {}'.format(COLORS.HEADER(ctx.cfg.file), COLORS.HEADER(facet)) + '\n' msg += '\n'.join(exc) Print.exception(msg, buffer=True) ctx.scan_errors += 1 finally: progress += 1 percentage = int(progress * 100 / nfacets) msg = COLORS.OKBLUE('\rCollecting facet values from INI file(s): ') msg += '{}% | {}/{} facet(s)'.format(percentage, progress, nfacets) Print.progress(msg) Print.progress('\n') # Flush buffer Print.flush() # Compare values from sources against values from configuration file Print.result(''.center(WIDTH, '=')) Print.result('{} :: {}'.format('Facet'.ljust(FACET_WIDTH), 'Status'.rjust(STATUS_WIDTH))) Print.result(''.center(WIDTH, '-')) for facet in ctx.facets: if isinstance(config_values[facet], type(re.compile(""))): config_values[facet] = set([v for v in source_values[facet] if config_values[facet].search(v)]) if not source_values[facet]: line = '{} :: '.format(facet.ljust(FACET_WIDTH)) line += COLORS.WARNING(STATUS[2].rjust(STATUS_WIDTH)) Print.result(line) elif not config_values[facet]: line = '{} :: '.format(facet.ljust(FACET_WIDTH)) line += COLORS.WARNING(STATUS[3].rjust(STATUS_WIDTH)) Print.result(line) else: undeclared_values = source_values[facet].difference(config_values[facet]) updated_values = source_values[facet].union(config_values[facet]) if undeclared_values: line = '{} :: '.format(facet.ljust(FACET_WIDTH)) line += COLORS.FAIL(STATUS[1].rjust(STATUS_WIDTH)) Print.result(line) _values = ', '.join(sorted(undeclared_values)) msg = COLORS.FAIL(':: UNDECLARED VALUES :: ') msg += COLORS.HEADER(facet) msg += COLOR().bold(' :: {}'.format(_values)) Print.error(msg, buffer=True) _values = ', '.join(sorted(updated_values)) msg = COLORS.SUCCESS(':: UPDATED VALUES :: ') msg += COLORS.HEADER(facet) msg += ' :: {}'.format(_values) Print.error(msg, buffer=True) ctx.any_undeclared = True else: line = '{} :: '.format(facet.ljust(FACET_WIDTH)) line += COLORS.SUCCESS(STATUS[0].rjust(STATUS_WIDTH)) Print.result(line) Print.result(''.center(WIDTH, '=')) # Flush buffer Print.flush() # Evaluate errors and exit with appropriated return code if ctx.scan_errors > 0: sys.exit(ctx.scan_errors) if ctx.any_undeclared: sys.exit(-2)