/* * Copyright (c) Mengning Software. 2025. All rights reserved. * Authors: DevStar Team, panshuxiao * Create: 2025-11-19 * Description: Reconciliation logic for Application CRDs. */ package application import ( "bytes" "context" "fmt" "strings" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" applicationv1 "code.gitea.io/gitea/modules/k8s/api/application/v1" application_controller_utils "code.gitea.io/gitea/modules/k8s/controller/application/utils" apps_v1 "k8s.io/api/apps/v1" core_v1 "k8s.io/api/core/v1" "google.golang.org/protobuf/types/known/durationpb" istioapinetworkingv1 "istio.io/api/networking/v1" istionetworkingv1 "istio.io/client-go/pkg/apis/networking/v1" ) // ApplicationReconciler reconciles a Application object type ApplicationReconciler struct { client.Client Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=application.devstar.cn,resources=applications,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/status,verbs=get;update;patch // +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/finalizers,verbs=update // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch // 
+kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;create // Reconcile is part of the main kubernetes reconciliation loop func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) // 获取Application实例 app := &applicationv1.Application{} err := r.Get(ctx, req.NamespacedName, app) if err != nil { if errors.IsNotFound(err) { logger.Info("Application resource not found. Ignoring since object must be deleted") return ctrl.Result{}, nil } logger.Error(err, "Failed to get Application") return ctrl.Result{}, err } logger.Info("Processing Application", "name", app.Name, "namespace", app.Namespace, "type", app.Spec.Template.Type) // 确保命名空间存在 if err := r.ensureNamespace(ctx, app.Namespace); err != nil { logger.Error(err, "Failed to ensure namespace exists", "namespace", app.Namespace) return ctrl.Result{}, err } // 添加 finalizer 处理逻辑 finalizerName := "application.devstar.cn/finalizer" // 检查对象是否正在被删除 if !app.ObjectMeta.DeletionTimestamp.IsZero() { // 对象正在被删除 - 处理 finalizer if k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) { // 执行清理操作 logger.Info("Cleaning up resources before deletion", "name", app.Name) // 清理 Gateway 资源(包括 Secret) if err := r.cleanupGateway(ctx, app); err != nil { logger.Error(err, "Failed to cleanup gateway resources") return ctrl.Result{}, err } // 清理 Mesh 资源 if err := r.cleanupMesh(ctx, app); err != nil { logger.Error(err, "Failed to cleanup mesh resources") return ctrl.Result{}, err } // 清理完成后移除 finalizer k8s_sigs_controller_runtime_utils.RemoveFinalizer(app, finalizerName) if err := r.Update(ctx, app); err != nil { logger.Error(err, "Failed to remove finalizer") return ctrl.Result{}, err } } // 已标记为删除且处理完成,允许继续删除流程 return ctrl.Result{}, nil } // 如果对象不包含 finalizer,就添加它 if !k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) { logger.Info("Adding finalizer", "name", app.Name) 
k8s_sigs_controller_runtime_utils.AddFinalizer(app, finalizerName) if err := r.Update(ctx, app); err != nil { logger.Error(err, "Failed to add finalizer") return ctrl.Result{}, err } } // 根据应用类型协调相应的资源 if app.Spec.Template.Type == "stateful" { // 协调 StatefulSet if err := r.reconcileStatefulSet(ctx, app); err != nil { logger.Error(err, "Failed to reconcile StatefulSet") return ctrl.Result{}, err } } else { // 协调 Deployment(默认为无状态应用) if err := r.reconcileDeployment(ctx, app); err != nil { logger.Error(err, "Failed to reconcile Deployment") return ctrl.Result{}, err } } // 协调 Service if err := r.reconcileService(ctx, app); err != nil { logger.Error(err, "Failed to reconcile Service") return ctrl.Result{}, err } // 更新状态 if err := r.updateStatus(ctx, app); err != nil { logger.Error(err, "Failed to update status") return ctrl.Result{}, err } // 协调网络策略 if err := r.reconcileNetworkPolicy(ctx, app); err != nil { logger.Error(err, "Failed to reconcile network policy") return ctrl.Result{}, err } // 如果配置了旧版TrafficPolicy,为向后兼容处理旧的流量策略 if app.Spec.TrafficPolicy != nil { if err := r.reconcileIstioTraffic(ctx, app); err != nil { logger.Error(err, "Failed to reconcile Istio traffic policy") return ctrl.Result{}, err } } logger.Info("Successfully reconciled Application", "name", app.Name) return ctrl.Result{RequeueAfter: time.Minute * 1}, nil } // 服务网格流量治理逻辑 func (r *ApplicationReconciler) reconcileIstioTraffic(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) if app.Spec.TrafficPolicy == nil { logger.Info("No legacy TrafficPolicy configured, skipping") return nil } // 如果已经有NetworkPolicy配置,使用它而不是旧版TrafficPolicy if app.Spec.NetworkPolicy != nil && ((app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled) || (app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled)) { logger.Info("NetworkPolicy already configured, skipping legacy TrafficPolicy") return nil } // 处理金丝雀发布 if 
app.Spec.TrafficPolicy.Canary != nil && app.Spec.TrafficPolicy.Canary.Enabled { logger.Info("Processing legacy Canary traffic configuration") // 这里实现金丝雀发布的逻辑 // 为简化实现,可以创建一个VirtualService来处理金丝雀流量 vsName := app.Name + "-canary-vs" vs := &istionetworkingv1.VirtualService{ ObjectMeta: metav1.ObjectMeta{ Name: vsName, Namespace: app.Namespace, }, } // 检查服务是否存在 serviceName := app.Name + "-svc" service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service) if errors.IsNotFound(err) { logger.Info("Service not found, skipping legacy VirtualService creation", "service", serviceName) return nil } else if err != nil { return fmt.Errorf("failed to get service: %w", err) } // 构建金丝雀服务名 canaryService := app.Name + "-" + app.Spec.TrafficPolicy.Canary.CanaryVersion + "-svc" // 创建或更新VirtualService op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error { // 设置控制器引用 if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil { return err } // 配置金丝雀VirtualService mainWeight := int32(app.Spec.TrafficPolicy.Canary.MainWeight) canaryWeight := int32(100 - app.Spec.TrafficPolicy.Canary.MainWeight) vs.Spec.Hosts = []string{service.Name} vs.Spec.Gateways = []string{"mesh"} // 默认只在网格内生效 vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{ { Route: []*istioapinetworkingv1.HTTPRouteDestination{ { Destination: &istioapinetworkingv1.Destination{ Host: service.Name, }, Weight: mainWeight, }, { Destination: &istioapinetworkingv1.Destination{ Host: canaryService, }, Weight: canaryWeight, }, }, }, } return nil }) if err != nil { return fmt.Errorf("failed to create or update legacy Canary VirtualService: %w", err) } logger.Info("Legacy Canary VirtualService reconciled", "name", vsName, "operation", op) } // 如果配置了熔断器,确保创建DestinationRule if app.Spec.TrafficPolicy.CircuitBreaker != nil { logger.Info("Processing legacy CircuitBreaker configuration") // 
reconcileDestinationRule函数已经支持从TrafficPolicy获取熔断器配置 if err := r.reconcileDestinationRule(ctx, app); err != nil { return fmt.Errorf("failed to reconcile legacy DestinationRule: %w", err) } } return nil } // 修复 reconcileDeployment 函数中的调用 func (r *ApplicationReconciler) reconcileDeployment(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 检查 Deployment 是否存在 deployment := &apps_v1.Deployment{} err := r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, deployment) if errors.IsNotFound(err) { // 创建新的 Deployment logger.Info("Creating new Deployment", "name", app.Name) newDeployment, err := application_controller_utils.NewDeployment(app) if err != nil { return fmt.Errorf("failed to generate deployment: %w", err) } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDeployment, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newDeployment); err != nil { return fmt.Errorf("failed to create deployment: %w", err) } logger.Info("Successfully created Deployment", "name", app.Name) return nil } else if err != nil { return fmt.Errorf("failed to get deployment: %w", err) } // 获取期望的副本数 desiredReplicas := int32(1) // 默认值 if app.Spec.Replicas != nil { desiredReplicas = *app.Spec.Replicas } // 获取当前的副本数 currentReplicas := int32(1) // 默认值 if deployment.Spec.Replicas != nil { currentReplicas = *deployment.Spec.Replicas } logger.Info("Deployment replica status", "name", app.Name, "current-spec-replicas", currentReplicas, "desired-replicas", desiredReplicas, "actual-replicas", deployment.Status.Replicas, "ready-replicas", deployment.Status.ReadyReplicas) // 检查是否需要更新 needsUpdate := false updateFields := make(map[string]interface{}) // 1. 
检查副本数是否变更 if currentReplicas != desiredReplicas { logger.Info("Replica count changed", "current", currentReplicas, "desired", desiredReplicas) needsUpdate = true updateFields["replicas"] = desiredReplicas deployment.Spec.Replicas = &desiredReplicas } // 2. 生成期望的 Deployment 来比较其他字段 updatedDeployment, err := application_controller_utils.NewDeployment(app) if err != nil { return fmt.Errorf("failed to generate updated deployment: %w", err) } // 3. 检查镜像是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { currentImage := deployment.Spec.Template.Spec.Containers[0].Image desiredImage := updatedDeployment.Spec.Template.Spec.Containers[0].Image if currentImage != desiredImage { logger.Info("Image changed", "current", currentImage, "desired", desiredImage) needsUpdate = true updateFields["image"] = desiredImage deployment.Spec.Template.Spec.Containers[0].Image = desiredImage } } // 4. 检查环境变量是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalEnvVars(deployment.Spec.Template.Spec.Containers[0].Env, updatedDeployment.Spec.Template.Spec.Containers[0].Env) { logger.Info("Environment variables changed") needsUpdate = true updateFields["env"] = "changed" deployment.Spec.Template.Spec.Containers[0].Env = updatedDeployment.Spec.Template.Spec.Containers[0].Env } } // 5. 检查资源配置是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalResources(deployment.Spec.Template.Spec.Containers[0].Resources, updatedDeployment.Spec.Template.Spec.Containers[0].Resources) { logger.Info("Resource requirements changed") needsUpdate = true updateFields["resources"] = "changed" deployment.Spec.Template.Spec.Containers[0].Resources = updatedDeployment.Spec.Template.Spec.Containers[0].Resources } } // 6. 
检查端口配置是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalPorts(deployment.Spec.Template.Spec.Containers[0].Ports, updatedDeployment.Spec.Template.Spec.Containers[0].Ports) { logger.Info("Container ports changed") needsUpdate = true updateFields["ports"] = "changed" deployment.Spec.Template.Spec.Containers[0].Ports = updatedDeployment.Spec.Template.Spec.Containers[0].Ports } } // 7. 检查健康检查配置是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalProbes(deployment.Spec.Template.Spec.Containers[0].LivenessProbe, updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe) || !equalProbes(deployment.Spec.Template.Spec.Containers[0].ReadinessProbe, updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe) { logger.Info("Health check probes changed") needsUpdate = true updateFields["probes"] = "changed" deployment.Spec.Template.Spec.Containers[0].LivenessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe deployment.Spec.Template.Spec.Containers[0].ReadinessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe } } // 执行更新 if needsUpdate { logger.Info("Updating deployment", "name", app.Name, "fields", updateFields) if err := r.Update(ctx, deployment); err != nil { return fmt.Errorf("failed to update deployment: %w", err) } logger.Info("Deployment updated successfully", "name", app.Name, "fields", updateFields) } else { logger.Info("No deployment updates needed", "name", app.Name) } return nil } // 添加辅助函数:比较端口配置 func equalPorts(current, desired []core_v1.ContainerPort) bool { if len(current) != len(desired) { return false } currentMap := make(map[string]core_v1.ContainerPort) for _, port := range current { currentMap[port.Name] = port } for _, port := range desired { if currentPort, exists := currentMap[port.Name]; !exists || currentPort.ContainerPort != 
port.ContainerPort || currentPort.Protocol != port.Protocol { return false } } return true } // 添加辅助函数:比较探针配置 func equalProbes(current, desired *core_v1.Probe) bool { if current == nil && desired == nil { return true } if current == nil || desired == nil { return false } // 简单比较,可以根据需要扩展 if current.InitialDelaySeconds != desired.InitialDelaySeconds || current.PeriodSeconds != desired.PeriodSeconds || current.TimeoutSeconds != desired.TimeoutSeconds || current.SuccessThreshold != desired.SuccessThreshold || current.FailureThreshold != desired.FailureThreshold { return false } // 比较 HTTPGet 配置 if current.HTTPGet != nil && desired.HTTPGet != nil { return current.HTTPGet.Path == desired.HTTPGet.Path && current.HTTPGet.Port == desired.HTTPGet.Port } return current.HTTPGet == nil && desired.HTTPGet == nil } // 添加辅助函数:比较环境变量 func equalEnvVars(current, desired []core_v1.EnvVar) bool { if len(current) != len(desired) { return false } currentMap := make(map[string]string) for _, env := range current { currentMap[env.Name] = env.Value } desiredMap := make(map[string]string) for _, env := range desired { desiredMap[env.Name] = env.Value } // 检查每个期望的环境变量是否匹配 for key, value := range desiredMap { if currentMap[key] != value { return false } } // 检查是否有多余的环境变量 for key := range currentMap { if _, exists := desiredMap[key]; !exists { return false } } return true } // 添加辅助函数:比较资源配置 func equalResources(current, desired core_v1.ResourceRequirements) bool { // 比较 Limits if current.Limits == nil && desired.Limits != nil { return false } if current.Limits != nil && desired.Limits == nil { return false } if current.Limits != nil && desired.Limits != nil { // 比较 CPU currentCPU := current.Limits.Cpu() desiredCPU := desired.Limits.Cpu() if currentCPU != nil && desiredCPU != nil { if !currentCPU.Equal(*desiredCPU) { return false } } else if currentCPU != desiredCPU { // 一个为 nil,另一个不为 nil return false } // 比较 Memory currentMemory := current.Limits.Memory() desiredMemory := 
desired.Limits.Memory() if currentMemory != nil && desiredMemory != nil { if !currentMemory.Equal(*desiredMemory) { return false } } else if currentMemory != desiredMemory { // 一个为 nil,另一个不为 nil return false } } // 比较 Requests if current.Requests == nil && desired.Requests != nil { return false } if current.Requests != nil && desired.Requests == nil { return false } if current.Requests != nil && desired.Requests != nil { // 比较 CPU currentCPU := current.Requests.Cpu() desiredCPU := desired.Requests.Cpu() if currentCPU != nil && desiredCPU != nil { if !currentCPU.Equal(*desiredCPU) { return false } } else if currentCPU != desiredCPU { // 一个为 nil,另一个不为 nil return false } // 比较 Memory currentMemory := current.Requests.Memory() desiredMemory := desired.Requests.Memory() if currentMemory != nil && desiredMemory != nil { if !currentMemory.Equal(*desiredMemory) { return false } } else if currentMemory != desiredMemory { // 一个为 nil,另一个不为 nil return false } } return true } func (r *ApplicationReconciler) reconcileStatefulSet(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 检查 StatefulSet 是否存在 statefulSet := &apps_v1.StatefulSet{} err := r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, statefulSet) if errors.IsNotFound(err) { // 创建新的 StatefulSet logger.Info("Creating new StatefulSet", "name", app.Name) newStatefulSet, err := application_controller_utils.NewStatefulSet(app) if err != nil { return fmt.Errorf("failed to generate statefulset: %w", err) } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newStatefulSet, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newStatefulSet); err != nil { return fmt.Errorf("failed to create statefulset: %w", err) } logger.Info("Successfully created StatefulSet", "name", app.Name) return nil } else if err != nil { return fmt.Errorf("failed to get statefulset: %w", err) } // 更新现有 
StatefulSet logger.Info("Checking StatefulSet for updates", "name", app.Name) updatedStatefulSet, err := application_controller_utils.NewStatefulSet(app) if err != nil { return fmt.Errorf("failed to generate updated statefulset: %w", err) } // 检查是否需要更新 needsUpdate := false updateFields := make(map[string]interface{}) // 检查镜像是否变更 if len(statefulSet.Spec.Template.Spec.Containers) > 0 && len(updatedStatefulSet.Spec.Template.Spec.Containers) > 0 { currentImage := statefulSet.Spec.Template.Spec.Containers[0].Image desiredImage := updatedStatefulSet.Spec.Template.Spec.Containers[0].Image if currentImage != desiredImage { logger.Info("StatefulSet image changed", "current", currentImage, "desired", desiredImage) needsUpdate = true updateFields["image"] = desiredImage } } // 检查副本数是否变更 currentReplicas := int32(1) if statefulSet.Spec.Replicas != nil { currentReplicas = *statefulSet.Spec.Replicas } desiredReplicas := int32(1) if updatedStatefulSet.Spec.Replicas != nil { desiredReplicas = *updatedStatefulSet.Spec.Replicas } if currentReplicas != desiredReplicas { logger.Info("StatefulSet replica count changed", "current", currentReplicas, "desired", desiredReplicas) needsUpdate = true updateFields["replicas"] = desiredReplicas } if needsUpdate { logger.Info("Updating StatefulSet", "name", app.Name, "fields", updateFields) statefulSet.Spec = updatedStatefulSet.Spec if err := r.Update(ctx, statefulSet); err != nil { return fmt.Errorf("failed to update statefulset: %w", err) } logger.Info("StatefulSet updated successfully", "name", app.Name) } else { logger.Info("No StatefulSet updates needed", "name", app.Name) } return nil } // 添加辅助函数:比较字符串映射 func equalStringMaps(current, desired map[string]string) bool { if len(current) != len(desired) { return false } for k, v := range desired { if current[k] != v { return false } } return true } // equalIngressTLS函数已废弃 func (r *ApplicationReconciler) updateStatus(ctx context.Context, app *applicationv1.Application) error { logger := 
log.FromContext(ctx) var deployment *apps_v1.Deployment var statefulSet *apps_v1.StatefulSet var err error // 根据应用类型获取对应的资源状态 if app.Spec.Template.Type == "stateful" { statefulSet = &apps_v1.StatefulSet{} err = r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, statefulSet) } else { deployment = &apps_v1.Deployment{} err = r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, deployment) } if err != nil { app.Status.Phase = "Failed" app.Status.Message = fmt.Sprintf("Workload not found: %v", err) logger.Error(err, "Failed to get workload for status update") } else { // 根据工作负载类型更新状态 if statefulSet != nil { app.Status.Replicas = statefulSet.Status.Replicas app.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas } else if deployment != nil { app.Status.Replicas = deployment.Status.Replicas app.Status.ReadyReplicas = deployment.Status.ReadyReplicas } // 状态判断逻辑 if app.Status.ReadyReplicas == 0 { app.Status.Phase = "Pending" app.Status.Message = "Application is starting" } else if app.Status.ReadyReplicas < app.Status.Replicas { app.Status.Phase = "Scaling" app.Status.Message = fmt.Sprintf("Ready: %d/%d", app.Status.ReadyReplicas, app.Status.Replicas) } else { app.Status.Phase = "Running" app.Status.Message = "Application is healthy" } } app.Status.LastUpdated = metav1.Now() if err := r.Status().Update(ctx, app); err != nil { return fmt.Errorf("failed to update status: %w", err) } logger.Info("Updated Application status", "phase", app.Status.Phase, "message", app.Status.Message) return nil } // 协调网络策略 func (r *ApplicationReconciler) reconcileNetworkPolicy(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 如果没有配置网络策略,跳过处理 if app.Spec.NetworkPolicy == nil { logger.Info("No network policy configured, skipping", "name", app.Name) return nil } // 协调Gateway(南北向流量) if app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled { if err := r.reconcileGateway(ctx, 
app); err != nil { return fmt.Errorf("failed to reconcile gateway: %w", err) } } else { // 清理不再需要的Gateway资源 if err := r.cleanupGateway(ctx, app); err != nil { return fmt.Errorf("failed to cleanup gateway: %w", err) } } // 协调Mesh(东西向流量) if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled { if err := r.reconcileMesh(ctx, app); err != nil { return fmt.Errorf("failed to reconcile mesh: %w", err) } } else { // 清理不再需要的Mesh资源 if err := r.cleanupMesh(ctx, app); err != nil { return fmt.Errorf("failed to cleanup mesh: %w", err) } } return nil } // cleanupGateway 清理不再需要的Gateway资源 func (r *ApplicationReconciler) cleanupGateway(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) gatewayName := app.Name + "-gateway" vsName := app.Name + "-gateway-vs" targetNamespaces := map[string]struct{}{app.Namespace: {}} if ns, err := r.determineGatewayNamespace(ctx, app); err == nil { targetNamespaces[ns] = struct{}{} } else { logger.Error(err, "Failed to determine gateway namespace during cleanup, fallback to application namespace") } for ns := range targetNamespaces { gateway := &istionetworkingv1.Gateway{} err := r.Get(ctx, types.NamespacedName{Name: gatewayName, Namespace: ns}, gateway) if err == nil { logger.Info("Cleaning up Gateway that is no longer needed", "name", gatewayName, "namespace", ns) if err := r.Delete(ctx, gateway); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete Gateway %s/%s: %w", ns, gatewayName, err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get Gateway %s/%s: %w", ns, gatewayName, err) } } // 清理VirtualService vs := &istionetworkingv1.VirtualService{} err := r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs) if err == nil { logger.Info("Cleaning up Gateway VirtualService that is no longer needed", "name", vsName) if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete Gateway 
VirtualService: %w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get Gateway VirtualService: %w", err) } // 清理 TLS Secret(如果是由控制器创建的) if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil { for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS { if strings.EqualFold(tls.Mode, "PASSTHROUGH") { continue } if tls.SecretName == "" { continue } // 确定 Secret 的命名空间 secretNS, err := r.detectIngressNamespace(ctx, tls.SecretNamespace) if err != nil { logger.Error(err, "Failed to determine secret namespace for cleanup, skipping", "secretName", tls.SecretName) continue } // 检查 Secret 是否存在,并且有我们的标签(说明是我们创建的) secret := &core_v1.Secret{} err = r.Get(ctx, types.NamespacedName{Name: tls.SecretName, Namespace: secretNS}, secret) if err == nil { // 检查标签,确认是我们创建的 if secret.Labels != nil && secret.Labels["app.k8s.devstar/name"] == app.Name && secret.Labels["app.k8s.devstar/type"] == "gateway-tls" { logger.Info("Cleaning up Gateway TLS Secret", "name", tls.SecretName, "namespace", secretNS) if err := r.Delete(ctx, secret); err != nil && !errors.IsNotFound(err) { logger.Error(err, "Failed to delete Gateway TLS Secret", "name", tls.SecretName, "namespace", secretNS) // 不返回错误,继续清理其他资源 } } } else if !errors.IsNotFound(err) { logger.Error(err, "Failed to get Gateway TLS Secret for cleanup", "name", tls.SecretName, "namespace", secretNS) } } } return nil } // cleanupMesh 清理不再需要的Mesh资源 func (r *ApplicationReconciler) cleanupMesh(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) vsName := app.Name + "-mesh-vs" drName := app.Name + "-dr" // 清理VirtualService vs := &istionetworkingv1.VirtualService{} err := r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs) if err == nil { logger.Info("Cleaning up Mesh VirtualService that is no longer needed", "name", vsName) if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete Mesh VirtualService: 
%w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get Mesh VirtualService: %w", err) } // 清理DestinationRule dr := &istionetworkingv1.DestinationRule{} err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr) if err == nil { logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName) if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete DestinationRule: %w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get DestinationRule: %w", err) } return nil } // ensureNamespace 确保命名空间存在,如果不存在则创建 func (r *ApplicationReconciler) ensureNamespace(ctx context.Context, namespace string) error { logger := log.FromContext(ctx) // 跳过默认命名空间(这些命名空间通常由系统管理) if namespace == "default" || namespace == "kube-system" || namespace == "kube-public" { return nil } // 检查命名空间是否存在 ns := &core_v1.Namespace{} err := r.Get(ctx, types.NamespacedName{Name: namespace}, ns) if err == nil { // 命名空间已存在 return nil } if !errors.IsNotFound(err) { // 获取命名空间时发生其他错误 return fmt.Errorf("failed to get namespace %s: %w", namespace, err) } // 命名空间不存在,创建它 logger.Info("Creating namespace", "namespace", namespace) newNS := &core_v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: namespace, }, } if err := r.Create(ctx, newNS); err != nil { return fmt.Errorf("failed to create namespace %s: %w", namespace, err) } logger.Info("Successfully created namespace", "namespace", namespace) return nil } // 为了在SetupWithManager中注册Istio资源监控 func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&applicationv1.Application{}). Owns(&apps_v1.Deployment{}). Owns(&apps_v1.StatefulSet{}). Owns(&core_v1.Service{}). // 添加对Istio资源的监控 Owns(&istionetworkingv1.VirtualService{}). Owns(&istionetworkingv1.Gateway{}). Owns(&istionetworkingv1.DestinationRule{}). 
Complete(r) } func (r *ApplicationReconciler) reconcileService(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) serviceName := app.Name + "-svc" // 检查是否需要创建 Service shouldCreate := false if app.Spec.Service != nil { shouldCreate = app.Spec.Service.Enabled } else { // 向后兼容:使用旧的 expose 配置 shouldCreate = app.Spec.Expose && len(app.Spec.Template.Ports) > 0 } // 获取现有 Service service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{ Name: serviceName, Namespace: app.Namespace, }, service) serviceExists := !errors.IsNotFound(err) if !shouldCreate { // 如果不需要 Service 但存在,则删除 if serviceExists { logger.Info("Deleting existing Service as it's disabled", "name", serviceName) if err := r.Delete(ctx, service); err != nil { return fmt.Errorf("failed to delete service: %w", err) } } return nil } // 需要创建 Service if !serviceExists { // Service 不存在,创建新的 logger.Info("Creating new Service", "name", serviceName) newService, err := application_controller_utils.NewService(app) if err != nil { return fmt.Errorf("failed to generate service: %w", err) } if newService == nil { logger.Info("Service creation skipped", "name", serviceName) return nil } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newService, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newService); err != nil { return fmt.Errorf("failed to create service: %w", err) } logger.Info("Successfully created Service", "name", serviceName, "type", newService.Spec.Type, "ports", len(newService.Spec.Ports)) return nil } // Service 存在,检查是否需要更新 if err != nil { return fmt.Errorf("failed to get service: %w", err) } logger.Info("Checking Service for updates", "name", serviceName) updatedService, err := application_controller_utils.NewService(app) if err != nil { return fmt.Errorf("failed to generate updated service: %w", err) } if updatedService == nil { logger.Info("Service should be deleted", 
"name", serviceName) if err := r.Delete(ctx, service); err != nil { return fmt.Errorf("failed to delete service: %w", err) } return nil } // 检查是否需要更新 needsUpdate := false updateFields := make(map[string]interface{}) // 检查服务类型 if service.Spec.Type != updatedService.Spec.Type { logger.Info("Service type changed", "current", service.Spec.Type, "desired", updatedService.Spec.Type) needsUpdate = true updateFields["type"] = updatedService.Spec.Type service.Spec.Type = updatedService.Spec.Type } // 检查端口配置 if !equalServicePorts(service.Spec.Ports, updatedService.Spec.Ports) { logger.Info("Service ports changed") needsUpdate = true updateFields["ports"] = "changed" service.Spec.Ports = updatedService.Spec.Ports } // 检查 LoadBalancer 配置 if service.Spec.LoadBalancerIP != updatedService.Spec.LoadBalancerIP { needsUpdate = true updateFields["loadBalancerIP"] = updatedService.Spec.LoadBalancerIP service.Spec.LoadBalancerIP = updatedService.Spec.LoadBalancerIP } if !equalStringSlices(service.Spec.LoadBalancerSourceRanges, updatedService.Spec.LoadBalancerSourceRanges) { needsUpdate = true updateFields["loadBalancerSourceRanges"] = "changed" service.Spec.LoadBalancerSourceRanges = updatedService.Spec.LoadBalancerSourceRanges } // 检查 ExternalName if service.Spec.ExternalName != updatedService.Spec.ExternalName { needsUpdate = true updateFields["externalName"] = updatedService.Spec.ExternalName service.Spec.ExternalName = updatedService.Spec.ExternalName } // 检查会话亲和性 if service.Spec.SessionAffinity != updatedService.Spec.SessionAffinity { needsUpdate = true updateFields["sessionAffinity"] = updatedService.Spec.SessionAffinity service.Spec.SessionAffinity = updatedService.Spec.SessionAffinity } // 更新标签 if !equalStringMaps(service.Labels, updatedService.Labels) { needsUpdate = true updateFields["labels"] = "changed" if service.Labels == nil { service.Labels = make(map[string]string) } for k, v := range updatedService.Labels { service.Labels[k] = v } } // 更新注解 if 
!equalStringMaps(service.Annotations, updatedService.Annotations) {
		// (Tail of the Service update routine that begins above this chunk; carried through unchanged.)
		needsUpdate = true
		updateFields["annotations"] = "changed"
		if service.Annotations == nil {
			service.Annotations = make(map[string]string)
		}
		for k, v := range updatedService.Annotations {
			service.Annotations[k] = v
		}
	}

	if needsUpdate {
		logger.Info("Updating Service", "name", serviceName, "fields", updateFields)
		if err := r.Update(ctx, service); err != nil {
			return fmt.Errorf("failed to update service: %w", err)
		}
		logger.Info("Service updated successfully", "name", serviceName)
	} else {
		logger.Info("No service updates needed", "name", serviceName)
	}

	return nil
}

// equalServicePorts reports whether two Service port lists are equivalent.
// Ports are matched by name; Port, TargetPort, Protocol and NodePort must all
// agree for every desired port. Ordering is ignored.
func equalServicePorts(current, desired []core_v1.ServicePort) bool {
	if len(current) != len(desired) {
		return false
	}

	byName := make(map[string]core_v1.ServicePort, len(current))
	for _, port := range current {
		byName[port.Name] = port
	}

	for _, want := range desired {
		have, exists := byName[want.Name]
		if !exists ||
			have.Port != want.Port ||
			have.TargetPort != want.TargetPort ||
			have.Protocol != want.Protocol ||
			have.NodePort != want.NodePort {
			return false
		}
	}

	return true
}

// equalStringSlices reports whether two string slices contain the same
// elements with the same multiplicities, ignoring order.
//
// Occurrences are counted rather than merely recorded as present, so that
// ["a", "b"] and ["a", "a"] are correctly reported as different.
func equalStringSlices(current, desired []string) bool {
	if len(current) != len(desired) {
		return false
	}

	remaining := make(map[string]int, len(current))
	for _, item := range current {
		remaining[item]++
	}

	for _, item := range desired {
		if remaining[item] == 0 {
			return false
		}
		remaining[item]--
	}

	return true
}

// reconcileGateway creates or updates the Istio Gateway named
// "<app>-gateway" together with its TLS Secret(s) and the VirtualService
// bound to it.
//
// The Gateway lives in the namespace returned by determineGatewayNamespace,
// which may differ from the Application's namespace. An owner reference is
// only set when both namespaces are the same.
func (r *ApplicationReconciler) reconcileGateway(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)
	gatewayName := app.Name + "-gateway"

	// Make sure the TLS Secret exists before the Gateway that references it.
	if err := r.reconcileGatewayTLSSecret(ctx, app); err != nil {
		return fmt.Errorf("failed to reconcile gateway TLS secret: %w", err)
	}

	gatewayNamespace, err := r.determineGatewayNamespace(ctx, app)
	if err != nil {
		return fmt.Errorf("failed to determine gateway namespace: %w", err)
	}

	// Remove a Gateway left behind in the application namespace by older
	// versions. A failed lookup is treated as "nothing to clean up".
	if gatewayNamespace != app.Namespace {
		legacyGateway := &istionetworkingv1.Gateway{}
		legacyKey := types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}
		if legacyErr := r.Get(ctx, legacyKey, legacyGateway); legacyErr == nil {
			logger.Info("Deleting legacy Gateway from application namespace", "name", gatewayName)
			if err := r.Delete(ctx, legacyGateway); err != nil && !errors.IsNotFound(err) {
				return fmt.Errorf("failed to delete legacy gateway: %w", err)
			}
		}
	}

	// Explicit Get + Create/Update is used instead of CreateOrUpdate to avoid
	// its reflection-based comparison on protobuf-backed types.
	existingGateway := &istionetworkingv1.Gateway{}
	gatewayKey := types.NamespacedName{Name: gatewayName, Namespace: gatewayNamespace}
	err = r.Get(ctx, gatewayKey, existingGateway)
	switch {
	case errors.IsNotFound(err):
		logger.Info("Creating new Gateway", "name", gatewayName)
		newGateway := &istionetworkingv1.Gateway{
			ObjectMeta: metav1.ObjectMeta{
				Name:      gatewayName,
				Namespace: gatewayNamespace,
			},
		}
		ensureGatewayLabels(newGateway, app)
		if err := r.configureGateway(ctx, newGateway, app); err != nil {
			return fmt.Errorf("failed to configure Gateway: %w", err)
		}
		if gatewayNamespace == app.Namespace {
			if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newGateway, r.Scheme); err != nil {
				return fmt.Errorf("failed to set controller reference: %w", err)
			}
		}
		if err := r.Create(ctx, newGateway); err != nil {
			return fmt.Errorf("failed to create Gateway: %w", err)
		}
		logger.Info("Gateway created", "name", gatewayName, "namespace", gatewayNamespace)
	case err != nil:
		return fmt.Errorf("failed to get Gateway: %w", err)
	default:
		ensureGatewayLabels(existingGateway, app)
		if err := r.configureGateway(ctx, existingGateway, app); err != nil {
			return fmt.Errorf("failed to configure Gateway: %w", err)
		}
		if err := r.Update(ctx, existingGateway); err != nil {
			return fmt.Errorf("failed to update Gateway: %w", err)
		}
		logger.Info("Gateway updated", "name", gatewayName, "namespace", gatewayNamespace)
	}

	// Reconcile the VirtualService that is attached to this Gateway.
	if err := r.reconcileGatewayVirtualService(ctx, app, gatewayNamespace); err != nil {
		return fmt.Errorf("failed to reconcile gateway VirtualService: %w", err)
	}

	return nil
}

// configureGateway fills in the spec of gateway from the Application's
// NetworkPolicy.Gateway settings: selector, one server per configured port
// (or a default HTTP/80 server when none are configured) and TLS settings.
//
// It returns an error when the Application carries no gateway configuration
// or when the TLS Secret namespace cannot be determined.
func (r *ApplicationReconciler) configureGateway(ctx context.Context, gateway *istionetworkingv1.Gateway, app *applicationv1.Application) error {
	// Guard against a missing configuration instead of panicking on a nil
	// dereference below.
	if app.Spec.NetworkPolicy == nil || app.Spec.NetworkPolicy.Gateway == nil {
		return fmt.Errorf("gateway configuration is missing for application %s/%s", app.Namespace, app.Name)
	}
	gatewaySpec := app.Spec.NetworkPolicy.Gateway

	// Select the ingress gateway workload.
	gateway.Spec.Selector = map[string]string{
		"istio": "ingressgateway",
	}

	// Rebuild the server list from scratch on every call.
	gateway.Spec.Servers = []*istioapinetworkingv1.Server{}

	if len(gatewaySpec.Ports) == 0 {
		// No ports configured: expose a default HTTP server.
		gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
			Port: &istioapinetworkingv1.Port{
				Number:   80,
				Protocol: "HTTP",
				Name:     "http",
			},
			Hosts: getHosts(app),
		})
	} else {
		for _, port := range gatewaySpec.Ports {
			server := &istioapinetworkingv1.Server{
				Port: &istioapinetworkingv1.Port{
					Number:   uint32(port.Number),
					Protocol: port.Protocol,
					Name:     port.Name,
				},
				Hosts: getHosts(app),
			}
			gateway.Spec.Servers = append(gateway.Spec.Servers, server)
		}
	}

	// NOTE(review): every TLS entry is applied to the FIRST HTTPS/TLS server,
	// so with several entries the later ones overwrite the earlier ones.
	// Confirm whether one server per TLS entry was intended.
	for _, tls := range gatewaySpec.TLS {
		// Locate the first HTTPS/TLS server.
		serverIndex := -1
		for j, server := range gateway.Spec.Servers {
			if server.Port.Protocol == "HTTPS" || server.Port.Protocol == "TLS" {
				serverIndex = j
				break
			}
		}

		// None configured: add a default HTTPS/443 server.
		if serverIndex == -1 {
			gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
				Port: &istioapinetworkingv1.Port{
					Number:   443,
					Protocol: "HTTPS",
					Name:     "https",
				},
				Hosts: getHosts(app),
			})
			serverIndex = len(gateway.Spec.Servers) - 1
		}

		server := gateway.Spec.Servers[serverIndex]

		// Work out which namespace holds the Secret.
		secretNS, err := r.detectIngressNamespace(ctx, tls.SecretNamespace)
		if err != nil {
			return fmt.Errorf("failed to determine secret namespace for TLS: %w", err)
		}

		// A Secret outside the Gateway's namespace is referenced as
		// "namespace/secretName".
		credentialName := tls.SecretName
		if secretNS != gateway.Namespace {
			credentialName = secretNS + "/" + tls.SecretName
		}

		server.Tls = &istioapinetworkingv1.ServerTLSSettings{
			Mode:           getIstioTLSMode(tls.Mode),
			CredentialName: credentialName,
		}

		if tls.MinProtocolVersion != "" {
			server.Tls.MinProtocolVersion = getIstioTLSVersion(tls.MinProtocolVersion)
		}

		// Hosts listed on the TLS entry replace the default hosts.
		if len(tls.Hosts) > 0 {
			server.Hosts = tls.Hosts
		}
	}

	return nil
}

// getHosts returns the hosts configured under NetworkPolicy.Gateway.Hosts, or
// the wildcard host "*" when none are configured. The legacy Ingress
// configuration is no longer consulted.
func getHosts(app *applicationv1.Application) []string {
	if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil && len(app.Spec.NetworkPolicy.Gateway.Hosts) > 0 {
		return app.Spec.NetworkPolicy.Gateway.Hosts
	}

	return []string{"*"}
}

// getIstioTLSMode maps a TLS mode name to the Istio enum value. Anything
// other than "MUTUAL" or "PASSTHROUGH" maps to SIMPLE.
func getIstioTLSMode(mode string) istioapinetworkingv1.ServerTLSSettings_TLSmode {
	switch mode {
	case "MUTUAL":
		return istioapinetworkingv1.ServerTLSSettings_MUTUAL
	case "PASSTHROUGH":
		return istioapinetworkingv1.ServerTLSSettings_PASSTHROUGH
	default:
		return istioapinetworkingv1.ServerTLSSettings_SIMPLE
	}
}

// getIstioTLSVersion maps a TLS version name to the Istio enum value.
// Unrecognised names map to TLS 1.2.
func getIstioTLSVersion(version string) istioapinetworkingv1.ServerTLSSettings_TLSProtocol {
	switch version {
	case "TLSv1_0":
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_0
	case "TLSv1_1":
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_1
	case "TLSv1_3":
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_3
	default:
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_2
	}
}

// detectIngressNamespace determines the namespace of the Istio ingress
// gateway.
//
// Priority: the explicit value (when non-empty), then the namespace of a
// Service labelled istio=ingressgateway, preferring one named
// "istio-ingressgateway" and otherwise taking the first match. An error is
// returned when listing fails or no such Service exists.
//
// NOTE(review): the final error message mentions ISTIO_INGRESS_NAMESPACE, but
// this function does not read that variable - confirm whether it is handled
// elsewhere.
func (r *ApplicationReconciler) detectIngressNamespace(ctx context.Context, explicit string) (string, error) {
	if explicit != "" {
		return explicit, nil
	}

	svcList := &core_v1.ServiceList{}
	if err := r.List(ctx, svcList, client.MatchingLabels{"istio": "ingressgateway"}); err != nil {
		return "", fmt.Errorf("list ingressgateway services failed: %w", err)
	}

	var fallback string
	for _, svc := range svcList.Items {
		if svc.Name == "istio-ingressgateway" {
			return svc.Namespace, nil
		}
		if fallback == "" {
			fallback = svc.Namespace
		}
	}
	if fallback != "" {
		return fallback, nil
	}

	return "", fmt.Errorf("cannot detect ingress gateway namespace; set tls.secretNamespace or ISTIO_INGRESS_NAMESPACE")
}

// determineGatewayNamespace decides which namespace the Gateway is created
// in: the first non-empty tls.SecretNamespace if any, otherwise whatever
// detectIngressNamespace discovers.
func (r *ApplicationReconciler) determineGatewayNamespace(ctx context.Context, app *applicationv1.Application) (string, error) {
	var explicit string
	if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil {
		for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS {
			if tls.SecretNamespace != "" {
				explicit = tls.SecretNamespace
				break
			}
		}
	}
	return r.detectIngressNamespace(ctx, explicit)
}

// ensureGatewayLabels stamps the Gateway with labels identifying the owning
// Application by name and namespace, allocating the label map if needed.
func ensureGatewayLabels(gateway *istionetworkingv1.Gateway, app *applicationv1.Application) {
	if gateway.Labels == nil {
		gateway.Labels = make(map[string]string)
	}
	gateway.Labels["app.k8s.devstar/name"] = app.Name
	gateway.Labels["app.k8s.devstar/namespace"] = app.Namespace
}

// reconcileGatewayTLSSecret makes sure every TLS Secret referenced by the
// Gateway configuration exists and is up to date.
//
// For each non-PASSTHROUGH TLS entry:
//   - secretName is mandatory;
//   - a missing Secret is created from the inline certificate/privateKey, and
//     it is an error if those are not both provided;
//   - an existing Secret is rewritten only when inline material is provided
//     and its type or contents differ.
func (r *ApplicationReconciler) reconcileGatewayTLSSecret(ctx context.Context, app *applicationv1.Application) error {
	if app.Spec.NetworkPolicy == nil || app.Spec.NetworkPolicy.Gateway == nil {
		return nil
	}
	tlsList := app.Spec.NetworkPolicy.Gateway.TLS
	if len(tlsList) == 0 {
		return nil
	}

	for _, tls := range tlsList {
		if strings.EqualFold(tls.Mode, "PASSTHROUGH") {
			continue
		}
		secretName := tls.SecretName
		if secretName == "" {
			return fmt.Errorf("gateway.tls.secretName is required when TLS mode is %s", tls.Mode)
		}

		// Resolve the namespace the Secret belongs in.
		targetNS, err := r.detectIngressNamespace(ctx, tls.SecretNamespace)
		if err != nil {
			return err
		}

		hasInline := tls.Certificate != "" && tls.PrivateKey != ""

		existing := &core_v1.Secret{}
		getErr := r.Get(ctx, types.NamespacedName{Namespace: targetNS, Name: secretName}, existing)
		if errors.IsNotFound(getErr) {
			if !hasInline {
				return fmt.Errorf("secret %s/%s not found; provide certificate/privateKey or create it manually", targetNS, secretName)
			}
			// Normalise the inline material into well-formed PEM.
			certPEM := normalizePEM(tls.Certificate, "CERTIFICATE")
			keyPEM := normalizePEM(tls.PrivateKey, "PRIVATE KEY")
			newSec := &core_v1.Secret{
				ObjectMeta: metav1.ObjectMeta{
					Name:      secretName,
					Namespace: targetNS,
					Labels: map[string]string{
						"app.k8s.devstar/name": app.Name,
						"app.k8s.devstar/type": "gateway-tls",
					},
				},
				Type: core_v1.SecretTypeTLS,
				Data: map[string][]byte{
					core_v1.TLSCertKey:       []byte(certPEM),
					core_v1.TLSPrivateKeyKey: []byte(keyPEM),
				},
			}
			if err := r.Create(ctx, newSec); err != nil {
				return fmt.Errorf("create tls secret %s/%s failed: %w", targetNS, secretName, err)
			}
			continue
		} else if getErr != nil {
			return fmt.Errorf("get secret %s/%s failed: %w", targetNS, secretName, getErr)
		}

		if hasInline {
			certPEM := normalizePEM(tls.Certificate, "CERTIFICATE")
			keyPEM := normalizePEM(tls.PrivateKey, "PRIVATE KEY")
			if existing.Type != core_v1.SecretTypeTLS ||
				!bytes.Equal(existing.Data[core_v1.TLSCertKey], []byte(certPEM)) ||
				!bytes.Equal(existing.Data[core_v1.TLSPrivateKeyKey], []byte(keyPEM)) {
				if existing.Data == nil {
					existing.Data = map[string][]byte{}
				}
				existing.Type = core_v1.SecretTypeTLS
				existing.Data[core_v1.TLSCertKey] = []byte(certPEM)
				existing.Data[core_v1.TLSPrivateKeyKey] = []byte(keyPEM)
				if existing.Labels == nil {
					existing.Labels = map[string]string{}
				}
				existing.Labels["app.k8s.devstar/name"] = app.Name
				existing.Labels["app.k8s.devstar/type"] = "gateway-tls"
				if err := r.Update(ctx, existing); err != nil {
					return fmt.Errorf("update tls secret %s/%s failed: %w", targetNS, secretName, err)
				}
			}
		}
	}
	return nil
}

// detectPrivateKeyType inspects the BEGIN marker of a PEM private key
// (case-insensitively) and returns "RSA PRIVATE KEY", "EC PRIVATE KEY" or
// "PRIVATE KEY". "PRIVATE KEY" is also the fallback when no known marker is
// found.
func detectPrivateKeyType(content string) string {
	contentUpper := strings.ToUpper(content)
	if strings.Contains(contentUpper, "-----BEGIN RSA PRIVATE KEY-----") {
		return "RSA PRIVATE KEY"
	}
	if strings.Contains(contentUpper, "-----BEGIN PRIVATE KEY-----") {
		return "PRIVATE KEY"
	}
	if strings.Contains(contentUpper, "-----BEGIN EC PRIVATE KEY-----") {
		return "EC PRIVATE KEY"
	}
	return "PRIVATE KEY"
}

// normalizePEM rewrites a certificate or private key into canonical PEM text
// (BEGIN line, base64 body wrapped at 64 columns, END line).
//
// Two input shapes are accepted:
//  1. bare base64 without BEGIN/END markers - markers for pemType are added;
//  2. full PEM text - the body between the markers is extracted and
//     re-wrapped.
//
// For pemType "CERTIFICATE" every CERTIFICATE block is kept, so chains
// survive. For pemType "PRIVATE KEY" the original key flavour (RSA / EC) is
// detected from the input and preserved. Empty or whitespace-only input
// yields "".
func normalizePEM(content, pemType string) string {
	content = strings.TrimSpace(content)
	if content == "" {
		return ""
	}

	// A generic "PRIVATE KEY" request keeps whatever flavour the input uses.
	if pemType == "PRIVATE KEY" && strings.Contains(strings.ToUpper(content), "-----BEGIN") {
		detectedType := detectPrivateKeyType(content)
		if detectedType != "PRIVATE KEY" {
			pemType = detectedType
		}
	}

	// Input already carries a BEGIN marker: extract the body between markers.
	if strings.Contains(content, "-----BEGIN") {
		// Certificate chains: keep every CERTIFICATE block.
		if pemType == "CERTIFICATE" {
			var blocks []string
			search := content
			for {
				beginIdx := strings.Index(search, "-----BEGIN CERTIFICATE-----")
				if beginIdx == -1 {
					break
				}
				searchFrom := search[beginIdx+len("-----BEGIN CERTIFICATE-----"):]
				endMarker := "-----END CERTIFICATE-----"
				endIdx := strings.Index(searchFrom, endMarker)
				if endIdx == -1 {
					// No matching END: take everything after BEGIN as the last block.
					body := searchFrom
					blocks = append(blocks, body)
					break
				}
				body := searchFrom[:endIdx]
				blocks = append(blocks, body)
				// Keep scanning after this END marker.
				nextStart := endIdx + len(endMarker)
				if nextStart >= len(searchFrom) {
					break
				}
				search = searchFrom[nextStart:]
			}
			if len(blocks) > 0 {
				return buildPEMChainFromContent(blocks, "CERTIFICATE")
			}
			// No block parsed: fall through to single-block handling.
		}

		// Private keys: match the BEGIN/END markers of the key flavour.
		if pemType == "RSA PRIVATE KEY" || pemType == "EC PRIVATE KEY" || pemType == "PRIVATE KEY" {
			beginMarker := fmt.Sprintf("-----BEGIN %s-----", pemType)
			beginIdx := strings.Index(strings.ToUpper(content), strings.ToUpper(beginMarker))
			if beginIdx == -1 {
				// No exact match: accept any BEGIN marker.
				beginIdx = strings.Index(strings.ToUpper(content), "-----BEGIN")
			}
			if beginIdx == -1 {
				return buildPEMSingleFromContent(content, pemType)
			}

			// Offset (relative to beginIdx) of the end of the BEGIN line.
			beginLineEnd := strings.Index(content[beginIdx:], "\n")
			if beginLineEnd == -1 {
				beginLineEnd = strings.Index(content[beginIdx:], "\r")
			}
			if beginLineEnd == -1 {
				// Single-line input: skip exactly the marker so the END marker
				// is not swallowed.
				beginLineEnd = len(beginMarker)
			} else {
				beginLineEnd += 1 // include the line break
			}

			endMarker := fmt.Sprintf("-----END %s-----", pemType)
			// searchStart is clamped to len(content). All slicing below uses it
			// so that a marker shorter than expected cannot cause an
			// out-of-range slice on malformed input.
			searchStart := beginIdx + beginLineEnd
			if searchStart > len(content) {
				searchStart = len(content)
			}
			endIdx := strings.Index(strings.ToUpper(content[searchStart:]), strings.ToUpper(endMarker))
			if endIdx == -1 {
				// No exact match: accept any END marker.
				endIdx = strings.Index(strings.ToUpper(content[searchStart:]), "-----END")
			}
			if endIdx == -1 {
				// No END marker at all: use everything after the BEGIN line.
				bodyContent := content[searchStart:]
				return buildPEMSingleFromContent(bodyContent, pemType)
			}

			bodyContent := content[searchStart : searchStart+endIdx]
			// Defensive: drop an END marker that slipped into the body in
			// single-line input.
			bodyContent = strings.TrimSpace(bodyContent)
			if strings.HasPrefix(strings.ToUpper(bodyContent), "-----END") {
				endMarkerStart := strings.Index(strings.ToUpper(bodyContent), strings.ToUpper(endMarker))
				if endMarkerStart != -1 {
					bodyContent = bodyContent[:endMarkerStart]
				} else {
					endDashIdx := strings.Index(strings.ToUpper(bodyContent), "-----END")
					if endDashIdx != -1 {
						bodyContent = bodyContent[:endDashIdx]
					}
				}
			}

			return buildPEMSingleFromContent(bodyContent, pemType)
		}

		// Any other type (e.g. a certificate without an exact CERTIFICATE marker).
		beginIdx := strings.Index(content, "-----BEGIN")
		if beginIdx == -1 {
			return buildPEMSingleFromContent(content, pemType)
		}

		// Absolute index just past the BEGIN line.
		beginLineEnd := strings.Index(content[beginIdx:], "\n")
		if beginLineEnd == -1 {
			beginLineEnd = strings.Index(content[beginIdx:], "\r")
		}
		if beginLineEnd == -1 {
			// No line break: the next "-----" closes the BEGIN marker.
			nextDash := strings.Index(content[beginIdx+10:], "-----")
			if nextDash != -1 {
				beginLineEnd = beginIdx + 10 + nextDash + 5
			} else {
				beginLineEnd = len(content)
			}
		} else {
			beginLineEnd += beginIdx + 1
		}

		endPattern := "-----END"
		searchStart := beginLineEnd
		if searchStart > len(content) {
			searchStart = len(content)
		}
		endIdx := strings.Index(content[searchStart:], endPattern)
		if endIdx == -1 {
			// No END marker: use everything after the BEGIN line.
			bodyContent := content[beginLineEnd:]
			return buildPEMSingleFromContent(bodyContent, pemType)
		}

		bodyContent := content[beginLineEnd : searchStart+endIdx]
		return buildPEMSingleFromContent(bodyContent, pemType)
	}

	// No BEGIN marker: treat the input as a bare base64 body.
	return buildPEMSingleFromContent(content, pemType)
}

// buildPEMSingleFromContent wraps a base64 body in BEGIN/END markers for
// pemType, 64 characters per line. Spaces, tabs and line breaks inside the
// body are removed first; nothing else is altered. An empty body yields "".
func buildPEMSingleFromContent(content, pemType string) string {
	stripper := strings.NewReplacer(" ", "", "\n", "", "\r", "", "\t", "")
	cleaned := strings.TrimSpace(stripper.Replace(content))

	if cleaned == "" {
		return ""
	}

	var pem strings.Builder
	pem.WriteString("-----BEGIN ")
	pem.WriteString(pemType)
	pem.WriteString("-----\n")

	// Emit the body in 64-character lines.
	for start := 0; start < len(cleaned); start += 64 {
		stop := start + 64
		if stop > len(cleaned) {
			stop = len(cleaned)
		}
		pem.WriteString(cleaned[start:stop])
		pem.WriteString("\n")
	}

	pem.WriteString("-----END ")
	pem.WriteString(pemType)
	pem.WriteString("-----\n")

	return pem.String()
}

// buildPEMChainFromContent concatenates one PEM block per body, skipping
// bodies that are empty after cleaning.
func buildPEMChainFromContent(blockBodies []string, pemType string) string {
	var chain strings.Builder
	for _, body := range blockBodies {
		segment := buildPEMSingleFromContent(body, pemType)
		if segment == "" {
			continue
		}
		chain.WriteString(segment)
	}
	return chain.String()
}

//
reconcileGatewayVirtualService 处理与Gateway关联的VirtualService func (r *ApplicationReconciler) reconcileGatewayVirtualService(ctx context.Context, app *applicationv1.Application, gatewayNamespace string) error { logger := log.FromContext(ctx) vsName := app.Name + "-gateway-vs" gatewayName := app.Name + "-gateway" gatewayRef := gatewayName if gatewayNamespace != "" && gatewayNamespace != app.Namespace { gatewayRef = fmt.Sprintf("%s/%s", gatewayNamespace, gatewayName) } // 检查服务是否存在 serviceName := app.Name + "-svc" service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service) if errors.IsNotFound(err) { logger.Info("Service not found, skipping VirtualService creation", "service", serviceName) return nil } else if err != nil { return fmt.Errorf("failed to get service: %w", err) } // Get + Create/Update VS existingVS := &istionetworkingv1.VirtualService{} err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, existingVS) if errors.IsNotFound(err) { logger.Info("Creating new Gateway VirtualService", "name", vsName) newVS := &istionetworkingv1.VirtualService{ObjectMeta: metav1.ObjectMeta{Name: vsName, Namespace: app.Namespace}} if err := r.configureGatewayVirtualService(newVS, app, service, gatewayRef); err != nil { return fmt.Errorf("failed to configure VirtualService: %w", err) } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newVS, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newVS); err != nil { return fmt.Errorf("failed to create VirtualService: %w", err) } logger.Info("Gateway VirtualService created", "name", vsName) } else if err != nil { return fmt.Errorf("failed to get VirtualService: %w", err) } else { if err := r.configureGatewayVirtualService(existingVS, app, service, gatewayRef); err != nil { return fmt.Errorf("failed to configure VirtualService: %w", err) } if err := r.Update(ctx, 
existingVS); err != nil { return fmt.Errorf("failed to update VirtualService: %w", err) } logger.Info("Gateway VirtualService updated", "name", vsName) } return nil } // configureGatewayVirtualService 配置与Gateway关联的VirtualService func (r *ApplicationReconciler) configureGatewayVirtualService(vs *istionetworkingv1.VirtualService, app *applicationv1.Application, service *core_v1.Service, gatewayRef string) error { // 设置基本字段 vs.Spec.Hosts = getHosts(app) vs.Spec.Gateways = []string{gatewayRef} // 创建HTTP路由 httpRoute := &istioapinetworkingv1.HTTPRoute{ Route: []*istioapinetworkingv1.HTTPRouteDestination{ { Destination: &istioapinetworkingv1.Destination{ Host: service.Name, }, }, }, } // 配置匹配条件 if app.Spec.NetworkPolicy.Mesh != nil && len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 { for _, route := range app.Spec.NetworkPolicy.Mesh.Routes { if route.Match != nil { match := &istioapinetworkingv1.HTTPMatchRequest{} // URI匹配 if route.Match.URI != nil { match.Uri = convertStringMatch(route.Match.URI) } // 方法匹配 if route.Match.Method != nil { match.Method = convertStringMatch(route.Match.Method) } // 头部匹配 if len(route.Match.Headers) > 0 { match.Headers = make(map[string]*istioapinetworkingv1.StringMatch) for key, value := range route.Match.Headers { match.Headers[key] = convertStringMatch(&value) } } httpRoute.Match = append(httpRoute.Match, match) } } } else { // 默认匹配所有路径 httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{ { Uri: &istioapinetworkingv1.StringMatch{ MatchType: &istioapinetworkingv1.StringMatch_Prefix{ Prefix: "/", }, }, }, } } // 配置超时 if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Timeout > 0 { seconds := app.Spec.NetworkPolicy.Mesh.Timeout / 1000 nanos := (app.Spec.NetworkPolicy.Mesh.Timeout % 1000) * 1000000 httpRoute.Timeout = &durationpb.Duration{ Seconds: int64(seconds), Nanos: int32(nanos), } } // 配置重试 if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Retry != nil { retryOn := 
"5xx,gateway-error,connect-failure" if len(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn) > 0 { retryOn = strings.Join(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn, ",") } retry := &istioapinetworkingv1.HTTPRetry{ Attempts: app.Spec.NetworkPolicy.Mesh.Retry.Attempts, RetryOn: retryOn, } if app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout > 0 { seconds := app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout / 1000 nanos := (app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout % 1000) * 1000000 retry.PerTryTimeout = &durationpb.Duration{ Seconds: int64(seconds), Nanos: int32(nanos), } } httpRoute.Retries = retry } vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{httpRoute} return nil } // convertStringMatch 将Application CRD的StringMatch转换为Istio API的StringMatch func convertStringMatch(match *applicationv1.StringMatch) *istioapinetworkingv1.StringMatch { if match == nil { return nil } result := &istioapinetworkingv1.StringMatch{} if match.Exact != "" { result.MatchType = &istioapinetworkingv1.StringMatch_Exact{ Exact: match.Exact, } } else if match.Prefix != "" { result.MatchType = &istioapinetworkingv1.StringMatch_Prefix{ Prefix: match.Prefix, } } else if match.Regex != "" { result.MatchType = &istioapinetworkingv1.StringMatch_Regex{ Regex: match.Regex, } } return result } // reconcileMesh 处理服务网格配置 func (r *ApplicationReconciler) reconcileMesh(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 注入Sidecar (如果需要) if app.Spec.NetworkPolicy.Mesh.Sidecar != nil && app.Spec.NetworkPolicy.Mesh.Sidecar.Inject { if err := r.ensureSidecarInjection(ctx, app); err != nil { return fmt.Errorf("failed to ensure sidecar injection: %w", err) } } // 协调东西向流量的VirtualService if err := r.reconcileMeshVirtualService(ctx, app); err != nil { return fmt.Errorf("failed to reconcile mesh VirtualService: %w", err) } // 协调DestinationRule (熔断器) if app.Spec.NetworkPolicy.Mesh.CircuitBreaker != nil { if err := r.reconcileDestinationRule(ctx, app); err != nil { return 
fmt.Errorf("failed to reconcile DestinationRule: %w", err) } } else { // 清理不再需要的DestinationRule if err := r.cleanupDestinationRule(ctx, app); err != nil { return fmt.Errorf("failed to cleanup DestinationRule: %w", err) } } logger.Info("Mesh resources reconciled", "name", app.Name) return nil } // ensureSidecarInjection 确保应用的工作负载启用了Sidecar注入 func (r *ApplicationReconciler) ensureSidecarInjection(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 检查应用类型,更新相应的工作负载 if app.Spec.Template.Type == "stateful" { // 更新StatefulSet statefulSet := &apps_v1.StatefulSet{} err := r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, statefulSet) if err == nil { // 设置注入注解 if statefulSet.Spec.Template.Annotations == nil { statefulSet.Spec.Template.Annotations = make(map[string]string) } statefulSet.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true" if err := r.Update(ctx, statefulSet); err != nil { return fmt.Errorf("failed to update StatefulSet for sidecar injection: %w", err) } logger.Info("Enabled sidecar injection for StatefulSet", "name", statefulSet.Name) } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get StatefulSet: %w", err) } } else { // 更新Deployment deployment := &apps_v1.Deployment{} err := r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, deployment) if err == nil { // 设置注入注解 if deployment.Spec.Template.Annotations == nil { deployment.Spec.Template.Annotations = make(map[string]string) } deployment.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true" if err := r.Update(ctx, deployment); err != nil { return fmt.Errorf("failed to update Deployment for sidecar injection: %w", err) } logger.Info("Enabled sidecar injection for Deployment", "name", deployment.Name) } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get Deployment: %w", err) } } return nil } // reconcileMeshVirtualService 处理服务网格内的VirtualService func (r 
*ApplicationReconciler) reconcileMeshVirtualService(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) vsName := app.Name + "-mesh-vs" // 检查服务是否存在 serviceName := app.Name + "-svc" service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service) if errors.IsNotFound(err) { logger.Info("Service not found, skipping mesh VirtualService creation", "service", serviceName) return nil } else if err != nil { return fmt.Errorf("failed to get service: %w", err) } // 创建或更新VirtualService vs := &istionetworkingv1.VirtualService{ ObjectMeta: metav1.ObjectMeta{ Name: vsName, Namespace: app.Namespace, }, } op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error { // 设置控制器引用 if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil { return err } // 配置VirtualService vs.Spec.Hosts = []string{service.Name} vs.Spec.Gateways = []string{"mesh"} // 服务网格内部流量 // 创建HTTP路由 httpRoutes := []*istioapinetworkingv1.HTTPRoute{} // 如果定义了路由规则,使用它们 if len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 { for _, route := range app.Spec.NetworkPolicy.Mesh.Routes { httpRoute := &istioapinetworkingv1.HTTPRoute{} // 配置匹配条件 if route.Match != nil { match := &istioapinetworkingv1.HTTPMatchRequest{} // URI匹配 if route.Match.URI != nil { match.Uri = convertStringMatch(route.Match.URI) } // 方法匹配 if route.Match.Method != nil { match.Method = convertStringMatch(route.Match.Method) } // 头部匹配 if len(route.Match.Headers) > 0 { match.Headers = make(map[string]*istioapinetworkingv1.StringMatch) for key, value := range route.Match.Headers { match.Headers[key] = convertStringMatch(&value) } } httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{match} } // 配置目标 destination := &istioapinetworkingv1.HTTPRouteDestination{ Destination: &istioapinetworkingv1.Destination{ Host: route.Destination.Host, }, Weight: route.Weight, } if route.Destination.Subset 
!= "" { destination.Destination.Subset = route.Destination.Subset } if route.Destination.Port > 0 { destination.Destination.Port = &istioapinetworkingv1.PortSelector{ Number: uint32(route.Destination.Port), } } httpRoute.Route = []*istioapinetworkingv1.HTTPRouteDestination{destination} httpRoutes = append(httpRoutes, httpRoute) } } else { // 默认路由 httpRoute := &istioapinetworkingv1.HTTPRoute{ Route: []*istioapinetworkingv1.HTTPRouteDestination{ { Destination: &istioapinetworkingv1.Destination{ Host: service.Name, }, }, }, } httpRoutes = append(httpRoutes, httpRoute) } // 配置超时 if app.Spec.NetworkPolicy.Mesh.Timeout > 0 { for _, route := range httpRoutes { seconds := app.Spec.NetworkPolicy.Mesh.Timeout / 1000 nanos := (app.Spec.NetworkPolicy.Mesh.Timeout % 1000) * 1000000 route.Timeout = &durationpb.Duration{ Seconds: int64(seconds), Nanos: int32(nanos), } } } // 配置重试 if app.Spec.NetworkPolicy.Mesh.Retry != nil { for _, route := range httpRoutes { retryOn := "5xx,gateway-error,connect-failure" if len(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn) > 0 { retryOn = strings.Join(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn, ",") } retry := &istioapinetworkingv1.HTTPRetry{ Attempts: app.Spec.NetworkPolicy.Mesh.Retry.Attempts, RetryOn: retryOn, } if app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout > 0 { seconds := app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout / 1000 nanos := (app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout % 1000) * 1000000 retry.PerTryTimeout = &durationpb.Duration{ Seconds: int64(seconds), Nanos: int32(nanos), } } route.Retries = retry } } // 配置故障注入(测试用途) if app.Spec.NetworkPolicy.Mesh.FaultInjection != nil { for _, route := range httpRoutes { fault := &istioapinetworkingv1.HTTPFaultInjection{} // 延迟故障 if app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay != nil { delayDuration := &durationpb.Duration{ Seconds: int64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.FixedDelay / 1000), Nanos: int32((app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.FixedDelay % 
1000) * 1000000), } fault.Delay = &istioapinetworkingv1.HTTPFaultInjection_Delay{ HttpDelayType: &istioapinetworkingv1.HTTPFaultInjection_Delay_FixedDelay{ FixedDelay: delayDuration, }, Percentage: &istioapinetworkingv1.Percent{ Value: float64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.Percentage) / 100.0, }, } } // 中止故障 if app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort != nil { httpStatus := int32(app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort.HttpStatus) fault.Abort = &istioapinetworkingv1.HTTPFaultInjection_Abort{ ErrorType: &istioapinetworkingv1.HTTPFaultInjection_Abort_HttpStatus{ HttpStatus: httpStatus, }, Percentage: &istioapinetworkingv1.Percent{ Value: float64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort.Percentage) / 100.0, }, } } route.Fault = fault } } vs.Spec.Http = httpRoutes return nil }) if err != nil { return fmt.Errorf("failed to create or update mesh VirtualService: %w", err) } logger.Info("Mesh VirtualService reconciled", "name", vsName, "operation", op) return nil } // reconcileDestinationRule 处理DestinationRule(熔断器) func (r *ApplicationReconciler) reconcileDestinationRule(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) drName := app.Name + "-dr" // 检查服务是否存在 serviceName := app.Name + "-svc" service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service) if errors.IsNotFound(err) { logger.Info("Service not found, skipping DestinationRule creation", "service", serviceName) return nil } else if err != nil { return fmt.Errorf("failed to get service: %w", err) } // Get + Create/Update DestinationRule existingDR := &istionetworkingv1.DestinationRule{} err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, existingDR) if errors.IsNotFound(err) { logger.Info("Creating new DestinationRule", "name", drName) newDR := &istionetworkingv1.DestinationRule{ObjectMeta: metav1.ObjectMeta{Name: drName, Namespace: 
app.Namespace}} newDR.Spec.Host = service.Name var cb *applicationv1.CircuitBreaker if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil { cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker } if cb == nil && app.Spec.TrafficPolicy != nil { cb = app.Spec.TrafficPolicy.CircuitBreaker } if cb != nil { connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{ Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1}, Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100}, } maxEjectionPercent := uint32(100) if cb.MaxEjectionPercent > 0 { maxEjectionPercent = uint32(cb.MaxEjectionPercent) } outlierDetection := &istioapinetworkingv1.OutlierDetection{ ConsecutiveErrors: int32(cb.ConsecutiveErrors), Interval: &durationpb.Duration{Seconds: 1}, BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)}, MaxEjectionPercent: int32(maxEjectionPercent), } var loadBalancer *istioapinetworkingv1.LoadBalancerSettings if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil { loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer) } else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil { loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer) } else { loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}} } newDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer} } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDR, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newDR); err != nil { return 
fmt.Errorf("failed to create DestinationRule: %w", err) } logger.Info("DestinationRule created", "name", drName) } else if err != nil { return fmt.Errorf("failed to get DestinationRule: %w", err) } else { existingDR.Spec.Host = service.Name var cb *applicationv1.CircuitBreaker if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil { cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker } if cb == nil && app.Spec.TrafficPolicy != nil { cb = app.Spec.TrafficPolicy.CircuitBreaker } if cb != nil { connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{ Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1}, Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100}, } maxEjectionPercent := uint32(100) if cb.MaxEjectionPercent > 0 { maxEjectionPercent = uint32(cb.MaxEjectionPercent) } outlierDetection := &istioapinetworkingv1.OutlierDetection{ConsecutiveErrors: int32(cb.ConsecutiveErrors), Interval: &durationpb.Duration{Seconds: 1}, BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)}, MaxEjectionPercent: int32(maxEjectionPercent)} var loadBalancer *istioapinetworkingv1.LoadBalancerSettings if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil { loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer) } else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil { loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer) } else { loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}} } existingDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer} } else { 
existingDR.Spec.TrafficPolicy = nil } if err := r.Update(ctx, existingDR); err != nil { return fmt.Errorf("failed to update DestinationRule: %w", err) } logger.Info("DestinationRule updated", "name", drName) } return nil } // cleanupDestinationRule 清理不再需要的DestinationRule func (r *ApplicationReconciler) cleanupDestinationRule(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) drName := app.Name + "-dr" dr := &istionetworkingv1.DestinationRule{} err := r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr) if err == nil { logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName) if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete DestinationRule: %w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get DestinationRule: %w", err) } return nil } // 辅助函数 - 转换负载均衡设置 func convertLoadBalancerSettings(settings *applicationv1.LoadBalancerSettings) *istioapinetworkingv1.LoadBalancerSettings { result := &istioapinetworkingv1.LoadBalancerSettings{} if settings.ConsistentHash != nil { consistentHash := &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB{} if settings.ConsistentHash.HttpHeaderName != "" { consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpHeaderName{ HttpHeaderName: settings.ConsistentHash.HttpHeaderName, } } else if settings.ConsistentHash.UseSourceIp { consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{ UseSourceIp: true, } } else if settings.ConsistentHash.HttpCookie != nil { consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpCookie{ HttpCookie: &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HTTPCookie{ Name: settings.ConsistentHash.HttpCookie.Name, Path: settings.ConsistentHash.HttpCookie.Path, Ttl: &durationpb.Duration{ Seconds: 
int64(settings.ConsistentHash.HttpCookie.Ttl), }, }, } } result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHash{ ConsistentHash: consistentHash, } } else { // 简单负载均衡 var simpleType istioapinetworkingv1.LoadBalancerSettings_SimpleLB switch settings.Simple { case "LEAST_CONN": simpleType = istioapinetworkingv1.LoadBalancerSettings_LEAST_CONN case "RANDOM": simpleType = istioapinetworkingv1.LoadBalancerSettings_RANDOM case "PASSTHROUGH": simpleType = istioapinetworkingv1.LoadBalancerSettings_PASSTHROUGH default: simpleType = istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN } result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_Simple{ Simple: simpleType, } } return result } // 此处已删除重复的 reconcileIstioTraffic 函数实现,保留第139行的定义