Apache Airflow DAGs with a backend configuration bundle.
"""
Headless Site Navigation and File Download (Using Selenium) to S3
This example demonstrates using Selenium (via Firefox/GeckoDriver) to:
1) Log into a website w/ credentials stored in connection labeled 'selenium_conn_id'
2) Download a file (initiated on login)
3) Transform the CSV into JSON formatting
4) Append the current data to each record
5) Load the corresponding file into S3
To use this DAG, you will need to have the following installed:
[XVFB](https://www.x.org/archive/X11R7.6/doc/man/man1/Xvfb.1.xhtml)
[GeckoDriver](https://github.com/mozilla/geckodriver/releases/download)
selenium==3.11.0
xvfbwrapper==0.2.9
"""
# import boa
import csv
import json
import logging
import os
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.models import Connection
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.db import provide_session
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.firefox.options import Options
from xvfbwrapper import Xvfb
SELENIUM_CONN_ID = ""
FILENAME = ""
S3_CONN_ID = ""
S3_BUCKET = ""
S3_KEY = ""
date = "{{ ds }}"
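# Fill in the placeholders above for your environment: SELENIUM_CONN_ID is the
# Airflow connection holding the site URL and credentials, FILENAME is the name
# of the file the site serves on login, and the S3_* values identify the upload
# target. "{{ ds }}" is a Jinja template; because it is passed through
# templates_dict below, Airflow renders it to the execution date (YYYY-MM-DD)
# at runtime.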
default_args = {
    "start_date": datetime(2018, 2, 10, 0, 0),
    "email": [],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
dag = DAG(
    "selenium_extraction_to_s3",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
)
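# imap_py is invoked by the PythonOperator below with provide_context=True,
# so Airflow passes the rendered templates_dict (along with the rest of the
# task context) in via **kwargs.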
def imap_py(**kwargs):
    templates_dict = kwargs.get("templates_dict", {})
    selenium_conn_id = templates_dict.get("selenium_conn_id")
    filename = templates_dict.get("filename")
    s3_conn_id = templates_dict.get("s3_conn_id")
    s3_bucket = templates_dict.get("s3_bucket")
    s3_key = templates_dict.get("s3_key")
    date = templates_dict.get("date")
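    # provide_session injects a SQLAlchemy session bound to the Airflow
    # metadata database, so the connection record can be read directly
    # from the Connection table.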
    @provide_session
    def get_conn(conn_id, session=None):
        return session.query(Connection).filter(Connection.conn_id == conn_id).first()

    conn = get_conn(selenium_conn_id)
    url = conn.host
    email = conn.login
    pwd = conn.password
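    # Xvfb provides a virtual framebuffer, letting Firefox run on a host with
    # no physical display; the browser itself is also started headless.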
    vdisplay = Xvfb()
    vdisplay.start()
    caps = webdriver.DesiredCapabilities.FIREFOX
    caps["marionette"] = True
    profile = webdriver.FirefoxProfile()
    # Save CSV downloads without prompting so the file lands in the default
    # download directory automatically.
    profile.set_preference("browser.download.manager.showWhenStarting", False)
    profile.set_preference("browser.helperApps.neverAsk.saveToDisk", "text/csv")
    logging.info("Profile set...")
    options = Options()
    options.set_headless(headless=True)
    logging.info("Options set...")
    logging.info("Initializing Driver...")
    driver = webdriver.Firefox(
        firefox_profile=profile, firefox_options=options, capabilities=caps
    )
    logging.info("Driver Initialized...")
    driver.get(url)
    logging.info("Authenticating...")
    elem = driver.find_element_by_id("email")
    elem.send_keys(email)
    elem = driver.find_element_by_id("password")
    elem.send_keys(pwd)
    elem.send_keys(Keys.RETURN)
    logging.info("Successfully authenticated.")
    sleep_time = 15
    logging.info("Downloading File... Sleeping for {} Seconds.".format(sleep_time))
    time.sleep(sleep_time)
    driver.close()
    vdisplay.stop()
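    # Convert the downloaded CSV to JSON Lines: each CSV row becomes one JSON
    # object per line, stamped with the run date, and the resulting file is
    # then uploaded to S3.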
    dest_s3 = S3Hook(s3_conn_id=s3_conn_id)
    os.chdir("/root/Downloads")
    output_json = "file.json"
    with open(filename, "r") as csvfile, open(output_json, "w") as jsonfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            # row = dict((boa.constrict(k), v) for k, v in row.items())
            row["run_date"] = date
            json.dump(row, jsonfile)
            jsonfile.write("\n")
    dest_s3.load_file(
        filename=output_json, key=s3_key, bucket_name=s3_bucket, replace=True
    )
    dest_s3.connection.close()
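# Task wiring: a DummyOperator marks the start of the DAG, followed by the
# single Selenium extraction task.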
with dag:
    kick_off_dag = DummyOperator(task_id="kick_off_dag")

    selenium = PythonOperator(
        task_id="selenium_retrieval_to_s3",
        python_callable=imap_py,
        templates_dict={
            "selenium_conn_id": SELENIUM_CONN_ID,
            "filename": FILENAME,
            "s3_conn_id": S3_CONN_ID,
            "s3_bucket": S3_BUCKET,
            "s3_key": S3_KEY,
            "date": date,
        },
        provide_context=True,
    )

    kick_off_dag >> selenium
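# The extraction task can be exercised outside the scheduler with the
# Airflow 1.x CLI, e.g. (the execution date below is illustrative):
#
#   airflow test selenium_extraction_to_s3 selenium_retrieval_to_s3 2018-02-10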