"""
This module contains a single class, :py:class:`SelfResetLazy`. It is used to
lazily load pre-generated densities in the web application. Since these files
can get large quickly, :py:class:`SelfResetLazy` allows memory to be freed
once its loaded value has expired.
"""

from concurrent.futures import Future, ThreadPoolExecutor
from threading import Semaphore, Timer
from typing import Any, Callable, TypeVar, Generic, Union
from typing_extensions import Self
from gc import collect

T = TypeVar('T')

class SelfResetLazy(Generic[T]):
    """
    Lazily produces a value and automatically destroys it after some timeout,
    so that subsequent requests force the factory to produce a new instance.

    All non-volatile accessors serialise on one internal semaphore. Members
    whose name ends in ``_volatile`` read the state without taking it.
    """

    def __init__(
        self,
        fn_create_val: Callable[[], T],
        fn_destroy_val: Union[Callable[[T], Any], None] = None,
        reset_after: Union[float, None] = None,
    ) -> None:
        """
        fn_create_val: ``Callable[[], T]``
            Function that produces the desired value.

        fn_destroy_val: ``Callable[[T], Any]``
            Optional function that will be given the value before it is
            de-referenced here.

        reset_after: ``float``
            Amount of time, in seconds, after which the produced value ought
            to be destroyed. ``None`` or a non-positive number disables the
            automatic reset.
        """
        self._val: Union[T, None] = None
        self._val_type: Union[type, None] = None
        self._has_val = False
        # Used as a mutex (initial count 1) guarding all fields of this object.
        self._semaphore = Semaphore(1)
        self._fn_create_val = fn_create_val
        self._fn_destroy_val = fn_destroy_val
        self._reset_after = reset_after
        self._timer: Union[Timer, None] = None
        # Single worker that resolves the futures handed out by `value_future`.
        self._tpe = ThreadPoolExecutor(max_workers=1)

    @property
    def reset_after(self) -> Union[float, None]:
        """
        Thread-safe getter for the reset-after property.
        """
        with self._semaphore:
            return self._reset_after

    @reset_after.setter
    def reset_after(self, value: Union[float, None] = None) -> None:
        """
        Thread-safe setter for the reset-after property. Any running timer is
        cancelled and, if the new value is a positive number, a new one is
        started.
        """
        with self._semaphore:
            self._reset_after = value
            self._set_timer()

    def unset_value(self) -> 'SelfResetLazy[T]':
        """
        Thread-safe method to destroy a previously produced value. If a value
        is present, it is passed to :py:meth:`fn_destroy_val()` first.

        An exception raised by the destroy function propagates to the caller,
        but the value is de-referenced regardless.
        """
        with self._semaphore:
            self._unset_timer()
            if self._has_val:
                try:
                    if callable(self._fn_destroy_val):
                        self._fn_destroy_val(self._val)
                finally:
                    # The timer is already cancelled at this point, so leaving
                    # the value in place after a failed destroy would keep it
                    # alive with nothing left to expire it.
                    self._val = None
                    self._val_type = None
                    self._has_val = False
                    collect()
        return self

    def _unset_timer(self) -> 'SelfResetLazy[T]':
        """
        Cancels and forgets the current timer, if any. Does not take the
        semaphore; the callers in this class already hold it.
        """
        if self._timer is not None:
            # Cancelling a timer that already fired or finished is a no-op.
            self._timer.cancel()
            self._timer = None
        return self

    def _set_timer(self) -> 'SelfResetLazy[T]':
        """
        Replaces the current timer by one that calls :py:meth:`unset_value()`
        after ``reset_after`` seconds. No timer is started unless
        ``reset_after`` is a positive number. Does not take the semaphore;
        the callers in this class already hold it.
        """
        self._unset_timer()
        interval = self._reset_after
        # Accept any real number (e.g. ``60`` as well as ``60.0``); ``bool``
        # is an ``int`` subclass and is deliberately not treated as a timeout.
        is_number = isinstance(interval, (int, float)) and not isinstance(interval, bool)
        if is_number and interval > 0:
            self._timer = Timer(interval=interval, function=self.unset_value)
            self._timer.start()
        return self

    @property
    def has_value_volatile(self) -> bool:
        """
        Checks, in a volatile manner, if a value is present.
        """
        return self._has_val

    @property
    def has_value(self) -> bool:
        """
        Thread-safe getter for checking if a value is present.
        """
        with self._semaphore:
            return self._has_val

    @property
    def value_volatile(self) -> Union[None, T]:
        """
        Volatile getter for the may-not-present value.
        """
        return self._val

    @property
    def value(self) -> T:
        """
        Thread-safe getter for the value. If no value is present, one will be
        produced and this getter blocks until then. The reset timer is started
        when the value is produced; reading an existing value does not restart
        it.
        """
        with self._semaphore:
            if not self._has_val:
                self._val = self._fn_create_val()
                self._val_type = type(self._val)
                self._has_val = True
                self._set_timer()
            return self._val

    @property
    def value_future(self) -> Future[T]:
        """
        Returns an awaitable :py:class:`Future` that will hold the value once
        it is available. If a value is already present, the future is resolved
        immediately; otherwise the value is produced on a worker thread.
        """
        f: Future = Future()
        # Snapshot both fields without locking. The isinstance check guards
        # against a reset having happened between the two reads.
        temp = self._val
        tempt = self._val_type
        if tempt is not None and isinstance(temp, tempt) and self.has_value_volatile:  # pragma: no cover
            f.set_result(temp)
        else:
            def set_val():
                try:
                    f.set_result(self.value)
                except Exception as e:
                    # Hand factory failures to whoever waits on the future.
                    f.set_exception(e)

            self._tpe.submit(set_val)

        return f