Introduction
Let me share how I built an AI chatbot using AWS, OpenAI, and Telegram. The main goal was a smart, cost-effective bot with no servers to maintain, which made a serverless approach a natural fit.
The project needed to solve these main challenges:
- Create an intelligent chatbot using OpenAI
- Keep running costs low with serverless architecture
- Ensure secure handling of sensitive data
- Guarantee reliable message delivery
Serverless architecture was chosen for its:
- Pay-per-use pricing model
- Automatic scaling capabilities
- Minimal maintenance overhead
- Built-in high availability
The tech stack includes:
- AWS services (Lambda, API Gateway, SQS, DynamoDB, KMS)
- OpenAI's GPT-4 for message processing
- Telegram as a messaging platform
- Terraform for infrastructure setup
- AWS Lambda Powertools for monitoring
Architecture Overview
How It Works
The system processes messages in a simple flow:
- A user sends a message to the Telegram bot
- Telegram forwards it to AWS API Gateway via a webhook
- API Gateway pushes the payload into an SQS queue, where a Lambda function picks it up and calls OpenAI
- The user receives OpenAI's response back in the chat
Here's the flow at a glance (architecture diagram): Telegram → API Gateway → SQS → Lambda → OpenAI, with DynamoDB holding conversation state.
Core Components
Each component has a specific role in the system:
API Gateway serves as an entry point:
module "api_gateway" {
name = "${var.app_name}-webhook"
protocol_type = "HTTP"
integrations = {
"ANY /" = {
integration_type = "AWS_PROXY"
integration_subtype = "SQS-SendMessage"
}
}
}
SQS Queue handles message buffering:
resource "aws_sqs_queue" "inbound" {
name_prefix = "${var.app_name}-inbound-queue"
visibility_timeout_seconds = 360
message_retention_seconds = 86400
}
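To sanity-check this part of the pipeline without involving Telegram at all, you can drop a fake update straight into the queue. Here's a minimal sketch, assuming boto3 credentials are configured and using a placeholder queue URL (in practice you'd take it from the Terraform outputs):

import json
import boto3

sqs = boto3.client("sqs")

# Placeholder URL; the real one comes from Terraform outputs
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/my-bot-inbound-queue"

# A trimmed-down Telegram update, just enough for the Lambda to parse
fake_update = {
    "update_id": 1,
    "message": {
        "message_id": 1,
        "date": 0,
        "chat": {"id": 123456789, "type": "private"},
        "from": {"id": 123456789, "is_bot": False, "first_name": "Test"},
        "text": "Hello, bot!",
    },
}

sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps(fake_update))

If the Lambda logs show the message being handled (or rejected by the allowlist), the plumbing works.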
Lambda Function processes messages:
module "lambda_function" {
function_name = "${var.app_name}-messages-processing"
handler = "index.handler"
runtime = "python3.12"
timeout = 60
}
DynamoDB stores conversation state:
resource "aws_dynamodb_table" "threads" {
name = "${var.app_name}-threads"
hash_key = "chat_id"
range_key = "thread_id"
}
Each component was designed with scalability and reliability in mind. The system can handle multiple conversations simultaneously while maintaining message order and conversation context.
Deep Dive: Implementation Details
Message Flow Implementation
Let's break down how messages move through the system. This section covers the actual implementation of each component.
Setting Up Telegram Webhook
First, we need to connect Telegram to our AWS endpoint. Here's a simple script that handles this:
TELEGRAM_TOKEN="your-bot-token"
ENDPOINT="your-api-gateway-url"

curl -X POST "https://api.telegram.org/bot${TELEGRAM_TOKEN}/setWebhook" \
  -d "{\"url\": \"${ENDPOINT}\"}" \
  -H "Content-Type: application/json"
Message Processing Pipeline
The Lambda function processes messages in several steps. Here's the main handler:
def handler(event, _context):
    """
    Main entry point for processing messages.
    Receives events from SQS, processes them, and sends responses.
    """
    try:
        # Extract the Telegram update from the SQS event
        request_body = json.loads(event['Records'][0]['body'])
        update = telebot.types.Update.de_json(request_body)

        # Some updates (e.g. edited messages) carry no new message
        if update.message is None:
            return {"statusCode": 200}

        # Process the message only if the user is on the allowlist
        if update.message.chat.id in ALLOWED_USERS:
            process_message(update.message)
        return {"statusCode": 200}
    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        return {"statusCode": 500}
OpenAI Integration
The OpenAI integration is handled through a dedicated function:
def ask_openai_threads(chat_id, question):
    """
    Sends user message to OpenAI and manages conversation threads.
    """
    # Get or create assistant
    assistant_id = get_stored_assistant_id()
    if not assistant_id:
        assistant_id = create_assistant()
        save_assistant(assistant_id)

    # Get or create thread
    thread_id = get_stored_thread_id(chat_id)
    if not thread_id:
        thread = openai_client.beta.threads.create()
        thread_id = thread.id
        save_thread(chat_id, thread_id)

    # Add message and run assistant
    openai_client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=question
    )
    run = openai_client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id
    )

    # Poll until the run reaches a terminal state; the short sleep
    # (requires `import time` at module level) avoids hammering the API
    while run.status not in ('completed', 'failed', 'cancelled', 'expired'):
        time.sleep(1)
        run = openai_client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run.id
        )
    if run.status != 'completed':
        raise RuntimeError(f"Assistant run ended with status: {run.status}")

    # Get and return the assistant's latest response
    messages = openai_client.beta.threads.messages.list(thread_id=thread_id)
    return messages.data[0].content[0].text.value
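The post doesn't show process_message itself, but tying the pieces together is straightforward. A minimal sketch, assuming a module-level telebot.TeleBot instance named bot:

def process_message(message):
    """Forward the user's text to OpenAI and reply in the same chat."""
    try:
        answer = ask_openai_threads(message.chat.id, message.text)
        bot.reply_to(message, answer)
    except Exception:
        logger.exception("Failed to answer", extra={"chat_id": message.chat.id})
        bot.reply_to(message, "Sorry, something went wrong. Please try again.")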
State Management
The project uses DynamoDB to keep track of conversations and assistant configuration.
Thread Storage
Here's how we store and retrieve conversation threads:
def save_thread(chat_id, thread_id):
    """
    Saves new thread to DynamoDB.
    """
    item = {
        'chat_id': {'N': str(chat_id)},
        'thread_id': {'S': thread_id},
        'thread_status': {'S': 'ACTIVE'},
        'created_at': {'S': datetime.now().isoformat()}
    }
    dynamodb_client.put_item(
        TableName=THREADS_TABLE_NAME,
        Item=item
    )

def get_stored_thread_id(chat_id):
    """
    Retrieves active thread for a chat.
    """
    response = threads_table.query(
        IndexName='UserStatusIndex',
        KeyConditionExpression=Key('chat_id').eq(chat_id) &
                               Key('thread_status').eq('ACTIVE'),
        Limit=1
    )
    return response['Items'][0]['thread_id'] if response['Items'] else None
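Note that these two functions use both flavors of the DynamoDB SDK: the low-level client (typed attribute maps) for writes, and the resource-level Table (with Key expressions) for queries. They assume module-level setup along these lines (the environment variable name is my assumption):

import os
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Key

THREADS_TABLE_NAME = os.environ["THREADS_TABLE_NAME"]  # assumed env var

dynamodb_client = boto3.client("dynamodb")  # low-level client used by save_thread
threads_table = boto3.resource("dynamodb").Table(THREADS_TABLE_NAME)  # used by get_stored_thread_id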
Security Implementation
Secret Management
We use AWS Systems Manager Parameter Store to keep API tokens and other secrets safe.
# Instead of hardcoding tokens:
ssm = boto3.client('ssm')

# Get Telegram token
TELEGRAM_TOKEN = ssm.get_parameter(
    Name=TELEGRAM_TOKEN_PARAM_NAME,
    WithDecryption=True
)['Parameter']['Value']

# Get OpenAI token
OPENAI_TOKEN = ssm.get_parameter(
    Name=OPENAI_TOKEN_PARAM_NAME,
    WithDecryption=True
)['Parameter']['Value']
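Since the project already uses Lambda Powertools, its parameters utility is a drop-in alternative that adds in-memory caching across warm invocations, cutting down on SSM API calls. A sketch with the same parameter names:

from aws_lambda_powertools.utilities import parameters

# Cached for 5 minutes across warm invocations; decrypt=True handles SecureString
TELEGRAM_TOKEN = parameters.get_parameter(TELEGRAM_TOKEN_PARAM_NAME, decrypt=True, max_age=300)
OPENAI_TOKEN = parameters.get_parameter(OPENAI_TOKEN_PARAM_NAME, decrypt=True, max_age=300)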
The parameters are created using Terraform:
resource "aws_ssm_parameter" "bot-token" {
name = "${var.app_name}-bot-token"
type = "SecureString"
key_id = "alias/aws/ssm"
value = "CHANGE-ME" # Changed manually after deployment
}
Encryption
We use KMS for encrypting data at rest. Here's how we set it up:
resource "aws_kms_key" "dynamo-encryption-key" {
description = "Key for DynamoDB encryption"
deletion_window_in_days = 10
enable_key_rotation = true
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "Enable IAM User Permissions"
Effect = "Allow"
Principal = {
AWS = "arn:aws:iam::${local.account_id}:root"
}
Action = "kms:*"
Resource = "*"
}
]
})
}
Access Control
We limit who can use the bot with a simple check:
# allowed_users_ids is a comma-separated string of chat IDs,
# e.g. "123456,789012", loaded from configuration
ALLOWED_USERS = list(map(int, allowed_users_ids.split(',')))

@bot.message_handler(func=lambda message: message.chat.id not in ALLOWED_USERS)
def decline_strangers(message):
    response = (
        "Access denied.\n"
        f"Your user ID: {message.chat.id}"
    )
    bot.reply_to(message, response)
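Where allowed_users_ids comes from isn't shown above; one option that fits the rest of the setup is another SSM parameter. A hedged sketch (the parameter name is hypothetical):

allowed_users_ids = ssm.get_parameter(
    Name=ALLOWED_USERS_PARAM_NAME  # hypothetical parameter, created like the tokens above
)['Parameter']['Value']
ALLOWED_USERS = [int(uid) for uid in allowed_users_ids.split(',') if uid.strip()]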
IAM Roles
Lambda needs specific permissions to access other services. Here's the IAM configuration:
module "lambda_function" {
# ... other configuration ...
attach_policy_statements = true
policy_statements = {
sqs = {
effect = "Allow",
actions = ["sqs:SendMessage"],
resources = [aws_sqs_queue.inbound.arn]
},
ssm = {
effect = "Allow",
actions = ["ssm:GetParameter"],
resources = [
aws_ssm_parameter.bot-token.arn,
aws_ssm_parameter.openai-token.arn,
]
},
dynamodb = {
effect = "Allow",
actions = [
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:Scan"
],
resources = ["*"]
}
}
}
Security Best Practices
Some key security measures we implemented:
Network Security:
- API Gateway uses HTTPS only
- Lambda functions can optionally run in a private VPC

Data Security:
- All sensitive data encrypted at rest
- Secrets stored in Parameter Store
- DynamoDB encryption enabled

Access Security:
- Minimal IAM permissions
- User allowlist
- KMS key rotation enabled
Monitoring and Operations
CloudWatch Integration
We use AWS Lambda Powertools to make monitoring easier. Here's how we set it up:
from aws_lambda_powertools import Tracer, Metrics, Logger

tracer = Tracer()
metrics = Metrics(namespace="ServerlessAIBot")  # a namespace is required; value here is illustrative
logger = Logger()

@tracer.capture_lambda_handler
@metrics.log_metrics
@logger.inject_lambda_context
def handler(event, _context):
    """
    Main handler with full observability.
    """
    try:
        process_event(event)
        metrics.add_metric(name="SuccessfulProcessing", value=1, unit="Count")
    except Exception:
        metrics.add_metric(name="FailedProcessing", value=1, unit="Count")
        raise
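Tracer can also follow the expensive calls individually: decorating a function creates an X-Ray subsegment per call, and annotations make traces searchable. A small sketch (call_openai is a hypothetical helper):

@tracer.capture_method
def ask_openai(chat_id, question):
    # Annotations are indexed by X-Ray, so traces can be filtered by chat
    tracer.put_annotation(key="chat_id", value=str(chat_id))
    return call_openai(question)  # hypothetical helper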
Logging Strategy
We use structured logging to make debugging easier:
def process_event(event):
    """
    Process events with structured logging.
    """
    logger.info("Processing new event", extra={
        "event_type": "message_received",
        "timestamp": datetime.now().isoformat(),
        "source": "telegram"
    })
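When several log lines in one invocation relate to the same chat, Logger.append_keys attaches a key once instead of repeating extra= everywhere. For example, inside the handler with message in scope:

# Every subsequent log line in this invocation carries chat_id automatically
logger.append_keys(chat_id=message.chat.id)
logger.info("Forwarding message to OpenAI")
logger.info("Response sent")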
Conclusion
What We Built
We created a serverless AI chatbot that combines:
- AWS serverless infrastructure
- OpenAI's powerful language models
- Telegram's messaging platform
The system handles:
- Secure message processing
- Reliable conversation management
- Cost-effective scaling
- Comprehensive monitoring
Key Takeaways
- Serverless architecture reduces operational overhead
- Queue-based design ensures message reliability
- DynamoDB provides flexible state management
- KMS encryption protects sensitive data
Lessons Learned
What Worked Well
- Serverless architecture scaled smoothly
- SQS prevented message loss
- Lambda Powertools improved observability
What Could Be Better
- Cold starts need optimization
- OpenAI API costs need monitoring
- Error handling could be more robust
Final Thoughts
Building a serverless AI chatbot taught us that:
- Simple architecture can handle complex tasks
- AWS services work well together
- Proper monitoring is crucial
- Cost management needs constant attention
Getting Started
Want to try it yourself? Here's a quick start:
- Clone the repository
- Set up AWS credentials
- Deploy with Terraform
- Update SSM parameters with your API keys
- Set up the Telegram webhook
See the detailed deployment instructions in the repository.
The code is open source and available on GitHub: https://github.com/requix/aws-telegram-ai-module
Feel free to contribute or adapt it for your needs.
This project shows how modern cloud services and AI can work together to create practical, scalable applications. While there's always room for improvement, this architecture provides a solid foundation for building AI-powered chatbots.