title | description | ms.service | ms.topic | ms.date |
---|---|---|---|---|
Use Machine Learning Studio (classic) endpoints in Azure Stream Analytics | This article describes how to use Machine Learning user-defined functions in Azure Stream Analytics. | stream-analytics | how-to | 06/11/2019 |
[!INCLUDE ML Studio (classic) retirement]
Stream Analytics supports user-defined functions that call out to Machine Learning Studio (classic) endpoints. REST API support for this feature is detailed in the Stream Analytics REST API library. This article provides the supplemental information needed to implement this capability in Stream Analytics. A tutorial is also available.
Machine Learning Studio (classic) is a collaborative, drag-and-drop tool you can use to build, test, and deploy predictive analytics solutions on your data. You use Studio (classic) to interact with the machine learning resources and to build, test, and iterate on your design. These resources and their definitions are listed below.
- Workspace: The workspace is a container that holds all other machine learning resources together for management and control.
- Experiment: Experiments are created by data scientists to utilize datasets and train a machine learning model.
- Endpoint: Endpoints are the Studio (classic) objects that take features as input, apply a specified machine learning model, and return scored output.
- Scoring Webservice: A scoring webservice is a collection of the endpoints described above.
Each endpoint has APIs for batch execution and synchronous execution. Stream Analytics uses synchronous execution. In Machine Learning Studio (classic), this service is named the Request/Response Service.
For Stream Analytics job processing, a Request/Response endpoint, an API key, and a Swagger definition are all required. Stream Analytics has an additional endpoint that constructs the URL for the Swagger endpoint, looks up the interface, and returns a default UDF definition to the user.
By using REST APIs, you can configure your job to call Studio (classic) functions. The steps are as follows:
- Create a Stream Analytics job
- Define an input
- Define an output
- Create a user-defined function (UDF)
- Write a Stream Analytics transformation that calls the UDF
- Start the job
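The steps above map onto one REST call each. The sketch below lists them in order as (method, path) pairs without sending anything. Only the `functions` path comes from this article; the other sub-resource names (`inputs`, `outputs`, `transformations`, `start`) and the resource names passed in are assumptions based on the general layout of the Stream Analytics REST API, so verify them against the REST API reference before relying on them.

```python
def job_path(subscription_id, resource_group, job_name):
    """Base path of a streaming job, following the path layout used in this article."""
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.StreamAnalytics/streamingjobs/{job_name}"
    )


def plan_calls(base, input_name, output_name, udf_name, transformation_name):
    """Return the six steps as ordered (method, path) pairs; nothing is sent."""
    return [
        ("PUT", base),                                             # create the job
        ("PUT", f"{base}/inputs/{input_name}"),                    # assumed sub-resource
        ("PUT", f"{base}/outputs/{output_name}"),                  # assumed sub-resource
        ("PUT", f"{base}/functions/{udf_name}"),                   # path shown in this article
        ("PUT", f"{base}/transformations/{transformation_name}"),  # assumed sub-resource
        ("POST", f"{base}/start"),                                 # assumed action
    ]


if __name__ == "__main__":
    base = job_path("<subscriptionId>", "<resourceGroup>", "<streamingjobName>")
    for method, path in plan_calls(base, "blobInput", "blobOutput", "newudf", "transformation"):
        print(method, path + "?api-version=<apiVersion>")
```

Keeping the plan as data makes the required ordering explicit: the UDF has to exist before the transformation that calls it, and the job is started last.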
As an example, the following sample code creates a scalar UDF named newudf that binds to a Machine Learning Studio (classic) endpoint. The endpoint (service URI) can be found on the API help page for the chosen service, and the apiKey can be found on the Services main page.
PUT : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>
Example request body:
{
  "name": "newudf",
  "properties": {
    "type": "Scalar",
    "properties": {
      "binding": {
        "type": "Microsoft.MachineLearning/WebService",
        "properties": {
          "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77fb4b46bf2a30c63c078dca/services/b7be5e40fd194258796fb402c1958eaf/execute",
          "apiKey": "replacekeyhere"
        }
      }
    }
  }
}
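As a rough illustration, the PUT call above could be assembled in Python with only the standard library. This is a sketch, not the official client: the `management.azure.com` host and the bearer-token `Authorization` header are assumptions about how the request reaches Azure Resource Manager, the helper names are hypothetical, and the request is only built, never sent.

```python
import json
import urllib.request

ARM_HOST = "https://management.azure.com"  # assumption: public Azure Resource Manager host


def function_path(subscription_id, resource_group, job_name, udf_name, api_version):
    """Build the function resource path used by the PUT line above."""
    return (
        f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
        f"/providers/Microsoft.StreamAnalytics/streamingjobs/{job_name}"
        f"/functions/{udf_name}?api-version={api_version}"
    )


def build_create_udf_request(path, udf_name, endpoint, api_key, token):
    """Return an unsent PUT request that creates the skeleton scalar UDF."""
    body = {
        "name": udf_name,
        "properties": {
            "type": "Scalar",
            "properties": {
                "binding": {
                    "type": "Microsoft.MachineLearning/WebService",
                    # strip stray whitespace so the service URI is passed exactly
                    "properties": {"endpoint": endpoint.strip(), "apiKey": api_key},
                }
            },
        },
    }
    return urllib.request.Request(
        ARM_HOST + path,
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # assumption: token obtained elsewhere
        },
    )


if __name__ == "__main__":
    path = function_path("<subscriptionId>", "<resourceGroup>", "<streamingjobName>",
                         "newudf", "<apiVersion>")
    req = build_create_udf_request(path, "newudf", "<service URI>", "replacekeyhere", "<token>")
    print(req.get_method(), req.full_url)
```

Passing the result to `urllib.request.urlopen` would send it; that step is left out here because it needs real credentials.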
Once the skeleton UDF is created, the complete definition of the UDF is needed. The RetrieveDefaultDefinition endpoint returns the default definition for a scalar function that is bound to a Machine Learning Studio (classic) endpoint. The payload below requests that default definition. It doesn't specify the actual endpoint because the endpoint was already provided in the PUT request. If an endpoint is provided explicitly in the request, Stream Analytics calls that endpoint; otherwise, it uses the one originally referenced. Here the UDF takes a single string parameter (a sentence) and returns a single output of type string, which indicates the "sentiment" label for that sentence.
POST : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>/RetrieveDefaultDefinition?api-version=<apiVersion>
Example request body:
{
  "bindingType": "Microsoft.MachineLearning/WebService",
  "bindingRetrievalProperties": {
    "executeEndpoint": null,
    "udfType": "Scalar"
  }
}
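The optional-endpoint behavior described above can be sketched as a small payload builder. The function name is hypothetical; the field names are taken from the request body above.

```python
import json


def retrieve_default_definition_body(execute_endpoint=None, udf_type="Scalar"):
    """Build the RetrieveDefaultDefinition payload.

    Leaving execute_endpoint as None serializes to JSON null, which matches the
    sample request: the endpoint stored by the earlier PUT is used instead.
    """
    return {
        "bindingType": "Microsoft.MachineLearning/WebService",
        "bindingRetrievalProperties": {
            "executeEndpoint": execute_endpoint,
            "udfType": udf_type,
        },
    }


if __name__ == "__main__":
    # Default case: reuse the endpoint already bound to the UDF.
    print(json.dumps(retrieve_default_definition_body(), indent=2))
```

Passing a service URI as `execute_endpoint` produces the explicit variant, in which Stream Analytics calls the endpoint named in the request.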
A sample response looks like the following.
{
  "name": "newudf",
  "properties": {
    "type": "Scalar",
    "properties": {
      "inputs": [{
        "dataType": "nvarchar(max)",
        "isConfigurationParameter": null
      }],
      "output": {
        "dataType": "nvarchar(max)"
      },
      "binding": {
        "type": "Microsoft.MachineLearning/WebService",
        "properties": {
          "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
          "apiKey": null,
          "inputs": {
            "name": "input1",
            "columnNames": [{
              "name": "tweet",
              "dataType": "string",
              "mapTo": 0
            }]
          },
          "outputs": [{
            "name": "Sentiment",
            "dataType": "string"
          }],
          "batchSize": 10
        }
      }
    }
  }
}
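To check that the returned definition matches what the query will pass in, it can help to flatten the response into a short summary. The sketch below does this for a trimmed copy of the sample response (the endpoint is replaced by a placeholder). The helper name is hypothetical, and reading `mapTo` as the zero-based index of the UDF argument that feeds each web service column is an assumption to confirm against the REST API reference.

```python
import json

# Trimmed copy of the sample response above; the endpoint is a placeholder.
SAMPLE = json.loads("""
{
  "name": "newudf",
  "properties": {
    "type": "Scalar",
    "properties": {
      "inputs": [{"dataType": "nvarchar(max)", "isConfigurationParameter": null}],
      "output": {"dataType": "nvarchar(max)"},
      "binding": {
        "type": "Microsoft.MachineLearning/WebService",
        "properties": {
          "endpoint": "<service URI>",
          "apiKey": null,
          "inputs": {
            "name": "input1",
            "columnNames": [{"name": "tweet", "dataType": "string", "mapTo": 0}]
          },
          "outputs": [{"name": "Sentiment", "dataType": "string"}],
          "batchSize": 10
        }
      }
    }
  }
}
""")


def describe_udf(definition):
    """Summarize a default UDF definition: argument types, return type, column mapping."""
    scalar = definition["properties"]["properties"]
    binding = scalar["binding"]["properties"]
    return {
        "name": definition["name"],
        "argument_types": [arg["dataType"] for arg in scalar["inputs"]],
        "return_type": scalar["output"]["dataType"],
        # assumed meaning: web service column name -> index of the UDF argument
        "column_to_argument": {c["name"]: c["mapTo"] for c in binding["inputs"]["columnNames"]},
        "outputs": [o["name"] for o in binding["outputs"]],
        "batch_size": binding["batchSize"],
    }


if __name__ == "__main__":
    print(json.dumps(describe_udf(SAMPLE), indent=2))
```

For the sample, the summary shows one `nvarchar(max)` argument mapped to the `tweet` column and one `Sentiment` output, which agrees with the single-string-in, single-string-out description of the UDF.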
Now the UDF must be patched with the previous response, as shown below.
PATCH : /subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>
Request Body (Output from RetrieveDefaultDefinition):
{
  "name": "newudf",
  "properties": {
    "type": "Scalar",
    "properties": {
      "inputs": [{
        "dataType": "nvarchar(max)",
        "isConfigurationParameter": null
      }],
      "output": {
        "dataType": "nvarchar(max)"
      },
      "binding": {
        "type": "Microsoft.MachineLearning/WebService",
        "properties": {
          "endpoint": "https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77ga4a4bbf2a30c63c078dca/services/b7be5e40fd194258896fb602c1858eaf/execute",
          "apiKey": null,
          "inputs": {
            "name": "input1",
            "columnNames": [{
              "name": "tweet",
              "dataType": "string",
              "mapTo": 0
            }]
          },
          "outputs": [{
            "name": "Sentiment",
            "dataType": "string"
          }],
          "batchSize": 10
        }
      }
    }
  }
}
Now write a transformation whose query calls the UDF (here named scoreTweet) for every input event and writes the scored result for that event to an output.
{
  "name": "transformation",
  "properties": {
    "streamingUnits": null,
    "query": "select *,scoreTweet(Tweet) TweetSentiment into blobOutput from blobInput"
  }
}
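The transformation body above can also be generated from its parts, which keeps the UDF name in the query in sync with the function that was created. This is a minimal sketch with a hypothetical helper name; it reproduces the query text from the sample rather than validating it.

```python
import json


def build_transformation(udf_name, column, alias, source, sink, name="transformation"):
    """Build a transformation body whose query scores one column with the UDF."""
    query = f"select *,{udf_name}({column}) {alias} into {sink} from {source}"
    return {
        "name": name,
        "properties": {
            "streamingUnits": None,  # serialized as JSON null, as in the sample
            "query": query,
        },
    }


if __name__ == "__main__":
    body = build_transformation("scoreTweet", "Tweet", "TweetSentiment",
                                source="blobInput", sink="blobOutput")
    print(json.dumps(body, indent=2))
```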
For further assistance, try our Microsoft Q&A question page for Azure Stream Analytics.