Advanced Data Warehousing

Jun 30, 2024

Snowflake Trail For Observability (+Logs, +Events, +Traces)

Snowflake's observability framework, which includes logging, tracing, and events, is a powerful toolkit for understanding and optimizing your data pipelines. It provides detailed insights into how your code interacts with data and how data flows through your applications.

This comprehensive system leverages the OpenTelemetry standard, a vendor-neutral approach to instrumenting, generating, collecting, and exporting telemetry data (logs, metrics, and traces) for analysis and visualization.

Telemetry Instrumentation Hierarchy



Logs: Capture discrete events within your Snowflake environment, providing a detailed, textual record of what happened at specific points in time. E.g. 

logger.info(f"Processed {customer_count} customer records.")
logger.warning("High customer count detected.")

Traces: Record the flow of execution within your code, including timing and dependencies between operations. A trace is the collection of spans and events that belong to a single execution, such as one query or procedure call.

Spans: A span is the basic unit of work in a trace. It represents a single operation and can have a parent-child relationship with other spans. Snowflake opens a span automatically for each procedure or function invocation, and you enrich the current span through Snowflake's telemetry module: e.g.

telemetry.set_span_attribute("process_step", "fetch_order_details")

Trace Events: Trace events are discrete events that occur within a span. They provide additional context about what happened during the span's lifetime. You add events to the current span: e.g.

telemetry.add_event("order_processed", {"order_id": order_id, "status": "SUCCESS"})

Attributes: Attributes are key-value pairs that provide additional context to spans or logs. They help in filtering and analyzing telemetry data: e.g. 

telemetry.set_span_attribute("order_id", order_id)

Events: Structured data points emitted by your code, providing context and additional details about logs and traces.
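
Taken together, one procedure or function invocation yields one span, and attributes, trace events, and log records all hang off that execution. Here is a minimal sketch of how the pieces sit side by side inside a handler, assuming the snowflake-telemetry-python package is available; the function name and event name are illustrative:

import logging
from snowflake import telemetry

logger = logging.getLogger("salesdb.custs.customer_processing_logger")

def run(order_id: int):
    # Attribute: key-value context attached to the current (auto-created) span
    telemetry.set_span_attribute("order_id", order_id)
    # Trace event: a discrete, timestamped point within the span's lifetime
    telemetry.add_event("order_received", {"order_id": order_id})
    # Log record: a textual account of what happened, routed to the event table
    logger.info(f"Started processing order {order_id}.")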

Snowflake Trail

Snowflake Trail is Snowflake's observability framework, providing detailed logging and tracing capabilities. It's the umbrella of features designed to help users understand and optimize query performance, troubleshoot issues, and gain insights into their Snowflake usage.

This includes native APIs for Python, Java, JavaScript, Scala, and Snowflake Scripting to emit, capture, and analyze telemetry data in Snowflake.

Practical Example: 

Let's illustrate the power of Snowflake Trail and observability using a real-world example built on the OrdersDB data model (set up the model in your practice environment if you haven't done so already).
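
If you don't have OrdersDB handy, the following hypothetical minimal version covers just the tables and columns this example touches; the real OrdersDB model is richer, and the seed values here are placeholders:

-- Hypothetical minimal OrdersDB setup (only the columns used in this tutorial)
CREATE OR REPLACE DATABASE OrdersDB;

CREATE OR REPLACE TABLE OrdersDB.public.customer (
    customer_id INT,
    customer_name VARCHAR
);

CREATE OR REPLACE TABLE OrdersDB.public.sales_order (
    order_id INT,
    customer_id INT,
    order_status VARCHAR,
    order_price NUMBER(12,2)
);

CREATE OR REPLACE TABLE OrdersDB.public.sales_order_item (
    order_id INT,
    item_id INT,
    quantity INT
);

-- Seed one order so the tracing example below has something to process
INSERT INTO OrdersDB.public.customer VALUES (101, 'Acme Corp');
INSERT INTO OrdersDB.public.sales_order VALUES (5400003, 101, 'Open', 250.00);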

Next, let's set up the necessary observability components:

-- Create a dedicated database for observability
CREATE OR REPLACE DATABASE observability_db;

-- Create an event table to store logs and traces
CREATE OR REPLACE EVENT TABLE observability_db.public.observability_event_table;

-- Associate the event table with your account (requires ACCOUNTADMIN role)
ALTER ACCOUNT SET EVENT_TABLE = observability_db.public.observability_event_table;

-- Set log level to INFO
ALTER SESSION SET LOG_LEVEL = INFO;

-- Enable Tracing 
ALTER SESSION SET TRACE_LEVEL = ALWAYS;

Important Note: The Event Table created above is a special type of table designed to store logging and tracing data generated within your Snowflake environment. It has a predefined set of columns to capture various information, such as timestamps, event types, log messages, and trace details. By associating an event table with your Snowflake account, you can collect and centrally store all logs and traces, making it easier to analyze, troubleshoot, and monitor the health and performance of your data pipelines and applications.
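
To confirm the setup took effect, you can inspect the account parameter and the event table's predefined schema (standard Snowflake commands):

-- Verify the account-level event table association
SHOW PARAMETERS LIKE 'EVENT_TABLE' IN ACCOUNT;

-- Inspect the predefined columns (TIMESTAMP, RECORD_TYPE, RECORD, VALUE, etc.)
DESCRIBE EVENT TABLE observability_db.public.observability_event_table;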

Logging Customer Counts

This Python UDF logs a message indicating the number of customer records processed:

-- Create a Snowpark Python function to log customer counts
CREATE OR REPLACE FUNCTION log_customer_counts()
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
HANDLER = 'run'
AS $$
import logging
import random

# Create a logger instance with a specific name
logger = logging.getLogger("salesdb.custs.customer_processing_logger")

def run():
    # Simulate processing customer records
    customer_count = random.randint(10, 100) 
    logger.info(f"Processed {customer_count} customer records.")

    # Simulate different logging scenarios
    if customer_count > 50:
        logger.warning("High customer count detected.") 
    elif customer_count < 20:
        logger.debug("Low customer count.")              
    else:
        logger.info("Normal customer count.")
    # Note: with randint(10, 100) this branch can't fire; it illustrates error logging
    if customer_count == 0:
        logger.error("No customers found!")

    return "SUCCESS"
$$;

To execute it:

select log_customer_counts();
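
Because the customer count is random, a single call may only exercise one logging branch. To generate a spread of severities for the queries below, you can invoke the function once per generated row (plain Snowflake SQL):

-- Call the UDF ten times so the random counts hit different logging branches
SELECT log_customer_counts()
FROM TABLE(GENERATOR(ROWCOUNT => 10));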

Querying Logs

You can query the event table to retrieve the logged messages:

-- Query for all types of events in the event table
select * from observability_db.public.observability_event_table;

-- Query specifically for logs and extract relevant information
SELECT
    TIMESTAMP AS time,
    RESOURCE_ATTRIBUTES['snow.executable.name'] as executable,
    RECORD['severity_text'] AS severity,
    VALUE AS message
FROM
    observability_db.public.observability_event_table
WHERE
    RECORD_TYPE = 'LOG';

Sample results show, for each call, the event time, the UDF's fully qualified signature, the log severity, and the message text.
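
To zero in on problems, filter on severity. In my experience Python's logger.warning lands as severity_text 'WARN' in the event table, but verify the exact values against your own data:

-- Only warnings and errors
SELECT
    TIMESTAMP AS time,
    RECORD['severity_text'] AS severity,
    VALUE AS message
FROM observability_db.public.observability_event_table
WHERE RECORD_TYPE = 'LOG'
  AND RECORD['severity_text']::VARCHAR IN ('WARN', 'ERROR');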

Tracing the process_order_with_trace Procedure

Let's create and use a process_order_with_trace stored procedure to demonstrate tracing. 

CREATE OR REPLACE PROCEDURE observability_db.public.process_order_with_trace(order_id INT)
RETURNS VARCHAR
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('snowflake-snowpark-python', 'snowflake-telemetry-python')
HANDLER = 'process_order_with_trace'
AS $$
import logging

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum
from snowflake import telemetry

# Named logger (name is arbitrary) so the log record at the end lands in the event table
logger = logging.getLogger("ordersdb.orders.order_processing_logger")

def process_order_with_trace(session: Session, order_id: int):
    # Fully qualify the table names within the SP
    sales_order_table = "OrdersDB.public.sales_order"
    customer_table = "OrdersDB.public.customer"

    # Add order_id as a span attribute
    telemetry.set_span_attribute("order_id", order_id)

    # Fetch order details
    telemetry.set_span_attribute("process_step", "fetch_order_details") 
    order_df = session.table(sales_order_table).where(f"order_id = {order_id}")
    order_list = order_df.collect() #Collect results from order_df

    # If order doesn't exist, log an event and return
    if len(order_list) == 0: #Check if the query returned results
        telemetry.add_event("order_not_found", {"order_id": order_id})
        return "Order not found."

    order = order_list[0]  # Extract the first Row

    # Fetch customer details (collected once; a separate count() would trigger a second query)
    telemetry.set_span_attribute("process_step", "fetch_customer_details")
    customer_id = order["CUSTOMER_ID"]
    customer_df = session.table(customer_table).where(f"customer_id = {customer_id}")
    customer_rows = customer_df.collect()
    customer = customer_rows[0] if customer_rows else None  # fetched for illustration; unused below
    telemetry.add_event("fetch_customer_details", {"order_id": order_id, "status": "SUCCESS"})

    # Calculate total using snowpark functions. Do not iterate over the columns
    telemetry.set_span_attribute("process_step", "calculate_total") 
    order_price = order_df.select(sum(col("order_price").cast("float")).alias("order_price")).collect()[0]["ORDER_PRICE"]
    telemetry.set_span_attribute("order_price", order_price) 
    telemetry.add_event("calculate_price", {"order_id": order_id, "status": "SUCCESS"})

    # Update order status
    telemetry.set_span_attribute("process_step", "update_order_status")
    # Use Table.update with a condition; filter() returns a DataFrame, which has no update method
    session.table(sales_order_table).update(
        {"ORDER_STATUS": "Shipped"}, col("ORDER_ID") == order_id
    )
    telemetry.add_event("updated_order_status", {"order_id": order_id, "status": "SUCCESS"})

    # Order processed successfully
    telemetry.add_event("order_processed", {"order_id": order_id, "status": "SUCCESS"})

    # Log the high-value order information (convert numeric values to strings)
    logger.info(
        f"High-value order placed: Order ID={str(order['ORDER_ID'])}, "
        f"Customer ID={str(order['CUSTOMER_ID'])}, Total Amount={str(order_price)}"
    )

    return "Order processed successfully."
$$;

To execute it:

CALL observability_db.public.process_order_with_trace(5400003);

The CALL statement triggers the stored procedure for a specific sales order (ID 5400003). The procedure processes the order end to end, fetching order and customer details, calculating the total, and updating the status, and it records span attributes and span events through Snowflake's telemetry framework along the way.

Query for Traces and Span Events:


-- Query for all types of events in the event table
select * from observability_db.public.observability_event_table;

-- Query for Span Events
SELECT
    RECORD_ATTRIBUTES['order_id']::INT AS order_id,
    RECORD['name']::VARCHAR AS span_name,
    TIMESTAMP AS start_time,
    LEAD(TIMESTAMP) OVER (ORDER BY TIMESTAMP) AS end_time,
    DATEDIFF('MILLISECOND', TIMESTAMP, LEAD(TIMESTAMP) OVER (ORDER BY TIMESTAMP)) AS duration_ms
FROM observability_db.public.observability_event_table
WHERE RECORD_TYPE = 'SPAN_EVENT'
  AND RESOURCE_ATTRIBUTES['snow.executable.name']::VARCHAR = 'PROCESS_ORDER_WITH_TRACE(ORDER_ID NUMBER):VARCHAR(16777216)'
ORDER BY TIMESTAMP;

The results show one row per span event, with step durations approximated from the gap to the next event's timestamp.
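
Span events only approximate step durations; the span records themselves carry exact start and end times. A complementary query against the standard event-table columns (START_TIMESTAMP marks the span's start, TIMESTAMP its end):

-- Query the spans themselves for exact durations
SELECT
    RECORD['name']::VARCHAR AS span_name,
    START_TIMESTAMP,
    TIMESTAMP AS end_time,
    DATEDIFF('MILLISECOND', START_TIMESTAMP, TIMESTAMP) AS duration_ms
FROM observability_db.public.observability_event_table
WHERE RECORD_TYPE = 'SPAN'
  AND RESOURCE_ATTRIBUTES['snow.executable.name']::VARCHAR LIKE 'PROCESS_ORDER_WITH_TRACE%'
ORDER BY START_TIMESTAMP;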

A simple Streamlit app (code not included in this tutorial) can then visualize the order's flow through these stages.

Conclusion & Resources

As we've seen, Snowflake's observability framework—comprising logging, tracing, and events—is a powerful toolkit for understanding and optimizing your data pipelines. By leveraging these capabilities, you can gain deep insights into how your code interacts with data, identify performance bottlenecks, and proactively detect and resolve issues. This level of observability is essential for building reliable, efficient, and scalable data applications.

To delve further into Snowflake observability and explore advanced techniques, check out Snowflake's documentation on logging, tracing, and event tables.

Additionally, consider exploring third-party observability platforms like Datadog and Metaplane, which offer seamless integration with Snowflake and provide additional features for monitoring, alerting, and visualization.
