Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make atexit registration conditional on background transport #978

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/logging_v2/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def __init__(
resource = detect_resource(client.project)
self.name = name
self.client = client
self.transport = transport(client, name, resource=resource)
self.transport = transport(client, name, resource=resource, **kwargs)
self.project_id = client.project
self.resource = resource
self.labels = labels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
_DEFAULT_MAX_BATCH_SIZE = 10
_DEFAULT_MAX_LATENCY = 0 # Seconds
_DEFAULT_REGISTER_EXIT_CALLBACK = True
_WORKER_THREAD_NAME = "google.cloud.logging.Worker"
_WORKER_TERMINATOR = object()
_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(
grace_period=_DEFAULT_GRACE_PERIOD,
max_batch_size=_DEFAULT_MAX_BATCH_SIZE,
max_latency=_DEFAULT_MAX_LATENCY,
register_exit_callback=_DEFAULT_REGISTER_EXIT_CALLBACK,
):
"""
Args:
Expand All @@ -93,11 +95,15 @@ def __init__(
than the grace_period. This means this is effectively the longest
amount of time the background thread will hold onto log entries
before sending them to the server.
register_exit_callback (Optional[bool]): Whether to register the atexit callback
or not. Starting Python 3.12 atexit does not allow to create new threads, what
happens when using gRPC.
"""
self._cloud_logger = cloud_logger
self._grace_period = grace_period
self._max_batch_size = max_batch_size
self._max_latency = max_latency
self._register_exit_callback = register_exit_callback
self._queue = queue.Queue(0)
self._operational_lock = threading.Lock()
self._thread = None
Expand Down Expand Up @@ -162,7 +168,8 @@ def start(self):
)
self._thread.daemon = True
self._thread.start()
atexit.register(self._main_thread_terminated)
if self._register_exit_callback:
atexit.register(self._main_thread_terminated)

def stop(self, *, grace_period=None):
"""Signals the background thread to stop.
Expand Down Expand Up @@ -264,6 +271,7 @@ def __init__(
batch_size=_DEFAULT_MAX_BATCH_SIZE,
max_latency=_DEFAULT_MAX_LATENCY,
resource=_GLOBAL_RESOURCE,
register_exit_callback=_DEFAULT_REGISTER_EXIT_CALLBACK,
**kwargs,
):
"""
Expand All @@ -280,6 +288,9 @@ def __init__(
than the grace_period. This means this is effectively the longest
amount of time the background thread will hold onto log entries
before sending them to the server.
register_exit_callback (Optional[bool]): Whether to register the atexit callback
or not. Starting Python 3.12 atexit does not allow to create new threads, what
happens when using gRPC.
resource (Optional[Resource|dict]): The default monitored resource to associate
with logs when not specified
"""
Expand All @@ -290,6 +301,7 @@ def __init__(
grace_period=grace_period,
max_batch_size=batch_size,
max_latency=max_latency,
register_exit_callback=register_exit_callback,
)
self.worker.start()

Expand Down
19 changes: 19 additions & 0 deletions tests/unit/handlers/transports/test_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,25 @@ def test_start(self):
self._start_with_thread_patch(worker)
self.assertIs(current_thread, worker._thread)

def test_start_not_registering_exit_callback(self):
from google.cloud.logging_v2.handlers.transports import background_thread

worker = self._make_one(_Logger(self.NAME), register_exit_callback=False)

_, atexit_mock = self._start_with_thread_patch(worker)

self.assertTrue(worker.is_alive)
self.assertIsNotNone(worker._thread)
self.assertTrue(worker._thread.daemon)
self.assertEqual(worker._thread._target, worker._thread_main)
self.assertEqual(worker._thread._name, background_thread._WORKER_THREAD_NAME)
atexit_mock.assert_not_called()

# Calling start again should not start a new thread.
current_thread = worker._thread
self._start_with_thread_patch(worker)
self.assertIs(current_thread, worker._thread)

def test_stop(self):
from google.cloud.logging_v2.handlers.transports import background_thread

Expand Down