Build a crypto trading data pipeline with PySpark in Mage Pro

October 1, 2025

TLDR

Build a PySpark pipeline in Mage Pro that fetches recent cryptocurrency trades from Binance's free public API, calculates the USD value of each trade with Spark, and exports the results to Google BigQuery. All of this is supported in Mage Pro without a complex Spark configuration. It's a good starter project for crypto portfolio tracking, price monitoring, or building trading dashboards.

Table of contents

  • Introduction

  • Why PySpark for crypto data?

  • Build the pipeline step by step

  • Step 1: Create your pipeline

  • Step 2: Build the data loader

  • Step 3: Calculate price x quantity

  • Step 4: Build the data exporter

  • Conclusion

Introduction

Cryptocurrency markets operate 24/7 across global exchanges, generating massive amounts of price data every second. Whether you're tracking a personal portfolio, building a trading dashboard, or analyzing market trends, you need a system that can handle real-time data efficiently.

Traditional approaches using pandas and CSV files work fine for a handful of assets, but what happens when you want to track 100 cryptocurrencies? Or process historical data spanning months or years? Your analysis slows down, memory errors appear, and what should take seconds starts taking minutes.

This is where PySpark saves the day. Its distributed computing capabilities let you process large datasets efficiently, and Mage Pro makes it incredibly simple to set up. No cluster configuration, no complex YAML files, just two lines of configuration and you're ready to write PySpark code.

Why PySpark for crypto data?

You might wonder why we need Spark for cryptocurrency data. After all, fetching prices for 10 coins doesn't seem like "big data." But consider the real-world scenarios:

Portfolio tracking at scale: Professional traders track hundreds of cryptocurrencies across multiple exchanges. Each coin has price data updated every second. Processing this volume of data quickly requires distributed computing.

Historical analysis: Want to analyze price patterns over the last year? That's 8,760 hourly data points per cryptocurrency. For 100 coins, that's 876,000 data points, and far more at minute or second resolution. PySpark splits this work across partitions instead of loading everything into a single process's memory.

Real-time processing: Market conditions change rapidly. Your analysis needs to complete in seconds, not minutes. PySpark's parallel processing ensures fast execution times even as your dataset grows.

Future-proof architecture: Start small with current prices, then expand to include trading volume, market cap, order book data, social sentiment, and blockchain metrics, all processed in the same pipeline.
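As a rough check on the historical-analysis numbers above, the sketch below computes row counts for one year of history at hourly and per-second resolution. The function name and the 100-coin figure are illustrative assumptions; real volumes depend on how many symbols and exchanges you track.

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 (ignores leap years)
SECONDS_PER_YEAR = HOURS_PER_YEAR * 3600


def rows_per_year(num_coins: int, samples_per_coin: int) -> int:
    """Total data points for one year of history across all coins."""
    return num_coins * samples_per_coin


hourly_rows = rows_per_year(100, HOURS_PER_YEAR)
per_second_rows = rows_per_year(100, SECONDS_PER_YEAR)

print(f"Hourly, 100 coins:     {hourly_rows:,} rows")
print(f"Per-second, 100 coins: {per_second_rows:,} rows")
```

Hourly data for 100 coins still fits on one machine. The per-second case is where a distributed engine starts to pay off.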

The key advantage of using Mage Pro for PySpark is simplicity. Traditional Spark setups require configuring master nodes, worker nodes, memory allocation, and execution parameters. Mage Pro eliminates all of this complexity. You set two configuration lines and start writing PySpark code immediately.

Build the pipeline step by step


Step 1: Create your pipeline

Start by creating a pipeline and enabling PySpark for it.

Create a new batch pipeline in Mage Pro:

  1. Navigate to Pipelines from the left menu

  2. Click "New pipeline"

  3. Select Batch pipeline (not streaming; we'll make it stream-like via triggers)

  4. Name: crypto_realtime_stream

  5. Click "Create new pipeline"

Configure for PySpark: open the pipeline's metadata.yaml and add:

cache_block_output_in_memory: true
run_pipeline_in_one_process: true

These two lines enable PySpark in your pipeline. Mage Pro handles all Spark session management automatically.

Step 2: Build the data loader

The data loader acts as your connection to live market data, fetching fresh prices on every execution.

Create the data loader block:

  1. Click "Blocks" > "Loader" > "Base template (generic)"

  2. Name: fetch crypto prices

  3. Copy the data loader code below into the block

  4. Click the run button in the top right part of the block

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType, TimestampType
import requests
from datetime import datetime

@data_loader
def load_data(*args, **kwargs):
    """
    Fetch recent trades from Binance Spot market
    """
    spark = SparkSession.builder.getOrCreate()
    
    symbols = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT", "ADAUSDT", "MATICUSDT"]
    
    schema = StructType([
        StructField("symbol", StringType(), False),
        StructField("trade_id", LongType(), False),
        StructField("price", DoubleType(), False),
        StructField("quantity", DoubleType(), False),
        StructField("trade_timestamp", TimestampType(), False),
        StructField("buyer_maker", BooleanType(), False),
        StructField("trade_side", StringType(), False),
        StructField("batch_timestamp", TimestampType(), False),
        StructField("batch_id", LongType(), False),
        StructField("exchange", StringType(), False)
    ])
    
    print("=" * 80)
    print("BINANCE SPOT TRADES")
    print("=" * 80)
    
    all_trades = []
    batch_time = datetime.now()
    batch_id = int(batch_time.timestamp() * 1000)
    
    for symbol in symbols:
        try:
            url = "https://api.binance.us/api/v3/trades"
            params = {'symbol': symbol, 'limit': 50}
            
            resp = requests.get(url, params=params, timeout=10)
            
            if resp.status_code == 200:
                trades = resp.json()
                
                for trade in trades:
                    price = float(trade['price'])
                    qty = float(trade['qty'])
                    
                    record = (
                        symbol,
                        int(trade['id']),
                        price,
                        qty,
                        datetime.fromtimestamp(int(trade['time']) / 1000),
                        trade['isBuyerMaker'],
                        "SELL" if trade['isBuyerMaker'] else "BUY",
                        batch_time,
                        batch_id,
                        "binance_spot"
                    )
                    
                    all_trades.append(record)
                
                if trades:
                    latest = trades[-1]
                    p = float(latest['price'])
                    q = float(latest['qty'])
                    side = "SELL" if latest['isBuyerMaker'] else "BUY"
                    ts = datetime.fromtimestamp(int(latest['time']) / 1000).strftime("%H:%M:%S")
                    
                    print(f"{ts} | {symbol:8} | {side:4} | ${p:>10.4f} | Qty: {q:>12.6f}")
                
            else:
                print(f"Error {symbol}: {resp.status_code}")
                
        except Exception as e:
            print(f"Error {symbol}: {e}")
    
    df = spark.createDataFrame(all_trades, schema=schema)
    
    print("=" * 80)
    print(f"Batch: {batch_id} | Trades: {df.count()} | Time: {batch_time.strftime('%H:%M:%S')}")
    print("=" * 80)
    
    return df

@test
def test_output(output, *args):
    assert output is not None, 'No output'
    assert output.count() > 0, 'No trades'
    
    for col in ['symbol', 'price', 'quantity', 'trade_side']:
        assert col in output.columns, f'Missing {col}'
    
    print(f"OK: {output.count()} trades")
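The per-trade conversion inside the loader's loop can be pulled out into a small function and checked without Spark or network access. The sketch below is one way to do that, not the loader's actual code: `parse_trade` and the sample payload are hypothetical, and it builds timezone-aware UTC timestamps where the loader above uses naive local time, which is an assumption about what you may prefer when runs happen on machines in different time zones.

```python
from datetime import datetime, timezone


def parse_trade(symbol: str, trade: dict) -> dict:
    """Convert one raw trade payload into a flat, typed record."""
    is_buyer_maker = bool(trade["isBuyerMaker"])
    return {
        "symbol": symbol,
        "trade_id": int(trade["id"]),
        # The API returns price and quantity as strings.
        "price": float(trade["price"]),
        "quantity": float(trade["qty"]),
        # Trade time is given in milliseconds since the Unix epoch.
        "trade_timestamp": datetime.fromtimestamp(
            int(trade["time"]) / 1000, tz=timezone.utc
        ),
        "buyer_maker": is_buyer_maker,
        # If the buyer was the maker, the seller was the taker (aggressor).
        "trade_side": "SELL" if is_buyer_maker else "BUY",
    }


# Hypothetical payload shaped like one element of the trades response.
sample = {
    "id": 12345,
    "price": "112000.00",
    "qty": "0.50000000",
    "time": 1700000000000,
    "isBuyerMaker": True,
}

record = parse_trade("BTCUSDT", sample)
print(record["trade_side"], record["trade_timestamp"].isoformat())
```

Keeping this logic in a plain function makes it possible to unit test the field mapping separately from the HTTP call and the DataFrame construction.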

Step 3: Calculate price x quantity

With raw trade data loaded, the transformer performs the core calculation that turns individual trades into analyzable metrics.

Create the data transformer block:

  1. Click "Blocks" > "Transformer" > "Base template (generic)"

  2. Name the block Price x quantity transformation

  3. Copy the transformer code below into the transformer block

  4. Click the run button in the top right part of the block

The transformer adds a single calculated field: trade value in USD. This multiplies the price per unit by the quantity traded, giving you the total dollar value of each transaction. For crypto analysis, knowing that someone bought 0.5 Bitcoin isn't as useful as knowing they spent $56,000. The trade value reveals the actual capital flow in the market.
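The arithmetic is easy to check by hand. The sketch below reproduces the 0.5 Bitcoin example in plain Python, assuming a price of $112,000 per coin (the price implied by the $56,000 figure). It uses `Decimal` with half-up rounding because Spark's `round` function also rounds half up; the transformer itself works on doubles, so its results could differ by a cent on edge cases. The function name `trade_value_usd` is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP


def trade_value_usd(price: str, quantity: str) -> Decimal:
    """Price times quantity, rounded half-up to whole cents."""
    value = Decimal(price) * Decimal(quantity)
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


# 0.5 BTC at an assumed price of $112,000 per coin
btc_value = trade_value_usd("112000", "0.5")
print(btc_value)
```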

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, round as spark_round

@transformer
def transform(df: DataFrame, *args, **kwargs) -> DataFrame:
    """
    Calculate trade value (price * quantity)
    """
    spark = kwargs.get('spark')
    
    print("Calculating trade values...")
    
    # Add trade_value_usd column
    df = df.withColumn(
        "trade_value_usd",
        spark_round(col("price") * col("quantity"), 2)
    )
    
    # Reorder columns
    result = df.select(
        col("batch_id"),
        col("batch_timestamp"),
        col("symbol"),
        col("trade_id"),
        col("price"),
        col("quantity"),
        col("trade_value_usd"),
        col("trade_side"),
        col("trade_timestamp"),
        col("buyer_maker"),
        col("exchange")
    )
    
    print(f"Processed {result.count()} trades")
    result.show(20, truncate=False)
    
    return result

@test
def test_output(output, *args):
    assert output is not None, 'No output'
    assert 'trade_value_usd' in output.columns, 'Missing trade_value_usd'
    print(f"OK: {output.count()} trades with values")
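Once each trade carries `trade_value_usd`, a natural next step is to total the value per symbol and side. The sketch below shows that logic on plain Python dictionaries with made-up rows, so it runs without a Spark session. In the pipeline, the equivalent would be a PySpark group-by on `symbol` and `trade_side`, which is not shown here. The function name is hypothetical.

```python
from collections import defaultdict

# Made-up rows with the same fields the transformer outputs.
trades = [
    {"symbol": "BTCUSDT", "trade_side": "BUY", "trade_value_usd": 56000.00},
    {"symbol": "BTCUSDT", "trade_side": "SELL", "trade_value_usd": 1120.00},
    {"symbol": "BTCUSDT", "trade_side": "BUY", "trade_value_usd": 224.50},
    {"symbol": "ETHUSDT", "trade_side": "SELL", "trade_value_usd": 4100.25},
]


def volume_by_symbol_and_side(rows):
    """Sum trade_value_usd for each (symbol, trade_side) pair."""
    totals = defaultdict(float)
    for row in rows:
        totals[(row["symbol"], row["trade_side"])] += row["trade_value_usd"]
    return {key: round(total, 2) for key, total in totals.items()}


flows = volume_by_symbol_and_side(trades)
for (symbol, side), total in sorted(flows.items()):
    print(f"{symbol:8} {side:4} ${total:,.2f}")
```

Comparing BUY and SELL totals for a symbol gives a rough view of which side was more aggressive during the batch.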

Step 4: Build the data exporter

The final step writes your processed trade data to BigQuery for long-term storage and SQL analysis.

Create the exporter block:

  1. Click "Blocks" > "Exporter" > "Python"

  2. Select "BigQuery" template

  3. Name: export_to_bigquery

  4. Connect it to your transformer block

BigQuery serves as your data warehouse, accumulating trades over time and enabling SQL queries for analysis. The exporter converts your PySpark DataFrame to pandas before writing. This might seem counterintuitive after using distributed computing, but each batch here is small (at most 50 trades per symbol), so the conversion is fast and the data fits comfortably in memory.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#bigquery
    """
    df_pandas = df.toPandas()

    table_id = 'your-project.your_dataset.your_table_name'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df_pandas,
        table_id,
        if_exists='append',  # Append each batch so trades accumulate; 'replace' would overwrite the table on every run
    )
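The `table_id` in the template is a placeholder in `project.dataset.table` form, and it is easy to forget to change it. The sketch below is an optional guard you could run before exporting. The helper name `split_table_id` and the example project, dataset, and table names are hypothetical, not part of Mage or BigQuery.

```python
# Placeholder values copied from the exporter template above.
PLACEHOLDERS = {"your-project", "your_dataset", "your_table_name"}


def split_table_id(table_id: str) -> tuple:
    """Split 'project.dataset.table' into parts, rejecting obvious mistakes."""
    parts = table_id.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected 'project.dataset.table', got {table_id!r}")
    if PLACEHOLDERS.intersection(parts):
        raise ValueError(f"Replace the placeholder values in {table_id!r}")
    return tuple(parts)


# Hypothetical project, dataset, and table names.
project, dataset, table = split_table_id("my-project.crypto.binance_trades")
print(project, dataset, table)
```

Failing early with a clear message is cheaper than debugging a permissions or not-found error returned by the warehouse.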

Conclusion

You've built a cryptocurrency data pipeline using PySpark in Mage Pro that fetches live trade data from Binance, calculates trade values, and stores the results in Google BigQuery. This architecture shows how modern data platforms simplify streaming and micro-batch workloads: what traditionally required Kafka clusters and extensive DevOps now runs with three code blocks and minimal configuration. Whether you're building personal trading tools or production financial applications, this foundation handles near real-time data with minimal operational overhead.
