使用工作队列进行粗粒度并行处理
本例中,你将会运行包含多个并行工作进程的 Kubernetes Job。
本例中,每个 Pod 一旦被创建,会立即从任务队列中取走一个工作单元并完成它,然后将工作单元从队列中删除后再退出。
下面是本次示例的主要步骤:
-
启动一个消息队列服务。 本例中,我们使用 RabbitMQ,你也可以用其他的消息队列服务。 在实际工作环境中,你可以创建一次消息队列服务然后在多个任务中重复使用。
-
创建一个队列,放上消息数据。 每个消息表示一个要执行的任务。本例中,每个消息是一个整数值。 我们将基于这个整数值执行很长的计算操作。
-
启动一个在队列中执行这些任务的 Job。 该 Job 启动多个 Pod。每个 Pod 从消息队列中取走一个任务,处理任务,然后退出。
准备开始
你应当熟悉 Job 的基本用法(非并行的),请参考 Job。
你必须拥有一个 Kubernetes 的集群,且必须配置 kubectl 命令行工具让其与你的集群通信。 建议运行本教程的集群至少有两个节点,且这两个节点不能作为控制平面主机。 如果你还没有集群,你可以通过 Minikube 构建一个你自己的集群,或者你可以使用下面的 Kubernetes 练习环境之一:
你需要一个容器镜像仓库,用来向其中上传镜像以在集群中运行。
此任务示例还假设你已在本地安装了 Docker。
启动消息队列服务
本例使用了 RabbitMQ,但你可以更改该示例,使用其他 AMQP 类型的消息服务。
在实际工作中,在集群中一次性部署某个消息队列服务,之后在很多 Job 中复用,包括需要长期运行的服务。
按下面的方法启动 RabbitMQ:
# 为 StatefulSet 创建一个 Service 来使用
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-service.yaml
service "rabbitmq-service" created
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
statefulset "rabbitmq" created
测试消息队列服务
现在,我们可以试着访问消息队列。我们将会创建一个临时的可交互的 Pod, 在它上面安装一些工具,然后用队列做实验。
首先创建一个临时的可交互的 Pod:
# 创建一个临时的可交互的 Pod
kubectl run -i --tty temp --image ubuntu:22.04
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
... [ previous line repeats several times .. hit return when it stops ] ...
请注意你的 Pod 名称和命令提示符将会不同。
接下来安装 amqp-tools
,这样你就能用消息队列了。
下面是在该 Pod 的交互式 shell 中需要运行的命令:
apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils
后续,你将制作一个包含这些包的容器镜像。
接着,你将要验证可以发现 RabbitMQ 服务:
# 在 Pod 内运行这些命令
# 请注意 rabbitmq-service 拥有一个由 Kubernetes 提供的 DNS 名称:
nslookup rabbitmq-service
Server: 10.0.0.10
Address: 10.0.0.10#53
Name: rabbitmq-service.default.svc.cluster.local
Address: 10.0.147.152
(IP 地址会有所不同)
如果 kube-dns 插件没有正确安装,上一步可能会出错。 你也可以在环境变量中找到该服务的 IP 地址。
# 在 Pod 内运行此检查
env | grep RABBITMQ_SERVICE | grep HOST
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
(IP 地址会有所不同)
接下来,你将验证是否可以创建队列以及发布和使用消息。
# 在 Pod 内运行这些命令
# 下一行,rabbitmq-service 是访问 rabbitmq-service 的主机名。5672是 rabbitmq 的标准端口。
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
# 如果上一步中你不能解析 "rabbitmq-service",可以用下面的命令替换:
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
# 现在创建队列:
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d foo
foo
向队列推送一条消息:
/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
# 然后取回它:
/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo
Hello
最后一个命令中,amqp-consume
工具从队列中取走了一个消息,并把该消息传递给了随机命令的标准输出。
在这种情况下,cat
会打印它从标准输入中读取的字符,echo 会添加回车符以便示例可读。
为队列增加任务
现在用一些模拟任务填充队列。在此示例中,任务是多个待打印的字符串。
实践中,消息的内容可以是:
- 待处理的文件名
- 程序额外的参数
- 数据库表的关键字范围
- 模拟任务的配置参数
- 待渲染的场景的帧序列号
如果有大量的数据需要被 Job 的所有 Pod 读取,典型的做法是把它们放在一个共享文件系统中, 如 NFS(Network File System 网络文件系统),并以只读的方式挂载到所有 Pod,或者 Pod 中的程序从类似 HDFS (Hadoop Distributed File System 分布式文件系统)的集群文件系统中读取。
例如,你将创建队列并使用 AMQP 命令行工具向队列中填充消息。实践中,你可以写个程序来利用 AMQP 客户端库来填充这些队列。
# 在你的计算机上运行此命令,而不是在 Pod 中
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
job1
将这几项添加到队列中:
for f in apple banana cherry date fig grape lemon melon
do
/usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
done
你给队列中填充了 8 个消息。
创建容器镜像
现在你可以创建一个做为 Job 来运行的镜像。
这个 Job 将用 amqp-consume
实用程序从队列中读取消息并进行实际工作。
这里给出一个非常简单的示例程序:
#!/usr/bin/env python
# Just prints standard out and sleeps for 10 seconds.
import sys
import time
print("Processing " + sys.stdin.readlines()[0])
time.sleep(10)
赋予脚本执行权限:
chmod +x worker.py
现在,编译镜像。创建一个临时目录,切换到这个目录。下载 Dockerfile 和 worker.py。 无论哪种情况,都可以用下面的命令编译镜像:
docker build -t job-wq-1 .
对于 Docker Hub, 给你的应用镜像打上标签,
标签为你的用户名,然后用下面的命令推送到 Hub。用你的 Hub 用户名替换 <username>
。
docker tag job-wq-1 <username>/job-wq-1
docker push <username>/job-wq-1
如果你使用替代的镜像仓库,请标记该镜像并将其推送到那里。
定义 Job
这里给出一个 Job 的清单。你需要复制一份 Job 清单的副本(将其命名为 ./job.yaml
),
并编辑容器镜像的名称以匹配使用的名称。
apiVersion: batch/v1
kind: Job
metadata:
name: job-wq-1
spec:
completions: 8
parallelism: 2
template:
metadata:
name: job-wq-1
spec:
containers:
- name: c
image: gcr.io/<project>/job-wq-1
env:
- name: BROKER_URL
value: amqp://guest:guest@rabbitmq-service:5672
- name: QUEUE
value: job1
restartPolicy: OnFailure
本例中,每个 Pod 使用队列中的一个消息然后退出。
这样,Job 的完成计数就代表了完成的工作项的数量。
这就是示例清单将 .spec.completions
设置为 8
的原因。
运行 Job
运行 Job:
这假设你已经下载并编辑了清单
kubectl apply -f ./job.yaml
你可以等待 Job 在某个超时时间后成功:
# 状况名称的检查不区分大小写
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1
接下来查看 Job:
kubectl describe jobs/job-wq-1
Name: job-wq-1
Namespace: default
Selector: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Annotations: <none>
Parallelism: 2
Completions: 8
Start Time: Wed, 06 Sep 2022 16:42:02 +0000
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
Pod Template:
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
job-name=job-wq-1
Containers:
c:
Image: container-registry.example/causal-jigsaw-637/job-wq-1
Port:
Environment:
BROKER_URL: amqp://guest:guest@rabbitmq-service:5672
QUEUE: job1
Mounts: <none>
Volumes: <none>
Events:
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
───────── ──────── ───── ──── ───────────── ────── ────── ───────
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-hcobb
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-weytj
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-qaam5
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-b67sr
26s 26s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-xe5hj
15s 15s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-w2zqe
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-d6ppa
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-p17e0
该 Job 的所有 Pod 都已成功!你完成了。
替代方案
本文所讲述的处理方法的好处是你不需要修改你的 "worker" 程序使其知道工作队列的存在。 你可以将未修改的工作程序包含在容器镜像中。
使用此方法需要你运行消息队列服务。如果不方便运行消息队列服务, 你也许会考虑另外一种任务模式。
本文所述的方法为每个工作项创建了一个 Pod。 如果你的工作项仅需数秒钟,为每个工作项创建 Pod 会增加很多的常规消耗。 考虑另一种设计,例如精细并行工作队列示例, 这种方案可以实现每个 Pod 执行多个工作项。
示例中,你使用了 amqp-consume
从消息队列读取消息并执行真正的程序。
这样的好处是你不需要修改你的程序使其知道队列的存在。
要了解怎样使用客户端库和工作队列通信,
请参考精细并行工作队列示例。
友情提醒
如果设置的完成数量小于队列中的消息数量,会导致一部分消息项不会被执行。
如果设置的完成数量大于队列中的消息数量,当队列中所有的消息都处理完成后, Job 也会显示为未完成。Job 将创建 Pod 并阻塞等待消息输入。 你需要建立自己的机制来发现何时有工作要做,并测量队列的大小,设置匹配的完成数量。
当发生下面两种情况时,即使队列中所有的消息都处理完了,Job 也不会显示为完成状态:
- 在
amqp-consume
命令拿到消息和容器成功退出之间的时间段内,执行杀死容器操作; - 在 kubelet 向 API 服务器传回 Pod 成功运行之前,发生节点崩溃。