TLDR:
We'll build a complete real-time data pipeline that captures live cryptocurrency trading data from Binance, processes it through Apache Kafka, and transforms it using Mage Pro for analytics and storage in BigQuery. This demonstrates how modern streaming architectures handle high-frequency financial data with sub-second latency and zero message loss.
Table of Contents
Introduction
Understanding Kafka for streaming pipelines
Setting up your Kafka instance
Building the WebSocket producer
Configuring Mage Pro for Kafka streaming
Implementing real-time transformations
Exporting to BigQuery
Performance results and lessons learned
Conclusion
Introduction
Processing data in real time is essential for financial applications, especially when dealing with high-frequency stock or crypto trading data. Batch processing falls short when data needs to be processed in milliseconds, a requirement for trading decisions and market analysis.
In this article, we’ll build a streaming pipeline that processes live crypto trading data using Apache Kafka and Mage Pro. The pipeline we’ll build will provide immediate insights into market movements, trading pressure and price dynamics.
This architecture demonstrates key streaming patterns that apply beyond financial data. It’s applicable to use cases such as IoT sensor readings, application logs, and user activity tracking.

Understanding Kafka for streaming pipelines
Kafka is a distributed streaming platform built for continuous data flows, which makes it a natural fit for streaming crypto data and brings several advantages:
Event ordering: Trades are processed in chronological order, preserving market timing
Durability: No data loss even during system failures or network interruptions
Scalability: Handle thousands of trades per second with horizontal scaling
Replayability: Reprocess historical data for backtesting or debugging (see the sketch below)
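Replayability is the easiest of these to see in action: because Kafka retains messages on disk, a consumer can rewind to the start of a topic and re-read every retained trade. Here's a minimal sketch using the kafka-python library, assuming the local broker address and topic name used later in this article:
import json
from kafka import KafkaConsumer, TopicPartition

# Assumed local broker and topic; match these to your own setup.
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9094",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    enable_auto_commit=False,      # don't advance committed offsets for a one-off replay
    consumer_timeout_ms=10000,     # stop iterating once we're caught up
)

# Manually assign a partition and rewind to the beginning of the log.
partition = TopicPartition("binance-crypto-trades", 0)
consumer.assign([partition])
consumer.seek_to_beginning(partition)

for record in consumer:
    trade = record.value
    print(record.offset, trade.get("symbol"), trade.get("price"))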
When a trade occurs on Binance, the process follows this flow:
Data capture: WebSocket connection receives trade events in real time
Enrichment: Producer adds calculated fields like trade value and processing timestamps
Publishing: Enhanced trade data is published to Kafka topics with symbol-based partitioning
Processing: Mage Pro consumes batches of trades for transformation and analysis
Storage: Processed data flows to BigQuery for analytics and reporting
This approach separates data capture from processing, allowing each component to scale independently and handle different workloads efficiently.

Setting up your Kafka instance
Kafka serves as the backbone for real-time streaming pipelines. Whether you're using Confluent Cloud, AWS MSK, or a local installation, the setup process follows these key steps:
Step 1: Choose your Kafka deployment
Choose the infrastructure that will host your Kafka cluster:
Confluent Cloud: Fully managed with enterprise features and global availability
AWS MSK: Integrated with AWS ecosystem, VPC networking, and IAM security
Azure Event Hubs: Native Azure Kafka service with automatic scaling
Google Cloud Pub/Sub: Google's messaging service, with Kafka compatibility available through connectors
Self-managed: Full control over configuration, infrastructure, and costs
Step 2: Configure essential Kafka settings
Regardless of your deployment method, ensure these configurations:
Core Kafka Configuration: Set replication factor, partitions, and retention policies based on your fault tolerance and throughput requirements.
Security Settings: Enable authentication, authorization, and encryption to protect your data streams.
Performance Optimization: Configure batching, compression, and acknowledgment settings for optimal throughput and reliability.
Step 3: Create your topics and access credentials
Topic Creation: Create your main topics with appropriate partitions and replication factors (see the sketch after this step).
Access Management: Generate authentication credentials and configure ACLs for proper access control.
Connectivity Testing: Verify your setup with simple producer/consumer tests before deploying applications.
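If you prefer to script topic creation rather than click through a console, kafka-python's admin client can do it programmatically. The broker address, partition count, replication factor, and retention below are illustrative values for this demo, not recommendations for your cluster:
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9094")

topic = NewTopic(
    name="binance-crypto-trades",
    num_partitions=6,             # roughly one partition per symbol for this demo
    replication_factor=1,         # use 3 or more on a multi-broker production cluster
    topic_configs={"retention.ms": str(24 * 60 * 60 * 1000)},  # keep one day of trades
)

try:
    admin.create_topics(new_topics=[topic])
    print("Topic created")
except TopicAlreadyExistsError:
    print("Topic already exists")
finally:
    admin.close()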
Step 4: Verify connectivity
Test your Kafka setup with a simple producer/consumer round trip (a minimal check is sketched after this list) to ensure:
Network connectivity is established
Authentication credentials work correctly
Topics are accessible from your application
Messages flow end-to-end without errors
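A minimal round-trip check, assuming the same local broker and topic used throughout this article, might look like the following. It publishes one test message and reads it back, which exercises networking, credentials, and topic access in a single pass:
import json
import uuid
from kafka import KafkaConsumer, KafkaProducer

BOOTSTRAP = "localhost:9094"       # assumed broker address
TOPIC = "binance-crypto-trades"    # assumed topic; a dedicated test topic also works

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
marker = str(uuid.uuid4())
producer.send(TOPIC, {"healthcheck": marker}).get(timeout=10)  # raises if the send fails
producer.flush()

consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,     # give up after 10 seconds
)
found = any(msg.value.get("healthcheck") == marker for msg in consumer)
print("End-to-end check passed" if found else "Test message not found")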
This foundation supports any streaming use case, from financial data to IoT sensors to application logs.
Building the WebSocket producer
Real-time crypto data stream
For this pipeline, we'll build a Python WebSocket producer that connects to Binance's streaming API and captures live crypto trading data. The WebSocket producer handles several critical responsibilities:
Connection management: Maintains persistent WebSocket connections with automatic reconnection on failures
Data enrichment: Adds calculated fields and metadata for downstream processing
Error handling: Graceful handling of network issues, API disconnections, and rate limits
Performance optimization: Efficient message batching and JSON serialization for Kafka delivery
Here's the core WebSocket producer implementation:
import json
import asyncio
import websockets
from kafka import KafkaProducer
import logging
from datetime import datetime
import sys

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Connection settings for Kafka and the Binance combined trade stream
KAFKA_BOOTSTRAP_SERVERS = "localhost:9094"
KAFKA_TOPIC = "binance-crypto-trades"
SYMBOLS = ["btcusdt", "ethusdt", "bnbusdt", "adausdt", "maticusdt", "solusdt"]
STREAMS = "/".join([f"{symbol}@trade" for symbol in SYMBOLS])
BINANCE_WS_URL = f"wss://fstream.binance.com/stream?streams={STREAMS}"

# Initialize the Kafka producer with reliability (acks='all', retries) and
# batching (batch_size, linger_ms) settings
try:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',
        retries=3,
        batch_size=16384,
        linger_ms=10
    )
    logger.info(f"✅ Kafka producer initialized for {KAFKA_BOOTSTRAP_SERVERS}")
except Exception as e:
    logger.error(f"❌ Failed to initialize Kafka producer: {e}")
    sys.exit(1)


async def process_websocket_messages():
    while True:
        try:
            async with websockets.connect(BINANCE_WS_URL) as websocket:
                print(f"🚀 Connected to Binance WebSocket for symbols: {', '.join(SYMBOLS).upper()}")
                logger.info(f"📊 Publishing to Kafka topic: {KAFKA_TOPIC}")
                print("📈 Streaming live trade data to Kafka...")
                print("🛑 Press Ctrl+C to stop")
                print("=" * 80)
                while True:
                    message = await websocket.recv()
                    data = json.loads(message)
                    stream = data.get("stream")
                    trade_data = data.get("data", {})
                    if not stream or not trade_data:
                        print("⚠️ Invalid message format received.")
                        continue

                    # Extract the raw trade event and enrich it with calculated fields
                    symbol = trade_data.get("s")
                    price = float(trade_data.get("p", 0))
                    quantity = float(trade_data.get("q", 0))
                    timestamp = trade_data.get("T")
                    buyer_maker = trade_data.get("m")
                    trade_id = trade_data.get("t")
                    trade_value = price * quantity
                    transformed_data = {
                        "symbol": symbol,
                        "price": price,
                        "quantity": quantity,
                        "timestamp": timestamp,
                        "buyer_maker": buyer_maker,
                        "trade_id": trade_id,
                        "stream": stream,
                        "trade_value_usd": trade_value,
                        "processing_time": datetime.now().isoformat(),
                        "exchange": "binance",
                        "data_type": "trade"
                    }

                    # Publish to Kafka, keyed by symbol so each symbol's trades
                    # land on the same partition and stay ordered
                    try:
                        producer.send(
                            topic=KAFKA_TOPIC,
                            key=symbol,
                            value=transformed_data
                        )
                        trade_time = datetime.fromtimestamp(timestamp / 1000).strftime("%H:%M:%S")
                        trade_side = "SELL" if buyer_maker else "BUY"
                        print(f"✅ {trade_time} | {symbol:8} | {trade_side:4} | ${price:>10.4f} | Qty: {quantity:>12.6f} | Value: ${trade_value:>10.2f}")
                    except Exception as e:
                        logger.error(f"❌ Failed to publish to Kafka: {e}")
        except websockets.exceptions.ConnectionClosed:
            print("🔄 WebSocket connection closed. Reconnecting in 5 seconds...")
            await asyncio.sleep(5)
        except Exception as e:
            print(f"❌ Error: {e}. Reconnecting in 5 seconds...")
            await asyncio.sleep(5)


def cleanup():
    """Cleanup function to close Kafka producer"""
    try:
        if producer:
            producer.flush()
            producer.close()
            logger.info("✅ Kafka producer closed successfully")
    except Exception as e:
        logger.error(f"❌ Error closing Kafka producer: {e}")


if __name__ == '__main__':
    try:
        print("🚀 Starting Binance WebSocket to Kafka Producer")
        print(f"📊 Kafka Topic: {KAFKA_TOPIC}")
        print(f"📈 Monitoring symbols: {', '.join(SYMBOLS).upper()}")
        print(f"🔗 Kafka servers: {KAFKA_BOOTSTRAP_SERVERS}")
        print("⚡ Starting stream...")
        print("=" * 80)
        asyncio.run(process_websocket_messages())
    except KeyboardInterrupt:
        print("\n" + "=" * 80)
        print("🛑 Shutting down gracefully...")
        cleanup()
        print("👋 Stream stopped. Goodbye!")
        sys.exit(0)
    except Exception as e:
        logger.error(f"💥 Fatal error: {e}")
        cleanup()
        sys.exit(1)
Key producer patterns
Reliability: Using acks='all' ensures no data loss during network interruptions
Performance: Batching with batch_size=16384 optimizes throughput while maintaining low latency
Monitoring: Adding processing timestamps enables end-to-end latency tracking (see the sketch below)
Schema consistency: Standardized message structure simplifies downstream processing
The producer enriches raw trading data with calculated fields like trade_value_usd and processing_time, making downstream processing more efficient and reducing computational load in the streaming layer.
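As a quick illustration of the latency tracking those timestamps enable, a downstream consumer can compare Binance's trade timestamp (epoch milliseconds) with the moment it handles the message. The field name matches the producer above; the sample value is illustrative only:
from datetime import datetime, timezone

def end_to_end_latency_ms(message: dict) -> float:
    """Milliseconds between the trade occurring on Binance and this message being handled."""
    trade_time = datetime.fromtimestamp(message["timestamp"] / 1000, tz=timezone.utc)
    return (datetime.now(timezone.utc) - trade_time).total_seconds() * 1000

sample = {"timestamp": 1718000000000}  # illustrative epoch-millisecond value
print(f"End-to-end latency: {end_to_end_latency_ms(sample):.0f} ms")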
Configuring Mage Pro for Kafka streaming
Why Mage Pro for stream processing?
Mage Pro excels at streaming workloads because it provides:
Visual pipeline development with code-based transformations
Built-in Kafka connectivity with minimal configuration overhead
Real-time monitoring and comprehensive error handling
Seamless integration with data warehouses like BigQuery and Snowflake
Version control and deployment management for production pipelines
Mage data loader configuration
Add this configuration to your Mage Kafka data loader block:
connector_type: kafka
bootstrap_server: "your-kafka-endpoint:9092"
topic: binance-crypto-trades
consumer_group: mage_consumer_earliest
include_metadata: false
batch_size: 100
timeout_ms: 5000
auto_offset_reset: "earliest"
api_version: "2.8.0"
Configuration details
bootstrap_server: Your Kafka cluster endpoint (Confluent Cloud, AWS MSK, etc.)
auto_offset_reset: "earliest" to read all historical messages for testing
consumer_group: Unique name to track consumption progress across restarts
batch_size: Number of messages per batch for efficient processing
timeout_ms: Consumer timeout for batch processing and error handling
After finishing the configuration of the data loader, add the transformation block that will further process your streaming data.
Implementing real-time transformations
The transformation block processes the data according to the business logic of our use case. It calculates trading metrics, directional pressure, and market insights in real time.
This block:
Processes messages in chronological order to maintain market timing
Calculates buy/sell pressure to identify market sentiment
Tracks cumulative metrics per cryptocurrency symbol
Adds derived fields for downstream analytics
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    """
    Calculate directional trade values to track buying vs selling pressure

    Args:
        messages: List of messages in the stream.

    Returns:
        Transformed messages with buy/sell pressure calculations and price totals
    """
    # Sort so each symbol's trades are processed in chronological order
    messages_sorted = sorted(messages, key=lambda x: (x['symbol'], x['timestamp']))
    symbol_totals = {}

    for message in messages_sorted:
        symbol = message['symbol']
        price = message['price']
        quantity = message['quantity']

        if symbol not in symbol_totals:
            symbol_totals[symbol] = {'cumulative_price': 0, 'trade_count': 0}

        trade_value_usd = price * quantity
        message['trade_value_usd'] = trade_value_usd

        # buyer_maker=True means a market sell hit a resting buy order
        if message['buyer_maker']:
            message['buy_value'] = 0
            message['sell_value'] = trade_value_usd
            message['trade_side'] = 'SELL'
        else:
            message['buy_value'] = trade_value_usd
            message['sell_value'] = 0
            message['trade_side'] = 'BUY'

        message['net_buy_pressure'] = message['buy_value'] - message['sell_value']

        # Running totals per symbol for moving-average style metrics
        symbol_totals[symbol]['cumulative_price'] += price
        symbol_totals[symbol]['trade_count'] += 1
        message['cumulative_price_total'] = symbol_totals[symbol]['cumulative_price']
        message['average_price'] = symbol_totals[symbol]['cumulative_price'] / symbol_totals[symbol]['trade_count']

    print(f"Processed {len(messages)} trades with directional values and price totals")
    return messages
Transformation logic explained
Market direction analysis: The transformer distinguishes between market buy orders (bullish) and market sell orders (bearish) using the buyer_maker field.
Pressure calculations: Net buying pressure helps identify whether more money is flowing into (positive) or out of (negative) specific cryptocurrencies.
Running averages: Cumulative price tracking enables moving average calculations and trend analysis.
Performance optimization: Sorting by symbol and timestamp ensures each symbol's trades are processed chronologically while keeping batch processing efficient.
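To show how these derived fields roll up into market-sentiment metrics, here is an illustrative aggregation (not part of the pipeline above) that sums net_buy_pressure per symbol over a batch of transformed messages:
from collections import defaultdict
from typing import Dict, List

def net_pressure_by_symbol(messages: List[Dict]) -> Dict[str, float]:
    """Positive totals indicate net buying pressure; negative totals indicate net selling."""
    totals: Dict[str, float] = defaultdict(float)
    for message in messages:
        totals[message["symbol"]] += message["net_buy_pressure"]
    return dict(totals)

# Illustrative values only
batch = [
    {"symbol": "BTCUSDT", "net_buy_pressure": 1250.0},
    {"symbol": "BTCUSDT", "net_buy_pressure": -400.0},
    {"symbol": "ETHUSDT", "net_buy_pressure": 90.5},
]
print(net_pressure_by_symbol(batch))  # {'BTCUSDT': 850.0, 'ETHUSDT': 90.5}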
Exporting to BigQuery
Mage Pro offers many out-of-the-box configurations for exporting your data to a target destination, whether that's Snowflake, BigQuery, or PostgreSQL. For this pipeline we'll export the data to Google BigQuery with a few lines of YAML: choose the BigQuery data exporter block and provide the configuration below, replacing table_id with your own project, dataset, and table.
BigQuery configuration
connector_type: bigquery
profile: default
config:
  table_id: your-project.your_dataset.crypto_trades
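Once rows land in BigQuery, the enriched fields are immediately queryable. As a rough sketch using the google-cloud-bigquery client (the table reference is a placeholder; substitute the table_id you configured above), a per-symbol sentiment summary could look like this:
from google.cloud import bigquery

client = bigquery.Client()  # uses your default Google Cloud credentials and project

query = """
    SELECT
        symbol,
        SUM(net_buy_pressure) AS net_buy_pressure_usd,
        AVG(price) AS avg_price,
        COUNT(*) AS trades
    FROM `your-project.your_dataset.crypto_trades`  -- placeholder table
    GROUP BY symbol
    ORDER BY net_buy_pressure_usd DESC
"""

for row in client.query(query).result():
    print(f"{row.symbol}: net pressure ${row.net_buy_pressure_usd:,.2f} across {row.trades} trades")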
Conclusion
Kafka and Mage Pro create a powerful combination for real-time data pipelines that goes far beyond cryptocurrency trading. Whether you're processing IoT sensor readings, application logs, user activity streams, or financial market data, this architecture provides the reliability, scalability, and developer productivity needed for production streaming applications.
Are you ready to build your own streaming data pipelines in Mage Pro? Connect your managed or local Kafka service to Mage Pro and begin with this simple crypto proof of concept. The patterns demonstrated here will scale to handle any streaming workload your business requires.
Want to implement a real-time data pipeline using the methods discussed above? Schedule a free demo with Mage to get started today.