You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
122 lines
4.8 KiB
122 lines
4.8 KiB
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
|
"""This module contains Google Dataprep hook.""" |
|
import json |
|
import os |
|
from typing import Any, Dict |
|
|
|
import requests |
|
from airflow.hooks.base import BaseHook |
|
from requests import HTTPError |
|
from tenacity import retry, stop_after_attempt, wait_exponential |
|
|
|
|
|
class GoogleDataprepHook(BaseHook):
    """
    Hook for connection with Dataprep API.

    To get connection Dataprep with Airflow you need Dataprep token.
    https://clouddataprep.com/documentation/api#section/Authentication

    It should be added to the Connection in Airflow in JSON format.
    The hook reads two keys from the connection's extra JSON:

    * ``extra__dataprep__token`` - the API token sent as a Bearer token.
    * ``extra__dataprep__base_url`` - the API base URL; when the key is absent
      ``https://api.clouddataprep.com`` is used.

    :param dataprep_conn_id: ID of the Airflow connection holding the extras above
    :type dataprep_conn_id: str
    """

    conn_name_attr = "dataprep_conn_id"
    default_conn_name = "dataprep_default"
    conn_type = "dataprep"
    hook_name = "Google Dataprep"

    def __init__(self, dataprep_conn_id: str = default_conn_name) -> None:
        super().__init__()
        self.dataprep_conn_id = dataprep_conn_id
        conn = self.get_connection(self.dataprep_conn_id)
        extra_dejson = conn.extra_dejson
        self._token = extra_dejson.get("extra__dataprep__token")
        self._base_url = extra_dejson.get(
            "extra__dataprep__base_url", "https://api.clouddataprep.com"
        )

    @property
    def _headers(self) -> Dict[str, str]:
        """Return the HTTP headers (JSON content type + Bearer token) sent with every request."""
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self._token}",
        }
        return headers

    def _build_url(self, endpoint_path: str) -> str:
        """
        Join the configured base URL and an endpoint path with exactly one ``/``.

        ``os.path.join`` is deliberately not used here: it is a filesystem helper
        and inserts the platform path separator (``\\`` on Windows), which would
        yield an invalid URL.

        :param endpoint_path: API path relative to the base URL, e.g. ``v4/jobGroups``
        :type endpoint_path: str
        """
        return f"{self._base_url.rstrip('/')}/{endpoint_path.lstrip('/')}"

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
    def get_jobs_for_job_group(self, job_id: int) -> Dict[str, Any]:
        """
        Get information about the batch jobs within a Cloud Dataprep job.

        The call is wrapped in a tenacity ``retry`` (stop after 5 attempts,
        exponential wait with ``max=10``).

        :param job_id: The ID of the job that will be fetched
        :type job_id: int
        :return: the decoded JSON body of the response
        """
        url: str = self._build_url(f"v4/jobGroups/{job_id}/jobs")
        response = requests.get(url, headers=self._headers)
        self._raise_for_status(response)
        return response.json()

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
    def get_job_group(
        self, job_group_id: int, embed: str, include_deleted: bool
    ) -> Dict[str, Any]:
        """
        Get the specified job group.
        A job group is a job that is executed from a specific node in a flow.

        The call is wrapped in a tenacity ``retry`` (stop after 5 attempts,
        exponential wait with ``max=10``).

        :param job_group_id: The ID of the job that will be fetched
        :type job_group_id: int
        :param embed: Comma-separated list of objects to pull in as part of the response
        :type embed: str
        :param include_deleted: if set to "true", will include deleted objects
        :type include_deleted: bool
        :return: the decoded JSON body of the response
        """
        params: Dict[str, Any] = {"embed": embed, "includeDeleted": include_deleted}
        url: str = self._build_url(f"v4/jobGroups/{job_group_id}")
        response = requests.get(url, headers=self._headers, params=params)
        self._raise_for_status(response)
        return response.json()

    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, max=10))
    def run_job_group(self, body_request: dict) -> Dict[str, Any]:
        """
        Creates a ``jobGroup``, which launches the specified job as the authenticated user.
        This performs the same action as clicking on the Run Job button in the application.
        To get recipe_id please follow the Dataprep API documentation
        https://clouddataprep.com/documentation/api#operation/runJobGroup

        The call is wrapped in a tenacity ``retry`` (stop after 5 attempts,
        exponential wait with ``max=10``).

        :param body_request: The identifier for the recipe you would like to run.
        :type body_request: dict
        :return: the decoded JSON body of the response
        """
        url: str = self._build_url("v4/jobGroups")
        response = requests.post(
            url, headers=self._headers, data=json.dumps(body_request)
        )
        self._raise_for_status(response)
        return response.json()

    def _raise_for_status(self, response: requests.models.Response) -> None:
        """
        Raise ``HTTPError`` for an error response, logging the server's message first.

        The ``exception`` field of a JSON object body is logged when present;
        otherwise the raw response text is logged. Failing to decode the body
        never replaces the original ``HTTPError``.

        :param response: the response to check
        :type response: requests.models.Response
        :raises HTTPError: when ``response.raise_for_status()`` raises it
        """
        try:
            response.raise_for_status()
        except HTTPError:
            try:
                payload = response.json()
            except ValueError:
                # Error bodies are not guaranteed to be JSON (e.g. a proxy's HTML page).
                payload = None
            detail = payload.get("exception") if isinstance(payload, dict) else None
            self.log.error(detail if detail is not None else response.text)
            raise
|
|
|