把請求送入 Kubernetes —— 在腳本裡使用阜轉發

使用腳本啟動 Kubernetes 阜轉發、並將請求送入

我人不在叢集內,但我想把某些請求打進去叢集內。這個情境常發生於程式還在開發階段,但需要利用到一些叢集內的資源進行測試,而對應的手段則是開 port-forward,而我因為想要程式化地完成這項工作,所以研究了一下解決方案。

阜轉發

既然已知阜轉發(port forward)可以達成我的需求,那麼就先從這個方向下手。我使用 Kubernetes 官方的 Python 客戶端(kubernetes)來達成這項工作。

要處理阜轉發的核心 API 是 connect_get_namespaced_pod_portforward

import kubernetes.client
import kubernetes.stream
NAMESPACE = "default"
POD_NAME = "my-pod"
PORT = 8080
client = kubernetes.client.CoreV1Api()
pf = kubernetes.stream.portforward(
client.connect_get_namespaced_pod_portforward,
namespace=NAMESPACE,
name=POD_NAME,
ports=str(PORT),
)
sock = pf.socket(PORT)
sock.sendall(b"GET / HTTP/1.1\r\n")
sock.sendall(b"Host: localhost\r\n")
sock.sendall(b"Connection: close\r\n")
sock.sendall(b"\r\n")

不同於 kubectl 指令的是,客戶端不會進行阜號綁定與監聽,如果需要像 kubectl 綁定特定的阜號、監聽與轉發請求,那反而需要做額外的處理。

在標準的使用方法下,我們會拿到一個 socket 實例,然後各種請求會需要走這個通道進行溝通,而務實上我們鐵定是不會想要手刻整套 HTTP 的請求過程,接下來要嘗試把標準 HTTP 函式庫的連線導進這個通道。

搭配 requests

requests 在一定意義上可以視為是 Python 社群在處理 HTTP 請求的標準函式庫了,並且它還是 kubernetes 客戶端的相依,所以選擇 requests 的話不會增加系統的相依數量。

編按:以下的資訊是從範例腳本 pod_portforward.py 裡面學到的神奇手法,該腳本算是筆者研究這整套解決方案的突破口,畢竟 Kubernetes 客戶端一定比例的程式碼是產生器產出的,對應的說明少得可憐。

requests 是 urllib3 的高階封裝,實際處理連線、將 HTTP 序列化的都是 urllib3,故我們對 urllib3 下手就能讓成果同步生效於 requests;而 urllib3 內部有個 create_connection 工具函式是用來開啟 socket 的,正好讓我們那來動手腳:

import socket
import urllib3.util.connection
import kubernetes.client
import kubernetes.stream
def _create_connection(*args, **kwargs) -> socket.socket:
client = kubernetes.client.CoreV1Api()
pf = kubernetes.stream.portforward(
client.connect_get_namespaced_pod_portforward,
namespace=NAMESPACE,
name=POD_NAME,
ports=str(PORT),
)
return pf.socket(PORT)
urllib3.util.connection.create_connection = _create_connection

加上了這段補丁之後,任意需要透過 urllib3 或 requests 送出的請求都會走進預設的 pod 裡面。

當然我們不會滿足於這種寫死的手段,create_connection 的第一個參數會提供要連線的主機資訊,因此我們可以設定一個較有彈性的路由規則:

import random
import socket
import kubernetes.client
import kubernetes.stream
import urllib3.util.connection
original_create_connection = urllib3.util.connection.create_connection
def _create_connection(address: tuple[str, int], *args, **kwargs) -> socket.socket:
# check if we need to handle this connection
host, port = address
if not host.endswith(".kubernetes"):
return original_create_connection(address, *args, **kwargs)
# extract names from the host
name, kind, namespace, _ = host.split(".")
assert kind
match kind:
case "pod":
... # do nothing
case "service":
name, port = retrieve_pod_for_service(namespace, name, port)
case _:
raise ValueError(f"Unsupported kind: {kind}")
# redirect the request to the pod
client = kubernetes.client.CoreV1Api()
pf = kubernetes.stream.portforward(
client.connect_get_namespaced_pod_portforward,
namespace=namespace,
name=name,
ports=str(port),
)
return pf.socket(port)
def retrieve_pod_for_service(
namespace: str, service_name: str, service_port: int
) -> tuple[str, int]:
client = kubernetes.client.CoreV1Api()
# fetch the service
service = client.read_namespaced_service(service_name, namespace)
# find the target port for the service
for port in service.spec.ports:
if port.port == service_port:
target_port = port.target_port
break
else:
raise ValueError(
f"Service {service_name} in namespace {namespace} does not expose port {service_port}"
)
# fetch pods matching the service selector
pods = client.list_namespaced_pod(
namespace,
label_selector=",".join(
f"{key}={value}" for key, value in service.spec.selector.items()
),
)
# randomly select a pod from the list
# `choice` raises IndexError if no pods are in the list
pod = random.choice(pods.items)
return pod.metadata.name, target_port
urllib3.util.connection.create_connection = _create_connection

這個手法就是檢查 create_connection 的第一個參數:主機名稱與阜號,並僅在名稱符合特定格式時才觸發導流,並讓其餘的連線回歸本來的處理手法。這裏使用了 .kubernetes 作為頂級域名,由於這個尾綴目前並不是 IANA 公告的合法頂級域名、也不是 Kubernetes 的內部 DNS 格式,故應該可以放心使用。

有了上面的補丁之後,我們就可以輕鬆地使用 requests 來發出請求,然後底層的程式會自動地為我們進行導流:

import requests
requests.get("http://my-pod.pod.default.kubernetes:8080/healthz")

使用上有個小地方要注意,由於 requests 會參考 http_proxy / https_proxy 環境變數來發出請求,如果是有在使用代理的環境,會需要把 .kubernetes 排除,否則整個連線會發送到代理伺服器那邊,然後再拿到一個域名解析錯誤之類的回應:

export no_proxy=kubernetes

題外話,上面有個額外的處理是服務(service)的部分,這是因為 Kubernetes 的阜轉發只能對著 Pod 進行。相對於使用 kubectl 時,該程式允許我們直接使用服務名稱進行阜轉發的操作:

kubectl port-forward svc/service-name 8000:80

背後其實是 kubetctl 在客戶端自行處理掉(見 portforward.go),在翻這些資料前筆者一直以為這是 Kubernetes 伺服器端提供的功能。