Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hook up ml event to OTLP #822

Merged
merged 17 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions newrelic/core/agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,11 +595,7 @@ def connect(
return protocol

def _to_http(self, method, payload=()):
params = dict(self._params)
params["method"] = method
if self._run_token:
params["run_id"] = self._run_token
return params, self._headers, otlp_encode(payload)
return {}, self._headers, otlp_encode(payload)

def decode_response(self, response):
return response.decode("utf-8")
10 changes: 3 additions & 7 deletions newrelic/core/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
)
from newrelic.core.agent_streaming import StreamingRpc
from newrelic.core.config import global_settings
from newrelic.core.otlp_utils import encode_metric_data
from newrelic.core.otlp_utils import encode_metric_data, encode_ml_event_data

_logger = logging.getLogger(__name__)

DIMENSIONAL_METRIC_DATA_TEMP = [] # TODO: REMOVE THIS


class Session(object):
PROTOCOL = AgentProtocol
Expand Down Expand Up @@ -125,10 +123,8 @@ def send_custom_events(self, sampling_info, custom_event_data):

def send_ml_events(self, sampling_info, custom_event_data):
"""Called to submit sample set for machine learning events."""

# TODO Make this send to MELT/OTLP endpoint instead of agent listener
payload = (self.agent_run_id, sampling_info, custom_event_data) # TODO this payload will be different
return self._protocol.send("custom_event_data", payload)
payload = encode_ml_event_data(custom_event_data, str(self.agent_run_id))
return self._otlp_protocol.send("ml_event_data", payload, path="/v1/logs")

def send_span_events(self, sampling_info, span_event_data):
"""Called to submit sample set for span events."""
Expand Down
39 changes: 31 additions & 8 deletions newrelic/core/otlp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import logging

from newrelic.common.encoding_utils import json_encode
from newrelic.core.stats_engine import CountStats, TimeStats
from newrelic.core.config import global_settings
from newrelic.core.stats_engine import CountStats, TimeStats

_logger = logging.getLogger(__name__)

Expand All @@ -33,7 +33,7 @@
try:
from newrelic.packages.opentelemetry_proto.common_pb2 import AnyValue, KeyValue
from newrelic.packages.opentelemetry_proto.logs_pb2 import (
LogRecord,
LogsData,
ResourceLogs,
ScopeLogs,
)
Expand All @@ -58,8 +58,8 @@
except Exception:
if otlp_content_setting == "protobuf":
raise # Reraise exception if content type explicitly set
else: # Fallback to JSON
otlp_content_setting = "json"
# Fallback to JSON
otlp_content_setting = "json"


if otlp_content_setting == "json":
Expand All @@ -77,7 +77,7 @@
ValueAtQuantile = dict
ResourceLogs = dict
ScopeLogs = dict
LogRecord = dict
LogsData = dict

AGGREGATION_TEMPORALITY_DELTA = 1
OTLP_CONTENT_TYPE = "application/json"
Expand All @@ -88,9 +88,8 @@ def otlp_encode(payload):
_logger.warning(
"Using OTLP integration while protobuf is not installed. This may result in larger payload sizes and data loss."
)
return json_encode(payload)
else:
return payload.SerializeToString()
return json_encode(payload).encode("utf-8")
return payload.SerializeToString()


def create_key_value(key, value):
Expand Down Expand Up @@ -216,3 +215,27 @@ def encode_metric_data(metric_data, start_time, end_time, resource=None, scope=N
)
]
)


def encode_ml_event_data(custom_event_data, agent_run_id):
resource = create_resource()
ml_events = []
for event in custom_event_data:
event_info, event_attrs = event
event_attrs.update(
{
"real_agent_id": agent_run_id,
"event.domain": "newrelic.ml_events",
"event.name": event_info["type"],
}
)
ml_attrs = create_key_values_from_iterable(event_attrs)
unix_nano_timestamp = event_info["timestamp"] * 1e6
ml_events.append(
{
"time_unix_nano": int(unix_nano_timestamp),
"attributes": ml_attrs,
}
)

return LogsData(resource_logs=[ResourceLogs(resource=resource, scope_logs=[ScopeLogs(log_records=ml_events)])])
4 changes: 2 additions & 2 deletions newrelic/hooks/mlmodel_sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def create_label_event(transaction, class_, inference_id, instance, return_val):
# Don't include the raw value when inference_event_value is disabled.
if settings and settings.machine_learning.inference_events_value.enabled:
event["label_value"] = str(value)
transaction.record_custom_event("inferenceData", event)
transaction.record_ml_event("InferenceData", event)


def _get_label_names(user_defined_label_names, prediction_array):
Expand Down Expand Up @@ -319,7 +319,7 @@ def create_feature_event(transaction, class_, inference_id, instance, args, kwar
# Don't include the raw value when inference_event_value is disabled.
if settings and settings.machine_learning and settings.machine_learning.inference_events_value.enabled:
event["feature_value"] = str(value)
transaction.record_custom_event("inferenceData", event)
transaction.record_ml_event("InferenceData", event)


def _nr_instrument_model(module, model_class):
Expand Down
66 changes: 66 additions & 0 deletions tests/agent_features/test_ml_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,27 @@
reset_core_stats_engine,
)
from testing_support.validators.validate_ml_event_count import validate_ml_event_count
from testing_support.validators.validate_ml_event_payload import (
validate_ml_event_payload,
)
from testing_support.validators.validate_ml_events import validate_ml_events
from testing_support.validators.validate_ml_events_outside_transaction import (
validate_ml_events_outside_transaction,
)

import newrelic.core.otlp_utils
from newrelic.api.application import application_instance as application
from newrelic.api.background_task import background_task
from newrelic.api.transaction import record_ml_event
from newrelic.core.config import global_settings
from newrelic.packages import six

try:
# python 2.x
reload
except NameError:
# python 3.x
from importlib import reload

_now = time.time()

Expand All @@ -38,6 +51,38 @@
}


@pytest.fixture(scope="session")
def core_app(collector_agent_registration):
app = collector_agent_registration
return app._agent.application(app.name)


@validate_ml_event_payload(
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
)
@reset_core_stats_engine()
def test_ml_event_payload_inside_transaction(core_app):
@background_task(name="test_ml_event_payload_inside_transaction")
def _test():
record_ml_event("InferenceEvent", {"foo": "bar"})

_test()
core_app.harvest()


@validate_ml_event_payload(
[{"foo": "bar", "real_agent_id": "1234567", "event.domain": "newrelic.ml_events", "event.name": "InferenceEvent"}]
)
@reset_core_stats_engine()
def test_ml_event_payload_outside_transaction(core_app):
def _test():
app = application()
record_ml_event("InferenceEvent", {"foo": "bar"}, application=app)

_test()
core_app.harvest()


@pytest.mark.parametrize(
"params,expected",
[
Expand All @@ -47,6 +92,7 @@
],
ids=["Valid key/value", "Bad key", "Value too long"],
)
@reset_core_stats_engine()
def test_record_ml_event_inside_transaction(params, expected):
@validate_ml_events(expected)
@background_task()
Expand Down Expand Up @@ -75,6 +121,7 @@ def _test():
_test()


@reset_core_stats_engine()
@validate_ml_event_count(count=0)
@background_task()
def test_record_ml_event_inside_transaction_bad_event_type():
Expand All @@ -88,6 +135,7 @@ def test_record_ml_event_outside_transaction_bad_event_type():
record_ml_event("!@#$%^&*()", {"foo": "bar"}, application=app)


@reset_core_stats_engine()
@validate_ml_event_count(count=0)
@background_task()
def test_record_ml_event_inside_transaction_params_not_a_dict():
Expand Down Expand Up @@ -120,15 +168,33 @@ def test_ml_event_settings_check_ml_insights_enabled():


@override_application_settings({"ml_insights_events.enabled": False})
@reset_core_stats_engine()
@function_not_called("newrelic.api.transaction", "create_custom_event")
@background_task()
def test_transaction_create_ml_event_not_called():
record_ml_event("FooEvent", {"foo": "bar"})


@override_application_settings({"ml_insights_events.enabled": False})
@reset_core_stats_engine()
@function_not_called("newrelic.core.application", "create_custom_event")
@background_task()
def test_application_create_ml_event_not_called():
app = application()
record_ml_event("FooEvent", {"foo": "bar"}, application=app)


@pytest.fixture(scope="module", autouse=True, params=["protobuf", "json"])
def otlp_content_encoding(request):
if six.PY2 and request.param == "protobuf":
pytest.skip("OTLP protos are not compatible with Python 2.")

_settings = global_settings()
prev = _settings.debug.otlp_content_encoding
_settings.debug.otlp_content_encoding = request.param
reload(newrelic.core.otlp_utils)
assert newrelic.core.otlp_utils.otlp_content_setting == request.param, "Content encoding mismatch."

yield

_settings.debug.otlp_content_encoding = prev
96 changes: 0 additions & 96 deletions tests/mlmodel_sklearn/_validate_custom_events.py

This file was deleted.

Loading