Problem statement
The requirement for this application is to extract review data from product websites whose review sections are paginated, with universal support for all pagination types. A GET API is also required to return the extracted reviews.
Implementation
The system consists of three main components: API Gateway, a Lambda function, and an EC2 instance. Let's dive into each component individually:
API Gateway
As the name suggests, it exposes our automation process to the network. In this case, it's a REST API whose GET response looks like this:
{
  "reviews_count": 100,
  "reviews": [
    {
      "title": "Review Title",
      "body": "Review body text",
      "rating": 5,
      "reviewer": "Reviewer Name"
    },
    ...
  ]
}
The API we just created will trigger the Lambda function, which manages the process on our EC2 instance using SSM (the code block for SSM is attached in the next section).
The API endpoint should have a query string parameter named 'page'. The final endpoint looks like this: /api/reviews?page={url}. The query parameter is passed to the Lambda function through the event object it receives.
We need to make sure that Lambda proxy integration is enabled so that the Lambda function's output is returned as the API response.
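With proxy integration enabled, the handler receives the request details as an event object. Here is a trimmed sketch of the relevant part (only the field used by this project is shown; values are illustrative):
# Trimmed sketch of the proxy-integration event passed to the Lambda handler.
# Values are illustrative, not from a real request.
sample_event = {
    "httpMethod": "GET",
    "path": "/api/reviews",
    "queryStringParameters": {
        "page": "https://www.example.com/product"
    }
}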
Lambda Function
Our Lambda function works as a middleman: it is triggered by the API call, executes the automation pipeline on EC2, and passes the output generated by the pipeline to the API response.
The Lambda function reads the query string parameter from the event object like this:
url = event['queryStringParameters']['page']
As mentioned earlier, it uses SSM to manage the process executed on the EC2 instance. Here is the code block responsible for that:
import boto3
import json   # used in the later snippets of this function
import time   # used by the polling loop below
import uuid

ssm = boto3.client('ssm', region_name='ap-south-1')
unique_id = str(uuid.uuid4())

# Send command to EC2 instance
response = ssm.send_command(
    InstanceIds=['i-instanceIdOfTheEc2VM'],
    DocumentName='AWS-RunPowerShellScript',
    Parameters={
        'commands': [f'C:\\Users\\Administrator\\AppData\\Local\\Programs\\Python\\Python311\\python.exe C:\\final-automation-w-rating.py "{url}" "{unique_id}"']
    }
)
command_id = response['Command']['CommandId']
This block executes the Python script present on the EC2 instance, passing 'url' and 'unique_id' as command-line arguments. The function then runs a synchronous loop that polls the status of the command every 8 seconds until it finishes.
while True:
    try:
        invocation_response = ssm.get_command_invocation(
            CommandId=command_id,
            InstanceId='i-07b0999d978efd1fb'  # the same EC2 instance targeted by send_command above
        )
        status = invocation_response['Status']
        if status in ['Success', 'Failed', 'Cancelled', 'TimedOut']:
            print(f"Command finished with status: {status}")
            break
        print(f"Current status: {status}. Waiting for completion...")
        time.sleep(8)
    except ssm.exceptions.InvocationDoesNotExist:
        print("Invocation does not exist yet. Retrying...")
        time.sleep(2)
The final step fetches the extracted data from the EC2 instance. To do that, we use an S3 bucket to pass the data between EC2 and the Lambda function. Earlier we passed 'unique_id' as a command-line argument to the Python script; it serves as the file name for the JSON file that the script uploads to the S3 bucket. Since the Lambda function generated the unique_id, once the EC2 process finishes it can fetch that file from the S3 bucket and return its contents in the return statement, which becomes the API response.
s3_client = boto3.client('s3')
bucket_name = 'extracted-reviews'
file_name = f'{unique_id}.json'

try:
    s3_response = s3_client.get_object(Bucket=bucket_name, Key=file_name)
    file_data = s3_response['Body'].read().decode('utf-8')
    json_data = json.loads(file_data)
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type',
        },
        'body': json.dumps(json_data)
    }
except Exception as e:
    return {
        'statusCode': 500,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
            'Access-Control-Allow-Headers': 'Content-Type',
        },
        'body': json.dumps({'error': str(e)})
    }
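For orientation, all of the snippets above live inside a single handler. Here is a minimal sketch of that structure; the numbered comments simply point back to the blocks shown earlier:
import uuid

def lambda_handler(event, context):
    # 1. Read the product URL from the 'page' query string parameter
    url = event['queryStringParameters']['page']
    # 2. Generate the id that doubles as the S3 file name
    unique_id = str(uuid.uuid4())
    # 3. ssm.send_command(...) starts the scraper on the EC2 instance
    # 4. Poll ssm.get_command_invocation(...) every 8 seconds until it finishes
    # 5. Read f'{unique_id}.json' from the 'extracted-reviews' bucket and return it
    #    as the proxy-integration response (statusCode / headers / body)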
EC2 Instance
First, we need to strip as much markup as possible from the page source to reduce the token count for the LLM, which will lower API costs, improve performance, and enhance accuracy. For this purpose, BeautifulSoup is used to remove everything wrapped within the following tags: script, style, img, nav, header, footer, picture, svg, path, and form.
from bs4 import BeautifulSoup, SoupStrainer  # SoupStrainer is used further below

def filter_source(source):
    soup = BeautifulSoup(source, 'html.parser')
    # Drop tags that add tokens but carry no review content
    for tag in soup(["script", "style", "img", "nav", "header", "footer", "picture", "svg", "path", "form"]):
        tag.decompose()
    cleaned_body_content = str(soup.body)
    return cleaned_body_content
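To illustrate what survives the filtering, here is a made-up fragment (not from a real product page):
# Made-up HTML fragment to illustrate the filtering
html = "<html><body><script>track()</script><nav>menu</nav><div class='review'>Great product!</div></body></html>"
print(filter_source(html))
# Output: <body><div class="review">Great product!</div></body>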
Now, to go to the next review page, the program needs to click the 'next' button, and to do that it needs the button's class name. Since every website uses its own class name for that button, we can't hard-code it. To address this, I'll use an LLM to determine the class names from the source code. The program also needs to retrieve the review details, so I've added the class names of the review elements to the same request. I'm using the Google AI Studio API (Gemini 1.5 Flash) because IT'S FREE and supports an input size of up to 1 million tokens, which pretty much guarantees that the source code will fit as input.
# Global variables -- filled in by the LLM step and used during scraping
review_paginate_next = ""
review_author = ""
review_title = ""
review_text = ""
review_rating = ""
next_buttons = []  # candidate CSS selectors for the 'next page' button
reviews = []       # accumulated review dicts
prompt = """extract the following class name for each of the following elements:
- pagination "next page" button of review section
- name of reviewer
- title of review
- text of review
- rating classname
from the provided codebase.
Just return a comma separated value of classnames. If multiple class names are found for the same section, use the most relevant one which is unique.
Don't trim the values, return the value as it is in source code.
Don't return any other text than mentioned. Here is the code: """
import os
import time

import requests

google_api_key = os.getenv('GOOGLE_API_KEY')

def filter_css_selector(source_text, max_retries=3):
    response = requests.post(
        url=f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={google_api_key}",
        headers={
            "Content-Type": "application/json"
        },
        json={
            "contents": [
                {
                    "parts": [
                        {
                            "text": prompt + source_text
                        }
                    ]
                }
            ]
        }
    )
    if response.status_code == 200:
        data = response.json()
        message_content = data['candidates'][0]['content']['parts'][0]['text']
        message_content = message_content.strip("\n")
        try:
            global review_paginate_next, review_author, review_title, review_text, review_rating
            review_paginate_next, review_author, review_title, review_text, review_rating = message_content.split(",")
            next_buttons.append(f'.{review_paginate_next}')
            print(review_paginate_next)
            print(review_author)
            print(review_title)
            print(review_text)
            print(review_rating)
        except ValueError:
            # The reply didn't split into exactly five class names -- retry
            # (could also try with some other model here)
            if max_retries > 0:
                time.sleep(2)
                filter_css_selector(source_text, max_retries - 1)
    else:
        # Handles model overload error or any other error encountered by the LLM API
        print(response.json())
        if max_retries > 0:
            time.sleep(2)
            filter_css_selector(source_text, max_retries - 1)
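For illustration, a well-behaved reply would be a single comma-separated line like the one below (the class names are invented), which the split() call above turns into the five globals:
# Hypothetical model reply -- the class names are invented for illustration
message_content = "pagination__next,review-author,review-title,review-body,review-star-rating"
(review_paginate_next, review_author, review_title,
 review_text, review_rating) = message_content.split(",")
print(f'.{review_paginate_next}')  # '.pagination__next' -- the locator appended to next_buttons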
Once we know the class names, the program can scrape the reviews page by page using BeautifulSoup. Why use BeautifulSoup instead of the LLM? Because it's lightning fast, doesn't produce false positives like an LLM might (though it can miss values entirely if a class name is wrong), and has no rate limits, so we can scrape as many pages as needed.
def extract_reviews(source):
    body_strainer = SoupStrainer('body')
    soup = BeautifulSoup(source, 'html.parser', parse_only=body_strainer)
    titles = soup.find_all(class_=review_title)
    bodies = soup.find_all(class_=review_text)
    authors = soup.find_all(class_=review_author)
    ratings = soup.find_all(class_=review_rating)
    for i in range(max(len(titles), len(bodies), len(authors), len(ratings))):
        review = {
            "title": titles[i].get_text(strip=True) if i < len(titles) else "",
            "body": bodies[i].get_text(strip=True) if i < len(bodies) else "",
            "author": authors[i].get_text(strip=True) if i < len(authors) else "",
            "rating": ratings[i].get_text(strip=True) if i < len(ratings) else ""
        }
        reviews.append(review)
Now, the data needs to be passed back from the EC2 instance to the Lambda function so it can be returned through API Gateway. To do this, I'll be using an S3 bucket. This approach also lets the bucket act as a cache of already-extracted reviews.
def upload_to_s3(data, unique_file_name):
    s3_client = boto3.client('s3')  # Create an S3 client
    bucket_name = 'extracted-reviews'  # Replace with your bucket name
    s3_client.put_object(
        Bucket=bucket_name,
        Key=unique_file_name,
        Body=json.dumps(data),  # Convert list to JSON string
        ContentType='application/json'
    )
    print(f"Responses uploaded to s3://{bucket_name}/{unique_file_name}")
Now, to combine everything and automate the process, I'll be using Playwright. It clicks the button, gets the page source, extracts the reviews, and repeats the process until all reviews are extracted (with a hard limit of 20 pages to ensure the user doesn't have to wait indefinitely, since we're not using streaming to output data on the fly). Additionally, if the review extraction fails for any reason, I've implemented a fallback function to ensure that it returns at least some review data in the response.
import asyncio

from playwright.async_api import async_playwright

async def scrape(url, file_name):
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url)
        await page.wait_for_selector('body')
        page_source = await page.content()
        cleaned_body_content = filter_source(page_source)
        filter_css_selector(cleaned_body_content)
        dialog_close_attempt = 1
        for elm in next_buttons:
            count = 0
            while True:
                await page.wait_for_selector('body')
                page_source = await page.content()
                extract_reviews(page_source)
                print(count)
                count += 1
                if count > 20:
                    break
                try:
                    next_button = page.locator(elm)
                    # Click an empty spot first so any open dialog box is dismissed
                    await page.mouse.click(x=0, y=page.viewport_size['height'] // 2)
                    await asyncio.wait_for(next_button.click(), timeout=5)
                    await page.wait_for_load_state('networkidle')
                    await page.wait_for_selector('body')
                except asyncio.TimeoutError:
                    break
                except Exception as e:
                    print("Bro, error with pagination? ", e)
                    break

        # Handle infinite scroll
        prev_height = -1
        max_scrolls = 20  # Set a maximum number of scrolls to prevent infinite loops
        scroll_count = 0
        while scroll_count < max_scrolls:
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            await page.wait_for_timeout(200)
            new_height = await page.evaluate("document.body.scrollHeight")
            if new_height == prev_height:
                break
            prev_height = new_height
            scroll_count += 1
        page_source = await page.content()
        extract_reviews(page_source)

        # fallback_review_extraction and fallback_reviews are defined elsewhere in the script (not shown here)
        if len(reviews) == 0:
            fallback_review_extraction(cleaned_body_content)
            fallback_reviews["reviews_count"] = len(fallback_reviews["reviews"])
            upload_to_s3(fallback_reviews, file_name)
            # print(fallback_reviews)
        else:
            reviews_dict = {"reviews_count": len(reviews), "reviews": reviews}
            # print(reviews_dict)
            upload_to_s3(reviews_dict, file_name)
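The SSM command launches this script with the product URL and the unique ID as command-line arguments, so the entry point presumably looks something like the sketch below (the original file isn't shown in full; the Lambda later looks for '{unique_id}.json' in S3, hence the file name):
import asyncio
import sys

if __name__ == "__main__":
    product_url = sys.argv[1]  # "{url}" passed by the SSM command
    unique_id = sys.argv[2]    # "{unique_id}" passed by the SSM command
    # The Lambda reads f'{unique_id}.json' from the bucket, so use that as the S3 key
    asyncio.run(scrape(product_url, f"{unique_id}.json"))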
Challenges Faced
- There were three challenges I faced while choosing an LLM: first, it returned false positives; second, the source code wasn't fitting within the input context; and third, performance. I tried several free models, namely Llama, Mistral, and Qwen, but each had its shortcomings: some had a very small input token size, some generated random output (false positives), and others were as slow as me while cooking food. Gemini 1.5 Flash turned out to be the best, with a response time ranging from 1.5 to 10 seconds in most cases, more accurate values (not always, but better than the others), and an enormous input window of up to 1 million tokens. (PS: I had no idea about DeepSeek at the time I built the project.)
- A dialog box would randomly pop up, blocking the 'next' button click. To handle this, before every button click the script first clicks a neutral coordinate at the left edge of the page (x=0) so that any open dialog box is dismissed before the 'next' button is clicked.
- Playwright drives a Chromium browser, which isn't natively supported inside Lambda functions, so EC2 was used as a workaround. There are third-party services for running browser automation jobs, but they come at additional cost.
- Passing data from EC2 to Lambda is not directly supported, so I had to use S3 (though it adds to the cost due to read and write operations on the S3 bucket).
- AWS API Gateway has a default timeout of 29 seconds, and the runtime of the pipeline can exceed that. Therefore, I had to increase the timeout to 2 minutes through 'Service Quotas' in AWS.
Demo
Live website: https://serene-kitten-5a66fb.netlify.app/
demo.mp4
API Endpoints
https://wb6nvu1fl1.execute-api.ap-south-1.amazonaws.com/dev/api/reviews?page={PRODUCT_URL}
Note
Make sure to enter the full URL in the query parameter, like this: ?page=https://www.example.com
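For example, calling the endpoint from Python (a quick sketch using the requests library and the deployed URL above; the product URL is a placeholder):
import requests

endpoint = "https://wb6nvu1fl1.execute-api.ap-south-1.amazonaws.com/dev/api/reviews"
resp = requests.get(endpoint, params={"page": "https://www.example.com"})  # placeholder product URL
print(resp.status_code)
print(resp.json().get("reviews_count"))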
Response:
{
  "statusCode": ,
  "reviews_count": ,
  "reviews": [
    {
      "title": "",
      "body": "",
      "author": "",
      "rating": ""
    },
  ]
}
Workflow
Technologies used:
- Backend: AWS Lambda, EC2, API Gateway, S3
- Script: Python (Beautiful Soup, Playwright)
- LLM: Gemini-1.5-flash
- Frontend: Next.js
Components:
- HTML Content Filtering: The process begins by accepting a webpage URL and filtering its source code with Beautiful Soup to extract meaningful content while discarding irrelevant elements, reducing the token size before passing it on to the LLM.
- Extract class selector: To automate actions like pagination and extracting reviews, we need the class selectors to interact with the page elements programmatically; to get them, it uses the Gemini-1.5-flash model.
- Browser Automation: An…
Thanks for reading.