# import datetime
import re
import os
import urllib
from enum import Enum, auto
from typing import Iterable, Union, Dict, List, Tuple, Optional, TextIO, Type
import glob
from pathlib import Path
import numpy as np
import yaml
from ..uri import URI
[docs]
class InvalidManifest(Exception):
    """Raised when a manifest does not pass validation."""
[docs]
class InvalidAlias(InvalidManifest):
    """Raised when the alias given in a manifest is not acceptable."""
def _expand_path(path: Path, base_path: Path) -> Path:
os.environ["MANIFEST_DIR"] = str(base_path)
path = Path(os.path.expanduser(os.path.expandvars(path)))
path = Path(str(path).replace("//", "/"))
if not path.is_absolute():
if not base_path.is_absolute():
raise ValueError("base_path must be absolute")
return base_path / path
else:
# Expand any /./ and /../ in absolute path
path = os.path.abspath(path)
return path
def _to_uri(uri_str: str, base_path: Path) -> Tuple["DataObject.Type", "URI"]:
    """
    Parse a manifest URI string and classify it by scheme.

    :param uri_str: The URI text taken from the manifest.
    :param base_path: Directory used to expand the path of ``file`` URIs.
    :return: A ``(DataObject.Type, URI)`` pair.
    :raises InvalidManifest: If the URI has an authority, has no scheme, is an
        ``imas`` URI without ``path`` or all of ``shot``/``run``/``database``
        in its query, or uses a scheme other than file/imas/uda/simdb.
    """
    from ..uri import URI

    parsed = URI(uri_str)
    if parsed.authority:
        raise InvalidManifest(f"invalid uri: {uri_str} - path must be absolute")

    scheme = parsed.scheme
    if scheme is None:
        raise InvalidManifest(f"invalid uri: {uri_str} - no scheme provided")

    if scheme == "file":
        expanded = _expand_path(parsed.path, base_path)
        return DataObject.Type.FILE, URI(parsed, path=expanded)

    if scheme == "imas":
        query = parsed.query
        if "path" not in query and not (
            "shot" in query and "run" in query and "database" in query
        ):
            raise InvalidManifest(
                f"invalid uri: {uri_str} - no path or (shot, run, database) provided in IMAS uri"
            )
        return DataObject.Type.IMAS, parsed

    # The remaining schemes need no extra checks: map them straight to a type.
    passthrough = {
        "uda": DataObject.Type.UDA,
        "simdb": DataObject.Type.UUID,
    }
    if scheme in passthrough:
        return passthrough[scheme], parsed

    raise InvalidManifest(f"invalid uri: {uri_str}")
[docs]
class DataObject:
    """
    A simulation data object described by a URI.

    The object is a file, an IMAS entry, a UDA entry, or an already
    registered object identified by UUID. Example URI forms:

    PATH: file:///<PATH>
    IMAS: imas:<BACKEND>?path=<PATH>
    """

    class Type(Enum):
        """Kind of data object, derived from the URI scheme."""

        UNKNOWN = auto()
        UUID = auto()
        FILE = auto()
        IMAS = auto()
        UDA = auto()

    type: Type = Type.UNKNOWN
    uri: Optional[URI] = None

    def __init__(self, base_path: Path, uri: str) -> None:
        """
        Parse ``uri`` and store the resulting type and URI.

        :param base_path: Directory used to resolve relative file paths.
        :param uri: The URI text from the manifest.
        :raises InvalidManifest: If the URI is rejected by ``_to_uri``, or the
            result is of unknown type or falsy.
        """
        kind, parsed = _to_uri(uri, base_path)
        self.type = kind
        self.uri = parsed
        if self.type == DataObject.Type.UNKNOWN or not self.uri:
            raise InvalidManifest("invalid input")

    @property
    def name(self) -> str:
        """The string form of the object's URI."""
        return str(self.uri)
[docs]
class Source(DataObject):
    """A data object used as a simulation input."""
[docs]
class Sink(DataObject):
    """A data object produced as a simulation output."""
[docs]
class ManifestValidator:
    """
    Common base for the manifest section validators.

    Stores the manifest version; the base ``validate`` accepts anything.
    """

    version: int

    def __init__(self, version: int):
        """
        :param version: The manifest version this validator is built for.
        """
        self.version = version

    def validate(self, values: Union[List, Dict]) -> None:
        """
        Validate a manifest section. The base implementation does nothing.

        :param values: The section contents.
        :return: None
        """
        return None
[docs]
class ListValuesValidator(ManifestValidator):
    """
    Class for the validation of list items in the manifest.

    A valid section is ``None`` or a list whose items are dictionaries
    holding exactly one name/value pair.
    """

    def __init__(
        self,
        version: int,
        section_name: str = NotImplemented,
        expected_keys: Iterable = NotImplemented,
        required_keys: Iterable = NotImplemented,
    ) -> None:
        """
        :param version: The manifest version being validated.
        :param section_name: Section name used in error messages.
        :param expected_keys: Names the section is expected to contain
            (stored, but not currently enforced by ``validate``).
        :param required_keys: When a tuple, the names each item is checked against.
        """
        self.section_name: str = section_name
        self.expected_keys: Iterable = expected_keys
        self.required_keys: Iterable = required_keys
        super().__init__(version)

    def validate(self, values: Union[list, dict]) -> None:
        """
        Validate the list section.

        :param values: The section contents; ``None`` is accepted as empty.
        :return: None
        :raises InvalidManifest: If the section is a dict, an item is not a
            single name/value pair, or an item name fails the key check.
        """
        if values is None:
            return
        if isinstance(values, dict):
            raise InvalidManifest(
                f"badly formatted manifest - {self.section_name} should be provided as a list"
            )

        for item in values:
            # Exactly one pair is required: an empty dict would otherwise
            # reach next(iter(item)) and escape as a bare StopIteration.
            if not isinstance(item, dict) or len(item) != 1:
                raise InvalidManifest(
                    f"badly formatted manifest - {self.section_name} values should be a name value pair"
                )
            name = next(iter(item))
            # Unknown names (not in expected_keys) are deliberately not rejected here.
            # NOTE(review): this rejects any item whose name is not listed in
            # required_keys, rather than reporting required names that are
            # absent - confirm which behaviour is intended.
            if isinstance(self.required_keys, tuple) and name not in self.required_keys:
                raise InvalidManifest(
                    f"required {self.section_name} key not found in manifest: {name}"
                )
[docs]
class DictValuesValidator(ManifestValidator):
    """
    Class for the validation of dictionary items in the manifest.

    Keys must be one of ``expected_keys`` or match ``code<N>``; every key
    in ``required_keys`` must be present.
    """

    def __init__(
        self,
        version: int,
        section_name: str = NotImplemented,
        expected_keys: Iterable = NotImplemented,
        required_keys: Iterable = NotImplemented,
    ) -> None:
        """
        :param version: The manifest version being validated.
        :param section_name: Section name used in error messages.
        :param expected_keys: Keys allowed in the section; left as
            ``NotImplemented`` the unknown-key check is skipped.
        :param required_keys: Keys that must be present; left as
            ``NotImplemented`` the required-key check is skipped.
        """
        self.section_name: str = section_name
        self.expected_keys: Iterable = expected_keys
        self.required_keys: Iterable = required_keys
        super().__init__(version)

    def validate(self, values: Union[list, dict]) -> None:
        """
        Validate the dictionary section.

        :param values: The section contents; ``None`` is treated as empty.
        :return: None
        :raises InvalidManifest: If the section is not a dict, contains an
            unknown key, a ``code<N>`` entry has an unknown sub-key, or a
            required key is missing.
        """
        if values is None:
            # An empty YAML section loads as None; treat it as having no
            # keys so that missing required keys are still reported.
            values = {}
        if not isinstance(values, dict):
            raise InvalidManifest(
                f"badly formatted manifest - {self.section_name} should be provided as a dict"
            )

        if self.expected_keys is not NotImplemented:
            for key, value in values.items():
                if key in self.expected_keys:
                    continue
                # str() keeps non-string YAML keys from raising TypeError in re.match.
                if not re.match(r"code[0-9]+", str(key)):
                    raise InvalidManifest(
                        f"unknown {self.section_name} key in manifest: {key}"
                    )
                for code_key in value or ():
                    if code_key not in ("name", "repo", "commit"):
                        raise InvalidManifest(
                            f"unknown {self.section_name}.{key} key in manifest: {code_key}"
                        )

        # The previous guard tested isinstance(self.expected_keys, list), which
        # is never true for the tuples callers pass, so this check never ran.
        if self.required_keys is not NotImplemented:
            for key in self.required_keys:
                if key not in values:
                    raise InvalidManifest(
                        f"required {self.section_name} key not found in manifest: {key}"
                    )
[docs]
class DataObjectValidator(ListValuesValidator):
    """
    Validator for the manifest data objects (inputs or outputs).
    """

    def __init__(self, version: int, section_name: str) -> None:
        """
        :param version: The manifest version being validated.
        :param section_name: Section name used in error messages.
        :raises KeyError: If ``version`` is negative.
        """
        if version == 0:
            expected_keys = ("uuid", "path", "imas", "uda")
        elif version > 0:
            expected_keys = ("uri",)
        else:
            raise KeyError("Invalid version.")
        super().__init__(version, section_name, expected_keys)

    def validate(self, values: Union[list, dict]) -> None:
        """
        Validate the list structure and, for versions above 0, each URI.

        :param values: The section contents; ``None`` is accepted as empty.
        :return: None
        :raises InvalidManifest: If the list is malformed, an entry has no
            ``uri`` key, a URI scheme is not uda/file/imas, or a URI is repeated.
        """
        from ..uri import URI

        super().validate(values)
        if values is None:
            return

        seen_uris = set()
        for value in values:
            if not self.version > 0:
                continue
            if "uri" not in value:
                # Report a validation error rather than a bare KeyError.
                raise InvalidManifest(
                    f"badly formatted manifest - {self.section_name} entries must provide a uri"
                )
            uri = URI(value["uri"])
            if uri.scheme not in ("uda", "file", "imas"):
                raise InvalidManifest(f"unknown uri scheme: {uri.scheme}")
            uri_text = str(uri)
            if uri_text in seen_uris:
                raise InvalidManifest(f"Duplicate URI found in {self.section_name}: {uri}")
            seen_uris.add(uri_text)
[docs]
class OutputsValidator(DataObjectValidator):
    """
    Data-object validator bound to the manifest ``outputs`` section.
    """

    def __init__(self, version):
        """
        :param version: The manifest version being validated.
        """
        section = "outputs"
        super().__init__(version, section)
[docs]
class VersionValidator(ManifestValidator):
    """
    Validator for manifest version.
    """

    def __init__(self, version: int):
        """
        :param version: The manifest version being validated.
        """
        super().__init__(version)

    def validate(self, value):
        """
        Check that the version value is an integer.

        :param value: The version value read from the manifest.
        :return: None
        :raises InvalidManifest: If ``value`` is not an ``int`` or is a ``bool``.
        """
        # bool is a subclass of int, so YAML ``true``/``false`` would
        # otherwise be accepted as a version number.
        if isinstance(value, bool) or not isinstance(value, int):
            raise InvalidManifest("version must be an integer")
[docs]
class AliasValidator(ManifestValidator):
    """
    Validator for simulation alias.
    """

    def __init__(self, version: int):
        """
        :param version: The manifest version being validated.
        """
        super().__init__(version)

    def validate(self, value):
        """
        Check that the alias is a string that needs no URL quoting.

        :param value: The alias value read from the manifest.
        :return: None
        :raises InvalidManifest: If ``value`` is not a string.
        :raises InvalidAlias: If URL-quoting ``value`` would change it.
        """
        # A plain ``import urllib`` does not guarantee that the ``parse``
        # submodule is loaded, so import it explicitly here.
        import urllib.parse

        if not isinstance(value, str):
            raise InvalidManifest("alias must be a string")
        if urllib.parse.quote(value) != value:
            raise InvalidAlias(f"illegal characters in alias: {value}")
# class CreationDateValidator(ManifestValidator):
# """
# Validator for simulation CreationDate.
# """
# def __init__(self, version: int):
# super().__init__(version)
# def validate(self, value):
# if not isinstance(value, str):
# raise InvalidManifest("CreationDate must be a string")
# # Validate the datetime format
# try:
# datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
# except ValueError:
# raise InvalidManifest(f"Invalid datetime format for CreationDate: {value}. Expected format: YYYY-MM-DD HH:MM:SS")
[docs]
class DescriptionValidator(ManifestValidator):
    """Validator for the simulation description; adds no checks of its own."""
[docs]
class ResponsibleValidator(ManifestValidator):
    """Validator for the simulation responsible name; adds no checks of its own."""
[docs]
def ndarray_constructor(loader: yaml.SafeLoader, node: yaml.nodes.MappingNode) -> np.ndarray:
    """
    Build a numpy array from a YAML mapping node.

    The mapping must hold a ``data`` entry and may hold a ``dtype`` entry.

    :param loader: The YAML loader used to construct the mapping.
    :param node: The mapping node to convert.
    :return: The array built from ``data`` with the given ``dtype`` (or
        numpy's default when none is given).
    """
    fields = loader.construct_mapping(node, deep=True)
    payload = fields['data']
    dtype = fields.get('dtype')
    return np.array(payload, dtype)
[docs]
def get_loader() -> Type[yaml.SafeLoader]:
    """
    Return a YAML loader class that understands the ``!ndarray`` tag.

    The constructor is registered on a private ``SafeLoader`` subclass so
    that ``yaml.SafeLoader`` itself (and therefore ``yaml.safe_load``
    elsewhere in the process) is not modified.

    :return: A ``yaml.SafeLoader`` subclass with ``!ndarray`` support.
    """

    class ManifestLoader(yaml.SafeLoader):
        """SafeLoader extended with the ``!ndarray`` constructor."""

    # NOTE(review): if other code relies on ``!ndarray`` being registered
    # globally on yaml.SafeLoader, it must use this loader instead.
    ManifestLoader.add_constructor('!ndarray', ndarray_constructor)
    return ManifestLoader
[docs]
class WorkflowValidator(DictValuesValidator):
    """
    Dictionary validator bound to the manifest ``workflow`` section.
    """

    def __init__(self, version: int) -> None:
        """
        Select the expected and required workflow keys for ``version``.

        :param version: The manifest version being validated (0 or 1).
        :raises KeyError: If ``version`` is neither 0 nor 1.
        """
        # (version, expected keys, required keys)
        key_table = (
            (
                0,
                ("name", "git", "repo", "commit", "codes"),
                ("name", "commit", "codes"),
            ),
            (
                1,
                ("name", "developer", "date", "repo", "commit", "codes", "branch"),
                ("name", "repo", "commit", "branch"),
            ),
        )
        for known_version, expected, required in key_table:
            if version == known_version:
                break
        else:
            raise KeyError("Invalid version.")
        super().__init__(version, "workflow", expected, required)
def _update_dict(old: Dict, new: Dict) -> None:
for k, v in new.items():
if k in old:
if isinstance(old[k], list):
old[k].append(v)
else:
old[k] = [old[k], v]
else:
old[k] = v
[docs]
class Manifest:
    """
    Class to handle reading, writing & validation of simulation manifest files.
    """

    _data: Union[Dict, List, None] = None
    _path: Path = Path()
    _metadata: Dict

    def __init__(self) -> None:
        """Create an empty manifest with its own data, path and metadata."""
        # Metadata must be per-instance: a class-level dict would be shared
        # (and mutated) by every Manifest object.
        self._data = None
        self._path = Path()
        self._metadata = {}

    @property
    def metadata(self) -> Dict:
        """The metadata dictionary collected for this manifest."""
        return self._metadata

    @classmethod
    def from_template(cls) -> "Manifest":
        """
        Create an empty manifest from a template file.

        The template is ``template.yaml`` in the directory of this module.

        :return: A new manifest object.
        """
        manifest = cls()
        dir_path = Path(__file__).resolve().parent
        manifest.load(dir_path / "template.yaml")
        return manifest

    def _entries(self, section: str) -> List:
        """Return the entries of a list section, or [] if absent or empty."""
        if not isinstance(self._data, dict):
            return []
        return self._data.get(section) or []

    def _expand(self, entries: List, factory, *, require_match: bool) -> List:
        """Build data objects from entries, expanding file URIs with glob."""
        base_path = self._path.absolute().parent
        objects = []
        for entry in entries:
            obj = factory(base_path, entry["uri"])
            if obj.type != DataObject.Type.FILE:
                objects.append(obj)
                continue
            # File paths may contain glob patterns: one object per match.
            names = glob.glob(str(obj.uri.path))
            if require_match and not names:
                raise InvalidManifest(
                    f"No files found matching path {obj.uri.path}"
                )
            objects.extend(factory(base_path, "file://" + name) for name in names)
        return objects

    @property
    def inputs(self) -> Iterable["Source"]:
        """
        The manifest inputs as ``Source`` objects.

        :raises InvalidManifest: If a file input matches no existing file.
        """
        entries = self._entries("inputs")
        if not entries:
            return []
        return self._expand(entries, Source, require_match=True)

    @property
    def outputs(self) -> Iterable["Sink"]:
        """
        The manifest outputs as ``Sink`` objects.

        File outputs that match no existing file are omitted. A manifest
        without an ``outputs`` section yields an empty list.
        """
        entries = self._entries("outputs")
        if not entries:
            return []
        return self._expand(entries, Sink, require_match=False)

    @property
    def alias(self) -> Optional[str]:
        """The ``alias`` value, or None when absent or data is not a dict."""
        if isinstance(self._data, dict):
            return self._data.get("alias", None)
        return None

    @property
    def responsible_name(self) -> Optional[str]:
        """The ``responsible_name`` value, or None when absent or data is not a dict."""
        if isinstance(self._data, dict):
            return self._data.get("responsible_name", None)
        return None

    @property
    def version(self) -> int:
        """The ``version`` value (default 2), or 0 when data is not a dict."""
        if isinstance(self._data, dict):
            return self._data.get("version", 2)
        return 0

    @property
    def manifest_version(self) -> int:
        """The ``manifest_version`` value (default 2), or 0 when data is not a dict."""
        if isinstance(self._data, dict):
            return self._data.get("manifest_version", 2)
        return 0

    def _load_metadata(self, root_path: Path, path: Path):
        """
        Merge the contents of a YAML metadata file into the metadata.

        :param root_path: File whose parent directory anchors a relative ``path``.
        :param path: The metadata file to read.
        :raises InvalidManifest: If the file is not valid YAML.
        """
        try:
            if not path.is_absolute():
                root_dir = root_path.absolute().parent
                path = root_dir / path
            with open(path) as metadata_file:
                loaded = yaml.load(metadata_file, Loader=get_loader())
            # An empty YAML file loads as None; there is nothing to merge.
            _update_dict(self._metadata, loaded or {})
        except yaml.YAMLError as err:
            raise InvalidManifest(
                "failed to read metadata file %s - %s" % (path, err)
            ) from err

    def _convert_version(self):
        """Upgrade version 0 manifest data to the version 1 layout in place."""
        if self.version == 0:
            self._convert_metadata()
            self._data["inputs"] = self._convert_files(self._data["inputs"])
            self._data["outputs"] = self._convert_files(self._data["outputs"])
            self._data["version"] = 1

    def _convert_metadata(self) -> None:
        """
        Move ``description`` and ``workflow`` from the data into the metadata.

        In the workflow entry ``git`` is renamed to ``repo`` and ``codes`` is
        flattened into a list of the values of each code mapping.
        """
        for item in ("description", "workflow"):
            if item in self._data:
                self._metadata[item] = self._data.pop(item)

        if "workflow" not in self._metadata:
            return
        workflow = self._metadata["workflow"]
        if "git" in workflow:
            workflow["repo"] = workflow.pop("git")
        if "codes" in workflow:
            workflow["codes"] = [
                value for code in workflow["codes"] for value in code.values()
            ]

    @classmethod
    def _convert_files(cls, files: List[Dict[str, str]]) -> List[Dict[str, "URI"]]:
        """
        Convert version 0 file entries into ``{"uri": URI}`` entries.

        :param files: Entries keyed by ``uuid``, ``path``, ``imas`` or ``uda``.
        :return: One ``{"uri": URI}`` entry per key/value pair.
        :raises KeyError: If an entry uses any other key.
        """
        from ..uri import URI

        scheme_map = {
            "uuid": "simdb",
            "path": "file",
            "imas": "imas",
            "uda": "uda",
        }
        return [
            {"uri": URI(scheme=scheme_map[key], path=value)}
            for file in files
            for key, value in file.items()
        ]

    def load(self, file_path: Path) -> None:
        """
        Load a manifest from the given file.

        :param file_path: Path to the file read.
        :return: None
        :raises InvalidManifest: If the file is not valid YAML.
        """
        self._path = file_path
        with open(file_path) as file:
            try:
                self._data = yaml.load(file, Loader=get_loader())
            except yaml.YAMLError as err:
                raise InvalidManifest("badly formatted manifest - " + str(err)) from err

        if isinstance(self._data, dict) and "metadata" in self._data:
            # The metadata section is stored as-is; expanding its "path" /
            # "summary" / "values" items into the metadata is disabled.
            self._metadata["metadata"] = self._data["metadata"]

    def save(self, out_file: TextIO) -> None:
        """
        Save the manifest to the given file.

        :param out_file: The output text stream to write the manifest to.
        :return: None
        """
        yaml.dump(self._data, out_file, default_flow_style=False)

    def validate(self) -> None:
        """
        Validate the manifest object.

        :return: None
        :raises InvalidManifest: If nothing was loaded, the top level is not a
            mapping, the version is not 2, a section is unknown, a required
            section is missing, or a section validator rejects its contents.
        """
        if self._data is None:
            raise InvalidManifest("failed to read manifest")
        if isinstance(self._data, list):
            raise InvalidManifest(
                "badly formatted manifest - top level sections must be keys not a list"
            )
        if not isinstance(self._data, dict):
            raise InvalidManifest(
                "badly formatted manifest - top level must be a mapping of sections"
            )

        if "manifest_version" not in self._data:
            print("warning: no version given in manifest, assuming version 2.")

        # NOTE(review): this reads the "version" key although the warning
        # above and the validators use "manifest_version" - confirm which
        # key should select the validators.
        version = self.version
        if version != 2:
            raise InvalidManifest(f"Unknown manifest version {version}.")

        section_validators = {
            "manifest_version": VersionValidator(version),
            "alias": AliasValidator(version),
            "inputs": InputsValidator(version),
            "outputs": OutputsValidator(version),
            "metadata": MetaDataValidator(version),
            "responsible_name": ResponsibleValidator(version),
        }

        for section in self._data:
            if section not in section_validators:
                raise InvalidManifest(f"Unknown manifest section found {section}.")

        for section in ("manifest_version", "outputs", "inputs"):
            if section not in self._data:
                raise InvalidManifest(f"Required manifest section '{section}' not found.")

        for name, values in self._data.items():
            section_validators[name].validate(values)

        self._convert_version()