7.4 Practice exercise

Scenario

The bronze layer contains raw, unprocessed NYC 311 data with inconsistencies and null or missing values. Your analytics team needs clean, standardized data, enriched with time-based metrics, for analysis.

Exercise Requirements

Create a SQL data loader block that will:

  • Input: bronze_nyc311 table

  • Output: silver_nyc311 table with cleaned data and calculated metrics

  • Transformations:

    • Handle null values with appropriate defaults

    • Generate hash keys for dimension table joins

    • Calculate time-based metrics (days since created, days to close, etc.)

    • Add tracking columns (first_seen_date, last_updated_date)

    • Implement Type 1 SCD (Slowly Changing Dimension) logic, where the latest values simply overwrite existing rows (see the sketch after this list)
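
Before the full implementation in Step 3, here is a minimal sketch of the Type 1 SCD pattern paired with a hashed surrogate key. Every name in it (the project.dataset tables, id, col_a, col_b) is an illustrative placeholder, not part of the exercise schema:

-- Minimal Type 1 SCD sketch: matched rows are overwritten in place,
-- so the table always holds the latest values and keeps no history.
MERGE `project.dataset.silver_demo` AS target
USING (
    SELECT
        id,
        -- Deterministic surrogate key: hash the natural-key columns
        TO_HEX(SHA256(CONCAT(col_a, '|', col_b))) AS dim_key
    FROM `project.dataset.bronze_demo`
) AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET dim_key = source.dim_key
WHEN NOT MATCHED THEN INSERT (id, dim_key) VALUES (source.id, source.dim_key);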

Step-by-Step Implementation

Step 1: Create a New Analytics Pipeline

  1. Navigate to the Pipelines page and begin creating a new pipeline

  2. Choose the pipeline type suited to collecting data on a daily schedule (a batch pipeline)

  3. Configure the pipeline's basic settings:

    • Set name to "mage academy nyc311 analytics"

    • Add description: "Learning exercise: analytics pipeline"

    • Add tags: "learning" and "exercise"

  4. Create the pipeline

Step 2: Create SQL Data Loader Block

  1. In your Mage pipeline, click "+ Data loader"

  2. Select "SQL"

  3. Name the block: transform_to_silver_layer

  4. Click "Save and add block"

Step 3: Implement the Silver Layer SQL

  1. Select BigQuery from the target connection dropdown list

  2. Select your profile from the profile dropdown list

  3. Copy and paste the following SQL code:

-- Silver Layer: Type 1 SCD with Time Metrics and Hash Keys
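-- Replace gcp_project_name, schema, and the table names below with your own values.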

CREATE TABLE IF NOT EXISTS `gcp_project_name.schema.silver_layer_table_name` (
    complaint_id STRING,
    created_date TIMESTAMP,
    agency STRING,
    agency_key STRING,
    complaint_type STRING,
    descriptor STRING,
    complaint_type_key STRING,
    status STRING,
    borough STRING,
    incident_zip STRING,
    location_key STRING,
    incident_address STRING,
    latitude FLOAT64,
    longitude FLOAT64,
    resolution_action_updated_date TIMESTAMP,
    closed_date TIMESTAMP,
    resolution_description STRING,
    loaded_at TIMESTAMP,
    first_seen_date DATE,
    last_updated_date DATE,
    days_since_created INT64,
    days_since_last_action INT64,
    days_to_close INT64,
    is_open BOOLEAN
);
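
-- Type 1 SCD upsert: the latest bronze load overwrites matched silver rows
-- in place; new complaints are inserted with their tracking dates set.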

MERGE `gcp_project_name.schema.silver_layer_table_name` AS target
USING (
    SELECT 
        unique_key as complaint_id,
        TIMESTAMP(created_date) as created_date,
        COALESCE(agency, 'Unknown') as agency,
        TO_HEX(SHA256(COALESCE(agency, 'Unknown'))) as agency_key,
        COALESCE(complaint_type, 'Other') as complaint_type,
        COALESCE(descriptor, 'No Description') as descriptor,
        TO_HEX(SHA256(CONCAT(COALESCE(complaint_type, 'Other'), '|', COALESCE(descriptor, 'No Description')))) as complaint_type_key,
        COALESCE(status, 'Unknown') as status,
        COALESCE(borough, 'Unknown') as borough,
        COALESCE(incident_zip, 'Unknown') as incident_zip,
        TO_HEX(SHA256(CONCAT(COALESCE(borough, 'Unknown'), '|', COALESCE(incident_zip, 'Unknown')))) as location_key,
        COALESCE(incident_address, 'Address Not Provided') as incident_address,
        CAST(latitude AS FLOAT64) as latitude,
        CAST(longitude AS FLOAT64) as longitude,
        TIMESTAMP(resolution_action_updated_date) as resolution_action_updated_date,
        TIMESTAMP(closed_date) as closed_date,
        COALESCE(resolution_description, '') as resolution_description,
        TIMESTAMP(loaded_at) as loaded_at,
        DATE_DIFF(CURRENT_DATE(), DATE(TIMESTAMP(created_date)), DAY) as days_since_created,
        DATE_DIFF(CURRENT_DATE(), DATE(TIMESTAMP(COALESCE(resolution_action_updated_date, created_date))), DAY) as days_since_last_action,
        CASE
            WHEN closed_date IS NOT NULL
            THEN DATE_DIFF(DATE(TIMESTAMP(closed_date)), DATE(TIMESTAMP(created_date)), DAY)
            ELSE NULL
        END as days_to_close,
        CASE WHEN UPPER(status) NOT IN ('CLOSED', 'RESOLVED') THEN TRUE ELSE FALSE END as is_open
    FROM `gcp_project_name.schema.bronze_layer_table_name`
    WHERE DATE(TIMESTAMP(loaded_at)) = (SELECT MAX(DATE(TIMESTAMP(loaded_at))) FROM `gcp_project_name.schema.bronze_layer_table_name`)
) AS source
ON target.complaint_id = source.complaint_id

WHEN MATCHED THEN UPDATE SET
    agency_key = source.agency_key,
    complaint_type_key = source.complaint_type_key,
    location_key = source.location_key,
    status = source.status,
    resolution_action_updated_date = source.resolution_action_updated_date,
    closed_date = source.closed_date,
    resolution_description = source.resolution_description,
    loaded_at = source.loaded_at,
    last_updated_date = CURRENT_DATE(),
    days_since_created = source.days_since_created,
    days_since_last_action = source.days_since_last_action,
    days_to_close = source.days_to_close,
    is_open = source.is_open

WHEN NOT MATCHED THEN INSERT (
    complaint_id, created_date, agency, agency_key, complaint_type, descriptor, complaint_type_key,
    status, borough, incident_zip, location_key, incident_address, latitude, longitude,
    resolution_action_updated_date, closed_date, resolution_description,
    loaded_at, first_seen_date, last_updated_date,
    days_since_created, days_since_last_action, days_to_close, is_open
) VALUES (
    source.complaint_id, source.created_date, source.agency, source.agency_key,
    source.complaint_type, source.descriptor, source.complaint_type_key,
    source.status, source.borough, source.incident_zip, source.location_key,
    source.incident_address, source.latitude, source.longitude,
    source.resolution_action_updated_date, source.closed_date, source.resolution_description,
    source.loaded_at, CURRENT_DATE(), CURRENT_DATE(),
    source.days_since_created, source.days_since_last_action, source.days_to_close, source.is_open
);
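
Note: BigQuery's MERGE fails if two source rows match the same target row. If a single day's bronze load can contain duplicate complaint_ids, deduplicate the source subquery first; one possible sketch, keeping the most recently loaded row per key:

SELECT * EXCEPT(rn)
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY unique_key ORDER BY loaded_at DESC) AS rn
    FROM `gcp_project_name.schema.bronze_layer_table_name`
)
WHERE rn = 1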

Step 4: Run and Validate

  1. Click the "Run code" button to test the connection and preview a sample of the output

  2. Verify the query runs successfully

  3. You can also verify the data by running a simple SELECT * query against the silver table in your BigQuery console (sample validation queries follow this list)

  4. Check the output:

    • Confirm records were inserted/updated

    • Verify hash keys were generated

    • Review calculated metrics
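
To make these checks concrete, you can run a few sanity queries against the silver table in BigQuery. The queries below are a sketch using the same placeholder names as the exercise SQL; substitute your own project, dataset, and table names:

-- Row counts, key coverage, and metric sanity checks
SELECT
    COUNT(*) AS total_rows,
    COUNTIF(agency_key IS NULL) AS missing_agency_keys,
    COUNTIF(is_open) AS open_complaints,
    ROUND(AVG(days_to_close), 1) AS avg_days_to_close
FROM `gcp_project_name.schema.silver_layer_table_name`;

-- Each location_key should map back to exactly one (borough, incident_zip)
-- pair; an empty result means every hash key is consistent.
SELECT
    location_key,
    COUNT(DISTINCT CONCAT(borough, '|', incident_zip)) AS combos
FROM `gcp_project_name.schema.silver_layer_table_name`
GROUP BY location_key
HAVING combos > 1;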