Source code for simdb.cli.remote_api

import os
import shutil 
import json
import uuid
from typing import (
    List,
    Dict,
    Callable,
    Tuple,
    IO,
    Iterable,
    Optional,
    Union,
    TYPE_CHECKING,
    Any,
)
import gzip
import io
import sys
import click
import itertools
import hashlib
import appdirs
import pickle
import getpass
from urllib.parse import urlparse
from pathlib import Path
from semantic_version import Version

from .manifest import DataObject
from ..config import Config
from ..json import CustomDecoder, CustomEncoder
from ..imas.utils import imas_files

if TYPE_CHECKING:
    from ..database.models import Simulation, Watcher, File

if TYPE_CHECKING or "sphinx" in sys.modules:
    # Only importing these for type checking and documentation generation in order to speed up runtime startup.
    import requests
    from requests.auth import AuthBase


[docs] class APIError(RuntimeError): pass
[docs] class FailedConnection(APIError): pass
[docs] class RemoteError(APIError): pass
[docs] def try_request(func: Callable) -> Callable: def wrapped_func(*args, **kwargs): import requests try: return func(*args, **kwargs) except requests.ConnectionError as ex: raise FailedConnection( f"""\ Connection failed to {ex.request.url} Please check that the URL is valid and that SIMDB_REQUESTS_CA_BUNDLE is set if required. """ ) except requests.HTTPError as ex: raise FailedConnection( f"""\ HTTP error {ex.response.status_code} returned from endpoint {ex.request.url} """ ) except requests.JSONDecodeError: raise FailedConnection( """\ Invalid JSON returned from request endpoint This might indicate an invalid SimDB URL or the existence of a firewall. """ ) return wrapped_func
[docs] def read_bytes(path: str, compressed: bool = True) -> bytes: if compressed: with io.BytesIO() as buffer: with gzip.GzipFile(fileobj=buffer, mode="wb") as gz_file: with open(path, "rb") as file_in: gz_file.write(file_in.read()) buffer.seek(0) return buffer.read() else: with open(path, "rb") as file: return file.read()
def _read_bytes_in_chunks( path: Path, compressed: bool = True, chunk_size: int = 1024 ) -> Iterable[bytes]: with path.open("rb") as file_in: while True: if compressed: with io.BytesIO() as buffer: with gzip.GzipFile(fileobj=buffer, mode="wb") as gz_file: data = file_in.read(chunk_size) if not data: break gz_file.write(data) buffer.seek(0) yield buffer.read() else: data = file_in.read(chunk_size) if not data: break yield data
[docs] def check_return(res: "requests.Response") -> None: if res.status_code != 200: try: data = res.json() except ValueError: data = {} if "error" in data: raise RemoteError(data["error"]) else: res.raise_for_status()
def _get_paths(file: "File") -> Iterable[Path]: if file.type == DataObject.Type.FILE: return [file.uri.path] else: return imas_files(file.uri)
[docs] class RemoteAPI: """ Class to represent connection to remote API. This is used by the CLI to make all requests to the remote. """ _remote: str def __init__( self, remote: Optional[str], username: Optional[str], password: Optional[str], config: Config, use_token: Optional[bool] = None, ) -> None: """ Create a new RemoteAPI. @param remote: the name of the remote - this is the name as created in the configuration file. If not provided this will use the remote that has been marked as default. @param username: the username to use to authenticate with the remote - optional if a token has been created for the remote. @param password: the password to used to authenticate with the remote - only required if username is also provided. @param config: the CLI configuration object. @param use_token: override the default behaviour of only looking for a token if username and password are not provided. """ self._config: Config = config if not remote: remote = config.default_remote if not remote: raise KeyError( "Remote name not provided and no default remote found in config." ) self._remote = remote try: self._url: str = config.get_option(f"remote.{remote}.url") except KeyError: raise ValueError( f"Remote '{remote}' not found. Use `simdb remote config add` to add it." ) self._api_url: str = f"{self._url}/v{config.api_version}/" self._firewall: Optional[str] = config.get_option( f"remote.{remote}.firewall", default=None ) if not username: username = config.get_option(f"remote.{remote}.username", default="") if use_token is not None: self._use_token = use_token else: token = config.get_option(f"remote.{remote}.token", default="") self._use_token = token or (not username and not password) if password and not username: raise ValueError( "Password given but no username given or found in configuration." ) self._cookies = {} if self._firewall is not None: self._load_cookies(remote, username, password) self._api_url: str = f"{self._url}/" self._server_auth = self.get_server_authentication() if self._firewall: self._server_auth = "None" if username == "admin": self._server_auth = "admin-auth" if self._server_auth != "None" and not self._use_token: if not username: username = click.prompt("Username", default=getpass.getuser()) if not password: password = click.prompt( f"Password for user {username}", hide_input=True ) self._token = config.get_option(f"remote.{remote}.token", default="") if self._server_auth != "None" and (self._use_token and not self._token): raise ValueError("No username or password given and no token found.") self._username = username self._password = password endpoints = self.get_endpoints() endpoint_versions = [endpoint.split("/")[-1] for endpoint in endpoints] if not endpoint_versions: raise RemoteError("No compatible API version found on remote") latest_version = max(endpoint_versions) if config.verbose: print(f"Selected latest endpoint version {latest_version}") self._api_url += f"{latest_version}/" self.version = Version.coerce(self.get_api_version()) self.server_version = Version.coerce(self.get_server_version()) def _load_cookies( self, remote: str, username: Optional[str], password: Optional[str] ) -> None: if self._firewall == "F5": import requests headers = {"User-Agent": "it_script_basic"} cookies_file = f"{remote}-cookies.pkl" cookies_path = Path(appdirs.user_config_dir("simdb")) / cookies_file parsed_url = urlparse(self._url) base_url = f"{parsed_url.scheme}://{parsed_url.netloc}" cookies = None if os.path.exists(cookies_path): with open(cookies_path, "rb") as f: cookies = pickle.load(f) r = requests.get(f"{self._url}/", headers=headers, cookies=cookies) try: # check to see if the cookies are still valid by trying a simple request r.json() except requests.JSONDecodeError: cookies = None if cookies is None: if not username: username = click.prompt("Username", default=getpass.getuser()) if not password: password = click.prompt( f"Password for user {username}", hide_input=True ) auth = (username, password) with requests.Session() as s: s.headers['User-Agent'] = 'it_script_basic' p = s.post(f"{base_url}/my.policy", auth=auth) if p.status_code != 200: raise RuntimeError( "Failed to get firewall authentication cookies" ) cookies = s.cookies os.umask(0) descriptor = os.open( path=cookies_path, flags=os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=0o600, ) with open(descriptor, "wb") as f: pickle.dump(cookies, f) if not cookies: raise RuntimeError("Failed to get firewall authentication cookies") self._cookies = cookies else: raise ValueError(f"Unknown firewall option {self._firewall}") @property def remote(self) -> str: """ Return the name of the remote. """ return self._remote def _get_auth(self) -> Union["AuthBase", Tuple]: from requests.auth import AuthBase class JWTAuth(AuthBase): def __init__(self, token): self._token = token def __call__(self, request: "requests.PreparedRequest"): request.headers["Authorization"] = f"JWT-Token {self._token}" return request if self._use_token: return JWTAuth(self._token) else: return self._username, self._password
[docs] def get( self, url: str, params: Optional[Dict] = None, headers: Optional[Dict] = None, authenticate: Optional[bool] = True, stream: Optional[bool] = False, ) -> "requests.Response": """ Perform an HTTP GET request. @param url: the URL of the request. @param params: any additional parameters to send along with the request. @param headers: additional headers to send with the request. @param authenticate: True if we should send authentication headers with the request. @param stream: True to enable streaming. @return: """ import requests params = params if params is not None else {} headers = headers if headers is not None else {} headers["Accept-encoding"] = "gzip" headers['User-Agent'] = 'it_script_basic' # Get token api expected basic auth in request # if authenticate and url.startswith("token"): # self._server_auth = "" if authenticate and self._server_auth != "None": res = requests.get( self._api_url + url, params=params, auth=self._get_auth(), headers=headers, cookies=self._cookies, stream=stream, ) else: res = requests.get( self._api_url + url, params=params, headers=headers, cookies=self._cookies, stream=stream, ) check_return(res) return res
[docs] def put(self, url: str, data: Dict, **kwargs) -> "requests.Response": """ Perform an HTTP PUT request. @param url: the URL of the request. @param data: the PUT data to send. @param kwargs: any additional keyword arguments to add to the request. @return: """ import requests headers = {"Content-type": "application/json"} headers['User-Agent'] = 'it_script_basic' if self._server_auth != "None": res = requests.put( self._api_url + url, data=json.dumps(data, cls=CustomEncoder), headers=headers, auth=self._get_auth(), cookies=self._cookies, **kwargs, ) else: res = requests.put( self._api_url + url, data=json.dumps(data, cls=CustomEncoder), headers=headers, cookies=self._cookies, **kwargs, ) check_return(res) return res
[docs] def post(self, url: str, data: Dict, **kwargs) -> "requests.Response": """ Perform an HTTP POST request. @param url: the URL of the request. @param data: the POST data to send. @param kwargs: any additional keyword arguments to add to the request. @return: """ import requests if "files" in kwargs: if data: raise Exception("Cannot send JSON data at the same time as files.") headers = {} else: headers = {"Content-type": "application/json"} post_data = json.dumps(data, cls=CustomEncoder, indent=2) if data else {} headers['User-Agent'] = 'it_script_basic' # Compress the data if it is larger than 2 MB and the URL is for simulations if url == "simulations" and len(post_data) > 2 * 1024 * 1024: import gzip from io import BytesIO buf = BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: gz.write(post_data.encode("utf-8")) post_data = buf.getvalue() headers["Content-Encoding"] = "gzip" headers["Content-Type"] = "application/json" if self._server_auth != "None": res = requests.post( self._api_url + url, data=post_data, headers=headers, auth=self._get_auth(), cookies=self._cookies, **kwargs, ) else: res = requests.post( self._api_url + url, data=post_data, headers=headers, cookies=self._cookies, **kwargs, ) check_return(res) return res
[docs] def patch(self, url: str, data: Dict, **kwargs) -> "requests.Response": """ Perform an HTTP PATCH request. @param url: the URL of the request. @param data: the PATCH data to send. @param kwargs: any additional keyword arguments to add to the request. @return: """ import requests headers = {"Content-type": "application/json"} headers['User-Agent'] = 'it_script_basic' if self._server_auth != "None": res = requests.patch( self._api_url + url, data=json.dumps(data, cls=CustomEncoder), headers=headers, auth=self._get_auth(), cookies=self._cookies, **kwargs, ) else: res = requests.patch( self._api_url + url, data=json.dumps(data, cls=CustomEncoder), headers=headers, cookies=self._cookies, **kwargs, ) check_return(res) return res
[docs] def delete(self, url: str, data: Dict[Any, Any], **kwargs) -> "requests.Response": """ Perform an HTTP DELETE request. @param url: the URL of the request. @param data: the DELETE data to send. @param kwargs: any additional keyword arguments to add to the request. @return: """ import requests headers = {"Content-type": "application/json"} headers['User-Agent'] = 'it_script_basic' if self._server_auth != "None": res = requests.delete( self._api_url + url, data=json.dumps(data, cls=CustomEncoder), headers=headers, auth=self._get_auth(), cookies=self._cookies, **kwargs, ) else: res = requests.delete( self._api_url + url, data=json.dumps(data, cls=CustomEncoder), headers=headers, cookies=self._cookies, **kwargs, ) check_return(res) return res
[docs] def has_url(self) -> bool: return bool(self._url)
[docs] @try_request def get_token(self) -> str: res = self.get("token") data = res.json() return data["token"]
[docs] @try_request def get_endpoints(self) -> List[str]: res = self.get("", authenticate=False) data = res.json() return data["endpoints"]
[docs] @try_request def get_server_authentication(self) -> Optional[str]: res = self.get("", authenticate=False) data = res.json() return data.get("authentication")
[docs] @try_request def get_api_version(self) -> str: res = self.get("", authenticate=False) data = res.json() return data["api_version"]
[docs] @try_request def get_server_version(self) -> str: res = self.get("", authenticate=False) data = res.json() return data["server_version"]
[docs] @try_request def get_validation_schemas(self) -> List[Dict]: res = self.get("validation_schema") return res.json()
[docs] @try_request def get_upload_options(self) -> Dict[str, Any]: try: res = self.get("upload_options") return res.json() except FailedConnection: # old remotes may not provide this endpoint return {}
[docs] @try_request def list_simulations( self, meta: Optional[List[str]] = None, limit: int = 0 ) -> List["Simulation"]: from ..database.models import Simulation args = "?" + "&".join(meta) if meta else "" headers = {"simdb-result-limit": str(limit)} res = self.get("simulations" + args, headers=headers) data = res.json(cls=CustomDecoder) return [Simulation.from_data(sim) for sim in data["results"]]
[docs] @try_request def get_simulation(self, sim_id: str) -> "Simulation": from ..database.models import Simulation res = self.get("simulation/" + sim_id) return Simulation.from_data(res.json(cls=CustomDecoder))
[docs] @try_request def trace_simulation(self, sim_id: str) -> dict: res = self.get("trace/" + sim_id) return res.json(cls=CustomDecoder)
[docs] @try_request def query_simulations( self, constraints: List[str], meta: List[str], limit=0 ) -> List["Simulation"]: from ..database.models import Simulation from ..remote import APIConstants from collections import defaultdict params = defaultdict(list) for item in constraints: (key, value) = item.split("=") params[key].append(value) args = "?" + "&".join(meta) if meta else "" headers = { APIConstants.LIMIT_HEADER: str(limit), APIConstants.PAGE_HEADER: str(1), } res = self.get("simulations" + args, params, headers=headers) data = res.json(cls=CustomDecoder) return [Simulation.from_data(sim) for sim in data["results"]]
[docs] @try_request def delete_simulation(self, sim_id: str) -> Dict: res = self.delete("simulation/" + sim_id, {}) return res.json()
[docs] @try_request def update_simulation(self, sim_id: str, update_type: "Simulation.Status") -> None: self.patch("simulation/" + sim_id, {"status": update_type.value})
[docs] @try_request def validate_simulation(self, sim_id: str) -> Tuple[bool, str]: res = self.post("validate/" + sim_id, {}) data = res.json() if data["passed"]: return True, "" else: return False, data["error"]
[docs] @try_request def add_watcher( self, sim_id: str, user: str, email: str, notification: "Watcher.Notification" ) -> None: self.post( "watchers/" + sim_id, {"user": user, "email": email, "notification": notification.name}, )
[docs] @try_request def remove_watcher(self, sim_id: str, user: str) -> None: self.delete("watchers/" + sim_id, {"user": user})
[docs] @try_request def list_watchers(self, sim_id: str) -> List[Tuple]: res = self.get("watchers/" + sim_id) return [(d["username"], d["email"], d["notification"]) for d in res.json()]
[docs] @try_request def set_metadata( self, sim_id: str, key: str, value: Union[str, uuid.UUID, int, float] ) -> List[str]: res = self.patch("simulation/metadata/" + sim_id, {"key": key, "value": value}) return [data["value"] for data in res.json()]
[docs] @try_request def delete_metadata(self, sim_id: str, key: str) -> List[str]: res = self.delete("simulation/metadata/" + sim_id, {"key": key}) return [data["value"] for data in res.json()]
[docs] @try_request def get_directory(self) -> str: res = self.get("staging_dir") return res.json()["staging_dir"]
def _push_file( self, path: Path, uuid: uuid.UUID, file_type: str, sim_data: Dict, chunk_size: int, out_stream: IO, type: DataObject.Type, ): msg = f"Uploading file {path} " print(msg, file=out_stream, end="") num_chunks = 0 for chunk_index, chunk in enumerate( _read_bytes_in_chunks(path, compressed=True, chunk_size=chunk_size) ): print(".", file=out_stream, end="", flush=True) self._send_chunk(chunk_index, chunk, chunk_size, uuid, file_type, sim_data) num_chunks += 1 if num_chunks == 0: # empty file self._send_chunk(0, b"", chunk_size, uuid, file_type, sim_data) if type == DataObject.Type.FILE: self.post( "files", data={ "simulation": sim_data, "obj_type": DataObject.Type.FILE, "files": [ { "chunks": num_chunks, "" "file_type": file_type, "file_uuid": uuid.hex, "ids_list": None, } ], }, ) print(f"\r{msg}", file=out_stream, end="") print( "Complete".rjust(shutil.get_terminal_size().columns - len(msg)), file=out_stream, flush=True, ) def _send_chunk( self, chunk_index: int, chunk: bytes, chunk_size: int, uuid: uuid.UUID, file_type: str, sim_data: dict, ): data = { "simulation": sim_data, "file_type": file_type, "chunk_info": {uuid.hex: {"chunk_size": chunk_size, "chunk": chunk_index}}, } files: List[Tuple[str, Tuple[str, bytes, str]]] = [ ( "data", ( "data", json.dumps(data, cls=CustomEncoder).encode(), "text/json", ), ), ("files", (uuid.hex, chunk, "application/octet-stream")), ] self.post("files", data={}, files=files)
[docs] @try_request def push_simulation( self, simulation: "Simulation", out_stream: IO[str] = sys.stdout, add_watcher: bool = True, ) -> None: """ Push the local simulation to the remote server. First we upload any files associated with the simulation, then push the simulation metadata. :param simulation: The Simulation to push to remote server :param out_stream: The IO stream to write messages to the user (default: stdout) :param add_watcher: Add the current user as a watcher of the simulation on the remote server """ from ..imas.utils import imas_files sim_data = simulation.data(recurse=True) try: sim_json = json.dumps(sim_data, cls=CustomEncoder, separators=(",", ":")).encode("utf-8") sim_json_size = len(sim_json) except Exception as e: sim_json_size = 0 # Target max request (10MB minus headroom); adjust chunk size so (chunk + sim_data JSON) fits MAX_REQUEST_BYTES = 9 * 1024 * 1024 # nominal 10 MB limit HEADROOM = 2048 # for JSON envelope & headers # Base chunk size before adjustment (previous constant) base_chunk_size = 8 * 1024 * 1024 # Compute allowed chunk payload allowed_chunk = max(1024, min(base_chunk_size, MAX_REQUEST_BYTES - sim_json_size - HEADROOM)) options = self.get_upload_options() if options.get("copy_files", True): chunk_size = allowed_chunk # 10 MB limit on ITER network copy_ids = options.get("copy_ids", True) for file in simulation.inputs: if file.type == DataObject.Type.IMAS: if not copy_ids: print(f"Skipping IDS data {file}", file=out_stream, flush=True) continue ids_list = simulation.meta_dict().get("input_ids", []) for path in imas_files(file.uri): # Check if hdf5 ids_name is in ids_list ids_name = Path(path).name.split(".") if ids_name[1] == "h5": if ids_name[0] != "master" and ids_list is not None and ids_name[0] not in ids_list: continue sim_file = next( (f for f in sim_data["inputs"] if f["uuid"] == file.uuid) ) sim_file["uri"] = f"file:{path}" self._push_file( path, file.uuid, "input", sim_data, chunk_size, out_stream, file.type ) self.post( "files", data={ "simulation": simulation.data(recurse=True), "obj_type": file.type, "files": [ { "file_type": "input", "file_uuid": file.uuid.hex, "ids_list": ids_list, } ], }, ) else: self._push_file( file.uri.path, file.uuid, "input", sim_data, chunk_size, out_stream, file.type, ) for file in simulation.outputs: if file.type == DataObject.Type.IMAS: if not copy_ids: print(f"Skipping IDS data {file}", file=out_stream, flush=True) continue ids_list = simulation.meta_dict().get("ids", []) for path in imas_files(file.uri): # Check if hdf5 ids_name is in ids_list ids_name = Path(path).name.split(".") if ids_name[1] == "h5": if ids_name[0] != "master" and ids_list is not None and ids_name[0] not in ids_list: continue sim_file = next( (f for f in sim_data["outputs"] if f["uuid"] == file.uuid) ) sim_file["uri"] = f"file:{path}" self._push_file( path, file.uuid, "output", sim_data, chunk_size, out_stream, file.type ) self.post( "files", data={ "simulation": simulation.data(recurse=True), "obj_type": file.type, "files": [ { "file_type": "output", "file_uuid": file.uuid.hex, "ids_list": ids_list, } ], }, ) else: self._push_file( file.uri.path, file.uuid, "output", sim_data, chunk_size, out_stream, file.type, ) sim_data = simulation.data(recurse=True) uploaded_by = simulation.meta_dict().get("uploaded_by", None) print("Uploading simulation data ... ", file=out_stream, end="", flush=True) self.post( "simulations", data={"simulation": sim_data, "add_watcher": add_watcher, "uploaded_by": uploaded_by} ) print("Success", file=out_stream, flush=True)
def _get_file_info(self, uuid: uuid.UUID) -> List[Tuple[Path, str]]: r = self.get(f"file/{uuid.hex}") data = r.json() files = data["files"] return [(Path(file["path"]), file["checksum"]) for file in files] def _pull_file( self, uuid: uuid.UUID, index: int, checksum: str, from_path: Path, to_path: Path, out_stream: IO[str], ): msg = f"Downloading file {from_path} to {to_path}" print( msg, file=out_stream, flush=True, ) response = self.get(f"file/download/{uuid.hex}/{index}", stream=True) os.makedirs(to_path.parent, exist_ok=True) sha1 = hashlib.sha1() with open(to_path, "wb") as f: total_length = response.headers.get("content-length") if total_length is None: f.write(response.content) else: downloaded = 0 total_length = int(total_length) for data in response.iter_content(chunk_size=4096): sha1.update(data) downloaded += len(data) f.write(data) done = int(50 * downloaded / total_length) print( "\r[%s%s] %0.2f%%" % ( "=" * done, " " * (50 - done), 100.0 * (downloaded / total_length), ), file=out_stream, end="", flush=True, ) print("\r", file=out_stream, end="", flush=True) if sha1.hexdigest() != checksum: raise APIError(f"Checksum failed for file {from_path}")
[docs] @try_request def pull_simulation( self, sim_id: str, directory: Path, out_stream: IO[str] = sys.stdout ) -> "Simulation": from ..uri import URI """ Pull the simulation from the remote server. This involves downloading all the files associated with the simulation into the provided simulation directory. :param sim_id: The id of the Simulation to pull :param directory: The local directory to use as the root directory of the simulation :param out_stream: The IO stream to write messages to the user (default: stdout) """ simulation = self.get_simulation(sim_id) if simulation is None: raise RemoteError(f"Failed to find simulation: {sim_id}") all_paths = [] for file in itertools.chain(simulation.inputs, simulation.outputs): info = self._get_file_info(file.uuid) all_paths += [path for (path, _) in info] common_root = os.path.commonpath(all_paths) for file in itertools.chain(simulation.inputs, simulation.outputs): info = self._get_file_info(file.uuid) if file.type == DataObject.Type.FILE: (path, checksum) = info[0] rel_path = directory / path.relative_to(common_root) self._pull_file(file.uuid, 0, checksum, path, rel_path, out_stream) file.uri = URI(file.uri, path=rel_path.absolute()) elif file.type == DataObject.Type.IMAS: for index, (path, checksum) in enumerate(info): rel_path = directory / path.relative_to(common_root) self._pull_file( file.uuid, index, checksum, path, rel_path, out_stream ) to_path = ( directory / Path(file.uri.query.get("path")).relative_to(common_root) ).absolute() backend = file.uri.query.get("backend") file.uri = URI(f"imas:{backend}?path={to_path}") return simulation
[docs] @try_request def reset_database(self) -> None: self.post("reset", {})