You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
8.5 KiB
206 lines
8.5 KiB
# Licensed to the Apache Software Foundation (ASF) under one |
|
# or more contributor license agreements. See the NOTICE file |
|
# distributed with this work for additional information |
|
# regarding copyright ownership. The ASF licenses this file |
|
# to you under the Apache License, Version 2.0 (the |
|
# "License"); you may not use this file except in compliance |
|
# with the License. You may obtain a copy of the License at |
|
# |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, |
|
# software distributed under the License is distributed on an |
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
|
# KIND, either express or implied. See the License for the |
|
# specific language governing permissions and limitations |
|
# under the License. |
|
"""Run ephemeral Docker Swarm services""" |
|
from typing import Optional |
|
|
|
import requests |
|
from airflow.exceptions import AirflowException |
|
from airflow.providers.docker.operators.docker import DockerOperator |
|
from airflow.utils.decorators import apply_defaults |
|
from airflow.utils.strings import get_random_string |
|
from docker import types |
|
|
|
|
|
class DockerSwarmOperator(DockerOperator):
    """
    Execute a command as an ephemeral docker swarm service.

    Example use-case - Using Docker Swarm orchestration to make one-time
    scripts highly available.

    The value of ``tmp_dir`` is exported to the service's container through the
    environment variable ``AIRFLOW_TMP_DIR``. Note that this operator does not
    itself create a temporary directory on the host or mount one into the
    service's container.

    If a login to a private registry is required prior to pulling the image, a
    Docker connection needs to be configured in Airflow and its connection ID
    must be provided with the parameter ``docker_conn_id``.

    :param image: Docker image from which to create the container.
        If image tag is omitted, "latest" will be used.
    :type image: str
    :param api_version: Remote API version. Set to ``auto`` to automatically
        detect the server's version.
    :type api_version: str
    :param auto_remove: Remove the swarm service once it has finished,
        whether it completed or failed.
        The default is False.
    :type auto_remove: bool
    :param command: Command to be run in the container. (templated)
    :type command: str or list
    :param docker_url: URL of the host running the docker daemon.
        Default is unix://var/run/docker.sock
    :type docker_url: str
    :param environment: Environment variables to set in the container. (templated)
    :type environment: dict
    :param force_pull: Pull the docker image on every run. Default is False.
    :type force_pull: bool
    :param mem_limit: Maximum amount of memory the container can use.
        Either a float value, which represents the limit in bytes,
        or a string like ``128m`` or ``1g``.
    :type mem_limit: float or str
    :param tls_ca_cert: Path to a PEM-encoded certificate authority
        to secure the docker connection.
    :type tls_ca_cert: str
    :param tls_client_cert: Path to the PEM-encoded certificate
        used to authenticate docker client.
    :type tls_client_cert: str
    :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
    :type tls_client_key: str
    :param tls_hostname: Hostname to match against
        the docker server certificate or False to disable the check.
    :type tls_hostname: str or bool
    :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
    :type tls_ssl_version: str
    :param tmp_dir: Path exported to the container as the environment
        variable ``AIRFLOW_TMP_DIR``.
    :type tmp_dir: str
    :param user: Default user inside the docker container.
    :type user: int or str
    :param docker_conn_id: ID of the Airflow connection to use
    :type docker_conn_id: str
    :param tty: Allocate pseudo-TTY to the container of this service.
        This needs to be set to see logs of the Docker container / service.
    :type tty: bool
    :param enable_logging: Show the application's logs in operator's logs.
        Supported only if the Docker engine is using json-file or journald logging drivers.
        The `tty` parameter should be set to use this with Python applications.
    :type enable_logging: bool
    :param poll_interval: Seconds to wait between successive queries of the
        service's task state while waiting for it to start and to finish.
        Default is 1.
    :type poll_interval: float
    """

    @apply_defaults
    def __init__(
        self,
        *,
        image: str,
        enable_logging: bool = True,
        poll_interval: float = 1.0,
        **kwargs,
    ) -> None:
        super().__init__(image=image, **kwargs)

        self.enable_logging = enable_logging
        # Delay between Docker API polls, so waiting does not spin in a tight loop.
        self.poll_interval = poll_interval
        # Response of ``create_service``; set by ``_run_service``.
        self.service = None

    def execute(self, context) -> None:
        """Create the docker client, export ``AIRFLOW_TMP_DIR`` and run the service."""
        self.cli = self._get_cli()

        self.environment["AIRFLOW_TMP_DIR"] = self.tmp_dir

        return self._run_service()

    def _run_service(self) -> None:
        """
        Create the swarm service, wait for it to finish and clean it up.

        :raises AirflowException: if the service's task ends in the ``failed`` state.
        """
        import time

        self.log.info("Starting docker service from image %s", self.image)
        if not self.cli:
            raise Exception("The 'cli' should be initialized before!")
        self.service = self.cli.create_service(
            types.TaskTemplate(
                container_spec=types.ContainerSpec(
                    image=self.image,
                    command=self.get_command(),
                    env=self.environment,
                    user=self.user,
                    tty=self.tty,
                ),
                # Never restart: the service is meant to run the command once.
                restart_policy=types.RestartPolicy(condition="none"),
                resources=types.Resources(mem_limit=self.mem_limit),
            ),
            name=f"airflow-{get_random_string()}",
            labels={"name": f"airflow__{self.dag_id}__{self.task_id}"},
        )

        self.log.info("Service started: %s", str(self.service))

        # Wait for the service to start its task; sleep between polls rather
        # than querying the Docker API in a tight loop.
        while not self.cli.tasks(filters={"service": self.service["ID"]}):
            time.sleep(self.poll_interval)

        if self.enable_logging:
            self._stream_logs_to_output()

        # Poll until the task reaches a terminal state.
        while not self._has_service_terminated():
            time.sleep(self.poll_interval)
        self.log.info("Service status before exiting: %s", self._service_status())

        if self.service and self._service_status() == "failed":
            if self.auto_remove:
                self.cli.remove_service(self.service["ID"])
            raise AirflowException("Service failed: " + repr(self.service))
        elif self.auto_remove:
            if not self.service:
                raise Exception("The 'service' should be initialized before!")
            self.cli.remove_service(self.service["ID"])

    def _service_status(self) -> Optional[str]:
        """Return the ``Status.State`` of the first task listed for the service."""
        if not self.cli:
            raise Exception("The 'cli' should be initialized before!")
        if not self.service:
            raise Exception("The 'service' should be initialized before!")
        return self.cli.tasks(filters={"service": self.service["ID"]})[0]["Status"]["State"]

    def _has_service_terminated(self) -> bool:
        """Return True once the service's task is ``failed`` or ``complete``."""
        status = self._service_status()
        return status in ["failed", "complete"]

    def _stream_logs_to_output(self) -> None:
        """
        Follow the service's stdout/stderr and log it line by line.

        Chunk boundaries in the stream need not align with character or line
        boundaries. The bytes are therefore decoded incrementally as UTF-8, so a
        multi-byte character split across chunks is not lost. Each complete line
        is logged as soon as its newline arrives, and any trailing partial line
        is logged at the end. Undecodable bytes are replaced with U+FFFD.
        """
        import codecs

        if not self.cli:
            raise Exception("The 'cli' should be initialized before!")
        if not self.service:
            raise Exception("The 'service' should be initialized before!")
        logs = self.cli.service_logs(
            self.service["ID"], follow=True, stdout=True, stderr=True, is_tty=self.tty
        )
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        # Text received since the last newline.
        pending = ""
        while True:
            try:
                chunk = next(logs)
            # TODO: Remove this clause once https://github.com/docker/docker-py/issues/931 is fixed
            except requests.exceptions.ConnectionError:
                # If the service log stream stopped sending messages, check if
                # the service has terminated.
                if self._has_service_terminated():
                    break
                continue
            except StopIteration:
                # If the service log stream terminated, stop fetching logs further.
                break

            text = decoder.decode(chunk)
            if "\n" not in text:
                pending += text
                continue
            # Splitting only when a newline arrives keeps the total work linear.
            *complete_lines, tail = text.split("\n")
            complete_lines[0] = pending + complete_lines[0]
            for line in complete_lines:
                self.log.info(line)
            pending = tail

        # Flush bytes still held by the decoder and any unterminated last line.
        pending += decoder.decode(b"", final=True)
        if pending:
            self.log.info(pending)

    def on_kill(self) -> None:
        """Remove the swarm service, if one has been created, when the task is killed."""
        # The task may be killed after the client exists but before
        # ``create_service`` has returned, in which case there is nothing to remove.
        if self.cli is not None and self.service is not None:
            self.log.info("Removing docker service: %s", self.service["ID"])
            self.cli.remove_service(self.service["ID"])
|
|
|