使用工作队列进行细粒度并行处理

在此示例中,您将运行一个 Kubernetes Job,它运行多个并行任务作为工作进程,每个进程都作为单独的 Pod 运行。

在此示例中,每个 Pod 在创建时,都会从任务队列中获取一个工作单元,对其进行处理,并重复此操作,直到队列末尾为止。

以下是此示例中的步骤概述

  1. 启动一个存储服务来保存工作队列。 在此示例中,您将使用 Redis 来存储工作项。在上一个示例中,您使用了 RabbitMQ。在此示例中,您将使用 Redis 和自定义工作队列客户端库;这是因为 AMQP 没有为客户端提供一种好的方法来检测有限长度的工作队列何时为空。在实践中,您会设置一次诸如 Redis 之类的存储,并将其重用于许多作业的工作队列和其他事项。
  2. 创建一个队列,并用消息填充它。 每个消息代表要完成的一个任务。在此示例中,消息是一个整数,我们将对其进行漫长的计算。
  3. 启动一个处理队列中任务的 Job。该 Job 启动多个 Pod。每个 Pod 从消息队列中获取一个任务,对其进行处理,并重复此操作,直到队列末尾为止。

开始之前

您需要有一个 Kubernetes 集群,并且必须配置 kubectl 命令行工具才能与您的集群通信。建议在至少有两个节点且不充当控制平面主机的集群上运行本教程。如果您还没有集群,可以使用 minikube 创建一个集群,也可以使用这些 Kubernetes 游乐场之一

您将需要一个容器镜像注册表,您可以在其中上传镜像以在集群中运行。该示例使用 Docker Hub,但您可以将其调整为不同的容器镜像注册表。

此任务示例还假定您已在本地安装了 Docker。您可以使用 Docker 来构建容器镜像。

熟悉 Job 的基本、非并行用法。

启动 Redis

在此示例中,为简单起见,您将启动 Redis 的单个实例。有关可伸缩和冗余地部署 Redis 的示例,请参阅 Redis 示例

您也可以直接下载以下文件

要启动 Redis 的单个实例,您需要创建 redis Pod 和 redis 服务

kubectl apply -f https://k8s.io/examples/application/job/redis/redis-pod.yaml
kubectl apply -f https://k8s.io/examples/application/job/redis/redis-service.yaml

用任务填充队列

现在,让我们用一些“任务”填充队列。在此示例中,任务是要打印的字符串。

启动一个临时交互式 Pod 来运行 Redis CLI。

kubectl run -i --tty temp --image redis --command "/bin/sh"
Waiting for pod default/redis2-c7h78 to be running, status is Pending, pod ready: false
Hit enter for command prompt

现在按 Enter 键,启动 Redis CLI,并创建一个包含一些工作项的列表。

redis-cli -h redis
redis:6379> rpush job2 "apple"
(integer) 1
redis:6379> rpush job2 "banana"
(integer) 2
redis:6379> rpush job2 "cherry"
(integer) 3
redis:6379> rpush job2 "date"
(integer) 4
redis:6379> rpush job2 "fig"
(integer) 5
redis:6379> rpush job2 "grape"
(integer) 6
redis:6379> rpush job2 "lemon"
(integer) 7
redis:6379> rpush job2 "melon"
(integer) 8
redis:6379> rpush job2 "orange"
(integer) 9
redis:6379> lrange job2 0 -1
1) "apple"
2) "banana"
3) "cherry"
4) "date"
5) "fig"
6) "grape"
7) "lemon"
8) "melon"
9) "orange"

因此,键为 job2 的列表将成为工作队列。

注意:如果您的 Kube DNS 设置不正确,则可能需要将上述代码块的第一步更改为 redis-cli -h $REDIS_SERVICE_HOST

创建容器镜像

现在,您可以创建一个镜像,该镜像将处理该队列中的工作。

您将使用一个带有 Redis 客户端的 Python 工作程序,从消息队列中读取消息。

提供了一个名为 rediswq.py 的简单 Redis 工作队列客户端库(下载)。

Job 的每个 Pod 中的“工作程序”程序都使用工作队列客户端库来获取工作。 如下所示

#!/usr/bin/env python

import time
import rediswq

host="redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " +  q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
  item = q.lease(lease_secs=10, block=True, timeout=2) 
  if item is not None:
    itemstr = item.decode("utf-8")
    print("Working on " + itemstr)
    time.sleep(10) # Put your actual work here instead of sleep.
    q.complete(item)
  else:
    print("Waiting for work")
print("Queue empty, exiting")

您也可以下载 worker.pyrediswq.pyDockerfile 文件,然后构建容器镜像。以下是使用 Docker 进行镜像构建的示例

docker build -t job-wq-2 .

推送镜像

对于 Docker Hub,请使用您的用户名标记您的应用程序镜像,并使用以下命令推送到 Hub。将 <username> 替换为您的 Hub 用户名。

docker tag job-wq-2 <username>/job-wq-2
docker push <username>/job-wq-2

您需要推送到公共存储库或配置您的集群以使其能够访问您的私有存储库

定义 Job

以下是您将创建的 Job 的清单

apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure

在此示例中,每个 Pod 处理队列中的多个项目,然后在没有更多项目时退出。由于工作程序本身会检测到工作队列何时为空,并且 Job 控制器不知道工作队列,因此它依赖于工作程序来发出它们何时完成工作的信号。工作程序通过成功退出发出队列为空的信号。因此,一旦任何工作程序成功退出,控制器就知道工作已完成,并且 Pod 将很快退出。因此,您需要取消设置 Job 的完成计数。Job 控制器将等待其他 Pod 也完成。

运行 Job

所以,现在运行 Job

# this assumes you downloaded and then edited the manifest already
kubectl apply -f ./job.yaml

现在等待一会儿,然后检查 Job

kubectl describe jobs/job-wq-2
Name:             job-wq-2
Namespace:        default
Selector:         controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
Labels:           controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                  job-name=job-wq-2
Annotations:      <none>
Parallelism:      2
Completions:      <unset>
Start Time:       Mon, 11 Jan 2022 17:07:59 +0000
Pods Statuses:    1 Running / 0 Succeeded / 0 Failed
Pod Template:
  Labels:       controller-uid=b1c7e4e3-92e1-11e7-b85e-fa163ee3c11f
                job-name=job-wq-2
  Containers:
   c:
    Image:              container-registry.example/exampleproject/job-wq-2
    Port:
    Environment:        <none>
    Mounts:             <none>
  Volumes:              <none>
Events:
  FirstSeen    LastSeen    Count    From            SubobjectPath    Type        Reason            Message
  ---------    --------    -----    ----            -------------    --------    ------            -------
  33s          33s         1        {job-controller }                Normal      SuccessfulCreate  Created pod: job-wq-2-lglf8

您可以等待 Job 成功,并设置超时

# The check for condition name is case insensitive
kubectl wait --for=condition=complete --timeout=300s job/job-wq-2
kubectl logs pods/job-wq-2-7r7b2
Worker with sessionID: bbd72d0a-9e5c-4dd6-abf6-416cc267991f
Initial queue state: empty=False
Working on banana
Working on date
Working on lemon

如您所见,此 Job 的其中一个 Pod 处理了多个工作单元。

替代方案

如果运行队列服务或修改您的容器以使用工作队列不方便,您可能需要考虑其他 Job 模式之一。

如果您有持续的后台处理工作流要运行,请考虑使用 ReplicaSet 运行后台工作程序,并考虑运行诸如 https://github.com/resque/resque 之类的后台处理库。

上次修改时间:太平洋标准时间 2024 年 3 月 16 日凌晨 2:39:修复并行处理工作队列任务的文档。(bed970676c)