Source code for metrics_as_scores.tools.lazy

"""
This module contains a single class, :py:class:`SelfResetLazy`. It is used to
lazily load pre-generated densities in the web application. Since these files
can get large quickly, the :py:class:`SelfResetLazy` allows memory to be freed
once its loaded value has expired.
"""

from concurrent.futures import Future, ThreadPoolExecutor
from threading import Semaphore, Timer
from typing import Any, Callable, TypeVar, Generic, Union
from typing_extensions import Self
from gc import collect


T = TypeVar('T')


class SelfResetLazy(Generic[T]):
    """
    This class lazily loads and automatically destroys its value after some
    timeout, so that subsequent requests to it force the factory to produce a
    new instance. All of its semantics are thread-safe (except for the
    explicitly named volatile members).

    A single binary semaphore guards all state. It is not re-entrant, so the
    create/destroy callbacks must not call back into the thread-safe members
    of the same instance.
    """

    def __init__(self, fn_create_val: Callable[[], T], fn_destroy_val: Callable[[T], Any]=None, reset_after: float=None) -> None:
        """
        fn_create_val: ``Callable[[], T]``
            Function that produces the desired value.

        fn_destroy_val: ``Callable[[T], Any]``
            Function that will be given the value before it is de-referenced
            here. Ignored if it is not callable.

        reset_after: ``float``
            Amount of time, in seconds, after which the produced value ought
            to be destroyed. Any positive ``int`` or ``float`` arms the timer;
            ``None`` or a non-positive number disables automatic resetting.
        """
        self._val: T = None
        self._val_type: type = None
        self._has_val = False
        # Binary semaphore used as the mutex for all thread-safe members.
        self._semaphore = Semaphore(1)

        self._fn_create_val = fn_create_val
        self._fn_destroy_val = fn_destroy_val
        self._reset_after = reset_after
        self._timer: Timer = None
        # Single worker that resolves the futures handed out by `value_future`.
        self._tpe = ThreadPoolExecutor(max_workers=1)

    @property
    def reset_after(self):
        """
        Thread-safe getter for the reset-after property.
        """
        with self._semaphore:
            return self._reset_after

    @reset_after.setter
    def reset_after(self, value: float=None):
        """
        Thread-safe setter for the reset-after property. Cancels a pending
        timer and starts a new one if the new timeout is a positive number.
        """
        with self._semaphore:
            self._reset_after = value
            self._set_timer()  # Conditionally re-sets a timer

    def unset_value(self):
        """
        Thread-safe method to destroy a previously produced value. If a value
        is present, it is passed to :py:meth:`fn_destroy_val()` first.

        The stored value is dropped even if the destroy callback raises; the
        exception is then propagated to the caller. Returns this instance.
        """
        with self._semaphore:
            self._unset_timer()
            if self._has_val:
                try:
                    if callable(self._fn_destroy_val):
                        self._fn_destroy_val(self._val)  # Pass in the current value
                finally:
                    # Always release the reference; otherwise a failing
                    # callback would pin the value with no timer left to
                    # ever free it.
                    self._val = None
                    self._val_type = None
                    self._has_val = False
                    collect()
        return self

    def _unset_timer(self):
        """
        Cancels and forgets the current timer, if any. Not thread-safe by
        itself; callers must hold the semaphore.
        """
        if isinstance(self._timer, Timer):
            # Cancelling an already finished timer is a harmless no-op.
            self._timer.cancel()
        self._timer = None
        return self

    def _set_timer(self):
        """
        Replaces the current timer with a fresh one that calls
        :py:meth:`unset_value()` after ``reset_after`` seconds. No timer is
        started unless ``reset_after`` is a positive number. Not thread-safe
        by itself; callers must hold the semaphore.
        """
        self._unset_timer()
        delay = self._reset_after
        # Accept ints as well as floats (``reset_after=5`` must not be
        # silently ignored); bools are excluded although they subclass int.
        is_number = isinstance(delay, (int, float)) and not isinstance(delay, bool)
        if is_number and delay > 0:
            self._timer = Timer(interval=delay, function=self.unset_value)
            self._timer.start()
        return self

    @property
    def has_value_volatile(self) -> bool:
        """
        Checks, in a volatile manner (without locking), if a value is present.
        """
        return self._has_val

    @property
    def has_value(self) -> bool:
        """
        Thread-safe getter for checking if a value is present.
        """
        with self._semaphore:
            return self._has_val

    @property
    def value_volatile(self) -> Union[None, T]:
        """
        Volatile (lock-free) getter for the value, which may not be present.
        """
        return self._val

    @property
    def value(self) -> T:
        """
        Thread-safe getter for the value. If no value is present, one will be
        produced and this getter blocks until then.
        """
        with self._semaphore:
            if not self._has_val:
                self._val = self._fn_create_val()
                self._val_type = type(self._val)
                self._has_val = True
            # NOTE(review): the timer is restarted on every access here, i.e.
            # the expiry slides. The collapsed source did not show whether
            # this call belongs inside the branch above; confirm.
            self._set_timer()
            return self._val

    @property
    def value_future(self) -> Future[T]:
        """
        Returns an awaitable :py:class:`Future` that will hold the value once
        it is available. If producing the value fails, the exception is set
        on the future instead.
        """
        future = Future()
        # Snapshot both fields without locking; the type check guards against
        # observing a value that is being swapped out concurrently.
        snapshot = self._val
        snapshot_type = self._val_type

        if snapshot_type is not None and isinstance(snapshot, snapshot_type) and self.has_value_volatile:  # pragma: no cover
            # Fast path: resolve immediately. This does not go through
            # `value` and therefore does not touch the timer.
            future.set_result(snapshot)
        else:
            def set_val():
                try:
                    future.set_result(self.value)
                except Exception as e:
                    future.set_exception(e)
            self._tpe.submit(set_val)

        return future