-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathbuilder.py
131 lines (102 loc) · 4.23 KB
/
builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from abc import ABC
from collections.abc import Callable, Generator
from functools import wraps
from typing import Any, Generic, TypeVar, cast
from typing_extensions import ParamSpec
from .error import EffectError
_TInner = TypeVar("_TInner")
_TOuter = TypeVar("_TOuter")
_P = ParamSpec("_P")
class Builder(Generic[_TInner, _TOuter], ABC):
"""Effect builder."""
def bind(self, xs: _TOuter, fn: Callable[[Any], _TOuter]) -> _TOuter:
raise NotImplementedError("Builder does not implement a bind method")
def return_(self, x: _TInner) -> _TOuter:
raise NotImplementedError("Builder does not implement a return method")
def return_from(self, xs: _TOuter) -> _TOuter:
raise NotImplementedError("Builder does not implement a return from method")
def combine(self, xs: _TOuter, ys: _TOuter) -> _TOuter:
"""Used for combining multiple statements in the effect."""
raise NotImplementedError("Builder does not implement a combine method")
def zero(self) -> _TOuter:
"""Zero effect.
Called if the effect raises StopIteration without a value, i.e
returns None.
"""
raise NotImplementedError("Builder does not implement a zero method")
def delay(self, fn: Callable[[], _TOuter]) -> _TOuter:
"""Default implementation evaluates the given function."""
return fn()
def run(self, xs: _TOuter) -> _TOuter:
"""Default implementation assumes the result is already evaluated."""
return xs
def _send(
self,
gen: Generator[Any, Any, Any],
done: list[bool],
value: _TInner | None = None,
) -> _TOuter:
try:
yielded = gen.send(value)
return self.return_(yielded)
except EffectError as error:
# Effect errors (Nothing, Error, etc) short circuits the processing so we
# set `done` to `True` here.
done.append(True)
# get value from exception
value = error.args[0]
return self.return_from(cast("_TOuter", value))
except StopIteration as ex:
done.append(True)
# Return of a value in the generator produces StopIteration with a value
if ex.value is not None:
return self.return_(ex.value)
raise
except RuntimeError:
done.append(True)
raise StopIteration
def __call__(
self,
fn: Callable[
_P,
Generator[_TInner | None, _TInner, _TInner | None] | Generator[_TInner | None, None, _TInner | None],
],
) -> Callable[_P, _TOuter]:
"""Option builder.
Enables the use of computational expressions using coroutines.
Thus inside the coroutine the keywords `yield` and `yield from`
reassembles `yield` and `yield!` from F#.
Args:
fn: A function that contains a computational expression and
returns either a coroutine, generator or an option.
Returns:
A `builder` function that can wrap coroutines into builders.
"""
@wraps(fn)
def wrapper(*args: _P.args, **kw: _P.kwargs) -> _TOuter:
gen = fn(*args, **kw)
done: list[bool] = []
result: _TOuter | None = None
def binder(value: Any) -> _TOuter:
ret = self._send(gen, done, value)
# Delay every result except the first
if result is not None:
return self.delay(lambda: ret)
return ret
try:
result = self._send(gen, done)
while not done:
cont = self.bind(result, binder)
# Combine every result except the first
if result is None:
result = cont
else:
result = self.combine(result, cont)
except StopIteration:
pass
# If anything returns `None` (i.e raises StopIteration without a value) then
# we expect the effect to have a zero method implemented.
if result is None:
result = self.zero()
return self.run(result)
return wrapper