TLDR
Dive into the implementation of stream data processing with Mage, using Kafka as the source.
Outline
Introduction to Mage
Why is Kafka a popular component of streaming applications?
Step-by-step guide to creating a streaming pipeline on Mage
Conclusion
Introduction to Mage
Mage is a powerful data processing tool allowing integration and synchronization of data from third-party sources. It supports building real-time and batch pipelines using Python, SQL, and R, making data transformation simple and efficient. Moreover, it enables running, monitoring, and orchestrating thousands of pipelines, ensuring a smooth data operation without the risk of data loss or interruption.
Why is Kafka a popular component of streaming applications?
Apache Kafka is an open-source stream-processing software platform developed by LinkedIn and later donated to the Apache Software Foundation. It's built on the publish-subscribe messaging system and designed to handle real-time data feeds. Kafka is essentially a distributed event log service that is fault-tolerant, highly scalable, and provides high throughput for publishing and subscribing records.
Given its robust features, Kafka is a popular component of streaming applications due to the following reasons:
Performance and Scalability: Kafka can handle real-time data feeds on a large scale, processing millions of messages per second. Its distributed architecture allows for effortless scalability.
Durability and Reliability: Kafka's distributed commit log ensures robust data persistence, safeguarding against data loss. If a node fails, the data can still be retrieved from other nodes, hence ensuring reliability.
Fault Tolerance: Kafka can handle system failures without impacting the availability of data streams, which is crucial for applications that require constant, uninterrupted access to data.
Real-time Processing: Kafka supports both batch and real-time use cases, providing developers with flexibility when creating various applications.
Integration Capabilities: Kafka can integrate with a wide range of programming languages and data systems, making it versatile for differing application needs.
Kafka's popularity stems from its high performance, reliability, fault tolerance, real-time processing, and comprehensive integration capabilities.
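To make the publish-subscribe model concrete, here is a minimal sketch using the kafka-python client; the broker address and topic name are illustrative placeholders, and the full local setup is covered step by step below.
from kafka import KafkaProducer, KafkaConsumer

BROKER = 'localhost:9092'  # placeholder broker address

# A producer publishes records to a named topic on the broker.
producer = KafkaProducer(bootstrap_servers=BROKER)
producer.send('events', b'hello kafka')
producer.flush()

# Any number of consumers can independently subscribe to the same topic
# and read records at their own pace, tracked by partition and offset.
consumer = KafkaConsumer('events', bootstrap_servers=BROKER, auto_offset_reset='earliest')
for record in consumer:
    print(record.partition, record.offset, record.value)
    break  # read a single record for the sake of the example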
Step-by-step guide to creating a streaming pipeline on Mage
Set up Kafka
Here is a quick guide on how to run and use Kafka locally.
Clone repository: git clone https://github.com/wurstmeister/kafka-docker.git
Change directory into that repository: cd kafka-docker
Edit the docker-compose.yml file to match this (the listener values follow the standard wurstmeister setup: an INSIDE listener on port 9093 for containers on the Docker network and an OUTSIDE listener on port 9092 for the host):
version: "2"
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    build: .
    container_name: docker_kafka
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
Start the Kafka and Zookeeper containers: docker-compose up
Start a terminal session in the running container:
docker exec -i -t -u root $(docker ps | grep docker_kafka | cut -d' ' -f1) /bin/bash
Create a topic named test:
$KAFKA_HOME/bin/kafka-topics.sh --create --partitions 4 --bootstrap-server kafka:9092 --topic test
List all available topics in Kafka instance:
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
Start a producer on the topic named test:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic=test
Send messages to the topic named test by typing the following in the terminal:
>hello
>this is a test
>test 1
>test 2
>test 3
Open another terminal and start a consumer on the topic named test:
$KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic=test
The output should look something like this (messages may appear out of order because the topic has four partitions, and Kafka only guarantees ordering within a partition):
hello
test 1
test 3
this is a test
test 2
Set up stream data ingestion in Mage
Run the following command to start Mage in Docker, attached to the Kafka Docker network:
docker run -it -p 6789:6789 -v $(pwd):/home/src \
  --env AWS_ACCESS_KEY_ID=your_access_key_id \
  --env AWS_SECRET_ACCESS_KEY=your_secret_access_key \
  --env AWS_REGION=your_region \
  --network kafka-docker_default \
  mageai/mageai /app/run_app.sh mage start default_repo
If the network named kafka-docker_default doesn’t exist, create a new network:
docker network create -d bridge kafka-docker_default
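The docker run command above maps Mage's web UI to port 6789 on the host. As a quick, optional sanity check (assuming the requests library is installed locally), you can confirm the UI is responding once the container has started:
import requests

# Mage's web UI is mapped to localhost:6789 in the docker run command above.
response = requests.get('http://localhost:6789')
print(response.status_code)  # expect 200 once Mage has finished starting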
If Mage, running in its own Docker container, is not able to connect to the Kafka container, follow these steps:
Clone Mage: git clone https://github.com/mage-ai/mage-ai.git
Change directory into Mage: cd mage-ai
Edit the docker-compose.yml file to match this:
version: '3'
services:
  server:
    ... (original config)
    networks:
      - kafka
  app:
    ... (original config)
networks:
  kafka:
    name: kafka-docker_default
    external: true
Run the following script in the terminal: ./scripts/dev.sh
This runs Mage in development mode, in a Docker container managed by docker compose instead of docker run.
Create streaming data pipeline
Open Mage in your browser.
Click + New pipeline, then select Streaming.
Add a data loader block, select Kafka, and paste the following:
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: test
consumer_group: unique_consumer_group
batch_size: 100
By default, the bootstrap_server is set to localhost:9092. If you're running Mage in a Docker container, set the bootstrap_server to kafka:9093 instead.
Messages are consumed from the source in micro-batch mode for better efficiency. The default batch size is 100; you can adjust it via batch_size in the source config.
Add a transformer block and paste the following:
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    for msg in messages:
        print(msg)

    return messages
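The transformer above simply prints each message and passes it through unchanged. Since the block receives the micro-batch as a list of dictionaries and returns a list, the same interface can filter or reshape records before they reach the exporter. As an illustrative sketch (the rating field assumes the sample payload published later in this guide):
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    # Drop records with a rating below 0.5 and tag the ones we keep.
    filtered = [msg for msg in messages if msg.get('rating', 0) >= 0.5]
    for msg in filtered:
        msg['passed_filter'] = True
    return filtered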
Add a data exporter block, select OpenSearch, and paste the following:
connector_type: opensearch
host: https://your-opensearch-domain-endpoint
index_name: python-test-index
Change the host to match your OpenSearch domain’s endpoint.
Change the index_name to match the index you want to export data into.
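To verify later that documents actually landed in the index, a quick check with the opensearch-py client is handy. This is a sketch with placeholder endpoint and credentials, separate from the Mage pipeline itself:
from opensearchpy import OpenSearch

# Placeholder endpoint and credentials; substitute your OpenSearch domain and auth method.
client = OpenSearch(
    hosts=[{'host': 'your-opensearch-domain-endpoint', 'port': 443}],
    http_auth=('user', 'password'),
    use_ssl=True,
)

# Count documents and print a few of them from the index used by the exporter block.
print(client.count(index='python-test-index'))
response = client.search(index='python-test-index', body={'query': {'match_all': {}}, 'size': 5})
for hit in response['hits']['hits']:
    print(hit['_source'])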
Test pipeline
Open the streaming pipeline you just created, and in the right-side panel near the bottom, click the Execute pipeline button to test the pipeline.
You should see an output like this:
[streaming_pipeline_test] Start initializing kafka consumer.
[streaming_pipeline_test] Finish initializing kafka consumer.
[streaming_pipeline_test] Start consuming messages from kafka
Publish messages using Python
Open a terminal on your local workstation.
Install kafka-python:
pip install kafka-python
Open a Python shell and write the following code to publish messages:
from kafka import KafkaProducer
from random import random
import json

topic = 'test'
# 'kafka:9093' resolves from containers on the kafka-docker network; if you run this
# from the host instead, use 'localhost:9092' (the OUTSIDE listener in the compose file above).
producer = KafkaProducer(
    bootstrap_servers='kafka:9093',
)

def publish_messages(limit):
    for i in range(limit):
        data = {
            'title': 'test_title',
            'director': 'Bennett Miller',
            'year': '2011',
            'rating': random(),
        }
        producer.send(topic, json.dumps(data).encode('utf-8'))

publish_messages(5)
Once you run the code snippet above, go back to your streaming pipeline in Mage and the output should look like this:
[streaming_pipeline_test] Start initializing kafka consumer.
[streaming_pipeline_test] Finish initializing kafka consumer.
[streaming_pipeline_test] Start consuming messages from kafka.
[streaming_pipeline_test] [Kafka] Receive message 2:16: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.7010424523477785}', time=1665618592.226788
[streaming_pipeline_test] [Kafka] Receive message 0:16: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.7886308380991354}', time=1665618592.2268753
[streaming_pipeline_test] [Kafka] Receive message 0:17: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.0673276352704153}', time=1665618592.2268832
[streaming_pipeline_test] [Kafka] Receive message 3:10: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.37935417366095525}', time=1665618592.2268872
[streaming_pipeline_test] [Kafka] Receive message 3:11: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.21110511524126563}', time=1665618592.2268918
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7010424523477785}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7886308380991354}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.0673276352704153}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.37935417366095525}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.21110511524126563}
[streaming_pipeline_test] [Opensearch] Batch ingest data [{'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7010424523477785}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7886308380991354}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.0673276352704153}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.37935417366095525}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.21110511524126563}], time=1665618592.2294626
Consume messages using Python
If you want to programmatically consume messages from a Kafka topic, here is a code snippet:
from kafka import KafkaConsumer
import time

topic = 'test'
consumer = KafkaConsumer(
    topic,
    group_id='test',
    bootstrap_servers='kafka:9093',
)

for message in consumer:
    print(f"{message.partition}:{message.offset}: v={message.value}, time={time.time()}")
Run in production
When you're ready to run the streaming pipeline continuously in production:
Create a trigger.
Once the trigger is created, click the Start trigger button at the top of the page to make the streaming pipeline active.
Conclusion
Mage is an exceptional tool for stream data processing, adept at managing data from various sources and transforming it through real-time and batch pipelines using Python, SQL, and R. It stands out in its capacity to efficiently handle thousands of pipelines simultaneously, ensuring smooth operations and data integrity. Given the increasing need for real-time data processing in today's data-driven world, Mage is positioned as a vital tool in the arsenal of data professionals. Its versatility and robust capabilities make it a reliable choice for handling complex and voluminous streaming data.