package application import ( "context" "fmt" "strings" "time" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" applicationv1 "code.gitea.io/gitea/modules/k8s/api/application/v1" application_controller_utils "code.gitea.io/gitea/modules/k8s/controller/application/utils" apps_v1 "k8s.io/api/apps/v1" core_v1 "k8s.io/api/core/v1" "google.golang.org/protobuf/types/known/durationpb" istioapinetworkingv1 "istio.io/api/networking/v1" istionetworkingv1 "istio.io/client-go/pkg/apis/networking/v1" ) // ApplicationReconciler reconciles a Application object type ApplicationReconciler struct { client.Client Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=application.devstar.cn,resources=applications,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/status,verbs=get;update;patch // +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/finalizers,verbs=update // +kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch;update;patch // +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=create;delete;get;list;watch;update;patch // Reconcile is part of the main kubernetes reconciliation loop func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) // 获取Application实例 app := &applicationv1.Application{} err := r.Get(ctx, req.NamespacedName, app) if err != nil 
{ if errors.IsNotFound(err) { logger.Info("Application resource not found. Ignoring since object must be deleted") return ctrl.Result{}, nil } logger.Error(err, "Failed to get Application") return ctrl.Result{}, err } logger.Info("Processing Application", "name", app.Name, "namespace", app.Namespace, "type", app.Spec.Template.Type) // 添加 finalizer 处理逻辑 finalizerName := "application.devstar.cn/finalizer" // 检查对象是否正在被删除 if !app.ObjectMeta.DeletionTimestamp.IsZero() { // 对象正在被删除 - 处理 finalizer if k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) { // 执行清理操作 logger.Info("Cleaning up resources before deletion", "name", app.Name) // 清理完成后移除 finalizer k8s_sigs_controller_runtime_utils.RemoveFinalizer(app, finalizerName) if err := r.Update(ctx, app); err != nil { logger.Error(err, "Failed to remove finalizer") return ctrl.Result{}, err } } // 已标记为删除且处理完成,允许继续删除流程 return ctrl.Result{}, nil } // 如果对象不包含 finalizer,就添加它 if !k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) { logger.Info("Adding finalizer", "name", app.Name) k8s_sigs_controller_runtime_utils.AddFinalizer(app, finalizerName) if err := r.Update(ctx, app); err != nil { logger.Error(err, "Failed to add finalizer") return ctrl.Result{}, err } } // 根据应用类型协调相应的资源 if app.Spec.Template.Type == "stateful" { // 协调 StatefulSet if err := r.reconcileStatefulSet(ctx, app); err != nil { logger.Error(err, "Failed to reconcile StatefulSet") return ctrl.Result{}, err } } else { // 协调 Deployment(默认为无状态应用) if err := r.reconcileDeployment(ctx, app); err != nil { logger.Error(err, "Failed to reconcile Deployment") return ctrl.Result{}, err } } // 协调 Service if err := r.reconcileService(ctx, app); err != nil { logger.Error(err, "Failed to reconcile Service") return ctrl.Result{}, err } // 更新状态 if err := r.updateStatus(ctx, app); err != nil { logger.Error(err, "Failed to update status") return ctrl.Result{}, err } // 协调网络策略 if err := r.reconcileNetworkPolicy(ctx, app); err != nil { 
logger.Error(err, "Failed to reconcile network policy") return ctrl.Result{}, err } // 如果配置了旧版TrafficPolicy,为向后兼容处理旧的流量策略 if app.Spec.TrafficPolicy != nil { if err := r.reconcileIstioTraffic(ctx, app); err != nil { logger.Error(err, "Failed to reconcile Istio traffic policy") return ctrl.Result{}, err } } logger.Info("Successfully reconciled Application", "name", app.Name) return ctrl.Result{RequeueAfter: time.Minute * 1}, nil } // 服务网格流量治理逻辑 func (r *ApplicationReconciler) reconcileIstioTraffic(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) if app.Spec.TrafficPolicy == nil { logger.Info("No legacy TrafficPolicy configured, skipping") return nil } // 如果已经有NetworkPolicy配置,使用它而不是旧版TrafficPolicy if app.Spec.NetworkPolicy != nil && ((app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled) || (app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled)) { logger.Info("NetworkPolicy already configured, skipping legacy TrafficPolicy") return nil } // 处理金丝雀发布 if app.Spec.TrafficPolicy.Canary != nil && app.Spec.TrafficPolicy.Canary.Enabled { logger.Info("Processing legacy Canary traffic configuration") // 这里实现金丝雀发布的逻辑 // 为简化实现,可以创建一个VirtualService来处理金丝雀流量 vsName := app.Name + "-canary-vs" vs := &istionetworkingv1.VirtualService{ ObjectMeta: metav1.ObjectMeta{ Name: vsName, Namespace: app.Namespace, }, } // 检查服务是否存在 serviceName := app.Name + "-svc" service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service) if errors.IsNotFound(err) { logger.Info("Service not found, skipping legacy VirtualService creation", "service", serviceName) return nil } else if err != nil { return fmt.Errorf("failed to get service: %w", err) } // 构建金丝雀服务名 canaryService := app.Name + "-" + app.Spec.TrafficPolicy.Canary.CanaryVersion + "-svc" // 创建或更新VirtualService op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error { 
// 设置控制器引用 if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil { return err } // 配置金丝雀VirtualService mainWeight := int32(app.Spec.TrafficPolicy.Canary.MainWeight) canaryWeight := int32(100 - app.Spec.TrafficPolicy.Canary.MainWeight) vs.Spec.Hosts = []string{service.Name} vs.Spec.Gateways = []string{"mesh"} // 默认只在网格内生效 vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{ { Route: []*istioapinetworkingv1.HTTPRouteDestination{ { Destination: &istioapinetworkingv1.Destination{ Host: service.Name, }, Weight: mainWeight, }, { Destination: &istioapinetworkingv1.Destination{ Host: canaryService, }, Weight: canaryWeight, }, }, }, } return nil }) if err != nil { return fmt.Errorf("failed to create or update legacy Canary VirtualService: %w", err) } logger.Info("Legacy Canary VirtualService reconciled", "name", vsName, "operation", op) } // 如果配置了熔断器,确保创建DestinationRule if app.Spec.TrafficPolicy.CircuitBreaker != nil { logger.Info("Processing legacy CircuitBreaker configuration") // reconcileDestinationRule函数已经支持从TrafficPolicy获取熔断器配置 if err := r.reconcileDestinationRule(ctx, app); err != nil { return fmt.Errorf("failed to reconcile legacy DestinationRule: %w", err) } } return nil } // 修复 reconcileDeployment 函数中的调用 func (r *ApplicationReconciler) reconcileDeployment(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 检查 Deployment 是否存在 deployment := &apps_v1.Deployment{} err := r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, deployment) if errors.IsNotFound(err) { // 创建新的 Deployment logger.Info("Creating new Deployment", "name", app.Name) newDeployment, err := application_controller_utils.NewDeployment(app) if err != nil { return fmt.Errorf("failed to generate deployment: %w", err) } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDeployment, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if 
err := r.Create(ctx, newDeployment); err != nil { return fmt.Errorf("failed to create deployment: %w", err) } logger.Info("Successfully created Deployment", "name", app.Name) return nil } else if err != nil { return fmt.Errorf("failed to get deployment: %w", err) } // 获取期望的副本数 desiredReplicas := int32(1) // 默认值 if app.Spec.Replicas != nil { desiredReplicas = *app.Spec.Replicas } // 获取当前的副本数 currentReplicas := int32(1) // 默认值 if deployment.Spec.Replicas != nil { currentReplicas = *deployment.Spec.Replicas } logger.Info("Deployment replica status", "name", app.Name, "current-spec-replicas", currentReplicas, "desired-replicas", desiredReplicas, "actual-replicas", deployment.Status.Replicas, "ready-replicas", deployment.Status.ReadyReplicas) // 检查是否需要更新 needsUpdate := false updateFields := make(map[string]interface{}) // 1. 检查副本数是否变更 if currentReplicas != desiredReplicas { logger.Info("Replica count changed", "current", currentReplicas, "desired", desiredReplicas) needsUpdate = true updateFields["replicas"] = desiredReplicas deployment.Spec.Replicas = &desiredReplicas } // 2. 生成期望的 Deployment 来比较其他字段 updatedDeployment, err := application_controller_utils.NewDeployment(app) if err != nil { return fmt.Errorf("failed to generate updated deployment: %w", err) } // 3. 检查镜像是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { currentImage := deployment.Spec.Template.Spec.Containers[0].Image desiredImage := updatedDeployment.Spec.Template.Spec.Containers[0].Image if currentImage != desiredImage { logger.Info("Image changed", "current", currentImage, "desired", desiredImage) needsUpdate = true updateFields["image"] = desiredImage deployment.Spec.Template.Spec.Containers[0].Image = desiredImage } } // 4. 
检查环境变量是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalEnvVars(deployment.Spec.Template.Spec.Containers[0].Env, updatedDeployment.Spec.Template.Spec.Containers[0].Env) { logger.Info("Environment variables changed") needsUpdate = true updateFields["env"] = "changed" deployment.Spec.Template.Spec.Containers[0].Env = updatedDeployment.Spec.Template.Spec.Containers[0].Env } } // 5. 检查资源配置是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalResources(deployment.Spec.Template.Spec.Containers[0].Resources, updatedDeployment.Spec.Template.Spec.Containers[0].Resources) { logger.Info("Resource requirements changed") needsUpdate = true updateFields["resources"] = "changed" deployment.Spec.Template.Spec.Containers[0].Resources = updatedDeployment.Spec.Template.Spec.Containers[0].Resources } } // 6. 检查端口配置是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalPorts(deployment.Spec.Template.Spec.Containers[0].Ports, updatedDeployment.Spec.Template.Spec.Containers[0].Ports) { logger.Info("Container ports changed") needsUpdate = true updateFields["ports"] = "changed" deployment.Spec.Template.Spec.Containers[0].Ports = updatedDeployment.Spec.Template.Spec.Containers[0].Ports } } // 7. 
检查健康检查配置是否变更 if len(deployment.Spec.Template.Spec.Containers) > 0 && len(updatedDeployment.Spec.Template.Spec.Containers) > 0 { if !equalProbes(deployment.Spec.Template.Spec.Containers[0].LivenessProbe, updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe) || !equalProbes(deployment.Spec.Template.Spec.Containers[0].ReadinessProbe, updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe) { logger.Info("Health check probes changed") needsUpdate = true updateFields["probes"] = "changed" deployment.Spec.Template.Spec.Containers[0].LivenessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe deployment.Spec.Template.Spec.Containers[0].ReadinessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe } } // 执行更新 if needsUpdate { logger.Info("Updating deployment", "name", app.Name, "fields", updateFields) if err := r.Update(ctx, deployment); err != nil { return fmt.Errorf("failed to update deployment: %w", err) } logger.Info("Deployment updated successfully", "name", app.Name, "fields", updateFields) } else { logger.Info("No deployment updates needed", "name", app.Name) } return nil } // 添加辅助函数:比较端口配置 func equalPorts(current, desired []core_v1.ContainerPort) bool { if len(current) != len(desired) { return false } currentMap := make(map[string]core_v1.ContainerPort) for _, port := range current { currentMap[port.Name] = port } for _, port := range desired { if currentPort, exists := currentMap[port.Name]; !exists || currentPort.ContainerPort != port.ContainerPort || currentPort.Protocol != port.Protocol { return false } } return true } // 添加辅助函数:比较探针配置 func equalProbes(current, desired *core_v1.Probe) bool { if current == nil && desired == nil { return true } if current == nil || desired == nil { return false } // 简单比较,可以根据需要扩展 if current.InitialDelaySeconds != desired.InitialDelaySeconds || current.PeriodSeconds != desired.PeriodSeconds || current.TimeoutSeconds != desired.TimeoutSeconds || 
current.SuccessThreshold != desired.SuccessThreshold || current.FailureThreshold != desired.FailureThreshold { return false } // 比较 HTTPGet 配置 if current.HTTPGet != nil && desired.HTTPGet != nil { return current.HTTPGet.Path == desired.HTTPGet.Path && current.HTTPGet.Port == desired.HTTPGet.Port } return current.HTTPGet == nil && desired.HTTPGet == nil } // 添加辅助函数:比较环境变量 func equalEnvVars(current, desired []core_v1.EnvVar) bool { if len(current) != len(desired) { return false } currentMap := make(map[string]string) for _, env := range current { currentMap[env.Name] = env.Value } desiredMap := make(map[string]string) for _, env := range desired { desiredMap[env.Name] = env.Value } // 检查每个期望的环境变量是否匹配 for key, value := range desiredMap { if currentMap[key] != value { return false } } // 检查是否有多余的环境变量 for key := range currentMap { if _, exists := desiredMap[key]; !exists { return false } } return true } // 添加辅助函数:比较资源配置 func equalResources(current, desired core_v1.ResourceRequirements) bool { // 比较 Limits if current.Limits == nil && desired.Limits != nil { return false } if current.Limits != nil && desired.Limits == nil { return false } if current.Limits != nil && desired.Limits != nil { // 比较 CPU currentCPU := current.Limits.Cpu() desiredCPU := desired.Limits.Cpu() if currentCPU != nil && desiredCPU != nil { if !currentCPU.Equal(*desiredCPU) { return false } } else if currentCPU != desiredCPU { // 一个为 nil,另一个不为 nil return false } // 比较 Memory currentMemory := current.Limits.Memory() desiredMemory := desired.Limits.Memory() if currentMemory != nil && desiredMemory != nil { if !currentMemory.Equal(*desiredMemory) { return false } } else if currentMemory != desiredMemory { // 一个为 nil,另一个不为 nil return false } } // 比较 Requests if current.Requests == nil && desired.Requests != nil { return false } if current.Requests != nil && desired.Requests == nil { return false } if current.Requests != nil && desired.Requests != nil { // 比较 CPU currentCPU := current.Requests.Cpu() 
desiredCPU := desired.Requests.Cpu() if currentCPU != nil && desiredCPU != nil { if !currentCPU.Equal(*desiredCPU) { return false } } else if currentCPU != desiredCPU { // 一个为 nil,另一个不为 nil return false } // 比较 Memory currentMemory := current.Requests.Memory() desiredMemory := desired.Requests.Memory() if currentMemory != nil && desiredMemory != nil { if !currentMemory.Equal(*desiredMemory) { return false } } else if currentMemory != desiredMemory { // 一个为 nil,另一个不为 nil return false } } return true } func (r *ApplicationReconciler) reconcileStatefulSet(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 检查 StatefulSet 是否存在 statefulSet := &apps_v1.StatefulSet{} err := r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, statefulSet) if errors.IsNotFound(err) { // 创建新的 StatefulSet logger.Info("Creating new StatefulSet", "name", app.Name) newStatefulSet, err := application_controller_utils.NewStatefulSet(app) if err != nil { return fmt.Errorf("failed to generate statefulset: %w", err) } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newStatefulSet, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newStatefulSet); err != nil { return fmt.Errorf("failed to create statefulset: %w", err) } logger.Info("Successfully created StatefulSet", "name", app.Name) return nil } else if err != nil { return fmt.Errorf("failed to get statefulset: %w", err) } // 更新现有 StatefulSet logger.Info("Checking StatefulSet for updates", "name", app.Name) updatedStatefulSet, err := application_controller_utils.NewStatefulSet(app) if err != nil { return fmt.Errorf("failed to generate updated statefulset: %w", err) } // 检查是否需要更新 needsUpdate := false updateFields := make(map[string]interface{}) // 检查镜像是否变更 if len(statefulSet.Spec.Template.Spec.Containers) > 0 && len(updatedStatefulSet.Spec.Template.Spec.Containers) > 0 { currentImage := 
statefulSet.Spec.Template.Spec.Containers[0].Image desiredImage := updatedStatefulSet.Spec.Template.Spec.Containers[0].Image if currentImage != desiredImage { logger.Info("StatefulSet image changed", "current", currentImage, "desired", desiredImage) needsUpdate = true updateFields["image"] = desiredImage } } // 检查副本数是否变更 currentReplicas := int32(1) if statefulSet.Spec.Replicas != nil { currentReplicas = *statefulSet.Spec.Replicas } desiredReplicas := int32(1) if updatedStatefulSet.Spec.Replicas != nil { desiredReplicas = *updatedStatefulSet.Spec.Replicas } if currentReplicas != desiredReplicas { logger.Info("StatefulSet replica count changed", "current", currentReplicas, "desired", desiredReplicas) needsUpdate = true updateFields["replicas"] = desiredReplicas } if needsUpdate { logger.Info("Updating StatefulSet", "name", app.Name, "fields", updateFields) statefulSet.Spec = updatedStatefulSet.Spec if err := r.Update(ctx, statefulSet); err != nil { return fmt.Errorf("failed to update statefulset: %w", err) } logger.Info("StatefulSet updated successfully", "name", app.Name) } else { logger.Info("No StatefulSet updates needed", "name", app.Name) } return nil } // 添加辅助函数:比较字符串映射 func equalStringMaps(current, desired map[string]string) bool { if len(current) != len(desired) { return false } for k, v := range desired { if current[k] != v { return false } } return true } // equalIngressTLS函数已废弃 func (r *ApplicationReconciler) updateStatus(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) var deployment *apps_v1.Deployment var statefulSet *apps_v1.StatefulSet var err error // 根据应用类型获取对应的资源状态 if app.Spec.Template.Type == "stateful" { statefulSet = &apps_v1.StatefulSet{} err = r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, statefulSet) } else { deployment = &apps_v1.Deployment{} err = r.Get(ctx, types.NamespacedName{ Name: app.Name, Namespace: app.Namespace, }, deployment) } if err != nil { 
app.Status.Phase = "Failed" app.Status.Message = fmt.Sprintf("Workload not found: %v", err) logger.Error(err, "Failed to get workload for status update") } else { // 根据工作负载类型更新状态 if statefulSet != nil { app.Status.Replicas = statefulSet.Status.Replicas app.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas } else if deployment != nil { app.Status.Replicas = deployment.Status.Replicas app.Status.ReadyReplicas = deployment.Status.ReadyReplicas } // 状态判断逻辑 if app.Status.ReadyReplicas == 0 { app.Status.Phase = "Pending" app.Status.Message = "Application is starting" } else if app.Status.ReadyReplicas < app.Status.Replicas { app.Status.Phase = "Scaling" app.Status.Message = fmt.Sprintf("Ready: %d/%d", app.Status.ReadyReplicas, app.Status.Replicas) } else { app.Status.Phase = "Running" app.Status.Message = "Application is healthy" } } app.Status.LastUpdated = metav1.Now() if err := r.Status().Update(ctx, app); err != nil { return fmt.Errorf("failed to update status: %w", err) } logger.Info("Updated Application status", "phase", app.Status.Phase, "message", app.Status.Message) return nil } // 协调网络策略 func (r *ApplicationReconciler) reconcileNetworkPolicy(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) // 如果没有配置网络策略,跳过处理 if app.Spec.NetworkPolicy == nil { logger.Info("No network policy configured, skipping", "name", app.Name) return nil } // 协调Gateway(南北向流量) if app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled { if err := r.reconcileGateway(ctx, app); err != nil { return fmt.Errorf("failed to reconcile gateway: %w", err) } } else { // 清理不再需要的Gateway资源 if err := r.cleanupGateway(ctx, app); err != nil { return fmt.Errorf("failed to cleanup gateway: %w", err) } } // 协调Mesh(东西向流量) if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled { if err := r.reconcileMesh(ctx, app); err != nil { return fmt.Errorf("failed to reconcile mesh: %w", err) } } else { // 清理不再需要的Mesh资源 if err := 
r.cleanupMesh(ctx, app); err != nil { return fmt.Errorf("failed to cleanup mesh: %w", err) } } return nil } // cleanupGateway 清理不再需要的Gateway资源 func (r *ApplicationReconciler) cleanupGateway(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) gatewayName := app.Name + "-gateway" vsName := app.Name + "-gateway-vs" // 清理Gateway gateway := &istionetworkingv1.Gateway{} err := r.Get(ctx, types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}, gateway) if err == nil { logger.Info("Cleaning up Gateway that is no longer needed", "name", gatewayName) if err := r.Delete(ctx, gateway); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete Gateway: %w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get Gateway: %w", err) } // 清理VirtualService vs := &istionetworkingv1.VirtualService{} err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs) if err == nil { logger.Info("Cleaning up Gateway VirtualService that is no longer needed", "name", vsName) if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete Gateway VirtualService: %w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get Gateway VirtualService: %w", err) } return nil } // cleanupMesh 清理不再需要的Mesh资源 func (r *ApplicationReconciler) cleanupMesh(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) vsName := app.Name + "-mesh-vs" drName := app.Name + "-dr" // 清理VirtualService vs := &istionetworkingv1.VirtualService{} err := r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs) if err == nil { logger.Info("Cleaning up Mesh VirtualService that is no longer needed", "name", vsName) if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete Mesh VirtualService: %w", err) } } else if !errors.IsNotFound(err) { return 
fmt.Errorf("failed to get Mesh VirtualService: %w", err) } // 清理DestinationRule dr := &istionetworkingv1.DestinationRule{} err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr) if err == nil { logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName) if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) { return fmt.Errorf("failed to delete DestinationRule: %w", err) } } else if !errors.IsNotFound(err) { return fmt.Errorf("failed to get DestinationRule: %w", err) } return nil } // 为了在SetupWithManager中注册Istio资源监控 func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&applicationv1.Application{}). Owns(&apps_v1.Deployment{}). Owns(&apps_v1.StatefulSet{}). Owns(&core_v1.Service{}). // 添加对Istio资源的监控 // Owns(&istionetworkingv1.VirtualService{}). // Owns(&istionetworkingv1.Gateway{}). // Owns(&istionetworkingv1.DestinationRule{}). Complete(r) } func (r *ApplicationReconciler) reconcileService(ctx context.Context, app *applicationv1.Application) error { logger := log.FromContext(ctx) serviceName := app.Name + "-svc" // 检查是否需要创建 Service shouldCreate := false if app.Spec.Service != nil { shouldCreate = app.Spec.Service.Enabled } else { // 向后兼容:使用旧的 expose 配置 shouldCreate = app.Spec.Expose && len(app.Spec.Template.Ports) > 0 } // 获取现有 Service service := &core_v1.Service{} err := r.Get(ctx, types.NamespacedName{ Name: serviceName, Namespace: app.Namespace, }, service) serviceExists := !errors.IsNotFound(err) if !shouldCreate { // 如果不需要 Service 但存在,则删除 if serviceExists { logger.Info("Deleting existing Service as it's disabled", "name", serviceName) if err := r.Delete(ctx, service); err != nil { return fmt.Errorf("failed to delete service: %w", err) } } return nil } // 需要创建 Service if !serviceExists { // Service 不存在,创建新的 logger.Info("Creating new Service", "name", serviceName) newService, err := application_controller_utils.NewService(app) if err 
!= nil { return fmt.Errorf("failed to generate service: %w", err) } if newService == nil { logger.Info("Service creation skipped", "name", serviceName) return nil } if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newService, r.Scheme); err != nil { return fmt.Errorf("failed to set controller reference: %w", err) } if err := r.Create(ctx, newService); err != nil { return fmt.Errorf("failed to create service: %w", err) } logger.Info("Successfully created Service", "name", serviceName, "type", newService.Spec.Type, "ports", len(newService.Spec.Ports)) return nil } // Service 存在,检查是否需要更新 if err != nil { return fmt.Errorf("failed to get service: %w", err) } logger.Info("Checking Service for updates", "name", serviceName) updatedService, err := application_controller_utils.NewService(app) if err != nil { return fmt.Errorf("failed to generate updated service: %w", err) } if updatedService == nil { logger.Info("Service should be deleted", "name", serviceName) if err := r.Delete(ctx, service); err != nil { return fmt.Errorf("failed to delete service: %w", err) } return nil } // 检查是否需要更新 needsUpdate := false updateFields := make(map[string]interface{}) // 检查服务类型 if service.Spec.Type != updatedService.Spec.Type { logger.Info("Service type changed", "current", service.Spec.Type, "desired", updatedService.Spec.Type) needsUpdate = true updateFields["type"] = updatedService.Spec.Type service.Spec.Type = updatedService.Spec.Type } // 检查端口配置 if !equalServicePorts(service.Spec.Ports, updatedService.Spec.Ports) { logger.Info("Service ports changed") needsUpdate = true updateFields["ports"] = "changed" service.Spec.Ports = updatedService.Spec.Ports } // 检查 LoadBalancer 配置 if service.Spec.LoadBalancerIP != updatedService.Spec.LoadBalancerIP { needsUpdate = true updateFields["loadBalancerIP"] = updatedService.Spec.LoadBalancerIP service.Spec.LoadBalancerIP = updatedService.Spec.LoadBalancerIP } if !equalStringSlices(service.Spec.LoadBalancerSourceRanges, 
updatedService.Spec.LoadBalancerSourceRanges) { needsUpdate = true updateFields["loadBalancerSourceRanges"] = "changed" service.Spec.LoadBalancerSourceRanges = updatedService.Spec.LoadBalancerSourceRanges } // 检查 ExternalName if service.Spec.ExternalName != updatedService.Spec.ExternalName { needsUpdate = true updateFields["externalName"] = updatedService.Spec.ExternalName service.Spec.ExternalName = updatedService.Spec.ExternalName } // 检查会话亲和性 if service.Spec.SessionAffinity != updatedService.Spec.SessionAffinity { needsUpdate = true updateFields["sessionAffinity"] = updatedService.Spec.SessionAffinity service.Spec.SessionAffinity = updatedService.Spec.SessionAffinity } // 更新标签 if !equalStringMaps(service.Labels, updatedService.Labels) { needsUpdate = true updateFields["labels"] = "changed" if service.Labels == nil { service.Labels = make(map[string]string) } for k, v := range updatedService.Labels { service.Labels[k] = v } } // 更新注解 if !equalStringMaps(service.Annotations, updatedService.Annotations) { needsUpdate = true updateFields["annotations"] = "changed" if service.Annotations == nil { service.Annotations = make(map[string]string) } for k, v := range updatedService.Annotations { service.Annotations[k] = v } } if needsUpdate { logger.Info("Updating Service", "name", serviceName, "fields", updateFields) if err := r.Update(ctx, service); err != nil { return fmt.Errorf("failed to update service: %w", err) } logger.Info("Service updated successfully", "name", serviceName) } else { logger.Info("No service updates needed", "name", serviceName) } return nil } // 添加辅助函数:比较服务端口 func equalServicePorts(current, desired []core_v1.ServicePort) bool { if len(current) != len(desired) { return false } currentMap := make(map[string]core_v1.ServicePort) for _, port := range current { currentMap[port.Name] = port } for _, port := range desired { if currentPort, exists := currentMap[port.Name]; !exists || currentPort.Port != port.Port || currentPort.TargetPort != 
port.TargetPort || currentPort.Protocol != port.Protocol || currentPort.NodePort != port.NodePort {
			return false
		}
	}
	return true
}

// equalStringSlices reports whether current and desired contain the same
// strings with the same multiplicities, ignoring order.
func equalStringSlices(current, desired []string) bool {
	if len(current) != len(desired) {
		return false
	}

	// Count occurrences instead of recording mere presence: with a presence
	// set, ["a", "b"] and ["a", "a"] would wrongly compare as equal.
	counts := make(map[string]int, len(current))
	for _, item := range current {
		counts[item]++
	}
	for _, item := range desired {
		if counts[item] == 0 {
			return false
		}
		counts[item]--
	}
	return true
}

// reconcileGateway creates or updates the Istio Gateway named "<app>-gateway"
// and then reconciles the VirtualService bound to that Gateway.
//
// The Gateway is handled with an explicit Get followed by Create or Update
// rather than controllerutil.CreateOrUpdate, to sidestep that helper's
// reflection-based comparison of protobuf-backed types.
func (r *ApplicationReconciler) reconcileGateway(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)
	gatewayName := app.Name + "-gateway"
	gatewayKey := types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}

	existingGateway := &istionetworkingv1.Gateway{}
	err := r.Get(ctx, gatewayKey, existingGateway)
	switch {
	case errors.IsNotFound(err):
		logger.Info("Creating new Gateway", "name", gatewayName)
		newGateway := &istionetworkingv1.Gateway{
			ObjectMeta: metav1.ObjectMeta{Name: gatewayName, Namespace: app.Namespace},
		}
		if err := r.configureGateway(newGateway, app); err != nil {
			return fmt.Errorf("failed to configure Gateway: %w", err)
		}
		if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newGateway, r.Scheme); err != nil {
			return fmt.Errorf("failed to set controller reference: %w", err)
		}
		if err := r.Create(ctx, newGateway); err != nil {
			return fmt.Errorf("failed to create Gateway: %w", err)
		}
		logger.Info("Gateway created", "name", gatewayName)

	case err != nil:
		return fmt.Errorf("failed to get Gateway: %w", err)

	default:
		if err := r.configureGateway(existingGateway, app); err != nil {
			return fmt.Errorf("failed to configure Gateway: %w", err)
		}
		if err := r.Update(ctx, existingGateway); err != nil {
			return fmt.Errorf("failed to update Gateway: %w", err)
		}
		logger.Info("Gateway updated", "name", gatewayName)
	}

	// Reconcile the VirtualService that is attached to this Gateway.
	if err := r.reconcileGatewayVirtualService(ctx, app); err != nil {
		return fmt.Errorf("failed to reconcile gateway VirtualService: %w", err)
	}
	return nil
}

// configureGateway fills in the spec of gateway from app's gateway settings.
//
// The selector is fixed to istio=ingressgateway and the server list is rebuilt
// from scratch on every call. When no ports are configured (or the application
// carries no gateway configuration at all) a single HTTP server on port 80 is
// emitted. TLS entries are applied to the first HTTPS/TLS server, creating an
// HTTPS server on port 443 when none exists. The returned error is always nil.
func (r *ApplicationReconciler) configureGateway(gateway *istionetworkingv1.Gateway, app *applicationv1.Application) error {
	gateway.Spec.Selector = map[string]string{
		"istio": "ingressgateway",
	}

	hosts := getHosts(app)

	// Start from an empty server list so stale servers never survive an update.
	gateway.Spec.Servers = []*istioapinetworkingv1.Server{}

	// Same nil guard as getHosts: either level of the configuration may be absent.
	hasGatewayConfig := app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil

	if !hasGatewayConfig || len(app.Spec.NetworkPolicy.Gateway.Ports) == 0 {
		// No ports configured: fall back to plain HTTP on port 80.
		gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
			Port: &istioapinetworkingv1.Port{
				Number:   80,
				Protocol: "HTTP",
				Name:     "http",
			},
			Hosts: hosts,
		})
	} else {
		for _, port := range app.Spec.NetworkPolicy.Gateway.Ports {
			gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
				Port: &istioapinetworkingv1.Port{
					Number:   uint32(port.Number),
					Protocol: port.Protocol,
					Name:     port.Name,
				},
				Hosts: hosts,
			})
		}
	}

	if !hasGatewayConfig {
		return nil
	}

	// NOTE(review): every TLS entry is applied to the FIRST HTTPS/TLS server, so
	// with several TLS entries the last one wins. Confirm whether entries are
	// meant to map to distinct servers.
	for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS {
		serverIndex := -1
		for j, server := range gateway.Spec.Servers {
			if server.Port.Protocol == "HTTPS" || server.Port.Protocol == "TLS" {
				serverIndex = j
				break
			}
		}

		// No HTTPS/TLS server declared: add a default one on port 443.
		if serverIndex == -1 {
			gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
				Port: &istioapinetworkingv1.Port{
					Number:   443,
					Protocol: "HTTPS",
					Name:     "https",
				},
				Hosts: hosts,
			})
			serverIndex = len(gateway.Spec.Servers) - 1
		}

		server := gateway.Spec.Servers[serverIndex]
		server.Tls = &istioapinetworkingv1.ServerTLSSettings{
			Mode:           getIstioTLSMode(tls.Mode),
			CredentialName: tls.SecretName,
		}
		if tls.MinProtocolVersion != "" {
			server.Tls.MinProtocolVersion = getIstioTLSVersion(tls.MinProtocolVersion)
		}
		// Hosts listed on the TLS entry override the gateway-wide hosts.
		if len(tls.Hosts) > 0 {
			server.Hosts = tls.Hosts
		}
	}

	return nil
}

// getHosts returns the hosts configured under NetworkPolicy.Gateway.Hosts, or
// the wildcard host "*" when none are configured. The legacy Ingress
// configuration is intentionally not consulted.
func getHosts(app *applicationv1.Application) []string {
	if app.Spec.NetworkPolicy != nil &&
		app.Spec.NetworkPolicy.Gateway != nil &&
		len(app.Spec.NetworkPolicy.Gateway.Hosts) > 0 {
		return app.Spec.NetworkPolicy.Gateway.Hosts
	}
	return []string{"*"}
}

// getIstioTLSMode maps a TLS mode name to the Istio enum. "MUTUAL" and
// "PASSTHROUGH" are recognised; any other value yields SIMPLE.
func getIstioTLSMode(mode string) istioapinetworkingv1.ServerTLSSettings_TLSmode {
	switch mode {
	case "MUTUAL":
		return istioapinetworkingv1.ServerTLSSettings_MUTUAL
	case "PASSTHROUGH":
		return istioapinetworkingv1.ServerTLSSettings_PASSTHROUGH
	default:
		return istioapinetworkingv1.ServerTLSSettings_SIMPLE
	}
}

// getIstioTLSVersion maps a TLS version name ("TLSv1_0", "TLSv1_1", "TLSv1_3")
// to the Istio enum. Any other value yields TLS 1.2.
func getIstioTLSVersion(version string) istioapinetworkingv1.ServerTLSSettings_TLSProtocol {
	switch version {
	case "TLSv1_0":
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_0
	case "TLSv1_1":
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_1
	case "TLSv1_3":
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_3
	default:
		return istioapinetworkingv1.ServerTLSSettings_TLSV1_2
	}
}

// reconcileGatewayVirtualService creates or updates the VirtualService named
// "<app>-gateway-vs". Nothing is done (and nil is returned) while the backing
// Service "<app>-svc" does not exist.
func (r *ApplicationReconciler) reconcileGatewayVirtualService(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)
	vsName := app.Name + "-gateway-vs"

	// The VirtualService routes to the Service, so the Service must exist first.
	serviceName := app.Name + "-svc"
	service := &core_v1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
	if errors.IsNotFound(err) {
		logger.Info("Service not found, skipping VirtualService creation", "service", serviceName)
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to get service: %w", err)
	}

	// Get + Create/Update, mirroring reconcileGateway.
	existingVS := &istionetworkingv1.VirtualService{}
	err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, existingVS)
	switch {
	case errors.IsNotFound(err):
		logger.Info("Creating new Gateway VirtualService", "name", vsName)
		newVS := &istionetworkingv1.VirtualService{
			ObjectMeta: metav1.ObjectMeta{Name: vsName, Namespace: app.Namespace},
		}
		if err := r.configureGatewayVirtualService(newVS, app, service); err != nil {
			return fmt.Errorf("failed to configure VirtualService: %w", err)
		}
		if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newVS, r.Scheme); err != nil {
			return fmt.Errorf("failed to set controller reference: %w", err)
		}
		if err := r.Create(ctx, newVS); err != nil {
			return fmt.Errorf("failed to create VirtualService: %w", err)
		}
		logger.Info("Gateway VirtualService created", "name", vsName)

	case err != nil:
		return fmt.Errorf("failed to get VirtualService: %w", err)

	default:
		if err := r.configureGatewayVirtualService(existingVS, app, service); err != nil {
			return fmt.Errorf("failed to configure VirtualService: %w", err)
		}
		if err := r.Update(ctx, existingVS); err != nil {
			return fmt.Errorf("failed to update VirtualService: %w", err)
		}
		logger.Info("Gateway VirtualService updated", "name", vsName)
	}

	return nil
}

// configureGatewayVirtualService fills in the spec of vs so that traffic
// entering through "<app>-gateway" is routed to service.
//
// A single HTTP route is produced. Its match conditions come from the mesh
// routes when any are configured, otherwise a catch-all "/" prefix match is
// used. Timeout and retry settings are copied from the mesh configuration when
// present; both timeouts are interpreted as milliseconds. The returned error is
// always nil.
func (r *ApplicationReconciler) configureGatewayVirtualService(vs *istionetworkingv1.VirtualService, app *applicationv1.Application, service *core_v1.Service) error {
	vs.Spec.Hosts = getHosts(app)
	vs.Spec.Gateways = []string{app.Name + "-gateway"}

	httpRoute := &istioapinetworkingv1.HTTPRoute{
		Route: []*istioapinetworkingv1.HTTPRouteDestination{
			{
				Destination: &istioapinetworkingv1.Destination{
					Host: service.Name,
				},
			},
		},
	}

	// Both levels may be absent; guard before touching any Mesh field.
	hasMesh := app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil

	if hasMesh && len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 {
		for _, route := range app.Spec.NetworkPolicy.Mesh.Routes {
			if route.Match == nil {
				continue
			}
			match := &istioapinetworkingv1.HTTPMatchRequest{}
			if route.Match.URI != nil {
				match.Uri = convertStringMatch(route.Match.URI)
			}
			if route.Match.Method != nil {
				match.Method = convertStringMatch(route.Match.Method)
			}
			if len(route.Match.Headers) > 0 {
				match.Headers = make(map[string]*istioapinetworkingv1.StringMatch, len(route.Match.Headers))
				for key, value := range route.Match.Headers {
					// convertStringMatch copies the fields out immediately, so
					// taking the loop variable's address is safe here.
					match.Headers[key] = convertStringMatch(&value)
				}
			}
			httpRoute.Match = append(httpRoute.Match, match)
		}
	} else {
		// Default: match every path.
		httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{
			{
				Uri: &istioapinetworkingv1.StringMatch{
					MatchType: &istioapinetworkingv1.StringMatch_Prefix{
						Prefix: "/",
					},
				},
			},
		}
	}

	if hasMesh && app.Spec.NetworkPolicy.Mesh.Timeout > 0 {
		httpRoute.Timeout = durationpb.New(time.Duration(app.Spec.NetworkPolicy.Mesh.Timeout) * time.Millisecond)
	}

	if hasMesh && app.Spec.NetworkPolicy.Mesh.Retry != nil {
		retryCfg := app.Spec.NetworkPolicy.Mesh.Retry

		retryOn := "5xx,gateway-error,connect-failure"
		if len(retryCfg.RetryOn) > 0 {
			retryOn = strings.Join(retryCfg.RetryOn, ",")
		}

		retry := &istioapinetworkingv1.HTTPRetry{
			Attempts: retryCfg.Attempts,
			RetryOn:  retryOn,
		}
		if retryCfg.PerTryTimeout > 0 {
			retry.PerTryTimeout = durationpb.New(time.Duration(retryCfg.PerTryTimeout) * time.Millisecond)
		}
		httpRoute.Retries = retry
	}

	vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{httpRoute}
	return nil
}

// convertStringMatch converts the Application CRD StringMatch into the Istio
// API StringMatch. Exact takes precedence over Prefix, which takes precedence
// over Regex. A nil input yields nil; an input with no field set yields a
// StringMatch whose MatchType is nil.
func convertStringMatch(match *applicationv1.StringMatch) *istioapinetworkingv1.StringMatch {
	if match == nil {
		return nil
	}

	result := &istioapinetworkingv1.StringMatch{}
	switch {
	case match.Exact != "":
		result.MatchType = &istioapinetworkingv1.StringMatch_Exact{Exact: match.Exact}
	case match.Prefix != "":
		result.MatchType = &istioapinetworkingv1.StringMatch_Prefix{Prefix: match.Prefix}
	case match.Regex != "":
		result.MatchType = &istioapinetworkingv1.StringMatch_Regex{Regex: match.Regex}
	}
	return result
}

// reconcileMesh reconciles the service-mesh resources of app: optional sidecar
// injection, the mesh-internal VirtualService, and the DestinationRule that
// carries the circuit breaker (created/updated when a circuit breaker is
// configured under the mesh settings, deleted otherwise).
//
// When the application has no mesh configuration nothing is reconciled and nil
// is returned.
func (r *ApplicationReconciler) reconcileMesh(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)

	if app.Spec.NetworkPolicy == nil || app.Spec.NetworkPolicy.Mesh == nil {
		logger.Info("No mesh configuration present, skipping mesh reconciliation", "name", app.Name)
		return nil
	}
	mesh := app.Spec.NetworkPolicy.Mesh

	// Enable sidecar injection on the workload when requested.
	if mesh.Sidecar != nil && mesh.Sidecar.Inject {
		if err := r.ensureSidecarInjection(ctx, app); err != nil {
			return fmt.Errorf("failed to ensure sidecar injection: %w", err)
		}
	}

	// VirtualService for east-west (in-mesh) traffic.
	if err := r.reconcileMeshVirtualService(ctx, app); err != nil {
		return fmt.Errorf("failed to reconcile mesh VirtualService: %w", err)
	}

	// DestinationRule (circuit breaker): keep it in sync, or remove it once the
	// circuit breaker is no longer configured.
	if mesh.CircuitBreaker != nil {
		if err := r.reconcileDestinationRule(ctx, app); err != nil {
			return fmt.Errorf("failed to reconcile DestinationRule: %w", err)
		}
	} else {
		if err := r.cleanupDestinationRule(ctx, app); err != nil {
			return fmt.Errorf("failed to cleanup DestinationRule: %w", err)
		}
	}

	logger.Info("Mesh resources reconciled", "name", app.Name)
	return nil
}

// ensureSidecarInjection sets the pod-template annotation
// sidecar.istio.io/inject="true" on the application's workload: the StatefulSet
// when app.Spec.Template.Type is "stateful", the Deployment otherwise.
//
// A workload that does not exist yet is silently skipped. A workload that
// already carries the annotation is left untouched so no redundant Update is
// issued on every reconcile.
func (r *ApplicationReconciler) ensureSidecarInjection(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)

	const injectAnnotation = "sidecar.istio.io/inject"
	workloadKey := types.NamespacedName{Name: app.Name, Namespace: app.Namespace}

	if app.Spec.Template.Type == "stateful" {
		statefulSet := &apps_v1.StatefulSet{}
		if err := r.Get(ctx, workloadKey, statefulSet); err != nil {
			if errors.IsNotFound(err) {
				return nil
			}
			return fmt.Errorf("failed to get StatefulSet: %w", err)
		}

		// Reading from a nil map is safe and yields "".
		if statefulSet.Spec.Template.Annotations[injectAnnotation] == "true" {
			return nil
		}
		if statefulSet.Spec.Template.Annotations == nil {
			statefulSet.Spec.Template.Annotations = make(map[string]string)
		}
		statefulSet.Spec.Template.Annotations[injectAnnotation] = "true"

		if err := r.Update(ctx, statefulSet); err != nil {
			return fmt.Errorf("failed to update StatefulSet for sidecar injection: %w", err)
		}
		logger.Info("Enabled sidecar injection for StatefulSet", "name", statefulSet.Name)
		return nil
	}

	deployment := &apps_v1.Deployment{}
	if err := r.Get(ctx, workloadKey, deployment); err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return fmt.Errorf("failed to get Deployment: %w", err)
	}

	if deployment.Spec.Template.Annotations[injectAnnotation] == "true" {
		return nil
	}
	if deployment.Spec.Template.Annotations == nil {
		deployment.Spec.Template.Annotations = make(map[string]string)
	}
	deployment.Spec.Template.Annotations[injectAnnotation] = "true"

	if err := r.Update(ctx, deployment); err != nil {
		return fmt.Errorf("failed to update Deployment for sidecar injection: %w", err)
	}
	logger.Info("Enabled sidecar injection for Deployment", "name", deployment.Name)
	return nil
}

// reconcileMeshVirtualService creates or updates the VirtualService named
// "<app>-mesh-vs", which is bound to the "mesh" gateway (in-mesh traffic).
//
// One HTTP route is generated per configured mesh route, or a single default
// route to the Service when none are configured. Timeout, retry and fault
// injection settings are applied to every route; durations are interpreted as
// milliseconds. Nothing is done while the Service "<app>-svc" does not exist or
// when the application has no mesh configuration.
//
// Like the other Istio objects in this file, the VirtualService is written with
// Get + Create/Update rather than controllerutil.CreateOrUpdate.
func (r *ApplicationReconciler) reconcileMeshVirtualService(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)
	vsName := app.Name + "-mesh-vs"

	if app.Spec.NetworkPolicy == nil || app.Spec.NetworkPolicy.Mesh == nil {
		logger.Info("No mesh configuration present, skipping mesh VirtualService", "name", vsName)
		return nil
	}
	mesh := app.Spec.NetworkPolicy.Mesh

	// The VirtualService routes to the Service, so the Service must exist first.
	serviceName := app.Name + "-svc"
	service := &core_v1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
	if errors.IsNotFound(err) {
		logger.Info("Service not found, skipping mesh VirtualService creation", "service", serviceName)
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to get service: %w", err)
	}

	// --- Build the desired HTTP routes -------------------------------------

	httpRoutes := []*istioapinetworkingv1.HTTPRoute{}

	if len(mesh.Routes) > 0 {
		for _, route := range mesh.Routes {
			httpRoute := &istioapinetworkingv1.HTTPRoute{}

			if route.Match != nil {
				match := &istioapinetworkingv1.HTTPMatchRequest{}
				if route.Match.URI != nil {
					match.Uri = convertStringMatch(route.Match.URI)
				}
				if route.Match.Method != nil {
					match.Method = convertStringMatch(route.Match.Method)
				}
				if len(route.Match.Headers) > 0 {
					match.Headers = make(map[string]*istioapinetworkingv1.StringMatch, len(route.Match.Headers))
					for key, value := range route.Match.Headers {
						// convertStringMatch copies the fields out immediately,
						// so taking the loop variable's address is safe here.
						match.Headers[key] = convertStringMatch(&value)
					}
				}
				httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{match}
			}

			destination := &istioapinetworkingv1.HTTPRouteDestination{
				Destination: &istioapinetworkingv1.Destination{
					Host: route.Destination.Host,
				},
				Weight: route.Weight,
			}
			if route.Destination.Subset != "" {
				destination.Destination.Subset = route.Destination.Subset
			}
			if route.Destination.Port > 0 {
				destination.Destination.Port = &istioapinetworkingv1.PortSelector{
					Number: uint32(route.Destination.Port),
				}
			}

			httpRoute.Route = []*istioapinetworkingv1.HTTPRouteDestination{destination}
			httpRoutes = append(httpRoutes, httpRoute)
		}
	} else {
		// Default route: send everything to the application's Service.
		httpRoutes = append(httpRoutes, &istioapinetworkingv1.HTTPRoute{
			Route: []*istioapinetworkingv1.HTTPRouteDestination{
				{
					Destination: &istioapinetworkingv1.Destination{
						Host: service.Name,
					},
				},
			},
		})
	}

	if mesh.Timeout > 0 {
		for _, route := range httpRoutes {
			route.Timeout = durationpb.New(time.Duration(mesh.Timeout) * time.Millisecond)
		}
	}

	if mesh.Retry != nil {
		retryOn := "5xx,gateway-error,connect-failure"
		if len(mesh.Retry.RetryOn) > 0 {
			retryOn = strings.Join(mesh.Retry.RetryOn, ",")
		}
		for _, route := range httpRoutes {
			// A fresh HTTPRetry per route so routes never share a pointer.
			retry := &istioapinetworkingv1.HTTPRetry{
				Attempts: mesh.Retry.Attempts,
				RetryOn:  retryOn,
			}
			if mesh.Retry.PerTryTimeout > 0 {
				retry.PerTryTimeout = durationpb.New(time.Duration(mesh.Retry.PerTryTimeout) * time.Millisecond)
			}
			route.Retries = retry
		}
	}

	// Fault injection (intended for testing).
	if mesh.FaultInjection != nil {
		faultCfg := mesh.FaultInjection
		for _, route := range httpRoutes {
			fault := &istioapinetworkingv1.HTTPFaultInjection{}

			// NOTE(review): both percentages below are divided by 100 before
			// being stored in Percent.Value. Confirm the unit of the CRD field
			// against the range Istio expects for Percent.Value.
			if faultCfg.Delay != nil {
				fault.Delay = &istioapinetworkingv1.HTTPFaultInjection_Delay{
					HttpDelayType: &istioapinetworkingv1.HTTPFaultInjection_Delay_FixedDelay{
						FixedDelay: durationpb.New(time.Duration(faultCfg.Delay.FixedDelay) * time.Millisecond),
					},
					Percentage: &istioapinetworkingv1.Percent{
						Value: float64(faultCfg.Delay.Percentage) / 100.0,
					},
				}
			}

			if faultCfg.Abort != nil {
				fault.Abort = &istioapinetworkingv1.HTTPFaultInjection_Abort{
					ErrorType: &istioapinetworkingv1.HTTPFaultInjection_Abort_HttpStatus{
						HttpStatus: int32(faultCfg.Abort.HttpStatus),
					},
					Percentage: &istioapinetworkingv1.Percent{
						Value: float64(faultCfg.Abort.Percentage) / 100.0,
					},
				}
			}

			route.Fault = fault
		}
	}

	// --- Get + Create/Update ------------------------------------------------

	vs := &istionetworkingv1.VirtualService{}
	op := k8s_sigs_controller_runtime_utils.OperationResultUpdated
	err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs)
	switch {
	case errors.IsNotFound(err):
		vs = &istionetworkingv1.VirtualService{
			ObjectMeta: metav1.ObjectMeta{
				Name:      vsName,
				Namespace: app.Namespace,
			},
		}
		op = k8s_sigs_controller_runtime_utils.OperationResultCreated
	case err != nil:
		return fmt.Errorf("failed to create or update mesh VirtualService: %w", err)
	}

	// The owner reference is (re)asserted on both create and update.
	if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
		return fmt.Errorf("failed to create or update mesh VirtualService: %w", err)
	}

	vs.Spec.Hosts = []string{service.Name}
	vs.Spec.Gateways = []string{"mesh"} // in-mesh traffic only
	vs.Spec.Http = httpRoutes

	if op == k8s_sigs_controller_runtime_utils.OperationResultCreated {
		err = r.Create(ctx, vs)
	} else {
		err = r.Update(ctx, vs)
	}
	if err != nil {
		return fmt.Errorf("failed to create or update mesh VirtualService: %w", err)
	}

	logger.Info("Mesh VirtualService reconciled", "name", vsName, "operation", op)
	return nil
}

// reconcileDestinationRule creates or updates the DestinationRule named
// "<app>-dr", which carries the circuit breaker for the Service "<app>-svc".
//
// The circuit breaker is taken from the mesh settings and, failing that, from
// Spec.TrafficPolicy; the load balancer is resolved in the same order and
// defaults to ROUND_ROBIN. Without a circuit breaker the rule's traffic policy
// is cleared. Nothing is done while the Service does not exist.
func (r *ApplicationReconciler) reconcileDestinationRule(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)
	drName := app.Name + "-dr"

	// The DestinationRule targets the Service, so the Service must exist first.
	serviceName := app.Name + "-svc"
	service := &core_v1.Service{}
	err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
	if errors.IsNotFound(err) {
		logger.Info("Service not found, skipping DestinationRule creation", "service", serviceName)
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to get service: %w", err)
	}

	// --- Build the desired traffic policy once, for both create and update --

	var cb *applicationv1.CircuitBreaker
	if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil {
		cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker
	}
	if cb == nil && app.Spec.TrafficPolicy != nil {
		cb = app.Spec.TrafficPolicy.CircuitBreaker
	}

	// Stays nil when no circuit breaker is configured, which clears the policy.
	var trafficPolicy *istioapinetworkingv1.TrafficPolicy
	if cb != nil {
		connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{
			Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{
				Http1MaxPendingRequests:  100,
				MaxRequestsPerConnection: 1,
			},
			Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{
				MaxConnections: 100,
			},
		}

		// A non-positive configured value falls back to 100.
		maxEjectionPercent := uint32(100)
		if cb.MaxEjectionPercent > 0 {
			maxEjectionPercent = uint32(cb.MaxEjectionPercent)
		}

		// NOTE(review): Istio's reference appears to mark consecutive_errors as
		// deprecated in favour of consecutive_5xx_errors — confirm before migrating.
		outlierDetection := &istioapinetworkingv1.OutlierDetection{
			ConsecutiveErrors:  int32(cb.ConsecutiveErrors),
			Interval:           &durationpb.Duration{Seconds: 1},
			BaseEjectionTime:   &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)},
			MaxEjectionPercent: int32(maxEjectionPercent),
		}

		var loadBalancer *istioapinetworkingv1.LoadBalancerSettings
		switch {
		case app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil:
			loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer)
		case app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil:
			loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer)
		default:
			loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{
				LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{
					Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN,
				},
			}
		}

		trafficPolicy = &istioapinetworkingv1.TrafficPolicy{
			ConnectionPool:   connectionPool,
			OutlierDetection: outlierDetection,
			LoadBalancer:     loadBalancer,
		}
	}

	// --- Get + Create/Update ------------------------------------------------

	existingDR := &istionetworkingv1.DestinationRule{}
	err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, existingDR)
	switch {
	case errors.IsNotFound(err):
		logger.Info("Creating new DestinationRule", "name", drName)
		newDR := &istionetworkingv1.DestinationRule{
			ObjectMeta: metav1.ObjectMeta{Name: drName, Namespace: app.Namespace},
		}
		newDR.Spec.Host = service.Name
		newDR.Spec.TrafficPolicy = trafficPolicy

		if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDR, r.Scheme); err != nil {
			return fmt.Errorf("failed to set controller reference: %w", err)
		}
		if err := r.Create(ctx, newDR); err != nil {
			return fmt.Errorf("failed to create DestinationRule: %w", err)
		}
		logger.Info("DestinationRule created", "name", drName)

	case err != nil:
		return fmt.Errorf("failed to get DestinationRule: %w", err)

	default:
		existingDR.Spec.Host = service.Name
		existingDR.Spec.TrafficPolicy = trafficPolicy

		if err := r.Update(ctx, existingDR); err != nil {
			return fmt.Errorf("failed to update DestinationRule: %w", err)
		}
		logger.Info("DestinationRule updated", "name", drName)
	}

	return nil
}

// cleanupDestinationRule deletes the DestinationRule "<app>-dr" if it exists.
// A rule that is already absent (at Get or at Delete time) is not an error.
func (r *ApplicationReconciler) cleanupDestinationRule(ctx context.Context, app *applicationv1.Application) error {
	logger := log.FromContext(ctx)
	drName := app.Name + "-dr"

	dr := &istionetworkingv1.DestinationRule{}
	err := r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr)
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return fmt.Errorf("failed to get DestinationRule: %w", err)
	}

	logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName)
	if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) {
		return fmt.Errorf("failed to delete DestinationRule: %w", err)
	}
	return nil
}

// convertLoadBalancerSettings converts the Application CRD load-balancer
// settings into the Istio API representation.
//
// When ConsistentHash is set, the hash key is chosen in this order: HTTP header
// name, source IP, HTTP cookie (the cookie TTL is interpreted as seconds); if
// none is set the consistent-hash policy is emitted without a key. Otherwise
// the Simple policy name is mapped ("LEAST_CONN", "RANDOM", "PASSTHROUGH"),
// with any other value yielding ROUND_ROBIN. A nil settings pointer also yields
// ROUND_ROBIN.
func convertLoadBalancerSettings(settings *applicationv1.LoadBalancerSettings) *istioapinetworkingv1.LoadBalancerSettings {
	result := &istioapinetworkingv1.LoadBalancerSettings{}

	if settings == nil {
		result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_Simple{
			Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN,
		}
		return result
	}

	if settings.ConsistentHash != nil {
		hashCfg := settings.ConsistentHash
		consistentHash := &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB{}

		switch {
		case hashCfg.HttpHeaderName != "":
			consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpHeaderName{
				HttpHeaderName: hashCfg.HttpHeaderName,
			}
		case hashCfg.UseSourceIp:
			consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{
				UseSourceIp: true,
			}
		case hashCfg.HttpCookie != nil:
			consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpCookie{
				HttpCookie: &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HTTPCookie{
					Name: hashCfg.HttpCookie.Name,
					Path: hashCfg.HttpCookie.Path,
					Ttl: &durationpb.Duration{
						Seconds: int64(hashCfg.HttpCookie.Ttl),
					},
				},
			}
		}

		result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHash{
			ConsistentHash: consistentHash,
		}
		return result
	}

	// Simple load balancing.
	var simpleType istioapinetworkingv1.LoadBalancerSettings_SimpleLB
	switch settings.Simple {
	case "LEAST_CONN":
		simpleType = istioapinetworkingv1.LoadBalancerSettings_LEAST_CONN
	case "RANDOM":
		simpleType = istioapinetworkingv1.LoadBalancerSettings_RANDOM
	case "PASSTHROUGH":
		simpleType = istioapinetworkingv1.LoadBalancerSettings_PASSTHROUGH
	default:
		simpleType = istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN
	}
	result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_Simple{
		Simple: simpleType,
	}
	return result
}

// The duplicate reconcileIstioTraffic implementation that used to live here has
// been removed; the definition at line 139 is the one that is kept.