title | description | author | ms.author | ms.service | ms.topic | ms.date | ms.custom |
---|---|---|---|---|---|---|---|
Read input in any format using .NET custom deserializers in Azure Stream Analytics | This article explains the serialization format and the interfaces that define custom .NET deserializers for Azure Stream Analytics cloud and edge jobs. | sidramadoss | sidram | stream-analytics | conceptual | 6/16/2021 | devx-track-csharp |
.NET custom deserializers allow your Azure Stream Analytics job to read data from formats outside of the three built-in data formats. This article explains the serialization format and the interfaces that define .NET custom deserializers for Azure Stream Analytics cloud and edge jobs. It also includes example deserializers for the Protocol Buffer and CSV formats.
The following code samples are the interfaces that define the custom deserializer and implement StreamDeserializer<T>.
UserDefinedOperator is the base class for all custom streaming operators. Its Initialize method receives a StreamingContext, which provides context that includes a mechanism for publishing diagnostics. You need these diagnostics to debug any issues with your deserializer.
public abstract class UserDefinedOperator
{
    public abstract void Initialize(StreamingContext streamingContext);
}
The following code snippet is the deserialization for streaming data.
Skippable errors should be emitted using the StreamingDiagnostics object that is available through the StreamingContext passed to UserDefinedOperator's Initialize method. All exceptions are treated as errors, and the deserializer is recreated. After a certain number of errors, the job goes to a failed status.
StreamDeserializer<T> deserializes a stream into objects of type T. The following conditions must be met:
- T is a class or a struct.
- All public fields in T are either
  - One of [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] or their nullable equivalents.
  - Another struct or class following the same rules.
  - Array of type T2 that follows the same rules.
  - IList<T2> where T2 follows the same rules.
- T does not have any recursive types.
The parameter stream is the stream containing the serialized object. Deserialize returns a collection of T instances.
public abstract class StreamDeserializer<T> : UserDefinedOperator
{
    public abstract IEnumerable<T> Deserialize(Stream stream);
}
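As an illustration of the type rules listed above, the following sketch shows one possible shape for T. The names SensorReading and GeoPoint and all of their fields are hypothetical and were invented for this example; they do not come from the Stream Analytics SDK.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical nested type: a struct whose public fields are all from the allowed list.
public struct GeoPoint
{
    public double Latitude;
    public double Longitude;
}

// Hypothetical event type intended to satisfy the conditions for T.
public class SensorReading
{
    public string DeviceId;       // allowed primitive type
    public DateTime Timestamp;    // allowed primitive type
    public double? Temperature;   // nullable equivalent of an allowed type
    public GeoPoint Location;     // another struct that follows the same rules
    public int[] Samples;         // array of a type that follows the rules
    public IList<string> Tags;    // IList<T2> where T2 follows the rules
    // No field refers back to SensorReading, so the type is not recursive.
}
```

A type like this could then be used as the type argument of StreamDeserializer<T>.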
StreamingContext provides context that includes a mechanism for publishing diagnostics for the user operator.
public abstract class StreamingContext
{
    public abstract StreamingDiagnostics Diagnostics { get; }
}
StreamingDiagnostics is the diagnostics for user-defined operators, including serializers, deserializers, and user-defined functions.
WriteError writes an error message to resource logs and sends the error to diagnostics.
briefMessage is a brief error message. This message shows up in diagnostics and is used by the product team for debugging purposes. Do not include sensitive information, and keep the message shorter than 200 characters.
detailedMessage is a detailed error message that is only added to your resource logs in your storage. This message should be shorter than 2000 characters.
public abstract class StreamingDiagnostics
{
    public abstract void WriteError(string briefMessage, string detailedMessage);
}
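One way to stay within the message length guidance for WriteError is to truncate both messages before passing them on. The following is a minimal sketch, not part of the SDK: DiagnosticsLimits, Truncate, and WriteTruncatedError are hypothetical names, and the StreamingDiagnostics class is repeated here only as a stand-in so that the sketch compiles on its own.

```csharp
using System;

// Stand-in copy of the abstract class shown above, included only so this sketch
// compiles on its own. In a real project, use the Stream Analytics SDK type instead.
public abstract class StreamingDiagnostics
{
    public abstract void WriteError(string briefMessage, string detailedMessage);
}

// Hypothetical helper; not part of the Stream Analytics SDK.
public static class DiagnosticsLimits
{
    // Limits taken from the guidance above: messages should be shorter than these values.
    public const int BriefLimit = 200;
    public const int DetailedLimit = 2000;

    // Returns the value unchanged when it is shorter than the limit,
    // otherwise cuts it to limit - 1 characters.
    public static string Truncate(string value, int limit)
    {
        if (string.IsNullOrEmpty(value))
        {
            return string.Empty;
        }
        return value.Length < limit ? value : value.Substring(0, limit - 1);
    }

    // Truncates both messages and forwards them to WriteError.
    public static void WriteTruncatedError(
        StreamingDiagnostics diagnostics, string briefMessage, string detailedMessage)
    {
        diagnostics.WriteError(
            Truncate(briefMessage, BriefLimit),
            Truncate(detailedMessage, DetailedLimit));
    }
}
```

A deserializer could call DiagnosticsLimits.WriteTruncatedError(streamingDiagnostics, brief, detailed) wherever it would otherwise call WriteError directly. Truncation does not remove sensitive information, so the brief message still needs to be written with care.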
This section shows you how to write custom deserializers for Protobuf and CSV. For additional examples, such as Avro format for Event Hubs Capture, visit Azure Stream Analytics on GitHub.
This is an example using protocol buffer format.
Assume the following protocol buffer definition.
syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
        double temperature = 1;
        int64 humidity = 2;
    }

    message Machine {
        double temperature = 1;
        double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}
Running protoc.exe from the Google.Protobuf.Tools NuGet package generates a .cs file with the definition. The generated file is not shown here. Make sure that the version of the Protobuf NuGet package you use in your Stream Analytics project matches the Protobuf version that was used to generate the input.
The following code snippet is the deserializer implementation assuming the generated file is included in the project. This implementation is just a thin wrapper over the generated file.
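using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
{
    public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
    {
        // Read length-delimited messages until the end of the stream is reached.
        while (stream.Position < stream.Length)
        {
            yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
        }
    }

    public override void Initialize(StreamingContext streamingContext)
    {
        // This example does not publish diagnostics, so there is nothing to initialize.
    }
}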
The following code snippet is a simple CSV deserializer that also demonstrates propagating errors.
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    // Skip empty and whitespace-only lines.
                    if (!string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            // Report the malformed line as a skippable error and continue.
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            // Keep the diagnostics object so that Deserialize can report errors.
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }
        public string Column2 { get; set; }
        public string Column3 { get; set; }
    }
}
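The CSV example reports malformed lines through WriteError. Because any exception that escapes Deserialize is treated as an error and causes the deserializer to be recreated, it can help to convert per-record parse failures into skippable errors. The following is a minimal sketch under stated assumptions: ReadingEvent, ReadingDeserializer, ListDiagnostics, and LocalContext are hypothetical names, the one-integer-per-line input format is invented for illustration, and the four abstract classes are stand-in copies of the interfaces shown earlier so that the sketch compiles without the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Stand-ins mirroring the abstract classes shown earlier, included only so the sketch
// compiles on its own. In a real project, reference the Stream Analytics SDK types instead.
public abstract class StreamingDiagnostics
{
    public abstract void WriteError(string briefMessage, string detailedMessage);
}

public abstract class StreamingContext
{
    public abstract StreamingDiagnostics Diagnostics { get; }
}

public abstract class UserDefinedOperator
{
    public abstract void Initialize(StreamingContext streamingContext);
}

public abstract class StreamDeserializer<T> : UserDefinedOperator
{
    public abstract IEnumerable<T> Deserialize(Stream stream);
}

// Hypothetical event type: one integer reading per input line.
public class ReadingEvent
{
    public int Value;
}

// Hypothetical deserializer that reports unparsable lines instead of throwing.
public class ReadingDeserializer : StreamDeserializer<ReadingEvent>
{
    private StreamingDiagnostics diagnostics;

    public override void Initialize(StreamingContext streamingContext)
    {
        diagnostics = streamingContext.Diagnostics;
    }

    public override IEnumerable<ReadingEvent> Deserialize(Stream stream)
    {
        using (var reader = new StreamReader(stream))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                ReadingEvent parsed = null;
                try
                {
                    parsed = new ReadingEvent { Value = int.Parse(line) };
                }
                catch (Exception ex) when (ex is FormatException || ex is OverflowException)
                {
                    // Skippable: record the problem and continue with the next line.
                    diagnostics.WriteError("Could not parse reading", $"Line '{line}': {ex.Message}");
                }

                // C# does not allow yield return inside a try block that has a catch clause,
                // so the event is yielded after the try/catch.
                if (parsed != null)
                {
                    yield return parsed;
                }
            }
        }
    }
}

// Hypothetical in-memory diagnostics and context for trying the sketch locally.
public class ListDiagnostics : StreamingDiagnostics
{
    public readonly List<string> Errors = new List<string>();

    public override void WriteError(string briefMessage, string detailedMessage)
    {
        Errors.Add(briefMessage + ": " + detailedMessage);
    }
}

public class LocalContext : StreamingContext
{
    private readonly StreamingDiagnostics diagnostics;

    public LocalContext(StreamingDiagnostics diagnostics)
    {
        this.diagnostics = diagnostics;
    }

    public override StreamingDiagnostics Diagnostics => diagnostics;
}
```

To try the sketch locally, create a ReadingDeserializer, call Initialize with a LocalContext that wraps a ListDiagnostics, and pass a MemoryStream to Deserialize. Lines that cannot be parsed end up in ListDiagnostics.Errors, and the remaining lines are returned as events.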
Every Stream Analytics input has a serialization format. For more information on input options, see the Input REST API documentation.
The following JSON is an example of the .NET deserializer serialization format when using the REST API:
{
    "properties":{
        "type":"stream",
        "serialization":{
            "type":"CustomCLR",
            "properties":{
                "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>",
                "serializationClassName":"<Full name of the deserializer class name>"
            }
        }
    }
}
serializationClassName should be the full name of a class that implements StreamDeserializer<T>, which is described earlier in this article.
This feature is available in the following regions when using Standard SKU:
- West Central US
- North Europe
- East US
- West US
- East US 2
- West Europe
You can request support for additional regions. However, there is no such region restriction when using Stream Analytics clusters.
This feature is available in 6 regions. If you are interested in using this functionality in another region, you can submit a request. Support for all Azure regions is on the roadmap.
This functionality is not supported. If you need this capability, you can vote for this request on UserVoice.
Once you have implemented your deserializer, you can help others by sharing it with the community. Submit your code to the Azure Stream Analytics GitHub repo.
If your input is in Protobuf format with a schema that contains a MapField type, you will not be able to implement a custom deserializer. Also, custom deserializers do not support sample data or preview data.