/* Copyright 2024. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package devcontainer import ( "context" "strconv" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" devcontainer_v1 "code.gitea.io/gitea/modules/k8s/api/v1" devcontainer_controller_utils "code.gitea.io/gitea/modules/k8s/controller/devcontainer/utils" apps_v1 "k8s.io/api/apps/v1" core_v1 "k8s.io/api/core/v1" k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ) // DevcontainerAppReconciler reconciles a DevcontainerApp object type DevcontainerAppReconciler struct { client.Client Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps/status,verbs=get;update;patch // +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps/finalizers,verbs=update // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch // +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. // Modify the Reconcile function to compare the state specified by // the DevcontainerApp object against the actual cluster state, and then // perform operations to make the cluster state reflect the state specified by // the user. // // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile func (r *DevcontainerAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) var err error // 1. 读取缓存中的 DevcontainerApp app := &devcontainer_v1.DevcontainerApp{} err = r.Get(ctx, req.NamespacedName, app) if err != nil { // 当 CRD 资源 "DevcontainerApp" 被删除后,直接返回空结果,跳过剩下步骤 return ctrl.Result{}, client.IgnoreNotFound(err) } // 检查停止容器的注解 if desiredReplicas, exists := app.Annotations["devstar.io/desiredReplicas"]; exists && desiredReplicas == "0" { logger.Info("DevContainer stop requested via annotation", "name", app.Name) // 获取当前的 StatefulSet statefulSetInNamespace := &apps_v1.StatefulSet{} err = r.Get(ctx, req.NamespacedName, statefulSetInNamespace) if err == nil { // 设置副本数为0 replicas := int32(0) statefulSetInNamespace.Spec.Replicas = &replicas if err := r.Update(ctx, statefulSetInNamespace); err != nil { logger.Error(err, "Failed to scale down StatefulSet replicas to 0") return ctrl.Result{}, err } logger.Info("StatefulSet scaled down to 0 replicas due to stop request") // 标记容器为未就绪 app.Status.Ready = false if err := r.Status().Update(ctx, app); err != nil { logger.Error(err, "Failed to update DevcontainerApp status") return ctrl.Result{}, err } // 继续处理其他逻辑(如更新 Service) } } // 2. 根据 DevcontainerApp 配置信息进行处理 // 2.1 StatefulSet 处理 statefulSet := devcontainer_controller_utils.NewStatefulSet(app) err = k8s_sigs_controller_runtime_utils.SetControllerReference(app, statefulSet, r.Scheme) if err != nil { return ctrl.Result{}, err } // 2.2 查找 集群中同名称的 StatefulSet statefulSetInNamespace := &apps_v1.StatefulSet{} err = r.Get(ctx, req.NamespacedName, statefulSetInNamespace) if err != nil { if !errors.IsNotFound(err) { return ctrl.Result{}, err } err = r.Create(ctx, statefulSet) if err != nil && !errors.IsAlreadyExists(err) { logger.Error(err, "Failed to create StatefulSet") return ctrl.Result{}, err } } else { // 处理重启注解 if restartedAt, exists := app.Annotations["devstar.io/restartedAt"]; exists { // 检查注解是否已经应用到StatefulSet needsRestart := true if statefulSetInNamespace.Spec.Template.Annotations != nil { if currentRestartTime, exists := statefulSetInNamespace.Spec.Template.Annotations["devstar.io/restartedAt"]; exists && currentRestartTime == restartedAt { needsRestart = false } } else { statefulSetInNamespace.Spec.Template.Annotations = make(map[string]string) } if needsRestart { logger.Info("DevContainer restart requested", "name", app.Name, "time", restartedAt) // 将重启注解传递到 Pod 模板以触发滚动更新 statefulSetInNamespace.Spec.Template.Annotations["devstar.io/restartedAt"] = restartedAt // 确保副本数至少为1(防止之前被停止) replicas := int32(1) if statefulSetInNamespace.Spec.Replicas != nil && *statefulSetInNamespace.Spec.Replicas > 0 { replicas = *statefulSetInNamespace.Spec.Replicas } statefulSetInNamespace.Spec.Replicas = &replicas if err := r.Update(ctx, statefulSetInNamespace); err != nil { logger.Error(err, "Failed to update StatefulSet for restart") return ctrl.Result{}, err } logger.Info("StatefulSet restarted successfully") } } // 若 StatefulSet.Status.readyReplicas 变化,则更新 DevcontainerApp.Status.Ready 域 if statefulSetInNamespace.Status.ReadyReplicas > 0 { app.Status.Ready = true if err := r.Status().Update(ctx, app); err != nil { logger.Error(err, "Failed to update DevcontainerApp.Status.Ready", "DevcontainerApp.Status.Ready", app.Status.Ready) return ctrl.Result{}, err } logger.Info("DevContainer is READY", "ReadyReplicas", statefulSetInNamespace.Status.ReadyReplicas) } else if app.Status.Ready { // 只有当目前状态为Ready但实际不再Ready时才更新 app.Status.Ready = false if err := r.Status().Update(ctx, app); err != nil { logger.Error(err, "Failed to un-mark DevcontainerApp.Status.Ready", "DevcontainerApp.Status.Ready", app.Status.Ready) return ctrl.Result{}, err } logger.Info("DevContainer is NOT ready", "ReadyReplicas", statefulSetInNamespace.Status.ReadyReplicas) } // 修复方法:加上判断条件,避免循环触发更新 needsUpdate := false // 检查镜像是否变更 if app.Spec.StatefulSet.Image != statefulSetInNamespace.Spec.Template.Spec.Containers[0].Image { needsUpdate = true } // 检查副本数 - 如果指定了 desiredReplicas 注解但不为 0(停止已在前面处理) if desiredReplicas, exists := app.Annotations["devstar.io/desiredReplicas"]; exists && desiredReplicas != "0" { replicas, err := strconv.ParseInt(desiredReplicas, 10, 32) if err == nil { currentReplicas := int32(1) // 默认值 if statefulSetInNamespace.Spec.Replicas != nil { currentReplicas = *statefulSetInNamespace.Spec.Replicas } if currentReplicas != int32(replicas) { r32 := int32(replicas) statefulSet.Spec.Replicas = &r32 needsUpdate = true } } } if needsUpdate { if err := r.Update(ctx, statefulSet); err != nil { return ctrl.Result{}, err } logger.Info("StatefulSet updated", "name", statefulSet.Name) } } // 2.3 Service 处理 service := devcontainer_controller_utils.NewService(app) if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, service, r.Scheme); err != nil { return ctrl.Result{}, err } serviceInCluster := &core_v1.Service{} err = r.Get(ctx, types.NamespacedName{Name: app.Name, Namespace: app.Namespace}, serviceInCluster) if err != nil { if !errors.IsNotFound(err) { return ctrl.Result{}, err } err = r.Create(ctx, service) if err == nil { // 创建 NodePort Service 成功只执行一次 ==> 将NodePort 端口分配信息更新到 app.Status logger.Info("[DevStar][DevContainer] NodePort Assigned", "nodePortAssigned", service.Spec.Ports[0].NodePort) // 设置主 SSH 端口的 NodePort app.Status.NodePortAssigned = uint16(service.Spec.Ports[0].NodePort) // 处理额外端口 extraPortsAssigned := []devcontainer_v1.ExtraPortAssigned{} // 处理额外端口,从第二个端口开始(索引为1) // 因为第一个端口(索引为0)是 SSH 端口 for i := 1; i < len(service.Spec.Ports); i++ { port := service.Spec.Ports[i] // 查找对应的端口规格 var containerPort uint16 = 0 // 如果存在额外端口配置,尝试匹配 if app.Spec.Service.ExtraPorts != nil { for _, ep := range app.Spec.Service.ExtraPorts { if (ep.Name != "" && ep.Name == port.Name) || (uint16(port.Port) == ep.ServicePort) { containerPort = ep.ContainerPort break } } } // 如果没有找到匹配项,使用目标端口 if containerPort == 0 && port.TargetPort.IntVal > 0 { containerPort = uint16(port.TargetPort.IntVal) } // 添加到额外端口列表 extraPortsAssigned = append(extraPortsAssigned, devcontainer_v1.ExtraPortAssigned{ Name: port.Name, ServicePort: uint16(port.Port), ContainerPort: containerPort, NodePort: uint16(port.NodePort), }) logger.Info("[DevStar][DevContainer] Extra Port NodePort Assigned", "name", port.Name, "servicePort", port.Port, "nodePort", port.NodePort) } // 更新 CRD 状态,包括额外端口 app.Status.ExtraPortsAssigned = extraPortsAssigned if err := r.Status().Update(ctx, app); err != nil { logger.Error(err, "Failed to update NodePorts of DevcontainerApp", "nodePortAssigned", service.Spec.Ports[0].NodePort, "extraPortsCount", len(extraPortsAssigned)) return ctrl.Result{}, err } } else if !errors.IsAlreadyExists(err) { logger.Error(err, "Failed to create DevcontainerApp NodePort Service", "nodePortServiceName", service.Name) return ctrl.Result{}, err } } else { // Service 已存在,检查它的端口信息 // 检查是否需要更新状态 needStatusUpdate := false // 如果主端口未记录,记录之 if app.Status.NodePortAssigned == 0 && len(serviceInCluster.Spec.Ports) > 0 { app.Status.NodePortAssigned = uint16(serviceInCluster.Spec.Ports[0].NodePort) needStatusUpdate = true logger.Info("[DevStar][DevContainer] Found existing main NodePort", "nodePort", serviceInCluster.Spec.Ports[0].NodePort) } // 处理额外端口 if len(serviceInCluster.Spec.Ports) > 1 { // 如果额外端口状态为空,或者数量不匹配 if app.Status.ExtraPortsAssigned == nil || len(app.Status.ExtraPortsAssigned) != len(serviceInCluster.Spec.Ports)-1 { extraPortsAssigned := []devcontainer_v1.ExtraPortAssigned{} // 从索引 1 开始,跳过主端口 for i := 1; i < len(serviceInCluster.Spec.Ports); i++ { port := serviceInCluster.Spec.Ports[i] // 查找对应的端口规格 var containerPort uint16 = 0 // 如果存在额外端口配置,尝试匹配 if app.Spec.Service.ExtraPorts != nil { for _, ep := range app.Spec.Service.ExtraPorts { if (ep.Name != "" && ep.Name == port.Name) || (uint16(port.Port) == ep.ServicePort) { containerPort = ep.ContainerPort break } } } // 如果没有找到匹配项,使用目标端口 if containerPort == 0 && port.TargetPort.IntVal > 0 { containerPort = uint16(port.TargetPort.IntVal) } // 添加到额外端口列表 extraPortsAssigned = append(extraPortsAssigned, devcontainer_v1.ExtraPortAssigned{ Name: port.Name, ServicePort: uint16(port.Port), ContainerPort: containerPort, NodePort: uint16(port.NodePort), }) logger.Info("[DevStar][DevContainer] Found existing extra NodePort", "name", port.Name, "nodePort", port.NodePort) } // 更新额外端口状态 app.Status.ExtraPortsAssigned = extraPortsAssigned needStatusUpdate = true } } // 如果需要更新状态 if needStatusUpdate { if err := r.Status().Update(ctx, app); err != nil { logger.Error(err, "Failed to update NodePorts status for existing service") return ctrl.Result{}, err } logger.Info("[DevStar][DevContainer] Updated NodePorts status for existing service", "mainNodePort", app.Status.NodePortAssigned, "extraPortsCount", len(app.Status.ExtraPortsAssigned)) } } return ctrl.Result{}, nil } // SetupWithManager sets up the controller with the Manager. func (r *DevcontainerAppReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&devcontainer_v1.DevcontainerApp{}). Owns(&apps_v1.StatefulSet{}). Owns(&core_v1.Service{}). Complete(r) }