Skip to content

Commit d00d198

Browse files
authoredJun 16, 2021
Added .close() to memory streams (#313)
1 parent 5460e21 commit d00d198

File tree

4 files changed

+62
-3
lines changed

4 files changed

+62
-3
lines changed
 

‎docs/streams.rst

+8
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ Example::
6767

6868
run(main)
6969

70+
In contrast to other AnyIO streams (but in line with trio's Channels), memory object streams can be
71+
closed synchronously, using either the ``close()`` method or by using the stream as a context
72+
manager::
73+
74+
def synchronous_callback(send_stream: MemoryObjectSendStream) -> None:
75+
with stream:
76+
stream.send_nowait('hello')
77+
7078
Stapled streams
7179
---------------
7280

‎docs/versionhistory.rst

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
55

66
**UNRELEASED**
77

8+
- Added the ability to close memory object streams synchronously (including support for use as a
9+
synchronous context manager)
810
- Fixed ``to_thread.run_sync()`` hanging on the second call on asyncio when used with
911
``loop.run_until_complete()``
1012
- Fixed the type annotation of ``open_signal_receiver()`` as a synchronous context manager

‎src/anyio/streams/memory.py

+40-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from collections import OrderedDict, deque
22
from dataclasses import dataclass, field
3-
from typing import Deque, Generic, List, NamedTuple, TypeVar
3+
from types import TracebackType
4+
from typing import Deque, Generic, List, NamedTuple, Optional, Type, TypeVar
45

56
from .. import (
67
BrokenResourceError, ClosedResourceError, EndOfStream, WouldBlock, get_cancelled_exc_class)
@@ -112,7 +113,14 @@ def clone(self) -> 'MemoryObjectReceiveStream':
112113

113114
return MemoryObjectReceiveStream(_state=self._state)
114115

115-
async def aclose(self) -> None:
116+
def close(self) -> None:
117+
"""
118+
Close the stream.
119+
120+
This works the exact same way as :meth:`aclose`, but is provided as a special case for the
121+
benefit of synchronous callbacks.
122+
123+
"""
116124
if not self._closed:
117125
self._closed = True
118126
self._state.open_receive_channels -= 1
@@ -121,6 +129,9 @@ async def aclose(self) -> None:
121129
for event in send_events:
122130
event.set()
123131

132+
async def aclose(self) -> None:
133+
self.close()
134+
124135
def statistics(self) -> MemoryObjectStreamStatistics:
125136
"""
126137
Return statistics about the current state of this stream.
@@ -129,6 +140,14 @@ def statistics(self) -> MemoryObjectStreamStatistics:
129140
"""
130141
return self._state.statistics()
131142

143+
def __enter__(self) -> 'MemoryObjectReceiveStream[T_Item]':
144+
return self
145+
146+
def __exit__(self, exc_type: Optional[Type[BaseException]],
147+
exc_val: Optional[BaseException],
148+
exc_tb: Optional[TracebackType]) -> None:
149+
self.close()
150+
132151

133152
@dataclass(eq=False)
134153
class MemoryObjectSendStream(Generic[T_Item], ObjectSendStream[T_Item]):
@@ -198,7 +217,14 @@ def clone(self) -> 'MemoryObjectSendStream':
198217

199218
return MemoryObjectSendStream(_state=self._state)
200219

201-
async def aclose(self) -> None:
220+
def close(self) -> None:
221+
"""
222+
Close the stream.
223+
224+
This works the exact same way as :meth:`aclose`, but is provided as a special case for the
225+
benefit of synchronous callbacks.
226+
227+
"""
202228
if not self._closed:
203229
self._closed = True
204230
self._state.open_send_channels -= 1
@@ -208,10 +234,21 @@ async def aclose(self) -> None:
208234
for event in receive_events:
209235
event.set()
210236

237+
async def aclose(self) -> None:
238+
self.close()
239+
211240
def statistics(self) -> MemoryObjectStreamStatistics:
212241
"""
213242
Return statistics about the current state of this stream.
214243
215244
.. versionadded:: 3.0
216245
"""
217246
return self._state.statistics()
247+
248+
def __enter__(self) -> 'MemoryObjectSendStream[T_Item]':
249+
return self
250+
251+
def __exit__(self, exc_type: Optional[Type[BaseException]],
252+
exc_val: Optional[BaseException],
253+
exc_tb: Optional[TracebackType]) -> None:
254+
self.close()

‎tests/streams/test_memory.py

+12
Original file line numberDiff line numberDiff line change
@@ -320,3 +320,15 @@ async def test_statistics() -> None:
320320
assert stream.statistics().current_buffer_used == 0
321321
assert stream.statistics().tasks_waiting_send == 0
322322
assert stream.statistics().tasks_waiting_receive == 0
323+
324+
325+
async def test_sync_close() -> None:
326+
send_stream, receive_stream = create_memory_object_stream(1)
327+
with send_stream, receive_stream:
328+
pass
329+
330+
with pytest.raises(ClosedResourceError):
331+
send_stream.send_nowait(None)
332+
333+
with pytest.raises(ClosedResourceError):
334+
receive_stream.receive_nowait()

0 commit comments

Comments
 (0)
Please sign in to comment.