
Building a Serverless AI Chatbot: Integrating OpenAI with Telegram on AWS

Introduction

Let me share how I built an AI chatbot using AWS, OpenAI, and Telegram. The main goal was to create a smart, cost-effective chatbot without dealing with server maintenance. A serverless approach was a perfect fit for this task.

The project needed to solve these main challenges:

  • Create an intelligent chatbot using OpenAI
  • Keep running costs low with serverless architecture
  • Ensure secure handling of sensitive data
  • Guarantee reliable message delivery

Serverless architecture was chosen because:

  • Pay-per-use pricing model
  • Automatic scaling capabilities
  • Minimal maintenance overhead
  • Built-in high availability

The tech stack includes:

  • AWS services (Lambda, API Gateway, SQS, DynamoDB, KMS)
  • OpenAI's GPT-4 for message processing
  • Telegram as a messaging platform
  • Terraform for infrastructure setup
  • AWS Lambda Powertools for monitoring

Architecture Overview

How It Works

The system processes messages in a simple flow:

  1. User sends a message to the Telegram bot
  2. Telegram forwards it to AWS API Gateway
  3. Message goes through processing pipeline
  4. User receives response from OpenAI

Here's the visual representation:

[Architecture diagram: Telegram → API Gateway → SQS → Lambda → OpenAI, with DynamoDB for conversation state]

Core Components

Each component has a specific role in the system:

API Gateway serves as the entry point:

module "api_gateway" {
  # Using the community module terraform-aws-modules/apigateway-v2/aws
  source = "terraform-aws-modules/apigateway-v2/aws"

  name          = "${var.app_name}-webhook"
  protocol_type = "HTTP"
  integrations = {
    "ANY /" = {
      integration_type    = "AWS_PROXY"
      integration_subtype = "SQS-SendMessage"
    }
  }
}

SQS Queue handles message buffering. The visibility timeout is set to six times the Lambda timeout (6 × 60 s), in line with AWS's recommendation for SQS event source mappings:

resource "aws_sqs_queue" "inbound" {
  name_prefix = "${var.app_name}-inbound-queue"
  visibility_timeout_seconds = 360
  message_retention_seconds = 86400
}

Lambda Function processes messages:

module "lambda_function" {
  # Using the community module terraform-aws-modules/lambda/aws
  source = "terraform-aws-modules/lambda/aws"

  function_name = "${var.app_name}-messages-processing"
  handler       = "index.handler"
  runtime       = "python3.12"
  timeout       = 60
}

DynamoDB stores conversation state:

resource "aws_dynamodb_table" "threads" {
  name      = "${var.app_name}-threads"
  hash_key  = "chat_id"
  range_key = "thread_id"
}

Each component was designed with scalability and reliability in mind. The system can handle multiple conversations simultaneously while maintaining message order and conversation context.

Deep Dive: Implementation Details

Message Flow Implementation

Let's break down how messages move through the system. This section covers the actual implementation of each component.

Setting Up Telegram Webhook

First, we need to connect Telegram to our AWS endpoint. Here's a simple script that handles this:

TELEGRAM_TOKEN="your-bot-token"
ENDPOINT="your-api-gateway-url"

curl -X "POST" "https://api.telegram.org/bot${TELEGRAM_TOKEN}/setWebhook" \
    -d "{\"url\": \"${ENDPOINT}\"}" \
    -H "Content-Type: application/json"
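Telegram's `setWebhook` also accepts an optional `secret_token` parameter; Telegram then echoes it back in the `X-Telegram-Bot-Api-Secret-Token` header on every webhook delivery, so the backend can reject forged requests. A minimal sketch of both sides (the helper names are illustrative, not from the project):

```python
import hmac

def build_webhook_payload(endpoint: str, secret_token: str) -> dict:
    """Payload for the Telegram setWebhook call, including the secret token."""
    return {"url": endpoint, "secret_token": secret_token}

def is_genuine_telegram_request(headers: dict, secret_token: str) -> bool:
    """Check the header Telegram sends back against our secret."""
    received = headers.get("X-Telegram-Bot-Api-Secret-Token", "")
    # Constant-time comparison to avoid timing leaks
    return hmac.compare_digest(received, secret_token)
```

The token would simply be passed alongside `url` in the `setWebhook` call above, and the header check would run before a request is accepted into the queue.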

Message Processing Pipeline

The Lambda function processes messages in several steps. Here's the main handler:

def handler(event, _context):
    """
    Main entry point for processing messages.
    Receives events from SQS, processes them, and sends responses.
    """
    try:
        # Extract the Telegram update from the SQS event
        # (the queue is consumed with a batch size of 1)
        request_body = json.loads(event['Records'][0]['body'])
        update = telebot.types.Update.de_json(request_body)

        # Not every update carries a message (e.g. edited_message),
        # so guard before touching update.message
        if update.message and update.message.chat.id in ALLOWED_USERS:
            process_message(update.message)

        return {"statusCode": 200}
    except Exception as e:
        logger.error(f"Error processing message: {str(e)}")
        return {"statusCode": 500}
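To see what the handler actually works with, here is a trimmed-down SQS event wrapping a Telegram update (field values are made up for illustration) and the same extraction step the handler performs:

```python
import json

# A simplified SQS event as Lambda receives it; the Telegram update
# sits JSON-encoded inside the record body
sample_event = {
    "Records": [
        {
            "body": json.dumps({
                "update_id": 10000,
                "message": {
                    "message_id": 1,
                    "chat": {"id": 123456789, "type": "private"},
                    "text": "Hello, bot!"
                }
            })
        }
    ]
}

# Same extraction as in the handler above
request_body = json.loads(sample_event["Records"][0]["body"])
print(request_body["message"]["chat"]["id"])  # 123456789
```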

OpenAI Integration

The OpenAI integration is handled through a dedicated function:

def ask_openai_threads(chat_id, question):
    """
    Sends user message to OpenAI and manages conversation threads.
    """
    # Get or create assistant
    assistant_id = get_stored_assistant_id()
    if not assistant_id:
        assistant_id = create_assistant()
        save_assistant(assistant_id)

    # Get or create thread
    thread_id = get_stored_thread_id(chat_id)
    if not thread_id:
        thread = openai_client.beta.threads.create()
        thread_id = thread.id
        save_thread(chat_id, thread_id)

    # Add message and run assistant
    openai_client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=question
    )

    run = openai_client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id
    )

    # Wait for the response; sleep between polls (requires `import time`)
    # and bail out on terminal states instead of spinning forever
    while run.status not in ('completed', 'failed', 'cancelled', 'expired'):
        time.sleep(1)
        run = openai_client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run.id
        )

    if run.status != 'completed':
        raise RuntimeError(f"Run ended with status: {run.status}")

    # Get and return assistant's response
    messages = openai_client.beta.threads.messages.list(thread_id=thread_id)
    return messages.data[0].content[0].text.value
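The polling in `ask_openai_threads` can also be factored into a reusable helper with a timeout and exponential backoff. This sketch (not from the project) takes the status check as a callable so it stays independent of the OpenAI client:

```python
import time

def wait_until(check, timeout=55.0, initial_delay=0.5, max_delay=4.0):
    """
    Call `check()` until it returns a truthy value or `timeout` seconds
    elapse. The delay between calls doubles up to `max_delay`.
    Returns the last value from `check()`, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while time.monotonic() < deadline:
        result = check()
        if result:
            return result
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
    raise TimeoutError("Condition not met before timeout")
```

Used with a lambda that retrieves the run and tests `run.status == 'completed'`, this keeps the total wait safely under the function's 60-second Lambda timeout.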

State Management

The project uses DynamoDB to keep track of conversations and assistant configuration.

Thread Storage

Here's how we store and retrieve conversation threads:

def save_thread(chat_id, thread_id):
    """
    Saves new thread to DynamoDB.
    """
    item = {
        'chat_id': {'N': str(chat_id)},
        'thread_id': {'S': thread_id},
        'thread_status': {'S': 'ACTIVE'},
        'created_at': {'S': datetime.now().isoformat()}
    }

    dynamodb_client.put_item(
        TableName=THREADS_TABLE_NAME,
        Item=item
    )

def get_stored_thread_id(chat_id):
    """
    Retrieves active thread for a chat.
    """
    response = threads_table.query(
        IndexName='UserStatusIndex',
        KeyConditionExpression=Key('chat_id').eq(chat_id) & 
                             Key('thread_status').eq('ACTIVE'),
        Limit=1
    )
    return response['Items'][0]['thread_id'] if response['Items'] else None
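Note the two different DynamoDB APIs above: the low-level `put_item` call needs DynamoDB's typed attribute format (`{'N': ...}`, `{'S': ...}`), while the `Table` resource used in the query works with plain Python values. A small helper (illustrative, not from the project) shows the conversion for the types this table stores:

```python
def to_dynamodb_item(record: dict) -> dict:
    """
    Convert a plain dict into DynamoDB's typed attribute format.
    Only handles the types this table actually uses: numbers,
    strings, and booleans.
    """
    typed = {}
    for key, value in record.items():
        # bool must be checked before int (bool is a subclass of int)
        if isinstance(value, bool):
            typed[key] = {'BOOL': value}
        elif isinstance(value, (int, float)):
            typed[key] = {'N': str(value)}  # DynamoDB numbers travel as strings
        elif isinstance(value, str):
            typed[key] = {'S': value}
        else:
            raise TypeError(f"Unsupported type for {key}: {type(value)}")
    return typed
```

boto3's `TypeSerializer`/`TypeDeserializer` do the same job for the full type set; the sketch just makes the wire format visible.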

Security Implementation

Secret Management

We use AWS Systems Manager Parameter Store to keep API tokens and other secrets safe.

# Instead of hardcoding tokens:
ssm = boto3.client('ssm')

# Get Telegram token
TELEGRAM_TOKEN = ssm.get_parameter(
    Name=TELEGRAM_TOKEN_PARAM_NAME, 
    WithDecryption=True
)['Parameter']['Value']

# Get OpenAI token
OPENAI_TOKEN = ssm.get_parameter(
    Name=OPENAI_TOKEN_PARAM_NAME, 
    WithDecryption=True
)['Parameter']['Value']
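Because these lookups run at module import, they happen once per cold start. If parameters were instead read inside the handler, a small in-memory cache keeps warm invocations from calling SSM repeatedly. A sketch with the fetch function injected so the caching logic stands alone (names are illustrative):

```python
_param_cache = {}

def get_cached_parameter(name, fetch):
    """
    Return a parameter value, calling `fetch(name)` only on the first
    lookup; later calls hit the cache, which survives for the lifetime
    of the Lambda execution environment.
    """
    if name not in _param_cache:
        _param_cache[name] = fetch(name)
    return _param_cache[name]
```

With boto3 this would be wired up as `get_cached_parameter(name, lambda n: ssm.get_parameter(Name=n, WithDecryption=True)['Parameter']['Value'])`; AWS Lambda Powertools also ships this behaviour ready-made in its parameters utility.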

The parameters are created using Terraform:

resource "aws_ssm_parameter" "bot-token" {
  name   = "${var.app_name}-bot-token"
  type   = "SecureString"
  key_id = "alias/aws/ssm"
  value  = "CHANGE-ME"  # Changed manually after deployment
}

Encryption

We use KMS for encrypting data at rest. Here's how we set it up:

resource "aws_kms_key" "dynamo-encryption-key" {
  description             = "Key for DynamoDB encryption"
  deletion_window_in_days = 10
  enable_key_rotation     = true

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${local.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      }
    ]
  })
}

Access Control

We limit who can use the bot with a simple check:

ALLOWED_USERS = list(map(int, allowed_users_ids.split(',')))

@bot.message_handler(func=lambda message: message.chat.id not in ALLOWED_USERS)
def decline_strangers(message):
    response = (
        f"Access denied.\n"
        f"Your user ID: {message.chat.id}"
    )
    bot.reply_to(message, response)
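The `ALLOWED_USERS` list is built from a comma-separated string of chat IDs (typically an environment variable or SSM parameter). The bare `split(',')` breaks on stray whitespace or a trailing comma, so a slightly more defensive parser (illustrative) is worth the extra line:

```python
def parse_allowed_users(raw: str) -> list:
    """
    Parse a comma-separated list of Telegram chat IDs, tolerating
    whitespace and empty entries (e.g. a trailing comma).
    """
    return [int(part) for part in (p.strip() for p in raw.split(',')) if part]
```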

IAM Roles

Lambda needs specific permissions to access other services. Here's the IAM configuration:

module "lambda_function" {
  # ... other configuration ...

  attach_policy_statements = true
  policy_statements = {
    sqs = {
      effect    = "Allow",
      actions   = ["sqs:SendMessage"],
      resources = [aws_sqs_queue.inbound.arn]
    },
    ssm = {
      effect    = "Allow",
      actions   = ["ssm:GetParameter"],
      resources = [
        aws_ssm_parameter.bot-token.arn,
        aws_ssm_parameter.openai-token.arn,
      ]
    },
    dynamodb = {
      effect = "Allow",
      actions = [
        "dynamodb:PutItem",
        "dynamodb:Query",
        "dynamodb:Scan"
      ],
      # Scoped to the threads table and its indexes rather than "*"
      resources = [
        aws_dynamodb_table.threads.arn,
        "${aws_dynamodb_table.threads.arn}/index/*"
      ]
    }
  }
}

Security Best Practices

Some key security measures we implemented:

  1. Network Security:

    • API Gateway uses HTTPS only
    • Lambda functions run in private VPC (optional)
  2. Data Security:

    • All sensitive data encrypted at rest
    • Secrets stored in Parameter Store
    • DynamoDB encryption enabled
  3. Access Security:

    • Minimal IAM permissions
    • User allowlist
    • KMS key rotation enabled

Monitoring and Operations

CloudWatch Integration

We use AWS Lambda Powertools to make monitoring easier. Here's how we set it up:

from aws_lambda_powertools import Tracer, Metrics, Logger

tracer = Tracer()
metrics = Metrics()
logger = Logger()

@tracer.capture_lambda_handler
@metrics.log_metrics
@logger.inject_lambda_context
def handler(event, _context):
    """
    Main handler with full observability.
    """
    try:
        process_event(event)
        metrics.add_metric(name="SuccessfulProcessing", value=1, unit="Count")
    except Exception:
        metrics.add_metric(name="FailedProcessing", value=1, unit="Count")
        raise

Logging Strategy

We use structured logging to make debugging easier:

def process_event(event):
    """
    Process events with structured logging.
    """
    logger.info("Processing new event", extra={
        "event_type": "message_received",
        "timestamp": datetime.now().isoformat(),
        "source": "telegram"
    })
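Under the hood, Powertools' Logger writes each record as one JSON object per line, which is what enables CloudWatch Logs Insights queries on fields like `event_type`. The idea can be reproduced with just the standard library (a simplified stand-in, not the project's code):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""
    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        # Merge the structured fields passed via `extra=...`
        for key in ("event_type", "source", "timestamp"):
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)

logger = logging.getLogger("bot")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(JsonFormatter())
logger.addHandler(stream_handler)
logger.setLevel(logging.INFO)

logger.info("Processing new event",
            extra={"event_type": "message_received", "source": "telegram"})
# emits: {"level": "INFO", "message": "Processing new event",
#         "event_type": "message_received", "source": "telegram"}
```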

Conclusion

What We Built

We created a serverless AI chatbot that combines:

  • AWS serverless infrastructure
  • OpenAI's powerful language models
  • Telegram's messaging platform

The system handles:

  • Secure message processing
  • Reliable conversation management
  • Cost-effective scaling
  • Comprehensive monitoring

Key Takeaways

  • Serverless architecture reduces operational overhead
  • Queue-based design ensures message reliability
  • DynamoDB provides flexible state management
  • KMS encryption protects sensitive data

Lessons Learned

What Worked Well

  • Serverless architecture scaled smoothly
  • SQS prevented message loss
  • Lambda Powertools improved observability

What Could Be Better

  • Cold starts need optimization
  • OpenAI API costs need monitoring
  • Error handling could be more robust

Final Thoughts

Building a serverless AI chatbot taught us that:

  • Simple architecture can handle complex tasks
  • AWS services work well together
  • Proper monitoring is crucial
  • Cost management needs constant attention

Getting Started

Want to try it yourself? Here's a quick start:

  1. Clone the repository
  2. Set up AWS credentials
  3. Deploy with Terraform
  4. Update SSM parameters with your API keys
  5. Set up the Telegram webhook

Check deployment instructions in the repository.

The code is open source and available on GitHub: https://github.com/requix/aws-telegram-ai-module
Feel free to contribute or adapt it for your needs.

This project shows how modern cloud services and AI can work together to create practical, scalable applications. While there's always room for improvement, this architecture provides a solid foundation for building AI-powered chatbots.
