Skip to content

Commit

Permalink
Dependency inject logger (partial) (#4432)
Browse files Browse the repository at this point in the history
  • Loading branch information
p-datadog authored Feb 26, 2025
1 parent 2941dc2 commit fa88415
Show file tree
Hide file tree
Showing 23 changed files with 151 additions and 62 deletions.
2 changes: 1 addition & 1 deletion lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def initialize(settings)

@telemetry = self.class.build_telemetry(settings, agent_settings, @logger)

@remote = Remote::Component.build(settings, agent_settings, telemetry: telemetry)
@remote = Remote::Component.build(settings, agent_settings, logger: @logger, telemetry: telemetry)
@tracer = self.class.build_tracer(settings, agent_settings, logger: @logger)
@crashtracker = self.class.build_crashtracker(settings, agent_settings, logger: @logger)

Expand Down
20 changes: 11 additions & 9 deletions lib/datadog/core/remote/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ module Remote
# Configures the HTTP transport to communicate with the agent
# to fetch and sync the remote configuration
class Component
attr_reader :client, :healthy
attr_reader :logger, :client, :healthy

def initialize(settings, capabilities, agent_settings, logger:)
@logger = logger

def initialize(settings, capabilities, agent_settings)
transport_options = {}
transport_options[:agent_settings] = agent_settings if agent_settings

Expand All @@ -26,9 +28,9 @@ def initialize(settings, capabilities, agent_settings)

@client = Client.new(transport_v7, capabilities)
@healthy = false
Datadog.logger.debug { "new remote configuration client: #{@client.id}" }
logger.debug { "new remote configuration client: #{@client.id}" }

@worker = Worker.new(interval: settings.remote.poll_interval_seconds) do
@worker = Worker.new(interval: settings.remote.poll_interval_seconds, logger: logger) do
unless @healthy || negotiation.endpoint?('/v0.7/config')
@barrier.lift

Expand All @@ -40,7 +42,7 @@ def initialize(settings, capabilities, agent_settings)
@healthy ||= true
rescue Client::SyncError => e
# Transient errors due to network or agent. Logged the error but not via telemetry
Datadog.logger.error do
logger.error do
"remote worker client sync error: #{e.message} location: #{Array(e.backtrace).first}. skipping sync"
end
rescue StandardError => e
Expand All @@ -50,15 +52,15 @@ def initialize(settings, capabilities, agent_settings)
negotiation = Negotiation.new(settings, agent_settings)

# Transient errors due to network or agent. Logged the error but not via telemetry
Datadog.logger.error do
logger.error do
"remote worker error: #{e.class.name} #{e.message} location: #{Array(e.backtrace).first}. "\
'reseting client state'
end

# client state is unknown, state might be corrupted
@client = Client.new(transport_v7, capabilities)
@healthy = false
Datadog.logger.debug { "new remote configuration client: #{@client.id}" }
logger.debug { "new remote configuration client: #{@client.id}" }

# TODO: bail out if too many errors?
end
Expand Down Expand Up @@ -152,10 +154,10 @@ class << self
#
# Those checks are instead performed inside the worker loop.
# This allows users to upgrade their agent while keeping their application running.
def build(settings, agent_settings, telemetry:)
def build(settings, agent_settings, logger:, telemetry:)
return unless settings.remote.enabled

new(settings, Client::Capabilities.new(settings, telemetry), agent_settings)
new(settings, Client::Capabilities.new(settings, telemetry), agent_settings, logger: logger)
end
end
end
Expand Down
17 changes: 10 additions & 7 deletions lib/datadog/core/remote/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module Core
module Remote
# Worker executes a block every interval on a separate Thread
class Worker
def initialize(interval:, &block)
def initialize(interval:, logger:, &block)
@mutex = Mutex.new
@thr = nil

Expand All @@ -14,18 +14,21 @@ def initialize(interval:, &block)
@stopped = false

@interval = interval
@logger = logger
raise ArgumentError, 'can not initialize a worker without a block' unless block

@block = block
end

attr_reader :logger

def start
Datadog.logger.debug { 'remote worker starting' }
logger.debug { 'remote worker starting' }

acquire_lock

if @stopped
Datadog.logger.debug('remote worker: refusing to restart after previous stop')
logger.debug('remote worker: refusing to restart after previous stop')
return
end

Expand All @@ -41,13 +44,13 @@ def start
@started = true
@starting = false

Datadog.logger.debug { 'remote worker started' }
logger.debug { 'remote worker started' }
ensure
release_lock
end

def stop
Datadog.logger.debug { 'remote worker stopping' }
logger.debug { 'remote worker stopping' }

acquire_lock

Expand All @@ -62,7 +65,7 @@ def stop
@thr = nil
@stopped = true

Datadog.logger.debug { 'remote worker stopped' }
logger.debug { 'remote worker stopped' }
ensure
release_lock
end
Expand Down Expand Up @@ -92,7 +95,7 @@ def poll(interval)
end

def call
Datadog.logger.debug { 'remote worker perform' }
logger.debug { 'remote worker perform' }

@block.call
end
Expand Down
6 changes: 5 additions & 1 deletion lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Telemetry
# Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle.
# Note: Telemetry does not spawn its worker thread in fork processes, thus no telemetry is sent in forked processes.
class Component
attr_reader :enabled
attr_reader :enabled, :logger

include Core::Utils::Forking
include Telemetry::Logging
Expand Down Expand Up @@ -52,6 +52,7 @@ def self.build(settings, agent_settings, logger)
heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: settings.telemetry.metrics_aggregation_interval_seconds,
dependency_collection: settings.telemetry.dependency_collection,
logger: logger,
shutdown_timeout_seconds: settings.telemetry.shutdown_timeout_seconds,
log_collection_enabled: settings.telemetry.log_collection_enabled
)
Expand All @@ -66,6 +67,7 @@ def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
dependency_collection:,
logger:,
http_transport:,
shutdown_timeout_seconds:,
enabled: true,
Expand All @@ -74,6 +76,7 @@ def initialize(
)
@enabled = enabled
@log_collection_enabled = log_collection_enabled
@logger = logger

@metrics_manager = MetricsManager.new(
enabled: enabled && metrics_enabled,
Expand All @@ -87,6 +90,7 @@ def initialize(
emitter: Emitter.new(http_transport: http_transport),
metrics_manager: @metrics_manager,
dependency_collection: dependency_collection,
logger: logger,
shutdown_timeout: shutdown_timeout_seconds
)

Expand Down
14 changes: 9 additions & 5 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ def initialize(
emitter:,
metrics_manager:,
dependency_collection:,
logger:,
enabled: true,
shutdown_timeout: Workers::Polling::DEFAULT_SHUTDOWN_TIMEOUT,
buffer_size: DEFAULT_BUFFER_MAX_SIZE
)
@emitter = emitter
@metrics_manager = metrics_manager
@dependency_collection = dependency_collection
@logger = logger

@ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
@current_ticks = 0
Expand All @@ -48,6 +50,8 @@ def initialize(
self.buffer = buffer_klass.new(@buffer_size)
end

attr_reader :logger

def start
return if !enabled? || forked?

Expand Down Expand Up @@ -99,7 +103,7 @@ def flush_events(events)

events = deduplicate_logs(events)

Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
logger.debug { "Sending #{events&.count} telemetry events" }
send_event(Event::MessageBatch.new(events))
end

Expand All @@ -113,7 +117,7 @@ def started!
return unless enabled?

if failed_to_start?
Datadog.logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker')
logger.debug('Telemetry app-started event exhausted retries, disabling telemetry worker')
disable!
return
end
Expand All @@ -122,13 +126,13 @@ def started!
res = send_event(Event::AppStarted.new)

if res.ok?
Datadog.logger.debug('Telemetry app-started event is successfully sent')
logger.debug('Telemetry app-started event is successfully sent')

send_event(Event::AppDependenciesLoaded.new) if @dependency_collection

true
else
Datadog.logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...')
logger.debug('Error sending telemetry app-started event, retry after heartbeat interval...')
false
end
end
Expand Down Expand Up @@ -166,7 +170,7 @@ def disable!
def disable_on_not_found!(response)
return unless response.not_found?

Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
logger.debug('Agent does not support telemetry; disabling future telemetry events.')
disable!
end

Expand Down
1 change: 1 addition & 0 deletions lib/datadog/tracing/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def build_tracer(settings, agent_settings, logger:)
Tracing::Tracer.new(
default_service: settings.service,
enabled: settings.tracing.enabled,
logger: logger,
trace_flush: trace_flush,
sampler: sampler_delegator,
span_sampler: build_span_sampler(settings),
Expand Down
7 changes: 5 additions & 2 deletions lib/datadog/tracing/sync_writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ module Tracing
# @public_api
class SyncWriter
attr_reader \
:logger,
:events,
:transport

Expand All @@ -25,7 +26,9 @@ class SyncWriter
# @param [Hash<Symbol,Object>] transport_options options for the default transport instance.
# @param [Datadog::Tracing::Configuration::AgentSettingsResolver::AgentSettings] agent_settings agent options for
# the default transport instance.
def initialize(transport: nil, transport_options: {}, agent_settings: nil)
def initialize(transport: nil, transport_options: {}, agent_settings: nil, logger: Datadog.logger)
@logger = logger

@transport = transport || begin
transport_options[:agent_settings] = agent_settings if agent_settings
Transport::HTTP.default(**transport_options)
Expand All @@ -40,7 +43,7 @@ def initialize(transport: nil, transport_options: {}, agent_settings: nil)
def write(trace)
flush_trace(trace)
rescue => e
Datadog.logger.debug(e)
logger.debug(e)
end

# Does nothing.
Expand Down
17 changes: 10 additions & 7 deletions lib/datadog/tracing/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class Tracer
:provider,
:sampler,
:span_sampler,
:tags
:tags,
:logger

attr_accessor \
:default_service,
Expand All @@ -52,17 +53,19 @@ def initialize(
context_provider: DefaultContextProvider.new,
default_service: Core::Environment::Ext::FALLBACK_SERVICE_NAME,
enabled: true,
logger: Datadog.logger,
sampler: Sampling::PrioritySampler.new(
base_sampler: Sampling::AllSampler.new,
post_sampler: Sampling::RuleSampler.new
),
span_sampler: Sampling::Span::Sampler.new,
tags: {},
writer: Writer.new
writer: Writer.new(logger: logger)
)
@trace_flush = trace_flush
@default_service = default_service
@enabled = enabled
@logger = logger
@provider = context_provider
@sampler = sampler
@span_sampler = span_sampler
Expand Down Expand Up @@ -146,7 +149,7 @@ def trace(
active_trace
end
rescue StandardError => e
Datadog.logger.debug { "Failed to trace: #{e}" }
logger.debug { "Failed to trace: #{e}" }

# Tracing failed: fallback and run code without tracing.
return skip_trace(name, &block)
Expand Down Expand Up @@ -268,7 +271,7 @@ def sample_trace(trace_op)
@sampler.sample!(trace_op)
rescue StandardError => e
SAMPLE_TRACE_LOG_ONLY_ONCE.run do
Datadog.logger.warn { "Failed to sample trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
logger.warn { "Failed to sample trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
end
end
end
Expand Down Expand Up @@ -488,7 +491,7 @@ def sample_span(trace_op, span)
@span_sampler.sample!(trace_op, span)
rescue StandardError => e
SAMPLE_SPAN_LOG_ONLY_ONCE.run do
Datadog.logger.warn { "Failed to sample span: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
logger.warn { "Failed to sample span: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
end
end
end
Expand All @@ -504,7 +507,7 @@ def flush_trace(trace_op)
write(trace) if trace && !trace.empty?
rescue StandardError => e
FLUSH_TRACE_LOG_ONLY_ONCE.run do
Datadog.logger.warn { "Failed to flush trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
logger.warn { "Failed to flush trace: #{e.class.name} #{e} at #{Array(e.backtrace).first}" }
end
end
end
Expand All @@ -518,7 +521,7 @@ def write(trace)
return unless trace && @writer

if Datadog.configuration.diagnostics.debug
Datadog.logger.debug { "Writing #{trace.length} spans (enabled: #{@enabled})\n#{trace.spans.pretty_inspect}" }
logger.debug { "Writing #{trace.length} spans (enabled: #{@enabled})\n#{trace.spans.pretty_inspect}" }
end

@writer.write(trace)
Expand Down
9 changes: 5 additions & 4 deletions lib/datadog/tracing/workers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class AsyncTransport
BACK_OFF_MAX = 5
DEFAULT_SHUTDOWN_TIMEOUT = 1

attr_reader \
:trace_buffer
attr_reader :trace_buffer, :logger

def initialize(options = {})
@transport = options[:transport]
Expand All @@ -42,6 +41,8 @@ def initialize(options = {})
@mutex = Mutex.new
@worker = nil
@run = false

@logger = options.fetch(:logger)
end

# Callback function that process traces and executes the +send_traces()+ method.
Expand All @@ -56,7 +57,7 @@ def callback_traces
# ensures that the thread will not die because of an exception.
# TODO[manu]: findout the reason and reschedule the send if it's not
# a fatal exception
Datadog.logger.warn(
logger.warn(
"Error during traces flush: dropped #{traces.length} items. Cause: #{e} Location: #{Array(e.backtrace).first}"
)
end
Expand All @@ -68,7 +69,7 @@ def start
return if @run

@run = true
Datadog.logger.debug { "Starting thread for: #{self}" }
logger.debug { "Starting thread for: #{self}" }
@worker = Thread.new { perform }
@worker.name = self.class.name
@worker.thread_variable_set(:fork_safe, true)
Expand Down
Loading

0 comments on commit fa88415

Please sign in to comment.