💡 Introduction
Welcome to another blog in the Python-for-DevOps series! Today, we’ll explore Apache Kafka and how it can be used to stream tweets between two services.
For those unfamiliar with Kafka, it is an open-source streaming platform that enables high-performance and efficient communication between services—whether in a client-server setup or a microservices architecture.
In this project, we’ll create:
✅ A producer.py script to fetch tweets and send them to Kafka.
✅ A consumer.py script to receive and process the streamed tweets.
✅ A Dockerized setup to run Kafka, Zookeeper, and Elasticsearch for real-time tweet visualization.
Enough with the theory—let’s dive straight into the implementation! 🚀
1️⃣ Prerequisites
Before we start, make sure you have the following:
- 🐳 Docker Installed (for running Kafka, Zookeeper, Elasticsearch, and Kibana)
- 🐍 Basic knowledge of Python, Docker, and Kafka
- 🔎 Overview of Elasticsearch & Kibana (for data storage & visualization)
- 🐦 Twitter Developer Portal Account (to generate the Bearer Token)
Make sure your Python version is 3.10 or lower; the kafka-python module doesn’t run reliably on Python 3.11 or later.
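As a quick sanity check, the snippet below (a minimal sketch, not part of the project repo) fails fast if your interpreter is too new:

import sys

# Abort early on interpreters newer than 3.10 (based on the note above)
if sys.version_info >= (3, 11):
    raise SystemExit("Please use Python 3.10 or earlier for this project.")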
2️⃣ Setting Up the Project
Clone the Repository
git clone https://github.com/Pravesh-Sudha/twitter-streams.git
cd twitter-streams
Create a Virtual Environment & Install Dependencies
python3 -m venv kafka-env
source kafka-env/bin/activate # Windows users: kafka-env\Scripts\activate
pip3 install kafka-python tweepy six
3️⃣ Running Kafka, Zookeeper, Elasticsearch, and Kibana using Docker
Start the required services using the following Docker commands:
Start Zookeeper
docker run -d --name zookeeper -p 2181:2181 zookeeper
Start Kafka
docker run -d --name kafka \
--link zookeeper \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka
Start Elasticsearch
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.17.10
Start Kibana
docker run -d --name kibana --link elasticsearch -p 5601:5601 kibana:7.17.10
Verify that all containers are running:
docker ps
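Optionally, you can confirm from Python that Kafka and Elasticsearch are reachable before moving on. This is a minimal sketch (not part of the repo) that assumes the default ports used above:

import requests
from kafka import KafkaProducer

# Elasticsearch should answer on port 9200 with cluster and version info
es_info = requests.get("http://localhost:9200").json()
print("Elasticsearch version:", es_info["version"]["number"])

# A Kafka client should be able to reach the broker on port 9092
producer = KafkaProducer(bootstrap_servers="localhost:9092")
print("Kafka connected:", producer.bootstrap_connected())
producer.close()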
4️⃣ Creating a Kafka Topic
Create a topic named twitter-stream for streaming tweets:
docker exec -it kafka kafka-topics --create --topic twitter-stream --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
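If you prefer to stay in Python, the same topic can be created with kafka-python’s admin client. Here is a sketch under the same assumptions (broker on localhost:9092, topic name twitter-stream):

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    # One partition and replication factor 1, matching the CLI command above
    admin.create_topics([NewTopic(name="twitter-stream", num_partitions=1, replication_factor=1)])
    print("Topic created")
except TopicAlreadyExistsError:
    print("Topic already exists")
print("Topics:", admin.list_topics())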
5️⃣ Implementing the Kafka Producer (producer.py)
import tweepy
from kafka import KafkaProducer
import json
import time

# Twitter API credentials
BEARER_TOKEN = "Your-Bearer-Token"

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8")
)

# Authenticate Twitter API
client = tweepy.Client(bearer_token=BEARER_TOKEN)

def fetch_tweets():
    query = "#AI OR #Tech -is:retweet"  # Fetch tweets with these hashtags
    tweets = client.search_recent_tweets(query=query, tweet_fields=["created_at", "text", "id"], max_results=10)

    if tweets.data:
        for tweet in tweets.data:
            tweet_data = {
                "id": tweet.id,
                "text": tweet.text,
                "timestamp": str(tweet.created_at)
            }
            producer.send("twitter-stream", value=tweet_data)
            print(f"Tweet sent: {tweet_data}")

while True:
    fetch_tweets()
    time.sleep(30)  # Fetch tweets every 30 seconds
How It Works
✅ Imports the Kafka producer client and Tweepy for Twitter API authentication.
✅ Uses a Twitter API bearer token for authentication.
✅ Defines a query for tweets containing #AI or #Tech (excluding retweets).
✅ Fetches tweets every 30 seconds and sends them to Kafka under the twitter-stream topic.
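In practice, you would avoid hardcoding the bearer token. Here is a small hardening sketch (my own variation, not the repo’s code) that reads the token from an environment variable, which I’m assuming is called TWITTER_BEARER_TOKEN, and lets Tweepy wait out rate limits:

import os
import tweepy

# Assumed environment variable name; export it before running the producer
BEARER_TOKEN = os.environ["TWITTER_BEARER_TOKEN"]

# wait_on_rate_limit makes Tweepy sleep until the window resets
# instead of raising a TooManyRequests error
client = tweepy.Client(bearer_token=BEARER_TOKEN, wait_on_rate_limit=True)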
Run the Producer
python producer.py
6️⃣ Implementing the Kafka Consumer (consumer.py)
from kafka import KafkaConsumer
import json
import requests

# Elasticsearch endpoint
ELASTICSEARCH_URL = "http://localhost:9200/twitter/_doc/"

# Kafka Consumer
consumer = KafkaConsumer(
    "twitter-stream",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)

print("Listening for tweets...")

for message in consumer:
    tweet = message.value
    print(f"Received tweet: {tweet}")

    # Send tweet to Elasticsearch
    response = requests.post(ELASTICSEARCH_URL, json=tweet)
    print(f"Elasticsearch response: {response.json()}")
How It Works
✅ Connects to Kafka and listens to the twitter-stream topic.
✅ Reads tweets and prints them to the console.
✅ Sends each tweet to Elasticsearch for storage and further visualization in Kibana.
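One thing to note: the POST above lets Elasticsearch generate a new document ID for every message, so re-reading the topic from the earliest offset will create duplicates. A small variation (my sketch, not the repo’s consumer) is to index each tweet under its own ID with a PUT:

import requests

ELASTICSEARCH_URL = "http://localhost:9200/twitter/_doc/"

def index_tweet(tweet: dict) -> None:
    # PUT /twitter/_doc/<id> creates or overwrites the document with that ID
    response = requests.put(f"{ELASTICSEARCH_URL}{tweet['id']}", json=tweet)
    response.raise_for_status()
    print(f"Indexed tweet {tweet['id']}: {response.json().get('result')}")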
Run the Consumer
python consumer.py
7️⃣ Verifying Data in Elasticsearch
Run the following command to check if tweets are successfully stored in Elasticsearch:
curl -X GET "http://localhost:9200/twitter/_search?pretty=true"
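The same check can be done from Python with the requests library the consumer already uses; a quick sketch:

import requests

# Document count in the twitter index
count = requests.get("http://localhost:9200/twitter/_count").json()
print("Tweets indexed:", count["count"])

# Peek at a few stored tweets
hits = requests.get("http://localhost:9200/twitter/_search?size=3").json()
for hit in hits["hits"]["hits"]:
    print(hit["_source"]["text"][:80])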
8️⃣ Visualizing Tweets in Kibana
- Open Kibana: http://localhost:5601
- Go to Management → Stack Management → Index Patterns
- Click Create Index Pattern and enter twitter
- Select timestamp as the time field (if required) and click Create
- Navigate to Discover to explore tweets or Visualize to create charts & graphs!
🎯 Conclusion
In this blog, we built a real-time Twitter data streaming pipeline using Apache Kafka, Python, Elasticsearch, and Kibana. Here's a quick recap of what we achieved:
✅ Set up Kafka, Zookeeper, Elasticsearch, and Kibana using Docker.
✅ Created a Kafka topic (twitter-stream) to stream tweets.
✅ Built a Kafka Producer (producer.py) to fetch tweets from the Twitter API and send them to Kafka.
✅ Developed a Kafka Consumer (consumer.py) to retrieve tweets from Kafka and store them in Elasticsearch.
✅ Used Kibana to visualize and explore the streamed tweets.
This project demonstrates how Apache Kafka can be leveraged for real-time data streaming and analytics. You can extend it further by:
🚀 Adding Sentiment Analysis to categorize tweets as positive, negative, or neutral (a small sketch follows this list).
🚀 Deploying the solution on cloud platforms like AWS, GCP, or Azure.
🚀 Scaling Kafka to handle high-volume Twitter data streams.
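For example, the sentiment idea could look roughly like this, assuming the third-party textblob package (not used in the repo); the label would be attached to each tweet in consumer.py before indexing:

from textblob import TextBlob

def classify_sentiment(text: str) -> str:
    # Polarity ranges from -1 (negative) to +1 (positive)
    polarity = TextBlob(text).sentiment.polarity
    if polarity > 0.1:
        return "positive"
    if polarity < -0.1:
        return "negative"
    return "neutral"

tweet = {"id": 1, "text": "Kafka makes real-time streaming fun!"}
tweet["sentiment"] = classify_sentiment(tweet["text"])
print(tweet)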
I hope you found this project helpful! Feel free to check out the GitHub repo and experiment with enhancements. Happy coding! 🚀