Building real-time crypto trading pipelines with Kafka and Mage Pro

June 25, 2025

TLDR:

We'll build a complete real-time data pipeline that captures live cryptocurrency trading data from Binance, processes it through Apache Kafka, and transforms it with Mage Pro for analytics and storage in BigQuery. Along the way, we'll see how modern streaming architectures handle high-frequency financial data with sub-second latency and zero message loss.

Table of Contents

  • Introduction

  • Understanding Kafka for streaming pipelines

  • Setting up your Kafka instance

  • Building the WebSocket producer

  • Configuring Mage Pro for Kafka streaming

  • Implementing real-time transformations

  • Exporting to BigQuery

  • Conclusion

Introduction

Processing data in real time is essential for financial applications, especially when dealing with high-frequency stock or crypto trading data. Batch processing falls short when data needs to be processed in milliseconds, a requirement for trading decisions and market analysis.

In this article, we’ll build a streaming pipeline that processes live crypto trading data using Apache Kafka and Mage Pro. The pipeline we’ll build will provide immediate insights into market movements, trading pressure and price dynamics.

This architecture demonstrates key streaming patterns that apply beyond financial data. It’s applicable to use cases such as IoT sensor readings, application logs, and user activity tracking.

Understanding Kafka for streaming pipelines

Kafka is a distributed streaming platform built for continuous data flows, which makes it a natural fit for streaming crypto data and brings several advantages:

  • Event ordering: Trades are processed in chronological order, preserving market timing

  • Durability: No data loss even during system failures or network interruptions

  • Scalability: Handle thousands of trades per second with horizontal scaling

  • Replayability: Reprocess historical data for backtesting or debugging

When a trade occurs on Binance, the process follows this flow:

  1. Data capture: WebSocket connection receives trade events in real-time

  2. Enrichment: Producer adds calculated fields like trade value and processing timestamps

  3. Publishing: Enhanced trade data is published to Kafka topics with symbol-based partitioning

  4. Processing: Mage Pro consumes batches of trades for transformation and analysis

  5. Storage: Processed data flows to BigQuery for analytics and reporting

This approach separates data capture from processing, allowing each component to scale independently and handle different workloads efficiently.

Setting up your Kafka instance

Kafka serves as the backbone for real-time streaming pipelines. Whether you're using Confluent Cloud, AWS MSK, or a local installation, the setup process follows these key steps:

Step 1: Choose your Kafka deployment

Choose the infrastructure where you'll host your Kafka setup:

  • Confluent Cloud: Fully managed with enterprise features and global availability

  • AWS MSK: Integrated with AWS ecosystem, VPC networking, and IAM security

  • Azure Event Hubs: Native Azure Kafka service with automatic scaling

  • Google Cloud Pub/Sub: Google's messaging service with Kafka compatibility

  • Self-managed: Full control over configuration, infrastructure, and costs

Step 2: Configure essential Kafka settings

Regardless of your deployment method, ensure these configurations:

  • Core Kafka Configuration: Set replication factor, partitions, and retention policies based on your fault tolerance and throughput requirements.

  • Security Settings: Enable authentication, authorization, and encryption to protect your data streams.

  • Performance Optimization: Configure batching, compression, and acknowledgment settings for optimal throughput and reliability.

Step 3: Create your topics and access credentials

  • Topic Creation: Create your main topics with appropriate partitions and replication factors (a code sketch follows this list).

  • Access Management: Generate authentication credentials and configure ACLs for proper access control.

  • Connectivity Testing: Verify your setup with simple producer/consumer tests before deploying applications.
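If you're working in Python, a minimal topic-creation sketch with kafka-python's admin client looks like the following. The broker address, topic name, partition count, and retention value are placeholder assumptions; adjust them to your cluster and throughput needs.

from kafka.admin import KafkaAdminClient, NewTopic

# Placeholder broker address; point this at your Confluent Cloud, MSK, or local cluster
admin = KafkaAdminClient(bootstrap_servers="localhost:9094")

topic = NewTopic(
    name="binance-crypto-trades",
    num_partitions=6,                 # roughly one partition per symbol is a reasonable start
    replication_factor=3,             # needs at least 3 brokers; use 1 for a local single-broker setup
    topic_configs={"retention.ms": "604800000"},  # keep trades for 7 days
)

admin.create_topics([topic])
admin.close()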

Step 4: Verify connectivity

Test your Kafka setup with a simple producer/consumer (a minimal round-trip sketch follows this list) to ensure:

  • Network connectivity is established

  • Authentication credentials work correctly

  • Topics are accessible from your application

  • Messages flow end-to-end without errors
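A quick round-trip test with kafka-python covers all four checks: produce one message, then consume it back. This is a minimal sketch; the broker endpoint and topic name are assumptions to replace with your own values.

import json
from kafka import KafkaProducer, KafkaConsumer

BOOTSTRAP = "localhost:9094"      # replace with your broker endpoint
TOPIC = "binance-crypto-trades"   # replace with your topic

# Produce a single test message
producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send(TOPIC, {"ping": "connectivity-check"})
producer.flush()
producer.close()

# Read it back from the beginning of the topic
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,    # give up after 10 seconds of silence
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for record in consumer:
    print(f"Received from partition {record.partition} at offset {record.offset}: {record.value}")
    break
consumer.close()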

This foundation supports any streaming use case, from financial data to IoT sensors to application logs.

Building the WebSocket producer

Real-time crypto data stream

For this pipeline, we'll build a Python WebSocket producer that connects to Binance's streaming API and captures live trade data. The WebSocket producer handles several critical responsibilities:

  • Connection management: Maintains persistent WebSocket connections with automatic reconnection on failures

  • Data enrichment: Adds calculated fields and metadata for downstream processing

  • Error handling: Graceful handling of network issues, API disconnections, and rate limits

  • Performance optimization: Efficient message batching and JSON serialization for Kafka delivery

Here's the core WebSocket producer implementation:

#!/usr/bin/env python3
import json
import asyncio
import websockets
from kafka import KafkaProducer
import logging
from datetime import datetime
import sys

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS = "localhost:9094"
KAFKA_TOPIC = "binance-crypto-trades"

# List of symbols you want to subscribe to
SYMBOLS = ["btcusdt", "ethusdt", "bnbusdt", "adausdt", "maticusdt", "solusdt"]

# Create a combined stream URL for Binance futures
STREAMS = "/".join([f"{symbol}@trade" for symbol in SYMBOLS])
BINANCE_WS_URL = f"wss://fstream.binance.com/stream?streams={STREAMS}"

# Initialize the Kafka producer
try:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',
        retries=3,
        batch_size=16384,
        linger_ms=10
    )
    logger.info(f"✅ Kafka producer initialized for {KAFKA_BOOTSTRAP_SERVERS}")
except Exception as e:
    logger.error(f"❌ Failed to initialize Kafka producer: {e}")
    sys.exit(1)

async def process_websocket_messages():
    while True:
        try:
            async with websockets.connect(BINANCE_WS_URL) as websocket:
                print(f"🚀 Connected to Binance WebSocket for symbols: {', '.join(SYMBOLS).upper()}")
                logger.info(f"📊 Publishing to Kafka topic: {KAFKA_TOPIC}")
                print("📈 Streaming live trade data to Kafka...")
                print("🛑 Press Ctrl+C to stop")
                print("=" * 80)

                while True:
                    message = await websocket.recv()
                    data = json.loads(message)

                    stream = data.get("stream")
                    trade_data = data.get("data", {})

                    if not stream or not trade_data:
                        print("⚠️  Invalid message format received.")
                        continue

                    symbol = trade_data.get("s")
                    price = float(trade_data.get("p", 0))
                    quantity = float(trade_data.get("q", 0))
                    timestamp = trade_data.get("T")
                    buyer_maker = trade_data.get("m")
                    trade_id = trade_data.get("t")
                    
                    # Calculate trade value
                    trade_value = price * quantity
                    
                    # Add additional useful fields
                    transformed_data = {
                        "symbol": symbol,
                        "price": price,
                        "quantity": quantity,
                        "timestamp": timestamp,
                        "buyer_maker": buyer_maker,
                        "trade_id": trade_id,
                        "stream": stream,
                        "trade_value_usd": trade_value,
                        "processing_time": datetime.now().isoformat(),
                        "exchange": "binance",
                        "data_type": "trade"
                    }

                    try:
                        # Publish to Kafka using symbol as key for partitioning
                        future = producer.send(
                            topic=KAFKA_TOPIC,
                            key=symbol,
                            value=transformed_data
                        )
                        
                        # Format timestamp for display
                        trade_time = datetime.fromtimestamp(timestamp / 1000).strftime("%H:%M:%S")
                        trade_side = "SELL" if buyer_maker else "BUY"
                        
                        print(f"✅ {trade_time} | {symbol:8} | {trade_side:4} | ${price:>10.4f} | Qty: {quantity:>12.6f} | Value: ${trade_value:>10.2f}")
                        
                    except Exception as e:
                        logger.error(f"❌ Failed to publish to Kafka: {e}")

        except websockets.exceptions.ConnectionClosed:
            print("🔄 WebSocket connection closed. Reconnecting in 5 seconds...")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"❌ Error: {e}. Reconnecting in 5 seconds...")
            await asyncio.sleep(5)

def cleanup():
    """Cleanup function to close Kafka producer"""
    try:
        if producer:
            producer.flush()
            producer.close()
            logger.info("✅ Kafka producer closed successfully")
    except Exception as e:
        logger.error(f"❌ Error closing Kafka producer: {e}")

if __name__ == '__main__':
    try:
        print("🚀 Starting Binance WebSocket to Kafka Producer")
        print(f"📊 Kafka Topic: {KAFKA_TOPIC}")
        print(f"📈 Monitoring symbols: {', '.join(SYMBOLS).upper()}")
        print(f"🔗 Kafka servers: {KAFKA_BOOTSTRAP_SERVERS}")
        print("⚡ Starting stream...")
        print("=" * 80)
        
        asyncio.run(process_websocket_messages())
        
    except KeyboardInterrupt:
        print("\\n" + "=" * 80)
        print("🛑 Shutting down gracefully...")
        cleanup()
        print("👋 Stream stopped. Goodbye!")
        sys.exit(0)
    except Exception as e:
        logger.error(f"💥 Fatal error: {e}")
        cleanup()
        sys.exit(1)

Key producer patterns

  • Reliability: acks='all' with retries minimizes the risk of data loss during broker or network interruptions

  • Performance: Batching with batch_size=16384 and linger_ms=10 optimizes throughput while keeping latency low

  • Monitoring: Adding processing timestamps enables end-to-end latency tracking

  • Schema consistency: A standardized message structure simplifies downstream processing

The producer enriches raw trading data with calculated fields like trade_value_usd and processing_time, making downstream processing more efficient and reducing computational load in the streaming layer.
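For example, because every enriched message carries the exchange timestamp in epoch milliseconds, any downstream step can estimate how far behind real time it is running. Here's a small illustrative sketch; the pipeline_latency_ms helper is hypothetical and not part of the producer above.

import time

def pipeline_latency_ms(message: dict) -> float:
    """Rough lag from the exchange trade time to the moment of measurement."""
    # Binance's "timestamp" field is milliseconds since the Unix epoch (UTC)
    return time.time() * 1000 - message["timestamp"]

# Example: a trade that occurred roughly 350 ms ago
sample = {"timestamp": int(time.time() * 1000) - 350}
print(f"Approximate pipeline latency: {pipeline_latency_ms(sample):.0f} ms")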

Configuring Mage Pro for Kafka streaming

Why Mage Pro for stream processing?

Mage Pro excels at streaming workloads because it provides:

  • Visual pipeline development with code-based transformations

  • Built-in Kafka connectivity with minimal configuration overhead

  • Real-time monitoring and comprehensive error handling

  • Seamless integration with data warehouses like BigQuery and Snowflake

  • Version control and deployment management for production pipelines

Mage data loader configuration

Add this code to your Mage Kafka data loader:

connector_type: kafka
bootstrap_server: "your-kafka-endpoint:9092"
topic: binance-crypto-trades
consumer_group: mage_consumer_earliest
include_metadata: false
batch_size: 100
timeout_ms: 5000
auto_offset_reset: "earliest"
api_version: "2.8.0"

# Uncomment the config below to use SSL config
# security_protocol: "SSL"
# ssl_config:
#   cafile: "CARoot.pem"
#   certfile: "certificate.pem"
#   keyfile: "key.pem"
#   password: password
#   check_hostname: true

# Uncomment the config below to use SASL_SSL config
# security_protocol: "SASL_SSL"
# sasl_config:
#   mechanism: "PLAIN"
#   username: username
#   password: password

# Uncomment the config below to use protobuf schema to deserialize message
# serde_config:
#   serialization_method: PROTOBUF
#   schema_classpath: "path.to.schema.SchemaClass"

Configuration details

  • bootstrap_server: Your Kafka cluster endpoint (Confluent Cloud, AWS MSK, etc.)

  • auto_offset_reset: "earliest" to read all historical messages for testing

  • consumer_group: Unique name to track consumption progress across restarts

  • batch_size: Number of messages per batch for efficient processing

  • timeout_ms: Consumer timeout for batch processing and error handling

After finishing the configuration of the data loader, add the transformation block that will further process your streaming data.

Implementing real-time transformations

The transformation block is responsible for processing the data according to business rules/logic for our use case. It calculates trading metrics, directional pressure, and market insights in real-time.

This block:

  • Processes messages in chronological order to maintain market timing

  • Calculates buy/sell pressure to identify market sentiment

  • Tracks cumulative metrics per cryptocurrency symbol

  • Adds derived fields for downstream analytics

from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform(messages: List[Dict], *args, **kwargs):
   """
   Calculate directional trade values to track buying vs selling pressure
   
   Args:
       messages: List of messages in the stream.
   Returns:
       Transformed messages with buy/sell pressure calculations and price totals
   """
   
   # Sort messages by timestamp and symbol to maintain order
   messages_sorted = sorted(messages, key=lambda x: (x['symbol'], x['timestamp']))
   
   # Track running totals per symbol
   symbol_totals = {}
   
   for message in messages_sorted:
       symbol = message['symbol']
       price = message['price']
       quantity = message['quantity']
       
       # Initialize symbol tracking if not exists
       if symbol not in symbol_totals:
           symbol_totals[symbol] = {'cumulative_price': 0, 'trade_count': 0}
       
       # Basic trade value (price × quantity)
       trade_value_usd = price * quantity
       message['trade_value_usd'] = trade_value_usd
       
       # Directional values based on trade side
       # buyer_maker = True means market sell order (bearish)
       # buyer_maker = False means market buy order (bullish)
       
       if message['buyer_maker']:
           # This is a sell order
           message['buy_value'] = 0
           message['sell_value'] = trade_value_usd
           message['trade_side'] = 'SELL'
       else:
           # This is a buy order
           message['buy_value'] = trade_value_usd
           message['sell_value'] = 0
           message['trade_side'] = 'BUY'
       
       # Net buying pressure (positive = more buying, negative = more selling)
       message['net_buy_pressure'] = message['buy_value'] - message['sell_value']
       
       # Cumulative price totals
       symbol_totals[symbol]['cumulative_price'] += price
       symbol_totals[symbol]['trade_count'] += 1
       
       message['cumulative_price_total'] = symbol_totals[symbol]['cumulative_price']
       message['average_price'] = symbol_totals[symbol]['cumulative_price'] / symbol_totals[symbol]['trade_count']
   
   print(f"Processed {len(messages)} trades with directional values and price totals")
   
   return messages

Transformation logic explained

Market direction analysis: The transformer distinguishes between market buy orders (bullish) and market sell orders (bearish) using the buyer_maker field.

Pressure calculations: Net buying pressure helps identify whether more money is flowing into (positive) or out of (negative) specific cryptocurrencies.

Running averages: Cumulative price tracking enables moving average calculations and trend analysis.

Performance optimization: Sorting by symbol and timestamp keeps processing chronological while adding minimal overhead per batch.
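To make the pressure and running-average ideas concrete, here is an illustrative sketch of how a downstream consumer could bucket the transformed messages with pandas, summing net_buy_pressure per symbol per minute. It assumes the field names emitted by the transformer above and is not part of the pipeline itself.

import pandas as pd

def summarize_pressure(messages: list) -> pd.DataFrame:
    """Aggregate transformed trades into per-symbol, per-minute sentiment buckets."""
    df = pd.DataFrame(messages)
    # Binance timestamps are epoch milliseconds
    df["minute"] = pd.to_datetime(df["timestamp"], unit="ms").dt.floor("min")
    return (
        df.groupby(["symbol", "minute"])
          .agg(
              net_buy_pressure=("net_buy_pressure", "sum"),
              avg_price=("price", "mean"),
              trades=("trade_id", "count"),
          )
          .reset_index()
    )

# Example with two transformed messages
sample = [
    {"symbol": "BTCUSDT", "timestamp": 1719320400000, "price": 61000.0,
     "net_buy_pressure": 1200.0, "trade_id": 1},
    {"symbol": "BTCUSDT", "timestamp": 1719320405000, "price": 61010.0,
     "net_buy_pressure": -400.0, "trade_id": 2},
]
print(summarize_pressure(sample))

A positive net_buy_pressure in a bucket means more money flowed into the symbol than out of it during that minute.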

Exporting to BigQuery

Mage Pro ships with out-of-the-box exporters for destinations such as Snowflake, BigQuery, and PostgreSQL. For this pipeline we'll export the data to Google BigQuery with a few lines of YAML: add a BigQuery data exporter block and provide the configuration below.

BigQuery configuration

connector_type: bigquery
# Credentials for the 'default' profile come from your project's io_config.yaml
profile: default
config:
  # Fully qualified destination table as project.dataset.table; this value is a placeholder
  table_id: your-project.your_dataset.crypto_trades
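Once data is flowing, a quick sanity check is to query the destination table. This is a minimal sketch using the google-cloud-bigquery client; the project, dataset, and table names are placeholders matching the exporter config above, and it assumes application default credentials are configured.

from google.cloud import bigquery

client = bigquery.Client()

query = """
    SELECT symbol,
           COUNT(*) AS trades,
           SUM(net_buy_pressure) AS net_buy_pressure,
           AVG(price) AS avg_price
    FROM `your-project.your_dataset.crypto_trades`
    WHERE TIMESTAMP_MILLIS(timestamp) > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 5 MINUTE)
    GROUP BY symbol
    ORDER BY trades DESC
"""

# Print a per-symbol summary of the last five minutes of trades
for row in client.query(query).result():
    print(f"{row.symbol}: {row.trades} trades, net pressure {row.net_buy_pressure:,.2f}, avg price {row.avg_price:,.2f}")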

Conclusion

Kafka and Mage Pro create a powerful combination for real-time data pipelines that goes far beyond cryptocurrency trading. Whether you're processing IoT sensor readings, application logs, user activity streams, or financial market data, this architecture provides the reliability, scalability, and developer productivity needed for production streaming applications.

Are you ready to build your own streaming data pipelines in Mage Pro? Connect your managed or local Kafka service to Mage Pro and begin with this simple crypto proof of concept. The patterns demonstrated here will scale to handle any streaming workload your business requires.

Want to implement a real-time data pipeline using the methods discussed above? Schedule a free demo with Mage to get started today.
