Source code for simdb.uri

from urllib3.util.url import parse_url, Url, LocationParseError
from pathlib import Path
from typing import Dict, Union, Optional


[docs] class URIParserError(ValueError): def __init__(self, msg: str): super().__init__(msg)
[docs] class Query: """ Class representing the URI query parameters. """ _args: Dict[str, Optional[str]] def __init__(self, query: Optional[str]): query = "" if query is None else query self._args = {} for arg in query.split("&"): key, *value = arg.split("=") if key and value: self._args[key] = "=".join(value) elif key: self._args[key] = None
[docs] @classmethod def empty(cls): return cls(None)
def __str__(self): return "&".join(f"{k}={v}" for k, v in self._args.items()) def __bool__(self): return len(self._args) > 0 def __contains__(self, item) -> bool: return item in self._args def __getitem__(self, name): return self._args[name]
[docs] def get(self, name: str, *, default: Optional[str] = None) -> Optional[str]: return self._args.get(name, default)
[docs] def set(self, name: str, value: str) -> None: self._args[name] = value
[docs] def remove(self, name: str) -> None: del self._args[name]
[docs] class Authority: """ Class representing URI authority. """ __slots__ = ("host", "port", "auth") def __init__(self, host: Optional[int], port: Optional[int], auth: Optional[str]): self.host: Optional[str] = host self.port: Optional[int] = port self.auth: Optional[str] = auth
[docs] @classmethod def empty(cls): return cls(None, None, None)
def __bool__(self): return bool(self.host) or bool(self.port) or bool(self.auth) def __str__(self): string = "" if self.host: string = f"{self.host}" if self.auth: string = f"{self.auth}@{string}" if self.port is not None: string = f"{string}:{self.port}" return string def __repr__(self): return f"Authority({self.host}, {self.port}, {self.auth})"
[docs] class URI: """ Class for parsing and representing a URI. """ __slots__ = ("scheme", "query", "path", "authority", "fragment") def __init__(self, uri: Union[str, "URI", None] = None, *, scheme=None, path=None): """ Create a URI object by either parsing a URI string or copying from an existing URI object. :param uri: A URI string, another URI to copy from or None for an empty URI. :param scheme: The URI scheme. Takes precedence over any scheme found from the URI argument. :param path: The URI path. Takes precedence over any path found from the URI argument. """ self.scheme: Optional[str] = None self.query: Query = Query.empty() self.path: Optional[Path] = None self.authority: Authority = Authority.empty() self.fragment: Optional[str] = None if uri is not None: try: result: Url = parse_url(str(uri)) except LocationParseError: raise URIParserError("failed to parse URI") self.scheme = result.scheme self.query = Query(result.query) self.authority = Authority(result.host, result.port, result.auth) if result.path is not None: if ( self.scheme == "imas" and not self.authority and result.path.startswith("/") ): self.path = Path(result.path[1:]) else: self.path = Path(result.path) self.fragment = result.fragment if scheme is not None: self.scheme = scheme if path is not None: self.path = Path(path) if not self.scheme: raise URIParserError("failed to parse URI: no scheme specified") @property def uri(self) -> str: """ Return the URI object as a URI string. :return: A string representation of the URI. """ uri = f"{self.scheme}:" if self.authority: path = "" if self.path and str(self.path) != ".": path = self.path if self.path.is_absolute() else "/" / self.path uri += f"//{self.authority}{path}" elif self.path and str(self.path) != ".": uri += f"{self.path}" if self.query: uri += f"?{self.query}" if self.fragment: uri += f"#{self.fragment}" return uri def __repr__(self): return f"URI({self.uri})" def __str__(self): return self.uri def __eq__(self, other): return self.uri == other.uri