Source code for simdb.validation.validator

import cerberus
import yaml
import re
from pathlib import Path
from typing import Dict, List, Optional

from ..database.models.simulation import Simulation
from ..config import Config, ConfigError


class TestParameters:
    """Empty placeholder class; it defines no attributes or methods."""
class LoadError(Exception):
    """Raised when a validation schema cannot be read or parsed."""
class ValidationError(Exception):
    """Raised when data fails validation; carries the validator's errors."""
class CustomValidator(cerberus.Validator):
    """Cerberus validator extended with extra rules, a type and coercers.

    Additions on top of ``cerberus.Validator``:

    * type ``numpy`` -- matches ``numpy.ndarray`` instances;
    * rules ``exists``, ``checksum``, ``min_value``, ``max_value``, ``gt``,
      ``ge``, ``lt`` and ``le``;
    * coercers ``int``, ``float`` and ``numpy``.

    NOTE: in every ``_validate_*`` method the docstring text following
    "The rule's arguments are validated against this schema:" is read by
    Cerberus as a Python literal describing the rule's arguments, so that
    part of each docstring must not be reworded.
    """

    # Imported in the class body because the module does not import numpy at
    # the top level and ``types_mapping`` below needs ``np.ndarray``.
    import numpy as np

    types_mapping = cerberus.Validator.types_mapping.copy()
    types_mapping["numpy"] = cerberus.TypeDefinition("numpy", (np.ndarray,), ())

    def _validate_exists(self, check_exists, field, value):
        """Report an error when the rule is set and ``value`` is not an existing path.

        The rule's arguments are validated against this schema:
        {'type': ['string'], 'check_with': 'type'}
        """
        if check_exists and not Path(value).exists():
            self._error(field, "File must exist")

    def _validate_checksum(self, check_checksum, field, value):
        """Placeholder rule: the condition is always false, so no error is ever reported.

        The rule's arguments are validated against this schema:
        {'type': ['string'], 'check_with': 'type'}
        """
        # NOTE(review): checksum verification is not implemented; ``and False``
        # keeps the rule accepted in schemas without enforcing anything.
        if check_checksum and False:
            self._error(field, "File checksum must be valid")

    def _finite_values(self, field, value):
        """Return ``value`` with NaNs removed, or ``None`` after reporting an error.

        An error is reported (and ``None`` returned) when ``value`` is not a
        numpy array, or when nothing is left once NaNs are dropped. Returning
        ``None`` lets callers stop instead of reducing an empty array, which
        numpy rejects for ``min``/``max``.
        """
        import numpy as np

        if not isinstance(value, np.ndarray):
            self._error(field, "Value is not a numpy array")
            return None
        finite = value[~np.isnan(value)]
        if finite.size == 0:
            self._error(field, "Values in numpy array are NaN or empty")
            return None
        return finite

    def _validate_min_value(self, min_value, field, value):
        """Report an error when the smallest non-NaN element is below ``min_value``.

        The rule's arguments are validated against this schema:
        {'type': 'float'}
        """
        finite = self._finite_values(field, value)
        if finite is None or min_value is None:
            return
        smallest = finite.min()
        if smallest < min_value:
            self._error(field, "Minimum %s less than %s" % (smallest, min_value))

    def _validate_max_value(self, max_value, field, value):
        """Report an error when the largest non-NaN element is above ``max_value``.

        The rule's arguments are validated against this schema:
        {'type': 'float'}
        """
        finite = self._finite_values(field, value)
        if finite is None or max_value is None:
            return
        largest = finite.max()
        if largest > max_value:
            self._error(field, "Maximum %s greater than %s" % (largest, max_value))

    def _compare(self, comparison, field, value, comparator: str, message: str):
        """Shared implementation of the ``gt``/``ge``/``lt``/``le`` rules.

        Args:
            comparison: Bound taken from the schema; ``None`` disables the check.
            field: Name of the field being validated.
            value: A numpy array (every non-NaN element must satisfy the
                comparison) or a float.
            comparator: Name of the rich-comparison method to call on
                ``value``, e.g. ``"__gt__"``.
            message: Human-readable form of the comparison for error text.
        """
        import numpy as np

        if comparison is None:
            return

        if isinstance(value, np.ndarray):
            finite = value[~np.isnan(value)]
            if finite.size == 0:
                self._error(field, "Values in numpy array are NaN or empty")
                # Nothing left to compare; ``all()`` of an empty array is
                # vacuously true anyway.
                return
            if not getattr(finite, comparator)(comparison).all():
                self._error(
                    field, "Values are not %s %s" % (message, comparison)
                )
        elif isinstance(value, float):
            if not getattr(value, comparator)(comparison):
                self._error(field, "Value is not %s %s" % (message, comparison))
        else:
            self._error(field, "Value is not a numpy array or a float")

    def _validate_gt(self, comparison, field, value):
        """Require the value (or every array element) to be greater than the bound.

        The rule's arguments are validated against this schema:
        {'type': 'float'}
        """
        self._compare(comparison, field, value, "__gt__", "greater than")

    def _validate_ge(self, comparison, field, value):
        """Require the value (or every array element) to be at least the bound.

        The rule's arguments are validated against this schema:
        {'type': 'float'}
        """
        self._compare(comparison, field, value, "__ge__", "greater than or equal to")

    def _validate_lt(self, comparison, field, value):
        """Require the value (or every array element) to be less than the bound.

        The rule's arguments are validated against this schema:
        {'type': 'float'}
        """
        self._compare(comparison, field, value, "__lt__", "less than")

    def _validate_le(self, comparison, field, value):
        """Require the value (or every array element) to be at most the bound.

        The rule's arguments are validated against this schema:
        {'type': 'float'}
        """
        self._compare(comparison, field, value, "__le__", "less than or equal to")

    @classmethod
    def _normalize_coerce_int(cls, value):
        """Coerce ``value`` with ``int()``."""
        return int(value)

    @classmethod
    def _normalize_coerce_float(cls, value):
        """Coerce ``value`` with ``float()``."""
        return float(value)

    @classmethod
    def _normalize_coerce_numpy(cls, value):
        """Coerce ``value`` to a numpy array.

        Arrays are returned unchanged. Strings have their first and last
        characters dropped (presumably enclosing brackets -- confirm against
        the stored format) and the remainder parsed as space-separated
        numbers. Anything else is passed to ``numpy.array``.
        """
        import numpy as np

        if isinstance(value, np.ndarray):
            return value
        if isinstance(value, str):
            return np.fromstring(value[1:-1], sep=" ")
        return np.array(value)
def _load_schema(path: Path):
    """Load a validation schema from a YAML file.

    Args:
        path: Location of the schema file. A plain string is accepted too and
            is converted to a ``Path`` before use.

    Returns:
        ``[{}]`` when the file does not exist, otherwise whatever the YAML
        document parses to.

    Raises:
        LoadError: If the file content is not valid YAML.
    """
    # Callers may hand over strings (e.g. values read from configuration);
    # ``str`` has no ``exists()``, so normalise first.
    path = Path(path)
    if not path.exists():
        # NOTE(review): this is a list wrapping an empty dict, unlike the
        # parsed document returned below; kept as-is for existing callers.
        return [{}]

    with open(path, "r") as file:
        try:
            return yaml.load(file, Loader=yaml.SafeLoader)
        except yaml.YAMLError as err:
            # Report the path, not the file object's repr.
            raise LoadError(
                "Failed to read validation schema from file %s" % path
            ) from err
class Validator:
    """Validate a simulation's metadata dictionary against a Cerberus schema."""

    # Cerberus validator built from the schema passed to ``__init__``.
    _validator: cerberus.Validator
    # Matches section names of the form: <word> "<key>=<value>".
    _section_re = re.compile(r"\S+ \"(\S+)=(\S+)\"")

    @classmethod
    def validation_schemas(
        cls, config: Config, simulation: Optional[Simulation], path=None
    ) -> List[Dict]:
        """Collect the validation schemas that apply to ``simulation``.

        Args:
            config: Configuration providing the ``validation.path`` option and
                any ``validation "key=value"`` sections.
            simulation: Simulation whose metadata selects the additional
                schemas; ``None`` skips the section lookup.
            path: Explicit schema file to use in place of
                ``<validation.path>/validation-schema.yaml``.

        Returns:
            One loaded schema per selected file, in selection order.

        Raises:
            ConfigError: If a section name starts with ``validation`` but is
                neither ``validation`` nor of the ``validation "key=value"``
                form.
        """
        root = Path(
            str(
                config.get_option(
                    "validation.path", default=str(config.config_directory)
                )
            )
        )

        # Paths are wrapped in ``Path`` because ``_load_schema`` calls
        # ``.exists()`` on them and strings may arrive from callers or config.
        schema_paths = [Path(path) if path else root / "validation-schema.yaml"]

        # Look for config sections like [validation "key=value"] and check
        # whether the simulation has metadata matching the given test. If so,
        # the "path" given in that section is added to the schema paths.
        if simulation is not None:
            for section in config.sections():
                if not section.startswith("validation") or section == "validation":
                    continue
                match = cls._section_re.match(section)
                if not match:
                    raise ConfigError(f"Invalid validation section {section}")
                key, expected = match.group(1), match.group(2)
                for meta in simulation.find_meta(key):
                    if meta.value != expected:
                        continue
                    section_path = config.get_section(section).get(
                        "path", default=""
                    )
                    if section_path:
                        schema_paths.append(Path(section_path))

        return [_load_schema(schema_path) for schema_path in schema_paths]

    def __init__(self, schema: Dict):
        """Build the underlying validator from ``schema``.

        Unknown fields are allowed, so only fields named in the schema are
        checked.

        Raises:
            LoadError: If Cerberus rejects the schema.
        """
        try:
            self._validator = CustomValidator(schema)
            self._validator.allow_unknown = True
        except cerberus.SchemaError as err:
            raise LoadError("Failed to parse validation schema") from err

    def validate(self, sim: Simulation) -> None:
        """Validate the metadata dictionary of ``sim``.

        Raises:
            ValidationError: If validation fails; carries the Cerberus errors.
        """
        data = sim.meta_dict()
        if not self._validator.validate(data):
            raise ValidationError(self._validator.errors)