"""Checksum computation for IMAS data entries (module ``simdb.imas.checksum``)."""

import hashlib
from pathlib import Path
import struct
import multiprocessing as mp
from typing import cast

from .utils import open_imas, list_idss, imas_files
from ..uri import URI


# Attribute names that the IDS-walking checksum code skips instead of hashing.
IGNORED_FIELDS = ("data_dictionary", "access_layer", "access_layer_language")


class Hash:
    """Stand-in type describing the two hash-object methods this module uses.

    Both methods are no-op stubs that return ``None``; the class exists so
    that hash objects (for example those created by ``hashlib``) can be
    annotated or ``cast`` to a common type.
    """

    def digest(self) -> bytes:
        """Return the digest of the data fed so far (stub: returns ``None``)."""
        pass

    def update(self, data: bytes):
        """Feed ``data`` into the hash (stub: does nothing)."""
        pass
def walk_imas(imas_obj, check: Hash, path="") -> None:
    """Feed every populated field of an IMAS object into ``check``.

    Public attributes of ``imas_obj`` are visited in the order returned by
    ``dir()``; names listed in ``IGNORED_FIELDS`` are skipped. Nested
    structures and arrays of structures are walked recursively.

    NOTE(review): this helper was previously commented out while ``_checksum``
    still depended on it; it is restored unchanged in logic. Validate it
    against the installed IMAS access layer before relying on the digests.

    :param imas_obj: IMAS object (IDS, structure or array element) to walk.
    :param check: hash object that receives the field bytes via ``update``.
    :param path: dotted location of ``imas_obj``; only extended on recursion,
        useful when debugging.
    """
    # Imported lazily so that importing this module does not require the IMAS
    # access layer or numpy to be installed.
    from imas import imasdef
    import numpy as np

    for name in (i for i in dir(imas_obj) if not i.startswith("_")):
        if name in IGNORED_FIELDS:
            continue
        attr = getattr(imas_obj, name)
        # Types are matched on their printed name rather than with
        # isinstance, so no IMAS or numpy classes need to be referenced here.
        type_name = str(type(attr))
        if "numpy.ndarray" in type_name:
            if attr.size != 0:
                # NaNs are overwritten IN PLACE with the imasdef sentinel for
                # the array's dtype before hashing (this mutates ``attr``).
                if attr.dtype == np.int32:
                    attr[np.isnan(attr)] = imasdef.INT_0D
                elif attr.dtype == np.float32:
                    attr[np.isnan(attr)] = imasdef.FLT_0D
                elif attr.dtype == np.float64:
                    attr[np.isnan(attr)] = imasdef.EMPTY_DOUBLE
                check.update(attr.tobytes())
        elif isinstance(attr, int):
            # Integers equal to the "empty" sentinel are not hashed.
            if attr != imasdef.EMPTY_INT:
                check.update(struct.pack("<l", attr))
        elif isinstance(attr, str):
            # Skip empty strings and strings that start with a NUL character.
            if attr and attr[0] != chr(0):
                check.update(attr.encode())
        elif isinstance(attr, float):
            # Floats equal to the "empty" sentinel are not hashed.
            if attr != imasdef.EMPTY_FLOAT:
                check.update(struct.pack("f", attr))
        elif "__structure" in type_name:
            walk_imas(attr, check, path=f"{path}.{name}")
        elif "__structArray" in type_name:
            for i, el in enumerate(attr):
                walk_imas(el, check, path=f"{path}.{name}[{i}]")


def ids_checksum(ids) -> Hash:
    """Return a SHA-256 hash object fed with the contents of one IDS.

    :param ids: IDS object to hash; walked with ``walk_imas``.
    :return: the hash object (call ``digest()`` / ``hexdigest()`` on it).
    """
    check = cast(Hash, hashlib.sha256())
    walk_imas(ids, check)
    return check


def _checksum(q: mp.Queue, uri: URI) -> str:
    """Hash every IDS of the entry at ``uri`` and publish the hex digest.

    The per-IDS SHA-256 digests are combined, in the order returned by
    ``list_idss``, into one overall SHA-256.

    :param q: queue that receives the final hex digest via ``put``.
    :param uri: location of the IMAS entry, passed to ``open_imas``.
    :return: the same hex digest that was put on ``q``.
    """
    entry = open_imas(uri)
    try:
        check = hashlib.sha256()
        for name in list_idss(entry):
            print(f"Checksumming {name}", flush=True)
            check.update(ids_checksum(entry.get(name)).digest())
    finally:
        # Close the entry even when reading or hashing an IDS fails.
        entry.close()
    digest = check.hexdigest()
    q.put(digest)
    return digest
def checksum(uri: URI, ids_list: list) -> str:
    """Return the SHA-1 hex digest of the files backing an IMAS entry.

    The files returned by ``imas_files(uri)`` are hashed in that order, so the
    result depends on both file contents and file order.

    For files whose name has ``h5`` as its second dot-separated component,
    only ``master`` and the files named after an entry of ``ids_list`` are
    hashed; every other file is always hashed.

    :param uri: URI of the entry; its ``scheme`` must be ``"imas"``.
    :param ids_list: names of the IDSs to include. When empty or ``None`` the
        list is read from the entry itself with ``list_idss``.
    :return: hexadecimal SHA-1 digest of the selected files.
    :raises ValueError: if ``uri.scheme`` is not ``"imas"``.
    """
    if uri.scheme != "imas":
        raise ValueError(f"invalid scheme for imas checksum: {uri.scheme}")

    if not ids_list:
        entry = open_imas(uri)
        try:
            ids_list = list_idss(entry)
        finally:
            # Do not leak the entry if listing its IDSs fails.
            entry.close()

    sha1 = hashlib.sha1()
    for path in imas_files(uri):
        name_parts = Path(path).name.split(".")
        # Guard the index: a file name without any dot has a single part.
        is_hdf5 = len(name_parts) > 1 and name_parts[1] == "h5"
        # Decide whether to skip before opening, so excluded files are never
        # opened at all.
        if (
            is_hdf5
            and name_parts[0] != "master"
            and ids_list is not None
            and name_parts[0] not in ids_list
        ):
            continue
        with open(path, "rb") as file:
            # Read in fixed-size chunks to keep memory use flat for big files.
            for chunk in iter(lambda: file.read(4096), b""):
                sha1.update(chunk)
    return sha1.hexdigest()