将istio资源的监控加入application-controller

This commit is contained in:
panshuxiao
2025-09-16 17:19:24 +08:00
repo.diff.parent c0e4044dab
repo.diff.commit c0fbbfb618

repo.diff.view_file

@@ -807,9 +807,9 @@ func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
Owns(&apps_v1.StatefulSet{}).
Owns(&core_v1.Service{}).
// 添加对Istio资源的监控
// Owns(&istionetworkingv1.VirtualService{}).
//Owns(&istionetworkingv1.Gateway{}).
//Owns(&istionetworkingv1.DestinationRule{}).
Owns(&istionetworkingv1.VirtualService{}).
Owns(&istionetworkingv1.Gateway{}).
Owns(&istionetworkingv1.DestinationRule{}).
Complete(r)
}
@@ -1028,30 +1028,34 @@ func (r *ApplicationReconciler) reconcileGateway(ctx context.Context, app *appli
logger := log.FromContext(ctx)
gatewayName := app.Name + "-gateway"
// 创建或更新Gateway资源
gateway := &istionetworkingv1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: gatewayName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, gateway, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, gateway, r.Scheme); err != nil {
return err
// Get + Create/Update 以规避 CreateOrUpdate 对 protobuf 类型的反射补丁
existingGateway := &istionetworkingv1.Gateway{}
err := r.Get(ctx, types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}, existingGateway)
if errors.IsNotFound(err) {
logger.Info("Creating new Gateway", "name", gatewayName)
newGateway := &istionetworkingv1.Gateway{ObjectMeta: metav1.ObjectMeta{Name: gatewayName, Namespace: app.Namespace}}
if err := r.configureGateway(newGateway, app); err != nil {
return fmt.Errorf("failed to configure Gateway: %w", err)
}
// 配置Gateway
return r.configureGateway(gateway, app)
})
if err != nil {
return fmt.Errorf("failed to create or update Gateway: %w", err)
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newGateway, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newGateway); err != nil {
return fmt.Errorf("failed to create Gateway: %w", err)
}
logger.Info("Gateway created", "name", gatewayName)
} else if err != nil {
return fmt.Errorf("failed to get Gateway: %w", err)
} else {
if err := r.configureGateway(existingGateway, app); err != nil {
return fmt.Errorf("failed to configure Gateway: %w", err)
}
if err := r.Update(ctx, existingGateway); err != nil {
return fmt.Errorf("failed to update Gateway: %w", err)
}
logger.Info("Gateway updated", "name", gatewayName)
}
logger.Info("Gateway reconciled", "name", gatewayName, "operation", op)
// 协调与Gateway关联的VirtualService
if err := r.reconcileGatewayVirtualService(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile gateway VirtualService: %w", err)
@@ -1195,29 +1199,34 @@ func (r *ApplicationReconciler) reconcileGatewayVirtualService(ctx context.Conte
return fmt.Errorf("failed to get service: %w", err)
}
// 创建或更新VirtualService
vs := &istionetworkingv1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: vsName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
return err
// Get + Create/Update VS
existingVS := &istionetworkingv1.VirtualService{}
err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, existingVS)
if errors.IsNotFound(err) {
logger.Info("Creating new Gateway VirtualService", "name", vsName)
newVS := &istionetworkingv1.VirtualService{ObjectMeta: metav1.ObjectMeta{Name: vsName, Namespace: app.Namespace}}
if err := r.configureGatewayVirtualService(newVS, app, service); err != nil {
return fmt.Errorf("failed to configure VirtualService: %w", err)
}
// 配置VirtualService
return r.configureGatewayVirtualService(vs, app, service)
})
if err != nil {
return fmt.Errorf("failed to create or update VirtualService: %w", err)
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newVS, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newVS); err != nil {
return fmt.Errorf("failed to create VirtualService: %w", err)
}
logger.Info("Gateway VirtualService created", "name", vsName)
} else if err != nil {
return fmt.Errorf("failed to get VirtualService: %w", err)
} else {
if err := r.configureGatewayVirtualService(existingVS, app, service); err != nil {
return fmt.Errorf("failed to configure VirtualService: %w", err)
}
if err := r.Update(ctx, existingVS); err != nil {
return fmt.Errorf("failed to update VirtualService: %w", err)
}
logger.Info("Gateway VirtualService updated", "name", vsName)
}
logger.Info("Gateway VirtualService reconciled", "name", vsName, "operation", op)
return nil
}
@@ -1635,98 +1644,91 @@ func (r *ApplicationReconciler) reconcileDestinationRule(ctx context.Context, ap
return fmt.Errorf("failed to get service: %w", err)
}
// 创建或更新DestinationRule
dr := &istionetworkingv1.DestinationRule{
ObjectMeta: metav1.ObjectMeta{
Name: drName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, dr, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, dr, r.Scheme); err != nil {
return err
}
// 基本配置
dr.Spec.Host = service.Name
// 配置熔断器
// Get + Create/Update DestinationRule
existingDR := &istionetworkingv1.DestinationRule{}
err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, existingDR)
if errors.IsNotFound(err) {
logger.Info("Creating new DestinationRule", "name", drName)
newDR := &istionetworkingv1.DestinationRule{ObjectMeta: metav1.ObjectMeta{Name: drName, Namespace: app.Namespace}}
newDR.Spec.Host = service.Name
var cb *applicationv1.CircuitBreaker
// 从NetworkPolicy.Mesh中获取熔断器配置
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil {
cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker
}
// 如果NetworkPolicy中没有尝试从旧版TrafficPolicy中获取
if cb == nil && app.Spec.TrafficPolicy != nil {
cb = app.Spec.TrafficPolicy.CircuitBreaker
}
if cb != nil {
// 连接池设置
connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{
Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{
Http1MaxPendingRequests: 100,
MaxRequestsPerConnection: 1,
},
Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{
MaxConnections: 100,
},
Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1},
Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100},
}
// 异常检测(熔断器核心逻辑)
maxEjectionPercent := uint32(100)
if cb.MaxEjectionPercent > 0 {
maxEjectionPercent = uint32(cb.MaxEjectionPercent)
}
outlierDetection := &istioapinetworkingv1.OutlierDetection{
ConsecutiveErrors: int32(cb.ConsecutiveErrors),
Interval: &durationpb.Duration{
Seconds: 1,
},
BaseEjectionTime: &durationpb.Duration{
Seconds: int64(cb.BaseEjectionTime),
},
ConsecutiveErrors: int32(cb.ConsecutiveErrors),
Interval: &durationpb.Duration{Seconds: 1},
BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)},
MaxEjectionPercent: int32(maxEjectionPercent),
}
// 负载均衡策略
var loadBalancer *istioapinetworkingv1.LoadBalancerSettings
if app.Spec.NetworkPolicy != nil &&
app.Spec.NetworkPolicy.Mesh != nil &&
app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil {
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer)
} else if app.Spec.TrafficPolicy != nil &&
app.Spec.TrafficPolicy.LoadBalancer != nil {
} else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer)
} else {
loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{
LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{
Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN,
},
}
}
dr.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{
ConnectionPool: connectionPool,
OutlierDetection: outlierDetection,
LoadBalancer: loadBalancer,
loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}}
}
newDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to create or update DestinationRule: %w", err)
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDR, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newDR); err != nil {
return fmt.Errorf("failed to create DestinationRule: %w", err)
}
logger.Info("DestinationRule created", "name", drName)
} else if err != nil {
return fmt.Errorf("failed to get DestinationRule: %w", err)
} else {
existingDR.Spec.Host = service.Name
var cb *applicationv1.CircuitBreaker
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil {
cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker
}
if cb == nil && app.Spec.TrafficPolicy != nil {
cb = app.Spec.TrafficPolicy.CircuitBreaker
}
if cb != nil {
connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{
Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1},
Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100},
}
maxEjectionPercent := uint32(100)
if cb.MaxEjectionPercent > 0 {
maxEjectionPercent = uint32(cb.MaxEjectionPercent)
}
outlierDetection := &istioapinetworkingv1.OutlierDetection{ConsecutiveErrors: int32(cb.ConsecutiveErrors), Interval: &durationpb.Duration{Seconds: 1}, BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)}, MaxEjectionPercent: int32(maxEjectionPercent)}
var loadBalancer *istioapinetworkingv1.LoadBalancerSettings
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer)
} else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer)
} else {
loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}}
}
existingDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer}
} else {
existingDR.Spec.TrafficPolicy = nil
}
if err := r.Update(ctx, existingDR); err != nil {
return fmt.Errorf("failed to update DestinationRule: %w", err)
}
logger.Info("DestinationRule updated", "name", drName)
}
logger.Info("DestinationRule reconciled", "name", drName, "operation", op)
return nil
}