Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added flushes/close functionality to logging handlers #917

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions google/cloud/logging_v2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ def __init__(
else:
self._use_grpc = _use_grpc

self._handlers = set()

@property
def logging_api(self):
"""Helper for logging-related API calls.
Expand Down Expand Up @@ -411,4 +413,17 @@ def setup_logging(
dict: keyword args passed to handler constructor
"""
handler = self.get_default_handler(**kw)
self._handlers.add(handler)
setup_logging(handler, log_level=log_level, excluded_loggers=excluded_loggers)

def flush_handlers(self):
"""Flushes all Python log handlers associated with this Client."""

for handler in self._handlers:
handler.flush()

def close(self):
"""Closes the Client and all handlers associated with this Client."""
super(Client, self).close()
for handler in self._handlers:
handler.close()
25 changes: 25 additions & 0 deletions google/cloud/logging_v2/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ def __init__(
resource = detect_resource(client.project)
self.name = name
self.client = client
client._handlers.add(self)
self.transport = transport(client, name, resource=resource)
self.transport_open = True
self._transport_cls = transport
self.project_id = client.project
self.resource = resource
self.labels = labels
Expand All @@ -213,6 +216,12 @@ def emit(self, record):
labels = {**add_resource_labels(resource, record), **(labels or {})} or None

# send off request
if not self.transport_open:
self.transport = self._transport_cls(
self.client, self.name, resource=self.resource
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this mean the handler has been closed? Why not just raise an error, instead of re-creating the transport?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I based this off of some logging handlers I saw in the Python standard library, which will reopen a closed connection if that connection is closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think this adds extra complexity, so I'd prefer to keep them permanently closed if we can get away with that

But if you think this behavior would help us integrate with the standard library, it makes sense to keep it.

self.transport_open = True

self.transport.send(
record,
message,
Expand All @@ -225,6 +234,22 @@ def emit(self, record):
source_location=record._source_location,
)

def flush(self):
"""Forces the Transport object to submit any pending log records.

For SyncTransport, this is a no-op.
"""
super(CloudLoggingHandler, self).flush()
if self.transport_open:
self.transport.flush()

def close(self):
"""Closes the log handler and cleans up all Transport objects used."""
self.transport.close()
self.transport = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like the idea of using transport == None as a way to check if the handler is closed. That makes the types quite a bit more complicated.

Can't we use a new flag for this?

self.transport_open = False



def _format_and_parse_message(record, formatter_handler):
"""
Expand Down
58 changes: 47 additions & 11 deletions google/cloud/logging_v2/handlers/transports/background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@
_WORKER_TERMINATOR = object()
_LOGGER = logging.getLogger(__name__)

_CLOSE_THREAD_SHUTDOWN_ERROR_MSG = (
"CloudLoggingHandler shutting down, cannot send logs entries to Cloud Logging due to "
"inconsistent threading behavior at shutdown. To avoid this issue, flush the logging handler "
"manually or switch to StructuredLogHandler. You can also close the CloudLoggingHandler manually "
"via handler.close or client.close."
)


def _get_many(queue_, *, max_items=None, max_latency=0):
"""Get multiple items from a Queue.
Expand Down Expand Up @@ -140,9 +147,11 @@ def _thread_main(self):
else:
batch.log(**item)

self._safely_commit_batch(batch)
# We cannot commit logs upstream if the main thread is shutting down
if threading.main_thread().is_alive():
self._safely_commit_batch(batch)

for _ in items:
for it in items:
self._queue.task_done()

_LOGGER.debug("Background thread exited gracefully.")
Expand All @@ -162,7 +171,7 @@ def start(self):
)
self._thread.daemon = True
self._thread.start()
atexit.register(self._main_thread_terminated)
atexit.register(self._close)

def stop(self, *, grace_period=None):
"""Signals the background thread to stop.
Expand Down Expand Up @@ -202,26 +211,40 @@ def stop(self, *, grace_period=None):

return success

def _main_thread_terminated(self):
def _close(self):
"""Callback that attempts to send pending logs before termination."""
if not self.is_alive:
return

# Print different messages to the user depending on whether or not the
# program is shutting down. This is because this function now handles both
# the atexit handler and the regular close.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we decouple this? Maybe let the caller pass in a message or something?

if not self._queue.empty():
print(
"Program shutting down, attempting to send %d queued log "
"entries to Cloud Logging..." % (self._queue.qsize(),),
file=sys.stderr,
)

if self.stop(grace_period=self._grace_period):
if threading.main_thread().is_alive():
print(
"Background thread shutting down, attempting to send %d queued log "
"entries to Cloud Logging..." % (self._queue.qsize(),),
file=sys.stderr,
)
else:
print(
_CLOSE_THREAD_SHUTDOWN_ERROR_MSG,
file=sys.stderr,
)

if (
self.stop(grace_period=self._grace_period)
and threading.main_thread().is_alive()
):
print("Sent all pending logs.", file=sys.stderr)
else:
print(
"Failed to send %d pending logs." % (self._queue.qsize(),),
file=sys.stderr,
)

self._thread = None

def enqueue(self, record, message, **kwargs):
"""Queues a log entry to be written by the background thread.

Expand Down Expand Up @@ -251,6 +274,14 @@ def flush(self):
"""Submit any pending log records."""
self._queue.join()

def close(self):
"""Signals the worker thread to stop, then closes the transport thread.

This call should be followed up by disowning the transport object.
"""
atexit.unregister(self._close)
self._close()


class BackgroundThreadTransport(Transport):
"""Asynchronous transport that uses a background thread."""
Expand Down Expand Up @@ -285,6 +316,7 @@ def __init__(
"""
self.client = client
logger = self.client.logger(name, resource=resource)
self.grace_period = grace_period
self.worker = _Worker(
logger,
grace_period=grace_period,
Expand All @@ -307,3 +339,7 @@ def send(self, record, message, **kwargs):
def flush(self):
"""Submit any pending log records."""
self.worker.flush()

def close(self):
"""Closes the worker thread."""
self.worker.stop(grace_period=self.grace_period)
8 changes: 8 additions & 0 deletions google/cloud/logging_v2/handlers/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ def flush(self):

For blocking/sync transports, this is a no-op.
"""
pass

def close(self):
"""Closes the transport and cleans up resources used by it.

This call should be followed up by disowning the transport.
"""
pass
7 changes: 7 additions & 0 deletions google/cloud/logging_v2/handlers/transports/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,10 @@ def send(self, record, message, **kwargs):
labels=labels,
**kwargs,
)

def close(self):
"""Closes the transport and cleans up resources used by it.

This call is usually followed up by cleaning up the reference to the transport.
"""
self.logger = None
81 changes: 81 additions & 0 deletions tests/unit/handlers/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,13 +461,25 @@ def test_ctor_defaults(self):
self.assertEqual(handler.name, DEFAULT_LOGGER_NAME)
self.assertIs(handler.client, client)
self.assertIsInstance(handler.transport, _Transport)
self.assertTrue(handler.transport_open)
self.assertIs(handler.transport.client, client)
self.assertEqual(handler.transport.name, DEFAULT_LOGGER_NAME)
global_resource = _create_global_resource(self.PROJECT)
self.assertEqual(handler.resource, global_resource)
self.assertIsNone(handler.labels)
self.assertIs(handler.stream, sys.stderr)

def test_add_handler_to_client_handlers(self):
from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE

client = _Client(self.PROJECT)
handler = self._make_one(
client,
transport=_Transport,
resource=_GLOBAL_RESOURCE,
)
self.assertIn(handler, client._handlers)

def test_ctor_explicit(self):
import io
from google.cloud.logging import Resource
Expand Down Expand Up @@ -790,6 +802,56 @@ def test_emit_with_encoded_json(self):
),
)

def test_emit_after_close(self):
from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE

client = _Client(self.PROJECT)
handler = self._make_one(
client, transport=_Transport, resource=_GLOBAL_RESOURCE
)
logname = "loggername"
message = "hello world"
record = logging.LogRecord(
logname, logging.INFO, None, None, message, None, None
)
handler.handle(record)
old_transport = handler.transport
self.assertEqual(
handler.transport.send_called_with,
(
record,
message,
_GLOBAL_RESOURCE,
{"python_logger": logname},
None,
None,
False,
None,
None,
),
)

handler.close()
self.assertFalse(handler.transport_open)

handler.handle(record)
self.assertTrue(handler.transport_open)
self.assertNotEqual(handler.transport, old_transport)
self.assertEqual(
handler.transport.send_called_with,
(
record,
message,
_GLOBAL_RESOURCE,
{"python_logger": logname},
None,
None,
False,
None,
None,
),
)

def test_format_with_arguments(self):
"""
Handler should support format string arguments
Expand Down Expand Up @@ -825,6 +887,20 @@ def test_format_with_arguments(self):
),
)

def test_close(self):
from google.cloud.logging_v2.logger import _GLOBAL_RESOURCE

client = _Client(self.PROJECT)
handler = self._make_one(
client,
transport=_Transport,
resource=_GLOBAL_RESOURCE,
)
old_transport = handler.transport
handler.close()
self.assertFalse(handler.transport_open)
self.assertTrue(old_transport.close_called)


class TestFormatAndParseMessage(unittest.TestCase):
def test_none(self):
Expand Down Expand Up @@ -1127,12 +1203,14 @@ def release(self):
class _Client(object):
def __init__(self, project):
self.project = project
self._handlers = set()


class _Transport(object):
def __init__(self, client, name, resource=None):
self.client = client
self.name = name
self.close_called = False

def send(
self,
Expand All @@ -1157,3 +1235,6 @@ def send(
http_request,
source_location,
)

def close(self):
self.close_called = True
Loading