Files
devstar/services/devcontainer/k8s_agent.go
2025-10-30 19:38:23 +08:00

892 lines
33 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package devcontainer
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"code.gitea.io/gitea/models/db"
devcontainer_model "code.gitea.io/gitea/models/devcontainer"
devcontainer_dto "code.gitea.io/gitea/modules/k8s"
devcontainer_k8s_agent_module "code.gitea.io/gitea/modules/k8s"
k8s_api_v1 "code.gitea.io/gitea/modules/k8s/api/devcontainer/v1"
"code.gitea.io/gitea/modules/k8s/errors"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"
// Istio resources are handled through the dynamic/unstructured client; the typed Istio API is no longer used directly.
)
// CreateDevcontainerDTO carries the parameters needed to create a DevContainer
// through the K8s agent. It is exposed here so the K8s create/query entry
// points can use it.
type CreateDevcontainerDTO struct {
// Embedded database model; this file reads Name, UserId and RepoId through it.
devcontainer_model.Devcontainer
// SSH public keys forwarded to the DevcontainerApp create options.
SSHPublicKeyList []string
// Git repository URL forwarded to the DevcontainerApp create options.
GitRepositoryURL string
// Container image forwarded to the DevcontainerApp create options.
Image string
// NOTE(review): not read in the visible part of this file — presumably raw Dockerfile text; confirm against callers.
DockerfileContent string
// Written by AssignDevcontainerCreation2K8sOperator: the NodePort reported by the cluster, or 0 while assignment is still pending.
DevcontainerPort uint16
}
// OpenDevcontainerAppDispatcherOptions holds the parameters used by
// AssignDevcontainerGetting2K8sOperator to look up a DevcontainerApp.
type OpenDevcontainerAppDispatcherOptions struct {
// Name of the DevcontainerApp resource to fetch; the lookup rejects an empty value.
Name string `json:"name"`
// Wait is passed through unchanged to the modules-layer GetDevcontainerOptions.Wait.
Wait bool `json:"wait"`
// NOTE(review): the fields below are not read in the visible part of this file;
// their meaning can only be guessed from their names — confirm against callers.
Status uint16
Port uint16
UserPublicKey string
RepoID int64
UserID int64
}
// k8sGroupVersionResource identifies the DevcontainerApp custom resource
// (group devcontainer.devstar.cn, version v1, resource "devcontainerapps").
// The restart and stop operations in this file patch it through the dynamic client.
var k8sGroupVersionResource = schema.GroupVersionResource{
Group: "devcontainer.devstar.cn",
Version: "v1",
Resource: "devcontainerapps",
}
// ErrIllegalK8sAgentParams reports that one or more arguments handed to the
// K8s agent were missing or invalid.
type ErrIllegalK8sAgentParams struct {
	// FieldNameList names the offending parameters.
	FieldNameList []string
}

// Error implements the error interface; the message lists the offending
// field names in Go's default slice notation.
func (e ErrIllegalK8sAgentParams) Error() string {
	return "Illegal Params: " + fmt.Sprint(e.FieldNameList)
}
// AssignDevcontainerGetting2K8sOperator fetches the latest state of the
// DevcontainerApp custom resource named opts.Name from the configured
// namespace. opts.Wait is forwarded to the modules-layer getter, which decides
// whether to block until the DevContainer is ready.
//
// It returns ErrIllegalK8sAgentParams (listing only the offending parameters)
// when ctx or opts is nil or opts.Name is empty, and
// errors.ErrOperateDevcontainer when the K8s client cannot be created or the
// resource cannot be retrieved.
func AssignDevcontainerGetting2K8sOperator(ctx *context.Context, opts *OpenDevcontainerAppDispatcherOptions) (*k8s_api_v1.DevcontainerApp, error) {
	// 0. Validate first: nothing below may touch ctx or opts before this check,
	// otherwise a nil opts panics instead of producing a parameter error.
	var illegal []string
	if ctx == nil {
		illegal = append(illegal, "ctx")
	}
	switch {
	case opts == nil:
		illegal = append(illegal, "opts")
	case len(opts.Name) == 0:
		illegal = append(illegal, "opts.Name")
	}
	if len(illegal) > 0 {
		return nil, ErrIllegalK8sAgentParams{FieldNameList: illegal}
	}

	log.Info("AssignDevcontainerGetting2K8sOperator: Starting lookup for container: %s, wait=%v",
		opts.Name, opts.Wait)

	// 1. Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		// Propagate upwards so the caller can end its database transaction.
		return nil, errors.ErrOperateDevcontainer{
			Action:  "Connect to k8s API Server",
			Message: err.Error(),
		}
	}
	log.Info("AssignDevcontainerGetting2K8sOperator: K8s client created successfully")

	// 2. Ask the modules-layer agent for the DevcontainerApp resource.
	namespace := setting.DevContainerConfig.Namespace
	optsGetDevcontainer := &devcontainer_dto.GetDevcontainerOptions{
		GetOptions: metav1.GetOptions{},
		Name:       opts.Name,
		Namespace:  namespace,
		Wait:       opts.Wait,
	}
	log.Info("AssignDevcontainerGetting2K8sOperator: Retrieving DevcontainerApp %s in namespace %s (wait=%v)",
		opts.Name, namespace, opts.Wait)
	devcontainerApp, err := devcontainer_k8s_agent_module.GetDevcontainer(ctxVal, client, optsGetDevcontainer)
	if err != nil {
		log.Error("AssignDevcontainerGetting2K8sOperator: Failed to get DevcontainerApp: %v", err)
		return nil, errors.ErrOperateDevcontainer{
			Action:  fmt.Sprintf("Open Devcontainer '%s' (wait=%v)", opts.Name, opts.Wait),
			Message: err.Error(),
		}
	}
	log.Info("AssignDevcontainerGetting2K8sOperator: DevcontainerApp retrieved successfully - Name: %s, NodePort: %d, Ready: %v",
		devcontainerApp.Name, devcontainerApp.Status.NodePortAssigned, devcontainerApp.Status.Ready)

	// Log every extra port the cluster reported for this DevcontainerApp.
	if len(devcontainerApp.Status.ExtraPortsAssigned) > 0 {
		for i, port := range devcontainerApp.Status.ExtraPortsAssigned {
			log.Info("AssignDevcontainerGetting2K8sOperator: Extra port %d - Name: %s, NodePort: %d, ContainerPort: %d",
				i, port.Name, port.NodePort, port.ContainerPort)
		}
	} else {
		log.Info("AssignDevcontainerGetting2K8sOperator: No extra ports found for DevcontainerApp %s", devcontainerApp.Name)
	}

	// 3. Hand the freshly fetched DevcontainerApp back to the caller.
	return devcontainerApp, nil
}
// Side note: how do the k8s agents under modules/ and services/ differ?
//   - modules/ talks directly to the k8s API server
//   - services/ wraps that layer to simplify use from the UI-facing code

// AssignDevcontainerDeletion2K8sOperator removes the cluster resources that
// belong to every DevContainer in devcontainersList: first the
// "<name>-ttyd-ingress" Ingress of each entry, then the DevcontainerApp itself
// and its WebTerminal VirtualService.
//
// Ingress, DevcontainerApp and VirtualService deletions are best-effort:
// failures are logged and do not abort the loop. The function returns an error
// only when ctx is nil, the list is nil or empty, or the dynamic client cannot
// be created.
func AssignDevcontainerDeletion2K8sOperator(ctx *context.Context, devcontainersList *[]devcontainer_model.Devcontainer) error {
	namespace := setting.DevContainerConfig.Namespace

	// Validate before any dereference: the list is ranged over below, so a nil
	// pointer must be rejected up front rather than after the Ingress loop.
	if ctx == nil {
		return ErrIllegalK8sAgentParams{FieldNameList: []string{"ctx"}}
	}
	if devcontainersList == nil || len(*devcontainersList) == 0 {
		return fmt.Errorf("delete devcontainer in namespace '%s': %s", namespace, "the DevContainer list is empty")
	}
	devcontainers := *devcontainersList

	log.Info("AssignDevcontainerDeletion2K8sOperator: Starting Deletion for containers")

	// 1. Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		// Propagate upwards so the caller can end its database transaction.
		return err
	}

	// The typed client is only needed for Ingress cleanup; failing to build it
	// must not block the main deletion flow.
	stdClient, err := getStandardKubernetesClient()
	if err != nil {
		log.Warn("AssignDevcontainerDeletion2K8sOperator: 获取标准 K8s 客户端失败: %v", err)
	} else {
		// Remove the Ingress of every DevContainer first.
		for i := range devcontainers {
			ingressName := fmt.Sprintf("%s-ttyd-ingress", devcontainers[i].Name)
			log.Info("AssignDevcontainerDeletion2K8sOperator: 删除 Ingress %s", ingressName)
			delErr := stdClient.NetworkingV1().Ingresses(namespace).Delete(ctxVal, ingressName, metav1.DeleteOptions{})
			switch {
			case delErr == nil:
				log.Info("AssignDevcontainerDeletion2K8sOperator: 成功删除 Ingress %s", ingressName)
			case k8serrors.IsNotFound(delErr):
				// An already-missing Ingress is the desired end state.
				log.Info("AssignDevcontainerDeletion2K8sOperator: Ingress %s 不存在,跳过删除", ingressName)
			default:
				// Best-effort: keep going with the remaining resources.
				log.Warn("AssignDevcontainerDeletion2K8sOperator: 删除 Ingress %s 失败: %v", ingressName, delErr)
			}
		}
	}

	// 2. Options shared by every DevcontainerApp deletion; only Name changes per entry.
	opts := &devcontainer_dto.DeleteDevcontainerOptions{
		DeleteOptions: metav1.DeleteOptions{},
		Namespace:     namespace,
	}

	// 3. Delete each DevcontainerApp and its VirtualService. A failed deletion
	// is logged and left for an administrator to clean up manually.
	for i := range devcontainers {
		name := devcontainers[i].Name
		opts.Name = name
		if delErr := devcontainer_k8s_agent_module.DeleteDevcontainer(ctxVal, client, opts); delErr != nil {
			log.Warn("AssignDevcontainerDeletion2K8sOperator: failed to delete DevcontainerApp %s: %v", name, delErr)
		}

		if vsErr := deleteDevContainerWebTerminalVirtualService(ctx, name); vsErr != nil {
			// Best-effort: only record a warning.
			log.Warn("AssignDevcontainerDeletion2K8sOperator: 删除 VirtualService 失败 for DevContainer %s: %v", name, vsErr)
		} else {
			log.Info("AssignDevcontainerDeletion2K8sOperator: 成功删除 VirtualService for DevContainer: %s", name)
		}
	}
	return nil
}
// Side note: how do the k8s agents under modules/ and services/ differ?
//   - modules/ talks directly to the k8s API server
//   - services/ wraps that layer to simplify use from the UI-facing code

// AssignDevcontainerCreation2K8sOperator dispatches creation of a DevContainer
// to the k8s operator and stores the NodePort reported by the cluster in
// newDevContainer.DevcontainerPort (0 while assignment is still pending, in
// which case a background updater is started).
//
// In order, it:
//  1. writes progress rows to the devcontainer and devcontainer_output tables
//     (failures are logged and ignored),
//  2. creates the DevcontainerApp resource with an SSH port (22) and a ttyd
//     extra port (7681),
//  3. records the NodePort or starts updateNodePortAsync,
//  4. creates the Istio Gateway and VirtualService for the web terminal
//     (failures are logged and ignored).
//
// Per the original author's note this runs inside a database transaction, so
// it must not block for long; readiness is determined asynchronously later.
//
// It returns ErrIllegalK8sAgentParams when ctx or newDevContainer is nil, and
// otherwise the error from client construction or DevcontainerApp creation.
func AssignDevcontainerCreation2K8sOperator(ctx *context.Context, newDevContainer *CreateDevcontainerDTO) error {
	// Reject nil arguments before the first dereference below.
	var illegal []string
	if ctx == nil {
		illegal = append(illegal, "ctx")
	}
	if newDevContainer == nil {
		illegal = append(illegal, "newDevContainer")
	}
	if len(illegal) > 0 {
		return ErrIllegalK8sAgentParams{FieldNameList: illegal}
	}

	log.Info("AssignDevcontainerCreation2K8sOperator: Starting creation for container: %s", newDevContainer.Name)
	log.Info("AssignDevcontainerCreation2K8sOperator: Container details - Image: %s, RepoURL: %s, SSHKeys: %d",
		newDevContainer.Image, newDevContainer.GitRepositoryURL, len(newDevContainer.SSHPublicKeyList))

	// 1. Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		// Propagate upwards so the caller can end its database transaction.
		return err
	}

	// 1.1 Progress bookkeeping. Both helpers are best-effort: a failed write is
	// logged as a warning and never aborts the creation.
	dbEngine := db.GetEngine(ctxVal)
	// setStatus applies patch (which carries the new DevcontainerStatus) to the
	// devcontainer row of this user/repository.
	setStatus := func(patch *devcontainer_model.Devcontainer) {
		_, updErr := dbEngine.Table("devcontainer").
			Where("user_id = ? AND repo_id = ? ", newDevContainer.UserId, newDevContainer.RepoId).
			Update(patch)
		if updErr != nil {
			log.Warn("Failed to update status to %v: %v", patch.DevcontainerStatus, updErr)
		}
	}
	// recordStep inserts one devcontainer_output row, filling in the fields
	// that are identical for every step.
	recordStep := func(rec *devcontainer_model.DevcontainerOutput) {
		rec.Status = "success"
		rec.UserId = newDevContainer.UserId
		rec.RepoId = newDevContainer.RepoId
		if _, insErr := dbEngine.Table("devcontainer_output").Insert(rec); insErr != nil {
			log.Warn("Failed to insert %s record: %v", rec.Command, insErr)
		}
	}

	// Status 1: pulling the image.
	setStatus(&devcontainer_model.Devcontainer{DevcontainerStatus: 1})
	recordStep(&devcontainer_model.DevcontainerOutput{
		Output:  "Pulling image for K8s container: " + newDevContainer.Image,
		Command: "Pull Image",
		ListId:  0,
	})
	// Status 2: creating and starting the container.
	setStatus(&devcontainer_model.Devcontainer{DevcontainerStatus: 2})
	// Per the original note, a row with ListId > 0 and Status "success" is what
	// downstream code treats as "created".
	recordStep(&devcontainer_model.DevcontainerOutput{
		Output:  "Initializing workspace in Kubernetes...",
		Command: "Initialize Workspace",
		ListId:  1,
	})
	// Status 3: installing required tools inside the container.
	setStatus(&devcontainer_model.Devcontainer{DevcontainerStatus: 3})
	recordStep(&devcontainer_model.DevcontainerOutput{
		Output:  "Initializing DevStar in Kubernetes...",
		Command: "Initialize DevStar",
		ListId:  2,
	})
	recordStep(&devcontainer_model.DevcontainerOutput{
		Output:  "Running post-create commands in Kubernetes...",
		Command: "Run postCreateCommand",
		ListId:  3,
	})

	// Expose ttyd (web terminal) as an extra port; 7681 is ttyd's default port
	// and must match the -p flag in the start command below.
	const ttydPort = 7681
	log.Info("AssignDevcontainerCreation2K8sOperator: Adding ttyd port configuration (7681)")
	extraPorts := []k8s_api_v1.ExtraPortSpec{
		{
			Name:          "ttyd",
			ContainerPort: ttydPort,
			ServicePort:   ttydPort,
		},
	}

	// Main container start command: install and start sshd, start ttyd when the
	// shared binary is present, then idle.
	//
	// The idle loop is deliberately `while true; do sleep 60; done` rather than
	// `sleep infinity` or `tail -f /dev/null`: per the original author's
	// investigation it keeps the container from going Completed and then
	// CrashLoopBackOff, and avoids piling up zombie `[sshd] <defunct>`
	// processes whose parent was the `sleep infinity` process.
	command := []string{
		"/bin/bash",
		"-c",
		"export DEBIAN_FRONTEND=noninteractive && " +
			"apt-get update -y && " +
			"apt-get install -y ssh && " +
			// Generate host keys only when they do not exist yet.
			"if [ ! -f /etc/ssh/ssh_host_rsa_key ]; then " +
			" echo 'Generating SSH host keys...' && " +
			" ssh-keygen -A && " +
			" echo 'SSH host keys generated' ; " +
			"else " +
			" echo 'SSH host keys already exist' ; " +
			"fi && " +
			"mkdir -p /var/run/sshd && " +
			"/usr/sbin/sshd && " +
			"if [ -f /ttyd-shared/ttyd ]; then " +
			"mkdir -p /data/workspace && " +
			"cd /data/workspace && " +
			"/ttyd-shared/ttyd -p 7681 -i 0.0.0.0 --writable bash > /tmp/ttyd.log 2>&1 & " +
			"fi && " +
			"while true; do sleep 60; done",
	}
	log.Info("AssignDevcontainerCreation2K8sOperator: Command includes ttyd installation and startup")

	// 2. Ask the modules-layer agent to create the resource.
	opts := &devcontainer_dto.CreateDevcontainerOptions{
		CreateOptions:    metav1.CreateOptions{},
		Name:             newDevContainer.Name,
		Namespace:        setting.DevContainerConfig.Namespace,
		Image:            newDevContainer.Image,
		CommandList:      command,
		ContainerPort:    22,
		ServicePort:      22,
		SSHPublicKeyList: newDevContainer.SSHPublicKeyList,
		GitRepositoryURL: newDevContainer.GitRepositoryURL,
		ExtraPorts:       extraPorts,
	}
	log.Info("AssignDevcontainerCreation2K8sOperator: Creating DevcontainerApp %s in namespace %s",
		opts.Name, opts.Namespace)
	devcontainerInCluster, err := devcontainer_k8s_agent_module.CreateDevcontainer(ctxVal, client, opts)
	if err != nil {
		log.Error("AssignDevcontainerCreation2K8sOperator: Failed to create DevcontainerApp: %v", err)
		return err
	}
	log.Info("AssignDevcontainerCreation2K8sOperator: DevcontainerApp created successfully - Name: %s",
		devcontainerInCluster.Name)
	// The status deliberately stays at 3 here; per the original note,
	// GetDevContainerStatus promotes it to 4 once the Pod is Ready.

	// 3. NodePort handling: 0 means the cluster has not assigned one yet. Either
	// way the reported value is what gets stored on the DTO.
	nodePort := devcontainerInCluster.Status.NodePortAssigned
	newDevContainer.DevcontainerPort = nodePort
	if nodePort == 0 {
		log.Info("AssignDevcontainerCreation2K8sOperator: NodePort not yet assigned, starting async updater for %s",
			devcontainerInCluster.Name)
		log.Info("DevContainer created in cluster - Name: %s, NodePort: pending assignment",
			devcontainerInCluster.Name)
		// Fill in the real port in the background so the transaction is not held open.
		go updateNodePortAsync(devcontainerInCluster.Name,
			setting.DevContainerConfig.Namespace,
			newDevContainer.UserId,
			newDevContainer.RepoId)
	} else {
		log.Info("AssignDevcontainerCreation2K8sOperator: NodePort %d assigned immediately to %s",
			nodePort, devcontainerInCluster.Name)
		log.Info("DevContainer created in cluster - Name: %s, NodePort: %d",
			devcontainerInCluster.Name, nodePort)
	}

	// 4. Istio resources for the ttyd web terminal. Both are best-effort.
	log.Info("AssignDevcontainerCreation2K8sOperator: 开始创建 Istio 资源 for DevContainer: %s", devcontainerInCluster.Name)
	if gwErr := createDevContainerWebTerminalGateway(ctx); gwErr != nil {
		log.Warn("AssignDevcontainerCreation2K8sOperator: 创建 Gateway 失败: %v", gwErr)
	} else {
		log.Info("AssignDevcontainerCreation2K8sOperator: Gateway 创建成功")
	}
	if vsErr := createDevContainerWebTerminalVirtualService(ctx, devcontainerInCluster.Name); vsErr != nil {
		log.Warn("AssignDevcontainerCreation2K8sOperator: 创建 VirtualService 失败: %v", vsErr)
	} else {
		log.Info("AssignDevcontainerCreation2K8sOperator: VirtualService 创建成功")
	}

	// 5. Returning nil lets the callers commit the database transaction.
	return nil
}
// AssignDevcontainerRestart2K8sOperator asks the K8s controller to restart a
// DevContainer by merge-patching two annotations onto its DevcontainerApp:
// devstar.io/restartedAt (current time, RFC 3339) and
// devstar.io/desiredReplicas ("1"). A devcontainer_output row describing the
// restart is then inserted; a failure of that insert is only logged.
//
// It returns ErrIllegalK8sAgentParams when ctx or opts is nil, the client
// construction error, or the patch error wrapped with %w so callers can still
// inspect the underlying API status.
func AssignDevcontainerRestart2K8sOperator(ctx *context.Context, opts *DevcontainerVO) error {
	// Reject nil arguments before the first dereference below.
	var illegal []string
	if ctx == nil {
		illegal = append(illegal, "ctx")
	}
	if opts == nil {
		illegal = append(illegal, "opts")
	}
	if len(illegal) > 0 {
		return ErrIllegalK8sAgentParams{FieldNameList: illegal}
	}

	log.Info("AssignDevcontainerRestart2K8sOperator: Starting restart for container: %s", opts.DevContainerName)

	// 1. Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		log.Error("Failed to get Kubernetes client: %v", err)
		return err
	}

	// 2. Restart by patching: a changed restartedAt annotation is what triggers
	// the controller to redeploy the Pod, and desiredReplicas is forced to 1.
	patchData := fmt.Sprintf(`{
		"metadata": {
			"annotations": {
				"devstar.io/restartedAt": "%s",
				"devstar.io/desiredReplicas": "1"
			}
		}
	}`, time.Now().Format(time.RFC3339))
	log.Info("AssignDevcontainerRestart2K8sOperator: Applying patch to restart container %s",
		opts.DevContainerName)
	log.Debug("AssignDevcontainerRestart2K8sOperator: Patch data: %s", patchData)

	// Apply the patch to the DevcontainerApp custom resource.
	_, err = client.Resource(k8sGroupVersionResource).
		Namespace(setting.DevContainerConfig.Namespace).
		Patch(ctxVal, opts.DevContainerName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
	if err != nil {
		log.Error("Failed to patch DevcontainerApp for restart: %v", err)
		return fmt.Errorf("restart k8s devcontainer '%s' failed: %w", opts.DevContainerName, err)
	}
	log.Info("DevContainer restarted: %s", opts.DevContainerName)
	log.Info("AssignDevcontainerRestart2K8sOperator: Restart patch applied successfully for %s",
		opts.DevContainerName)

	// 3. Record the restart in the database; best-effort only.
	_, err = db.GetEngine(ctxVal).Table("devcontainer_output").Insert(&devcontainer_model.DevcontainerOutput{
		Output:  fmt.Sprintf("Restarting K8s DevContainer %s", opts.DevContainerName),
		Status:  "success",
		UserId:  opts.UserId,
		RepoId:  opts.RepoId,
		Command: "Restart DevContainer",
		ListId:  0,
	})
	if err != nil {
		log.Warn("Failed to insert restart record: %v", err)
	}
	return nil
}
// AssignDevcontainerStop2K8sOperator asks the K8s controller to stop a
// DevContainer by merge-patching two annotations onto its DevcontainerApp:
// devstar.io/stoppedAt (current time, RFC 3339) and
// devstar.io/desiredReplicas ("0"). A devcontainer_output row describing the
// stop is then inserted; a failure of that insert is only logged.
//
// It returns ErrIllegalK8sAgentParams when ctx or opts is nil, the client
// construction error, or the patch error wrapped with %w so callers can still
// inspect the underlying API status.
func AssignDevcontainerStop2K8sOperator(ctx *context.Context, opts *DevcontainerVO) error {
	// Reject nil arguments before the first dereference below.
	var illegal []string
	if ctx == nil {
		illegal = append(illegal, "ctx")
	}
	if opts == nil {
		illegal = append(illegal, "opts")
	}
	if len(illegal) > 0 {
		return ErrIllegalK8sAgentParams{FieldNameList: illegal}
	}

	log.Info("AssignDevcontainerStop2K8sOperator: Starting stop for container: %s", opts.DevContainerName)

	// 1. Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		log.Error("Failed to get Kubernetes client: %v", err)
		return err
	}

	// 2. Stop by patching: record when the stop was requested and set
	// desiredReplicas to 0.
	patchData := fmt.Sprintf(`{
		"metadata": {
			"annotations": {
				"devstar.io/stoppedAt": "%s",
				"devstar.io/desiredReplicas": "0"
			}
		}
	}`, time.Now().Format(time.RFC3339))

	// Apply the patch to the DevcontainerApp custom resource.
	_, err = client.Resource(k8sGroupVersionResource).
		Namespace(setting.DevContainerConfig.Namespace).
		Patch(ctxVal, opts.DevContainerName, types.MergePatchType, []byte(patchData), metav1.PatchOptions{})
	if err != nil {
		log.Error("Failed to patch DevcontainerApp for stop: %v", err)
		return fmt.Errorf("stop k8s devcontainer '%s' failed: %w", opts.DevContainerName, err)
	}
	log.Info("DevContainer stopped: %s", opts.DevContainerName)

	// 3. Record the stop in the database; a failure here must not change the
	// result reported to the caller.
	_, err = db.GetEngine(ctxVal).Table("devcontainer_output").Insert(&devcontainer_model.DevcontainerOutput{
		Output:  fmt.Sprintf("Stopping K8s DevContainer %s", opts.DevContainerName),
		Status:  "success",
		UserId:  opts.UserId,
		RepoId:  opts.RepoId,
		Command: "Stop DevContainer",
		ListId:  0,
	})
	if err != nil {
		log.Warn("Failed to insert stop record: %v", err)
	}
	return nil
}
// updateNodePortAsync is the background helper started when a DevcontainerApp
// was created without a NodePort. It waits 20 seconds, then polls the
// DevcontainerApp up to 10 times (5 seconds apart) and, as soon as a non-zero
// NodePort is reported, writes it to devcontainer.devcontainer_port for the
// row matching userId/repoId.
//
// It runs on its own context.Background() and has no return value: every
// failure is only logged. Lookup errors are logged per attempt instead of
// being reported as "port not yet assigned".
func updateNodePortAsync(containerName string, namespace string, userId, repoId int64) {
	const (
		initialDelay = 20 * time.Second // head start for the controller
		retryDelay   = 5 * time.Second
		maxAttempts  = 10
	)

	log.Info("updateNodePortAsync: Starting for container: %s in namespace: %s", containerName, namespace)
	log.Info("updateNodePortAsync: Waiting 20 seconds for K8s controller to assign port")
	time.Sleep(initialDelay)

	// Use a fresh context and client for the background work.
	ctx := context.Background()
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctx, nil, "")
	if err != nil {
		log.Error("Failed to get K8s client in async updater: %v", err)
		return
	}
	log.Info("updateNodePortAsync: K8s client created successfully")

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		log.Info("updateNodePortAsync: Attempt %d/%d to retrieve NodePort for %s", attempt, maxAttempts, containerName)
		getOpts := &devcontainer_k8s_agent_module.GetDevcontainerOptions{
			GetOptions: metav1.GetOptions{},
			Name:       containerName,
			Namespace:  namespace,
			Wait:       false,
		}
		devcontainer, getErr := devcontainer_k8s_agent_module.GetDevcontainer(ctx, client, getOpts)
		switch {
		case getErr != nil:
			// Surface the real reason; the loop still retries.
			log.Warn("updateNodePortAsync: Failed to query DevcontainerApp %s: %v", containerName, getErr)
		case devcontainer != nil && devcontainer.Status.NodePortAssigned > 0:
			realNodePort := devcontainer.Status.NodePortAssigned
			log.Info("updateNodePortAsync: Success! Found NodePort %d for %s",
				realNodePort, containerName)

			// Log any extra ports (for example ttyd) for diagnostics.
			for _, portInfo := range devcontainer.Status.ExtraPortsAssigned {
				log.Info("Found extra port for %s: name=%s, nodePort=%d, containerPort=%d",
					containerName, portInfo.Name, portInfo.NodePort, portInfo.ContainerPort)
			}

			log.Info("Found real NodePort %d for container %s, updating database record",
				realNodePort, containerName)
			_, updErr := db.GetEngine(ctx).Table("devcontainer").
				Where("user_id = ? AND repo_id = ?", userId, repoId).
				Update(map[string]interface{}{
					"devcontainer_port": realNodePort,
				})
			if updErr != nil {
				log.Error("Failed to update NodePort in database: %v", updErr)
			} else {
				log.Info("Successfully updated NodePort in database to %d", realNodePort)
			}
			return
		}

		// Sleeping after the final attempt would only delay the failure report.
		if attempt < maxAttempts {
			log.Info("updateNodePortAsync: Port not yet assigned, waiting 5 seconds before next attempt")
			time.Sleep(retryDelay)
		}
	}
	log.Warn("updateNodePortAsync: Failed to retrieve real NodePort after multiple attempts")
}
// getStandardKubernetesClient builds a typed Kubernetes clientset.
//
// It first tries the kubeconfig at clientcmd.RecommendedHomeFile and, if that
// fails, falls back to the in-cluster configuration. Returned errors wrap the
// underlying cause with %w so callers can inspect it.
func getStandardKubernetesClient() (*kubernetes.Clientset, error) {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		// The out-of-cluster attempt failed; log why, then try in-cluster.
		log.Warn("Failed to obtain Kubernetes config outside of cluster: %v", err)
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("获取 K8s 配置失败 (集群内外均失败): %w", err)
		}
	}

	stdClient, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("创建标准 K8s 客户端失败: %w", err)
	}
	return stdClient, nil
}
// createDevContainerWebTerminalGateway makes sure the shared Istio Gateway
// "devcontainer-webterminal-gateway" exists in the DevContainer namespace.
//
// The Gateway is HTTP-only (port 80) and selects istio=ingressgateway. Its
// hosts are the [server] DOMAIN value from the custom config file plus "*";
// when DOMAIN is empty only "*" is used, because an empty string would be
// sent to the API as a host entry.
//
// The call is idempotent: an existing Gateway, including one created
// concurrently between the existence check and the create call, is treated as
// success. Other failures are returned wrapped with %w.
func createDevContainerWebTerminalGateway(ctx *context.Context) error {
	log.Info("createDevContainerWebTerminalGateway: 开始创建 DevContainer WebTerminal Gateway")

	// Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		return fmt.Errorf("获取 K8s 客户端失败: %w", err)
	}

	gatewayName := "devcontainer-webterminal-gateway"
	namespace := setting.DevContainerConfig.Namespace
	gwGVR := schema.GroupVersionResource{Group: "networking.istio.io", Version: "v1", Resource: "gateways"}

	// Nothing to do when the Gateway is already there.
	if _, err := client.Resource(gwGVR).Namespace(namespace).Get(ctxVal, gatewayName, metav1.GetOptions{}); err == nil {
		log.Info("createDevContainerWebTerminalGateway: Gateway 已存在: %s", gatewayName)
		return nil
	} else if !k8serrors.IsNotFound(err) {
		return fmt.Errorf("检查 Gateway 失败: %w", err)
	}

	// Read the domain from the configuration file.
	cfg, err := setting.NewConfigProviderFromFile(setting.CustomConf)
	if err != nil {
		return fmt.Errorf("加载配置文件失败: %w", err)
	}
	domain := cfg.Section("server").Key("DOMAIN").Value()

	// Never emit an empty host entry when DOMAIN is not configured.
	hosts := []interface{}{"*"}
	if domain != "" {
		hosts = []interface{}{domain, "*"}
	}

	// Define the HTTP-only Gateway as an unstructured object.
	gw := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "networking.istio.io/v1",
		"kind":       "Gateway",
		"metadata": map[string]interface{}{
			"name":      gatewayName,
			"namespace": namespace,
			"labels": map[string]interface{}{
				"app.kubernetes.io/name":       "devcontainer-webterminal",
				"app.kubernetes.io/component":  "gateway",
				"app.kubernetes.io/managed-by": "devstar",
			},
		},
		"spec": map[string]interface{}{
			"selector": map[string]interface{}{"istio": "ingressgateway"},
			"servers": []interface{}{
				map[string]interface{}{
					"port":  map[string]interface{}{"number": 80, "name": "http", "protocol": "HTTP"},
					"hosts": hosts,
				},
			},
		},
	}}

	if _, err := client.Resource(gwGVR).Namespace(namespace).Create(ctxVal, gw, metav1.CreateOptions{}); err != nil {
		if k8serrors.IsAlreadyExists(err) {
			// Lost a race with a concurrent creator; the desired state holds.
			log.Info("createDevContainerWebTerminalGateway: Gateway 已存在: %s", gatewayName)
			return nil
		}
		return fmt.Errorf("创建 Gateway 失败: %w", err)
	}
	log.Info("createDevContainerWebTerminalGateway: 成功创建 Gateway: %s", gatewayName)
	return nil
}
// createDevContainerWebTerminalVirtualService creates the Istio VirtualService
// "<devcontainerName>-webterminal-vs" that routes
// "/<username>/<repoName>/dev-container-webterminal" on the shared web
// terminal Gateway to port 7681 of the "<devcontainerName>-svc" service,
// rewriting the matched prefix to "/".
//
// hosts is [DOMAIN] when the [server] DOMAIN config value is set and ["*"]
// otherwise. username and repoName are the first two "-"-separated parts of
// devcontainerName ("unknown" when there are fewer than two parts).
//
// The VirtualService content is derived only from devcontainerName and the
// config, so an already existing object is treated as success. Other failures
// are returned wrapped with %w.
func createDevContainerWebTerminalVirtualService(ctx *context.Context, devcontainerName string) error {
	log.Info("createDevContainerWebTerminalVirtualService: 开始创建 VirtualService for DevContainer: %s", devcontainerName)

	// Obtain the dynamic client.
	ctxVal := *ctx
	client, err := devcontainer_k8s_agent_module.GetKubernetesClient(ctxVal, nil, "")
	if err != nil {
		return fmt.Errorf("获取 K8s 客户端失败: %w", err)
	}

	vsName := devcontainerName + "-webterminal-vs"
	namespace := setting.DevContainerConfig.Namespace

	// Read the domain from the configuration file.
	cfg, err := setting.NewConfigProviderFromFile(setting.CustomConf)
	if err != nil {
		return fmt.Errorf("加载配置文件失败: %w", err)
	}
	domain := cfg.Section("server").Key("DOMAIN").Value()

	// VirtualService hosts are one or the other: [DOMAIN] when set, else ["*"].
	vsHosts := []interface{}{"*"}
	if domain != "" {
		vsHosts = []interface{}{domain}
	}

	// Derive user and repository names from the container name.
	// NOTE(review): the name is split on "-", so a user or repository name that
	// itself contains "-" produces the wrong path — confirm the naming scheme.
	username, repoName := "unknown", "unknown"
	if parts := strings.Split(devcontainerName, "-"); len(parts) >= 2 {
		username, repoName = parts[0], parts[1]
	}
	path := fmt.Sprintf("/%s/%s/dev-container-webterminal", username, repoName)

	// Define the VirtualService as an unstructured object, rewriting the
	// matched prefix to the root path that ttyd serves.
	vsGVR := schema.GroupVersionResource{Group: "networking.istio.io", Version: "v1", Resource: "virtualservices"}
	vs := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "networking.istio.io/v1",
		"kind":       "VirtualService",
		"metadata": map[string]interface{}{
			"name":      vsName,
			"namespace": namespace,
			"labels": map[string]interface{}{
				"app.kubernetes.io/name":       "devcontainer-webterminal",
				"app.kubernetes.io/component":  "virtualservice",
				"app.kubernetes.io/managed-by": "devstar",
				"devcontainer-name":            devcontainerName,
			},
		},
		"spec": map[string]interface{}{
			"hosts":    vsHosts,
			"gateways": []interface{}{"devcontainer-webterminal-gateway"},
			"http": []interface{}{
				map[string]interface{}{
					"match":   []interface{}{map[string]interface{}{"uri": map[string]interface{}{"prefix": path}}},
					"rewrite": map[string]interface{}{"uri": "/"},
					"route": []interface{}{
						map[string]interface{}{
							"destination": map[string]interface{}{
								"host": devcontainerName + "-svc",
								"port": map[string]interface{}{"number": 7681},
							},
						},
					},
					"timeout": "3600s",
					"retries": map[string]interface{}{
						"attempts":      3,
						"perTryTimeout": "30s",
						"retryOn":       "5xx,gateway-error,connect-failure",
					},
				},
			},
		},
	}}

	if _, err := client.Resource(vsGVR).Namespace(namespace).Create(ctxVal, vs, metav1.CreateOptions{}); err != nil {
		if k8serrors.IsAlreadyExists(err) {
			// A leftover or concurrently created object is the desired state.
			log.Info("createDevContainerWebTerminalVirtualService: VirtualService already exists: %s", vsName)
			return nil
		}
		return fmt.Errorf("创建 VirtualService 失败: %w", err)
	}
	log.Info("createDevContainerWebTerminalVirtualService: 成功创建 VirtualService: %s, 路径: %s", vsName, path)
	return nil
}
// deleteDevContainerWebTerminalVirtualService removes the Istio VirtualService
// "<devcontainerName>-webterminal-vs" from the DevContainer namespace.
// A VirtualService that does not exist counts as success; any other failure,
// including failing to build the dynamic client, is returned as an error.
func deleteDevContainerWebTerminalVirtualService(ctx *context.Context, devcontainerName string) error {
	log.Info("deleteDevContainerWebTerminalVirtualService: 开始删除 VirtualService for DevContainer: %s", devcontainerName)

	// Obtain the dynamic client.
	dynClient, clientErr := devcontainer_k8s_agent_module.GetKubernetesClient(*ctx, nil, "")
	if clientErr != nil {
		return fmt.Errorf("获取 K8s 客户端失败: %v", clientErr)
	}

	var (
		resourceName = devcontainerName + "-webterminal-vs"
		ns           = setting.DevContainerConfig.Namespace
		gvr          = schema.GroupVersionResource{Group: "networking.istio.io", Version: "v1", Resource: "virtualservices"}
	)

	delErr := dynClient.Resource(gvr).Namespace(ns).Delete(*ctx, resourceName, metav1.DeleteOptions{})
	switch {
	case delErr == nil:
		log.Info("deleteDevContainerWebTerminalVirtualService: 成功删除 VirtualService: %s", resourceName)
		return nil
	case k8serrors.IsNotFound(delErr):
		// Already gone: nothing left to delete.
		log.Info("deleteDevContainerWebTerminalVirtualService: VirtualService 不存在,无需删除: %s", resourceName)
		return nil
	default:
		return fmt.Errorf("删除 VirtualService 失败: %v", delErr)
	}
}
// executeCommandInK8sPod runs command inside container containerName of the
// first Running Pod labelled app=<devcontainerName> in namespace.
//
// The command runs without stdin or a TTY; stdout and stderr are captured in
// memory. On success stdout is logged; on failure the returned error carries
// the captured stderr. An error is also returned when the Pods cannot be
// listed, when none match the label, or when none of them is Running.
func executeCommandInK8sPod(ctx *context.Context, client *kubernetes.Clientset, namespace, devcontainerName, containerName string, command []string) error {
	log.Info("executeCommandInK8sPod: 开始为 DevContainer %s 查找对应的 Pod", devcontainerName)

	// 1. List the Pods carrying the DevContainer's app label.
	podList, listErr := client.CoreV1().Pods(namespace).List(*ctx, metav1.ListOptions{
		LabelSelector: "app=" + devcontainerName,
	})
	if listErr != nil {
		log.Error("executeCommandInK8sPod: 查找 Pod 失败: %v", listErr)
		return fmt.Errorf("查找 Pod 失败: %v", listErr)
	}
	if len(podList.Items) == 0 {
		log.Error("executeCommandInK8sPod: 未找到 DevContainer %s 对应的 Pod", devcontainerName)
		return fmt.Errorf("未找到 DevContainer %s 对应的 Pod", devcontainerName)
	}

	// 2. Pick the first Pod whose phase is Running.
	runningIdx := -1
	for idx := 0; idx < len(podList.Items); idx++ {
		if podList.Items[idx].Status.Phase == v1.PodRunning {
			runningIdx = idx
			break
		}
	}
	if runningIdx < 0 {
		log.Error("executeCommandInK8sPod: DevContainer %s 没有运行中的 Pod", devcontainerName)
		return fmt.Errorf("DevContainer %s 没有运行中的 Pod", devcontainerName)
	}
	podName := podList.Items[runningIdx].Name
	log.Info("executeCommandInK8sPod: 找到运行中的 Pod: %s, 在容器 %s 中执行命令",
		podName, containerName)

	// 3. Build the exec sub-resource request for that Pod.
	execOptions := &v1.PodExecOptions{
		Container: containerName,
		Command:   command,
		Stdin:     false,
		Stdout:    true,
		Stderr:    true,
		TTY:       false,
	}
	req := client.CoreV1().RESTClient().Post().
		Resource("pods").
		Name(podName).
		Namespace(namespace).
		SubResource("exec").
		Param("container", containerName).
		VersionedParams(execOptions, scheme.ParameterCodec)

	// The executor needs a REST config: try the kubeconfig first, then fall
	// back to the in-cluster configuration.
	restConfig, cfgErr := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if cfgErr != nil {
		restConfig, cfgErr = rest.InClusterConfig()
		if cfgErr != nil {
			return fmt.Errorf("获取 K8s 配置失败: %v", cfgErr)
		}
	}
	executor, execErr := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
	if execErr != nil {
		return fmt.Errorf("创建命令执行器失败: %v", execErr)
	}

	// 4. Run the command, capturing both output streams.
	var outBuf, errBuf bytes.Buffer
	streamErr := executor.StreamWithContext(*ctx, remotecommand.StreamOptions{
		Stdout: &outBuf,
		Stderr: &errBuf,
	})
	if streamErr != nil {
		log.Error("executeCommandInK8sPod: 命令执行失败: %v, stderr: %s",
			streamErr, errBuf.String())
		return fmt.Errorf("命令执行失败: %v, stderr: %s", streamErr, errBuf.String())
	}
	log.Info("executeCommandInK8sPod: 命令执行成功, stdout: %s", outBuf.String())
	return nil
}