How to Perform ETL in Snowflake using Stored Procedures, Azure Data Factory, and Azure Functions
This is the transcript of a video recording. Access the recording in the original post published on Aptitive’s blog.
A version of this Snowflake lab on performing ETL in Snowflake using Stored Procedures, Azure Data Factory, and Azure Functions was originally presented in the Chicago Snowflake user group in March 2020. Since this video was recorded Microsoft announced the availability of an Azure Data Factory connector for reading data from and copying data in/out of Snowflake. However, data transformations and the use of the VARIANT data type are not yet supported. We recommend utilizing the methods showcased in this video to implement these features.
Transcript
Hello and welcome to this webinar on performing ETL in Snowflake using Snowflake stored procedures, Azure Data Factory, and Azure Functions. I’m Melanie Ruiz, a managing consultant and lead solution architect for Aptitive.
In today’s webinar we’ll go over a brief introduction to Snowflake stored procedures, Azure Data Factory and Azure Functions before jumping into the code.
As of May 2, 2019, which coincidentally is my birthday, Snowflake introduced stored procedures. These act similarly to user-defined functions, with some differences. In a stored procedure, returning a value is optional, but it can return only a single value. Additionally, a return value cannot be used directly by the calling SQL statement the way a function’s can. It is also important to note that multiple stored procedures can have the same fully qualified name (database, schema, and procedure name), so long as their arguments differ, whether by the number of arguments or their data types. Snowflake stored procedures are written in JavaScript, which provides the structure and allows for iterating, branching, and error handling. Within the JavaScript, you can dynamically create SQL statements and run them using the JavaScript API.
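For readers following along, here is a minimal sketch of what that looks like. The procedure name, argument, and query are hypothetical and not part of the demo; the shape is what matters: a JavaScript body, a dynamically built SQL statement run through the JavaScript API, and a single string return value.

```sql
-- Minimal illustrative stored procedure (UTIL.ROW_COUNT is a hypothetical name).
CREATE OR REPLACE PROCEDURE UTIL.ROW_COUNT(TABLE_NAME VARCHAR)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
AS
$$
  // Arguments are exposed to the JavaScript body in upper case.
  // Build the SQL text dynamically, then run it with the JavaScript API.
  var rs = snowflake.execute({ sqlText: "SELECT COUNT(*) AS CNT FROM " + TABLE_NAME });
  rs.next();
  return "Row count: " + rs.getColumnValue(1);
$$;

-- Call it like any other procedure; it returns a single string value.
CALL UTIL.ROW_COUNT('SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER');
```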
Azure Data Factory is Microsoft’s fully managed, serverless data integration tool. It allows developers to build ETL/ELT data processes called pipelines, with drag-and-drop functionality using numerous pre-built activities. There are dozens of native connectors for your data sources and destinations, from on-prem file systems and databases such as Oracle, DB2, and SQL Server, to applications such as Dynamics 365 and Salesforce, to cloud solutions such as AWS S3, Azure Data Lake Storage, and Azure Synapse. However, with all of these available connections, there is currently no native connector for Snowflake. To work with Snowflake from Azure Data Factory, you can use Azure Functions to load data into and manipulate data within Snowflake.
Azure Functions is a serverless compute platform. With Functions, you can create web applications and microservices, or perform complex data processing and machine learning workflows. Azure Functions can be written in several different languages, so you can choose the one that is most comfortable for you and your team.
For this demo, I’ll be showcasing a simple function written in C#. Let’s imagine you work for a company that is looking to create reports for its sales representatives using data stored in a Snowflake data warehouse. The tables are normalized and very large: the main customer table has 100 million records, and the sales table is 10 terabytes. Management wants to make sure that sales reps are only able to see information for customers in their state. There are several different ways to go about solving this, but for the purposes of this demonstration, I’ve created separate data marts that will contain the data required for each state’s sales reps.
Today I’ll focus on loading customer data into each data mart. To begin, let’s look at Snowflake. Here on the left, you can see I have a database called Aptitive Discovery. Within it are three schemas: California sales mart, Illinois sales mart, and New York sales mart. Each of these has a dim customer table with the same structure in each mart. Hovering over the names of the tables, you can see that they are currently empty.
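For context, the setup referenced here looks roughly like the following. The identifier spellings and the DIM_CUSTOMER column list are my assumptions, since the exact DDL is not shown in the recording.

```sql
-- Illustrative setup only; the real column list is not shown in the demo.
CREATE DATABASE IF NOT EXISTS APTITIVE_DISCOVERY;

CREATE SCHEMA IF NOT EXISTS APTITIVE_DISCOVERY.CALIFORNIA_SALES_MART;
CREATE SCHEMA IF NOT EXISTS APTITIVE_DISCOVERY.ILLINOIS_SALES_MART;
CREATE SCHEMA IF NOT EXISTS APTITIVE_DISCOVERY.NEW_YORK_SALES_MART;

-- The same DIM_CUSTOMER structure is created in each sales mart schema.
CREATE TABLE IF NOT EXISTS APTITIVE_DISCOVERY.ILLINOIS_SALES_MART.DIM_CUSTOMER (
    CUSTOMER_KEY   NUMBER,
    CUSTOMER_NAME  VARCHAR,
    STREET_ADDRESS VARCHAR,
    CITY           VARCHAR,
    STATE          VARCHAR,
    ZIP            VARCHAR
);
```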
Now let’s take a look at the stored procedure that will load dim customer. When creating a Snowflake stored procedure, you specify the name and data type of each argument. Remember, stored procs with the same name but different arguments are considered different stored procs. In this example, I’ll be passing in the name of the schema, table, and state to load. This stored procedure will return a string value to let the Azure function know that it has completed successfully. You also specify the language that the stored procedure is written in, although JavaScript is currently the only option. I’m setting a variable called SQL statement that will contain the text of my SQL statement, enclosed in backticks. I break in and out of SQL to replace objects in the statement with the values passed in through the arguments. The statement begins with INSERT OVERWRITE INTO, Snowflake’s equivalent of a truncate and load, followed by the fully qualified name of the database, schema, and table, using the table and schema arguments that I passed in. The SELECT statement joins across three tables in the Snowflake sample data database to produce a denormalized customer record. The WHERE clause uses my state variable to filter the result set to only the records for a given state. At the bottom, I pass my SQL statement variable to the snowflake.execute method of the JavaScript API, which I’ve wrapped in a try/catch block to catch any error messages.
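Putting that description together, the procedure looks roughly like the sketch below. The TPC-DS sample tables and the column list are my best guesses at the demo’s query, so treat them as illustrative; the argument list, the INSERT OVERWRITE, and the try/catch around snowflake.execute follow the walkthrough.

```sql
-- Sketch of the load procedure described in the walkthrough.
-- Sample-data tables, columns, and the hard-coded target database are assumptions.
CREATE OR REPLACE PROCEDURE APTITIVE_DISCOVERY.PUBLIC.LOAD_DIM_CUSTOMER(
  SCHEMA_NAME VARCHAR, TABLE_NAME VARCHAR, STATE_CODE VARCHAR)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
AS
$$
  // Break in and out of SQL (backtick strings) to splice in the argument values.
  var sql_statement = `
    INSERT OVERWRITE INTO APTITIVE_DISCOVERY.` + SCHEMA_NAME + `.` + TABLE_NAME + `
    SELECT c.C_CUSTOMER_SK,
           c.C_FIRST_NAME || ' ' || c.C_LAST_NAME,
           a.CA_STREET_NUMBER || ' ' || a.CA_STREET_NAME,
           a.CA_CITY,
           a.CA_STATE,
           a.CA_ZIP
    FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER c
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_ADDRESS a
      ON c.C_CURRENT_ADDR_SK = a.CA_ADDRESS_SK
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_DEMOGRAPHICS d
      ON c.C_CURRENT_CDEMO_SK = d.CD_DEMO_SK
    WHERE a.CA_STATE = '` + STATE_CODE + `'`;

  // Run the statement and report the outcome back to the caller (the Azure Function).
  try {
    snowflake.execute({ sqlText: sql_statement });
    return "Succeeded: loaded " + SCHEMA_NAME + "." + TABLE_NAME;
  } catch (err) {
    return "Failed: " + err.message;
  }
$$;
```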
Let’s move on to the Azure function. The function will be called from Azure Data Factory using an HTTP POST request. I have a request class that contains the information that ADF will pass in, which includes the source and target databases, the target schema, table, and state. To make this function more dynamic, I’m also passing in the name of the stored procedure that should be called. Within the function, I parse the request class into variables. I then set a variable called run stored proc with the statement that actually calls the stored procedure, replacing the placeholders with the values from the request body. Next I create a connection to Snowflake. Notice I’m using an environment variable for the connection string; this, in turn, references an Azure Key Vault to securely store the connection string and password. Once I have a connection to Snowflake, I issue my run stored proc variable as the command and await the result. Alternatively, you could issue the command and return to ADF without waiting for the query to finish. This may be preferable if your data pipeline can continue without waiting for a result, and is required if you have a long-running query, as the timeout for an HTTP-triggered function is 230 seconds. If you do wait for the result, you can close the connection to Snowflake like I do at the end.
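Here is a sketch of an HTTP-triggered function along those lines, using the Snowflake .NET connector (Snowflake.Data). The class, property, and setting names are assumptions based on the walkthrough rather than the exact demo code.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Newtonsoft.Json;
using Snowflake.Data.Client;

// Hypothetical request class mirroring the JSON body ADF sends.
public class LoadRequest
{
    public string SourceDatabase { get; set; }
    public string TargetDatabase { get; set; }
    public string TargetSchema { get; set; }
    public string TargetTable { get; set; }
    public string State { get; set; }
    public string StoredProcedure { get; set; }
}

public static class RunSnowflakeStoredProc
{
    [FunctionName("RunSnowflakeStoredProc")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req)
    {
        // Parse the POST body into the request class.
        var body = await new StreamReader(req.Body).ReadToEndAsync();
        var request = JsonConvert.DeserializeObject<LoadRequest>(body);

        // Build the CALL statement from the request values.
        var runStoredProc =
            $"CALL {request.TargetDatabase}.{request.StoredProcedure}(" +
            $"'{request.TargetSchema}', '{request.TargetTable}', '{request.State}')";

        // Connection string comes from an app setting that references Key Vault.
        using (var conn = new SnowflakeDbConnection())
        {
            conn.ConnectionString = Environment.GetEnvironmentVariable("SnowflakeConnectionString");
            conn.Open();

            using (var cmd = conn.CreateCommand())
            {
                cmd.CommandText = runStoredProc;
                // Await the stored procedure's string return value. For long-running
                // loads you could instead issue the command and return before it
                // completes, since the HTTP trigger times out after 230 seconds.
                var result = (await cmd.ExecuteScalarAsync())?.ToString();
                return new OkObjectResult(new { Result = result });
            }
        } // Disposing the connection closes it once the result has been returned.
    }
}
```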
Let’s switch over to the Azure storage account. Here I have a container that holds a JSON configuration file. If we look inside the file, it contains three JSON objects, each of which has the same structure as the request body in the Azure function. ADF will pass one entire JSON object to the function at runtime.
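A configuration file of that shape might look like this; the field names match the hypothetical request class above, and the values are illustrative.

```json
[
  {
    "SourceDatabase": "SNOWFLAKE_SAMPLE_DATA",
    "TargetDatabase": "APTITIVE_DISCOVERY",
    "TargetSchema": "ILLINOIS_SALES_MART",
    "TargetTable": "DIM_CUSTOMER",
    "State": "IL",
    "StoredProcedure": "PUBLIC.LOAD_DIM_CUSTOMER"
  },
  {
    "SourceDatabase": "SNOWFLAKE_SAMPLE_DATA",
    "TargetDatabase": "APTITIVE_DISCOVERY",
    "TargetSchema": "NEW_YORK_SALES_MART",
    "TargetTable": "DIM_CUSTOMER",
    "State": "NY",
    "StoredProcedure": "PUBLIC.LOAD_DIM_CUSTOMER"
  },
  {
    "SourceDatabase": "SNOWFLAKE_SAMPLE_DATA",
    "TargetDatabase": "APTITIVE_DISCOVERY",
    "TargetSchema": "CALIFORNIA_SALES_MART",
    "TargetTable": "DIM_CUSTOMER",
    "State": "CA",
    "StoredProcedure": "PUBLIC.LOAD_DIM_CUSTOMER"
  }
]
```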
Now let’s take a look at Azure Data Factory. First, I have an Azure Blob Storage dataset called config JSON that is connected to my storage account. Within the dataset, I created two parameters, blob container and config file, which I use on the Connection tab for the file path fields. For the sake of this demo, I’ve created a simple pipeline. Within my pipeline, I have defined two parameters, container name and file name. The first component is a Lookup activity that reads the configuration file we reviewed. Here you can see my dataset parameters, blob container and config file; I’m going to pass in my pipeline parameters, container name and file name, at runtime. At the bottom of the Settings tab, make sure that the “First row only” box is unchecked so that all of the data in the file is returned. Next, we’ll use a ForEach loop to iterate through each of the JSON objects in my file. On the Settings tab, I’ve decided to run my tables in parallel by unchecking the Sequential box, and I’m going to pass the output value from my Lookup activity to the Items field. Within the ForEach, I have an Azure Function activity. Here I’ll send my incoming item, one JSON object, to my function via a POST request. Now I’m going to run the pipeline in debug mode so we can view the inputs and outputs of each activity. I need to supply values for the container name and file name parameters, so I’ll specify my config container and the name of my configuration file. I’ll let this run and we’ll come back in just a second.
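For reference, the pipeline described here corresponds roughly to the following ADF pipeline JSON; the activity, dataset, and linked service names are illustrative, not the demo’s exact definitions.

```json
{
  "name": "LoadSalesMarts",
  "properties": {
    "parameters": {
      "ContainerName": { "type": "string" },
      "FileName": { "type": "string" }
    },
    "activities": [
      {
        "name": "LookupConfig",
        "type": "Lookup",
        "typeProperties": {
          "source": { "type": "JsonSource" },
          "dataset": {
            "referenceName": "ConfigJson",
            "type": "DatasetReference",
            "parameters": {
              "BlobContainer": { "value": "@pipeline().parameters.ContainerName", "type": "Expression" },
              "ConfigFile": { "value": "@pipeline().parameters.FileName", "type": "Expression" }
            }
          },
          "firstRowOnly": false
        }
      },
      {
        "name": "ForEachConfigEntry",
        "type": "ForEach",
        "dependsOn": [ { "activity": "LookupConfig", "dependencyConditions": [ "Succeeded" ] } ],
        "typeProperties": {
          "isSequential": false,
          "items": { "value": "@activity('LookupConfig').output.value", "type": "Expression" },
          "activities": [
            {
              "name": "CallSnowflakeFunction",
              "type": "AzureFunctionActivity",
              "linkedServiceName": { "referenceName": "AzureFunctionLinkedService", "type": "LinkedServiceReference" },
              "typeProperties": {
                "functionName": "RunSnowflakeStoredProc",
                "method": "POST",
                "body": { "value": "@item()", "type": "Expression" }
              }
            }
          ]
        }
      }
    ]
  }
}
```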
Okay, my pipeline has completed successfully, so let’s take a look at the runtime values. For the input to the Lookup activity, we can see that the values I entered for the container name and file name were passed to the dataset parameters, blob container and config file. Coming out of the activity, the output count is three and the value field holds my JSON objects. Remember, I specified that I want to pass the output value to the Items field in the ForEach activity. You can see I have three instances of the call to my stored procedure function, corresponding to the three objects in my configuration file. If we look at the input to the activity, the body of the POST request contains one entire JSON object.
Finally, let’s head back to Snowflake to take a look at the populated tables. Hovering over each table name, I can now see that they contain data. Notice that the row count is different for each sales mart. If I preview the data for the New York sales mart, I’ll only see customers whose current address is New York.
Thank you for joining me for this webinar on performing ETL using Snowflake stored procedures, Azure Data Factory, and Azure Functions. Click here for more Snowflake resources or to learn more about our Snowflake consulting, Snowflake training, and other Snowflake solutions.
Originally published at https://aptitive.com on June 9, 2020.