Files
devstar/modules/k8s/controller/devcontainer/devcontainerapp_controller.go

364 lines
13 KiB
Go
Raw Normal View History

/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devcontainer
import (
"context"
"strconv"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
devcontainer_v1 "code.gitea.io/gitea/modules/k8s/api/v1"
devcontainer_controller_utils "code.gitea.io/gitea/modules/k8s/controller/devcontainer/utils"
apps_v1 "k8s.io/api/apps/v1"
core_v1 "k8s.io/api/core/v1"
k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)
// DevcontainerAppReconciler is the controller that reconciles DevcontainerApp
// objects against the cluster.
type DevcontainerAppReconciler struct {
	// Client is the embedded Kubernetes client; its methods (Get, Create,
	// Update, Status) are called directly on the reconciler.
	client.Client

	// Scheme is handed to SetControllerReference when the reconciler marks
	// a DevcontainerApp as the owner of the objects it creates.
	Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch;update
// +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch

// Reconcile moves the cluster toward the state described by the
// DevcontainerApp named in req. In order, it:
//
//  1. loads the DevcontainerApp (a missing object ends the pass without error);
//  2. honors a stop request made through the desiredReplicas annotation;
//  3. creates or updates the StatefulSet of the same name, applies restart
//     requests and mirrors StatefulSet readiness into Status.Ready;
//  4. creates the Service of the same name and records the assigned NodePorts
//     in the DevcontainerApp status.
//
// Any error is returned to controller-runtime unchanged so the request is
// requeued.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
func (r *DevcontainerAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// 1. Read the DevcontainerApp through the client.
	app := &devcontainer_v1.DevcontainerApp{}
	if err := r.Get(ctx, req.NamespacedName, app); err != nil {
		// Once the DevcontainerApp has been deleted there is nothing left to
		// do: return an empty result and skip the remaining steps.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// 2. Handle the "stop the container" annotation first.
	if err := r.applyStopRequest(ctx, req.NamespacedName, app); err != nil {
		return ctrl.Result{}, err
	}

	// 3. StatefulSet handling.
	if err := r.reconcileStatefulSet(ctx, req.NamespacedName, app); err != nil {
		return ctrl.Result{}, err
	}

	// 4. Service handling.
	if err := r.reconcileService(ctx, app); err != nil {
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}

// Annotation keys on a DevcontainerApp that the reconciler reacts to.
const (
	annotationDesiredReplicas = "devstar.io/desiredReplicas"
	annotationRestartedAt     = "devstar.io/restartedAt"
)

// applyStopRequest scales the StatefulSet to zero replicas and clears
// Status.Ready when the desiredReplicas annotation is exactly "0".
func (r *DevcontainerAppReconciler) applyStopRequest(ctx context.Context, key types.NamespacedName, app *devcontainer_v1.DevcontainerApp) error {
	if app.Annotations[annotationDesiredReplicas] != "0" {
		return nil
	}

	logger := log.FromContext(ctx)
	logger.Info("DevContainer stop requested via annotation", "name", app.Name)

	current := &apps_v1.StatefulSet{}
	if err := r.Get(ctx, key, current); err != nil {
		// Deliberately not treated as fatal here: the StatefulSet step runs
		// next, performs the same lookup and handles every error case
		// (including "not found", which leads to creation).
		return nil
	}

	// Only write when the replica count actually has to change, so that a
	// lingering annotation does not produce an API write on every pass.
	if current.Spec.Replicas == nil || *current.Spec.Replicas != 0 {
		zero := int32(0)
		current.Spec.Replicas = &zero
		if err := r.Update(ctx, current); err != nil {
			logger.Error(err, "Failed to scale down StatefulSet replicas to 0")
			return err
		}
		logger.Info("StatefulSet scaled down to 0 replicas due to stop request")
	}

	// Mark the container as not ready, again only on an actual transition.
	if app.Status.Ready {
		app.Status.Ready = false
		if err := r.Status().Update(ctx, app); err != nil {
			logger.Error(err, "Failed to update DevcontainerApp status")
			return err
		}
	}
	return nil
}

// reconcileStatefulSet creates the StatefulSet if it is missing; otherwise it
// applies restart requests, syncs Status.Ready and pushes image/replica changes.
func (r *DevcontainerAppReconciler) reconcileStatefulSet(ctx context.Context, key types.NamespacedName, app *devcontainer_v1.DevcontainerApp) error {
	logger := log.FromContext(ctx)

	// Build the desired StatefulSet and make the DevcontainerApp its owner.
	desired := devcontainer_controller_utils.NewStatefulSet(app)
	if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, desired, r.Scheme); err != nil {
		return err
	}

	// Look for a StatefulSet with the same name in the cluster.
	current := &apps_v1.StatefulSet{}
	if err := r.Get(ctx, key, current); err != nil {
		if !errors.IsNotFound(err) {
			return err
		}
		if createErr := r.Create(ctx, desired); createErr != nil && !errors.IsAlreadyExists(createErr) {
			logger.Error(createErr, "Failed to create StatefulSet")
			return createErr
		}
		return nil
	}

	// Restart annotation: a restart is needed while the pod template does not
	// yet carry the same restartedAt value as the DevcontainerApp.
	if restartedAt, requested := app.Annotations[annotationRestartedAt]; requested {
		applied, present := current.Spec.Template.Annotations[annotationRestartedAt]
		if !present || applied != restartedAt {
			logger.Info("DevContainer restart requested", "name", app.Name, "time", restartedAt)

			// Copy the annotation onto the pod template; changing the
			// template is what triggers the rolling update.
			if current.Spec.Template.Annotations == nil {
				current.Spec.Template.Annotations = make(map[string]string)
			}
			current.Spec.Template.Annotations[annotationRestartedAt] = restartedAt

			// Ensure at least one replica in case the container had been
			// stopped earlier.
			replicas := int32(1)
			if current.Spec.Replicas != nil && *current.Spec.Replicas > 0 {
				replicas = *current.Spec.Replicas
			}
			current.Spec.Replicas = &replicas

			if err := r.Update(ctx, current); err != nil {
				logger.Error(err, "Failed to update StatefulSet for restart")
				return err
			}
			logger.Info("StatefulSet restarted successfully")
		}
	}

	// Mirror StatefulSet readiness into DevcontainerApp.Status.Ready. Both
	// directions write only on a transition, so a steady state costs no
	// status update.
	ready := current.Status.ReadyReplicas > 0
	switch {
	case ready && !app.Status.Ready:
		app.Status.Ready = true
		if err := r.Status().Update(ctx, app); err != nil {
			logger.Error(err, "Failed to update DevcontainerApp.Status.Ready", "DevcontainerApp.Status.Ready", app.Status.Ready)
			return err
		}
		logger.Info("DevContainer is READY", "ReadyReplicas", current.Status.ReadyReplicas)
	case !ready && app.Status.Ready:
		app.Status.Ready = false
		if err := r.Status().Update(ctx, app); err != nil {
			logger.Error(err, "Failed to un-mark DevcontainerApp.Status.Ready", "DevcontainerApp.Status.Ready", app.Status.Ready)
			return err
		}
		logger.Info("DevContainer is NOT ready", "ReadyReplicas", current.Status.ReadyReplicas)
	}

	// Push the desired StatefulSet only when something differs, to avoid
	// update loops.
	needsUpdate := false

	// Image change. A StatefulSet without containers cannot match either.
	containers := current.Spec.Template.Spec.Containers
	if len(containers) == 0 || containers[0].Image != app.Spec.StatefulSet.Image {
		needsUpdate = true
	}

	// Replica count requested through the annotation.
	if raw, present := app.Annotations[annotationDesiredReplicas]; present {
		parsed, err := strconv.ParseInt(raw, 10, 32)
		if err != nil {
			// A malformed annotation is reported but not returned as an
			// error: requeueing cannot fix it.
			logger.Error(err, "Ignoring invalid desiredReplicas annotation", "value", raw)
		} else {
			want := int32(parsed)
			// Always carry the requested count on the desired object, so an
			// image-only update below does not reset the replicas and undo
			// a scale or stop request.
			desired.Spec.Replicas = &want

			have := int32(1) // default when Spec.Replicas is unset
			if current.Spec.Replicas != nil {
				have = *current.Spec.Replicas
			}
			// The "0" (stop) case has already been handled separately, so
			// it does not trigger an update by itself here.
			if raw != "0" && have != want {
				needsUpdate = true
			}
		}
	}

	if needsUpdate {
		// NOTE(review): this submits the freshly built object rather than the
		// fetched one, as the original code did — confirm that is intended.
		if err := r.Update(ctx, desired); err != nil {
			return err
		}
		logger.Info("StatefulSet updated", "name", desired.Name)
	}
	return nil
}

// reconcileService creates the Service if it is missing and records its
// NodePorts in the DevcontainerApp status.
func (r *DevcontainerAppReconciler) reconcileService(ctx context.Context, app *devcontainer_v1.DevcontainerApp) error {
	logger := log.FromContext(ctx)

	desired := devcontainer_controller_utils.NewService(app)
	if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, desired, r.Scheme); err != nil {
		return err
	}

	current := &core_v1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: app.Name, Namespace: app.Namespace}, current)
	switch {
	case err == nil:
		// The Service already exists: make sure its ports are in the status.
		return r.recordExistingServicePorts(ctx, app, current.Spec.Ports)
	case !errors.IsNotFound(err):
		return err
	}

	if err := r.Create(ctx, desired); err != nil {
		if errors.IsAlreadyExists(err) {
			return nil
		}
		logger.Error(err, "Failed to create DevcontainerApp NodePort Service", "nodePortServiceName", desired.Name)
		return err
	}

	// Creation succeeded (this happens once): copy the NodePort assignments
	// from the created object into app.Status.
	return r.recordCreatedServicePorts(ctx, app, desired.Spec.Ports)
}

// recordCreatedServicePorts stores the NodePorts of a just-created Service in
// the DevcontainerApp status.
func (r *DevcontainerAppReconciler) recordCreatedServicePorts(ctx context.Context, app *devcontainer_v1.DevcontainerApp, ports []core_v1.ServicePort) error {
	if len(ports) == 0 {
		// No ports, so nothing to record; also guards the ports[0] access.
		return nil
	}
	logger := log.FromContext(ctx)

	// The first port is the main SSH port.
	logger.Info("[DevStar][DevContainer] NodePort Assigned", "nodePortAssigned", ports[0].NodePort)
	app.Status.NodePortAssigned = uint16(ports[0].NodePort)

	// Every following port is an extra port.
	extraPorts := collectExtraPortsAssigned(app, ports)
	for _, p := range extraPorts {
		logger.Info("[DevStar][DevContainer] Extra Port NodePort Assigned",
			"name", p.Name,
			"servicePort", p.ServicePort,
			"nodePort", p.NodePort)
	}
	app.Status.ExtraPortsAssigned = extraPorts

	if err := r.Status().Update(ctx, app); err != nil {
		logger.Error(err, "Failed to update NodePorts of DevcontainerApp",
			"nodePortAssigned", ports[0].NodePort,
			"extraPortsCount", len(extraPorts))
		return err
	}
	return nil
}

// recordExistingServicePorts fills in NodePort status fields that are missing
// or out of step with an already existing Service.
func (r *DevcontainerAppReconciler) recordExistingServicePorts(ctx context.Context, app *devcontainer_v1.DevcontainerApp, ports []core_v1.ServicePort) error {
	logger := log.FromContext(ctx)
	needStatusUpdate := false

	// Record the main port if it has not been recorded yet.
	if app.Status.NodePortAssigned == 0 && len(ports) > 0 {
		app.Status.NodePortAssigned = uint16(ports[0].NodePort)
		needStatusUpdate = true
		logger.Info("[DevStar][DevContainer] Found existing main NodePort",
			"nodePort", ports[0].NodePort)
	}

	// Rebuild the extra-port list when its length does not match the number
	// of extra Service ports (this also covers an unset list).
	if len(ports) > 1 && len(app.Status.ExtraPortsAssigned) != len(ports)-1 {
		extraPorts := collectExtraPortsAssigned(app, ports)
		for _, p := range extraPorts {
			logger.Info("[DevStar][DevContainer] Found existing extra NodePort",
				"name", p.Name,
				"nodePort", p.NodePort)
		}
		app.Status.ExtraPortsAssigned = extraPorts
		needStatusUpdate = true
	}

	if !needStatusUpdate {
		return nil
	}
	if err := r.Status().Update(ctx, app); err != nil {
		logger.Error(err, "Failed to update NodePorts status for existing service")
		return err
	}
	logger.Info("[DevStar][DevContainer] Updated NodePorts status for existing service",
		"mainNodePort", app.Status.NodePortAssigned,
		"extraPortsCount", len(app.Status.ExtraPortsAssigned))
	return nil
}

// collectExtraPortsAssigned maps every Service port after the first (index 0
// is the SSH port) to an ExtraPortAssigned status entry.
func collectExtraPortsAssigned(app *devcontainer_v1.DevcontainerApp, ports []core_v1.ServicePort) []devcontainer_v1.ExtraPortAssigned {
	// Start from an empty, non-nil slice, as the original code did.
	assigned := []devcontainer_v1.ExtraPortAssigned{}

	for i := 1; i < len(ports); i++ {
		port := ports[i]

		// Find the matching extra-port spec, by non-empty name or by
		// service port number.
		var containerPort uint16
		for _, ep := range app.Spec.Service.ExtraPorts {
			if (ep.Name != "" && ep.Name == port.Name) || uint16(port.Port) == ep.ServicePort {
				containerPort = ep.ContainerPort
				break
			}
		}

		// Without a match, fall back to the numeric target port.
		if containerPort == 0 && port.TargetPort.IntVal > 0 {
			containerPort = uint16(port.TargetPort.IntVal)
		}

		assigned = append(assigned, devcontainer_v1.ExtraPortAssigned{
			Name:          port.Name,
			ServicePort:   uint16(port.Port),
			ContainerPort: containerPort,
			NodePort:      uint16(port.NodePort),
		})
	}
	return assigned
}
// SetupWithManager registers the reconciler with mgr. DevcontainerApp is the
// primary resource (For); StatefulSets and Services are registered as owned
// resources (Owns). The error from building the controller is returned as is.
func (r *DevcontainerAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
	b := ctrl.NewControllerManagedBy(mgr)

	// Primary resource handled by this controller.
	b = b.For(&devcontainer_v1.DevcontainerApp{})

	// Secondary resources the reconciler creates for each DevcontainerApp.
	b = b.Owns(&apps_v1.StatefulSet{})
	b = b.Owns(&core_v1.Service{})

	return b.Complete(r)
}