import os
from typing import List, Any
from datetime import datetime
from pathlib import Path
from dateutil import parser
from ..uri import URI
from ..config import Config
[docs]
class ImasError(Exception):
pass
FLOAT_MISSING_VALUE = -9.0e40
INT_MISSING_VALUE = -999999999
[docs]
def is_missing(value: Any):
"""
Returns whether the given value is one of IMASs 'missing' values.
@param value: the value to check
@return: whether this value is 'missing'
"""
if not value:
return True
dtype = type(value).__name__
if dtype.startswith("float") and value == FLOAT_MISSING_VALUE:
return True
if dtype.startswith("str") and len(value) == 0:
return True
if dtype.startswith("int") and value == INT_MISSING_VALUE:
return True
if dtype == "ndarray" and value.size > 0:
if dtype.startswith("float"):
for num in value.data:
if num == FLOAT_MISSING_VALUE:
return True
for num in value.data:
if num == FLOAT_MISSING_VALUE:
return True
if dtype.startswith("int"):
for num in value.data:
if num == INT_MISSING_VALUE:
return True
return False
[docs]
class DBEntry:
[docs]
def partial_get(self, ids: str, path: str, occurrence = 0) -> Any:
...
[docs]
def list_all_occurrences(self, ids: str) -> List[int]:
"""
List all occurrences of the given IDS in the IMAS data entry.
@param
ids: the IDS name
@param path: the path to the data
@return: the list of occurrences
"""
...
[docs]
def list_idss(entry: DBEntry) -> List[str]:
"""
List all the IDSs found to be populated for the given IMAS data entry.
Each IDS is defined as being non-empty if the ids_properties/homogeneous_time field has been populated.
@param entry: the IMAS data entry
@return: the list of found IDSs
"""
import imas
idss = []
for ids_name in entry.factory.ids_names():
occurrences = entry.list_all_occurrences(ids_name)
if occurrences:
if len(occurrences) > 0:
for occurrence in range(len(occurrences)):
if occurrence > 0:
idss.append(ids_name + "_" + str(occurrence))
idss.append(ids_name)
return idss
[docs]
def check_time(entry: DBEntry, ids: str, occurrence) -> None:
"""
Check the validity of the ids_properties/homogeneous_time field of the given IDS.
@param entry: the open IMAS data entry
@param ids: the
@return:
"""
import imas
ids_obj = entry.get(ids, occurrence, autoconvert=False, lazy=True)
try:
homo_time = ids_obj.ids_properties.homogeneous_time
if homo_time == imas.ids_defs.IDS_TIME_MODE_HOMOGENEOUS:
time = ids_obj.time
if time is None or time.size == 0:
raise ValueError(
f"IDS {ids} has homogeneous_time flag set to IDS_TIME_MODE_HOMOGENEOUS but invalid time entry."
)
except imas.exception.ValidationError as e:
raise ImasError(f"IDS {ids} failed validation: {e}")
def _is_al5() -> bool:
import semantic_version
al_env = os.environ.get("AL_VERSION", default=None)
ual_env = os.environ.get("UAL_VERSION", default="5.0.0")
version = (
semantic_version.Version(al_env)
if al_env is not None
else semantic_version.Version(ual_env)
)
return version >= semantic_version.Version("5.0.0")
def _open_legacy(uri: URI) -> DBEntry:
import imas
path = uri.query.get("path", default=None)
if path is not None:
raise ImasError(f"cannot open AL5 URI {uri} with AL4")
backend_ids = {
"hdf5": imas.ids_defs.HDF5_BACKEND,
}
backend = uri.query.get("backend", default=None)
user = uri.query.get("user", default=None)
database = uri.query.get("database", default=None)
version = uri.query.get("version", default="3")
shot = uri.query.get("shot", default=None)
run = uri.query.get("run", default=None)
if backend not in backend_ids:
raise ImasError(
f"backend {backend} is not supported for legacy IMAS, please use AL5"
)
backend_id = backend_ids[backend]
if user is not None:
try:
entry = imas.DBEntry(
backend_id,
database,
int(shot),
int(run),
user_name=user,
data_version=version,
)
except:
raise ImasError(
f"failed to open IMAS data with URI {uri}"
)
else:
try:
entry = imas.DBEntry(
backend_id, database, int(shot), int(run), data_version=version
)
except:
raise ImasError(
f"failed to open IMAS data with URI {uri}"
)
(status, _) = entry.open()
if status != 0:
raise ImasError(f"failed to open IMAS data with URI {uri}")
return entry
[docs]
def open_imas(uri: URI) -> DBEntry:
"""
Open an IMAS URI and return the IMAS entry object.
@param uri: the IMAS URI to open
@return: the IMAS data entry object
"""
import imas
if uri.scheme != "imas":
raise ValueError(f"invalid imas URI: {uri} - invalid scheme")
if uri.query is None:
raise ValueError(f"invalid imas URI: {uri} - no query found in URI")
if not _is_al5():
return _open_legacy(uri)
path = uri.query.get("path", default=None)
if path is None:
path = get_path_for_legacy_uri(uri)
backend = uri.query.get("backend", default="mdsplus")
uri = f"imas:{backend}?path={path}"
try:
entry = imas.DBEntry(str(uri), "r")
except:
raise ImasError(f"failed to open IMAS data with URI {uri}")
return entry
[docs]
def imas_timestamp(uri: URI) -> datetime:
"""
Extract the timestamp from the IDS data for the given IMAS URI.
@param uri: the IMAS URI
@return: the timestamp as a datetime object
"""
entry = open_imas(uri)
ids_obj = entry.get("summary", autoconvert=False, lazy=True)
creation = ids_obj.ids_properties.creation_date
if creation:
try:
timestamp = parser.parse(creation)
except Exception:
timestamp = datetime.now()
# raise ValueError(f"invalid IMAS creation time {creation}")
else:
timestamp = datetime.now()
entry.close()
return timestamp
[docs]
def get_path_for_legacy_uri(uri: URI) -> Path:
user = uri.query.get("user", default=None)
database = uri.query.get("database", default=None)
version = uri.query.get("version", default="3")
shot = uri.query.get("shot", default=None)
run = uri.query.get("run", default=None)
backend = uri.query.get("backend", default="hdf5")
if any(x is None for x in [database, shot, run]):
raise ValueError(f"Invalid legacy URI {uri}")
if user == "public":
imas_home = os.environ.get("IMAS_HOME", default=None)
if imas_home is None:
raise ValueError(
"Legacy URI passed with user=public but $IMAS_HOME is not set"
)
path = Path(imas_home) / "shared" / "imasdb" / database / version
elif user.startswith("/"):
path = Path(user) / database / version
elif user is not None:
path = Path(f"~{user}").expanduser() / "public" / "imasdb" / database / version
else:
path = Path.home() / "public" / "imasdb" / database / version
if str(backend) == "mdsplus":
return path
else:
return path / shot / run
def _get_path(uri: URI) -> Path:
"""
Return the path to the data for a given IMAS URI
@param uri: a valid IMAS URI
@return: the path of the IDS data for the given IMAS URI
"""
path = uri.query.get("path", default=None)
if path is None:
raise ValueError("Invalid IMAS URI - path not found in query arguments")
path = Path(path)
if not path.exists():
raise ValueError(f"URI path {path} does not exist")
return path
[docs]
def imas_files(uri: URI) -> List[Path]:
"""
Return all the files associated with the given IMAS URI.
@param uri: a valid IMAS URI
@return: a list of files which contains the IDS data for the backend specified in the URI
"""
backend = str(uri.path)
if backend.startswith("/"):
backend = backend[1:]
path = _get_path(uri)
if backend == "uda":
backend = uri.query.get("backend", default=None)
if backend is None:
raise ValueError(
"Invalid IMAS URI - 'backend' query argument not provided for UDA backend"
)
if backend == "hdf5":
return list(p.absolute() for p in path.glob("*.h5"))
elif backend == "mdsplus":
return [
path / "ids_001.characteristics",
path / "ids_001.datafile",
path / "ids_001.tree",
]
elif backend == "ascii":
return list(p.absolute() for p in path.glob("*.ids"))
else:
raise ValueError(f"Unknown IMAS backend {backend}")
[docs]
def convert_uri(uri: URI, path: Path, config: Config) -> URI:
"""
Converts a local IMAS URI to a remote access IMAS URI based on the server.imas_remote_host configuration option.
Translate locale IMAS URI (imas:<backend>?path=<path>) to remote access URI
(imas://<imas_remote_host>:<imas_remote_port>/uda?path=<path>&backend=<backend>)
@param uri: The URI to convert
@param config: Config to read the server.imas_remote_host and server.imas_remote_port options from
"""
host = config.get_option("server.imas_remote_host", default=None)
if host is None:
raise ValueError(
"Cannot process IMAS data as server.imas_remote_host configuration option not set"
)
port = config.get_option("server.imas_remote_port", default=None)
backend = uri.path
if port is None:
return URI(f"imas://{host}/uda?path={path}&backend={backend}")
else:
port = int(port)
return URI(f"imas://{host}:{port}/uda?path={path}&backend={backend}")