---
title: Trigger Azure Machine Learning pipelines
titleSuffix: Azure Machine Learning
description: Triggered pipelines allow you to automate routine, time-consuming tasks such as data processing, training, and monitoring.
services: machine-learning
ms.service: machine-learning
ms.subservice: mlops
ms.author: larryfr
author: blackmist
ms.date: 10/21/2021
ms.topic: how-to
ms.custom: devx-track-python, sdkv1, event-tier1-build-2022
---
[!INCLUDE sdk v1]
In this article, you'll learn how to programmatically schedule a pipeline to run on Azure. You can create a schedule based on elapsed time or on file-system changes. Time-based schedules can be used to take care of routine tasks, such as monitoring for data drift. Change-based schedules can be used to react to irregular or unpredictable changes, such as new data being uploaded or old data being edited. After learning how to create schedules, you'll learn how to retrieve and deactivate them. Finally, you'll learn how to use other Azure services, Azure Logic App and Azure Data Factory, to run pipelines. An Azure Logic App allows for more complex triggering logic or behavior. Azure Data Factory pipelines allow you to call a machine learning pipeline as part of a larger data orchestration pipeline.
To follow the steps in this article, you need:

- An Azure subscription. If you don't have an Azure subscription, create a free account.
- A Python environment in which the Azure Machine Learning SDK for Python is installed. For more information, see Create and manage reusable environments for training and deployment with Azure Machine Learning.
- A Machine Learning workspace with a published pipeline. You can use the one built in Create and run machine learning pipelines with Azure Machine Learning SDK.
To schedule a pipeline, you'll need a reference to your workspace, the identifier of your published pipeline, and the name of the experiment in which you wish to create the schedule. You can get these values with the following code:
```python
import azureml.core
from azureml.core import Workspace
from azureml.pipeline.core import Pipeline, PublishedPipeline
from azureml.core.experiment import Experiment

ws = Workspace.from_config()

# List the experiments in the workspace so you can choose the one to schedule under.
experiments = Experiment.list(ws)
for experiment in experiments:
    print(experiment.name)

# List the published pipelines and their identifiers.
published_pipelines = PublishedPipeline.list(ws)
for published_pipeline in published_pipelines:
    print(f"{published_pipeline.name},'{published_pipeline.id}'")

# Replace these values with your own experiment name and published pipeline ID.
experiment_name = "MyExperiment"
pipeline_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
```
To run a pipeline on a recurring basis, you'll create a schedule. A `Schedule` associates a pipeline, an experiment, and a trigger. The trigger can either be a `ScheduleRecurrence` that describes the wait between runs or a Datastore path that specifies a directory to watch for changes. In either case, you'll need the pipeline identifier and the name of the experiment in which to create the schedule.
At the top of your Python file, import the `Schedule` and `ScheduleRecurrence` classes:
```python
from azureml.pipeline.core.schedule import ScheduleRecurrence, Schedule
```
The `ScheduleRecurrence` constructor has a required `frequency` argument that must be one of the following strings: "Minute", "Hour", "Day", "Week", or "Month". It also requires an integer `interval` argument specifying how many of the `frequency` units should elapse between schedule starts. Optional arguments allow you to be more specific about starting times, as detailed in the ScheduleRecurrence SDK docs.
Create a `Schedule` that begins a run every 15 minutes:
```python
recurrence = ScheduleRecurrence(frequency="Minute", interval=15)
recurring_schedule = Schedule.create(ws, name="MyRecurringSchedule",
                                     description="Based on time",
                                     pipeline_id=pipeline_id,
                                     experiment_name=experiment_name,
                                     recurrence=recurrence)
```
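If you need more control over when runs start, the optional arguments mentioned above let you pin the schedule to particular days and times. The following sketch assumes the `week_days` and `time_of_day` options described in the ScheduleRecurrence SDK docs; the schedule name and times are illustrative:

```python
# A minimal sketch of a weekly schedule; adjust the day and time to your needs.
weekly_recurrence = ScheduleRecurrence(frequency="Week",
                                       interval=1,
                                       week_days=["Monday"],
                                       time_of_day="08:00")

weekly_schedule = Schedule.create(ws, name="MyWeeklySchedule",
                                  description="Runs every Monday at 08:00",
                                  pipeline_id=pipeline_id,
                                  experiment_name=experiment_name,
                                  recurrence=weekly_recurrence)
```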
Pipelines that are triggered by file changes may be more efficient than time-based schedules. For example, when a file is changed or a new file is added to a data directory, a file-reactive schedule lets you preprocess that file as soon as the change is detected. You can monitor any changes to a datastore or changes within a specific directory within the datastore. If you monitor a specific directory, changes within subdirectories of that directory won't trigger a run.
To create a file-reactive `Schedule`, you must set the `datastore` parameter in the call to `Schedule.create`. To monitor a folder, set the `path_on_datastore` argument.
The `polling_interval` argument allows you to specify, in minutes, the frequency at which the datastore is checked for changes.
If the pipeline was constructed with a DataPath PipelineParameter, you can set that variable to the name of the changed file by setting the `data_path_parameter_name` argument.
```python
from azureml.core import Datastore

datastore = Datastore(workspace=ws, name="workspaceblobstore")

reactive_schedule = Schedule.create(ws, name="MyReactiveSchedule", description="Based on input file change.",
                                    pipeline_id=pipeline_id, experiment_name=experiment_name,
                                    datastore=datastore, data_path_parameter_name="input_data")
```
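If you only want to watch one folder and control how often the datastore is polled, you can combine the `path_on_datastore` and `polling_interval` arguments described above. A minimal sketch, with an illustrative folder name and polling interval:

```python
# Watch only one folder of the datastore and poll it every 30 minutes.
folder_reactive_schedule = Schedule.create(ws, name="MyFolderReactiveSchedule",
                                           description="Based on changes under one folder.",
                                           pipeline_id=pipeline_id,
                                           experiment_name=experiment_name,
                                           datastore=datastore,
                                           path_on_datastore="input/raw",
                                           polling_interval=30,
                                           data_path_parameter_name="input_data")
```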
In addition to the arguments discussed previously, you may set the `status` argument to `"Disabled"` to create an inactive schedule. Finally, the `continue_on_step_failure` argument allows you to pass a Boolean that will override the pipeline's default failure behavior.
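For example, here's a sketch of a schedule that is created in a disabled state and that also overrides the failure behavior. It reuses the `recurrence` object from earlier, and the schedule name is illustrative:

```python
# A schedule that starts out disabled and lets runs continue past a failed step.
paused_schedule = Schedule.create(ws, name="MyPausedSchedule",
                                  description="Created in a disabled state",
                                  pipeline_id=pipeline_id,
                                  experiment_name=experiment_name,
                                  recurrence=recurrence,
                                  status="Disabled",
                                  continue_on_step_failure=True)

# Activate it later, once you're ready for runs to start.
# paused_schedule.enable()
```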
In your web browser, navigate to Azure Machine Learning. From the Endpoints section of the navigation panel, choose Pipeline endpoints. This takes you to a list of the pipelines published in the workspace.
:::image type="content" source="./media/how-to-trigger-published-pipeline/scheduled-pipelines.png" alt-text="Pipelines page of AML":::
On this page you can see summary information about all the pipelines in the workspace: names, descriptions, status, and so forth. Drill in by selecting your pipeline. On the resulting page, there are more details about your pipeline and you can drill down into individual runs.
If you have a `Pipeline` that is published, but not scheduled, you can disable it with:
```python
pipeline = PublishedPipeline.get(ws, id=pipeline_id)
pipeline.disable()
```
If the pipeline is scheduled, you must cancel the schedule first. Retrieve the schedule's identifier from the portal or by running:
```python
ss = Schedule.list(ws)
for s in ss:
    print(s)
```
Once you have the `schedule_id` you wish to disable, run:
```python
def stop_by_schedule_id(ws, schedule_id):
    s = next(s for s in Schedule.list(ws) if s.id == schedule_id)
    s.disable()
    return s

stop_by_schedule_id(ws, schedule_id)
```
If you then run `Schedule.list(ws)` again, you should get an empty list.
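If you'd rather disable every schedule attached to one published pipeline instead of looking up individual identifiers, you can filter the list by the schedule's `pipeline_id` property. A minimal sketch:

```python
# Disable every schedule attached to one published pipeline.
for s in Schedule.list(ws):
    if s.pipeline_id == pipeline_id:
        print(f"Disabling schedule {s.id}")
        s.disable()
```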
More complex trigger rules or behavior can be created using an Azure Logic App.
To use an Azure Logic App to trigger a Machine Learning pipeline, you'll need the REST endpoint for a published Machine Learning pipeline. Create and publish your pipeline. Then find the REST endpoint of your `PublishedPipeline` by using the pipeline ID:
```python
# You can find the pipeline ID in Azure Machine Learning studio
published_pipeline = PublishedPipeline.get(ws, id="<pipeline-id-here>")
published_pipeline.endpoint
```
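If you want to sanity-check the endpoint and request body before wiring them into a Logic App, you can call the endpoint directly from Python. The sketch below assumes interactive authentication and uses illustrative experiment and parameter names; match them to your own published pipeline:

```python
import requests

from azureml.core.authentication import InteractiveLoginAuthentication

# Get an Azure Active Directory token for the REST call.
auth = InteractiveLoginAuthentication()
auth_header = auth.get_authentication_header()

# The experiment name and parameter assignments below are illustrative;
# match them to your own published pipeline.
response = requests.post(published_pipeline.endpoint,
                         headers=auth_header,
                         json={"ExperimentName": "MyRestPipeline",
                               "ParameterAssignments": {"input_string": "sample_string3"}})
response.raise_for_status()

# The response typically includes the ID of the submitted run.
print("Submitted run:", response.json().get("Id"))
```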
Now create an Azure Logic App instance. If you wish, use an integration service environment (ISE) and set up a customer-managed key for use by your Logic App.
Once your Logic App has been provisioned, use these steps to configure a trigger for your pipeline:
- Create a system-assigned managed identity to give the app access to your Azure Machine Learning workspace.
- Navigate to the Logic App Designer view and select the Blank Logic App template.

    [!div class="mx-imgBorder"]
    :::image type="content" source="media/how-to-trigger-published-pipeline/blank-template.png" alt-text="Blank template":::
- In the Designer, search for blob. Select the When a blob is added or modified (properties only) trigger and add this trigger to your Logic App.

    [!div class="mx-imgBorder"]
    :::image type="content" source="media/how-to-trigger-published-pipeline/add-trigger.png" alt-text="Add trigger":::
- Fill in the connection info for the Blob storage account you wish to monitor for blob additions or modifications. Select the Container to monitor. Choose the Interval and Frequency values for polling that work for you.

    > [!NOTE]
    > This trigger will monitor the selected Container but won't monitor subfolders.
- Add an HTTP action that will run when a new or modified blob is detected. Select + New Step, then search for and select the HTTP action.

    [!div class="mx-imgBorder"]
    :::image type="content" source="media/how-to-trigger-published-pipeline/search-http.png" alt-text="Search for HTTP action":::
Use the following settings to configure your action:
Setting | Value |
---|---|
HTTP action | POST |
URI | The endpoint of the published pipeline that you found as a prerequisite |
Authentication mode | Managed Identity |
- Set up your schedule to set the value of any DataPath PipelineParameters you may have:

    ```json
    {
        "DataPathAssignments": {
            "input_datapath": {
                "DataStoreName": "<datastore-name>",
                "RelativePath": "@{triggerBody()?['Name']}"
            }
        },
        "ExperimentName": "MyRestPipeline",
        "ParameterAssignments": {
            "input_string": "sample_string3"
        },
        "RunSource": "SDK"
    }
    ```
    Use the `DataStoreName` you added to your workspace as a prerequisite.

    [!div class="mx-imgBorder"]
    :::image type="content" source="media/how-to-trigger-published-pipeline/http-settings.png" alt-text="HTTP settings":::
- Select Save. Your schedule is now ready.
> [!IMPORTANT]
> If you are using Azure role-based access control (Azure RBAC) to manage access to your pipeline, set the permissions for your pipeline scenario (training or scoring).
In an Azure Data Factory pipeline, the Machine Learning Execute Pipeline activity runs an Azure Machine Learning pipeline. You can find this activity on the Data Factory authoring page, under the Machine Learning category:
:::image type="content" source="media/how-to-trigger-published-pipeline/azure-data-factory-pipeline-activity.png" alt-text="Screenshot showing the ML pipeline activity in the Azure Data Factory authoring environment":::
In this article, you used the Azure Machine Learning SDK for Python to schedule a pipeline in two different ways. One schedule recurs based on elapsed clock time. The other schedule runs if a file is modified on a specified `Datastore` or within a directory on that store. You saw how to use the portal to examine the pipeline and individual runs. You learned how to disable a schedule so that the pipeline stops running. Finally, you created an Azure Logic App to trigger a pipeline.
For more information, see:
[!div class="nextstepaction"] Use Azure Machine Learning Pipelines for batch scoring
- Learn more about pipelines
- Learn more about exploring Azure Machine Learning with Jupyter