Data Engineering & Lake

Jun 1, 2024

Snowflake Dynamic Tables

In the ever-changing world of data, real-time processing and analysis are paramount. Snowflake's Dynamic Tables (DTs) provide a powerful mechanism to create automated data pipelines without the overhead of complex code or manual scheduling. This chapter will guide you through building a multi-stage data pipeline using dynamic tables, showcasing how they can streamline your workflows and deliver timely insights.

Overview

Snowflake Dynamic Tables offer a revolutionary approach to data pipelines. They empower you to define your data transformation logic in a simple, declarative SQL query. Snowflake then takes care of the rest, automatically updating the results as your source data changes. This eliminates the need for complex, procedural code and tedious scheduling tasks.

Compared to traditional streams and tasks, dynamic tables provide a more streamlined and intuitive experience. They are easier to set up, maintain, and understand, especially for users who prefer a SQL-centric approach. Streams and tasks offer more granular control and flexibility, but they can be more challenging to implement and manage. Dynamic tables strike a balance between simplicity and power, making them an ideal choice for many data transformation use cases.
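
To make that concrete, here is the general shape of a dynamic table definition. This is a minimal sketch with placeholder names (my_db.my_schema, raw_sales, my_wh); the real pipeline tables follow in the walkthrough below.

>_ SQL

-- A whole pipeline step is one declarative statement:
-- a query, a freshness target, and a warehouse to run refreshes on
CREATE OR REPLACE DYNAMIC TABLE my_db.my_schema.sales_by_region
TARGET_LAG = '5 minutes'   -- how stale the results are allowed to get
WAREHOUSE = my_wh          -- compute used for automatic refreshes
AS
SELECT region, SUM(amount) AS total_sales
FROM my_db.my_schema.raw_sales
GROUP BY region;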


A Practical Example: Sales Data Analysis

Set up and run the SalesDB sample data model, then proceed with the rest of the exercises.

⛁ Sample Data Model: salesdb-data-model

This model provides a robust foundation for tracking customers, buyers, clients, and sales opportunities. 

>_ SQL

-- Set the working database and schema for the exercises
USE SCHEMA SalesDB.custs;

Let's delve into how the Snowflake Dynamic Tables feature can supercharge your data workflow:

Building the Sales Data Pipeline

We'll work with the SalesDB database and the custs schema, which contains the following tables:

  • Customer: Stores information about potential and existing customers.

  • Opportunities: Tracks sales opportunities at various stages.

Our goal is to create a pipeline that cleans and enriches customer data, filters out irrelevant opportunities, joins the two datasets, and finally aggregates the results for analysis.

Step 1: Enriched Customer Data (Dynamic Table 1)

>_ SQL

CREATE OR REPLACE DYNAMIC TABLE custs.enriched_customers
TARGET_LAG = '1 minutes'
WAREHOUSE = demo_wh
AS
SELECT
    c.CustomerID,
    c.FirstName,
    c.LastName,
    TRIM(c.Email) AS Email,             -- Trim whitespace
    INITCAP(c.FirstName) AS CleanFirstName,  -- Capitalize first letter of names
    INITCAP(c.LastName) AS CleanLastName,
    CASE
        WHEN c.Email LIKE '%@%.%' THEN 'Valid'
        ELSE 'Invalid'
    END AS EmailValidity,
    CURRENT_TIMESTAMP() AS LoadDate
FROM
    custs.Customer c;

  • Purpose: This dynamic table cleans and enriches the raw customer data by:

    • Trimming whitespace from emails.

    • Standardizing names by capitalizing the first letter.

    • Adding an EmailValidity flag to indicate whether an email is valid.

    • Recording the LoadDate for tracking freshness.
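
Once the table has refreshed, a quick spot-check confirms the enrichment logic. This verification query is illustrative, not part of the pipeline itself:

>_ SQL

-- How many customer emails passed or failed the validity check?
SELECT EmailValidity, COUNT(*) AS NumCustomers
FROM custs.enriched_customers
GROUP BY EmailValidity;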

Step 2: Filtered Opportunities (Dynamic Table 2)

>_ SQL

CREATE OR REPLACE DYNAMIC TABLE custs.filtered_opportunities
TARGET_LAG = '1 minutes'
WAREHOUSE = demo_wh
AS
SELECT
    o.*
FROM custs.Opportunities o
WHERE o.Amount > 0 AND o.SalesStage != 'InvalidStage'
  AND o.ExpectedCloseDate IS NOT NULL;

  • Purpose: Filters out invalid or irrelevant opportunities based on these criteria:

    • Positive Amount

    • Valid SalesStage (not 'InvalidStage')

    • Not null ExpectedCloseDate
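
A simple way to watch the filter work is to compare row counts between the source table and the dynamic table; the difference is the number of excluded records:

>_ SQL

-- Raw opportunities vs. opportunities that survive the filters
SELECT
    (SELECT COUNT(*) FROM custs.Opportunities)          AS raw_count,
    (SELECT COUNT(*) FROM custs.filtered_opportunities) AS filtered_count;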

Step 3: Joined Customer Opportunities (Dynamic Table 3)

>_ SQL

CREATE OR REPLACE DYNAMIC TABLE custs.customer_opportunities
TARGET_LAG = '1 minutes'
WAREHOUSE = demo_wh
AS
SELECT
    e.FirstName,
    e.LastName,
    e.Email,
    e.EmailValidity,
    f.*
FROM custs.enriched_customers e
LEFT JOIN custs.filtered_opportunities f ON
    e.CustomerID = f.CustomerID;

  • Purpose: Joins the enriched customer data with the filtered opportunities, using a LEFT JOIN to include customers without any opportunities.
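
Because of the LEFT JOIN, customers with no qualifying opportunities still appear, with NULLs in the opportunity columns. Assuming OpportunityID comes through from f.*, you can list them like this:

>_ SQL

-- Customers that currently have no qualifying opportunities
SELECT FirstName, LastName, Email
FROM custs.customer_opportunities
WHERE OpportunityID IS NULL;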

Step 4: Aggregated Sales (Dynamic Table 4)

>_ SQL

CREATE OR REPLACE DYNAMIC TABLE custs.aggregated_sales 
TARGET_LAG = '1 minutes'
WAREHOUSE = demo_wh
AS
SELECT
    c.CustomerID,
    c.CleanFirstName || ' ' || c.CleanLastName AS FullName,
    c.Email,
    COUNT(o.OpportunityID) AS TotalOpportunities,
    SUM(o.Amount) AS TotalPotentialRevenue
FROM custs.enriched_customers c
LEFT JOIN custs.filtered_opportunities o ON c.CustomerID = o.CustomerID
GROUP BY c.CustomerID, FullName, c.Email;

  • Purpose: Aggregates sales data for each customer based on their opportunities.
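
With the aggregation maintained automatically, ranking customers by pipeline value becomes a trivial ad hoc query:

>_ SQL

-- Top 5 customers by total potential revenue
SELECT FullName, TotalOpportunities, TotalPotentialRevenue
FROM custs.aggregated_sales
ORDER BY TotalPotentialRevenue DESC NULLS LAST
LIMIT 5;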

Step 5: Customer Opportunities with Aggregated Sales (Dynamic Table 5)

>_ SQL

CREATE OR REPLACE DYNAMIC TABLE custs.customer_opportunities_with_sales
TARGET_LAG = '1 minutes'
WAREHOUSE = demo_wh
AS
SELECT
    co.*,
    agg.TotalOpportunities,
    agg.TotalPotentialRevenue
FROM custs.customer_opportunities co
LEFT JOIN custs.aggregated_sales agg ON
    co.CustomerID = agg.CustomerID;

  • Purpose: This new dynamic table combines the detailed customer opportunity information from customer_opportunities with the aggregated sales metrics from aggregated_sales.
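
One refinement worth noting: since this final table is the only one consumers query directly, the intermediate tables do not need their own one-minute lag. Snowflake supports TARGET_LAG = DOWNSTREAM, which refreshes a table only as often as its downstream consumers require. A sketch of that setting, applied to the Step 3 table:

>_ SQL

-- Let the final table's lag drive refreshes of an intermediate table
ALTER DYNAMIC TABLE custs.customer_opportunities
SET TARGET_LAG = DOWNSTREAM;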

Dynamic Tables In Action

Here are three batches of INSERT statements designed to demonstrate the dynamic updates of your Snowflake tables, along with explanations:

Batch 1: New High-Value Customer and Opportunities

>_ SQL

-- Customer Data (Batch 1)
INSERT INTO custs.Customer (CustomerID, FirstName, LastName, Email, HomeLocation, ZipCode)
VALUES
    (11, 'Sophia', 'Miller', 'sophia.miller@example.com', '567 High St', '94108');

-- Opportunities Data (Batch 1)
INSERT INTO custs.Opportunities (OpportunityID, CustomerID, LeadSource, SalesStage, ExpectedCloseDate, Amount)
VALUES
    (1013, 11, 'Partner Referral', 'Closed Won', '2024-05-25', 250000), -- High-value, closed opportunity
    (1014, 11, 'Outbound Call', 'Negotiation', '2024-06-10', 180000); -- High-value, ongoing opportunity

This batch introduces a new high-value customer ("Sophia Miller") with two associated opportunities, one closed and one still in negotiation. These additions will trigger updates in the enriched_customers, filtered_opportunities, customer_opportunities, and aggregated_sales dynamic tables.

Batch 2: New Opportunities with Data Issues

>_ SQL

-- Opportunities Data (Batch 2)
INSERT INTO custs.Opportunities (OpportunityID, CustomerID, LeadSource, SalesStage, ExpectedCloseDate, Amount)
VALUES
    (1015, 2, 'Web Form', 'InvalidStage', '2024-07-20', 100000),  -- Invalid SalesStage
    (1016, 3, 'Referral', 'Qualification', NULL, 80000),          -- Missing ExpectedCloseDate
    (1017, 1, 'Email Campaign', 'Proposal', '2024-06-25', -5000); -- Negative Amount

  • This batch adds three new opportunities but introduces data quality issues:

    • An invalid SalesStage value.

    • A missing ExpectedCloseDate.

    • A negative Amount.

  • This demonstrates how the filtered_opportunities table will exclude these records due to the applied filters.
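
After the next refresh, a quick check should return zero rows, confirming that all three problematic records were kept out of the pipeline:

>_ SQL

-- Expect no results: these OpportunityIDs fail the filter criteria
SELECT OpportunityID, SalesStage, ExpectedCloseDate, Amount
FROM custs.filtered_opportunities
WHERE OpportunityID IN (1015, 1016, 1017);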

Batch 3: Updates to Existing Records

>_ SQL

-- Update customer data
UPDATE custs.Customer
SET LastName = 'Johnson-Smith'
WHERE CustomerID = 1;

-- Update opportunity data
UPDATE custs.Opportunities
SET SalesStage = 'Closed Won'
WHERE OpportunityID = 1005;

  • These updates modify existing records, showcasing how dynamic tables can react to changes in the source data.
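
By default, these changes propagate within the one-minute TARGET_LAG of each table. For a live demo you can also trigger refreshes manually rather than waiting, refreshing the tables in dependency order:

>_ SQL

-- Force immediate refreshes instead of waiting for the target lag
ALTER DYNAMIC TABLE custs.enriched_customers REFRESH;
ALTER DYNAMIC TABLE custs.filtered_opportunities REFRESH;
ALTER DYNAMIC TABLE custs.customer_opportunities REFRESH;
ALTER DYNAMIC TABLE custs.aggregated_sales REFRESH;
ALTER DYNAMIC TABLE custs.customer_opportunities_with_sales REFRESH;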

Key Points for Demonstration:

  • Observe the Pipeline: After each batch insertion, query the dynamic tables to see how the data is transformed and propagated through the pipeline.

  • Highlight Filtering: Demonstrate how the filtered_opportunities table excludes records with invalid or missing data.

  • Aggregation Results: Show how the aggregated_sales table updates to reflect the new opportunities and their impact on the total potential revenue.

  • Data Classification & Masking: If you have implemented masking policies, demonstrate how they protect sensitive information in the enriched_customers table after the updates.

This showcases dynamic updates and illustrates the power of Snowflake Dynamic Tables in creating real-time data pipelines.
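
To confirm that refreshes are actually happening on schedule, Snowflake exposes refresh history through an INFORMATION_SCHEMA table function (column names per the Snowflake documentation):

>_ SQL

-- Inspect recent refreshes of one of the dynamic tables
SELECT name, state, refresh_start_time, refresh_end_time
FROM TABLE(INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'custs.aggregated_sales'))
ORDER BY refresh_start_time DESC
LIMIT 10;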

Bonus: Streamlit App

Use this optional Streamlit app to visualize the record flow across the different tables. Add additional records and see how they get updated.  

>_ Python

Packages: this app uses pandas and matplotlib on top of Streamlit and Snowpark; add any that aren't already selected via the Packages menu in the Streamlit editor.

import time

import matplotlib.pyplot as plt
import pandas as pd
import streamlit as st
from snowflake.snowpark.context import get_active_session


session = get_active_session()

# Track insert progress across reruns (module-level variables reset on every rerun)
if "current_batch" not in st.session_state:
    st.session_state.current_batch = 0

# Define batches of sample data 
customer_batches = [
    [
        (11, 'Sophia', 'Miller', 'LTY-00011', 'sophia.miller@example.com', '567 High St', '94108', '2024-04-20 10:30:00'),
        (12, 'Olivia', 'Miller', 'LTY-00012', 'olivia.miller@example.com', '999 Lake Ave', '75001', '2024-04-20 11:15:00'),  
        (13, 'Liam', 'Johnson', 'LTY-00013', 'liam.jonson@example.com', '55 Spring St', '30303', '2024-04-20 12:45:00')
    ],
    [
        (14, 'Noah', 'Clark', 'LTY-00014', 'noah.clark@example.com', '888 River Dr.', '80202', '2024-04-21 10:30:00'),  
        (15, 'Emma', 'Davis', 'LTY-00015', 'emma.davis@example.com', '77 Sunset Blvd', '90210', '2024-04-21 11:15:00'), 
        (16, 'William', 'Taylor', 'LTY-00016', 'william.taylor@example.com', '42 Main St', '60601', '2024-04-21 12:45:00')
    ],
    [
        (17, 'Ava', 'Garcia', 'LTY-00017', 'ava.garcia@example.com', '123 Park Ave', '10021', '2024-04-22 10:30:00'),
        (18, 'James', 'Hernandez', 'LTY-00018', 'james.hernandez@example.com', '456 Elm St', '92101', '2024-04-22 11:15:00'),
        (19, 'Isabella', 'Lopez', 'LTY-00019', 'isabella.lopez@example.com', '789 Oak Ave', '33139', '2024-04-22 12:45:00')
    ]
]

opportunity_batches = [
    [
        (1013, 11, None, 'Social Media', 'Qualification', '2024-07-10', 60000, '2024-04-20 13:30:00'),
        (1014, 11, None, 'Outbound Call', 'Negotiation', '2024-06-10', 180000, '2024-04-20 14:20:00'),
        (1015, 2, None, 'Web Form', 'InvalidStage', '2024-07-20', 100000, '2024-04-20 15:10:00') 
    ],
    [
        (1016, 3, None, 'Referral', 'Qualification', '2024-06-25', 80000, '2024-04-21 13:30:00'),
        (1017, 1, None, 'Email Campaign', 'Proposal', '2024-06-25', -5000, '2024-04-21 14:20:00'),
        (1018, 12, None, 'Partner Referral', 'Closed Won', '2024-05-30', 150000, '2024-04-21 15:10:00')
    ],
    [
        (1019, 14, None, 'Outbound Call', 'Negotiation', '2024-08-15', 95000, '2024-04-22 13:30:00'),
        (1020, 15, None, 'Web Form', 'Qualification', '2024-09-05', 70000, '2024-04-22 14:20:00'),
        (1021, 11, None, 'Social Media', 'Closed Won', '2024-06-01', 220000, '2024-04-22 15:10:00')
    ]
]


# Function to execute a query and return a pandas DataFrame (preserving column names)
def run_query(session, query):
    return session.sql(query).to_pandas()

# Function to insert a batch of sample data
def insert_batch(session, batch_num):
    # Insert into Customer table
    inserted_customers = pd.DataFrame(customer_batches[batch_num], columns=session.table("Customer").columns)
    session.write_pandas(inserted_customers, "CUSTOMER")
    
    # Insert into Opportunities table
    inserted_opportunities = pd.DataFrame(opportunity_batches[batch_num], columns=session.table("Opportunities").columns)
    session.write_pandas(inserted_opportunities, "OPPORTUNITIES")

    # Display inserted data
    st.subheader(f"Inserted Data - Batch {batch_num + 1}")
    st.write("Customers:")
    st.dataframe(inserted_customers)
    st.write("Opportunities:")
    st.dataframe(inserted_opportunities)


# Function to get table counts in desired order with shortened names
def get_table_counts(session):
    counts = {}
    table_order = ['Customer', 'Opportunities', 'enriched_customers', 'filtered_opportunities', 'customer_opportunities', 'aggregated_sales', 'customer_opportunities_with_sales']
    for table_name in table_order:
        # Fetch the exact table name from Snowflake
        actual_table_name = session.sql(f"SHOW TABLES LIKE '{table_name}' IN SCHEMA custs").collect()[0][1]
        short_name = table_name.replace('_', ' ') # Create a shorter name for the chart
        counts[short_name] = session.sql(f"SELECT COUNT(*) FROM custs.{actual_table_name}").collect()[0][0]
    return counts

# Streamlit app
st.title("Snowflake Dynamic Table Pipelines")

def insert_batch_and_refresh(session, batch_num, refresh_time=60):
    insert_batch(session, batch_num)
    # Record progress so the success message survives the rerun
    st.session_state.current_batch = max(st.session_state.current_batch, batch_num + 1)
    with st.empty():
        # Count down while the dynamic tables catch up to their target lag
        for seconds_left in range(refresh_time, -1, -1):
            st.write(f"Refreshing in {seconds_left} seconds...")
            time.sleep(1)
        st.write("")
        st.experimental_rerun()

# Main table counts chart
with st.container():
    st.subheader("Table Counts")
    counts = get_table_counts(session)
    df_counts = pd.DataFrame.from_dict(counts, orient='index', columns=['Count'])

    # Choose a color palette for better readability
    colors = ['skyblue', 'salmon', 'lightgreen', 'gold', 'orchid', 'lightcoral', 'mediumpurple']

    # Create bar chart with Matplotlib to have more control over labels
    fig, ax = plt.subplots(figsize=(10, 5))  # Adjust size if needed

    bars = ax.bar(df_counts.index, df_counts['Count'], color=colors)

    # Add the count value as a label above each bar
    for bar in bars:
        height = bar.get_height()
        label_text = str(height)
        ax.annotate(label_text,
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3),
                    textcoords="offset points",
                    ha='center', va='bottom')

    plt.xlabel("Table", fontsize=12)
    plt.ylabel("Record Count", fontsize=12)
    plt.xticks(rotation=-45, ha='left')

    # Remove top and right borders
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)

    st.pyplot(fig)

# Batch insertion buttons
for i in range(3):
    if st.button(f"Insert Batch {i + 1}"):
        insert_batch_and_refresh(session, i)


with st.expander("Raw Tables"):
    st.write("Customer Information:")
    df_customers = run_query(session, "SELECT * FROM Custs.Customer")
    st.metric("Total Customers", len(df_customers))  
    st.dataframe(df_customers)

    st.write("Sales Opportunities:")
    df_opportunities = run_query(session, "SELECT * FROM Custs.Opportunities")
    st.metric("Total Opportunities", len(df_opportunities))  
    st.dataframe(df_opportunities)

with st.expander("Dynamic Tables"):
    for table_name in ["enriched_customers", "filtered_opportunities", "customer_opportunities", "aggregated_sales", "customer_opportunities_with_sales"]:
        with st.container():
            st.write(f"**{table_name.replace('_', ' ').title()}**") 
            df = run_query(session, f"SELECT * FROM custs.{table_name}")
            st.metric("Record Count", len(df))
            st.dataframe(df)

# Success message when all batches are inserted
if st.session_state.current_batch == 3:
    st.success("All batches inserted!")

Key Points

  • Automation: Snowflake automatically updates these dynamic tables whenever the underlying Customer or Opportunities tables change, keeping your data pipeline fresh. (Refreshes can also be paused; see the sketch after this list.)

  • Modularity: Each dynamic table performs a specific step in the pipeline, making it easier to maintain and modify the process.

  • Real-Time Insights: The customer_opportunities_with_sales table provides a real-time view of your customers, their opportunities, and aggregated sales data, facilitating rapid decision-making.
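
Because each refresh consumes warehouse credits, it is worth knowing the standard controls for pausing the pipeline when it is not in use:

>_ SQL

-- Pause automatic refreshes when the demo is done, and resume later
ALTER DYNAMIC TABLE custs.customer_opportunities_with_sales SUSPEND;
ALTER DYNAMIC TABLE custs.customer_opportunities_with_sales RESUME;

Resuming re-enables the scheduled refreshes; no part of the pipeline needs to be redefined.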
