本文发布时间已超过一年。较旧的文章可能包含过时的内容。请检查页面中的信息自发布以来是否已变得不正确。

Kubernetes 上的 Airflow(第一部分):一种不同的操作符

简介

作为彭博社持续致力于开发 Kubernetes 生态系统的一部分,我们很高兴宣布 Kubernetes Airflow Operator;这是一个用于 Apache Airflow(一个流行的工作流编排框架)的机制,它可以使用 Kubernetes API 原生启动任意 Kubernetes Pod。

什么是 Airflow?

Apache Airflow 是 DevOps “配置即代码” 理念的一种实现。Airflow 允许用户使用简单的 Python 对象 DAG(有向无环图)启动多步骤管道。您可以定义依赖关系、以编程方式构建复杂的工作流,并在易于阅读的 UI 中监控计划的作业。

Airflow DAGs Airflow UI

为什么要在 Kubernetes 上使用 Airflow?

自成立以来,Airflow 最大的优势在于其灵活性。Airflow 为从 Spark 和 HBase 到各种云提供商的服务提供了广泛的集成。Airflow 还通过其插件框架提供了易于扩展的功能。然而,该项目的一个限制是,Airflow 用户受限于执行时 Airflow worker 上存在的框架和客户端。一个组织可以拥有各种 Airflow 工作流,从数据科学管道到应用程序部署。这种用例的差异会在依赖项管理中产生问题,因为两个团队可能在其工作流中使用截然不同的库。

为了解决这个问题,我们利用 Kubernetes 允许用户启动任意 Kubernetes Pod 和配置。Airflow 用户现在可以完全控制其运行时环境、资源和密钥,基本上将 Airflow 变成“任何你想要的作业”的工作流编排器。

Kubernetes Operator

在继续之前,我们应该澄清一下,Airflow 中的 Operator 是一种任务定义。当用户创建 DAG 时,他们会使用诸如“SparkSubmitOperator”或“PythonOperator”之类的 Operator 分别提交/监控 Spark 作业或 Python 函数。Airflow 带有用于 Apache Spark、BigQuery、Hive 和 EMR 等框架的内置 Operator。它还提供了一个插件入口点,允许 DevOps 工程师开发自己的连接器。

Airflow 用户一直在寻找使部署和 ETL 管道更易于管理的方法。任何能够解耦管道步骤同时增加监控的机会,都可以减少未来的中断和紧急修复。以下是 Airflow Kubernetes Operator 提供的好处列表:

  • 提高部署的灵活性
    Airflow 的插件 API 一直为希望在其 DAG 中测试新功能的工程师提供了巨大的好处。不利的一面是,每当开发人员想要创建新的 Operator 时,他们都必须开发一个全新的插件。现在,任何可以在 Docker 容器中运行的任务都可以通过完全相同的 Operator 访问,而无需维护额外的 Airflow 代码。

  • 配置和依赖项的灵活性: 对于在静态 Airflow worker 中运行的 Operator,依赖项管理可能变得相当困难。如果开发人员想要运行一个需要 SciPy 的任务,以及另一个需要 NumPy 的任务,开发人员将不得不在所有 Airflow worker 中维护这两个依赖项,或者将任务卸载到外部机器(如果该外部机器以未跟踪的方式更改,则可能会导致错误)。自定义 Docker 映像允许用户确保任务环境、配置和依赖项完全是幂等的。

  • 使用 Kubernetes 密钥增强安全性: 处理敏感数据是任何 DevOps 工程师的核心责任。Airflow 用户希望尽可能地隔离任何 API 密钥、数据库密码和登录凭据,并严格按照“需要知道”的原则进行。通过 Kubernetes Operator,用户可以利用 Kubernetes Vault 技术来存储所有敏感数据。这意味着 Airflow worker 将永远无法访问此信息,并且可以简单地请求构建仅包含他们需要的密钥的 Pod。

架构

Airflow Architecture

Kubernetes Operator 使用 Kubernetes Python Client 生成一个由 APIServer 处理的请求 (1)。然后,Kubernetes 将使用您定义的任何规范启动您的 Pod (2)。映像将加载所有必要的环境变量、密钥和依赖项,执行单个命令。作业启动后,Operator 只需要监控运行状况的跟踪日志 (3)。用户可以选择将日志收集到本地调度程序或当前 Kubernetes 集群中的任何分布式日志服务。

使用 Kubernetes Operator

一个基本示例

以下 DAG 可能是我们可以编写的最简单的示例,以展示 Kubernetes Operator 的工作原理。此 DAG 在 Kubernetes 上创建两个 Pod:一个带有 Python 的 Linux 发行版和一个没有 Python 的基本 Ubuntu 发行版。Python Pod 将正确运行 Python 请求,而没有 Python 的 Pod 将向用户报告失败。如果 Operator 工作正常,则 passing-task Pod 应该完成,而 failing-task Pod 会向 Airflow Web 服务器返回失败。

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
Basic DAG Run

但这与我的工作流程有什么关系?

虽然此示例仅使用基本映像,但 Docker 的神奇之处在于,同一个 DAG 将适用于您想要的任何映像/命令配对。以下是在 Airflow DAG 上运行生产就绪代码的推荐 CI/CD 管道。

1:Github 中的 PR

使用 Travis 或 Jenkins 运行单元测试和集成测试,贿赂您最喜欢的队友 PR 您的代码,并合并到主分支以触发自动 CI 构建。

2:通过 Jenkins 进行 CI/CD -> Docker 映像

在 Jenkins 构建中生成 Docker 映像并更新发布版本.

3:Airflow 启动任务

最后,更新您的 DAG 以反映新的发布版本,您应该就可以开始了!

production_task = KubernetesPodOperator(namespace='default',
                          # image="my-production-job:release-1.0.1", <-- old release
                          image="my-production-job:release-1.0.2",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

启动测试部署

由于 Kubernetes Operator 尚未发布,因此我们尚未发布官方的 helm chart 或 Operator(但两者目前都在进行中)。但是,我们将在下面提供基本部署的说明,并正在积极寻找鲁莽的 Beta 测试人员来尝试此新功能。要试用此系统,请按照以下步骤操作

步骤 1:设置您的 kubeconfig 以指向 Kubernetes 集群

步骤 2:克隆 Airflow 存储库

运行 git clone https://github.com/apache/incubator-airflow.git 以克隆官方 Airflow 存储库。

步骤 3:运行

为了运行此基本部署,我们正在采用我们目前用于 Kubernetes Executor 的集成测试脚本(将在本系列的下一篇文章中解释)。要启动此部署,请运行以下三个命令

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml
./scripts/ci/kubernetes/Docker/build.sh
./scripts/ci/kubernetes/kube/deploy.sh

在继续之前,让我们讨论一下这些命令的作用

sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

Kubernetes Executor 是 Airflow 的另一个功能,它允许以幂等 Pod 的形式动态分配任务。我们将此切换到 LocalExecutor 的原因只是为了一次引入一个功能。如果您想尝试 Kubernetes Executor,则可以跳过此步骤,但我们将在以后的文章中详细介绍。

./scripts/ci/kubernetes/Docker/build.sh

此脚本将打包 Airflow 主源代码,并基于 Airflow 发行版构建 Docker 容器

./scripts/ci/kubernetes/kube/deploy.sh

最后,我们在您的集群上创建完整的 Airflow 部署。这包括 Airflow 配置、Postgres 后端、Web 服务器 + 调度程序以及它们之间的所有必要服务。需要注意的一点是,提供的角色绑定是集群管理员,因此如果您在集群上没有该级别的权限,则可以在 scripts/ci/kubernetes/kube/airflow.yaml 中修改此设置

步骤 4:登录您的 Web 服务器

现在您的 Airflow 实例正在运行,让我们看一下 UI!UI 位于 Airflow Pod 的 8080 端口中,因此只需运行

WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)
kubectl port-forward $WEB 8080:8080

现在,Airflow UI 将存在于 https://127.0.0.1:8080 上。要登录,只需输入 airflow/airflow,您就可以完全访问 Airflow Web UI。

步骤 5:上传测试文档

要修改/添加您自己的 DAG,您可以使用 kubectl cp 将本地文件上传到 Airflow 调度程序的 DAG 文件夹中。然后,Airflow 将读取新的 DAG 并自动将其上传到其系统。以下命令会将任何本地文件上传到正确的目录

kubectl cp <本地文件> <命名空间>/<Pod>:/root/airflow/dags -c scheduler

步骤 6:尽情享用!

那么我什么时候可以使用它?

虽然此功能仍处于早期阶段,但我们希望在未来几个月内看到它发布以供广泛使用。

参与其中

此功能只是改进 Apache Airflow 与 Kubernetes 集成的多项重大努力的开始。Kubernetes Operator 已合并到 Airflow 的 1.10 发布分支(以实验模式运行的执行器),以及一个名为 Kubernetes Executor 的完全 K8s 原生调度程序(文章即将发布)。这些功能仍处于早期采用者/贡献者可以对这些功能的未来产生巨大影响的阶段。

对于那些有兴趣加入这些工作的人,我建议查看以下步骤

  • 加入 [email protected] 上的 airflow-dev 邮件列表。
  • Apache Airflow JIRA 中提交问题
  • 参加我们在太平洋标准时间星期三上午 10 点举行的 SIG-BigData 会议。
  • 在 kubernetes.slack.com 上的 #sig-big-data 中与我们联系

特别感谢 Apache Airflow 和 Kubernetes 社区,特别是 Grant Nicholas、Ben Goldberg、Anirudh Ramanathan、Fokko Dreisprong 和 Bolke de Bruin,感谢你们在这些功能以及我们未来的工作中提供的出色帮助。