# Source code for esgprep._utils.checksum

# -*- coding: utf-8 -*-

"""
.. module:: esgprep._utils.checksum
   :platform: Unix
   :synopsis: Checksumming utilities.

.. moduleauthor:: Guillaume Levavasseur <glipsl@ipsl.fr>

"""

import hashlib
import os
import re

from esgprep._exceptions import InvalidChecksumType, ChecksumFail

# Multihash support - implement varint encoding directly to avoid dependency


def _varint_encode(n: int) -> bytes:
    """
    Encode an integer as varint (variable-length integer encoding).

    Args:
        n: The integer to encode

    Returns:
        The varint-encoded bytes
    """
    result = bytearray()
    while n >= 0x80:
        result.append((n & 0x7F) | 0x80)
        n >>= 7
    result.append(n & 0x7F)
    return bytes(result)


# Supported multihash algorithms, keyed by multihash algorithm name.
# Each value is a pair (code, hashlib_name):
#   - code: integer that multihash() varint-encodes as the hash prefix,
#   - hashlib_name: name handed to hashlib.new() to compute the digest.
MULTIHASH_ALGOS = {
    multihash_name: (code, hashlib_name)
    for multihash_name, code, hashlib_name in (
        ("sha1", 0x11, "sha1"),
        ("sha2-256", 0x12, "sha256"),
        ("sha2-512", 0x13, "sha512"),
        ("sha3-512", 0x14, "sha3_512"),
        ("sha3-256", 0x16, "sha3_256"),
    )
}


def multihash(data: bytes, algo: str) -> bytes:
    """
    Build the multihash of ``data`` for the requested algorithm.

    Args:
        data: The bytes to hash
        algo: The multihash algorithm name, a key of ``MULTIHASH_ALGOS``
            (e.g., "sha2-256")

    Returns:
        The varint-encoded algorithm code, followed by the varint-encoded
        digest length, followed by the raw digest

    Raises:
        ValueError: If the algorithm is not supported
    """
    entry = MULTIHASH_ALGOS.get(algo)
    if entry is None:
        raise ValueError(f"Unsupported multihash algorithm: {algo}")
    code, hashlib_name = entry

    # Hash the payload with the matching hashlib implementation.
    digest = hashlib.new(hashlib_name, data).digest()

    # Assemble <code><length><digest>.
    return b"".join((_varint_encode(code), _varint_encode(len(digest)), digest))
def multihash_hex(data: bytes, algo: str) -> str:
    """
    Build the multihash of ``data`` and render it as hexadecimal text.

    Args:
        data: The bytes to hash
        algo: The multihash algorithm name (e.g., "sha2-256")

    Returns:
        The hexadecimal representation of ``multihash(data, algo)``
    """
    raw_multihash = multihash(data, algo)
    return raw_multihash.hex()
def _varint_decode(data: bytes, offset: int = 0) -> tuple[int, int]: """ Decode a varint from bytes. Args: data: The bytes to decode from offset: Starting position in the bytes Returns: A tuple of (decoded_value, new_offset) """ result = 0 shift = 0 pos = offset while pos < len(data): byte = data[pos] result |= (byte & 0x7F) << shift pos += 1 if (byte & 0x80) == 0: break shift += 7 return result, pos
def detect_multihash_algo(hash_hex: str) -> "str | None":
    """
    Detect the multihash algorithm from a multihash hex string.

    The string is accepted only if it has the layout produced by
    ``multihash()``: a varint algorithm code, a varint digest length, then
    exactly that many digest bytes. Checking the length keeps arbitrary hex
    strings that merely start with a known code byte from being reported
    as multihashes.

    Args:
        hash_hex: The multihash as a hexadecimal string

    Returns:
        The algorithm name (e.g., "sha2-256") or None if not a valid multihash
    """
    try:
        # Convert hex to bytes
        hash_bytes = bytes.fromhex(hash_hex)
        # Decode the algorithm code (first varint) and digest length (second varint)
        code, offset = _varint_decode(hash_bytes, 0)
        length, offset = _varint_decode(hash_bytes, offset)
    except (ValueError, IndexError):
        return None

    # The announced digest length must be non-zero and match the payload size.
    if length <= 0 or len(hash_bytes) - offset != length:
        return None

    # Find the algorithm by code
    for algo_name, (algo_code, _) in MULTIHASH_ALGOS.items():
        if algo_code == code:
            return algo_name
    return None
def is_multihash_algo(checksum_type: str) -> bool:
    """
    Tell whether a checksum type names a supported multihash algorithm.

    Args:
        checksum_type: The checksum type to check

    Returns:
        True if ``checksum_type`` is a key of ``MULTIHASH_ALGOS``, False otherwise
    """
    supported_names = MULTIHASH_ALGOS.keys()
    return checksum_type in supported_names
def _feed_file(hash_algo, ffp, file_size, report=None, chunk_size=1024 * 1024):
    """
    Stream the content of ``ffp`` into ``hash_algo`` chunk by chunk.

    When ``report`` is given, it is called with a progress message each time
    another 1 GiB of the file has been consumed.
    """
    progress_step = 1024 * 1024 * 1024
    next_report = progress_step
    bytes_read = 0
    with open(ffp, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hash_algo.update(chunk)
            bytes_read += len(chunk)
            # Threshold comparison (rather than an exact modulo test) keeps
            # reporting correct even if a read returns fewer bytes than asked.
            if report is not None and bytes_read >= next_report:
                progress_pct = int((bytes_read / file_size) * 100)
                report(f" Progress: {progress_pct}%")
                next_report += progress_step


def checksum(ffp, checksum_type, include_filename=False, human_readable=True):
    """
    Computes a file checksum.

    Supports both standard hashlib algorithms and multihash algorithms.
    The file is always hashed incrementally, so memory use does not grow
    with the file size.

    Args:
        ffp: Path of the file to checksum
        checksum_type: Either a key of ``MULTIHASH_ALGOS`` (e.g. "sha2-256")
            or the name of a ``hashlib`` constructor (e.g. "sha256")
        include_filename: If True, the encoded file basename is hashed
            after the file content
        human_readable: If True, return a hexadecimal string; otherwise
            return raw bytes

    Returns:
        For multihash types, the multihash (code + length + digest); for
        hashlib types, the plain digest. ``str`` or ``bytes`` depending on
        ``human_readable``.

    Raises:
        InvalidChecksumType: If ``checksum_type`` is neither a multihash
            algorithm nor an attribute of ``hashlib``
        ChecksumFail: If anything else goes wrong while checksumming
    """
    try:
        # Progress messages are only emitted for files larger than 1000 MiB.
        file_size = os.path.getsize(ffp)
        show_progress = file_size > 1000 * 1024 * 1024
        report = None
        if show_progress:
            from esgprep._utils.print import Print

            report = Print.info
            report(
                f"Computing {checksum_type} checksum for large file: {os.path.basename(ffp)} ({file_size / (1024 * 1024):.1f} MB)"
            )

        # Pick the hashlib object; multihash types map onto a hashlib name.
        multihash_code = None
        if is_multihash_algo(checksum_type):
            multihash_code, hashlib_name = MULTIHASH_ALGOS[checksum_type]
            hash_algo = hashlib.new(hashlib_name)
        else:
            # Unknown names raise AttributeError, translated below.
            hash_algo = getattr(hashlib, checksum_type)()

        # Checksumming file.
        _feed_file(hash_algo, ffp, file_size, report)

        # Include filename into the checksum.
        if include_filename:
            hash_algo.update(os.path.basename(ffp).encode())

        if multihash_code is not None:
            # Multihash layout: <varint code><varint digest length><digest>.
            digest = hash_algo.digest()
            raw = _varint_encode(multihash_code) + _varint_encode(len(digest)) + digest
            result = raw.hex() if human_readable else raw
            if report is not None:
                report(f" Checksum completed: {result[:32]}...")
            return result

        if report is not None:
            report(" Checksum completed")
        # Return human readable checksum if requested.
        return hash_algo.hexdigest() if human_readable else hash_algo.digest()
    # Catch checksum type error.
    except AttributeError as err:
        raise InvalidChecksumType(checksum_type) from err
    # Catch manual stop error.
    except KeyboardInterrupt:
        raise
    # Catch any other error.
    except Exception as err:
        raise ChecksumFail(ffp, checksum_type) from err
def get_checksum_pattern(checksum_type):
    """
    Builds a regular expression describing a checksum pattern.

    For hashlib algorithms the pattern matches lowercase hexadecimal strings
    of exactly the algorithm's hexdigest length. For multihash algorithms it
    matches any non-empty lowercase hexadecimal string.

    """
    if not is_multihash_algo(checksum_type):
        # Standard hashlib algorithm: measure the hexdigest of an empty hash
        # to learn the expected checksum length.
        hash_client = getattr(hashlib, checksum_type)()
        checksum_length = len(hash_client.hexdigest())
        return re.compile(f"^[0-9a-f]{{{checksum_length}}}$")

    # Multihash: no fixed length is enforced here, any hex string is accepted.
    return re.compile(r"^[0-9a-f]+$")
def get_checksum(ffp, checksum_type="sha256", checksums=None):
    """
    Global method to get file checksum:
     1. Through a dictionary of pre-computed checksums {file: checksum},
        when the file is listed and its entry matches the expected pattern.
     2. By computing the checksum directly otherwise.

    """
    # Use the pre-computed checksum only if the file is listed.
    if checksums and ffp in checksums:
        pattern = get_checksum_pattern(checksum_type)
        known_checksum = checksums[ffp]
        # Trust the listed value only if it looks like a checksum of that type.
        if pattern.match(known_checksum):
            return known_checksum

    # Fall back to computing the checksum from the file.
    return checksum(ffp, checksum_type)