---
title: Incrementally copy data using Change Data Capture
description: In this tutorial, you create an Azure Data Factory pipeline that copies delta data incrementally from a table in an Azure SQL Managed Instance database to Azure Storage.
ms.author: yexu
author: dearandyxu
ms.service: data-factory
ms.subservice: tutorials
ms.topic: tutorial
ms.date: 07/05/2021
---
# Incrementally load data from Azure SQL Managed Instance to Azure Storage using change data capture (CDC)
[!INCLUDE[appliesto-adf-asa-md](includes/appliesto-adf-asa-md.md)]
In this tutorial, you create an Azure data factory with a pipeline that loads delta data, based on change data capture (CDC) information in the source Azure SQL Managed Instance database, to Azure Blob storage.

You perform the following steps in this tutorial:
> [!div class="checklist"]
> - Prepare the source data store.
> - Create a data factory.
> - Create linked services.
> - Create source and sink datasets.
> - Create, debug, and run the pipeline to check for changed data.
> - Modify data in the source table.
> - Complete, run, and monitor the full incremental copy pipeline.
## Overview

The Change Data Capture technology supported by data stores such as Azure SQL Managed Instance (MI) and SQL Server can be used to identify changed data. This tutorial describes how to use Azure Data Factory with SQL Change Data Capture technology to incrementally load delta data from Azure SQL Managed Instance into Azure Blob Storage. For more information about SQL Change Data Capture technology, see Change data capture in SQL Server.

Here are the typical end-to-end workflow steps to incrementally load data by using the Change Data Capture technology.
> [!NOTE]
> Both Azure SQL MI and SQL Server support the Change Data Capture technology. This tutorial uses Azure SQL Managed Instance as the source data store. You can also use an on-premises SQL Server.
In this tutorial, you create a pipeline that performs the following operations:
- Create a Lookup activity to count the number of changed records in the SQL CDC change table and pass it to an If Condition activity.
- Create an If Condition activity to check whether there are changed records and, if so, invoke the Copy activity.
- Create a Copy activity to copy the inserted/updated/deleted data from the CDC change table to Azure Blob Storage.
If you don't have an Azure subscription, create a free account before you begin.

## Prerequisites
- **Azure SQL Database Managed Instance**. You use the database as the source data store. If you don't have an Azure SQL Database Managed Instance, see the Create an Azure SQL Database Managed Instance article for steps to create one.
- **Azure Storage account**. You use the blob storage as the sink data store. If you don't have an Azure storage account, see the Create a storage account article for steps to create one. Create a container named **raw**.
## Create a data source table in Azure SQL Managed Instance

1. Launch SQL Server Management Studio, and connect to your Azure SQL Managed Instance.

1. In Object Explorer, right-click your database and choose New Query.

1. Run the following SQL command against your Azure SQL Managed Instance database to create a table named `customers` as the data source store:

    ```sql
    create table customers
    (
        customer_id int,
        first_name varchar(50),
        last_name varchar(50),
        email varchar(100),
        city varchar(50),
        CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id")
    );
    ```
1. Enable the Change Data Capture mechanism on your database and the source table (customers) by running the following SQL query:

    > [!NOTE]
    > - Replace `<your source schema name>` with the schema of your Azure SQL MI that has the customers table.
    > - Change data capture doesn't do anything as part of the transactions that change the table being tracked. Instead, the insert, update, and delete operations are written to the transaction log. Data that is deposited in change tables grows unmanageably if you don't periodically and systematically prune the data. For more information, see Enable Change Data Capture for a database.

    ```sql
    EXEC sys.sp_cdc_enable_db

    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo',
        @source_name = 'customers',
        @role_name = 'null',
        @supports_net_changes = 1
    ```
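    If you want to confirm that CDC is now active, you can optionally run a quick check. This check isn't part of the original tutorial; it relies only on the standard `sys.databases` and `sys.tables` catalog views.

    ```sql
    -- Optional check: confirm CDC is enabled on the database and that the customers table is tracked
    SELECT name, is_cdc_enabled
    FROM sys.databases
    WHERE name = DB_NAME();

    SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
    FROM sys.tables AS t
    JOIN sys.schemas AS s ON s.schema_id = t.schema_id
    WHERE t.name = 'customers';
    ```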
1. Insert data into the customers table by running the following command:

    ```sql
    insert into customers
        (customer_id, first_name, last_name, email, city)
    values
        (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
        (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
        (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
    ```

    > [!NOTE]
    > No historical changes to the table are captured prior to change data capture being enabled.
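    To see how these inserts are recorded, you can optionally query the captured changes through the CDC table-valued function shown below. This isn't part of the original tutorial; the function `cdc.fn_cdc_get_all_changes_dbo_customers` is generated automatically when CDC is enabled on the table, and the capture job may need a short while to harvest the transaction log before rows appear.

    ```sql
    -- Optional check: list all changes captured so far for the customers table
    DECLARE @from_lsn binary(10), @to_lsn binary(10);
    SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
    SET @to_lsn   = sys.fn_cdc_get_max_lsn();
    SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_customers(@from_lsn, @to_lsn, 'all');
    ```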
## Create a data factory

1. Launch the Microsoft Edge or Google Chrome web browser. Currently, the Data Factory UI is supported only in Microsoft Edge and Google Chrome web browsers.
1. On the left menu, select Create a resource > Data + Analytics > Data Factory:

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/new-azure-data-factory-menu.png" alt-text="Data Factory selection in the New pane":::
1. In the New data factory page, enter ADFTutorialDataFactory for the name.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/new-azure-data-factory.png" alt-text="New data factory page":::

    The name of the Azure data factory must be globally unique. If you receive the following error, change the name of the data factory (for example, yournameADFTutorialDataFactory) and try again. See the Data Factory naming rules article for naming rules for Data Factory artifacts.

    *Data factory name "ADFTutorialDataFactory" is not available.*
1. Select V2 for the version.

1. Select the Azure subscription in which you want to create the data factory.

1. For the Resource Group, do one of the following steps:

    - Select Use existing, and select an existing resource group from the drop-down list.
    - Select Create new, and enter the name of a resource group.

    To learn about resource groups, see Using resource groups to manage your Azure resources.

1. Select the location for the data factory. Only supported locations are displayed in the drop-down list. The data stores (Azure Storage, Azure SQL Database, and so on) and computes (HDInsight, and so on) used by the data factory can be in other regions.

1. De-select Enable GIT.

1. Click Create.
1. Once the deployment is complete, click Go to resource.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/data-factory-deploy-complete.png" alt-text="Screenshot shows a message that your deployment is complete and an option to go to resource.":::

1. After the creation is complete, you see the Data Factory page as shown in the image.

    :::image type="content" source="./media/doc-common-process/data-factory-home-page.png" alt-text="Home page for the Azure Data Factory, with the Open Azure Data Factory Studio tile.":::

1. Select Open on the Open Azure Data Factory Studio tile to launch the Azure Data Factory user interface (UI) in a separate tab.

1. In the home page, switch to the Manage tab in the left panel as shown in the following image:

    :::image type="content" source="media/doc-common-process/get-started-page-manage-button.png" alt-text="Screenshot that shows the Manage button.":::
## Create linked services

You create linked services in a data factory to link your data stores and compute services to the data factory. In this section, you create linked services to your Azure Storage account and Azure SQL MI.

### Create Azure Storage linked service

In this step, you link your Azure Storage account to the data factory.
1. Click Connections, and click + New.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/new-connection-button-storage.png" alt-text="New connection button":::

1. In the New Linked Service window, select Azure Blob Storage, and click Continue.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/select-azure-storage.png" alt-text="Select Azure Blob Storage":::

1. In the New Linked Service window, do the following steps:

    - Enter AzureStorageLinkedService for Name.
    - Select your Azure Storage account for Storage account name.
    - Click Save.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/azure-storage-linked-service-settings.png" alt-text="Azure Storage Account settings":::
### Create Azure SQL MI Database linked service

In this step, you link your Azure SQL MI database to the data factory.

> [!NOTE]
> For those using SQL MI, see here for information about access via public versus private endpoint. If you use a private endpoint, you need to run this pipeline by using a self-hosted integration runtime. The same applies to scenarios where SQL Server runs on-premises, in a VM, or in a VNet.
1. Click Connections, and click + New.

1. In the New Linked Service window, select Azure SQL Database Managed Instance, and click Continue.

1. In the New Linked Service window, do the following steps:

    - Enter AzureSqlMI1 for the Name field.
    - Select your SQL server for the Server name field.
    - Select your SQL database for the Database name field.
    - Enter the name of the user for the User name field.
    - Enter the password for the user for the Password field.
    - Click Test connection to test the connection.
    - Click Save to save the linked service.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/azure-sql-managed-instance-database-linked-service-settings.png" alt-text="Azure SQL MI Database linked service settings":::
## Create datasets

In this step, you create datasets to represent the data source and the data destination.

### Create a dataset to represent source data

In this step, you create a dataset to represent the source data.
1. In the treeview, click + (plus), and click Dataset.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/new-dataset-menu.png" alt-text="New Dataset menu":::

1. Select Azure SQL Database Managed Instance, and click Continue.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/select-azure-sql-database.png" alt-text="Source dataset type - Azure SQL Database":::

1. In the Set properties tab, set the dataset name and connection information:

    - Select AzureSqlMI1 for Linked service.
    - Select [dbo].[dbo_customers_CT] for Table name. Note: this table was automatically created when CDC was enabled on the customers table. Changed data is never queried from this table directly but is instead extracted through the CDC functions (see the optional query after this step).

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/source-dataset-configuration.png" alt-text="Source connection":::
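    As an optional illustration (not part of the tutorial), the following query reads the net changes through the CDC function rather than from cdc.dbo_customers_CT directly. In the result, the __$operation column indicates the net effect for each row: 1 = delete, 2 = insert, 4 = update.

    ```sql
    -- Optional: read net changes through the CDC function instead of querying the change table directly
    DECLARE @from_lsn binary(10), @to_lsn binary(10);
    SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
    SET @to_lsn   = sys.fn_cdc_get_max_lsn();
    SELECT __$operation, customer_id, first_name, last_name, email, city
    FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all');
    ```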
### Create a dataset to represent data copied to sink data store

In this step, you create a dataset to represent the data that is copied from the source data store. You created the raw container in your Azure Blob storage as part of the prerequisites. Create the container if it doesn't exist, or set the dataset to the name of an existing one. In this tutorial, the output file name is dynamically generated by using the trigger time, which is configured later.
1. In the treeview, click + (plus), and click Dataset.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/new-dataset-menu.png" alt-text="New Dataset menu":::

1. Select Azure Blob Storage, and click Continue.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/sink-dataset-type.png" alt-text="Sink dataset type - Azure Blob Storage":::

1. Select DelimitedText, and click Continue.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/sink-dataset-format.png" alt-text="Sink dataset format - DelimitedText":::

1. In the Set Properties tab, set the dataset name and connection information:

    - Select AzureStorageLinkedService for Linked service.
    - Enter raw for the container part of the filePath.
    - Enable First row as header.
    - Click OK.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/sink-dataset-configuration.png" alt-text="Sink dataset - connection":::
## Create a pipeline to copy the changed data

In this step, you create a pipeline that first checks the number of changed records present in the change table by using a Lookup activity. An If Condition activity checks whether the number of changed records is greater than zero, and runs a Copy activity to copy the inserted/updated/deleted data from Azure SQL Managed Instance to Azure Blob Storage. Lastly, a tumbling window trigger is configured, and its start and end times are passed to the activities as the start and end window parameters.
1. In the Data Factory UI, switch to the Edit tab. Click + (plus) in the left pane, and click Pipeline.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/new-pipeline-menu.png" alt-text="New pipeline menu":::

1. You see a new tab for configuring the pipeline. You also see the pipeline in the treeview. In the Properties window, change the name of the pipeline to IncrementalCopyPipeline.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-name.png" alt-text="Pipeline name":::

1. Expand General in the Activities toolbox, and drag-drop the Lookup activity to the pipeline designer surface. Set the name of the activity to GetChangeCount. This activity gets the number of records in the change table for a given time window.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/first-lookup-activity-name.png" alt-text="Lookup Activity - name":::
1. Switch to the Settings tab in the Properties window:

    - Specify the SQL MI dataset name for the Source Dataset field.

    - Select the Query option and enter the following query into the query box:

      ```sql
      DECLARE @from_lsn binary(10), @to_lsn binary(10);
      SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
      SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
      SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
      ```

    - Enable First row only.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/first-lookup-activity-settings.png" alt-text="Lookup Activity - settings":::
1. Click the Preview data button to ensure a valid output is obtained by the Lookup activity.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/first-lookup-activity-preview.png" alt-text="Lookup Activity - preview":::
1. Expand Iteration & conditionals in the Activities toolbox, and drag-drop the If Condition activity to the pipeline designer surface. Set the name of the activity to HasChangedRows.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/if-condition-activity-name.png" alt-text="If Condition Activity - name":::

1. Switch to the Activities tab in the Properties window:

    - Enter the following expression:

      ```
      @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
      ```

    - Click the pencil icon to edit the True condition.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/if-condition-activity-setting.png" alt-text="If Condition Activity - settings":::

    - Expand General in the Activities toolbox, and drag-drop a Wait activity to the pipeline designer surface. This is a temporary activity used to debug the If condition; it's replaced later in the tutorial.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/if-condition-activity-wait.png" alt-text="If Condition True - wait":::

    - Click the IncrementalCopyPipeline breadcrumb to return to the main pipeline.
1. Run the pipeline in Debug mode to verify the pipeline executes successfully.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-debug.png" alt-text="Pipeline - debug":::

1. Next, return to the True condition step and delete the Wait activity. In the Activities toolbox, expand Move & transform, and drag-drop a Copy activity to the pipeline designer surface. Set the name of the activity to IncrementalCopyActivity.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/copy-source-name.png" alt-text="Copy Activity - name":::
1. Switch to the Source tab in the Properties window, and do the following steps:

    - Specify the SQL MI dataset name for the Source Dataset field.

    - Select Query for Use Query.

    - Enter the following for Query:

      ```sql
      DECLARE @from_lsn binary(10), @to_lsn binary(10);
      SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
      SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
      SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
      ```

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/copy-source-settings.png" alt-text="Copy Activity - source settings":::
1. Click preview to verify that the query returns the changed rows correctly.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/copy-source-preview.png" alt-text="Screenshot shows preview to verify query.":::

1. Switch to the Sink tab, and specify the Azure Storage dataset for the Sink Dataset field.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/copy-sink-settings.png" alt-text="Screenshot shows the Sink tab.":::

1. Click back to the main pipeline canvas and connect the Lookup activity to the If Condition activity. Drag the green button attached to the Lookup activity to the If Condition activity.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/connect-lookup-if.png" alt-text="Connect Lookup and Copy activities":::

1. Click Validate on the toolbar. Confirm that there are no validation errors. Close the Pipeline Validation Report window by clicking >>.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/validate-button.png" alt-text="Validate button":::

1. Click Debug to test the pipeline and verify that a file is generated in the storage location.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-debug-2.png" alt-text="Incremental pipeline debug-2":::

1. Publish entities (linked services, datasets, and pipelines) to the Data Factory service by clicking the Publish all button. Wait until you see the Publishing succeeded message.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/publish-button-2.png" alt-text="Publish button":::
In this step, you create a tumbling window trigger to run the job on a frequent schedule. You will use the WindowStart and WindowEnd system variables of the tumbling window trigger and pass them as parameters to your pipeline to be used in the CDC query.
1. Navigate to the Parameters tab of the IncrementalCopyPipeline pipeline, and by using the + New button, add two parameters (triggerStartTime and triggerEndTime) to the pipeline. These parameters represent the tumbling window start and end times. For debugging purposes, add default values in the format YYYY-MM-DD HH24:MI:SS.FFF, but ensure the triggerStartTime is not earlier than the time CDC was enabled on the table; otherwise, the query results in an error.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-parameters.png" alt-text="Pipeline parameters":::
1. Click the Settings tab of the Lookup activity and configure the query to use the start and end parameters. Copy the following into the query:

    ```
    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    ```
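    To make the expression easier to follow, here is approximately what the Lookup query resolves to at run time, assuming hypothetical window values of 2021-07-05 00:00:00.000 and 2021-07-06 00:00:00.000 for triggerStartTime and triggerEndTime:

    ```sql
    -- Illustration only: the lookup query after parameter substitution (example window values)
    DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10);
    SET @begin_time = '2021-07-05 00:00:00.000';
    SET @end_time = '2021-07-06 00:00:00.000';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @end_time);
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    ```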
1. Navigate to the Copy activity in the True case of the If Condition activity, and click the Source tab. Copy the following into the query:

    ```
    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); SET @begin_time = ''',pipeline().parameters.triggerStartTime,'''; SET @end_time = ''',pipeline().parameters.triggerEndTime,'''; SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time); SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time); SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    ```
1. Click the Sink tab of the Copy activity, and click Open to edit the dataset properties. Click the Parameters tab and add a new parameter called triggerStart.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/sink-dataset-configuration-2.png" alt-text="Screenshot shows adding a new parameter to the Parameters tab.":::

1. Next, configure the dataset properties to store the data in a customers/incremental subdirectory with date-based partitions.
1. Click the Connection tab of the dataset properties, and add dynamic content for both the Directory and the File sections.

    - Enter the following expression in the Directory section by clicking the dynamic content link under the textbox:

      ```
      @concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
      ```

    - Enter the following expression in the File section. This creates file names based on the trigger start date and time, suffixed with the .csv extension:

      ```
      @concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
      ```

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/sink-dataset-configuration-3.png" alt-text="Sink Dataset Configuration-3":::
1. Navigate back to the Sink settings in the Copy activity by clicking the IncrementalCopyPipeline tab.

1. Expand the dataset properties, and enter dynamic content for the triggerStart parameter value by using the following expression:

    ```
    @pipeline().parameters.triggerStartTime
    ```

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/sink-dataset-configuration-4.png" alt-text="Sink Dataset Configuration-4":::
1. Click Debug to test the pipeline, and ensure the folder structure and output file are generated as expected. Download and open the file to verify the contents.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-debug-3.png" alt-text="Incremental Copy Debug-3":::

1. Ensure the parameters are being injected into the query by reviewing the Input parameters of the pipeline run.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-debug-4.png" alt-text="Incremental Copy Debug-4":::
1. Publish entities (linked services, datasets, and pipelines) to the Data Factory service by clicking the Publish all button. Wait until you see the Publishing succeeded message.

1. Finally, configure a tumbling window trigger to run the pipeline at a regular interval and set the start and end time parameters.

    - Click the Add trigger button, and select New/Edit.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/add-trigger.png" alt-text="Add New Trigger":::

    - Enter a trigger name and specify a start time, which is equal to the end time of the debug window above.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/tumbling-window-trigger.png" alt-text="Tumbling Window Trigger":::

1. On the next screen, specify the following values for the start and end parameters, respectively:

    ```
    @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
    @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
    ```

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/tumbling-window-trigger-2.png" alt-text="Tumbling Window Trigger-2":::
> [!NOTE]
> The trigger runs only once it has been published. Additionally, the expected behavior of a tumbling window trigger is to run all historical intervals from the start date until now. More information about tumbling window triggers can be found here.
1. Using SQL Server Management Studio, make some additional changes to the customers table by running the following SQL:

    ```sql
    insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
    insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
    insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
    update customers set first_name='Elon' where customer_id=6;
    delete from customers where customer_id=5;
    ```
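    As an optional check (not part of the original tutorial), you can preview the net changes that CDC has captured so far. Because net changes collapse intermediate operations within the queried window, customer 6 should appear once as an insert with the updated first name, and customer 5 should not appear at all because it was inserted and then deleted within the same window:

    ```sql
    -- Optional check: preview the net changes captured so far for the customers table
    DECLARE @from_lsn binary(10), @to_lsn binary(10);
    SET @from_lsn = sys.fn_cdc_get_min_lsn('dbo_customers');
    SET @to_lsn   = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT __$operation, customer_id, first_name, last_name, email, city
    FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all');
    ```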
1. Click the Publish all button. Wait until you see the Publishing succeeded message.

1. After a few minutes, the pipeline triggers and a new file is loaded into Azure Storage.
1. Click the Monitor tab on the left. You see the pipeline run in the list and its status. To refresh the list, click Refresh. Hover near the name of the pipeline to access the Rerun action and the Consumption report.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/copy-pipeline-runs.png" alt-text="Pipeline runs":::

1. To view activity runs associated with the pipeline run, click the pipeline name. If changed data was detected, there are three activities, including the copy activity; otherwise, there are only two entries in the list. To switch back to the pipeline runs view, click the All Pipelines link at the top.

    :::image type="content" source="./media/tutorial-incremental-copy-change-data-capture-feature-portal/copy-activity-runs.png" alt-text="Activity runs":::
1. You see the second file in the customers/incremental/YYYY/MM/DD folder of the raw container.

    :::image type="content" source="media/tutorial-incremental-copy-change-data-capture-feature-portal/incremental-copy-pipeline-run.png" alt-text="Output file from incremental copy":::
## Next steps

Advance to the following tutorial to learn about copying new and changed files only, based on their LastModifiedDate:

> [!div class="nextstepaction"]
> Copy new files by lastmodifieddate