在 Argo Workflows 裡面我們可以透過 withItems
, withParam
或 withSequence
將一個 WorkflowStep 展開並進行平行處理。而問題會發生在當我們需要將這些平行的步驟(step)間的產出物彙整以進行後續加工時會遭遇一點困難:Argo 沒有內建的人造物(artifact)蒐集機制。
編按:半年前我曾經撰寫 彙整 Argo Workflows 中平行步驟的產出物 一文,但後來發現裡面介紹的機制在目前的 Argo Workflow 版本(3.6)下仍然有 bug;本文嘗試有提出新的解決方案,但不少的文字是直接複製自原本的文章。
美好的世界:參數傳遞
當事情發生在參數(parameter)傳遞時工作會簡單非常多:Argo 在彙整平行步驟的參數輸出時、會將所有對應輸出整理在一個 JSON 陣列裡,而後續的步驟可以直接使用步驟名稱取得彙整完成的陣列。
人造物的傳遞
參數傳遞的世界很美好,但人造物的傳遞就不是了:由於人造物可能為二進制資料,Argo 想當然耳不會自主將人造物合併,那畢竟有破壞其資料的風險。
但參數也不是萬能——它有大小上限,且在 UI 上參數還挺不好讀的。所以轉用人造物是個終究要面對的課題。
常見的替代方案
這個命題在網路上常見的解答是:使用第三方儲存空間(如 S3)作為資料傳遞的媒介,即在平行的步驟中將資料儲存到其他持續性記憶體上,然後再手動蒐集回來;包含在 Argo Workflows 官方的範例中也能看到這樣的操作:map-reduce.yaml
。
但使用第三方儲存空間則意味著額外的成本與風險:儲存、請求、傳輸都會計入成本、網路連線問題,甚至如 S3 如果物件生命週期沒設好還會導致中間產物被長期放置而產生的倉儲成本。
好一點的解決方案
目前認為好一點的答案會是 spec.volumeClaimTemplates
,使用這個功能的話——在想像上——在 workflow 啟動時 Argo 會根據提供的定義建立 [PersistentVolumeClaim],並且在 workflow 生命週期結束時刪除該 PVC。因此我們可以將中繼層的資料存放在這個 PVC 裡面,並讓這些資料跟著 workflow 的生命週期刪除。
而這個方法就是前一篇文章中提及、然後發現有所限制的手法,Argo Workflows 在 3.6 版中還不允許我們在巢狀的 workflow 中使用這項功能。
重新出發,PVC 自己開
最近被同事提醒道,既然系統不幫忙,那就自己開一個吧——並且這個手法甚至還有出現在 Argo Workflows 的官方說明文件裡:Volumes (永久連結)
這裏利用 Argo 提供的 Kubernetes resource template,並且搭配 setOwnerReference
屬性,這樣就能夠開出一個臨時的 PVC、並且其生命週期綁定於 workflow,使 PVC 與其中的資料在 workflow 生命週期結束時自動刪除:
templates:
- name: create-temp-pvc
inputs:
parameters:
- name: pvc-size
default: 64Mi
resource:
action: create
manifest: |+
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
generateName: {{= trimSuffix(sprig.trunc(58, workflow.name), '-') }}-
spec:
accessModes:
- ReadWriteOnce
- ReadOnlyMany
resources:
requests:
storage: {{ inputs.parameters.pvc-size }}
setOwnerReference: true
outputs:
parameters:
- name: pvc-name
valueFrom:
jsonPath: '{.metadata.name}'
這段 template 在執行時會產出隨機命名的 PVC,並且設為其輸出供後續步驟使用。
裝上 PVC
在需要存取該儲存空間的地方,我們則需要補上對應的 volumes
及 volumeMounts
資訊,這裏的規則跟 pod 是一樣的:
- name: demo
inputs:
parameters:
- name: pvc-name
container:
image: busybox:stable
command: ["true"]
volumeMounts:
- name: temp-storage
mountPath: /mnt
volumes:
- name: temp-storage
persistentVolumeClaim:
claimName: "{{ inputs.parameters.pvc-name }}"
到這裡為止解決方案大概成型,但有個麻煩的地方:pvc-name
的傳遞。
我不太想花心力去在每個需要使用這個儲存空間的 template 補上輸入參數。
使用全域變數廣播 PVC 名稱
要用來達成偷懶手段的工具就是 globalName
,我們可以用它來把一個輸出的引數廣播為全域變數,廣播方式就是在 parameters
下直接補上 globalName
:
templates:
- name: create-temp-pvc
...
outputs:
parameters:
- name: pvc-name
valueFrom:
jsonPath: '{.metadata.name}'
globalName: TEMP-PVC-NAME
而在後續的步驟裡面,則可以從 workflow.outputs
存取這些全域變數:
- name: demo
...
volumes:
- name: temp-storage
persistentVolumeClaim:
claimName: "{{ workflow.outputs.parameters.TEMP-PVC-NAME }}"
同場加映
合併 JSON 物件
在使用 parameters
進行輸出時,Argo Workflows 會自動幫我們把所有輸出合併為一個陣列。雖然轉用人造物了、還是希望能有這個效果,所以叫 GPT 幫忙用 jq 組一個拼裝車出來:
find /mnt -name '<PATTERN>' -exec jq -s . '{}' '+'
組合起來的效果是:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: demo-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: create-pvc
template: create-temp-pvc
- name: print-messages
template: print-message
depends: create-pvc
withParam: >
["hello", "world"]
arguments:
parameters:
- name: message
value: "{{ item }}"
- name: collect
template: collect
depends: print-messages
- name: create-temp-pvc
inputs:
parameters:
- name: pvc-size
default: 8Mi
resource:
action: create
manifest: |+
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
generateName: {{= trimSuffix(sprig.trunc(58, workflow.name), '-') }}-
spec:
accessModes:
- ReadWriteOnce
- ReadOnlyMany
resources:
requests:
storage: {{ inputs.parameters.pvc-size }}
setOwnerReference: true
outputs:
parameters:
- name: pvc-name
valueFrom:
jsonPath: "{.metadata.name}"
globalName: temp-pvc-name
- name: print-message
inputs:
parameters:
- name: message
script:
image: busybox:stable
command: [sh]
source: |+
echo '{"message": "{{ inputs.parameters.message }}"}' | tee '{{ outputs.artifacts.message.path }}'
volumeMounts:
- name: temp-storage
mountPath: /mnt
volumes:
- name: temp-storage
persistentVolumeClaim:
claimName: "{{ workflow.outputs.parameters.temp-pvc-name }}"
outputs:
artifacts:
- name: message
path: /mnt/message-{{= sprig.randAlphaNum(6) }}.json
- name: collect
script:
image: linuxserver/yq:latest
command: [sh]
source: |
find /mnt -name 'message-*.json' -exec jq -s . '{}' '+' > '{{ outputs.artifacts.aggregated.path }}'
volumeMounts:
- name: temp-storage
mountPath: /mnt
volumes:
- name: temp-storage
persistentVolumeClaim:
claimName: "{{ workflow.outputs.parameters.temp-pvc-name }}"
outputs:
artifacts:
- name: aggregated
path: /tmp/messages.json
收工 ✨🍰✨