From 0ff4b65abd6275651c4d31ff6662735a5b1c457b Mon Sep 17 00:00:00 2001 From: otorreno Date: Fri, 7 Mar 2025 11:12:55 +0000 Subject: [PATCH] fix: make atexit registration conditional on background transport The background transport fails on Python 3.12 when working with gRPC in case there are messages in the queue when the Python interpreter exits. Making the registration of the atexit callback conditional so that users can circumvent this error by explicitly stopping the logging when their application terminates. We can consider making the default value depend on the Python version at runtime, but it was decided to not change the default behaviour, regardless of the Python version. Fixes #850 and #855 (if users would setup the logging disabling the atexit hook) --- google/cloud/logging_v2/handlers/handlers.py | 2 +- .../handlers/transports/background_thread.py | 14 +++++++++++++- .../transports/test_background_thread.py | 19 +++++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/google/cloud/logging_v2/handlers/handlers.py b/google/cloud/logging_v2/handlers/handlers.py index e71f673f7..c8ee9dbb5 100644 --- a/google/cloud/logging_v2/handlers/handlers.py +++ b/google/cloud/logging_v2/handlers/handlers.py @@ -188,7 +188,7 @@ def __init__( resource = detect_resource(client.project) self.name = name self.client = client - self.transport = transport(client, name, resource=resource) + self.transport = transport(client, name, resource=resource, **kwargs) self.project_id = client.project self.resource = resource self.labels = labels diff --git a/google/cloud/logging_v2/handlers/transports/background_thread.py b/google/cloud/logging_v2/handlers/transports/background_thread.py index 7cf2799f5..ea45c4073 100644 --- a/google/cloud/logging_v2/handlers/transports/background_thread.py +++ b/google/cloud/logging_v2/handlers/transports/background_thread.py @@ -34,6 +34,7 @@ _DEFAULT_GRACE_PERIOD = 5.0 # Seconds _DEFAULT_MAX_BATCH_SIZE = 10 _DEFAULT_MAX_LATENCY = 0 # Seconds +_DEFAULT_REGISTER_EXIT_CALLBACK = True _WORKER_THREAD_NAME = "google.cloud.logging.Worker" _WORKER_TERMINATOR = object() _LOGGER = logging.getLogger(__name__) @@ -79,6 +80,7 @@ def __init__( grace_period=_DEFAULT_GRACE_PERIOD, max_batch_size=_DEFAULT_MAX_BATCH_SIZE, max_latency=_DEFAULT_MAX_LATENCY, + register_exit_callback=_DEFAULT_REGISTER_EXIT_CALLBACK, ): """ Args: @@ -93,11 +95,15 @@ def __init__( than the grace_period. This means this is effectively the longest amount of time the background thread will hold onto log entries before sending them to the server. + register_exit_callback (Optional[bool]): Whether to register the atexit callback + or not. Starting Python 3.12 atexit does not allow to create new threads, what + happens when using gRPC. """ self._cloud_logger = cloud_logger self._grace_period = grace_period self._max_batch_size = max_batch_size self._max_latency = max_latency + self._register_exit_callback = register_exit_callback self._queue = queue.Queue(0) self._operational_lock = threading.Lock() self._thread = None @@ -162,7 +168,8 @@ def start(self): ) self._thread.daemon = True self._thread.start() - atexit.register(self._main_thread_terminated) + if self._register_exit_callback: + atexit.register(self._main_thread_terminated) def stop(self, *, grace_period=None): """Signals the background thread to stop. @@ -264,6 +271,7 @@ def __init__( batch_size=_DEFAULT_MAX_BATCH_SIZE, max_latency=_DEFAULT_MAX_LATENCY, resource=_GLOBAL_RESOURCE, + register_exit_callback=_DEFAULT_REGISTER_EXIT_CALLBACK, **kwargs, ): """ @@ -280,6 +288,9 @@ def __init__( than the grace_period. This means this is effectively the longest amount of time the background thread will hold onto log entries before sending them to the server. + register_exit_callback (Optional[bool]): Whether to register the atexit callback + or not. Starting Python 3.12 atexit does not allow to create new threads, what + happens when using gRPC. resource (Optional[Resource|dict]): The default monitored resource to associate with logs when not specified """ @@ -290,6 +301,7 @@ def __init__( grace_period=grace_period, max_batch_size=batch_size, max_latency=max_latency, + register_exit_callback=register_exit_callback, ) self.worker.start() diff --git a/tests/unit/handlers/transports/test_background_thread.py b/tests/unit/handlers/transports/test_background_thread.py index d4954ff7b..1f441ee8b 100644 --- a/tests/unit/handlers/transports/test_background_thread.py +++ b/tests/unit/handlers/transports/test_background_thread.py @@ -230,6 +230,25 @@ def test_start(self): self._start_with_thread_patch(worker) self.assertIs(current_thread, worker._thread) + def test_start_not_registering_exit_callback(self): + from google.cloud.logging_v2.handlers.transports import background_thread + + worker = self._make_one(_Logger(self.NAME), register_exit_callback=False) + + _, atexit_mock = self._start_with_thread_patch(worker) + + self.assertTrue(worker.is_alive) + self.assertIsNotNone(worker._thread) + self.assertTrue(worker._thread.daemon) + self.assertEqual(worker._thread._target, worker._thread_main) + self.assertEqual(worker._thread._name, background_thread._WORKER_THREAD_NAME) + atexit_mock.assert_not_called() + + # Calling start again should not start a new thread. + current_thread = worker._thread + self._start_with_thread_patch(worker) + self.assertIs(current_thread, worker._thread) + def test_stop(self): from google.cloud.logging_v2.handlers.transports import background_thread