How to Perform ETL in Snowflake using Stored Procedures, Azure Data Factory, and Azure Functions
This is the transcript of a video recording. Access the recording in the original post published on Aptitive’s blog.
A version of this Snowflake lab on performing ETL in Snowflake using Stored Procedures, Azure Data Factory, and Azure Functions was originally presented to the Chicago Snowflake user group in March 2020. Since this video was recorded, Microsoft has announced the availability of an Azure Data Factory connector for reading data from and copying data into and out of Snowflake. However, data transformations and the use of the VARIANT data type are not yet supported by that connector, so we recommend the methods showcased in this video to implement those features.
Hello, and welcome to this webinar on performing ETL in Snowflake using Snowflake stored procedures, Azure Data Factory, and Azure Functions. I’m Melanie Ruiz, a managing consultant and lead solution architect at Aptitive.
In today’s webinar, we’ll go over a brief introduction to Snowflake stored procedures, Azure Data Factory, and Azure Functions before jumping into the code.
Azure Data Factory is Microsoft’s fully managed, serverless data integration tool. It allows developers to build ETL/ELT data processes, called pipelines, with drag-and-drop functionality using numerous prebuilt activities. There are dozens of native connectors for your data sources and destinations: from on-prem file systems and databases such as Oracle, DB2, and SQL Server, to applications such as Dynamics 365 and Salesforce, to cloud solutions such as Amazon S3, Azure Data Lake Storage, and Azure Synapse. With all of these available connections, however, there is currently no native connector for Snowflake. To work with Snowflake from Azure Data Factory, you can use Azure Functions to load data into and manipulate data within Snowflake.
Azure Functions is a serverless compute platform. With Functions, you can create web applications and microservices, or perform complex data processing and machine learning workflows. Azure Functions can be written in several different languages, so you can choose the one that is most comfortable for you and your team.
For this demo, I’ll be showcasing a simple function written in C#. Let’s imagine you work for a company that wants to create reports for its sales representatives using data stored in a Snowflake data warehouse. The tables are normalized and very large: the main customer table has 100 million records, and the sales table is 10 terabytes. Management wants to make sure that sales reps can only see information for customers in their own state. There are several ways to solve this, but for the purposes of this demonstration, I’ve created separate data marts that will contain the data required for each state’s sales reps.
Today I’ll focus on loading customer data into each data mart. To begin, let’s look at Snowflake. Here on the left, you can see I have a database called Aptitive Discovery. Within it are three schemas: California sales mart, Illinois sales mart, and New York sales mart. Each schema has a dim customer table with the same structure. Hovering over the table names, you can see that the tables are currently empty.
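The stored procedure itself isn’t shown on screen, but a minimal sketch of what a per-mart load procedure could look like is below. All object, parameter, and procedure names here are hypothetical (the demo only tells us the procedure name is passed in from ADF), and a real procedure would likely list explicit columns instead of SELECT *:

```sql
-- Hypothetical sketch: Snowflake stored procedures are written in JavaScript.
CREATE OR REPLACE PROCEDURE LoadDimCustomer(
    SOURCE_DB STRING, TARGET_DB STRING,
    TARGET_SCHEMA STRING, TARGET_TABLE STRING, STATE STRING)
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
  // Copy only the requested state's customers into the mart's table.
  // JavaScript sees the procedure arguments as upper-case variables.
  var sql = "INSERT INTO " + TARGET_DB + "." + TARGET_SCHEMA + "." + TARGET_TABLE +
            " SELECT * FROM " + SOURCE_DB + ".PUBLIC.DIM_CUSTOMER WHERE STATE = ?";
  snowflake.execute({ sqlText: sql, binds: [STATE] });
  return "Loaded " + STATE + " into " + TARGET_SCHEMA + "." + TARGET_TABLE;
$$;
```

Because the database, schema, and table are all parameters, one procedure definition can serve every state’s mart.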
Let’s move on to the Azure function. The function will be called from Azure Data Factory using an HTTP POST request. I have a request class that contains the information ADF will pass in, which includes the source and target databases and the target schema, table, and state. To make the function more dynamic, I’m also passing in the name of the stored procedure that should be called. Within the function, I parse the request class into variables. I then set a variable called run stored proc with the statement that actually calls the stored procedure, replacing the placeholders with the values from the request body. Next, I create a connection to Snowflake. Notice I’m using an environment variable for the connection string; this, in turn, references an Azure Key Vault to securely store the connection string and password. Once I have a connection to Snowflake, I issue my run stored proc variable as the command and await the result. Alternatively, you could issue the command and return to ADF without awaiting the query. This may be preferable if your data pipeline can continue without waiting for a result, and it is required if you have a long-running query, since the timeout for an HTTP-triggered function is 230 seconds. If you do wait for the result, you can close the connection to Snowflake, as I do at the end.
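As a rough sketch of the function’s core, here is how the request class and the statement that calls the stored procedure might look in C#. The class, property, and procedure names are my own placeholders, not necessarily the ones used in the demo, and the actual execution against Snowflake (via the Snowflake .NET connector, with the connection string pulled from an environment variable) is summarized in comments:

```csharp
using System;

// Hypothetical request class mirroring the JSON body that ADF posts to the function.
public class StoredProcRequest
{
    public string SourceDatabase { get; set; }
    public string TargetDatabase { get; set; }
    public string TargetSchema { get; set; }
    public string TargetTable { get; set; }
    public string State { get; set; }
    public string StoredProcedure { get; set; }
}

public static class SnowflakeCall
{
    // Builds the "run stored proc" statement from the parsed request values.
    public static string BuildCallStatement(StoredProcRequest req) =>
        $"CALL {req.TargetDatabase}.{req.TargetSchema}.{req.StoredProcedure}(" +
        $"'{req.SourceDatabase}', '{req.TargetDatabase}', '{req.TargetSchema}', " +
        $"'{req.TargetTable}', '{req.State}')";

    // In the real function (requires the Snowflake.Data package), the statement
    // is issued over an ADO.NET-style connection, roughly:
    //   using var conn = new SnowflakeDbConnection();
    //   conn.ConnectionString = Environment.GetEnvironmentVariable("SnowflakeConnection");
    //   conn.Open();
    //   var cmd = conn.CreateCommand();
    //   cmd.CommandText = BuildCallStatement(req);
    //   cmd.ExecuteNonQuery();  // or return without awaiting for long-running queries
    //   conn.Close();
}
```

Keeping the statement-building separate from the connection handling makes the function reusable for any stored procedure that follows the same parameter shape.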
Let’s switch over to the Azure storage account. Here I have a container holding a JSON configuration file. If we look inside, the file contains three JSON objects, each with the same structure as the request body in the Azure function. ADF will pass one entire JSON object to the function at runtime.
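As a sketch, the configuration file might look like the following. The exact property names depend on the request class in the function, so treat all names and values here as placeholders:

```json
[
  {
    "sourceDatabase": "SRC_DB",
    "targetDatabase": "AptitiveDiscovery",
    "targetSchema": "CaliforniaSalesMart",
    "targetTable": "DimCustomer",
    "state": "CA",
    "storedProcedure": "LoadDimCustomer"
  },
  {
    "sourceDatabase": "SRC_DB",
    "targetDatabase": "AptitiveDiscovery",
    "targetSchema": "IllinoisSalesMart",
    "targetTable": "DimCustomer",
    "state": "IL",
    "storedProcedure": "LoadDimCustomer"
  },
  {
    "sourceDatabase": "SRC_DB",
    "targetDatabase": "AptitiveDiscovery",
    "targetSchema": "NewYorkSalesMart",
    "targetTable": "DimCustomer",
    "state": "NY",
    "storedProcedure": "LoadDimCustomer"
  }
]
```

Adding a new state’s mart then only requires appending another object to this file, with no pipeline changes.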
Now let’s take a look at Azure Data Factory. First, I have an Azure Blob Storage dataset called config JSON that is connected to my storage account. Within the dataset, I created two parameters, blob container and config file, and I use them on the Connection tab for the file path fields. For the sake of this demo, I’ve created a simple pipeline with two parameters of its own, container name and file name. The first component is a Lookup activity that reads the configuration file we reviewed. Here you can see my dataset parameters, blob container and config file; I’m going to pass in my pipeline parameters, container name and file name, at runtime. At the bottom of the Settings tab, make sure the First row only box is unchecked so that all data in the file is returned. Next, we’ll use a ForEach loop to iterate through the JSON objects in the file. On its Settings tab, I’ve decided to run my tables in parallel by unchecking the Sequential box, and I pass the output value from my Lookup activity into the Items field. Within the ForEach, I have an Azure Function activity that sends the incoming item, one JSON object, to my function via a POST request. Now I’m going to run the pipeline in debug mode so we can view the inputs and outputs of each activity. I need to supply values for the container name and file name parameters, so I’ll specify my config container and the name of my configuration file. I’ll let this run, and we’ll come back in just a second.
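Written out, the expressions wired into these fields look roughly like this; the activity and parameter names follow my descriptions above, but yours may differ:

```
Lookup activity, dataset parameters:
  blob container -> @pipeline().parameters.containerName
  config file    -> @pipeline().parameters.fileName

ForEach activity, Items field:
  @activity('Lookup Config').output.value

Azure Function activity, Body:
  @item()
```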
Okay, my pipeline has completed successfully; let’s take a look at the runtime values. In the input to the Lookup activity, we can see that the values I entered for container name and file name were passed to the dataset parameters, blob container and config file. Coming out of the activity, the output count is three, and the value field holds my JSON objects. Remember, I specified that I want to pass the output value to the Items field of the ForEach activity. You can see I have three instances of the call to my stored procedure function, corresponding to the three objects in my configuration file. If we look at the input to the activity, the body of the POST request contains one entire JSON object.
Finally, let’s head back to Snowflake to take a look at the populated tables. Hovering over each table name, I can now see that they contain data, and the row count is different for each sales mart. If I preview the data for the New York sales mart, I’ll see only customers whose current address is in New York.
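A quick way to confirm the state filter worked is a spot check like the one below; the object names are my placeholders for the ones shown in the demo, and exactly one state should come back per mart:

```sql
-- Hypothetical spot check: every row in the New York mart should be NY.
SELECT STATE, COUNT(*) AS CUSTOMER_COUNT
FROM AptitiveDiscovery.NewYorkSalesMart.DimCustomer
GROUP BY STATE;
```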
Thank you for joining me for this webinar on performing ETL using Snowflake stored procedures, Azure Data Factory, and Azure Functions. Click here for more Snowflake resources or to learn more about our Snowflake consulting, Snowflake training, and other Snowflake solutions.