diff --git a/modules/k8s/controller/application/application_controller.go b/modules/k8s/controller/application/application_controller.go index 4402d61db6..942602f615 100644 --- a/modules/k8s/controller/application/application_controller.go +++ b/modules/k8s/controller/application/application_controller.go @@ -807,9 +807,9 @@ func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { Owns(&apps_v1.StatefulSet{}). Owns(&core_v1.Service{}). // 添加对Istio资源的监控 - // Owns(&istionetworkingv1.VirtualService{}). - //Owns(&istionetworkingv1.Gateway{}). - //Owns(&istionetworkingv1.DestinationRule{}). + Owns(&istionetworkingv1.VirtualService{}). + Owns(&istionetworkingv1.Gateway{}). + Owns(&istionetworkingv1.DestinationRule{}). Complete(r) } @@ -1028,30 +1028,34 @@ func (r *ApplicationReconciler) reconcileGateway(ctx context.Context, app *appli logger := log.FromContext(ctx) gatewayName := app.Name + "-gateway" - // 创建或更新Gateway资源 - gateway := &istionetworkingv1.Gateway{ - ObjectMeta: metav1.ObjectMeta{ - Name: gatewayName, - Namespace: app.Namespace, - }, - } - - op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, gateway, func() error { - // 设置控制器引用 - if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, gateway, r.Scheme); err != nil { - return err + // Get + Create/Update 以规避 CreateOrUpdate 对 protobuf 类型的反射补丁 + existingGateway := &istionetworkingv1.Gateway{} + err := r.Get(ctx, types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}, existingGateway) + if errors.IsNotFound(err) { + logger.Info("Creating new Gateway", "name", gatewayName) + newGateway := &istionetworkingv1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: gatewayName, Namespace: app.Namespace}} + if err := r.configureGateway(newGateway, app); err != nil { + return fmt.Errorf("failed to configure Gateway: %w", err) } - - // 配置Gateway - return r.configureGateway(gateway, app) - }) - - if err != nil { - return fmt.Errorf("failed to create or update Gateway: %w", err) + if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newGateway, r.Scheme); err != nil { + return fmt.Errorf("failed to set controller reference: %w", err) + } + if err := r.Create(ctx, newGateway); err != nil { + return fmt.Errorf("failed to create Gateway: %w", err) + } + logger.Info("Gateway created", "name", gatewayName) + } else if err != nil { + return fmt.Errorf("failed to get Gateway: %w", err) + } else { + if err := r.configureGateway(existingGateway, app); err != nil { + return fmt.Errorf("failed to configure Gateway: %w", err) + } + if err := r.Update(ctx, existingGateway); err != nil { + return fmt.Errorf("failed to update Gateway: %w", err) + } + logger.Info("Gateway updated", "name", gatewayName) } - logger.Info("Gateway reconciled", "name", gatewayName, "operation", op) - // 协调与Gateway关联的VirtualService if err := r.reconcileGatewayVirtualService(ctx, app); err != nil { return fmt.Errorf("failed to reconcile gateway VirtualService: %w", err) @@ -1195,29 +1199,34 @@ func (r *ApplicationReconciler) reconcileGatewayVirtualService(ctx context.Conte return fmt.Errorf("failed to get service: %w", err) } - // 创建或更新VirtualService - vs := &istionetworkingv1.VirtualService{ - ObjectMeta: metav1.ObjectMeta{ - Name: vsName, - Namespace: app.Namespace, - }, - } - - op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error { - // 设置控制器引用 - if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil { - return err + // Get + Create/Update VS + existingVS := &istionetworkingv1.VirtualService{} + err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, existingVS) + if errors.IsNotFound(err) { + logger.Info("Creating new Gateway VirtualService", "name", vsName) + newVS := &istionetworkingv1.VirtualService{ObjectMeta: metav1.ObjectMeta{Name: vsName, Namespace: app.Namespace}} + if err := r.configureGatewayVirtualService(newVS, app, service); err != nil { + return fmt.Errorf("failed to configure VirtualService: %w", err) } - - // 配置VirtualService - return r.configureGatewayVirtualService(vs, app, service) - }) - - if err != nil { - return fmt.Errorf("failed to create or update VirtualService: %w", err) + if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newVS, r.Scheme); err != nil { + return fmt.Errorf("failed to set controller reference: %w", err) + } + if err := r.Create(ctx, newVS); err != nil { + return fmt.Errorf("failed to create VirtualService: %w", err) + } + logger.Info("Gateway VirtualService created", "name", vsName) + } else if err != nil { + return fmt.Errorf("failed to get VirtualService: %w", err) + } else { + if err := r.configureGatewayVirtualService(existingVS, app, service); err != nil { + return fmt.Errorf("failed to configure VirtualService: %w", err) + } + if err := r.Update(ctx, existingVS); err != nil { + return fmt.Errorf("failed to update VirtualService: %w", err) + } + logger.Info("Gateway VirtualService updated", "name", vsName) } - logger.Info("Gateway VirtualService reconciled", "name", vsName, "operation", op) return nil } @@ -1635,98 +1644,91 @@ func (r *ApplicationReconciler) reconcileDestinationRule(ctx context.Context, ap return fmt.Errorf("failed to get service: %w", err) } - // 创建或更新DestinationRule - dr := &istionetworkingv1.DestinationRule{ - ObjectMeta: metav1.ObjectMeta{ - Name: drName, - Namespace: app.Namespace, - }, - } - - op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, dr, func() error { - // 设置控制器引用 - if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, dr, r.Scheme); err != nil { - return err - } - - // 基本配置 - dr.Spec.Host = service.Name - - // 配置熔断器 + // Get + Create/Update DestinationRule + existingDR := &istionetworkingv1.DestinationRule{} + err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, existingDR) + if errors.IsNotFound(err) { + logger.Info("Creating new DestinationRule", "name", drName) + newDR := &istionetworkingv1.DestinationRule{ObjectMeta: metav1.ObjectMeta{Name: drName, Namespace: app.Namespace}} + newDR.Spec.Host = service.Name var cb *applicationv1.CircuitBreaker - - // 从NetworkPolicy.Mesh中获取熔断器配置 if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil { cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker } - - // 如果NetworkPolicy中没有,尝试从旧版TrafficPolicy中获取 if cb == nil && app.Spec.TrafficPolicy != nil { cb = app.Spec.TrafficPolicy.CircuitBreaker } - if cb != nil { - // 连接池设置 connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{ - Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{ - Http1MaxPendingRequests: 100, - MaxRequestsPerConnection: 1, - }, - Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{ - MaxConnections: 100, - }, + Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1}, + Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100}, } - - // 异常检测(熔断器核心逻辑) maxEjectionPercent := uint32(100) if cb.MaxEjectionPercent > 0 { maxEjectionPercent = uint32(cb.MaxEjectionPercent) } - outlierDetection := &istioapinetworkingv1.OutlierDetection{ - ConsecutiveErrors: int32(cb.ConsecutiveErrors), - Interval: &durationpb.Duration{ - Seconds: 1, - }, - BaseEjectionTime: &durationpb.Duration{ - Seconds: int64(cb.BaseEjectionTime), - }, + ConsecutiveErrors: int32(cb.ConsecutiveErrors), + Interval: &durationpb.Duration{Seconds: 1}, + BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)}, MaxEjectionPercent: int32(maxEjectionPercent), } - - // 负载均衡策略 var loadBalancer *istioapinetworkingv1.LoadBalancerSettings - - if app.Spec.NetworkPolicy != nil && - app.Spec.NetworkPolicy.Mesh != nil && - app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil { + if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil { loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer) - } else if app.Spec.TrafficPolicy != nil && - app.Spec.TrafficPolicy.LoadBalancer != nil { + } else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil { loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer) } else { - loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{ - LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{ - Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN, - }, - } - } - - dr.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ - ConnectionPool: connectionPool, - OutlierDetection: outlierDetection, - LoadBalancer: loadBalancer, + loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}} } + newDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer} } - - return nil - }) - - if err != nil { - return fmt.Errorf("failed to create or update DestinationRule: %w", err) + if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDR, r.Scheme); err != nil { + return fmt.Errorf("failed to set controller reference: %w", err) + } + if err := r.Create(ctx, newDR); err != nil { + return fmt.Errorf("failed to create DestinationRule: %w", err) + } + logger.Info("DestinationRule created", "name", drName) + } else if err != nil { + return fmt.Errorf("failed to get DestinationRule: %w", err) + } else { + existingDR.Spec.Host = service.Name + var cb *applicationv1.CircuitBreaker + if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil { + cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker + } + if cb == nil && app.Spec.TrafficPolicy != nil { + cb = app.Spec.TrafficPolicy.CircuitBreaker + } + if cb != nil { + connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{ + Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1}, + Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100}, + } + maxEjectionPercent := uint32(100) + if cb.MaxEjectionPercent > 0 { + maxEjectionPercent = uint32(cb.MaxEjectionPercent) + } + outlierDetection := &istioapinetworkingv1.OutlierDetection{ConsecutiveErrors: int32(cb.ConsecutiveErrors), Interval: &durationpb.Duration{Seconds: 1}, BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)}, MaxEjectionPercent: int32(maxEjectionPercent)} + var loadBalancer *istioapinetworkingv1.LoadBalancerSettings + if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil { + loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer) + } else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil { + loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer) + } else { + loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}} + } + existingDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer} + } else { + existingDR.Spec.TrafficPolicy = nil + } + if err := r.Update(ctx, existingDR); err != nil { + return fmt.Errorf("failed to update DestinationRule: %w", err) + } + logger.Info("DestinationRule updated", "name", drName) } - logger.Info("DestinationRule reconciled", "name", drName, "operation", op) return nil }