Skip to content

Commit 4bd27d5

Browse files
authored
Merge pull request #12 from gnir-work/feature/add-blocking-handle-name-to-monitoring
Add pretty handle name to monitored result
2 parents f328abb + 4f9a7d4 commit 4bd27d5

File tree

7 files changed

+176
-32
lines changed

7 files changed

+176
-32
lines changed

README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ With every call you will receive an [IoLoopMonitorState](monitored_ioloop/monito
5151
- `callback_wall_time`: Wall executing time of the callback.
5252
- `loop_handles_count`: The amount of handles (think about them as tasks) that the IO loop is currently handling.
5353
- `loop_lag`: The amount of time it took from the moment the task was added to the loop until it was executed.
54+
- `callback_pretty_name`: The pretty name of the callback that was executed
55+
__Please Note__: This is a best effort, the name of the callback may still be of little help, depending on the specific callback implementation.
5456

5557
## Performance impact
5658
As many of you might be concerned about the performance impact of this library, I have run some benchmarks to measure the performance impact of this library.
@@ -66,7 +68,7 @@ Currently there is only the [fastapi with prometheus exporter example](examples/
6668
- [x] Add support for the amount of `Handle`'s on the event loop
6769
- [x] Add an examples folder
6870
- [x] Add loop lag metric (Inspired from nodejs loop monitoring)
69-
- [ ] Add visibility into which `Handle` are making the event loop slower
71+
- [x] Add visibility into which `Handle` are making the event loop slower
7072
- [ ] Add easier integration with `uvicorn`
7173
- [ ] Add easier integration with popular monitoring tools like Prometheus
7274

examples/simple_python_example/simple_python_example.py

+9-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99

1010
async def main() -> None:
11-
async def sleep(coroutine_id: int) -> None:
11+
async def non_blocking_sleep(coroutine_id: int) -> None:
1212
logger.info(f"[id: {coroutine_id}] Before non-blocking sleep")
1313
await asyncio.sleep(1)
1414
logger.info(f"[id: {coroutine_id}] After non-blocking sleep")
@@ -19,23 +19,25 @@ async def blocking_sleep(coroutine_id: int) -> None:
1919
logger.info(f"[id: {coroutine_id}] After blocking sleep")
2020

2121
await asyncio.gather(
22-
sleep(coroutine_id=0),
22+
non_blocking_sleep(coroutine_id=0),
2323
blocking_sleep(coroutine_id=1),
24-
sleep(coroutine_id=2),
25-
sleep(coroutine_id=3),
24+
non_blocking_sleep(coroutine_id=2),
25+
non_blocking_sleep(coroutine_id=3),
2626
blocking_sleep(coroutine_id=4),
27-
sleep(coroutine_id=5),
27+
non_blocking_sleep(coroutine_id=5),
2828
)
2929

3030

3131
def monitor(ioloop_monitor_state: IoLoopMonitorState) -> None:
3232
if ioloop_monitor_state.callback_wall_time > 0.1:
3333
logger.warning(
34-
f"Blocking operation detected, executing took: {ioloop_monitor_state.callback_wall_time}"
34+
f"Blocking operation detected, executing of {ioloop_monitor_state.callback_pretty_name} "
35+
f"took: {ioloop_monitor_state.callback_wall_time}"
3536
)
3637
if ioloop_monitor_state.loop_lag > 0.1:
3738
logger.warning(
38-
f"A task was executed after a significant delay: {ioloop_monitor_state.loop_lag}"
39+
f"Task {ioloop_monitor_state.callback_pretty_name} was executed after "
40+
f"a significant delay: {ioloop_monitor_state.loop_lag}"
3941
)
4042

4143

monitored_ioloop/formatting_utils.py

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import typing
2+
from asyncio import tasks
3+
from asyncio.events import Handle
4+
5+
6+
def pretty_format_handle(handle: Handle) -> str:
7+
callback = handle._callback # type: ignore
8+
if isinstance(getattr(callback, "__self__", None), tasks.Task):
9+
# format the task
10+
return repr(callback.__self__)
11+
else:
12+
return repr(handle)
13+
14+
15+
def pretty_callback_name(callback: typing.Callable[..., typing.Any]) -> str:
16+
if isinstance(getattr(callback, "__self__", None), tasks.Task):
17+
# format the task
18+
return repr(callback.__self__) # type: ignore
19+
else:
20+
return repr(callback)

monitored_ioloop/monitored_asyncio.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ def call_soon(
3131
callback, self._monitor_callback, self._state
3232
)
3333

34-
return super().call_soon(callback_with_monitoring, *args, **kwargs)
34+
handle = super().call_soon(callback_with_monitoring, *args, **kwargs)
35+
callback_with_monitoring.set_handle(handle)
36+
return handle
3537

3638

3739
class MonitoredAsyncIOEventLoopPolicy(BaseMonitoredEventLoopPolicy):

monitored_ioloop/monitoring.py

+48-20
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import threading
22
import time
33
import typing
4+
from asyncio import Handle
45
from dataclasses import dataclass
56
from logging import getLogger
67

8+
from monitored_ioloop.formatting_utils import pretty_format_handle, pretty_callback_name
9+
710
logger = getLogger(__name__)
811

912

@@ -28,6 +31,12 @@ class IoLoopMonitorState:
2831
"""
2932
callback_wall_time: float
3033

34+
"""
35+
A best effort try to give a meaningful name to the callback that was currently executed.
36+
This property will come in handy when trying to debug callbacks with high wall time.
37+
"""
38+
callback_pretty_name: str
39+
3140
"""
3241
The amount of handles in the loop, excluding the current one.
3342
"""
@@ -59,36 +68,55 @@ def decrease_handles_count_thread_safe(self, decrease_by: int) -> None:
5968
self.handles_count -= decrease_by
6069

6170

62-
def wrap_callback_with_monitoring(
63-
callback: typing.Callable[..., typing.Any],
64-
monitor_callback: typing.Callable[[IoLoopMonitorState], None],
65-
ioloop_state: IoLoopInnerState,
66-
) -> typing.Callable[..., typing.Any]:
67-
"""
68-
Add monitoring to a callback.
69-
The callback will be wrapped in a function that will monitor the callbacks execution time and report
70-
back to the monitor_callback.
71-
"""
72-
ioloop_state.increase_handles_count_thread_safe(1)
73-
added_to_loop_time = time.perf_counter()
74-
75-
def wrapper(*inner_args: typing.Any, **inner_kwargs: typing.Any) -> typing.Any:
76-
loop_lag = time.perf_counter() - added_to_loop_time
71+
class MonitoredCallbackWrapper:
72+
def __init__(
73+
self,
74+
callback: typing.Callable[..., typing.Any],
75+
monitor_callback: typing.Callable[[IoLoopMonitorState], None],
76+
io_loop_state: IoLoopInnerState,
77+
):
78+
self._original_callback = callback
79+
self._monitor_callback = monitor_callback
80+
self._ioloop_state = io_loop_state
81+
self._added_to_loop_time = time.perf_counter()
82+
self._handle: typing.Optional[Handle] = None
83+
84+
def set_handle(self, handle: Handle) -> None:
85+
self._handle = handle
86+
87+
def __call__(self, *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
88+
loop_lag = time.perf_counter() - self._added_to_loop_time
7789
start_wall_time = time.perf_counter()
78-
response = callback(*inner_args, **inner_kwargs)
79-
ioloop_state.decrease_handles_count_thread_safe(1)
90+
response = self._original_callback(*args, **kwargs)
91+
self._ioloop_state.decrease_handles_count_thread_safe(1)
8092
wall_duration = time.perf_counter() - start_wall_time
8193

8294
try:
83-
monitor_callback(
95+
pretty_name = (
96+
pretty_format_handle(self._handle)
97+
if self._handle
98+
else pretty_callback_name(self._original_callback)
99+
)
100+
self._monitor_callback(
84101
IoLoopMonitorState(
85102
callback_wall_time=wall_duration,
86-
loop_handles_count=ioloop_state.handles_count,
103+
loop_handles_count=self._ioloop_state.handles_count,
87104
loop_lag=loop_lag,
105+
callback_pretty_name=pretty_name,
88106
)
89107
)
90108
except Exception:
91109
logger.warning("Monitor callback failed.", exc_info=True)
92110
return response
93111

94-
return wrapper
112+
def __getattr__(self, item: str) -> typing.Any:
113+
return getattr(self._original_callback, item)
114+
115+
116+
def wrap_callback_with_monitoring(
117+
callback: typing.Callable[..., typing.Any],
118+
monitor_callback: typing.Callable[[IoLoopMonitorState], None],
119+
ioloop_state: IoLoopInnerState,
120+
) -> MonitoredCallbackWrapper:
121+
ioloop_state.increase_handles_count_thread_safe(1)
122+
return MonitoredCallbackWrapper(callback, monitor_callback, ioloop_state)

stress_tests/results/README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ The architecture of system tests was
77

88
### Tests
99
#### 300 active locust users
10-
Under the stress of 300 users which resulted in ~70 requests per second there was
10+
Under the stress of 300 users which resulted in ~70 requests __per second__ there was
1111
no visible difference in the response time between monitored and vanilla loops.
1212

1313
#### 1000 active locust users
14-
Under the stress of 1000 users which resulted in ~220 requests per second there was
15-
a 5-10% increase in response time when observing the 90th - 100th percentile of longest requests.
14+
Under the stress of 1000 users which resulted in ~220 requests __per second__ there was
15+
a 5~7% increase in response time when observing the 90th - 100th percentile of longest requests.
1616
requests under the 90th percentile were not affected by the monitoring loop.

tests/test_asyncio_profiler.py

+90
Original file line numberDiff line numberDiff line change
@@ -213,3 +213,93 @@ def test_loop_lag(
213213
)
214214
== non_blocking_coroutines_count
215215
)
216+
217+
218+
@pytest.mark.parametrize(
219+
"ioloop_policy_class",
220+
[MonitoredAsyncIOEventLoopPolicy, MonitoredUvloopEventLoopPolicy],
221+
)
222+
def test_callback_pretty_name__basic_top_level_coroutine_name(
223+
ioloop_policy_class: typing.Type[MonitoredUvloopEventLoopPolicy],
224+
) -> None:
225+
mock = Mock()
226+
asyncio.set_event_loop_policy(ioloop_policy_class(monitor_callback=mock))
227+
asyncio.run(non_cpu_intensive_blocking_coroutine(0.1))
228+
assert (
229+
len(
230+
[
231+
callback_pretty_name
232+
for call in mock.mock_calls
233+
if "non_cpu_intensive_blocking_coroutine"
234+
in (callback_pretty_name := call.args[0].callback_pretty_name)
235+
]
236+
)
237+
== 1
238+
)
239+
240+
241+
async def several_coroutines_in_gather_with_pretty_name_testing() -> None:
242+
async def first_function() -> None:
243+
time.sleep(0.1)
244+
245+
async def second_function() -> None:
246+
time.sleep(0.1)
247+
248+
await asyncio.gather(
249+
first_function(),
250+
first_function(),
251+
second_function(),
252+
)
253+
254+
255+
@pytest.mark.parametrize(
256+
"ioloop_policy_class",
257+
[MonitoredAsyncIOEventLoopPolicy, MonitoredUvloopEventLoopPolicy],
258+
)
259+
def test_callback_pretty_name__several_coroutines_with_gather(
260+
ioloop_policy_class: typing.Type[MonitoredUvloopEventLoopPolicy],
261+
) -> None:
262+
mock = Mock()
263+
asyncio.set_event_loop_policy(ioloop_policy_class(monitor_callback=mock))
264+
asyncio.run(several_coroutines_in_gather_with_pretty_name_testing())
265+
266+
# The function is called twice from the ioloop, once until the gather and once after the gather has finished
267+
assert (
268+
len(
269+
[
270+
callback_pretty_name
271+
for call in mock.mock_calls
272+
if (
273+
"several_coroutines_in_gather_with_pretty_name_testing"
274+
in (callback_pretty_name := call.args[0].callback_pretty_name)
275+
and "first_function" not in callback_pretty_name
276+
and "second_function" not in callback_pretty_name
277+
)
278+
]
279+
)
280+
== 2
281+
)
282+
283+
assert (
284+
len(
285+
[
286+
callback_pretty_name
287+
for call in mock.mock_calls
288+
if "first_function"
289+
in (callback_pretty_name := call.args[0].callback_pretty_name)
290+
]
291+
)
292+
== 2
293+
)
294+
295+
assert (
296+
len(
297+
[
298+
callback_pretty_name
299+
for call in mock.mock_calls
300+
if "second_function"
301+
in (callback_pretty_name := call.args[0].callback_pretty_name)
302+
]
303+
)
304+
== 1
305+
)

0 commit comments

Comments
 (0)