In the last installment, we created a Python class called FundScraper
that we found wasn't really all that helpful or robust. It was more code without much real benefit. Reading through the SOLID principles, we decided it was time to create what are called abstract classes.
If we think about what our FundScraper is doing in the ETL process, we have something that reaches out to a website, something that parses the response into HTML we can navigate, something that transforms that HTML into the values we want, and something that writes the result to JSON.
Here that is in abstract classes:
from abc import ABC, abstractmethod


class WebRequester(ABC):
    @abstractmethod
    def get(self, url):
        pass


class HTMLParser(ABC):
    @abstractmethod
    def parse(self, content):
        pass


class DataTransformer(ABC):
    @abstractmethod
    def transform(self, data):
        pass


class DataWriter(ABC):
    @abstractmethod
    def write(self, data, filename):
        pass
Think of these as individual tasks in our pipeline, now abstracted out. In data engineering, a concept called idempotence is really important. Essentially, it means that whether a task runs once or 1,000 times, you end up with the same result. It would be bad if you ran the task that writes the JSON 1,000 times and it wrote 1,000 copies of your data instead of writing over the same file. It's the same way in a database: if I reran an INSERT task not knowing it had already run, then depending on the database I would either get a "duplicate key error" or, in the case of Snowflake, a duplicate record inserted. Imagine a computer somehow looping on that task, unable to get past it until someone stopped it. You'd have to truncate your table and reload from scratch.
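To make that concrete, here is a minimal sketch (the class names are hypothetical, not part of our scraper) contrasting an idempotent writer, which overwrites the same file on every run, with a non-idempotent one that keeps appending:

import json


class IdempotentJSONWriter:
    """Mode 'w' replaces the file, so re-running the task gives the same result."""

    def write(self, data, filename):
        with open(filename, 'w') as f:
            json.dump(data, f, indent=4)


class NonIdempotentJSONWriter:
    """Mode 'a' appends on every run -- 1,000 runs means 1,000 copies of the data."""

    def write(self, data, filename):
        with open(filename, 'a') as f:
            json.dump(data, f, indent=4)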
How does that work with a web scraper where the underlying website is always changing? Absent something like the Wayback Machine, which stores websites as they looked on certain days (but not every day), there is no way for us to capture the same page again with our task. The same goes for databases: unless we take a snapshot, there is no real way to run an extraction query and get the same result five minutes later. Thankfully, databases offer features like Change Data Capture (CDC) and Change Tracking (CT) that let us look at the database's logs and recreate what it looked like at a point in time, but that's a much deeper/longer conversation.
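One common mitigation, which our scraper doesn't do yet, is to persist the raw response the moment we fetch it, so the parse/transform/write steps can be re-run against that snapshot. A rough sketch, with a hypothetical function name and directory:

import os
from datetime import datetime, timezone


def snapshot_response(content: bytes, directory: str = "snapshots") -> str:
    """Save the raw HTML so downstream steps can be re-run against the same input."""
    os.makedirs(directory, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    path = os.path.join(directory, f"page_{timestamp}.html")
    with open(path, "wb") as f:
        f.write(content)
    return path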
Now that we're thinking of these as tasks, we can go one step further and think of another website we want to extract information from. We could reuse the same WebRequester/HTMLParser (with a different URL) and the same DataWriter, but pair them with a different DataTransformer suited to the values we're working with. We could call it WebSite2Scraper and pull in only what we need, as sketched below.
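As a rough sketch (WebSite2Transformer and its cleanup rules are made up for illustration, not from a real second site), the only new piece we'd have to write is the transformer; the requester, parser, and writer are reused as-is:

class WebSite2Transformer(DataTransformer):
    """Hypothetical transformer for a second site whose values need different cleanup."""

    def transform(self, values, indexes):
        # Strip whitespace and a trailing percent sign instead of a '$US' prefix.
        return {"values": [str(values[i].contents[0]).strip().rstrip('%')
                           for i in indexes]}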
But here is our FundScraper:
class FundScraper:
    def __init__(self, requester: WebRequester, parser: HTMLParser,
                 transformer: DataTransformer, writer: DataWriter):
        self.requester = requester
        self.parser = parser
        self.transformer = transformer
        self.writer = writer

    def run(self, url, indexes, class_name, filename='data.json'):
        response = self.requester.get(url)
        if response.status_code != 200:
            raise ValueError(f"Error retrieving {url}: {response.status_code}")
        soup = self.parser.parse(response.content)
        data = self.transformer.transform(
            soup.find_all('div', class_=class_name), indexes)
        self.writer.write(data, filename)
You can see it is coupled only to the abstract classes we created above. You'll also notice the Python packages requests, BeautifulSoup, and json are all absent. Again, following SOLID principles, we've fully decoupled those concrete dependencies into new classes:
import json

import requests
from bs4 import BeautifulSoup


class RequestsWebRequester(WebRequester):
    def get(self, url):
        return requests.get(url)


class BeautifulSoupHTMLParser(HTMLParser):
    def parse(self, content):
        return BeautifulSoup(content, "html.parser")


class FundValueTransformer(DataTransformer):
    def transform(self, values, indexes):
        return {"values": [str(values[i].contents[0]).strip().replace('$US', '').replace(',', '')
                           for i in indexes]}


class JSONDataWriter(DataWriter):
    def write(self, data, filename):
        with open(filename, 'w') as f:
            json.dump(data, f, indent=4)
How do we call this now? It's actually very similar to what we were doing before, but we instantiate our FundScraper class with the exact components we want it to use in our pipeline:
scraper = FundScraper(RequestsWebRequester(), BeautifulSoupHTMLParser(),
                      FundValueTransformer(), JSONDataWriter())
scraper.run(url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/',
            indexes=[4, 6],
            class_name='fundHeader_value',
            filename='data.json')
So now, since we've decoupled everything and only call what we need, we instantiate our FundScraper with the exact tasks we want and then run it on our values. Note that this code produces a slightly different JSON than our last version, since it hard-codes the key as "values". That could be changed by having the transformer return a dictionary with meaningful keys rather than a list, as sketched below.
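As a rough sketch (the key names here are assumptions for illustration, not from the original post), a transformer that returns descriptive keys instead of a positional list might look like this:

class NamedFundValueTransformer(DataTransformer):
    """Hypothetical transformer that maps each scraped value to a descriptive key."""

    def transform(self, values, indexes):
        # Pair each requested index with a human-readable label instead of
        # dumping everything under a single "values" list.
        labels = ["nav_per_share", "premium_discount"]  # assumed labels for indexes [4, 6]
        cleaned = [str(values[i].contents[0]).strip().replace('$US', '').replace(',', '')
                   for i in indexes]
        return dict(zip(labels, cleaned))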
As always, you can find this code on GitHub here. Next, we'll continue with a little more abstraction, and then we'll jump into the two ways we could create our Airflow DAGs given everything we've discussed.