DEV Community

Cover image for Airflow Plugin - How I wrote custom Airflow Plugins
Satyajiijt Roy
Satyajiijt Roy

Posted on

Airflow Plugin - How I wrote custom Airflow Plugins

It's been quite some time I have been using Apache Airflow, we are using Version: 1.10.12 for some legacy reasons. May be in future we might be able to upgrade it to latest version 2.3.3.

Anyways we had a requirement to create, terminate ec2 instances on the fly. With version 1.10.12, one has to install apache-airflow-backport-providers-amazon.

So apache-airflow-backport-providers-amazon does have support for ec2 but only limited to start using EC2StartInstanceOperator and stop using EC2StopInstanceOperator, given the instance_id is known. It is missing create and terminate functionality.

So I decided to take some learnings from ec2 operator and extend it with create and terminate functionality.

So, lets go through the code by understanding the folder structure first

airflow-ec2-plugin-extended
      ├── __init__.py
      ├── ec2_extended_plugins.py
      ├── hooks
      │   ├── __init__.py
      │   └── ec2_instance_hooks.py
      ├── operators
      │   ├── __init__.py
      │   ├── ec2_create_instance.py
      │   └── ec2_terminate_instance.py
      ├── requirements.txt
      └── venv
Enter fullscreen mode Exit fullscreen mode

TL;DR the plugin code is available here -> airflow-ec2-plugin-extended


ec2_extended_plugins.py

ec2_extended_plugins.py contains the definition for EC2ExtendedPlugins's hooks EC2ExtendedHooks and operators EC2ExtendedCreateInstance,EC2ExtendedTerminateInstance. Basically ec2_extended_plugins.py stitches all together (hooks and operators)


ec2_instance_hooks.py

ec2_instance_hooks.py has the class EC2ExtendedHooks which contains 2 methods


create_instance takes following inputs arguments

Arugment Name Value Type Default Required
subnet_id string None Yes
security_group_ids List[str] None Yes
image_id string None Yes
instance_type string None Yes
region_name string None Yes
key_name string None Yes
tags List[Dict[str, str]] [{'ResourceType': 'instance','Tags': tags}] No
iam_instance_profile string None No
user_data string None No
min_count int 1 No
max_count int 1 No

And returns the Instance Object


terminate_instance takes following inputs arguments

Arugment Name Value Type Default Required
instance_id string None Yes
region_name string None Yes

And returns nothing. Both create_instance and terminate_instance are powered by the operator classes EC2ExtendedCreateInstance and EC2ExtendedTerminateInstance which inherits the BaseOperator for native functionality.


How to use

Once the airflow-ec2-plugin-extended Plugin is installed and the dag is enabled you will see something like this in Airflow Graph View

Create Instance dag code snippet

from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
....
....
....
create_ec2 = EC2ExtendedCreateInstance(
    subnet_id=bridge_subnet,
    security_group_ids=bridge_security_group_ids,
    image_id=bridge_image_id,
    instance_type='t2.medium',
    key_name='searchops-pipeline-dev',
    tags=[{"Key": "name", "Value": "AutoDeployed via MWAA Pipeline"}],
    aws_conn_id='aws_default',
    region_name='us-east-1',
    task_id='create_ec2',
)

create_ec2 >> terminate_ec2
Enter fullscreen mode Exit fullscreen mode

Above piece of code banks on the operator and creates the ec2 instance with arguments provided and stores the result in XCom - create_ec2

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.

Terminate instance dag code snippet

from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
....
....
....
terminate_ec2 = EC2ExtendedTerminateInstance(
    instance_id="{{ task_instance.xcom_pull('create_ec2', dag_id=DAG_ID, key='return_value')[0] }}",
    region_name='us-east-1',
    task_id='terminate_ec2',
 )
)

create_ec2 >> terminate_ec2
Enter fullscreen mode Exit fullscreen mode

To terminate the same instance we fetch the value (basically instance_id) from XCom - create_ec2 and pass it as value for instance_id argument.

You may ask why ? [0] while fetching the values from XCom - create_ec2. So, when EC2ExtendedCreateInstance stores the value in XCom - create_ec2, it store them as List[str] in order of instance_id and private_ip_address
So, we are fetching first element of the List[str] as we need the instance_id to terminate the instance.

Here is the example dag for the same.

Hope this helps to provide some understanding how we can write some custom Apache Airflow plugins if we need one.

Happy Coding!!

Top comments (0)