Source code for esgprep.utils.collectors

# -*- coding: utf-8 -*-

"""
    :platform: Unix
    :synopsis: Useful functions to collect data from directories.

"""

import os
import re
import sys
from uuid import uuid4 as uuid

from esgprep.utils.custom_exceptions import NoFileFound
from esgprep.utils.misc import match, remove


class Collecting:
    """
    Spinner pending data collection.

    """
    STATES = ('/', '-', '\\', '|')
    step = 0

    def __init__(self, spinner):
        self.spinner = spinner
        self.next()

    def next(self):
        """
        Print collector spinner

        """
        if self.spinner:
            sys.stdout.write('\rCollecting data... {}'.format(Collecting.STATES[Collecting.step % 4]))
            sys.stdout.flush()
            Collecting.step += 1


class Collector(object):
    """
    Base collector class to yield regular NetCDF files.

    :param list sources: The list of sources to parse
    :returns: The data collector
    :rtype: *iter*

    """

    def __init__(self, sources, spinner=True):
        self.spinner = spinner
        self.sources = sources
        self.FileFilter = FilterCollection()
        self.PathFilter = FilterCollection()
        assert isinstance(self.sources, list)

    def __iter__(self):
        for source in self.sources:
            for root, _, filenames in os.walk(source, followlinks=True):
                # Apply path filters only on recursion
                # Source path can include hidden directories
                if self.PathFilter(root.split(source)[1]):
                    for filename in sorted(filenames):
                        ffp = os.path.join(root, filename)
                        if os.path.isfile(ffp) and self.FileFilter(filename):
                            yield ffp

    def __len__(self):
        """
        Returns collector length with animation.

        :returns: The number of items in the collector.
        :rtype: *int*

        """
        progress = Collecting(self.spinner)
        try:
            s = 0
            for _ in self.__iter__():
                progress.next()
                s += 1
            if self.spinner:
                sys.stdout.write('\r\033[K')
                sys.stdout.flush()
        except StopIteration:
            raise NoFileFound(self.sources)
        return s


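# Illustrative usage sketch (not part of the original module): collect NetCDF files
# below a directory while excluding hidden sub-directories. The source path and the
# regular expressions are assumptions chosen for the example.
#
#     collector = Collector(sources=['/path/to/archive'], spinner=False)
#     collector.FileFilter.add(name='nc_only', regex=r'\.nc$', inclusive=True)
#     collector.PathFilter.add(name='no_hidden', regex=r'/\.', inclusive=False)
#     for ffp in collector:
#         print(ffp)
#     # len(collector) walks the sources again and returns the number of collected files

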
class PathCollector(Collector):
    """
    Collector class to yield files from a list of directories to parse.

    :param str dir_filter: A regular expression to exclude directories from the collection

    """

    def __init__(self, *args, **kwargs):
        super(PathCollector, self).__init__(*args, **kwargs)

    def __iter__(self):
        """
        Yields files full path according to filters on path and filename.

        :returns: The collected file full paths
        :rtype: *iter*

        """
        for source in self.sources:
            for root, _, filenames in os.walk(source, followlinks=True):
                if self.PathFilter(root.split(source)[1]):
                    for filename in sorted(filenames):
                        ffp = os.path.join(root, filename)
                        if os.path.isfile(ffp) and self.FileFilter(filename):
                            yield ffp


class VersionedPathCollector(PathCollector):
    """
    Collector class to yield files from a list of versioned directories to parse.

    :param str dir_format: The regular expression of the directory format

    """

    def __init__(self, project, dir_format, *args, **kwargs):
        super(VersionedPathCollector, self).__init__(*args, **kwargs)
        self.project = project
        self.format = dir_format
        self.default = False

    def __iter__(self):
        """
        Yields files full path according to filters on path and filename.

        :returns: The collected file full paths
        :rtype: *iter*

        """
        for source in self.sources:
            # Find the version within the source path, if any
            source_version = self.version_finder(directory=source)
            if source_version:
                # A version in the source path takes priority over command-line flags and the default behavior
                # Disable the default behavior
                self.default = False
                # And overwrite the version filter
                self.PathFilter.add(name='version_filter', regex='/{}'.format(source_version))
            for root, _, filenames in os.walk(source, followlinks=True):
                for filename in sorted(filenames):
                    ffp = os.path.join(root, filename)
                    path_version = self.version_finder(directory=root)
                    # If no version filter is set and a path version exists, apply the default behavior
                    if path_version and self.default:
                        # List the version directories found next to the encountered one
                        path_versions = [v for v in os.listdir(ffp.split(path_version)[0])
                                         if re.compile(r'^v[\d]+$').search(v)]
                        # Pick up the latest version among the encountered versions
                        latest_version = sorted(path_versions)[-1]
                        self.PathFilter.add(name='version_filter', regex='/{}'.format(latest_version))
                    if self.PathFilter(root):
                        # if self.PathFilter(root.split(source)[1]):
                        # Dereference the "latest" symlink (only) in the end
                        if path_version == 'latest':
                            # Keep parentheses in the pattern to get the "latest" part of the split list
                            target = os.path.realpath(os.path.join(*re.split(r'/(latest)/', ffp)[:-1]))
                            ffp = os.path.join(target, *re.split(r'/(latest)/', ffp)[-1:])
                        if os.path.isfile(ffp) and self.FileFilter(filename):
                            yield ffp

    def version_finder(self, directory):
        """
        Returns the version number found in a DRS path.

        :param str directory: The directory to parse
        :returns: The version
        :rtype: *str*

        """
        # Replace the project regex by its expected lower-cased value
        # This gives an anchor in the regex and ensures that the right version group is captured
        # from the directory format, if it exists in the input directory
        regex = re.compile(self.format.replace(r'/(?P<project>[\w.-]+)/', '/{}/'.format(self.project.lower())))
        version = None
        # Test the directory_format regex without its <filename> part
        while 'version' in regex.groupindex.keys():
            if regex.search(directory.lower()):
                # If the version facet is found, return its value
                version = regex.search(directory.lower()).groupdict()['version']
                break
            else:
                # Walk the regex backward to find the version facet, if it exists
                regex = re.compile('/'.join(regex.pattern.split('/')[:-1]))
        return version


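# Illustrative sketch (not part of the original module): how version_finder walks a
# directory_format regex backward until its <version> group matches. The format string,
# project name and path below are simplified assumptions, not an official DRS definition.
#
#     fmt = (r'/(?P<root>[\w.-]+)/(?P<project>[\w.-]+)/(?P<variable>[\w.-]+)/'
#            r'(?P<version>v[0-9]+)/(?P<filename>[\w.-]+)')
#     collector = VersionedPathCollector(project='CMIP5', dir_format=fmt,
#                                        sources=['/data/cmip5/tas/v20120101'])
#     collector.version_finder('/data/cmip5/tas/v20120101')  # -> 'v20120101'

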
class DatasetCollector(Collector):
    """
    Collector class to yield datasets from a list of files to read.

    """

    def __init__(self, versioned=True, *args, **kwargs):
        super(DatasetCollector, self).__init__(*args, **kwargs)
        self.versioned = versioned

    def __iter__(self):
        """
        Yields datasets to process from a text file. Each line may contain the dataset with an
        optional appended ``.v<version>`` or ``#<version>``. If ``versioned`` is False, the version
        suffix is stripped and only the dataset identifier is returned.

        :returns: The dataset ID, with or without the version
        :rtype: *iter*

        """
        for source in self.sources:
            if self.versioned:
                yield source
            else:
                yield remove(r'((\.v|#)[0-9]+)?\s*$', source)


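# Illustrative sketch (not part of the original module): strip the version suffix from a
# dataset identifier read from a text file. The dataset ID below is a made-up example.
#
#     datasets = DatasetCollector(versioned=False,
#                                 sources=['cmip5.output1.IPSL.tas.Amon.r1i1p1.v20120101'])
#     list(datasets)  # -> ['cmip5.output1.IPSL.tas.Amon.r1i1p1']

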
class FilterCollection(object):
    """
    Regex dictionary with a call method to evaluate a string against several regular expressions.
    The dictionary values are 2-tuples with the regular expression as a string and a boolean
    indicating to match (i.e., include) or non-match (i.e., exclude) the corresponding expression.

    """
    # Accepted filter types: plain strings and compiled patterns
    # (type(re.compile('')) is used instead of re._pattern_type, which is unavailable on recent Python versions)
    FILTER_TYPES = (str, type(re.compile('')))

    def __init__(self):
        self.filters = dict()

    def add(self, name=None, regex='*', inclusive=True):
        """Add new filter"""
        if not name:
            name = str(uuid())
        assert isinstance(regex, self.FILTER_TYPES)
        assert isinstance(inclusive, bool)
        self.filters[name] = (regex, inclusive)

    def __call__(self, string):
        return all([match(regex, string, inclusive=inclusive) for regex, inclusive in self.filters.values()])
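

# Illustrative sketch (not part of the original module): combine an inclusive and an
# exclusive filter. This assumes esgprep.utils.misc.match returns True when an inclusive
# regex matches, or when an exclusive regex does not match, as described in the class docstring.
#
#     filters = FilterCollection()
#     filters.add(name='nc_only', regex=r'\.nc$', inclusive=True)
#     filters.add(name='no_tmp', regex=r'\.tmp$', inclusive=False)
#     filters('tas_day_model_r1i1p1.nc')   # -> True
#     filters('tas_day_model_r1i1p1.tmp')  # -> False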