# -*- coding: utf-8 -*-
"""
    :platform: Unix
    :synopsis: Useful functions to use with this package.

"""
import hashlib
import os
import pickle
import re
from uuid import UUID

from netCDF4 import Dataset

from custom_print import *
from esgprep.drs.constants import PID_PREFIXES
class ProcessContext(object):
    """
    Encapsulates the processing context/information for child process.

    Every (key, value) pair of the submitted dictionary becomes an attribute
    of the instance.

    :param dict args: Dictionary of argument to pass to child process
    :returns: The processing context
    :rtype: *ProcessContext*

    """

    def __init__(self, args):
        # NOTE: validation relies on ``assert`` and is skipped under ``python -O``.
        assert isinstance(args, dict), 'args must be a dictionary'
        for key, value in args.items():
            setattr(self, key, value)
class ncopen(object):
    """
    Properly opens a netCDF file.

    To be used as a context manager: the dataset is opened on entering the
    ``with`` block and closed on leaving it.

    :param str path: The netCDF file full path
    :param str mode: The opening mode passed to *netCDF4.Dataset*, default is 'r'
    :returns: The netCDF dataset object
    :rtype: *netCDF4.Dataset*
    :raises Error: If the file cannot be opened as a netCDF dataset

    """

    def __init__(self, path, mode='r'):
        self.path = path
        self.mode = mode
        # Dataset handle, set only while the context is active.
        self.nc = None

    def __enter__(self):
        try:
            self.nc = Dataset(self.path, self.mode)
        except IOError:
            # Translate the low-level I/O failure into the package exception.
            raise InvalidNetCDFFile(self.path)
        return self.nc

    def __exit__(self, *exc):
        # Release the dataset handle; nothing to do if opening failed.
        # Returns None so that exceptions raised in the block propagate.
        if self.nc is not None:
            self.nc.close()
            self.nc = None
def remove(pattern, string):
    """
    Removes every substring matched by a regular expression.

    :param str pattern: The regular expression to match
    :param str string: The string to test
    :returns: The string without the matched substring(s)
    :rtype: *str*

    """
    return re.sub(pattern, "", string)
def match(pattern, string, inclusive=True):
    """
    Validates a string against a regular expression.
    The pattern is searched anywhere in the string (``re.search``); anchor it
    with ``^`` to only match at the beginning of the string.
    Default is to match inclusive regex.

    :param str pattern: The regular expression to match
    :param str string: The string to test
    :param boolean inclusive: False if negative matching (i.e., exclude the regex)
    :returns: True if it matches (or, with ``inclusive=False``, if it does not match)
    :rtype: *boolean*

    """
    found = re.search(pattern, string) is not None
    # Negative matching simply inverts the outcome of the search.
    return found if inclusive else not found
def load(path):
    """
    Loads data from Pickle file.

    This is a generator: objects are unpickled lazily, one at a time, in the
    order they were dumped, until the end of the file is reached.

    .. warning:: Unpickling can execute arbitrary code. Only load files
        coming from a trusted source.

    :param str path: The Pickle file path
    :returns: The Pickle file content
    :rtype: *iter* of *object*

    """
    with open(path, 'rb') as f:
        # Peek one byte to detect end-of-file; an empty read ends the generator.
        while f.read(1):
            # Rewind the peeked byte (whence=1: relative to current position).
            f.seek(-1, 1)
            yield pickle.load(f)
def store(path, data):
    """
    Stores data into a Pickle file.

    Each item is dumped as its own pickle record, one after the other, so the
    file can be read back item by item.

    :param str path: The Pickle file path
    :param *list* data: A list (or any iterable) of data objects to store

    """
    with open(path, 'wb') as f:
        # Iterate directly instead of indexing: works for any iterable.
        for item in data:
            pickle.dump(item, f)
def evaluate(results):
    """
    Evaluates a list depending on absence/presence of None values.

    The evaluation succeeds as soon as at least one entry is truthy, whether
    the remaining entries are None (some errors occurred) or not (no errors).
    An empty list, or a list holding only None/falsy values, fails.

    :param list results: The list to evaluate
    :returns: True if no blocking errors
    :rtype: *boolean*

    """
    # "all truthy" and "some truthy" both succeed, which reduces to any().
    return any(results)
def checksum(ffp, checksum_type, include_filename=False, human_readable=True):
    """
    Does the checksum block by block, avoiding Python memory limits.

    :param str ffp: The file full path
    :param str checksum_type: Checksum type (name of a *hashlib* constructor, e.g. 'sha256')
    :param boolean human_readable: True to return a human readable digested message
    :param boolean include_filename: True to include filename in hash calculation
    :returns: The checksum
    :rtype: *str*
    :raises Error: If the checksum type is unknown or if the checksum fails

    """
    try:
        hash_algo = getattr(hashlib, checksum_type)()
        with open(ffp, 'rb') as f:
            # Read with the filesystem preferred block size when available.
            # A missing or zero value would end the loop at once and silently
            # hash nothing, hence the fallback.
            blocksize = getattr(os.stat(ffp), 'st_blksize', 0) or 65536
            for block in iter(lambda: f.read(blocksize), b''):
                hash_algo.update(block)
        if include_filename:
            filename = os.path.basename(ffp)
            # hashlib only accepts bytes.
            if not isinstance(filename, bytes):
                filename = filename.encode('utf-8')
            hash_algo.update(filename)
        if human_readable:
            return hash_algo.hexdigest()
        return hash_algo.digest()
    except AttributeError:
        # hashlib has no constructor with that name.
        raise InvalidChecksumType(checksum_type)
    except KeyboardInterrupt:
        # Never mask a user interruption as a checksum failure.
        raise
    except Exception:
        raise ChecksumFail(ffp, checksum_type)
def get_checksum_pattern(checksum_type):
    """
    Build the checksum pattern depending on the checksum type.

    The pattern matches a lowercase hexadecimal string whose length equals the
    hexadecimal digest length of the requested algorithm.

    :param str checksum_type: The checksum type (name of a *hashlib* constructor)
    :return: The checksum pattern
    :rtype: *re.Object*

    """
    hash_algo = getattr(hashlib, checksum_type)()
    checksum_length = len(hash_algo.hexdigest())
    # Doubled braces are literal braces for str.format: yields e.g. '{64}'.
    return re.compile(r'^[0-9a-f]{{{}}}$'.format(checksum_length))
def get_tracking_id(ffp, project):
    """
    Get and validate tracking_id/PID string from netCDF global attributes of file.

    A tracking_id of the form ``<prefix>/<uuid>`` must carry the prefix
    registered for the project in ``PID_PREFIXES``. A tracking_id that does not
    split into exactly two parts on ``/`` is taken as a bare UUID and is only
    accepted for projects without a registered prefix.

    :param str ffp: The file full path
    :param str project: The project name
    :returns: The tracking_id string, or None if the file has no such attribute
    :rtype: *str*
    :raises AssertionError: If the prefix or the UUID is invalid

    """
    # NOTE: validation relies on ``assert`` and is skipped under ``python -O``.
    with ncopen(ffp) as f:
        if 'tracking_id' not in f.ncattrs():
            return None
        tracking_id = f.getncattr('tracking_id')
        try:
            # Unpacking raises ValueError unless there is exactly one '/'.
            prefix, uid = tracking_id.split('/')
            assert prefix == PID_PREFIXES[project]
        except ValueError:
            uid = tracking_id
            assert project not in PID_PREFIXES
        assert is_uuid(uid)
        return tracking_id
def is_uuid(uuid_string, version=4):
    """
    Returns True if the validated string is a UUID.

    The string is parsed with the requested version forced, then compared with
    the normalized hexadecimal form: a UUID of another version (or written in
    uppercase) is therefore rejected.

    :param str uuid_string: The string to validate
    :param int version: The UUID version to use, default is 4
    :returns: True if uuid_string is a valid uuid
    :rtype: *boolean*

    """
    try:
        uid = UUID(uuid_string, version=version)
        return uid.hex == uuid_string.replace('-', '')
    except (ValueError, AttributeError, TypeError):
        # ValueError: malformed string; AttributeError/TypeError: not a string.
        return False
def load_checksums(checksum_file):
    """
    Convert checksums file input as dictionary where (key: value) pairs respectively
    are the file path and its checksum.

    Each non-blank line is expected to be ``<checksum> <path>``. The path is
    everything after the first run of whitespace (so it may contain spaces) and
    is normalized to an absolute path.

    :param FileObject checksum_file: The submitted checksum file
    :returns: The loaded checksums
    :rtype: *dict*
    :raises ValueError: If a non-blank line does not hold both a checksum and a path

    """
    checksums = dict()
    for entry in checksum_file.read().splitlines():
        entry = entry.strip()
        if not entry:
            # Tolerate blank lines (e.g. a trailing newline).
            continue
        # Split once only: the remainder is the path, spaces included.
        digest, path = entry.split(None, 1)
        checksums[os.path.abspath(os.path.normpath(path))] = digest
    return checksums
def get_checksum(ffp, checksum_type='sha256', checksums_from_file=None):
    """
    Get file checksum.
    Allows to submit a list of checksums in a dictionary way {file: checksum}, to be used by --checksums-from flag.
    The submitted checksum is only used if it has the form expected for the
    checksum type; otherwise the checksum is computed from the file.

    :param str ffp: The file full path
    :param str checksum_type: Checksum type
    :param dict checksums_from_file: Checksums from file
    :returns: The checksum
    :rtype: *str*
    :raises Error: If the checksum fails

    """
    if checksums_from_file and ffp in checksums_from_file:
        candidate = checksums_from_file[ffp]
        if get_checksum_pattern(checksum_type).match(candidate):
            return candidate
    # No usable pre-computed checksum: compute it from the file.
    return checksum(ffp, checksum_type)