You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
185 lines
5.7 KiB
185 lines
5.7 KiB
# |
|
# Licensed to the Apache Software Foundation (ASF) under one |
|
# or more contributor license agreements. See the NOTICE file |
|
# distributed with this work for additional information |
|
# regarding copyright ownership. The ASF licenses this file |
|
# to you under the Apache License, Version 2.0 (the |
|
# "License"); you may not use this file except in compliance |
|
# with the License. You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, |
|
# software distributed under the License is distributed on an |
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
|
# KIND, either express or implied. See the License for the |
|
# specific language governing permissions and limitations |
|
# under the License. |
|
""" |
|
This is an example dag for using the KubernetesPodOperator. |
|
""" |
|
|
|
from airflow import DAG |
|
from airflow.kubernetes.secret import Secret |
|
from airflow.operators.bash import BashOperator |
|
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( |
|
KubernetesPodOperator, |
|
) |
|
from airflow.utils.dates import days_ago |
|
from kubernetes.client import models as k8s |
|
|
|
# [START howto_operator_k8s_cluster_resources] |
|
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn") |
|
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn") |
|
secret_all_keys = Secret("env", None, "airflow-secrets-2") |
|
volume_mount = k8s.V1VolumeMount( |
|
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True |
|
) |
|
|
|
configmaps = [ |
|
k8s.V1EnvFromSource( |
|
config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1") |
|
), |
|
k8s.V1EnvFromSource( |
|
config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2") |
|
), |
|
] |
|
|
|
volume = k8s.V1Volume( |
|
name="test-volume", |
|
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( |
|
claim_name="test-volume" |
|
), |
|
) |
|
|
|
port = k8s.V1ContainerPort(name="http", container_port=80) |
|
|
|
init_container_volume_mounts = [ |
|
k8s.V1VolumeMount( |
|
mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True |
|
) |
|
] |
|
|
|
init_environments = [ |
|
k8s.V1EnvVar(name="key1", value="value1"), |
|
k8s.V1EnvVar(name="key2", value="value2"), |
|
] |
|
|
|
init_container = k8s.V1Container( |
|
name="init-container", |
|
image="ubuntu:16.04", |
|
env=init_environments, |
|
volume_mounts=init_container_volume_mounts, |
|
command=["bash", "-cx"], |
|
args=["echo 10"], |
|
) |
|
|
|
affinity = k8s.V1Affinity( |
|
node_affinity=k8s.V1NodeAffinity( |
|
preferred_during_scheduling_ignored_during_execution=[ |
|
k8s.V1PreferredSchedulingTerm( |
|
weight=1, |
|
preference=k8s.V1NodeSelectorTerm( |
|
match_expressions=[ |
|
k8s.V1NodeSelectorRequirement( |
|
key="disktype", operator="in", values=["ssd"] |
|
) |
|
] |
|
), |
|
) |
|
] |
|
), |
|
pod_affinity=k8s.V1PodAffinity( |
|
required_during_scheduling_ignored_during_execution=[ |
|
k8s.V1WeightedPodAffinityTerm( |
|
weight=1, |
|
pod_affinity_term=k8s.V1PodAffinityTerm( |
|
label_selector=k8s.V1LabelSelector( |
|
match_expressions=[ |
|
k8s.V1LabelSelectorRequirement( |
|
key="security", operator="In", values="S1" |
|
) |
|
] |
|
), |
|
topology_key="failure-domain.beta.kubernetes.io/zone", |
|
), |
|
) |
|
] |
|
), |
|
) |
|
|
|
tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")] |
|
|
|
# [END howto_operator_k8s_cluster_resources] |
|
|
|
|
|
default_args = { |
|
"owner": "airflow", |
|
} |
|
|
|
with DAG( |
|
dag_id="example_kubernetes_operator", |
|
default_args=default_args, |
|
schedule_interval=None, |
|
start_date=days_ago(2), |
|
tags=["example"], |
|
) as dag: |
|
k = KubernetesPodOperator( |
|
namespace="default", |
|
image="ubuntu:16.04", |
|
cmds=["bash", "-cx"], |
|
arguments=["echo", "10"], |
|
labels={"foo": "bar"}, |
|
secrets=[secret_file, secret_env, secret_all_keys], |
|
ports=[port], |
|
volumes=[volume], |
|
volume_mounts=[volume_mount], |
|
env_from=configmaps, |
|
name="airflow-test-pod", |
|
task_id="task", |
|
affinity=affinity, |
|
is_delete_operator_pod=True, |
|
hostnetwork=False, |
|
tolerations=tolerations, |
|
init_containers=[init_container], |
|
priority_class_name="medium", |
|
) |
|
|
|
# [START howto_operator_k8s_private_image] |
|
quay_k8s = KubernetesPodOperator( |
|
namespace="default", |
|
image="quay.io/apache/bash", |
|
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")], |
|
cmds=["bash", "-cx"], |
|
arguments=["echo", "10", "echo pwd"], |
|
labels={"foo": "bar"}, |
|
name="airflow-private-image-pod", |
|
is_delete_operator_pod=True, |
|
in_cluster=True, |
|
task_id="task-two", |
|
get_logs=True, |
|
) |
|
# [END howto_operator_k8s_private_image] |
|
|
|
# [START howto_operator_k8s_write_xcom] |
|
write_xcom = KubernetesPodOperator( |
|
namespace="default", |
|
image="alpine", |
|
cmds=[ |
|
"sh", |
|
"-c", |
|
"mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json", |
|
], |
|
name="write-xcom", |
|
do_xcom_push=True, |
|
is_delete_operator_pod=True, |
|
in_cluster=True, |
|
task_id="write-xcom", |
|
get_logs=True, |
|
) |
|
|
|
pod_task_xcom_result = BashOperator( |
|
bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", |
|
task_id="pod_task_xcom_result", |
|
) |
|
# [END howto_operator_k8s_write_xcom]
|
|
|