title: Read input in any format using .NET custom deserializers in Azure Stream Analytics
description: This article explains the serialization format and the interfaces that define custom .NET deserializers for Azure Stream Analytics cloud and edge jobs.
author: sidramadoss
ms.author: sidram
ms.service: stream-analytics
ms.topic: conceptual
ms.date: 6/16/2021
ms.custom: devx-track-csharp

Read input in any format using .NET custom deserializers (Preview)

.NET custom deserializers allow your Azure Stream Analytics job to read data in formats other than the three built-in data formats. This article explains the serialization format and the interfaces that define .NET custom deserializers for Azure Stream Analytics cloud and edge jobs. It also includes example deserializers for the Protocol Buffers and CSV formats.

.NET custom deserializer

The following code samples show the interfaces that define a custom deserializer. A custom deserializer implements StreamDeserializer<T>.

UserDefinedOperator is the base class for all custom streaming operators. Its Initialize method receives a StreamingContext. The StreamingContext provides a mechanism for publishing diagnostics, which you can use to debug issues with your deserializer.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

The following code snippet defines deserialization for streaming data.

Report skippable errors through the StreamingDiagnostics object. You get this object from the StreamingContext that is passed to UserDefinedOperator's Initialize method. All exceptions are treated as errors, and the deserializer is then recreated. After a certain number of errors, the job goes into a failed state.
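The following minimal sketch shows the difference between a skippable error and an exception. It assumes a hypothetical input with one integer per line and a hypothetical ReadingEvent type. So that the sketch compiles on its own, it declares local stand-ins for the abstract classes described in this article; in a real project you reference the Stream Analytics types instead. When a line fails to parse, the deserializer reports it through WriteError and skips it. It does not throw.

```csharp
using System.Collections.Generic;
using System.IO;

// Local stand-ins mirroring the abstract classes documented in this article,
// so this sketch compiles without the Stream Analytics SDK.
public abstract class StreamingDiagnostics
{
    public abstract void WriteError(string briefMessage, string detailedMessage);
}

public abstract class StreamingContext
{
    public abstract StreamingDiagnostics Diagnostics { get; }
}

public abstract class UserDefinedOperator
{
    public abstract void Initialize(StreamingContext streamingContext);
}

public abstract class StreamDeserializer<T> : UserDefinedOperator
{
    public abstract IEnumerable<T> Deserialize(Stream stream);
}

// Hypothetical event type: one integer reading per input line.
public class ReadingEvent
{
    public long Value;
}

// Hypothetical deserializer that skips malformed lines instead of throwing.
public class ReadingDeserializer : StreamDeserializer<ReadingEvent>
{
    private StreamingDiagnostics diagnostics;

    public override void Initialize(StreamingContext streamingContext)
    {
        // Keep the diagnostics object for use in Deserialize.
        diagnostics = streamingContext.Diagnostics;
    }

    public override IEnumerable<ReadingEvent> Deserialize(Stream stream)
    {
        using (var reader = new StreamReader(stream))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                if (long.TryParse(line, out long value))
                {
                    yield return new ReadingEvent { Value = value };
                }
                else
                {
                    // Skippable error: report it and continue with the next line.
                    // Throwing here would be treated as an error and recreate the deserializer.
                    diagnostics.WriteError("Line is not an integer", $"Invalid line: {line}");
                }
            }
        }
    }
}

// Test doubles that capture diagnostics in memory.
public class RecordingDiagnostics : StreamingDiagnostics
{
    public List<string> BriefMessages { get; } = new List<string>();

    public override void WriteError(string briefMessage, string detailedMessage) =>
        BriefMessages.Add(briefMessage);
}

public class RecordingContext : StreamingContext
{
    private readonly RecordingDiagnostics diagnostics = new RecordingDiagnostics();

    public override StreamingDiagnostics Diagnostics => diagnostics;

    public RecordingDiagnostics Recorded => diagnostics;
}
```

The sketch uses long.TryParse instead of catching a FormatException. This also avoids a C# restriction: an iterator cannot use yield return inside a try block that has a catch clause.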

StreamDeserializer<T> deserializes a stream into objects of type T. T must meet the following conditions:

  1. T is a class or a struct.
  2. All public fields in T are either:
    1. One of [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double], or their nullable equivalents.
    2. Another struct or class that follows the same rules.
    3. An array of type T2 that follows the same rules.
    4. IList<T2>, where T2 follows the same rules.
  3. T does not contain any recursive types.

The stream parameter is the stream that contains the serialized objects. Deserialize returns a collection of T instances.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }
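For example, the following hypothetical types appear to satisfy the rules for T listed above. The field names are assumptions made for illustration and don't come from a Stream Analytics sample. Because the rules are written in terms of public fields, the sketch uses public fields. The arrays and lists use a struct that itself follows the rules.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical nested type; it follows the same rules as the outer type.
public struct MachineReading
{
    public double Temperature;
    public double? Pressure;   // nullable equivalents of the listed types are allowed
}

// Hypothetical event type that a StreamDeserializer<SensorEvent> could return.
public class SensorEvent
{
    public string DeviceId;                  // string
    public DateTime TimeCreated;             // DateTime
    public long? Humidity;                   // nullable long
    public MachineReading Machine;           // another struct that follows the same rules
    public MachineReading[] Samples;         // array of a type that follows the same rules
    public IList<MachineReading> History;    // IList<T2>, where T2 follows the same rules

    // Not allowed, for example:
    // public SensorEvent Previous;          // recursive type
    // public decimal Price;                 // decimal is not in the list of supported types
}
```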

StreamingContext provides context for user-defined operators, including a mechanism for publishing diagnostics.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics provides diagnostics for user-defined operators, including serializers, deserializers, and user-defined functions.

WriteError writes an error message to resource logs and sends the error to diagnostics.

briefMessage is a brief error message. This message shows up in diagnostics, and the product team uses it for debugging. Do not include sensitive information, and keep the message under 200 characters.

detailedMessage is a detailed error message that is added only to the resource logs in your storage. Keep this message under 2,000 characters.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }
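Because WriteError expects short messages, you might want to enforce the length limits in one place. The following sketch adds a hypothetical WriteBoundedError extension method, which is not part of the Stream Analytics SDK. It truncates both messages before it calls WriteError. StreamingDiagnostics is redeclared locally so the sketch compiles on its own, and RecordingDiagnostics is a test double. Truncation doesn't remove sensitive information, so you still need to choose what goes into briefMessage.

```csharp
using System.Collections.Generic;

// Local stand-in for the abstract class shown above.
public abstract class StreamingDiagnostics
{
    public abstract void WriteError(string briefMessage, string detailedMessage);
}

// Hypothetical helper: keeps messages under the documented limits.
public static class DiagnosticsExtensions
{
    // The article asks for fewer than 200 and fewer than 2,000 characters.
    public const int BriefLimit = 200;
    public const int DetailedLimit = 2000;

    public static void WriteBoundedError(this StreamingDiagnostics diagnostics, string briefMessage, string detailedMessage)
    {
        diagnostics.WriteError(
            Truncate(briefMessage, BriefLimit - 1),
            Truncate(detailedMessage, DetailedLimit - 1));
    }

    // Returns the message unchanged if it fits; otherwise cuts it to maxLength characters.
    private static string Truncate(string message, int maxLength) =>
        message == null || message.Length <= maxLength ? message : message.Substring(0, maxLength);
}

// Test double that records messages instead of writing to resource logs.
public class RecordingDiagnostics : StreamingDiagnostics
{
    public List<(string Brief, string Detailed)> Errors { get; } = new List<(string Brief, string Detailed)>();

    public override void WriteError(string briefMessage, string detailedMessage) =>
        Errors.Add((briefMessage, detailedMessage));
}
```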

Deserializer examples

This section shows how to write custom deserializers for Protobuf and CSV. For more examples, such as the Avro format for Event Hubs Capture, visit Azure Stream Analytics on GitHub.

Protocol buffer (Protobuf) format

This example uses the protocol buffer format and assumes the following protocol buffer definition:

    syntax = "proto3";
    // protoc.exe from the NuGet package "Google.Protobuf.Tools" is used to generate the .cs file from this schema definition.
    // Run the following command to generate the C# class:
    // protoc.exe --csharp_out=. MessageBodyProto.proto

    package SimulatedTemperatureSensor;
    message MessageBodyProto {
        message Ambient {
          double temperature = 1;
          int64 humidity = 2;
        }

        message Machine {
          double temperature = 1;
          double pressure = 2;
        }

        Machine machine = 1;
        Ambient ambient = 2;
        string timeCreated = 3;
    }

Running protoc.exe from the Google.Protobuf.Tools NuGet package generates a .cs file that contains the definition. The generated file is not shown here. Make sure that the version of the Protobuf NuGet package in your Stream Analytics project matches the Protobuf version that was used to generate the input.

The following code snippet is the deserializer implementation. It assumes that the generated file is included in the project, and it is only a thin wrapper over that generated code.

    using System.Collections.Generic;
    using System.IO;

    using Microsoft.Azure.StreamAnalytics;
    using Microsoft.Azure.StreamAnalytics.Serialization;

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            // Read length-delimited messages until the end of the stream.
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }
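ParseDelimitedFrom reads length-delimited messages: each message is preceded by its size, encoded as a base-128 varint. To help troubleshoot input that fails to parse, the following sketch shows that framing without the Google.Protobuf package. DelimitedFraming is a hypothetical helper. It only splits the stream into raw message payloads and does not decode any message fields.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: splits a stream of length-delimited messages into raw payloads.
public static class DelimitedFraming
{
    // Reads one base-128 varint length prefix; returns null at a clean end of stream.
    private static int? ReadVarint32(Stream stream)
    {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7)
        {
            int b = stream.ReadByte();
            if (b == -1)
            {
                if (shift == 0) return null; // no more messages
                throw new EndOfStreamException("Truncated length prefix.");
            }

            // Low 7 bits carry data; the high bit means "more bytes follow".
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0)
            {
                if (result < 0) throw new InvalidDataException("Length prefix is out of range.");
                return result;
            }
        }
        throw new InvalidDataException("Length prefix is longer than 5 bytes.");
    }

    public static IEnumerable<byte[]> ReadFrames(Stream stream)
    {
        int? length;
        while ((length = ReadVarint32(stream)) != null)
        {
            // Read exactly 'length' bytes, which is the serialized message body.
            var payload = new byte[length.Value];
            int offset = 0;
            while (offset < payload.Length)
            {
                int read = stream.Read(payload, offset, payload.Length - offset);
                if (read == 0) throw new EndOfStreamException("Truncated message body.");
                offset += read;
            }
            yield return payload;
        }
    }
}
```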

CSV

The following code snippet is a simple CSV deserializer that also demonstrates propagating errors.

    using System.Collections.Generic;
    using System.IO;

    using Microsoft.Azure.StreamAnalytics;
    using Microsoft.Azure.StreamAnalytics.Serialization;

    namespace ExampleCustomCode.Serialization
    {
        public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
        {
            private StreamingDiagnostics streamingDiagnostics;

            public override IEnumerable<CustomEvent> Deserialize(Stream stream)
            {
                using (var sr = new StreamReader(stream))
                {
                    string line = sr.ReadLine();
                    while (line != null)
                    {
                        // Skip empty and whitespace-only lines.
                        if (!string.IsNullOrWhiteSpace(line))
                        {
                            string[] parts = line.Split(',');
                            if (parts.Length != 3)
                            {
                                // Report the malformed line as a skippable error and keep reading.
                                streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                            }
                            else
                            {
                                yield return new CustomEvent()
                                {
                                    Column1 = parts[0],
                                    Column2 = parts[1],
                                    Column3 = parts[2]
                                };
                            }
                        }

                        line = sr.ReadLine();
                    }
                }
            }

            public override void Initialize(StreamingContext streamingContext)
            {
                // Keep the diagnostics object so Deserialize can report errors.
                this.streamingDiagnostics = streamingContext.Diagnostics;
            }
        }

        public class CustomEvent
        {
            public string Column1 { get; set; }

            public string Column2 { get; set; }

            public string Column3 { get; set; }
        }
    }

Serialization format for REST APIs

Every Stream Analytics input has a serialization format. For more information on input options, see the Input REST API documentation.

The following JSON is an example of the .NET deserializer serialization format when using the REST API:

    {
       "properties":{
          "type":"stream",
          "serialization":{
             "type":"CustomCLR",
             "properties":{
                "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>",
                "serializationClassName":"<Full name of the deserializer class name>"
             }
          }
       }
    }

serializationClassName must be a class that implements StreamDeserializer<T>, as described in the preceding sections.
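If you're unsure what to enter for serializationClassName, note that Type.FullName returns the namespace-qualified name of a top-level class. The following sketch assumes that serializationClassName expects this namespace-qualified name. An example is ExampleCustomCode.Serialization.CustomCsvDeserializer from the CSV example. The sketch declares an empty placeholder class with that name so it compiles on its own, and SerializationClassName is a hypothetical helper. For nested classes, FullName uses a + separator, so this approach assumes a top-level class.

```csharp
using System;

namespace ExampleCustomCode.Serialization
{
    // Placeholder with the same namespace and name as the CSV example's deserializer.
    public class CustomCsvDeserializer
    {
    }
}

// Hypothetical helper that returns the namespace-qualified class name.
public static class SerializationClassName
{
    public static string For(Type deserializerType) => deserializerType.FullName;
}
```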

Region support

This feature is available in the following regions when using Standard SKU:

  • West Central US
  • North Europe
  • East US
  • West US
  • East US 2
  • West Europe

You can request support for additional regions. This region restriction does not apply when you use Stream Analytics clusters.

Frequently asked questions

When will this feature be available in all Azure regions?

This feature is available in six regions. If you want to use this functionality in another region, you can submit a request. Support for all Azure regions is on the roadmap.

Can I access MetadataPropertyValue from my inputs similar to GetMetadataPropertyValue function?

This functionality is not supported. If you need this capability, you can vote for this request on UserVoice.

Can I share my deserializer implementation with the community so that others can benefit?

Once you have implemented your deserializer, you can help others by sharing it with the community. Submit your code to the Azure Stream Analytics GitHub repo.

What are the other limitations of using custom deserializers in Stream Analytics?

If your input is in Protobuf format and its schema contains a MapField type, you cannot implement a custom deserializer. Custom deserializers also do not support sample data or preview data.

Next Steps