Source code for alhambra.classes

from __future__ import annotations

import re
from abc import ABC, ABCMeta, abstractmethod
from itertools import dropwhile
from typing import (
    Any,
    Generic,
    Iterable,
    Literal,
    Protocol,
    SupportsIndex,
    Type,
    TypeVar,
    cast,
    overload,
)

T_NMI = TypeVar("T_NMI", bound="IdentMergeableItem")


class IdentMergeableItem(Protocol):
    """Structural type of the items an :class:`UpdateListD` can hold."""

    def ident(self) -> str:
        """Return the string identifier the container uses as the item's key."""

    def merge(self: T_NMI, other: T_NMI) -> T_NMI:
        """Return the item that results from merging ``other`` into this one."""

    def copy(self: T_NMI) -> T_NMI:
        """Return a copy of this item."""


T = TypeVar("T", bound="UpdateListD")


class UpdateListD(Generic[T_NMI]):
    """Ordered collection of items keyed by the string each item's ``ident()`` returns.

    Items live in ``data``, a dict mapping identifier to item, so iteration
    follows dict insertion order.  Items can be retrieved by identifier, by
    integer position, or by slice; slice bounds may be integer positions or
    identifiers.  Adding an item whose identifier is already present merges it
    into the stored item (see :meth:`add`).
    """

    data: dict[str, T_NMI]
    __slots__ = ("data",)

    def __init__(self, initial: Iterable[T_NMI] = tuple()) -> None:
        """Store every item of ``initial`` under its ``ident()``.

        Items are not merged here: a later item with the same identifier
        replaces an earlier one.
        """
        self.data = {v.ident(): v for v in initial}

    def __setitem__(self, k: str, v: T_NMI):
        """Store ``v`` under key ``k``; ``k`` is not checked against ``v.ident()``."""
        self.data[k] = v

    def __contains__(self, kv: str) -> bool:
        """Return whether ``kv`` is currently a key of ``data``."""
        return kv in self.data

    @overload
    def __getitem__(self, k: str | SupportsIndex) -> T_NMI: ...

    @overload
    def __getitem__(self: T, k: slice) -> T: ...

    def __getitem__(self: T, k: str | SupportsIndex | slice) -> T_NMI | T:
        """Look up one item, or build a new collection from a slice.

        * ``str``: the item stored under that identifier.  On a miss the keys
          are refreshed with :meth:`refreshnames` and the lookup is retried
          once, because a stored item may have been renamed since insertion.
        * integer: the item at that position in iteration order; negative
          positions count from the end.
        * ``slice``: a new instance of the same class.  Each bound is either
          an integer position or an identifier.  An identifier ``stop`` is
          *inclusive*; an integer ``stop`` is exclusive.  With a negative
          step the items are walked in reverse and integer positions refer to
          that reversed order.

        Raises:
            KeyError: unknown identifier, or an identifier ``stop`` that does
                not occur at or after the slice start.
            IndexError: integer position out of range.
            TypeError: ``k`` is not a str, an integer or a slice.
        """
        if isinstance(k, str):
            try:
                return self.data[k]
            except KeyError:
                pass
            # A name may have changed, so re-key and check once more before
            # reporting the miss.
            self.refreshnames()
            try:
                return self.data[k]
            except KeyError:
                raise KeyError(k) from None
        if isinstance(k, slice):
            return self.__class__(self._select(k))
        if isinstance(k, SupportsIndex):
            # List indexing supplies negative positions and IndexError; the
            # previous hand-rolled iterator walk leaked StopIteration.
            return list(self.data.values())[k]
        raise TypeError(
            f"indices must be str, integers or slices, not {type(k).__name__}"
        )

    def _select(self, k: slice) -> list[T_NMI]:
        """Return the items selected by slice ``k`` as a plain list."""
        items = list(self.data.values())
        step = k.step or 1  # a missing (or zero) step means "every item"
        if step < 0:
            # Walk backwards; positions below refer to this reversed order.
            items.reverse()
            step = -step
        n = len(items)

        start = k.start
        if start is None:
            lo = 0
        elif isinstance(start, SupportsIndex):
            lo = int(start)
            if lo < 0:
                lo = max(n + lo, 0)
        else:
            # Identifier start: begin at the first item carrying it.  An
            # unknown identifier leaves nothing to select.
            lo = next((i for i, v in enumerate(items) if v.ident() == start), n)

        stop = k.stop
        if stop is None:
            hi = n
        elif isinstance(stop, SupportsIndex):
            hi = int(stop)
            if hi < 0:
                hi = max(n + hi, 0)
        else:
            # Identifier stop: inclusive, and only searched for from the
            # start position onwards.
            for i in range(lo, n):
                if items[i].ident() == stop:
                    hi = i + 1
                    break
            else:
                raise KeyError(stop)

        # List slicing clamps an oversized ``hi`` and applies ``step``
        # uniformly, including when no stop was given.
        return items[lo:hi:step]

    def __delitem__(self, k: str):
        """Remove the item stored under key ``k`` (``KeyError`` if absent)."""
        del self.data[k]

    def __len__(self):
        """Return the number of stored items."""
        return len(self.data)

    def refreshnames(self):
        """Re-key every item whose ``ident()`` no longer matches its key.

        A re-keyed item is removed and re-inserted, so it moves to the end of
        the iteration order.
        """
        for old_key, item in list(self.data.items()):
            new_key = item.ident()
            if new_key != old_key:
                del self.data[old_key]
                # FIXME: if new_key is already present, that item is
                # overwritten rather than merged.
                self.data[new_key] = item

    def add(self, v: T_NMI):
        """Insert ``v``, merging it into any item already stored under its identifier."""
        key = v.ident()
        if key in self.data:
            self.data[key] = self.data[key].merge(v)
        else:
            self.data[key] = v

    def __iter__(self):
        """Iterate over the stored items (not their keys)."""
        return iter(self.data.values())

    def __str__(self) -> str:
        """Return the string form of the dict's values view."""
        return str(self.data.values())

    def __repr__(self) -> str:
        """Return ``ClassName([...])`` using the repr of the item list."""
        return f"{self.__class__.__name__}({list(self.data.values())!r})"

    def update(self, d: Iterable[T_NMI]):
        """Add every item of ``d`` in order via :meth:`add`."""
        for v in d:
            self.add(v)

    def aslist(self) -> list[T_NMI]:
        """Return the stored items as a new list."""
        return list(self.data.values())

    def asdict(self) -> dict[str, T_NMI]:
        """Return a shallow copy of the key-to-item mapping."""
        return self.data.copy()

    def copy(self: T) -> T:
        """Return a new collection holding the same item objects (shallow copy)."""
        return self.__class__(self.data.values())

    def __add__(self: T, other: Iterable[T_NMI]) -> T:
        """Return a copy of this collection updated with ``other``."""
        result = self.copy()
        result.update(other)
        return result

    def __iadd__(self: T, other: Iterable[T_NMI]) -> T:
        """Refresh the keys, then update this collection in place with ``other``."""
        self.refreshnames()
        self.update(other)
        return self

    def __or__(self: T, other: Iterable[T_NMI]) -> T:
        """Return a copy of this collection updated with ``other`` (same as ``+``)."""
        result = self.copy()
        result.update(other)
        return result

    def __ior__(self: T, other: Iterable[T_NMI]) -> T:
        """Refresh the keys, then update this collection in place with ``other``."""
        self.refreshnames()
        self.update(other)
        return self

    def __sub__(self: T, other: Iterable[T_NMI]) -> T:
        """Return a copy without the items whose identifier occurs in ``other``.

        The keys of ``self`` are refreshed first, so this mutates ``self.data``
        when an item has been renamed.
        """
        self.refreshnames()
        out = self.copy()
        for v in other:
            key = v.ident()
            if key in out:
                # FIXME: items are dropped outright; no merge is attempted.
                del out[key]
        return out

    @overload
    def search(self: T, regex: str, match: Literal[False] = False) -> T: ...

    @overload
    def search(
        self: T, regex: str, match: Literal[True]
    ) -> list[tuple[re.Match, T_NMI]]: ...

    def search(
        self: T, regex: str, match: bool = False
    ) -> T | list[tuple[re.Match, T_NMI]]:
        """Select the items whose identifier matches ``regex`` (``re.search``).

        Returns a new collection of the matching items, or, when ``match`` is
        true, a list of ``(match_object, item)`` pairs.
        """
        pattern = re.compile(regex)
        if not match:
            return self.__class__(
                v for v in self.data.values() if pattern.search(v.ident())
            )
        return [
            (m, v) for v in self.data.values() if (m := pattern.search(v.ident()))
        ]
TS = TypeVar("TS", bound="Serializable")


class Serializable(ABC):
    """Abstract base adding YAML / TOML / JSON helpers on top of two hooks.

    Subclasses implement :meth:`_serialize` and :meth:`_deserialize`.  The
    public helpers only forward their arguments to the corresponding library
    call and convert through those hooks.  The ``yaml`` and ``toml`` modules
    are imported lazily, inside the methods that use them.
    """

    @classmethod
    @abstractmethod
    def _deserialize(cls: Type[TS], input: Any) -> TS:
        """Build an instance from the object a loader produced."""

    @abstractmethod
    def _serialize(self) -> Any:
        """Return the object that is handed to the dump functions."""

    @classmethod
    def from_yaml(cls: Type[TS], *args, **kwargs) -> TS:
        """Forward all arguments to ``yaml.safe_load`` and deserialize the result."""
        import yaml

        return cls._deserialize(yaml.safe_load(*args, **kwargs))

    @classmethod
    def from_toml(cls: Type[TS], *args, **kwargs) -> TS:
        """Forward all arguments to ``toml.load`` and deserialize the result."""
        import toml

        return cls._deserialize(toml.load(*args, **kwargs))

    @classmethod
    def from_json(cls: Type[TS], *args, **kwargs) -> TS:
        """Forward all arguments to ``json.load`` and deserialize the result.

        ``json.load`` reads from a file object, not from a string.
        """
        import json

        return cls._deserialize(json.load(*args, **kwargs))

    def to_yaml(self, *args, **kwargs):
        """Return ``yaml.safe_dump`` of the serialized form; extra arguments are forwarded."""
        import yaml

        serialized = self._serialize()
        return yaml.safe_dump(serialized, *args, **kwargs)

    def to_json(self, *args, **kwargs):
        """Write the serialized form with ``json.dump``; extra arguments are forwarded.

        ``json.dump`` needs the target file object among the arguments and
        returns ``None``, so this method returns ``None`` as well.
        """
        import json

        serialized = self._serialize()
        return json.dump(serialized, *args, **kwargs)

    def to_toml(self, *args, **kwargs):
        """Return ``toml.dump`` of the serialized form; extra arguments are forwarded."""
        import toml

        serialized = self._serialize()
        return toml.dump(serialized, *args, **kwargs)