Final Project
8.3 Part 2: Build the Fact Table

Scenario
Now that you have clean silver data and dimension tables, you need to create the fact table: the heart of your star schema. This table stores every complaint event with foreign keys to your dimensions, enabling fast analytical queries.
Exercise Requirements
Create a fact table that:
Stores: All complaint events with metrics
Links: To dimension tables via hash keys
Enables: Fast aggregation and filtering for analytics
Maintains: Historical data with proper SCD handling
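The "links via hash keys" requirement assumes the dimension tables expose deterministic surrogate keys derived from natural keys (as built in the dimension exercises). A minimal Python sketch of that idea; the `hash_key` helper, its delimiter, and its normalization are illustrative assumptions, not the course's exact implementation:

```python
import hashlib

def hash_key(*natural_key_parts: str) -> str:
    """Derive a deterministic surrogate key from a natural key.

    Hypothetical helper: the dimension blocks may use a different
    hash function, delimiter, or normalization.
    """
    raw = "|".join(p.strip().upper() for p in natural_key_parts)
    return hashlib.md5(raw.encode("utf-8")).hexdigest()

# The same input always yields the same key, so the fact table and the
# dimension tables can be built independently and still join correctly.
fact_side = hash_key("NYPD")
dim_side = hash_key("nypd ")
print(fact_side == dim_side)  # normalization makes the keys match
```

Because the key is computed, not looked up, the fact build never has to wait on a dimension lookup table; it only has to apply the same normalization.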
Step 1: Create SQL Data Loader Block
Click "Blocks" → "Transformer" → "SQL: Base template (generic)"
Name the block: build_fact_nyc311
Connect it to the following blocks:
build_dim_agency
build_dim_complaint_type
build_dim_location
transform_to_silver_layer
Step 2: Implement the Fact Table SQL
```sql
-- Fact Table: NYC 311 Complaints
CREATE TABLE IF NOT EXISTS `gcp_project.schema_name.fact_table_name` (
  complaint_id STRING,
  created_date TIMESTAMP,
  agency_key STRING,
  complaint_type_key STRING,
  location_key STRING,
  status STRING,
  incident_address STRING,
  latitude FLOAT64,
  longitude FLOAT64,
  resolution_action_updated_date TIMESTAMP,
  closed_date TIMESTAMP,
  resolution_description STRING,
  loaded_at TIMESTAMP,
  first_seen_date DATE,
  last_updated_date DATE,
  days_since_created INT64,
  days_since_last_action INT64,
  days_to_close INT64,
  is_open BOOLEAN
);

-- Upsert from the silver layer: update complaints that already exist,
-- insert the ones seen for the first time.
MERGE `gcp_project.schema_name.fact_table_name` AS target
USING (
  SELECT
    s.complaint_id,
    s.created_date,
    s.agency_key,
    s.complaint_type_key,
    s.location_key,
    s.status,
    s.incident_address,
    s.latitude,
    s.longitude,
    s.resolution_action_updated_date,
    s.closed_date,
    s.resolution_description,
    s.loaded_at,
    s.first_seen_date,
    s.last_updated_date,
    s.days_since_created,
    s.days_since_last_action,
    s.days_to_close,
    s.is_open
  -- Source must be the silver table (output of transform_to_silver_layer),
  -- not the fact table itself.
  FROM `gcp_project.schema_name.silver_table_name` s
) AS source
ON target.complaint_id = source.complaint_id
WHEN MATCHED THEN UPDATE SET
  agency_key = source.agency_key,
  complaint_type_key = source.complaint_type_key,
  location_key = source.location_key,
  status = source.status,
  resolution_action_updated_date = source.resolution_action_updated_date,
  closed_date = source.closed_date,
  resolution_description = source.resolution_description,
  loaded_at = source.loaded_at,
  last_updated_date = source.last_updated_date,
  days_since_created = source.days_since_created,
  days_since_last_action = source.days_since_last_action,
  days_to_close = source.days_to_close,
  is_open = source.is_open
WHEN NOT MATCHED THEN INSERT (
  complaint_id, created_date, agency_key, complaint_type_key, location_key,
  status, incident_address, latitude, longitude,
  resolution_action_updated_date, closed_date, resolution_description,
  loaded_at, first_seen_date, last_updated_date,
  days_since_created, days_since_last_action, days_to_close, is_open
) VALUES (
  source.complaint_id, source.created_date, source.agency_key,
  source.complaint_type_key, source.location_key, source.status,
  source.incident_address, source.latitude, source.longitude,
  source.resolution_action_updated_date, source.closed_date,
  source.resolution_description, source.loaded_at, source.first_seen_date,
  source.last_updated_date, source.days_since_created,
  source.days_since_last_action, source.days_to_close, source.is_open
)
```
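The MERGE above is an upsert keyed on complaint_id: matched rows are updated, new rows are inserted, and columns omitted from the UPDATE SET list (such as first_seen_date and created_date) are preserved from the first load. A toy Python model of that behavior, using dicts in place of tables (this is illustrative only, not how BigQuery executes a MERGE):

```python
def merge_upsert(target: dict, source_rows: list) -> dict:
    """Toy model of MERGE ... ON target.complaint_id = source.complaint_id.

    target maps complaint_id -> row. Matched rows are updated, but
    first_seen_date is preserved (it is absent from the UPDATE SET list
    in the SQL). Unmatched rows are inserted whole.
    """
    for row in source_rows:
        cid = row["complaint_id"]
        if cid in target:  # WHEN MATCHED THEN UPDATE
            preserved = target[cid]["first_seen_date"]
            target[cid].update(row)
            target[cid]["first_seen_date"] = preserved
        else:              # WHEN NOT MATCHED THEN INSERT
            target[cid] = dict(row)
    return target

fact = {"42": {"complaint_id": "42", "status": "Open",
               "first_seen_date": "2024-01-01"}}
silver = [
    {"complaint_id": "42", "status": "Closed", "first_seen_date": "2024-01-05"},
    {"complaint_id": "99", "status": "Open", "first_seen_date": "2024-01-05"},
]
merge_upsert(fact, silver)
print(fact["42"]["status"], fact["42"]["first_seen_date"])  # Closed 2024-01-01
print(fact["99"]["first_seen_date"])                        # 2024-01-05
```

This is why re-running the block is safe: reprocessing the same silver rows updates existing facts in place instead of duplicating them.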
Step 3: Run and Validate
Click "Execute with all upstream blocks"
Verify the entire pipeline runs successfully
Validate your star schema:
```sql
-- Test Query: Complaint Volume by Agency and Borough
SELECT
  a.agency_name,
  l.borough,
  COUNT(*) AS complaint_count,
  AVG(f.days_to_close) AS avg_days_to_close,
  SUM(CASE WHEN f.is_open THEN 1 ELSE 0 END) AS open_complaints
FROM `gcp_project.schema.fact_table_name` f
JOIN `gcp_project.schema.dim_agency` a
  ON f.agency_key = a.agency_key
JOIN `gcp_project.schema.dim_location` l
  ON f.location_key = l.location_key
GROUP BY a.agency_name, l.borough
ORDER BY complaint_count DESC
LIMIT 10
```
💡 Run the query above in BigQuery
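To sanity-check what the test query computes, its join-and-aggregate logic can be modeled over a few in-memory rows. The sample keys and rows below are made up for illustration; only the column names match the star schema above:

```python
from collections import defaultdict

# Tiny stand-ins for the dimension and fact tables.
dim_agency = {"a1": "NYPD", "a2": "DOT"}
dim_location = {"l1": "BROOKLYN", "l2": "QUEENS"}
fact = [
    {"agency_key": "a1", "location_key": "l1", "days_to_close": 4, "is_open": False},
    {"agency_key": "a1", "location_key": "l1", "days_to_close": None, "is_open": True},
    {"agency_key": "a2", "location_key": "l2", "days_to_close": 2, "is_open": False},
]

# JOIN fact -> dims, then GROUP BY (agency_name, borough).
groups = defaultdict(list)
for f in fact:
    groups[(dim_agency[f["agency_key"]], dim_location[f["location_key"]])].append(f)

for (agency, borough), rows in groups.items():
    closed = [r["days_to_close"] for r in rows if r["days_to_close"] is not None]
    complaint_count = len(rows)
    # Like BigQuery's AVG, ignore NULLs (open complaints have no close time).
    avg_days_to_close = sum(closed) / len(closed) if closed else None
    open_complaints = sum(r["is_open"] for r in rows)
    print(agency, borough, complaint_count, avg_days_to_close, open_complaints)
```

Note the NULL handling: open complaints have no days_to_close, and AVG skips them, so avg_days_to_close reflects only resolved complaints.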
Step 4: Orchestrate the analytics pipeline
Navigate to the mage_academy_nyc_311 pipeline you created
Add a new block downstream from the data exporter block
Click "Blocks" → "Exporter" → "Orchestration" → "Trigger Mage pipeline"
Add the code below:
```python
from mage_ai.orchestration.triggers.api import trigger_pipeline

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def trigger(*args, **kwargs):
    """
    Trigger another Mage pipeline to run.

    Documentation: https://docs.mage.ai/orchestration/triggers/trigger-pipeline
    """
    trigger_pipeline(
        'mage_academy_nyc311_analytics',  # Required: UUID of the pipeline to trigger
        variables={},            # Optional: runtime variables for the pipeline
        check_status=False,      # Optional: poll and check the status of the triggered pipeline
        error_on_failure=False,  # Optional: raise an exception if the triggered pipeline fails
        poll_interval=60,        # Optional: check the status every N seconds
        poll_timeout=None,       # Optional: raise an exception after N seconds
        verbose=True,            # Optional: print status of triggered pipeline run
    )
```
Learning Objectives
After completing the fact table exercise, you should understand:
✅ Fact table design in a star schema
✅ Storing metrics and measures in fact tables
✅ Using foreign keys to link dimensions
✅ Implementing incremental fact table loads
✅ Building analytical queries across the star schema
✅ Orchestrating a pipeline run from another pipeline