The development of Snowflake skills continues with Frosty Friday Challenge #2. In this post, we will go over Challenge #2 (Streams) and explore more features of the Snowflake ecosystem.
In Frosty Friday Challenge #2, we are asked to load Parquet data, transform it into a table, and then create a stream that shows only changes to the DEPT and JOB_TITLE columns. For more context, please refer to Frosty Friday – Challenge #2.
As the basis for our flow, let's set up our environment configuration as described in Developing Snowflake Skills – Frosty Friday 1.
// Environment Configuration
USE DATABASE ATZMON_DB;
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA CHALLENGES;
Load in the Parquet Data to a Stage
Since our data is in Parquet format, we will create a File Format object of type PARQUET and accept the default values for all other options.
// Creating a PARQUET file format object
CREATE OR REPLACE FILE FORMAT ch2_parquet
    TYPE = 'PARQUET';
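If you want to see which defaults you accepted, you can describe the file format after creating it. This is a small sketch using the standard DESCRIBE command; the exact list of properties and default values shown depends on your Snowflake version.

```sql
// Show every option of the file format, including the defaults
// we accepted for everything except TYPE
DESC FILE FORMAT ch2_parquet;
```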
Apache Parquet is an open source, column-oriented file format. It handles complex data in large volumes well and is known for both efficient data compression and support for a wide variety of encoding types.
Snowflake reads Parquet data into a single VARIANT column (VARIANT is a tagged universal type that can hold up to 16 MB of any data type supported by Snowflake). Users can query the data in a VARIANT column using standard SQL, including joining it with structured data. Additionally, users can extract selected columns from a staged Parquet file into separate table columns (source: https://www.snowflake.com/guides/).
// Create an external stage pointing at the S3 location of the Parquet file
CREATE OR REPLACE STAGE ch2_parq_stg
    URL = 's3://frostyfridaychallenges/challenge_2';

// List files in stage
LIST @ch2_parq_stg;

// Checking file contents
SELECT *
FROM @ch2_parq_stg (
    file_format => 'ch2_parquet',
    pattern => 'challenge_2/employees.parquet');
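To see the VARIANT behavior described above in practice, you can pull individual fields out of the single column ($1) with path notation and cast them. This is a sketch that assumes the Parquet field names are lowercase (employee_id, dept, job_title), as the quoted column names used later in this post suggest; path keys are case-sensitive, so adjust them if your file differs.

```sql
// Extract selected fields from the VARIANT column ($1) returned
// when querying the staged Parquet file directly, casting each one
SELECT
    $1:employee_id::NUMBER AS employee_id,
    $1:dept::VARCHAR       AS dept,
    $1:job_title::VARCHAR  AS job_title
FROM @ch2_parq_stg (file_format => 'ch2_parquet')
LIMIT 10;
```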
Creating a Table and Table schema
In Frosty Friday – Challenge #1, we created a table by typing in the table schema manually. We could do the same here, but Parquet files give us a better option: because they store their schema alongside the data, we can use a few Snowflake SQL functions to detect the table schema automatically and apply it to the table.
We will create the table from a template built with a few of these functions. Let's look at the script first and then explain the functions it uses.
// Create an empty table 'ch2_parquet_tbl' automatically using a template,
// built with the ARRAY_AGG(OBJECT_CONSTRUCT(*)) and INFER_SCHEMA() functions
CREATE OR REPLACE TABLE ch2_parquet_tbl
USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    FROM TABLE(
        INFER_SCHEMA(
            LOCATION => '@ch2_parq_stg/',
            FILE_FORMAT => 'ch2_parquet'
        )
    )
);
There are several things happening in the code above, so let's break it down:
The INFER_SCHEMA function scans the staged files and returns one row per detected column, with details such as the column name, data type, and nullability.
The ARRAY_AGG(OBJECT_CONSTRUCT(*)) expression turns each of those rows into an object (OBJECT_CONSTRUCT(*)) and collects all of the objects into a single array (ARRAY_AGG).
The CREATE TABLE command with the USING TEMPLATE clause takes that array and creates a new table whose column definitions are derived from the INFER_SCHEMA output.
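Before creating the table, it can help to run the pieces of the template on their own. The sketch below first previews the raw INFER_SCHEMA rows and then runs the template subquery by itself, so you can see the array of objects (one per detected column) that USING TEMPLATE receives. Both queries reuse the stage and file format defined earlier; the exact set of output columns may vary by Snowflake version, which is why the first query uses SELECT *.

```sql
// Preview the column definitions INFER_SCHEMA detects in the staged files
SELECT *
FROM TABLE(
    INFER_SCHEMA(
        LOCATION    => '@ch2_parq_stg/',
        FILE_FORMAT => 'ch2_parquet'
    )
);

// Run the template subquery alone to see the array of objects
// that CREATE TABLE ... USING TEMPLATE consumes
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
    INFER_SCHEMA(
        LOCATION    => '@ch2_parq_stg/',
        FILE_FORMAT => 'ch2_parquet'
    )
);
```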
Cool! We have our table and its schema in place. One caveat: the generated columns don't necessarily follow the order of the columns in the staged file. Don't worry, we will address that in the next step.
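As an alternative to handling column order at load time, you may be able to preserve the file's column order when creating the table. This sketch assumes your account's INFER_SCHEMA output includes an ORDER_ID column (newer Snowflake versions return one); if it doesn't, this statement will fail and the original CREATE TABLE above is the way to go.

```sql
// Assumes INFER_SCHEMA returns ORDER_ID: aggregate the column definitions
// in file order so the table's columns match the staged file's layout
CREATE OR REPLACE TABLE ch2_parquet_tbl
USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) WITHIN GROUP (ORDER BY order_id)
    FROM TABLE(
        INFER_SCHEMA(
            LOCATION    => '@ch2_parq_stg/',
            FILE_FORMAT => 'ch2_parquet'
        )
    )
);
```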
Copy Stage Data to Table
Next, we will use a COPY INTO command to load the data from the staged file into our newly created table. To ensure each value lands in the right column regardless of column order, we will add the option MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE, which loads each field in the file into the table column with the same name, ignoring case. ON_ERROR = ABORT_STATEMENT stops the load if any error is encountered.
The PURGE option is a Boolean that specifies whether to remove the data files from the stage automatically after the data is loaded successfully. We set it to FALSE to leave the file in place.
// Loading the data into 'ch2_parquet_tbl';
// MATCH_BY_COLUMN_NAME guards against a column order mismatch
COPY INTO ch2_parquet_tbl
FROM @ch2_parq_stg
    FILE_FORMAT = (FORMAT_NAME = 'ch2_parquet')
    ON_ERROR = ABORT_STATEMENT
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    PURGE = FALSE;
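To confirm the load worked, you can count the rows and check the per-file load history. This is a sketch that relies on the database context set at the start (INFORMATION_SCHEMA functions resolve against the current database) and looks back one hour; widen the window if needed. Keep in mind that Snowflake tracks which files a table has already loaded, so re-running the same COPY INTO skips the file unless you add FORCE = TRUE.

```sql
// Check how many rows were loaded
SELECT COUNT(*) AS loaded_rows FROM ch2_parquet_tbl;

// Inspect the load outcome per file over the last hour
SELECT file_name, status, row_count, row_parsed, first_error_message
FROM TABLE(
    INFORMATION_SCHEMA.COPY_HISTORY(
        TABLE_NAME => 'CH2_PARQUET_TBL',
        START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
    )
);
```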
Tracking Changes – Streams
Since our task is to track changes to only two of the table's columns (DEPT and JOB_TITLE), we will create a view containing those columns and track changes on the view.
Tables and views are the primary objects created and maintained in database schemas (https://docs.snowflake.com/en/sql-reference/ddl-table.html):
All data in Snowflake is stored in tables.
Views can be used to display selected rows and columns from one or more tables.
// Creating a view based on ch2_parquet_tbl with the requested columns
CREATE OR REPLACE VIEW ch2_parquet_view AS
SELECT "employee_id", "job_title", "dept"
FROM ch2_parquet_tbl;
As you might have noticed, we also added employee_id to the view. Without it, we couldn't tell which employee a change belongs to, and rows that share the same dept and job_title values would be indistinguishable in the stream output.
In order to track the changes made to the view we will create a stream.
A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data. This process is referred to as change data capture (CDC).
Streams can be created to query change data on tables and views (https://docs.snowflake.com/en/user-guide/streams.html).
// Creating a stream to record data changes made to a table/view
CREATE OR REPLACE STREAM ch2_parquet_stream
    ON VIEW ch2_parquet_view;
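Before making any changes, you can confirm the stream exists and that it is still empty. This sketch uses SHOW STREAMS and the SYSTEM$STREAM_HAS_DATA function. Note that a stream on a view needs change tracking on the underlying table; Snowflake tries to enable it when the stream is created, and if your role doesn't own the table, the owner may need to run ALTER TABLE ch2_parquet_tbl SET CHANGE_TRACKING = TRUE first.

```sql
// Confirm the stream exists and see which object it tracks
SHOW STREAMS LIKE 'CH2_PARQUET_STREAM';

// Expected to return FALSE right after creation, since no DML has run yet
SELECT SYSTEM$STREAM_HAS_DATA('CH2_PARQUET_STREAM');
```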
OK, cool! We've created a stream to track changes made through our view. Now let's apply the updates the challenge asks for.
// Execute the following commands:
UPDATE ch2_parquet_tbl SET "country" = 'Japan' WHERE "employee_id" = 8;
UPDATE ch2_parquet_tbl SET "last_name" = 'Forester' WHERE "employee_id" = 22;
UPDATE ch2_parquet_tbl SET "dept" = 'Marketing' WHERE "employee_id" = 25;
UPDATE ch2_parquet_tbl SET "title" = 'Ms' WHERE "employee_id" = 32;
UPDATE ch2_parquet_tbl SET "job_title" = 'Senior Financial Analyst' WHERE "employee_id" = 68;
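For contrast with the stream, it can be useful to confirm that all five updates actually landed in the base table, including the three that touch columns outside the view. This sketch only reads the columns the updates touched.

```sql
// Confirm all five updates were applied to the base table
SELECT "employee_id", "country", "last_name", "title", "dept", "job_title"
FROM ch2_parquet_tbl
WHERE "employee_id" IN (8, 22, 25, 32, 68)
ORDER BY "employee_id";
```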
All we have to do is check the stream!
// Checking the stream ch2_parquet_stream
// Query shows changes made on specific columns: dept, job_title
SELECT * FROM ch2_parquet_stream;
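To make the change records easier to read, you can select the stream's metadata columns explicitly. In a standard stream, an update is recorded as a DELETE/INSERT pair with METADATA$ISUPDATE = TRUE, so we would expect such pairs only for employees 25 and 68. A plain SELECT does not consume the stream; its offset advances only when the stream is read inside a DML statement that commits. The ch2_changes table below is a hypothetical target, named here only for illustration.

```sql
// Show the tracked view columns together with the stream metadata
SELECT "employee_id", "dept", "job_title",
       METADATA$ACTION, METADATA$ISUPDATE
FROM ch2_parquet_stream
ORDER BY "employee_id", METADATA$ACTION;

// Hypothetical table for persisting the captured changes
CREATE TABLE IF NOT EXISTS ch2_changes (
    employee_id      NUMBER,
    dept             VARCHAR,
    job_title        VARCHAR,
    change_action    VARCHAR,
    change_is_update BOOLEAN
);

// Reading the stream inside this INSERT advances its offset,
// so the stream is empty again once the statement commits
INSERT INTO ch2_changes
SELECT "employee_id", "dept", "job_title",
       METADATA$ACTION, METADATA$ISUPDATE
FROM ch2_parquet_stream;
```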
And that’s it, Challenge solved!
I hope you found the post useful…
Next time we will cover Frosty Friday – Challenge #3