Apache Airflow DAGs with a backend configuration bundle.
"""
Code that goes along with the Airflow tutorial located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

from utils.network import selenoid_status, vpn_settings
from utils.notify import PushoverOperator
from utils.postgres import sqlLoad
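# The utils.* imports above are project-local helpers rather than Airflow
# modules: judging from how they are used below, selenoid_status checks that
# the Selenoid browser grid is reachable, PushoverOperator sends a push
# notification, and sqlLoad loads the scraped data into Postgres.
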
default_args = {
    "owner": "donaldrich",
    "depends_on_past": False,
    "start_date": datetime(2016, 7, 13),
    "email": ["email@gmail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=15),
    # "on_failure_callback": some_function,
    # "on_success_callback": some_other_function,
    # "on_retry_callback": another_function,
}
# [START default_args]
# default_args = {
#     'owner': 'airflow',
#     'depends_on_past': False,
#     'email': ['airflow@example.com'],
#     'email_on_failure': False,
#     'email_on_retry': False,
#     'retries': 1,
#     'retry_delay': timedelta(minutes=5),
#     'queue': 'bash_queue',
#     'pool': 'backfill',
#     'priority_weight': 10,
#     'end_date': datetime(2016, 1, 1),
#     'wait_for_downstream': False,
#     'dag': dag,
#     'sla': timedelta(hours=2),
#     'execution_timeout': timedelta(seconds=300),
#     'on_failure_callback': some_function,
#     'on_success_callback': some_other_function,
#     'on_retry_callback': another_function,
#     'sla_miss_callback': yet_another_function,
#     'trigger_rule': 'all_success'
# }
# [END default_args]
dag = DAG(
    "recon",
    default_args=default_args,
    description="Pull Selenoid images, scrape listings, load them into Postgres, clean up, and notify via Pushover",
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["target"],
)
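# With schedule_interval=None the DAG never runs on a schedule; it only runs
# when triggered manually or via the API. Note that a start_date is set both
# here and in default_args.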
with dag:
    # start = DummyOperator(task_id="start", dag=dag)
    with TaskGroup("pull_images") as start2:
        pull1 = BashOperator(
            task_id="chrome_latest",
            bash_command="sudo docker pull selenoid/chrome:latest",
        )
        pull2 = BashOperator(
            task_id="video_recorder",
            bash_command="sudo docker pull selenoid/video-recorder:latest-release",
        )
        t1 = PythonOperator(
            task_id="selenoid_status",
            python_callable=selenoid_status,
        )
        [pull1, pull2] >> t1
    start2.doc_md = dedent(
        """
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, and `doc_yaml`, which get
        rendered in the UI's Task Instance Details page.
        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
    )
    scrape = BashOperator(
        task_id="scrape_listings",
        bash_command="python3 /data/scripts/scrapers/zip-scrape.py -k devops",
    )
    sql_load = PythonOperator(task_id="sql_load", python_callable=sqlLoad)
    # get_birth_date = PostgresOperator(
    #     task_id="get_birth_date",
    #     postgres_conn_id="postgres_default",
    #     sql="sql/birth_date.sql",
    #     params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
    # )
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command="sudo sh /data/scripts/post-scrape.sh -s zip -k devops",
    )
    success_notify = PushoverOperator(
        task_id="finished",
        title="Airflow Complete",
        message="We did it!",
    )
    end = DummyOperator(task_id="end", dag=dag)

    start2 >> scrape >> sql_load >> cleanup >> success_notify >> end
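
# A minimal sketch of what the project-local PushoverOperator in utils/notify.py
# might look like; this is an assumption for illustration only (the real helper
# is not shown here), and the token/user values would come from an Airflow
# Connection or Variable rather than being hard-coded:
#
#     import requests
#     from airflow.models.baseoperator import BaseOperator
#
#     class PushoverOperator(BaseOperator):
#         def __init__(self, title: str, message: str, **kwargs):
#             super().__init__(**kwargs)
#             self.title = title
#             self.message = message
#
#         def execute(self, context):
#             # Post the notification to the Pushover messages endpoint.
#             requests.post(
#                 "https://api.pushover.net/1/messages.json",
#                 data={
#                     "token": "<app-token>",
#                     "user": "<user-key>",
#                     "title": self.title,
#                     "message": self.message,
#                 },
#                 timeout=10,
#             )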