I have been using Apache Airflow for quite some time now; we are on version 1.10.12 for some legacy reasons. Maybe in the future we will be able to upgrade it to the latest version, 2.3.3.
Anyway, we had a requirement to create and terminate ec2 instances on the fly. With version 1.10.12, one has to install apache-airflow-backport-providers-amazon.
So apache-airflow-backport-providers-amazon does have support for ec2, but it is limited to starting an instance using EC2StartInstanceOperator and stopping it using EC2StopInstanceOperator, given the instance_id is already known. It is missing create and terminate functionality.
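For context, here is a minimal sketch of what the backport provider already covers (the import paths follow the backport provider's layout, and the instance_id shown is just a placeholder for an instance that already exists):

```python
# Sketch: the backport provider can only start/stop an *existing* instance.
from airflow.providers.amazon.aws.operators.ec2_start_instance import EC2StartInstanceOperator
from airflow.providers.amazon.aws.operators.ec2_stop_instance import EC2StopInstanceOperator

start_instance = EC2StartInstanceOperator(
    task_id='start_instance',
    instance_id='i-0123456789abcdef0',  # placeholder: must already exist
    aws_conn_id='aws_default',
    region_name='us-east-1',
)

stop_instance = EC2StopInstanceOperator(
    task_id='stop_instance',
    instance_id='i-0123456789abcdef0',  # placeholder: must already exist
    aws_conn_id='aws_default',
    region_name='us-east-1',
)

start_instance >> stop_instance
```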
So I decided to take some learnings from the ec2 operator and extend it with create and terminate functionality.

Let's go through the code by understanding the folder structure first:
airflow-ec2-plugin-extended
├── __init__.py
├── ec2_extended_plugins.py
├── hooks
│ ├── __init__.py
│ └── ec2_instance_hooks.py
├── operators
│ ├── __init__.py
│ ├── ec2_create_instance.py
│ └── ec2_terminate_instance.py
├── requirements.txt
└── venv
TL;DR: the plugin code is available here -> airflow-ec2-plugin-extended
ec2_extended_plugins.py
ec2_extended_plugins.py contains the definition of EC2ExtendedPlugins, whose hooks are EC2ExtendedHooks and whose operators are EC2ExtendedCreateInstance and EC2ExtendedTerminateInstance. Basically, ec2_extended_plugins.py stitches everything (hooks and operators) together.
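For illustration, here is a minimal sketch of what such a plugin definition could look like in Airflow 1.10; class and module names follow the folder structure above, and the repo's actual code may differ:

```python
# ec2_extended_plugins.py (sketch): register the custom hook and operators
# so Airflow picks them up from the plugins folder.
from airflow.plugins_manager import AirflowPlugin

from hooks.ec2_instance_hooks import EC2ExtendedHooks
from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance


class EC2ExtendedPlugins(AirflowPlugin):
    name = "ec2_extended_plugins"
    hooks = [EC2ExtendedHooks]
    operators = [EC2ExtendedCreateInstance, EC2ExtendedTerminateInstance]
```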
ec2_instance_hooks.py
ec2_instance_hooks.py has the class EC2ExtendedHooks, which contains two methods: create_instance and terminate_instance.

create_instance takes the following input arguments:
| Argument Name | Value Type | Default | Required |
|---|---|---|---|
| subnet_id | string | None | Yes |
| security_group_ids | List[str] | None | Yes |
| image_id | string | None | Yes |
| instance_type | string | None | Yes |
| region_name | string | None | Yes |
| key_name | string | None | Yes |
| tags | List[Dict[str, str]] | [{'ResourceType': 'instance','Tags': tags}] | No |
| iam_instance_profile | string | None | No |
| user_data | string | None | No |
| min_count | int | 1 | No |
| max_count | int | 1 | No |
create_instance returns the Instance object.
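As a rough sketch (not the plugin's actual code), create_instance could sit on top of the provider's AwsBaseHook and use its boto3 EC2 resource; the method and argument names mirror the table above, everything else is an assumption:

```python
# ec2_instance_hooks.py (sketch): create an ec2 instance via boto3 and return it.
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook


class EC2ExtendedHooks(AwsBaseHook):

    def create_instance(self, subnet_id, security_group_ids, image_id,
                        instance_type, region_name, key_name, tags=None,
                        iam_instance_profile=None, user_data=None,
                        min_count=1, max_count=1):
        # boto3 EC2 service resource for the requested region
        ec2 = self.get_resource_type('ec2', region_name=region_name)
        kwargs = dict(
            SubnetId=subnet_id,
            SecurityGroupIds=security_group_ids,
            ImageId=image_id,
            InstanceType=instance_type,
            KeyName=key_name,
            MinCount=min_count,
            MaxCount=max_count,
        )
        if tags:
            kwargs['TagSpecifications'] = [{'ResourceType': 'instance', 'Tags': tags}]
        if iam_instance_profile:
            kwargs['IamInstanceProfile'] = {'Name': iam_instance_profile}
        if user_data:
            kwargs['UserData'] = user_data
        # create_instances returns a list of Instance objects; take the first
        return ec2.create_instances(**kwargs)[0]
```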
terminate_instance takes the following input arguments:
| Argument Name | Value Type | Default | Required |
|---|---|---|---|
| instance_id | string | None | Yes |
| region_name | string | None | Yes |
terminate_instance returns nothing. Both create_instance and terminate_instance are exposed through the operator classes EC2ExtendedCreateInstance and EC2ExtendedTerminateInstance, which inherit from BaseOperator for native Airflow functionality.
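A minimal sketch of what the create operator could look like on Airflow 1.10 (the hook wiring and constructor pass-through are assumptions; the List[str] returned at the end matches what gets stored in XCom, as described later in this post):

```python
# ec2_create_instance.py (sketch): wrap the hook in a BaseOperator and push
# the result to XCom by returning it from execute().
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from hooks.ec2_instance_hooks import EC2ExtendedHooks


class EC2ExtendedCreateInstance(BaseOperator):

    @apply_defaults
    def __init__(self, subnet_id, security_group_ids, image_id, instance_type,
                 region_name, key_name, tags=None, aws_conn_id='aws_default',
                 *args, **kwargs):
        super(EC2ExtendedCreateInstance, self).__init__(*args, **kwargs)
        self.subnet_id = subnet_id
        self.security_group_ids = security_group_ids
        self.image_id = image_id
        self.instance_type = instance_type
        self.region_name = region_name
        self.key_name = key_name
        self.tags = tags
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        hook = EC2ExtendedHooks(aws_conn_id=self.aws_conn_id)
        instance = hook.create_instance(
            subnet_id=self.subnet_id,
            security_group_ids=self.security_group_ids,
            image_id=self.image_id,
            instance_type=self.instance_type,
            region_name=self.region_name,
            key_name=self.key_name,
            tags=self.tags,
        )
        # The returned value becomes the task's XCom (key='return_value'):
        # [instance_id, private_ip_address]
        return [instance.instance_id, instance.private_ip_address]
```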
How to use
Once the airflow-ec2-plugin-extended plugin is installed and the dag is enabled, you will see something like this in the Airflow Graph View.
Create Instance dag code snippet:
from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
....
....
....
create_ec2 = EC2ExtendedCreateInstance(
    subnet_id=bridge_subnet,
    security_group_ids=bridge_security_group_ids,
    image_id=bridge_image_id,
    instance_type='t2.medium',
    key_name='searchops-pipeline-dev',
    tags=[{"Key": "name", "Value": "AutoDeployed via MWAA Pipeline"}],
    aws_conn_id='aws_default',
    region_name='us-east-1',
    task_id='create_ec2',
)
create_ec2 >> terminate_ec2
The above piece of code banks on the operator, creates the ec2 instance with the arguments provided, and stores the result in the XCom create_ec2.
XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.
Terminate instance dag code snippet:
from operators.ec2_create_instance import EC2ExtendedCreateInstance
from operators.ec2_terminate_instance import EC2ExtendedTerminateInstance
....
....
....
terminate_ec2 = EC2ExtendedTerminateInstance(
    instance_id="{{ task_instance.xcom_pull('create_ec2', dag_id=DAG_ID, key='return_value')[0] }}",
    region_name='us-east-1',
    task_id='terminate_ec2',
)
create_ec2 >> terminate_ec2
To terminate the same instance, we fetch the value (basically the instance_id) from the create_ec2 XCom and pass it as the value for the instance_id argument.

You may ask: why the [0] while fetching the value from the create_ec2 XCom? When EC2ExtendedCreateInstance stores the value in the create_ec2 XCom, it stores it as a List[str] in the order instance_id, private_ip_address. So we fetch the first element of the List[str], as we need the instance_id to terminate the instance.
Here is the example dag for the same.
Hope this helps provide some understanding of how we can write custom Apache Airflow plugins if we need one.