Source code for malcolm.core.context

import logging
import time
import weakref
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, Union

import cothread

from .concurrency import Queue
from .errors import AbortedError, BadValueError, TimeoutError
from .future import Future
from .request import Post, Put, Request, Subscribe, Unsubscribe
from .response import Error, Return, Update

if TYPE_CHECKING:
    from .controller import Controller
    from .models import Model
    from .process import Process

# Create a module level logger
log = logging.getLogger(__name__)


class Context:
    """Helper allowing Future style access to Block Attributes and Methods

    Requests are dispatched to the controller that owns the first element of
    their path. Every request is given ``self._q.put`` as its callback, so all
    responses (and the STOP markers put by `stop`) arrive on a single queue
    that is drained by `wait_all_futures` and `sleep`.
    """

    # Marker put on the queue by stop() to abort a wait
    STOP = object()

    def __init__(self, process: "Process") -> None:
        # Queue that request callbacks and stop()/ignore_stops_before_now()
        # put onto
        self._q = Queue()
        # Func to call just before requests are dispatched
        self._notify_dispatch_request = None
        self._notify_args = ()
        self._process = process
        self._next_id = 1
        # request id -> Future that will receive its Return/Error
        self._futures: Dict[int, Future] = {}
        # subscribe request id -> (callback, extra args for the callback)
        self._subscriptions: Dict[int, Tuple[Callable, Any]] = {}
        # Future -> Request that created it, for every unresolved future
        self._requests: Dict[Future, Request] = {}
        # Future -> Subscribe for subscriptions that have had unsubscribe()
        # called on them but have not been concluded yet
        self._pending_unsubscribes: Dict[Future, Subscribe] = {}
        # If not None, wait for this before listening to STOPs
        self._sentinel_stop = None

    @property
    def mri_list(self) -> List[str]:
        """The mri_list of the process this context was created with"""
        return self._process.mri_list

    def get_controller(self, mri):
        """Look up the controller for the given mri in our process"""
        controller = self._process.get_controller(mri)
        return controller

    def block_view(self, mri: str) -> Any:
        """Get a view of a block

        Args:
            mri: The mri of the controller hosting the block

        Returns:
            Block: The block we control
        """
        controller = self.get_controller(mri)
        # Hand out a weak proxy so the view does not hold a strong reference
        # to this context
        block = controller.block_view(weakref.proxy(self))
        return block

    def make_view(
        self, controller: "Controller", data: "Model", child_name: str
    ) -> Any:
        """Ask controller to make a view of child_name of data for us"""
        return controller.make_view(self, data, child_name)

    def _get_next_id(self):
        """Return the next unused request id and advance the counter"""
        new_id = self._next_id
        self._next_id += 1
        return new_id

    def set_notify_dispatch_request(self, notify_dispatch_request, *args):
        """Set function to call just before requests are dispatched

        Args:
            notify_dispatch_request (callable): function will be called
                with the request, followed by ``*args``, just before the
                request is dispatched
            *args: extra arguments passed after the request
        """
        self._notify_dispatch_request = notify_dispatch_request
        self._notify_args = args

    def _dispatch_request(self, request):
        """Register a Future for request, send it, and return the Future"""
        future = Future(weakref.proxy(self))
        self._futures[request.id] = future
        self._requests[future] = request
        controller = self.get_controller(request.path[0])
        if self._notify_dispatch_request:
            self._notify_dispatch_request(request, *self._notify_args)
        self.handle_request(controller, request)
        return future

    def handle_request(self, controller, request):
        """Pass request to controller, then yield to other cothreads"""
        controller.handle_request(request)
        # Yield control to allow the request to be handled
        cothread.Yield()

    def ignore_stops_before_now(self):
        """Ignore any stops received before this point"""
        # STOPs taken off the queue are ignored until this sentinel has been
        # taken off it too
        self._sentinel_stop = object()
        self._q.put(self._sentinel_stop)

    def stop(self):
        """Stops any wait_all_futures call with an AbortedError"""
        self._q.put(self.STOP)

    def put(self, path, value, timeout=None, event_timeout=None):
        """Puts a value to a path and returns when it completes

        Args:
            path (list): The path to put to
            value (object): The value to set
            timeout (float): time in seconds to wait for responses, wait
                forever if None
            event_timeout: maximum time in seconds to wait between each
                response event, wait forever if None

        Returns:
            The value after the put completes
        """
        future = self.put_async(path, value)
        self.wait_all_futures(future, timeout=timeout, event_timeout=event_timeout)
        return future.result()

    def put_async(self, path, value):
        """Puts a value to a path and returns immediately

        Args:
            path (list): The path to put to
            value (object): The value to set

        Returns:
            Future: A single Future which will resolve to the result
        """
        request = Put(self._get_next_id(), path, value)
        request.set_callback(self._q.put)
        future = self._dispatch_request(request)
        return future

    def post(self, path, params=None, timeout=None, event_timeout=None):
        """Synchronously calls a method

        Args:
            path (list): The path to post to
            params (dict): parameters for the call
            timeout (float): time in seconds to wait for responses, wait
                forever if None
            event_timeout: maximum time in seconds to wait between each
                response event, wait forever if None

        Returns:
            the result from 'method'
        """
        future = self.post_async(path, params)
        self.wait_all_futures(future, timeout=timeout, event_timeout=event_timeout)
        return future.result()

    def post_async(self, path, params=None):
        """Asynchronously calls a function on a child block

        Args:
            path (list): The path to post to
            params (dict): parameters for the call

        Returns:
            Future: a single Future that will resolve to the result
        """
        request = Post(self._get_next_id(), path, params)
        request.set_callback(self._q.put)
        future = self._dispatch_request(request)
        return future

    def subscribe(self, path, callback, *args):
        """Subscribe to changes in a given attribute and call
        ``callback(value, *args)`` when it changes

        Args:
            path (list): The path to subscribe to
            callback (callable): called with each new value followed by
                ``*args`` while updates are being serviced
            *args: extra arguments for the callback

        Returns:
            Future: A single Future which will resolve to the result
        """
        request = Subscribe(self._get_next_id(), path, delta=False)
        request.set_callback(self._q.put)
        # If self is in args, then make weak version of it
        saved_args = []
        for arg in args:
            if arg is self:
                saved_args.append(weakref.proxy(self))
            else:
                saved_args.append(arg)
        self._subscriptions[request.id] = (callback, saved_args)
        future = self._dispatch_request(request)
        return future

    def unsubscribe(self, future):
        """Terminates the subscription given by a future

        Args:
            future (Future): The future of the original subscription
        """
        assert (
            future not in self._pending_unsubscribes
        ), f"{self._pending_unsubscribes[future]!r} has already been unsubscribed from"
        subscribe = self._requests[future]
        self._pending_unsubscribes[future] = subscribe
        # Clear out the subscription
        self._subscriptions.pop(subscribe.id)
        request = Unsubscribe(subscribe.id)
        request.set_callback(self._q.put)
        try:
            controller = self.get_controller(subscribe.path[0])
        except ValueError:
            # Controller has already gone, probably during tearDown
            pass
        else:
            self.handle_request(controller, request)

    def unsubscribe_all(self, callback=False):
        """Send an unsubscribe for all active subscriptions

        Args:
            callback (bool): If True then log a warning for each subscription
                and schedule the unsubscribe with ``cothread.Callback``
                rather than calling it directly
        """
        # Snapshot the active subscriptions before acting on them:
        # unsubscribe() yields to other cothreads, so iterating
        # self._requests lazily could see it change size mid-iteration
        active = [
            (f, r)
            for f, r in self._requests.items()
            if isinstance(r, Subscribe) and f not in self._pending_unsubscribes
        ]
        for future, request in active:
            if callback:
                log.warning(f"Unsubscribing from {request.path}")
                cothread.Callback(self.unsubscribe, future)
            else:
                self.unsubscribe(future)

    def __del__(self):
        # Unsubscribe from anything that is still active
        self.unsubscribe_all(callback=True)

    def when_matches(
        self, path, good_value, bad_values=None, timeout=None, event_timeout=None
    ):
        """Resolve when a path value equals good_value

        Args:
            path (list): The path to wait to
            good_value (object): the value to wait for
            bad_values (list): values to raise an error on
            timeout (float): time in seconds to wait for responses, wait
                forever if None
            event_timeout: maximum time in seconds to wait between each
                response event, wait forever if None
        """
        future = self.when_matches_async(path, good_value, bad_values)
        self.wait_all_futures(future, timeout=timeout, event_timeout=event_timeout)

    def when_matches_async(self, path, good_value, bad_values=None):
        """Wait for an attribute to become a given value

        Args:
            path (list): The path to wait to
            good_value: If it is a callable then expect it to return
                True if we are satisfied and raise on error. If it is not
                callable then compare each value against this one and return
                if it matches.
            bad_values (list): values to raise an error on

        Returns:
            Future: a single Future that will resolve when the path matches
                good_value or bad_values
        """
        when = When(good_value, bad_values)
        future = self.subscribe(path, when)
        when.set_future_context(future, weakref.proxy(self))
        return future

    def wait_all_futures(
        self,
        futures: Union[List[Future], Future, None],
        timeout: Union[float, None] = None,
        event_timeout: Union[float, None] = None,
    ) -> None:
        """Services all futures until the list 'futures' are all done
        then returns. Calls relevant subscription callbacks as they
        come off the queue and raises an exception on abort

        Args:
            futures: a `Future` or list of all futures that the caller
                wants to wait for
            timeout: maximum total time in seconds to wait for responses, wait
                forever if None
            event_timeout: maximum time in seconds to wait between each
                response event, wait forever if None

        Raises:
            TimeoutError: if either timeout expires before all are done
            AbortedError: if `stop` is called while waiting
        """
        if timeout is None:
            end = None
        else:
            end = time.time() + timeout

        if not isinstance(futures, list):
            if futures:
                futures = [futures]
            else:
                futures = []

        # Only wait for futures that are not done yet; a done future that
        # holds an exception has that exception raised immediately
        filtered_futures = []
        for f in futures:
            if f.done():
                if f.exception() is not None:
                    raise f.exception()
            else:
                filtered_futures.append(f)

        until: Union[float, None]
        # _service_futures removes concluded futures from filtered_futures
        while filtered_futures:
            if event_timeout is not None:
                until = time.time() + event_timeout
                if end is not None:
                    until = min(until, end)
            else:
                until = end
            self._service_futures(filtered_futures, until)

    def sleep(self, seconds):
        """Services all futures while waiting

        Args:
            seconds (float): Time to wait
        """
        until = time.time() + seconds
        try:
            while True:
                self._service_futures([], until)
        except TimeoutError:
            # Reaching the deadline is the normal way out of the loop
            return

    def _describe_futures(self, futures):
        """Return a human readable summary of futures for error messages"""
        descriptions = []
        for future in futures:
            request = self._requests.get(future, None)
            if isinstance(request, Put):
                path = ".".join(request.path)
                descriptions.append(f"{path}.put_value({request.value})")
            elif isinstance(request, Subscribe):
                path = ".".join(request.path)
                func, _ = self._subscriptions.get(request.id, (None, None))
                if isinstance(func, When):
                    descriptions.append(
                        f"When({path}, {func.condition_satisfied.__name__}, "
                        f"last={func.last})"
                    )
                elif func is None:
                    # We have already called unsubscribe, but haven't received
                    # the Return for it yet
                    descriptions.append(f"Unsubscribed({path})")
                else:
                    descriptions.append(f"Subscribe({path})")
            elif isinstance(request, Post):
                path = ".".join(request.path)
                if request.parameters:
                    params = "..."
                else:
                    params = ""
                descriptions.append(f"{path}({params})")
            else:
                descriptions.append(str(request))
        if descriptions:
            return "[" + ", ".join(descriptions) + "]"
        else:
            return "[]"

    def _service_futures(self, futures, until=None):
        """Service futures

        Takes a single item off the queue and acts on it. Futures that are
        concluded as a result are removed from ``futures`` in place.

        Args:
            futures (list): The futures to service
            until (float): Timestamp to wait until

        Raises:
            TimeoutError: if nothing arrives on the queue before ``until``
            AbortedError: if a STOP that should be listened to arrives
        """
        if until is None:
            timeout = None
        else:
            timeout = until - time.time()
            if timeout < 0:
                timeout = 0
        try:
            response = self._q.get(timeout)
        except TimeoutError:
            raise TimeoutError(f"Timeout waiting for {self._describe_futures(futures)}")
        if response is self._sentinel_stop:
            self._sentinel_stop = None
        elif response is self.STOP:
            if self._sentinel_stop is None:
                # This is a stop we should listen to...
                raise AbortedError(
                    f"Aborted waiting for {self._describe_futures(futures)}"
                )
        elif isinstance(response, Update):
            # This is an update for a subscription
            if response.id in self._subscriptions:
                func, args = self._subscriptions[response.id]
                func(response.value, *args)
                # func() may call wait_all_futures() which may call set_result
                # on some futures that aren't known to it. This means that
                # some of our futures list are now concluded, so filter them
                # out. If we didn't do this we would hang forever.
                # Rebuild the list in place: the caller holds a reference to
                # it, and removing items while iterating would skip the
                # element following each removal.
                futures[:] = [f for f in futures if f in self._requests]
        elif isinstance(response, Return):
            future = self._futures.pop(response.id)
            del self._requests[future]
            self._pending_unsubscribes.pop(future, None)
            result = response.value
            future.set_result(result)
            try:
                futures.remove(future)
            except ValueError:
                pass
        elif isinstance(response, Error):
            future = self._futures.pop(response.id)
            del self._requests[future]
            # The request is concluded, so it can no longer be pending
            self._pending_unsubscribes.pop(future, None)
            future.set_exception(response.message)
            try:
                futures.remove(future)
            except ValueError:
                # Not one of the futures being waited for, so the exception
                # is only stored on the future
                pass
            else:
                # One of the futures being waited for failed
                raise response.message
class When:
    """Subscription callback that unsubscribes once a condition is met

    If ``good_value`` is callable it is used as the condition, otherwise
    each value is compared against it for equality, and a value found in
    ``bad_values`` raises `BadValueError`.
    """

    def __init__(self, good_value: Callable[[Any], bool], bad_values: Any) -> None:
        if not callable(good_value):

            def condition_satisfied(value):
                if bad_values and value in bad_values:
                    raise BadValueError(f"Waiting for {good_value!r}, got {value!r}")
                return value == good_value

            # Give the comparison a descriptive name
            condition_satisfied.__name__ = f"equals_{good_value}"
        else:

            def condition_satisfied(value):
                return good_value(value)

        self.condition_satisfied = condition_satisfied
        # Most recent value passed to __call__ while a future was set
        self.last = None
        # Both are filled in later by set_future_context()
        self.future: Union[Future, None] = None
        self.context: Union[Context, None] = None

    def set_future_context(self, future: Future, context: Context) -> None:
        """Store the subscription future and the context that owns it"""
        self.future, self.context = future, context

    def _finish(self) -> None:
        """Unsubscribe our future from the context and forget it"""
        assert self.context, "No context"
        self.context.unsubscribe(self.future)
        self.future = None

    def __call__(self, value: Any) -> None:
        # Need to check if we have a future as we might be called while the
        # unsubscribe is taking place
        if not self.future:
            return
        self.last = value
        try:
            done = self.condition_satisfied(value)
        except Exception:
            # Bad value, so unsubscribe
            self._finish()
            raise
        if done:
            # All done, so unsubscribe
            self._finish()