Apache Kafka Project: Real-Time Twitter Streaming with Python

💡 Introduction

Welcome to another blog in the Python-for-DevOps series! Today, we’ll explore Apache Kafka and how it can be used to stream tweets between two services.

For those unfamiliar with Kafka, it is an open-source distributed event-streaming platform that enables high-throughput, fault-tolerant communication between services, whether in a client-server setup or a microservices architecture.

In this project, we’ll create:
✅ A producer.py script to fetch tweets and send them to Kafka.
✅ A consumer.py script to receive and process the streamed tweets.
✅ A Dockerized setup to run Kafka, Zookeeper, Elasticsearch, and Kibana for real-time tweet storage and visualization.

Enough with the theory—let’s dive straight into the implementation! 🚀

1️⃣ Prerequisites

Before we start, make sure you have the following:

  • 🐳 Docker Installed (for running Kafka, Zookeeper, Elasticsearch, and Kibana)
  • 🐍 Basic knowledge of Python, Docker, and Kafka
  • 🔎 Overview of Elasticsearch & Kibana (for data storage & visualization)
  • 🐦 Twitter Developer Portal Account (to generate the Bearer Token)

Make sure your Python version is 3.10 or lower: the kafka-python module doesn't run reliably on Python 3.11 or newer.
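If you want the scripts to fail fast on an unsupported interpreter, a small guard like this (my own addition, not part of the repo) can sit at the top of producer.py and consumer.py:

import sys

# Bail out early: kafka-python misbehaves on newer interpreters
if sys.version_info >= (3, 11):
    raise SystemExit(f"This project expects Python 3.10 or lower; found {sys.version.split()[0]}.")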

2️⃣ Setting Up the Project

Clone the Repository

git clone https://github.com/Pravesh-Sudha/twitter-streams.git
cd twitter-streams

Create a Virtual Environment & Install Dependencies

python3 -m venv kafka-env
source kafka-env/bin/activate  # Windows users: kafka-env\Scripts\activate
pip3 install kafka-python tweepy six requests
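(Note that requests is included because consumer.py uses it later.) Once the install finishes, an optional one-off import check, my own sanity snippet run inside the activated environment, confirms the packages resolve under your interpreter:

import kafka
import tweepy

print("kafka-python:", kafka.__version__)
print("tweepy:", tweepy.__version__)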

3️⃣ Running Kafka, Zookeeper, Elasticsearch, and Kibana using Docker

Start the required services using the following Docker commands:

Start Zookeeper

docker run -d --name zookeeper -p 2181:2181 zookeeper

Start Kafka

docker run -d --name kafka \
  --link zookeeper \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka

Start Elasticsearch

docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.17.10

Start Kibana

docker run -d --name kibana --link elasticsearch -p 5601:5601 kibana:7.17.10

Verify that all containers are running

docker ps
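Before moving on, you can also probe the services from Python. This is a small helper of my own (it assumes the default ports from the docker run commands above and that requests is installed):

import socket

import requests

def port_open(host, port):
    # connect_ex returns 0 when the TCP handshake succeeds
    with socket.socket() as s:
        s.settimeout(2)
        return s.connect_ex((host, port)) == 0

print("Zookeeper reachable:", port_open("localhost", 2181))
print("Kafka reachable:", port_open("localhost", 9092))

health = requests.get("http://localhost:9200/_cluster/health", timeout=5).json()
print("Elasticsearch status:", health["status"])  # "yellow" is normal for a single node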

4️⃣ Creating a Kafka Topic

Create a topic named twitter-stream for streaming tweets:

docker exec -it kafka kafka-topics --create --topic twitter-stream --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
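If you'd rather stay in Python, kafka-python also ships an admin client that can create the topic. Here's a minimal equivalent sketch (assuming Kafka is reachable on localhost:9092):

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
try:
    admin.create_topics([NewTopic(name="twitter-stream", num_partitions=1, replication_factor=1)])
    print("Created topic 'twitter-stream'")
except TopicAlreadyExistsError:
    print("Topic 'twitter-stream' already exists")
finally:
    admin.close()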

5️⃣ Implementing Kafka Producer (producer.py)

import tweepy
from kafka import KafkaProducer
import json
import time

# Twitter API credentials
BEARER_TOKEN = "Your-Bearer-Token"

# Kafka Producer
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8")
)

# Authenticate Twitter API
client = tweepy.Client(bearer_token=BEARER_TOKEN)

def fetch_tweets():
    query = "#AI OR #Tech -is:retweet"  # Fetch tweets with these hashtags
    tweets = client.search_recent_tweets(query=query, tweet_fields=["created_at", "text", "id"], max_results=10)

    if tweets.data:
        for tweet in tweets.data:
            tweet_data = {
                "id": tweet.id,
                "text": tweet.text,
                "timestamp": str(tweet.created_at)
            }
            producer.send("twitter-stream", value=tweet_data)
            print(f"Tweet sent: {tweet_data}")
        producer.flush()  # send() is asynchronous; flush so the batch reaches the broker before sleeping

while True:
    fetch_tweets()
    time.sleep(30)  # Fetch tweets every 30 seconds

How It Works

✅ Imports the Kafka Producer and Tweepy for Twitter API authentication.
✅ Uses a Twitter API bearer token for authentication.
✅ Defines a query for tweets containing #AI or #Tech (excluding retweets).
✅ Fetches tweets every 30 seconds and sends them to Kafka under the twitter-stream topic (rate limits are worth handling here; see the sketch below).
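One caveat: the Twitter API rate-limits search_recent_tweets, so polling every 30 seconds will eventually get an HTTP 429, which Tweepy surfaces as tweepy.TooManyRequests. A hedged sketch of a more forgiving main loop (a drop-in replacement for the bare while loop above):

# replaces the while-loop at the bottom of producer.py
while True:
    try:
        fetch_tweets()
    except tweepy.TooManyRequests:
        # rate limited: the search window resets every 15 minutes
        print("Rate limited by the Twitter API, sleeping for 15 minutes...")
        time.sleep(15 * 60)
        continue
    time.sleep(30)

Alternatively, constructing the client with tweepy.Client(bearer_token=BEARER_TOKEN, wait_on_rate_limit=True) makes Tweepy block until the window resets instead of raising.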

Run the Producer

python producer.py

6️⃣ Implementing Kafka Consumer (consumer.py)

from kafka import KafkaConsumer
import json
import requests

# Elasticsearch endpoint
ELASTICSEARCH_URL = "http://localhost:9200/twitter/_doc/"

# Kafka Consumer
consumer = KafkaConsumer(
    "twitter-stream",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda x: json.loads(x.decode("utf-8"))
)

print("Listening for tweets...")

for message in consumer:
    tweet = message.value
    print(f"Received tweet: {tweet}")

    # Send tweet to Elasticsearch
    response = requests.post(ELASTICSEARCH_URL, json=tweet)
    print(f"Elasticsearch response: {response.json()}")

How It Works

✅ Connects to Kafka and listens to the twitter-stream topic.
✅ Reads tweets and prints them to the console.
✅ Sends each tweet to Elasticsearch for storage and further visualization in Kibana (see the duplicate-handling note below).
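One detail to be aware of: because the producer re-fetches recent tweets on every pass, the same tweet can arrive more than once, and POSTing to /twitter/_doc/ creates a fresh document each time. A small variation I'd suggest (same ELASTICSEARCH_URL and tweet dict as above) uses the tweet ID as the document ID, so repeats simply overwrite themselves:

# inside the consumer loop, instead of requests.post(ELASTICSEARCH_URL, json=tweet)
response = requests.put(f"{ELASTICSEARCH_URL}{tweet['id']}", json=tweet)
print(f"Elasticsearch response: {response.json()}")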

Run the Consumer

python consumer.py

7️⃣ Verifying Data in Elasticsearch

Run the following command to check if tweets are successfully stored in Elasticsearch:

curl -X GET "http://localhost:9200/twitter/_search?pretty=true"
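The same check is easy from Python if you prefer (a sketch using requests; the hits.total.value path matches the Elasticsearch 7.x response shape):

import requests

resp = requests.get("http://localhost:9200/twitter/_search", params={"pretty": "true"})
resp.raise_for_status()
print("Tweets stored:", resp.json()["hits"]["total"]["value"])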

8️⃣ Visualizing Tweets in Kibana

  1. Open Kibana: http://localhost:5601
  2. Go to Management → Stack Management → Index Patterns
  3. Click Create Index Pattern and enter twitter
  4. Select timestamp as the time field (if required) and click Create
  5. Navigate to Discover to explore tweets or Visualize to create charts & graphs!

Image description

🎯 Conclusion

In this blog, we built a real-time Twitter data streaming pipeline using Apache Kafka, Python, Elasticsearch, and Kibana. Here's a quick recap of what we achieved:

✅ Set up Kafka, Zookeeper, Elasticsearch, and Kibana using Docker.
✅ Created a Kafka topic (twitter-stream) to stream tweets.
✅ Built a Kafka Producer (producer.py) to fetch tweets from the Twitter API and send them to Kafka.
✅ Developed a Kafka Consumer (consumer.py) to retrieve tweets from Kafka and store them in Elasticsearch.
✅ Used Kibana to visualize and explore the streamed tweets.

This project demonstrates how Apache Kafka can be leveraged for real-time data streaming and analytics. You can extend it further by:
🚀 Adding Sentiment Analysis to categorize tweets as positive, negative, or neutral.
🚀 Deploying the solution on cloud platforms like AWS, GCP, or Azure.
🚀 Scaling Kafka to handle high-volume Twitter data streams.

I hope you found this project helpful! Feel free to check out the GitHub repo and experiment with enhancements. Happy coding! 🚀

✨ For more informative blogs, follow me on Hashnode, X (Twitter), and LinkedIn.
