Quickly reviewing what we did in the previous post: we took an object that was responsible for the full ETL process, split it into separate objects for extracting, transforming, and loading, and created a pipeline object called FundScraper to run through the whole process. However, FundScraper isn't very abstracted. You can point it at any URL you want, but the transformations are quite limited, the pipeline only works for a single URL at a time (which is a bit of a pain), and while you could swap out the JSONWriter for a different file format, we would still need to modify the object to write to a database. Let's refactor it a bit so that it can handle configurations and be slightly more extensible.
# How it currently is
```python
class FundScraper:
    def __init__(self, requester: WebRequester, parser: HTMLParser,
                 transformer: DataTransformer, writer: DataWriter):
        self.requester = requester
        self.parser = parser
        self.transformer = transformer
        self.writer = writer

    def run(self, url, indexes, class_name, filename='data.json'):
        response = self.requester.get(url)
        if response.status_code != 200:
            raise ValueError(f"Error retrieving {url}: {response.status_code}")
        soup = self.parser.parse(response.content)
        data = self.transformer.transform(
            soup.find_all('div', class_=class_name), indexes)
        self.writer.write(data, filename)
```
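For context, driving this version of the pipeline means instantiating the scraper and calling run once, with everything hard-coded at the call site. A minimal sketch, assuming the concrete classes from the previous post and the Sprott page used in the config below (the exact call there may differ slightly):

```python
# A rough sketch of how the current pipeline gets driven: one URL, one
# div class, one hard-coded list of indexes, one output file.
scraper = FundScraper(RequestsWebRequester(), BeautifulSoupHTMLParser(),
                      FundValueTransformer(), JSONDataWriter())
scraper.run(url='https://sprott.com/investment-strategies/physical-commodity-funds/uranium/',
            indexes=[4, 6],
            class_name='fundHeader_value',
            filename='data.json')
```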
In all likelihood, we'd want to run this across multiple webpages, potentially multiple divs, for multiple values. We could add a for-loop to collect data from multiple places on a single webpage like this:
```python
data = {}
for index, index_number in enumerate(indexes):
    data = self.transformer.transform(
        soup.find_all('div', class_=class_name),
        data, index_number, value_names[index])
```
We also need to modify the transform method to match:
```python
class FundValueTransformer(DataTransformer):
    def transform(self, values, dictionary: dict, index: int, value_name: str):
        dictionary[value_name] = str(values[index].contents[0]).strip().replace(
            '$US', '').replace(',', '')
        return dictionary
```
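Putting the loop and the new transformer together, the run method ends up looking something like the sketch below. Note that it now takes a value_names argument alongside indexes (matching the call in the script block further down); the exact signature in the repo may differ slightly.

```python
def run(self, url, indexes, class_name, value_names, filename='data.json'):
    response = self.requester.get(url)
    if response.status_code != 200:
        raise ValueError(f"Error retrieving {url}: {response.status_code}")
    soup = self.parser.parse(response.content)
    # Accumulate one entry per (index, value_name) pair into a single dict
    data = {}
    for index, index_number in enumerate(indexes):
        data = self.transformer.transform(
            soup.find_all('div', class_=class_name),
            data, index_number, value_names[index])
    self.writer.write(data, filename)
```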
So now it's able to request a website, pull the contents with BeautifulSoup, extract the values (i.e. transform the contents into something useful), and write them to JSON.
How do we give it instructions? We could parse a JSON file, pass in plain Python parameters, or use YAML ("YAML Ain't Markup Language"), a popular configuration format found in plenty of other applications.
A config we could use for our example would be like this:
```yaml
sprott:
  url: https://sprott.com/investment-strategies/physical-commodity-funds/uranium/
  class_name:
    - fundHeader_value: {4: shareprice, 6: u3o8_stock}
```
Really simply: keys followed by colons show up as dictionaries when parsed in Python, and dashes show up as list items. You can also provide an inline dictionary, as I do for the index/value_name pairs. You can see how easily we could add more values, more HTML tags, and more URLs to this config.
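To make that concrete, here's roughly what PyYAML's safe_load hands back for the config above; it's this plain nested structure that the script at the bottom loops over:

```python
from yaml import safe_load

with open('config.yml', 'r') as f:
    config = safe_load(f)

# config is now an ordinary nested Python structure:
# {'sprott': {'url': 'https://sprott.com/investment-strategies/physical-commodity-funds/uranium/',
#             'class_name': [{'fundHeader_value': {4: 'shareprice', 6: 'u3o8_stock'}}]}}
```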
Last, but not least, we have to reconfigure our basic if-the-script-is-called block at the bottom to parse the YAML. That could be done with the following:
```python
from yaml import safe_load

if __name__ == '__main__':
    with open('config.yml', 'r') as f:
        config = safe_load(f)
    scraper = FundScraper(RequestsWebRequester(), BeautifulSoupHTMLParser(),
                          FundValueTransformer(), JSONDataWriter())
    for key, value in config.items():
        for class_name in value['class_name']:
            for tag, indexes in class_name.items():
                scraper.run(url=value['url'],
                            class_name=tag,
                            indexes=list(indexes.keys()),
                            value_names=list(indexes.values()),
                            filename=f"{key}.json")
```
It's a bit more complex than the last round, where we just instantiated the scraper and ran it, because now we're running it multiple times over different URLs, with a separate configuration for each URL.
All of this is being done this way for a reason that will become clearer in a couple of weeks once I build an Airflow DAG out of it, but before then we need to dive into Airflow: first at a high level as a workflow orchestrator, and second into its components.
As always, the code can be found here.