8.3 Part 2: Build the Fact Table

Scenario

Now that you have clean silver data and dimension tables, you need to create the fact table: the heart of your star schema. It stores every complaint event with foreign keys to your dimensions, enabling fast analytical queries.

Exercise Requirements

Create a fact table that:

  • Stores: All complaint events with metrics

  • Links: To dimension tables via hash keys (see the hash-key sketch after this list)

  • Enables: Fast aggregation and filtering for analytics

  • Maintains: Historical data with proper SCD handling
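
For context on those hash keys: a surrogate key is typically a deterministic hash of the dimension's natural key, so the fact table and the dimension can compute the same key independently and join on it. A minimal sketch in BigQuery SQL; the column names, the MD5 choice, and the silver table name are illustrative assumptions, so match whatever your Part 1 dimension blocks actually used:

-- Illustrative only: deterministic hash keys derived from natural-key columns
SELECT
    TO_HEX(MD5(agency)) AS agency_key,
    TO_HEX(MD5(complaint_type)) AS complaint_type_key,
    TO_HEX(MD5(CONCAT(
        COALESCE(borough, ''), '|', COALESCE(incident_zip, '')
    ))) AS location_key
FROM `gcp_project.schema_name.silver_table_name`;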

Step 1: Create the SQL Transformer Block

  1. Click "Blocks" → "Transformer" → "SQL" → "Base template (generic)"

  2. Name the block: build_fact_nyc311

  3. Connect it to the following upstream blocks:

    • build_dim_agency

    • build_dim_complaint_type

    • build_dim_location

    • transform_to_silver_layer

Step 2: Implement the Fact Table SQL

-- Fact Table: NYC 311 Complaints

CREATE TABLE IF NOT EXISTS `gcp_project.schema_name.fact_table_name` (
    complaint_id STRING,
    created_date TIMESTAMP,
    agency_key STRING,
    complaint_type_key STRING,
    location_key STRING,
    status STRING,
    incident_address STRING,
    latitude FLOAT64,
    longitude FLOAT64,
    resolution_action_updated_date TIMESTAMP,
    closed_date TIMESTAMP,
    resolution_description STRING,
    loaded_at TIMESTAMP,
    first_seen_date DATE,
    last_updated_date DATE,
    days_since_created INT64,
    days_since_last_action INT64,
    days_to_close INT64,
    is_open BOOLEAN
);
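
The requirements call for fast aggregation and filtering. On large datasets, BigQuery can prune far more data at query time if the fact table is partitioned and clustered. A hedged variant of the DDL above, using the same placeholder names; the partition and cluster columns are suggestions, not part of the exercise:

-- Variant DDL: same columns, partitioned by event date and clustered on
-- the foreign keys analytical queries most often filter or join on
CREATE TABLE IF NOT EXISTS `gcp_project.schema_name.fact_table_name` (
    complaint_id STRING,
    created_date TIMESTAMP,
    agency_key STRING,
    complaint_type_key STRING,
    location_key STRING,
    status STRING,
    incident_address STRING,
    latitude FLOAT64,
    longitude FLOAT64,
    resolution_action_updated_date TIMESTAMP,
    closed_date TIMESTAMP,
    resolution_description STRING,
    loaded_at TIMESTAMP,
    first_seen_date DATE,
    last_updated_date DATE,
    days_since_created INT64,
    days_since_last_action INT64,
    days_to_close INT64,
    is_open BOOLEAN
)
PARTITION BY DATE(created_date)
CLUSTER BY agency_key, complaint_type_key, location_key;

Partitioning on DATE(created_date) lets queries scan only the dates they touch; clustering co-locates rows by key so joins to the dimensions read fewer blocks.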

MERGE `gcp_project.schema_name.fact_table_name` AS target
USING (
    SELECT 
        s.complaint_id,
        s.created_date,
        s.agency_key,
        s.complaint_type_key,
        s.location_key,
        s.status,
        s.incident_address,
        s.latitude,
        s.longitude,
        s.resolution_action_updated_date,
        s.closed_date,
        s.resolution_description,
        s.loaded_at,
        s.first_seen_date,
        s.last_updated_date,
        s.days_since_created,
        s.days_since_last_action,
        s.days_to_close,
        s.is_open
    FROM `gcp_project.schema_name.silver_table_name` s  -- output of transform_to_silver_layer
) AS source
ON target.complaint_id = source.complaint_id

WHEN MATCHED THEN UPDATE SET
    agency_key = source.agency_key,
    complaint_type_key = source.complaint_type_key,
    location_key = source.location_key,
    status = source.status,
    resolution_action_updated_date = source.resolution_action_updated_date,
    closed_date = source.closed_date,
    resolution_description = source.resolution_description,
    loaded_at = source.loaded_at,
    last_updated_date = source.last_updated_date,
    days_since_created = source.days_since_created,
    days_since_last_action = source.days_since_last_action,
    days_to_close = source.days_to_close,
    is_open = source.is_open

WHEN NOT MATCHED THEN INSERT (
    complaint_id, created_date, agency_key, complaint_type_key, location_key,
    status, incident_address, latitude, longitude,
    resolution_action_updated_date, closed_date, resolution_description,
    loaded_at, first_seen_date, last_updated_date,
    days_since_created, days_since_last_action, days_to_close, is_open
) VALUES (
    source.complaint_id, source.created_date, source.agency_key, 
    source.complaint_type_key, source.location_key,
    source.status, source.incident_address, source.latitude, source.longitude,
    source.resolution_action_updated_date, source.closed_date, source.resolution_description,
    source.loaded_at, source.first_seen_date, source.last_updated_date,
    source.days_since_created, source.days_since_last_action, source.days_to_close, source.is_open
);
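
A few details of this MERGE are worth noticing. The match key is the natural key complaint_id, so each complaint appears exactly once in the fact table. The WHEN MATCHED branch updates only the mutable fields (status, resolution columns, derived day counts) and deliberately leaves created_date, incident_address, the coordinates, and first_seen_date untouched, since those describe the original event. Re-running the block is therefore idempotent: unchanged source rows simply overwrite themselves with the same values.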

Step 3: Run and Validate

  1. Click "Execute with all upstream blocks"

  2. Verify the entire pipeline runs successfully

  3. Validate your star schema:

-- Test Query: Complaint Volume by Agency and Borough
SELECT 
    a.agency_name,
    l.borough,
    COUNT(*) as complaint_count,
    AVG(f.days_to_close) as avg_days_to_close,
    SUM(CASE WHEN f.is_open THEN 1 ELSE 0 END) as open_complaints
FROM `gcp_project.schema_name.fact_table_name` f
JOIN `gcp_project.schema_name.dim_agency` a 
    ON f.agency_key = a.agency_key
JOIN `gcp_project.schema_name.dim_location` l 
    ON f.location_key = l.location_key
GROUP BY a.agency_name, l.borough
ORDER BY complaint_count DESC
LIMIT 10;

💡 Run the query above in BigQuery
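
Beyond spot-checking aggregates, it's worth confirming that every fact row actually joins to a dimension row. A hedged integrity check, assuming the placeholder table names used above; a non-zero count means the hash keys were computed inconsistently between the silver layer and the dimension:

-- Orphan check: fact rows whose agency_key has no match in dim_agency
SELECT COUNT(*) AS orphaned_agency_rows
FROM `gcp_project.schema_name.fact_table_name` f
LEFT JOIN `gcp_project.schema_name.dim_agency` a
    ON f.agency_key = a.agency_key
WHERE a.agency_key IS NULL;

Repeat the same pattern for complaint_type_key and location_key.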

Step 4: Orchestrate the Analytics Pipeline

  1. Navigate to the mage_academy_nyc_311 pipeline you created

  2. Add a new block downstream from the data exporter block

  3. Click "Blocks" → "Exporter" → "Orchestration" → "Trigger Mage pipeline"

  4. Add the code below:


    from mage_ai.orchestration.triggers.api import trigger_pipeline
    if 'data_exporter' not in globals():
        from mage_ai.data_preparation.decorators import data_exporter
    
    
    @data_exporter
    def trigger(*args, **kwargs):
        """
        Trigger another Mage pipeline to run.
    
        Documentation: https://docs.mage.ai/orchestration/triggers/trigger-pipeline
        """
    
        trigger_pipeline(
            'mage_academy_nyc311_analytics',        # Required: enter the UUID of the pipeline to trigger
            variables={},           # Optional: runtime variables for the pipeline
            check_status=False,     # Optional: poll and check the status of the triggered pipeline
            error_on_failure=False, # Optional: if triggered pipeline fails, raise an exception
            poll_interval=60,       # Optional: check the status of triggered pipeline every N seconds
            poll_timeout=None,      # Optional: raise an exception after N seconds
            verbose=True,           # Optional: print status of triggered pipeline run
        )
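
As configured, the trigger is fire-and-forget: check_status=False means this block finishes as soon as the analytics run is queued. If you would rather have this pipeline wait for the analytics pipeline and fail when it fails, swap the trigger_pipeline call above for a stricter variant (the one-hour timeout is an illustrative choice, not a course requirement):

    trigger_pipeline(
        'mage_academy_nyc311_analytics',
        check_status=True,        # poll until the triggered run finishes
        error_on_failure=True,    # raise if the analytics pipeline fails
        poll_interval=60,         # check the run status every 60 seconds
        poll_timeout=3600,        # illustrative: give up after one hour
        verbose=True,
    )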


Learning Objectives

After completing the fact table exercise, you should understand:

  • ✅ Fact table design in a star schema

  • ✅ Storing metrics and measures in fact tables

  • ✅ Using foreign keys to link dimensions

  • ✅ Implementing incremental fact table loads

  • ✅ Building analytical queries across the star schema

  • ✅ Orchestrating a pipeline run from another pipeline