-
Notifications
You must be signed in to change notification settings - Fork 56
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Added flushes/close functionality to logging handlers #917
base: main
Are you sure you want to change the base?
Changes from all commits
9f85e57
d725eff
9ee7bae
ae3ece3
b46779e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -188,7 +188,10 @@ def __init__( | |
resource = detect_resource(client.project) | ||
self.name = name | ||
self.client = client | ||
client._handlers.add(self) | ||
self.transport = transport(client, name, resource=resource) | ||
self.transport_open = True | ||
self._transport_cls = transport | ||
self.project_id = client.project | ||
self.resource = resource | ||
self.labels = labels | ||
|
@@ -213,6 +216,12 @@ def emit(self, record): | |
labels = {**add_resource_labels(resource, record), **(labels or {})} or None | ||
|
||
# send off request | ||
if not self.transport_open: | ||
self.transport = self._transport_cls( | ||
self.client, self.name, resource=self.resource | ||
) | ||
self.transport_open = True | ||
|
||
self.transport.send( | ||
record, | ||
message, | ||
|
@@ -225,6 +234,22 @@ def emit(self, record): | |
source_location=record._source_location, | ||
) | ||
|
||
def flush(self): | ||
"""Forces the Transport object to submit any pending log records. | ||
|
||
For SyncTransport, this is a no-op. | ||
""" | ||
super(CloudLoggingHandler, self).flush() | ||
if self.transport_open: | ||
self.transport.flush() | ||
|
||
def close(self): | ||
"""Closes the log handler and cleans up all Transport objects used.""" | ||
self.transport.close() | ||
self.transport = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like the idea of using Can't we use a new flag for this? |
||
self.transport_open = False | ||
|
||
|
||
|
||
def _format_and_parse_message(record, formatter_handler): | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,13 @@ | |
_WORKER_TERMINATOR = object() | ||
_LOGGER = logging.getLogger(__name__) | ||
|
||
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG = ( | ||
"CloudLoggingHandler shutting down, cannot send logs entries to Cloud Logging due to " | ||
"inconsistent threading behavior at shutdown. To avoid this issue, flush the logging handler " | ||
"manually or switch to StructuredLogHandler. You can also close the CloudLoggingHandler manually " | ||
"via handler.close or client.close." | ||
) | ||
|
||
|
||
def _get_many(queue_, *, max_items=None, max_latency=0): | ||
"""Get multiple items from a Queue. | ||
|
@@ -140,9 +147,11 @@ def _thread_main(self): | |
else: | ||
batch.log(**item) | ||
|
||
self._safely_commit_batch(batch) | ||
# We cannot commit logs upstream if the main thread is shutting down | ||
if threading.main_thread().is_alive(): | ||
self._safely_commit_batch(batch) | ||
|
||
for _ in items: | ||
for it in items: | ||
self._queue.task_done() | ||
|
||
_LOGGER.debug("Background thread exited gracefully.") | ||
|
@@ -162,7 +171,7 @@ def start(self): | |
) | ||
self._thread.daemon = True | ||
self._thread.start() | ||
atexit.register(self._main_thread_terminated) | ||
atexit.register(self._close) | ||
|
||
def stop(self, *, grace_period=None): | ||
"""Signals the background thread to stop. | ||
|
@@ -202,26 +211,40 @@ def stop(self, *, grace_period=None): | |
|
||
return success | ||
|
||
def _main_thread_terminated(self): | ||
def _close(self): | ||
"""Callback that attempts to send pending logs before termination.""" | ||
if not self.is_alive: | ||
return | ||
|
||
# Print different messages to the user depending on whether or not the | ||
# program is shutting down. This is because this function now handles both | ||
# the atexit handler and the regular close. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we decouple this? Maybe let the caller pass in a message or something? |
||
if not self._queue.empty(): | ||
print( | ||
"Program shutting down, attempting to send %d queued log " | ||
"entries to Cloud Logging..." % (self._queue.qsize(),), | ||
file=sys.stderr, | ||
) | ||
|
||
if self.stop(grace_period=self._grace_period): | ||
if threading.main_thread().is_alive(): | ||
print( | ||
"Background thread shutting down, attempting to send %d queued log " | ||
"entries to Cloud Logging..." % (self._queue.qsize(),), | ||
file=sys.stderr, | ||
) | ||
else: | ||
print( | ||
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG, | ||
file=sys.stderr, | ||
) | ||
|
||
if ( | ||
self.stop(grace_period=self._grace_period) | ||
and threading.main_thread().is_alive() | ||
): | ||
print("Sent all pending logs.", file=sys.stderr) | ||
else: | ||
print( | ||
"Failed to send %d pending logs." % (self._queue.qsize(),), | ||
file=sys.stderr, | ||
) | ||
|
||
self._thread = None | ||
|
||
def enqueue(self, record, message, **kwargs): | ||
"""Queues a log entry to be written by the background thread. | ||
|
||
|
@@ -251,6 +274,14 @@ def flush(self): | |
"""Submit any pending log records.""" | ||
self._queue.join() | ||
|
||
def close(self): | ||
"""Signals the worker thread to stop, then closes the transport thread. | ||
|
||
This call should be followed up by disowning the transport object. | ||
""" | ||
atexit.unregister(self._close) | ||
self._close() | ||
|
||
|
||
class BackgroundThreadTransport(Transport): | ||
"""Asynchronous transport that uses a background thread.""" | ||
|
@@ -285,6 +316,7 @@ def __init__( | |
""" | ||
self.client = client | ||
logger = self.client.logger(name, resource=resource) | ||
self.grace_period = grace_period | ||
self.worker = _Worker( | ||
logger, | ||
grace_period=grace_period, | ||
|
@@ -307,3 +339,7 @@ def send(self, record, message, **kwargs): | |
def flush(self): | ||
"""Submit any pending log records.""" | ||
self.worker.flush() | ||
|
||
def close(self): | ||
"""Closes the worker thread.""" | ||
self.worker.stop(grace_period=self.grace_period) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this mean the handler has been closed? Why not just raise an error, instead of re-creating the transport?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I based this off of some logging handlers I saw in the Python standard library, which will reopen a closed connection if that connection is closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I think this adds extra complexity, so I'd prefer to keep them permanently closed if we can get away with that
But if you think this behavior would help us integrate with the standard library, it makes sense to keep it.