彙整 Argo Workflows 中平行步驟的產出物

嘗試解決在平行步驟(Step)之後要彙整人造物(artifact)的困難

在 Argo Workflows 裡面我們可以透過 withItems, withParamwithSequence 將一個 WorkflowStep 展開並進行平行處理。而問題會發生在當我們需要將這些平行的步驟(step)間的產出物彙整以進行後續加工時會遭遇一點困難:Argo 沒有內建的人造物(artifact)蒐集機制。

美好的世界:參數傳遞

當事情發生在參數(parameter)傳遞時工作會簡單非常多:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: demo-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: print-message
template: echo
withParam: >
["hello", "world"]
arguments:
parameters:
- name: message
value: "{{ item }}"
- - name: print-aggregated-messages
template: echo
arguments:
parameters:
- name: message
value: "{{ steps.print-message.outputs.parameters.message }}"
- name: echo
inputs:
parameters:
- name: message
script:
image: busybox:stable
env:
- name: MESSAGE
value: "{{ inputs.parameters.message }}"
command: [sh]
source: echo "$MESSAGE" | tee '{{ outputs.parameters.message.path }}'
outputs:
parameters:
- name: message
valueFrom:
path: /tmp/message.txt

在這個範例中,Argo 將會建立兩個 print-message 節點,分別給予 hello, world 兩組輸入,而後方的 print-aggregated-messages 可以直接引用 print-message 的輸出進行操作;當我們執行了上面的 Workflow 之後會注意到:print-message 的輸出會被蒐集在一個 JSON 陣列再一次餵給 print-aggregated-messages

Argo 在彙整平行步驟的參數輸出時、會將所有對應輸出整理在一個 JSON 陣列裡,而後續的步驟可以直接使用步驟名稱取得彙整完成的陣列。

人造物的傳遞

參數傳遞的世界很美好,但人造物的傳遞就不是了:由於人造物可能為二進制資料,Argo 想當然耳不會自主將人造物合併,那畢竟有破壞其資料的風險。

但參數也不是萬能——它有大小上限,且在 UI 上參數還挺不好讀的。所以轉用人造物是個終究要面對的課題。

常見的替代方案

這個命題在網路上常見的解答是:使用第三方儲存空間(如 S3)作為資料傳遞的媒介,即在平行的步驟中將資料儲存到其他持續性記憶體上,然後再手動蒐集回來;包含在 Argo Workflows 官方的範例中也能看到這樣的操作:map-reduce.yaml

但使用第三方儲存空間則意味著額外的成本與風險:儲存、請求、傳輸都會計入成本、網路連線問題,甚至如 S3 如果物件生命週期沒設好還會導致中間產物被長期放置而產生的倉儲成本。

好一點的解決方案

目前認為好一點的答案會是 spec.volumeClaimTemplates1

使用這個功能的話在 Workflow 啟動時 Argo 會根據提供的定義建立 PersistentVolumeClaim,並且在 Workflow 生命週期結束時刪除該 PVC。因此我們可以將中繼層的資料存放在這個 PVC 裡面,並讓這些資料跟著 Workflow 的生命週期刪除。

邏輯上它沒有離開上面的思路——把資料放置到其他儲存空間,但搭配這個功能資料生命週期會跟著 Workflow 綁定、且資料儲存的實體位置回歸 Kubernetes 管理,這大幅地減少了維護上的複雜性。

補充:volumes + emptyDir 在當前需求下不可行

在 PVC 來當中繼儲存媒介之前,我有嘗試過使用 Workflow 的 spec.volumes1 搭配 emptyDir 來試,畢竟這樣看起來會像是擁有一個屬於 Workflow 的臨時儲存空間,但結果就是不可行,該 emptyDir 的生命週期還是跟著每個步驟各自的 Pod 走的,故無法達成資料傳遞。

範例

下方範例裡將會操作兩次 echo 各自寫入資料到 PVC 中、並在 collect 步驟中將檔案蒐集在同一個資料夾內:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: demo-
spec:
entrypoint: main
volumeClaimTemplates:
- metadata:
name: shared-volume
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 1Mi
templates:
- name: main
steps:
- - name: print-message
template: echo
withParam: >
["hello", "world"]
arguments:
artifacts:
- name: message
raw:
data: "{{ item }}"
- - name: collect
template: collect
- name: echo
inputs:
artifacts:
- name: message
path: /tmp/message.txt
script:
image: busybox:stable
command: [sh]
source: cat '{{ inputs.artifacts.message.path }}' | tee '{{ outputs.artifacts.message.path }}'
volumeMounts:
- name: shared-volume
mountPath: /mnt
outputs:
artifacts:
- name: message
path: /mnt/message-{{= sprig.randAlphaNum(6) }}.txt
- name: collect
container:
image: busybox:stable
command: ["true"]
volumeMounts:
- name: shared-volume
mountPath: /mnt
outputs:
artifacts:
- name: messages
path: /mnt

使用這個手法時應注意到所有的 Pod 都會寫入同一個儲存位置,會有競爭關係,故上面使用 randAlphaNum 對寫入的檔名增加亂數尾綴。

另外這個範例中 outputs.artifacts 其實是冗餘——因為資料實際上的傳遞是依賴 PVC,但我很推薦還是放著,在補上那段 path 之後就能直接從 Argo Workflows UI 上面拉輸出下來看,除錯的流程與體驗會順暢很多。

補充:臨時的 PVC

在上面範例執行的過程中可以發現生出來的 PVC 是以 workflow name + volume claim templates name 的方式命名的:

$ pbpaste | kube create -f -
workflow.argoproj.io/demo-cltp7 created
$ kube get pvc -o custom-columns=":metadata.name" --no-headers
demo-cltp7-shared-volume

同場加映:合併 JSON 物件

在前面的美好世界裡,Argo 幫助我自動將各步驟輸出的參數合併為一個 JSON 陣列;而我雖然因為參數大小限制而轉用人造物了,我還是希望能有這個效果。所以就用 jq 組了一個類似的行為出來:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: demo-
spec:
entrypoint: main
volumeClaimTemplates:
- metadata:
name: shared-volume
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 1Mi
templates:
- name: main
steps:
- - name: print-message
template: post
withParam: >
["hello", "world"]
arguments:
parameters:
- name: message
value: "{{ item }}"
- - name: collect
template: collect-json
- name: post
inputs:
parameters:
- name: message
script:
image: busybox:stable
command: [sh]
source: |
echo '{"message": "{{ inputs.parameters.message }}"}' | tee '{{ outputs.artifacts.message.path }}'
volumeMounts:
- name: shared-volume
mountPath: /mnt
outputs:
artifacts:
- name: message
path: /mnt/message-{{= sprig.randAlphaNum(6) }}.json
- name: collect-json
script:
image: linuxserver/yq:latest
command: [sh]
source: |
find /mnt -name 'message-*.json' -exec jq -s . '{}' '+' > '{{ outputs.artifacts.aggregated.path }}'
volumeMounts:
- name: shared-volume
mountPath: /mnt
outputs:
artifacts:
- name: aggregated
path: /tmp/messages.json

從 UI 上直接拉 Output Artifact 下來看:

可以驗證這樣的組合有達到需求:

[
{
"message": "hello"
},
{
"message": "world"
}
]

收工。

  1. WorkflowSpec 的欄位定義文件