The development of Snowflake skills continues with Frosty Friday challenge #2. In this post, we will go over challenge 2 – Streams, and try to explore more features within the Snowflake ecosystem.

In Frosty Friday challenge #2, we are asked to load Parquet data into a table, then create a stream that only shows changes to the DEPT and JOB_TITLE columns. For more context, please refer to Frosty Friday – Challenge #2.

As the basis for our flow, let’s set up our Environment Configuration as described in Developing Snowflake Skills – Frosty Friday 1.

// Environment Configuration
USE DATABASE ATZMON_DB;
USE WAREHOUSE COMPUTE_WH;
USE SCHEMA CHALLENGES;

Load in the Parquet Data to a Stage

Since we have Parquet data, we will create an appropriate File Format object that specifies the Parquet file format type. We will accept the default values for the other options.

// Creating a PARQUET file format object
CREATE OR REPLACE FILE FORMAT ch2_parquet
  type = 'parquet';

Parquet is an open source file format built to handle flat columnar storage data formats. Parquet operates well with complex data in large volumes. It is known both for its performant data compression and for its ability to handle a wide variety of encoding types.

Snowflake reads Parquet data into a single Variant column (Variant is a tagged universal type that can hold up to 16 MB of any data type supported by Snowflake). Users can query the data in a Variant column using standard SQL, including joining it with structured data. Additionally, users can extract select columns from a staged Parquet file into separate table columns.

Source: https://www.snowflake.com/guides/

// Create stage and load the parquet file
CREATE OR REPLACE STAGE ch2_parq_stg
  url = 's3://frostyfridaychallenges/challenge_2';

// List files in stage
LIST @ch2_parq_stg;

// Checking file contents
SELECT *
FROM @ch2_parq_stg (
    file_format => 'ch2_parquet',
    pattern => 'challenge_2/employees.parquet');
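
Because each Parquet row arrives as a single Variant value, individual fields can be pulled out with path notation and a cast. The query below is only a sketch: the field names are taken from the columns used later in this post, while the target types (NUMBER, VARCHAR) are assumptions about the file's contents and may need adjusting.

```sql
// Extract selected fields from the staged Parquet file.
// $1 is the single Variant column that holds each Parquet row.
// The casts to NUMBER and VARCHAR are assumed types, not confirmed by the file.
SELECT
    $1:employee_id::NUMBER AS employee_id,
    $1:dept::VARCHAR       AS dept,
    $1:job_title::VARCHAR  AS job_title
FROM @ch2_parq_stg (file_format => 'ch2_parquet')
LIMIT 10;
```

Path notation is case-sensitive, so the field names must be spelled exactly as they appear in the file.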

Creating a Table and Table schema

In Frosty Friday – Challenge #1, we created a table by typing out the Table Schema manually. We could do the same here, but Parquet files offer a better option. We will use a few Snowflake SQL functions to detect the underlying Table Schema automatically and apply it to the table.

We are going to tweak our table creation with a template and use a few functions. Let’s look at the script to do that and then explain the functions we are using.

// Create an empty table 'ch2_parquet_tbl' automatically using template
// while using array_agg(object_construct(*)) and infer_schema() functions
CREATE OR REPLACE TABLE ch2_parquet_tbl USING TEMPLATE (
  SELECT array_agg(object_construct(*))
  FROM TABLE(
    infer_schema(
      location => '@ch2_parq_stg/',
      file_format => 'ch2_parquet')
  ));

There are several things happening in the code above and I will elaborate on that a bit. The CREATE TABLE command with the USING TEMPLATE clause is executed to create a new table with the column definitions derived from the INFER_SCHEMA function output.

In the array_agg(object_construct(*)) expression, object_construct(*) turns each row returned by INFER_SCHEMA into a JSON object, and array_agg collects those objects into a single array that feeds the USING TEMPLATE clause of CREATE TABLE.
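
To see what the template is built from, it can help to run INFER_SCHEMA on its own. This sketch reuses the stage and file format created earlier in this post; each output row describes one detected column, including its name, type and nullability.

```sql
// Inspect the column definitions that INFER_SCHEMA detects in the staged file.
// One output row is returned per column found in the Parquet metadata.
SELECT *
FROM TABLE(
  INFER_SCHEMA(
    LOCATION => '@ch2_parq_stg/',
    FILE_FORMAT => 'ch2_parquet'));
```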

Cool! We have our table and a table schema in place. One caveat to note is that the order of the generated columns doesn’t necessarily match the column order in our staged file. But don’t worry, we will address that in our next action.

Copy Stage Data to Table

Next, we will use a COPY INTO command to load the data from the staged file into our newly created table. To ensure the data is ingested accurately, we add the option MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE, which loads each field into the table column with the matching name regardless of column order. The PURGE option is a boolean that specifies whether to remove the data files from the stage automatically after the data is loaded successfully; we set it to FALSE to keep the file in place.

// Loading the data into 'ch2_parquet_tbl', matching columns by name
// so that a column order mismatch does not affect the load
COPY INTO ch2_parquet_tbl
FROM @ch2_parq_stg
  FILE_FORMAT = (FORMAT_NAME = 'ch2_parquet')
  ON_ERROR = 'ABORT_STATEMENT'
  MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
  PURGE = FALSE;
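
Before moving on, a quick sanity check of the load can be useful. The statements below are a minimal sketch that only uses the table created above; no particular row count is assumed.

```sql
// Confirm the table structure that was created from the template
DESCRIBE TABLE ch2_parquet_tbl;

// Count the loaded rows and look at a small sample
SELECT COUNT(*) AS row_count FROM ch2_parquet_tbl;
SELECT * FROM ch2_parquet_tbl LIMIT 10;
```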

Tracking Changes – Streams

Since our task is to find changes made to only a few of the table columns (DEPT and JOB_TITLE), we will create a view with those columns and then track it.

Tables and Views are the primary objects created and maintained in database schemas:
All data in Snowflake is stored in tables.
Views can be used to display selected rows and columns in one or more tables.

Source: https://docs.snowflake.com/en/sql-reference/ddl-table.html

// Creating a view based on ch2_parquet_tbl with the requested columns
CREATE OR REPLACE VIEW ch2_parquet_view as
  SELECT "employee_id",
    "job_title",
    "dept"
  FROM ch2_parquet_tbl;

As you might have noticed, we also added employee_id to the view. It keeps every row identifiable, so that we can tell which employee each change in the stream belongs to.

In order to track the changes made to the view we will create a stream.

A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data. This process is referred to as change data capture (CDC). 

Streams can be created to query change data on tables and views.

Source: https://docs.snowflake.com/en/user-guide/streams.html

// Creating a stream to record data changes made to a Table/View
CREATE OR REPLACE STREAM ch2_parquet_stream ON VIEW ch2_parquet_view;
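
A stream on a view relies on change tracking being enabled on the underlying table. Depending on the role and ownership in your account, this may or may not happen automatically when the stream is created. The sketch below assumes it did not: if the CREATE STREAM statement above fails with a change tracking error, run the ALTER TABLE statement first and then create the stream again.

```sql
// Enable change tracking on the table behind the view.
// Assumption: it was not enabled automatically for the role in use.
ALTER TABLE ch2_parquet_tbl SET CHANGE_TRACKING = TRUE;

// Confirm that the stream exists and check which object it tracks
SHOW STREAMS LIKE 'ch2_parquet_stream';
```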

Ok Cool! We’ve created a stream to track changes made to our view. Let’s make some changes to the underlying table.

// Execute the following commands:
UPDATE ch2_parquet_tbl SET "country" = 'Japan' WHERE "employee_id" = 8;
UPDATE ch2_parquet_tbl SET "last_name" = 'Forester' WHERE "employee_id" = 22;
UPDATE ch2_parquet_tbl SET "dept" = 'Marketing' WHERE "employee_id" = 25;
UPDATE ch2_parquet_tbl SET "title" = 'Ms' WHERE "employee_id" = 32;
UPDATE ch2_parquet_tbl SET "job_title" = 'Senior Financial Analyst' WHERE "employee_id" = 68;

All we have to do is check the stream!

// Checking the stream ch2_parquet_stream  
// Query shows changes made on specific columns: dept, job_title
SELECT * FROM ch2_parquet_stream;
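
Besides the view columns, a stream exposes metadata columns that describe each change. As a sketch, the query below lists them explicitly next to the tracked columns; in a standard stream an update typically appears as a DELETE and INSERT pair with METADATA$ISUPDATE set to TRUE.

```sql
// Show the tracked columns together with the stream metadata columns
SELECT
    "employee_id",
    "dept",
    "job_title",
    METADATA$ACTION,    // INSERT or DELETE
    METADATA$ISUPDATE,  // TRUE when the row is part of an UPDATE
    METADATA$ROW_ID     // unique and immutable ID of the changed row
FROM ch2_parquet_stream
ORDER BY "employee_id", METADATA$ACTION;
```

Querying a stream with SELECT does not consume it. The stream offset only advances when the stream is used in a DML statement, so this check can be repeated safely.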

And that’s it, Challenge solved!

I hope you found the post useful…

Next time we will cover Frosty Friday – Challenge #3
