Source code for simdb.validation.file.ids_validator

from .validator_base import FileValidatorBase
from ...uri import URI

from pathlib import Path

[docs] class IdsValidator(FileValidatorBase):
[docs] def configure(self, arguments: dict): from imas_validator.validate_options import ValidateOptions from imas_validator.validate_options import RuleFilter # needs to be able to configure from both the [file_validation] server configuration section and the dictionary # returned from options() list_of_rulesets = [] list_of_extra_rulesets = [] list_of_filter_idses = [] list_of_filter_names = [] apply_generic = True bundled_ruleset = True if "rulesets" in arguments: rule_files = arguments.get("rulesets") if isinstance(rule_files, str): # rulesets will be a comma separated string of file names when read from server config list_of_rulesets = rule_files.strip('"').split(",") if "extra_rule_dirs" in arguments: extra_rule_paths = arguments.get("extra_rule_dirs") if isinstance(extra_rule_paths, str): list_of_extra_rulesets = [ Path(ruleset_path) for ruleset_path in extra_rule_paths.strip('"').split(",") ] ### Define logic for rule_filter if ("rule_filter_name" in arguments and isinstance(arguments.get("rule_filter_name"), str)): list_of_filter_names = arguments.get("rule_filter_name").strip('"').split(",") if ("rule_filter_ids" in arguments and isinstance(arguments.get("rule_filter_ids"), str)): list_of_filter_idses = arguments.get("rule_filter_ids").strip('"').split(",") # Check if option apply_generic is used and wether it a bool if ("apply_generic" in arguments and isinstance(arguments.get("apply_generic"), bool)): apply_generic = arguments.get("apply_generic") # Check if option bundled_ruleset is used and wether it a bool if ("bundled_ruleset" in arguments and isinstance(arguments.get("bundled_ruleset"), bool)): bundled_ruleset = arguments.get("bundled_ruleset") options = ValidateOptions( rulesets = list_of_rulesets, extra_rule_dirs = list_of_extra_rulesets, apply_generic = apply_generic, use_pdb = False, use_bundled_rulesets = bundled_ruleset, rule_filter = RuleFilter(name=list_of_filter_names, ids=list_of_filter_idses), ) return options
[docs] def options(self) -> dict: # return the rules files as base64 encoded strings return { "rule_files": [], }
[docs] def validate_uri(self, uri: URI, validate_options): if uri.scheme != "imas": # Skip non IMAS data return from ..validator import ValidationError from imas_validator.validate.validate import validate from imas_validator.report.validationReportGenerator import ValidationReportGenerator try: backend = uri.query.get("backend") path = uri.query.get("path") validate_uri = f"imas:{backend}?path={path}" validate_output = validate(imas_uri=URI(validate_uri), validate_options=validate_options) validate_result = all([result.success for result in validate_output.results]) report_generator = ValidationReportGenerator(validate_output) if validate_result == False: raise ValidationError(f"Validation of following URI: [{validate_uri}], failed with following report: \n{report_generator.txt}") except Exception as err: raise ValidationError(f"validate_uri exception [{err}]")