Files
devstar/modules/k8s/k8s.go
2025-11-25 14:24:05 +08:00

505 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/*
* Copyright (c) Mengning Software. 2025. All rights reserved.
* Authors: DevStar Team, panshuxiao
* Create: 2025-11-19
* Description: Kubernetes client helper functions for DevContainerApp CRD.
*/
package k8s_agent
import (
"context"
"encoding/json"
"fmt"
"strings"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
k8s_api_v1 "code.gitea.io/gitea/modules/k8s/api/devcontainer/v1"
devcontainer_errors "code.gitea.io/gitea/modules/k8s/errors"
devcontainer_k8s_agent_modules_errors "code.gitea.io/gitea/modules/k8s/errors"
apimachinery_api_metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachinery_apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachinery_apis_v1_unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
apimachinery_runtime_utils "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apimachinery_watch "k8s.io/apimachinery/pkg/watch"
dynamic_client "k8s.io/client-go/dynamic"
dynamicclient "k8s.io/client-go/dynamic"
clientgorest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// IsK8sDevcontainerStatusReady 工具类方法,判断给定的 DevcontainerApp.Status 是否达到就绪状态
// 1. DevcontainerApp.Status.Ready == true
// 2. DevcontainerApp.Status.NodePortAssigned 介于闭区间 [30000, 32767]
func IsK8sDevcontainerStatusReady(devcontainerAppStatus *k8s_api_v1.DevcontainerAppStatus) bool {
return devcontainerAppStatus != nil &&
devcontainerAppStatus.Ready &&
devcontainerAppStatus.NodePortAssigned >= 30000 &&
devcontainerAppStatus.NodePortAssigned <= 32767
}
// groupVersionResource 用于描述 CRD供 dynamic Client 交互使用
var groupVersionResource = schema.GroupVersionResource{
Group: k8s_api_v1.GroupVersion.Group,
Version: k8s_api_v1.GroupVersion.Version,
Resource: "devcontainerapps",
}
// GetKubernetesClient 通过用户提供的 kubeconfig 原始内容与可选的 contextName 获取动态客户端
func GetKubernetesClient(ctx context.Context, kubeconfig []byte, contextName string) (dynamicclient.Interface, error) {
var config *clientgorest.Config
var err error
if len(kubeconfig) == 0 {
// 未提供 kubeconfig 内容:优先使用本机默认 kubeconfig其次回退到 InCluster
config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
log.Warn("Failed to obtain Kubernetes config outside of cluster: " + clientcmd.RecommendedHomeFile)
config, err = clientgorest.InClusterConfig()
if err != nil {
log.Error("Failed to obtain Kubernetes config both inside/outside of cluster, the DevContainer is Disabled")
setting.DevContainerConfig.Enable = false
return nil, err
}
}
} else {
// 提供了 kubeconfig 内容:按用户提供的内容与可选 context 获取配置
config, err = restConfigFromKubeconfigBytes(kubeconfig, contextName)
if err != nil {
return nil, err
}
}
applyClientDefaults(config)
// 强制跳过 TLS 证书校验(无论 kubeconfig 是否声明 insecure-skip-tls-verify
// 同时清空 CA 配置
config.TLSClientConfig.Insecure = true
config.TLSClientConfig.CAData = nil
config.TLSClientConfig.CAFile = ""
// 尝试创建客户端如果TLS验证失败则自动跳过验证
client, err := dynamicclient.NewForConfig(config)
if err != nil {
// 再次兜底:若识别为 TLS 错误,已 Insecure无需再次设置否则将错误上抛
return nil, fmt.Errorf("failed to create k8s client: %v", err)
}
return client, nil
}
// GetKubernetesClientWithToken 通过用户提供的 k8sURL 和 token 获取动态客户端
// 如果 k8sURL 和 token 为空,则优先使用配置文件中的 K8sConfig.Url 和 K8sConfig.Token
// 如果配置文件也未配置,则返回错误(不回退到 kubeconfig
func GetKubernetesClientWithToken(ctx context.Context, k8sURL, token string) (dynamicclient.Interface, error) {
// 如果未提供 URL 和 token优先使用配置文件中的设置
if k8sURL == "" || token == "" {
// 优先使用配置文件中的 K8s 配置
if setting.K8sConfig.Enable && setting.K8sConfig.Url != "" && setting.K8sConfig.Token != "" {
k8sURL = setting.K8sConfig.Url
token = setting.K8sConfig.Token
log.Info("使用配置文件中的 K8s 配置: URL=%s", k8sURL)
}
}
// 如果仍然没有 URL 和 token直接返回错误
if k8sURL == "" || token == "" {
return nil, fmt.Errorf("k8sURL and token are required, neither provided nor found in config")
}
// 使用 token 认证创建配置
config := &clientgorest.Config{
Host: k8sURL,
BearerToken: token,
TLSClientConfig: clientgorest.TLSClientConfig{
Insecure: true,
},
}
applyClientDefaults(config)
// 强制跳过 TLS 证书校验(与 GetKubernetesClient 保持一致)
// 同时清空 CA 配置
config.TLSClientConfig.Insecure = true
config.TLSClientConfig.CAData = nil
config.TLSClientConfig.CAFile = ""
// 尝试创建客户端如果TLS验证失败则自动跳过验证
client, err := dynamicclient.NewForConfig(config)
if err != nil {
// 再次兜底:若识别为 TLS 错误,已 Insecure无需再次设置否则将错误上抛
return nil, fmt.Errorf("failed to create k8s client: %v", err)
}
return client, nil
}
// restConfigFromKubeconfigBytes 基于 kubeconfig 内容构造 *rest.Config支持指定 context为空则使用 current-context
func restConfigFromKubeconfigBytes(kubeconfig []byte, contextName string) (*clientgorest.Config, error) {
if contextName == "" {
cfg, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig)
if err != nil {
return nil, err
}
applyClientDefaults(cfg)
return cfg, nil
}
// 指定 context 的解析路径
apiConfig, err := clientcmd.Load(kubeconfig)
if err != nil {
return nil, err
}
overrides := &clientcmd.ConfigOverrides{CurrentContext: contextName}
clientConfig := clientcmd.NewDefaultClientConfig(*apiConfig, overrides)
cfg, err := clientConfig.ClientConfig()
if err != nil {
return nil, err
}
applyClientDefaults(cfg)
return cfg, nil
}
// applyClientDefaults 统一设置 QPS/Burst可按需设置超时等
func applyClientDefaults(cfg *clientgorest.Config) {
if cfg == nil {
return
}
if cfg.QPS == 0 {
cfg.QPS = 50
}
if cfg.Burst == 0 {
cfg.Burst = 100
}
}
func GetDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) {
// 0. 检查参数
if ctx == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 {
return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"},
Message: "cannot be nil",
}
}
// 1. 获取 k8s CRD 资源 DevcontainerApp
devcontainerUnstructured, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Get(ctx, opts.Name, opts.GetOptions)
if err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: "Get DevcontainerApp thru k8s API Server",
Message: err.Error(),
}
}
// 2. 解析 DevcontainerApp Status 域,装填 VO
devcontainerApp := &k8s_api_v1.DevcontainerApp{}
err = apimachinery_runtime_utils.DefaultUnstructuredConverter.FromUnstructured(devcontainerUnstructured.Object, &devcontainerApp)
if err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: "Convert k8s API Server unstructured response into DevcontainerApp",
Message: err.Error(),
}
}
// 3. 检查 Devcontainer 是否就绪
if !IsK8sDevcontainerStatusReady(&devcontainerApp.Status) {
// 3.1 检查 Wait 参数,若用户不需要阻塞式等待,直接返回 “DevContainer 未就绪” 错误
if opts.Wait == false {
return nil, devcontainer_errors.ErrK8sDevcontainerNotReady{
Name: opts.Name,
Namespace: opts.Namespace,
Wait: opts.Wait,
}
}
// 3.2 执行阻塞式等待
devcontainerStatusVO, err := waitUntilDevcontainerReadyWithTimeout(ctx, client, opts)
if err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: "wait for k8s DevContainer to be ready",
Message: err.Error(),
}
}
devcontainerApp.Status.Ready = devcontainerStatusVO.Ready
devcontainerApp.Status.NodePortAssigned = devcontainerStatusVO.NodePortAssigned
}
// 4. 将就绪的 DevContainer Status VO 返回
return devcontainerApp, nil
}
// waitUntilDevcontainerReadyWithTimeout 辅助方法:在超时时间内阻塞等待 DevContainer 就绪
func waitUntilDevcontainerReadyWithTimeout(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*DevcontainerStatusK8sAgentVO, error) {
// 0. 检查参数
if ctx == nil || client == nil || opts == nil || len(opts.Name) == 0 || len(opts.Namespace) == 0 {
return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
FieldList: []string{"ctx", "client", "opts", "opts.Name", "opts.Namespace"},
Message: "could not be nil",
}
}
// 1. 注册 watcher 监听 DevContainer Status 变化
watcherTimeoutSeconds := setting.DevContainerConfig.TimeoutSeconds
watcher, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Watch(ctx, apimachinery_apis_v1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.name=%s", opts.Name),
Watch: true,
TimeoutSeconds: &watcherTimeoutSeconds,
})
if err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: "register watcher of DevContainer Readiness",
Message: err.Error(),
}
}
defer watcher.Stop()
// 2. 当 DevContainer Watcher 事件处理
devcontainerStatusVO := &DevcontainerStatusK8sAgentVO{}
for event := range watcher.ResultChan() {
switch event.Type {
case apimachinery_watch.Added:
// 2.1 监听 DevcontainerApp ADDED 事件,直接 fallthrough 到 MODIFIED 事件合并处理
fallthrough
case apimachinery_watch.Modified:
// 2.2 监听 DevcontainerApp MODIFIED 事件
if devcontainerUnstructured, ok := event.Object.(*apimachinery_apis_v1_unstructured.Unstructured); ok {
// 2.2.1 解析 status 域
statusDevcontainer, ok, err := apimachinery_apis_v1_unstructured.NestedMap(devcontainerUnstructured.Object, "status")
if err == nil && ok {
devcontainerCurrentStatus := &k8s_api_v1.DevcontainerAppStatus{
Ready: statusDevcontainer["ready"].(bool),
NodePortAssigned: uint16(statusDevcontainer["nodePortAssigned"].(int64)),
}
// 2.2.2 当 Status 达到就绪状态后,返回
if IsK8sDevcontainerStatusReady(devcontainerCurrentStatus) {
devcontainerStatusVO.Ready = devcontainerCurrentStatus.Ready
devcontainerStatusVO.NodePortAssigned = devcontainerCurrentStatus.NodePortAssigned
return devcontainerStatusVO, nil
}
}
}
case apimachinery_watch.Error:
// 2.3 监听 DevcontainerApp ERROR 事件,返回报错信息
apimachineryApiMetav1Status, ok := event.Object.(*apimachinery_api_metav1.Status)
if !ok {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: fmt.Sprintf("wait for Devcontainer '%s' in namespace '%s' to be ready", opts.Name, opts.Namespace),
Message: fmt.Sprintf("An error occurred in k8s CRD DevcontainerApp Watcher: \n"+
" Code: %v (status = %v)\n"+
"Message: %v\n"+
" Reason: %v\n"+
"Details: %v",
apimachineryApiMetav1Status.Code, apimachineryApiMetav1Status.Status,
apimachineryApiMetav1Status.Message,
apimachineryApiMetav1Status.Reason,
apimachineryApiMetav1Status.Details),
}
}
case apimachinery_watch.Deleted:
// 2.4 监听 DevcontainerApp DELETED 事件,返回报错信息
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: fmt.Sprintf("Open DevContainer '%s' in namespace '%s'", opts.Name, opts.Namespace),
Message: fmt.Sprintf("'%s' of Kind DevcontainerApp has been Deleted", opts.Name),
}
}
}
// 3. k8s CRD DevcontainerApp Watcher 超时关闭处理:直接返回超时错误
return nil, devcontainer_errors.ErrOpenDevcontainerTimeout{
Name: opts.Name,
Namespace: opts.Namespace,
TimeoutSeconds: setting.DevContainerConfig.TimeoutSeconds,
}
}
// 修改 CreateDevcontainer 函数
func CreateDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *CreateDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) {
// 记录日志
log.Info("Creating DevContainer with options: name=%s, namespace=%s, image=%s",
opts.Name, opts.Namespace, opts.Image)
// 创建资源定义
devcontainerApp := &k8s_api_v1.DevcontainerApp{
TypeMeta: apimachinery_apis_v1.TypeMeta{
Kind: "DevcontainerApp",
APIVersion: "devcontainer.devstar.cn/v1",
},
ObjectMeta: apimachinery_apis_v1.ObjectMeta{
Name: opts.Name,
Namespace: opts.Namespace,
Labels: map[string]string{
"app.kubernetes.io/name": "devcontainer-operator",
"app.kubernetes.io/managed-by": "kustomize",
},
},
Spec: k8s_api_v1.DevcontainerAppSpec{
StatefulSet: k8s_api_v1.StatefulSetSpec{
Image: opts.Image,
Command: opts.CommandList,
ContainerPort: opts.ContainerPort,
SSHPublicKeyList: opts.SSHPublicKeyList,
GitRepositoryURL: opts.GitRepositoryURL,
},
Service: k8s_api_v1.ServiceSpec{
ServicePort: opts.ServicePort,
ExtraPorts: opts.ExtraPorts, // 添加 ExtraPorts 配置
},
},
}
// 转换为 JSON
jsonData, err := json.Marshal(devcontainerApp)
if err != nil {
log.Error("Failed to marshal DevcontainerApp to JSON: %v", err)
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
Action: "Marshal JSON",
Message: err.Error(),
}
}
// 输出 JSON 以便调试
log.Debug("Generated JSON for DevcontainerApp:\n%s", string(jsonData))
// 转换为 Unstructured 对象
unstructuredObj := &apimachinery_apis_v1_unstructured.Unstructured{}
err = unstructuredObj.UnmarshalJSON(jsonData)
if err != nil {
log.Error("Failed to unmarshal JSON to Unstructured: %v", err)
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
Action: "Unmarshal JSON to Unstructured",
Message: err.Error(),
}
}
// 确认 GroupVersionResource 定义
log.Debug("Using GroupVersionResource: Group=%s, Version=%s, Resource=%s",
groupVersionResource.Group, groupVersionResource.Version, groupVersionResource.Resource)
// 创建资源
log.Info("Creating DevcontainerApp resource in namespace %s", opts.Namespace)
result, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Create(ctx, unstructuredObj, opts.CreateOptions)
if err != nil {
log.Error("Failed to create DevcontainerApp: %v", err)
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
Action: "create DevContainer via Dynamic Client",
Message: err.Error(),
}
}
log.Info("DevcontainerApp resource created successfully")
// 将结果转换回 DevcontainerApp 结构体
resultJSON, err := result.MarshalJSON()
if err != nil {
log.Error("Failed to marshal result to JSON: %v", err)
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
Action: "Marshal result JSON",
Message: err.Error(),
}
}
createdDevcontainer := &k8s_api_v1.DevcontainerApp{}
if err := json.Unmarshal(resultJSON, createdDevcontainer); err != nil {
log.Error("Failed to unmarshal result JSON: %v", err)
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
Action: "Unmarshal result JSON",
Message: err.Error(),
}
}
return createdDevcontainer, nil
}
func DeleteDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *DeleteDevcontainerOptions) error {
if ctx == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 {
return devcontainer_errors.ErrIllegalDevcontainerParameters{
FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"},
Message: "cannot be nil",
}
}
err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Delete(ctx, opts.Name, opts.DeleteOptions)
if err != nil {
log.Warn("Failed to delete DevcontainerApp '%s' in namespace '%s': %s", opts.Name, opts.Namespace, err.Error())
return devcontainer_errors.ErrOperateDevcontainer{
Action: fmt.Sprintf("delete devcontainer '%s' in namespace '%s'", opts.Name, opts.Namespace),
Message: err.Error(),
}
}
return nil
}
// ListDevcontainers 根据条件列举 DevContainer
func ListDevcontainers(ctx context.Context, client dynamic_client.Interface, opts *ListDevcontainersOptions) (*k8s_api_v1.DevcontainerAppList, error) {
if ctx == nil || opts == nil || len(opts.Namespace) == 0 {
return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
FieldList: []string{"ctx", "namespace"},
Message: "cannot be empty",
}
}
list, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).List(ctx, opts.ListOptions)
if err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: fmt.Sprintf("List Devcontainer in namespace '%s'", opts.Namespace),
Message: err.Error(),
}
}
// JSON 反序列化为 DevcontainerAppList
jsonData, err := list.MarshalJSON()
if err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: "verify JSON data of Devcontainer List",
Message: err.Error(),
}
}
devcontainerList := &k8s_api_v1.DevcontainerAppList{}
if err := json.Unmarshal(jsonData, devcontainerList); err != nil {
return nil, devcontainer_errors.ErrOperateDevcontainer{
Action: "deserialize Devcontainer List data",
Message: err.Error(),
}
}
return devcontainerList, nil
}
// isTLSCertificateError 检查错误是否是TLS证书验证错误
func isTLSCertificateError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
// 检查常见的TLS证书验证错误尽量宽松覆盖更多 x509 报错文案)
tlsErrorPatterns := []string{
"tls: failed to verify certificate",
"x509:",
"x509: certificate",
"cannot validate certificate",
"doesn't contain any IP SANs",
"certificate is valid for",
"certificate signed by unknown authority",
"unknown authority",
"self-signed certificate",
"certificate has expired",
"certificate is not valid",
"invalid certificate",
}
for _, pattern := range tlsErrorPatterns {
if strings.Contains(errStr, pattern) {
return true
}
}
return false
}