We krijgen vaker de vraag hoe we omgaan met het in sync houden van tabellen en voor ons is het met Snowflake een eitje gebleken met Streams en Tasks.

Snowflake Streams

Snowflake Streams stellen je instaat om op een eenvoudige wijze CDC (Change Data Capture) te gebruiken op specifieke ‘Source’ tabellen. Hierna kunnen we deze gegevens gebruiken om alle veranderingen te mergen in onze Target tabel.

Een voorbeeld is altijd een goede manier om een concept helder te krijgen , laten we daarom direct van start gaan met een source tabel :

CREATE TABLE mike_test_source(
    ID int,
    first_name VARCHAR(100),
    last_name VARCHAR(100)
);

We hebben een eenvoudige tabel met een paar simpele kolommen. De volgende stap is het opzetten van de Stream zelf :

CREATE STREAM mike_test_source_stream ON TABLE mike_test_source;

Meer is niet nodig! Alle wijzigingen worden nu bijgehouden in de stream , totdat deze worden opgevraagd.

Laten we beginnen met het vullen van mike_test_source :

INSERT INTO mike_test_source VALUES (1,'Mike','Droog');
INSERT INTO mike_test_source VALUES (2,'Jan','Jansen');
INSERT INTO mike_test_source VALUES (3,'John','Doe');

We hebben natuurlijk ook nog een target tabel nodig :

CREATE TABLE mike_test_target(
    ID int,
    first_name VARCHAR(100),
    last_name VARCHAR(100)
);

We hebben dus op dit moment de stream laten zien dat we 3 records hebben toegevoegd in de source tabel , iets wat we kunnen opvragen met :

select * from mike_test_source_stream;

We kunnen hier duidelijk de 3 nieuwe records zien , waarvan de METADATA$ACTION aangeeft dat het hier om een insert gaat.

We gaan deze nieuwe records, in de stream , dan ook overbrengen naar de target tabel :

MERGE INTO mike_test_target AS T
using (SELECT *
      FROM mike_test_source_stream) AS S
ON T.ID = S.ID
WHEN MATCHED
        AND S.METADATA$ACTION = 'INSERT'
        AND S.METADATA$ISUPDATE = TRUE THEN
    UPDATE SET T.first_name = S.first_name,
               T.last_name = S.last_name
WHEN MATCHED
        AND S.METADATA$ACTION = 'DELETE' THEN DELETE
WHEN NOT MATCHED
        AND S.METADATA$ACTION = 'INSERT' THEN
    INSERT (ID,
           first_name,
           last_name)
    VALUES (S.id,
           S.first_name,
           S.last_name);

We gaan de nieuwe gegevens uit de stream, samen voegen met de target. We gaan hierbij eerst na of het record ID al voorkomt ; Hiermee kunnen we zien of het om een nieuw record gaat of een bestaand record.
Zodra we dit hebben bepaald , gaan we kijken naar de $ACTION die hiervoor staat. Als het gaat om een INSERT en een UPDATE dan kunnen we de gegevens wijzigen.
Als het ID echter niet wordt gevonden , dan kunnen we dit behandelen als een nieuw record en een INSERT gebruiken om deze toe te voegen.

Nadat we dit hebben uitgevoerd, zullen we zien dat dezelfde records in mike_test_target zijn gekomen en dat de stream direct geleegd is!
We hebben al vaker gezegd dat handmatige handelingen een risico kunnen vormen en we daarom liever uitgaan van een automatisering, dit is nu juist hetgeen waar Tasks ons bij kunnen helpen.

Tasks

Een task is een automatische handeling die Snowflake zelf voor je uitvoerd , gekoppeld aan een tijdschema.

Om onze voorgaande Merge in een task te verwerken hebben we nog 2 kleine stappen nodig :
– het aanmaken van de task
– het aanzetten van de task.

Eerst voegen we onderstaande code toe aan de merge :

CREATE TASK TASK_FactInternetSalesEveryMinute
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE' 
AS

Dit wordt dus in zijn totaliteit :

CREATE TASK TASK_merge_test_target_every_minute
  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE' 
AS
MERGE INTO mike_test_target AS T
using (SELECT *
      FROM mike_test_source_stream) AS S
ON T.ID = S.ID
WHEN MATCHED
        AND S.METADATA$ACTION = 'INSERT'
        AND S.METADATA$ISUPDATE = TRUE THEN
    UPDATE SET T.first_name = S.first_name,
               T.last_name = S.last_name
WHEN MATCHED
        AND S.METADATA$ACTION = 'DELETE' THEN DELETE
WHEN NOT MATCHED
        AND S.METADATA$ACTION = 'INSERT' THEN
    INSERT (ID,
           first_name,
           last_name)
    VALUES (S.id,
           S.first_name,
           S.last_name);

de laatste stap is het aanzetten van de task , als deze wordt aangemaakt dan wordt deze namelijk standaard uitgezet :

ALTER TASK TASK_merge_test_target_every_minute RESUME;

We kunnen nu alle handelingen op mike_test_source uitvoeren , een minuut wachten , en dezelfde handelingen worden uitgevoerd op mike_test_target!

Conclusie

Het is met behulp van Snowflake al lang niet meer ingewikkeld om CDC in te zetten en snel te gebruiken. Je kunt de merge zelfs nog aanpassen door de DELETE action uit de task te halen ; hierdoor worden alleen records gewijzigd en toegevoegd maar nooit verwijderd, hierdoor zou een target kunnen dienen als bijvoorbeeld archief.