Hey developers! 👋 Continuing from the previous post, we'll implement the file translation endpoint using Python, AWS Lambda, and a clean Hexagonal Architecture. Let's dive in! You can check out my GitHub for the complete code.
The Translation Record Model
We'll reuse the Record model from the translate endpoint. This time, instead of the input and output text, we'll store the base64-encoded strings of the input and output bytes.
#translate_file/models.py
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Record:
    id: str
    input_text: str
    output_text: str
    created_at: datetime = field(default_factory=datetime.now)
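As a quick check of what ends up in those two text fields, here is how raw bytes round-trip through base64 (standard library only; the payload is a stand-in for real file bytes):

import base64

payload = b"hello world"                        # stand-in for real file bytes
encoded = base64.b64encode(payload).decode()    # what we store in input_text / output_text
assert base64.b64decode(encoded) == payload     # decoding recovers the original bytes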
Next, let's define the ports that handle different aspects of our application:
- FilePersistencePort – Responsible for storing raw byte data to a file.
- RequestPersistencePort – Similar to TextPersistencePort, this port manages the persistence of request input and output.
- TranslationPort – Handles the translation of file content.

These ports act as abstractions, making our system more modular and easier to extend.
#translate_file/ports.py
from typing import Protocol

from models import Record


class RequestPersistencePort(Protocol):
    def save(self, input: bytes, output: bytes) -> Record:
        ...


class FilePersistencePort(Protocol):
    def save(self, file: bytes, extension: str) -> str:
        ...


class TranslationPort(Protocol):
    def translate(self, file: bytes, content_type: str, lang: str) -> bytes:
        ...
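Because these are typing.Protocol classes, any object with matching method signatures satisfies the port; no inheritance is required. As a minimal sketch (the class name is illustrative, not part of the project), an in-memory fake for tests could look like this:

# test_fakes.py (hypothetical) – an in-memory stand-in for RequestPersistencePort
import base64
from datetime import datetime
from uuid import uuid4

from models import Record


class InMemoryRequestPersistence:
    """Structurally satisfies RequestPersistencePort without touching DynamoDB."""

    def __init__(self):
        self.records = []

    def save(self, input: bytes, output: bytes) -> Record:
        record = Record(
            id=str(uuid4()),
            input_text=base64.b64encode(input).decode(),
            output_text=base64.b64encode(output).decode(),
            created_at=datetime.now(),
        )
        self.records.append(record)
        return record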
Now, let's define the adapters that implement our ports:
-
RequestPersistenceAdapter
– Stores the request input and output in DynamoDB and returns aRecord
object. -
FilePersistenceAdapter
– Saves the file as an object in an S3 bucket and returns a pre-signed URL for access. -
AWSTranslateAdapter
– Uses AWS Translate to process the file and outputs the translated file as bytes. By following this adapter pattern, we keep our architecture clean, modular, and easily extendable.
#translate_file/adapters.py
import boto3
import base64
from uuid import uuid4
from datetime import datetime

from models import Record


class RequestPersistenceAdapter:
    """
    Implementation of RequestPersistencePort using DynamoDB as storage.
    """

    def __init__(self, table_name: str):
        dynamodb = boto3.resource("dynamodb")
        self.table = dynamodb.Table(table_name)

    def save(self, input: bytes, output: bytes) -> Record:
        """
        Save input and output bytes to DynamoDB.

        Args:
            input: The input bytes to save
            output: The output bytes to save

        Returns:
            Record object containing saved data
        """
        id = str(uuid4())
        created_at = datetime.now()
        input_text = base64.b64encode(input).decode()
        output_text = base64.b64encode(output).decode()
        self.table.put_item(
            Item={
                "id": id,
                "input_text": input_text,
                "output_text": output_text,
                "created_at": str(created_at),
            }
        )
        record = Record(id, input_text, output_text, created_at)
        return record
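If you ever need to inspect a stored request, the item can be read back by its key and decoded. A minimal sketch, assuming the table's partition key is id (as the put_item call above implies); the table name and record id below are placeholders:

import base64
import boto3

# Hypothetical table name and record id, purely illustrative.
table = boto3.resource("dynamodb").Table("translation-requests")
item = table.get_item(Key={"id": "some-record-id"})["Item"]

original_bytes = base64.b64decode(item["input_text"])
translated_bytes = base64.b64decode(item["output_text"])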
class FilePersistenceAdapter:
    """
    Implementation of FilePersistencePort using S3.
    """

    def __init__(self, bucket_name: str):
        self.client = boto3.client("s3")
        self.bucket_name = bucket_name

    def save(self, file: bytes, extension: str) -> str:
        """
        Save the file as an object in the S3 bucket.

        Args:
            file: The file to save
            extension: The file extension appended to the object key

        Returns:
            A pre-signed URL to the file
        """
        key = str(uuid4()).replace("-", "")
        if extension:
            key += f".{extension}"
        self.client.put_object(Key=key, Body=file, Bucket=self.bucket_name)
        url = self.client.generate_presigned_url(
            "get_object",
            Params={"Bucket": self.bucket_name, "Key": key},
            ExpiresIn=300,
        )
        return url
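The returned pre-signed URL is valid for five minutes and can be fetched without AWS credentials. A small client-side sketch using only the standard library (the function name and destination path are illustrative):

from urllib.request import urlopen


def download_translation(presigned_url: str, destination: str) -> None:
    """Fetch a translated file from its pre-signed URL and write it locally."""
    with urlopen(presigned_url) as response:
        data = response.read()
    with open(destination, "wb") as f:
        f.write(data)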
class AWSTranslateAdapter:
    """
    Implementation of TranslationPort using AWS Translate.
    """

    def __init__(self):
        self.client = boto3.client("translate")

    def translate(self, file: bytes, content_type: str, lang: str) -> bytes:
        """
        Translate the input file to lang.

        Args:
            file: The file to translate
            content_type: The MIME type of the file
            lang: The language code to translate to

        Returns:
            The translated file
        """
        result = self.client.translate_document(
            Document={"Content": file, "ContentType": content_type},
            SourceLanguageCode="auto",
            TargetLanguageCode=lang,
        )
        translated_file = result["TranslatedDocument"]["Content"]
        return translated_file
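The adapter can also be exercised on its own, outside of Lambda. A minimal sketch translating a local HTML file to German (assumes AWS credentials and Translate access are configured; the filenames are illustrative):

# Hypothetical local check of the adapter.
from adapters import AWSTranslateAdapter

adapter = AWSTranslateAdapter()

with open("page.html", "rb") as f:      # illustrative input file
    source_bytes = f.read()

translated = adapter.translate(source_bytes, "text/html", "de")

with open("page_de.html", "wb") as f:
    f.write(translated)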
Next, we define the Handler class to process requests sent to the Lambda function. The files and target language are uploaded using multipart/form-data.
To preserve the integrity of the binary data, AWS API Gateway base64-encodes the body when the binaryMediaTypes field is set to multipart/form-data (a sketch of the resulting event follows the list below). The TranslationRequest class is responsible for:
- Decoding the request body and parsing multipart/form-data.
- Extracting the uploaded files.
- Identifying the target language for translation.
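For reference, the Lambda proxy event delivered by API Gateway looks roughly like this. The values are illustrative placeholders, not real request data:

# Shape of the incoming event, as consumed by TranslationRequest.from_dict().
sample_event = {
    "headers": {
        # The boundary value is illustrative.
        "content-type": "multipart/form-data; boundary=boundary123"
    },
    "isBase64Encoded": True,
    # The raw multipart body, base64-encoded by API Gateway because
    # binaryMediaTypes includes multipart/form-data.
    "body": "<base64-encoded multipart payload>",
}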
#translate_file/main.py
import os
import json
import base64
import logging
from dataclasses import dataclass
from typing import Dict, Tuple

from requests_toolbelt import MultipartDecoder

from ports import RequestPersistencePort, FilePersistencePort, TranslationPort
from adapters import (
    RequestPersistenceAdapter,
    FilePersistenceAdapter,
    AWSTranslateAdapter,
)

logger = logging.getLogger(__name__)
@dataclass
class TranslationRequest:
    """Dataclass for translation requests"""

    files: list
    lang: str

    @staticmethod
    def get_name(header: bytes) -> str:
        return header.decode().split(";")[1].split("=")[1].strip('"')

    @staticmethod
    def get_file_info(header: bytes) -> Tuple[str, str]:
        filename, extension = None, None
        if len(header.decode().split(";")) >= 3:
            filename = header.decode().split(";")[2].split("=")[1].strip('"')
            file_ext = filename.split(".")
            if len(file_ext) == 2:
                extension = file_ext[1]
        return filename, extension

    @classmethod
    def from_dict(cls, data: Dict) -> "TranslationRequest":
        body = data["body"]
        content_type = data["headers"].get("content-type", None) or data[
            "headers"
        ].get("Content-Type", None)
        files = []
        lang = ""
        if data["isBase64Encoded"]:
            body = base64.b64decode(body)
        decoder = MultipartDecoder(body, content_type)
        for part in decoder.parts:
            filename, extension = cls.get_file_info(
                part.headers[b"Content-Disposition"]
            )
            if filename:
                content_type = part.headers[b"content-type"].decode()
                files.append((part.content, content_type, extension))
            elif cls.get_name(part.headers[b"Content-Disposition"]) == "lang":
                lang = part.content.decode()
        if not files:
            raise ValueError("files must be provided")
        if not lang:
            raise ValueError("lang must be provided")
        return cls(files, lang)
class Handler:
    def __init__(
        self,
        request_port: RequestPersistencePort,
        file_port: FilePersistencePort,
        translate_port: TranslationPort,
    ):
        self.request_port = request_port
        self.translate_port = translate_port
        self.file_port = file_port

    def __call__(self, request, *args):
        """
        Process a translation request.

        Args:
            request: dict containing the request data

        Returns:
            dict with status code and response body
        """
        try:
            request = TranslationRequest.from_dict(request)
        except (json.JSONDecodeError, KeyError, ValueError) as e:
            logger.exception(f"Invalid request: {str(e)}")
            return self._get_error_response("Invalid request", status_code=400)
        urls = []
        try:
            lang = request.lang
            for file in request.files:
                result = self.translate_port.translate(file[0], file[1], lang)
                output = self.request_port.save(file[0], result)
                logger.info(f"Saved record with ID: {output.id}")
                url = self.file_port.save(result, file[2])
                urls.append(url)
            return self._get_success_response(urls)
        except Exception as e:
            logger.exception(f"Error translating: {str(e)}")
            return self._get_error_response("An error was encountered", status_code=500)

    def _get_success_response(self, urls: list):
        """
        Generate a successful response.

        Args:
            urls: Pre-signed URLs to the translated files

        Returns:
            Dictionary with status code and response body
        """
        return {
            "statusCode": "200",
            "headers": {
                "Access-Control-Allow-Headers": "Content-Type",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "OPTIONS,POST",
            },
            "body": json.dumps({"urls": urls}),
        }

    def _get_error_response(self, error: str, status_code: int):
        """
        Generate an error response.

        Args:
            error: The error message
            status_code: HTTP status code

        Returns:
            Dictionary with status code and response body
        """
        return {
            "statusCode": str(status_code),
            "headers": {
                "Access-Control-Allow-Headers": "Content-Type",
                "Access-Control-Allow-Origin": "*",
                "Access-Control-Allow-Methods": "OPTIONS,POST",
            },
            "body": json.dumps({"detail": error}),
        }


request_port = RequestPersistenceAdapter(os.environ.get("DYNAMODB_TABLE"))
file_port = FilePersistenceAdapter(os.environ.get("S3_BUCKET"))
translate_port = AWSTranslateAdapter()
handler = Handler(request_port, file_port, translate_port)
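To sanity-check the whole pipeline without deploying, the handler can be invoked locally with a hand-built event. A minimal sketch using requests_toolbelt's MultipartEncoder to assemble the multipart body; it assumes the DYNAMODB_TABLE and S3_BUCKET environment variables are set and AWS credentials are available, and the input filename is illustrative:

# local_invoke.py (hypothetical) – exercise the handler outside API Gateway.
import base64

from requests_toolbelt import MultipartEncoder

from main import handler

encoder = MultipartEncoder(
    fields={
        "file": ("example.html", open("example.html", "rb"), "text/html"),
        "lang": "es",
    }
)

event = {
    "headers": {"content-type": encoder.content_type},
    "isBase64Encoded": True,
    "body": base64.b64encode(encoder.to_string()).decode(),
}

response = handler(event)
print(response["statusCode"], response["body"])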
Finally, we add the required dependencies to a requirements.txt file.
#requirements.txt
requests-toolbelt==1.0.0
Super! With our endpoint scripts complete, we now have a functional API for handling translation requests. 🎉
In the next installment, we'll dive into writing the Terraform configuration to provision our infrastructure and deploy our API seamlessly. Stay tuned! 🚀