Source code for py_gql.execution.runtime.blocking

# -*- coding: utf-8 -*-

from typing import Any, Callable, Iterable, Optional, Tuple, Type, TypeVar

from .base import Runtime


T = TypeVar("T")
E = TypeVar("E", bound=Exception)
TException = TypeVar("TException", bound=Type[Exception])
AnyFn = Callable[..., Any]


[docs]class BlockingRuntime(Runtime): """ Default runtime implementation which blocks the current thread. """
[docs] def submit(self, fn: AnyFn, *args: Any, **kwargs: Any) -> Any: return fn(*args, **kwargs)
[docs] def ensure_wrapped(self, value: Any) -> Any: return value
[docs] def gather_values(self, values: Iterable[Any]) -> Any: return list(values)
[docs] def map_value( self, value: Any, then: Callable[[Any], T], else_: Optional[Tuple[Type[E], Callable[[E], T]]] = None, ) -> Any: try: return then(value) except Exception as err: if else_ and isinstance(err, else_[0]): return else_[1](err) raise
[docs] def unwrap_value(self, value): return value
[docs] def wrap_callable(self, func: AnyFn) -> AnyFn: return func