From 58dcbb43f97644e62a3f4c4fcdb37b19e3c44ef7 Mon Sep 17 00:00:00 2001 From: Romain Date: Thu, 11 Jul 2024 11:26:03 +0200 Subject: [PATCH] Retry on Gateway API resource status update Co-authored-by: Kevin Pollet --- integration/k8s_conformance_test.go | 7 - pkg/provider/kubernetes/gateway/client.go | 262 +++++++++++------- pkg/provider/kubernetes/gateway/httproute.go | 2 +- pkg/provider/kubernetes/gateway/kubernetes.go | 20 +- pkg/provider/kubernetes/gateway/tcproute.go | 2 +- pkg/provider/kubernetes/gateway/tlsroute.go | 2 +- 6 files changed, 175 insertions(+), 120 deletions(-) diff --git a/integration/k8s_conformance_test.go b/integration/k8s_conformance_test.go index 562bdcbe8..2768ce99e 100644 --- a/integration/k8s_conformance_test.go +++ b/integration/k8s_conformance_test.go @@ -207,13 +207,6 @@ func (s *K8sConformanceSuite) TestK8sGatewayAPIConformance() { features.SupportHTTPRoutePathRewrite, features.SupportHTTPRoutePathRedirect, ), - ExemptFeatures: sets.New( - features.SupportHTTPRouteRequestTimeout, - features.SupportHTTPRouteBackendTimeout, - features.SupportHTTPRouteResponseHeaderModification, - features.SupportHTTPRouteRequestMirror, - features.SupportHTTPRouteRequestMultipleMirrors, - ), }) require.NoError(s.T(), err) diff --git a/pkg/provider/kubernetes/gateway/client.go b/pkg/provider/kubernetes/gateway/client.go index e91fc949f..ce5619b72 100644 --- a/pkg/provider/kubernetes/gateway/client.go +++ b/pkg/provider/kubernetes/gateway/client.go @@ -21,6 +21,7 @@ import ( kclientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/retry" gatev1 "sigs.k8s.io/gateway-api/apis/v1" gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -51,8 +52,8 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) { // The stores can then be accessed via the Get* functions. type Client interface { WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error) - UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error - UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error + UpdateGatewayStatus(ctx context.Context, gateway ktypes.NamespacedName, status gatev1.GatewayStatus) error + UpdateGatewayClassStatus(ctx context.Context, name string, condition metav1.Condition) error UpdateHTTPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error UpdateTCPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TCPRouteStatus) error UpdateTLSRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TLSRouteStatus) error @@ -377,53 +378,76 @@ func (c *clientWrapper) ListGatewayClasses() ([]*gatev1.GatewayClass, error) { return c.factoryGatewayClass.Gateway().V1().GatewayClasses().Lister().List(labels.Everything()) } -func (c *clientWrapper) UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error { - gc := gatewayClass.DeepCopy() - - var newConditions []metav1.Condition - for _, cond := range gc.Status.Conditions { - // No update for identical condition. - if cond.Type == condition.Type && cond.Status == condition.Status && cond.ObservedGeneration == condition.ObservedGeneration { - return nil +func (c *clientWrapper) UpdateGatewayClassStatus(ctx context.Context, name string, condition metav1.Condition) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentGatewayClass, err := c.factoryGatewayClass.Gateway().V1().GatewayClasses().Lister().Get(name) + if err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err } - // Keep other condition types. - if cond.Type != condition.Type { - newConditions = append(newConditions, cond) + currentGatewayClass = currentGatewayClass.DeepCopy() + var newConditions []metav1.Condition + for _, cond := range currentGatewayClass.Status.Conditions { + // No update for identical condition. + if cond.Type == condition.Type && cond.Status == condition.Status && cond.ObservedGeneration == condition.ObservedGeneration { + return nil + } + + // Keep other condition types. + if cond.Type != condition.Type { + newConditions = append(newConditions, cond) + } } - } - // Append the condition to update. - newConditions = append(newConditions, condition) - gc.Status.Conditions = newConditions + // Append the condition to update. + newConditions = append(newConditions, condition) + currentGatewayClass.Status.Conditions = newConditions - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + if _, err = c.csGateway.GatewayV1().GatewayClasses().UpdateStatus(ctx, currentGatewayClass, metav1.UpdateOptions{}); err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } - _, err := c.csGateway.GatewayV1().GatewayClasses().UpdateStatus(ctx, gc, metav1.UpdateOptions{}) + return nil + }) if err != nil { - return fmt.Errorf("failed to update GatewayClass %q status: %w", gatewayClass.Name, err) + return fmt.Errorf("failed to update GatewayClass %q status: %w", name, err) } return nil } -func (c *clientWrapper) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error { +func (c *clientWrapper) UpdateGatewayStatus(ctx context.Context, gateway ktypes.NamespacedName, status gatev1.GatewayStatus) error { if !c.isWatchedNamespace(gateway.Namespace) { return fmt.Errorf("cannot update Gateway status %s/%s: namespace is not within watched namespaces", gateway.Namespace, gateway.Name) } - if gatewayStatusEquals(gateway.Status, gatewayStatus) { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentGateway, err := c.factoriesGateway[c.lookupNamespace(gateway.Namespace)].Gateway().V1().Gateways().Lister().Gateways(gateway.Namespace).Get(gateway.Name) + if err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } + + if gatewayStatusEquals(currentGateway.Status, status) { + return nil + } + + currentGateway = currentGateway.DeepCopy() + currentGateway.Status = status + + if _, err = c.csGateway.GatewayV1().Gateways(gateway.Namespace).UpdateStatus(ctx, currentGateway, metav1.UpdateOptions{}); err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } + return nil - } - - g := gateway.DeepCopy() - g.Status = gatewayStatus - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - _, err := c.csGateway.GatewayV1().Gateways(gateway.Namespace).UpdateStatus(ctx, g, metav1.UpdateOptions{}) + }) if err != nil { return fmt.Errorf("failed to update Gateway %q status: %w", gateway.Name, err) } @@ -436,32 +460,44 @@ func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, route ktypes. return fmt.Errorf("updating HTTPRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name) } - currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(route.Namespace).Get(route.Name) - if err != nil { - return fmt.Errorf("getting HTTPRoute %s/%s: %w", route.Namespace, route.Name, err) - } - - // TODO: keep statuses for gateways managed by other Traefik instances. - var parentStatuses []gatev1.RouteParentStatus - for _, currentParentStatus := range currentRoute.Status.Parents { - if currentParentStatus.ControllerName != controllerName { - parentStatuses = append(parentStatuses, currentParentStatus) - continue + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(route.Namespace).Get(route.Name) + if err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err } + + // TODO: keep statuses for gateways managed by other Traefik instances. + var parentStatuses []gatev1.RouteParentStatus + for _, currentParentStatus := range currentRoute.Status.Parents { + if currentParentStatus.ControllerName != controllerName { + parentStatuses = append(parentStatuses, currentParentStatus) + continue + } + } + + parentStatuses = append(parentStatuses, status.Parents...) + + currentRoute = currentRoute.DeepCopy() + currentRoute.Status = gatev1.HTTPRouteStatus{ + RouteStatus: gatev1.RouteStatus{ + Parents: parentStatuses, + }, + } + + if _, err = c.csGateway.GatewayV1().HTTPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to update HTTPRoute %q status: %w", route.Name, err) } - parentStatuses = append(parentStatuses, status.Parents...) - - currentRoute = currentRoute.DeepCopy() - currentRoute.Status = gatev1.HTTPRouteStatus{ - RouteStatus: gatev1.RouteStatus{ - Parents: parentStatuses, - }, - } - - if _, err := c.csGateway.GatewayV1().HTTPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("updating HTTPRoute %s/%s status: %w", route.Namespace, route.Name, err) - } return nil } @@ -470,32 +506,44 @@ func (c *clientWrapper) UpdateTCPRouteStatus(ctx context.Context, route ktypes.N return fmt.Errorf("updating TCPRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name) } - currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TCPRoutes().Lister().TCPRoutes(route.Namespace).Get(route.Name) - if err != nil { - return fmt.Errorf("getting TCPRoute %s/%s: %w", route.Namespace, route.Name, err) - } - - // TODO: keep statuses for gateways managed by other Traefik instances. - var parentStatuses []gatev1alpha2.RouteParentStatus - for _, currentParentStatus := range currentRoute.Status.Parents { - if currentParentStatus.ControllerName != controllerName { - parentStatuses = append(parentStatuses, currentParentStatus) - continue + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TCPRoutes().Lister().TCPRoutes(route.Namespace).Get(route.Name) + if err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err } + + // TODO: keep statuses for gateways managed by other Traefik instances. + var parentStatuses []gatev1alpha2.RouteParentStatus + for _, currentParentStatus := range currentRoute.Status.Parents { + if currentParentStatus.ControllerName != controllerName { + parentStatuses = append(parentStatuses, currentParentStatus) + continue + } + } + + parentStatuses = append(parentStatuses, status.Parents...) + + currentRoute = currentRoute.DeepCopy() + currentRoute.Status = gatev1alpha2.TCPRouteStatus{ + RouteStatus: gatev1.RouteStatus{ + Parents: parentStatuses, + }, + } + + if _, err = c.csGateway.GatewayV1alpha2().TCPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to update TCPRoute %q status: %w", route.Name, err) } - parentStatuses = append(parentStatuses, status.Parents...) - - currentRoute = currentRoute.DeepCopy() - currentRoute.Status = gatev1alpha2.TCPRouteStatus{ - RouteStatus: gatev1.RouteStatus{ - Parents: parentStatuses, - }, - } - - if _, err := c.csGateway.GatewayV1alpha2().TCPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("updating TCPRoute %s/%s status: %w", route.Namespace, route.Name, err) - } return nil } @@ -504,32 +552,44 @@ func (c *clientWrapper) UpdateTLSRouteStatus(ctx context.Context, route ktypes.N return fmt.Errorf("updating TLSRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name) } - currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TLSRoutes().Lister().TLSRoutes(route.Namespace).Get(route.Name) - if err != nil { - return fmt.Errorf("getting TLSRoute %s/%s: %w", route.Namespace, route.Name, err) - } - - // TODO: keep statuses for gateways managed by other Traefik instances. - var parentStatuses []gatev1alpha2.RouteParentStatus - for _, currentParentStatus := range currentRoute.Status.Parents { - if currentParentStatus.ControllerName != controllerName { - parentStatuses = append(parentStatuses, currentParentStatus) - continue + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TLSRoutes().Lister().TLSRoutes(route.Namespace).Get(route.Name) + if err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err } + + // TODO: keep statuses for gateways managed by other Traefik instances. + var parentStatuses []gatev1alpha2.RouteParentStatus + for _, currentParentStatus := range currentRoute.Status.Parents { + if currentParentStatus.ControllerName != controllerName { + parentStatuses = append(parentStatuses, currentParentStatus) + continue + } + } + + parentStatuses = append(parentStatuses, status.Parents...) + + currentRoute = currentRoute.DeepCopy() + currentRoute.Status = gatev1alpha2.TLSRouteStatus{ + RouteStatus: gatev1.RouteStatus{ + Parents: parentStatuses, + }, + } + + if _, err = c.csGateway.GatewayV1alpha2().TLSRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil { + // We have to return err itself here (not wrapped inside another error) + // so that RetryOnConflict can identify it correctly. + return err + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to update TLSRoute %q status: %w", route.Name, err) } - parentStatuses = append(parentStatuses, status.Parents...) - - currentRoute = currentRoute.DeepCopy() - currentRoute.Status = gatev1alpha2.TLSRouteStatus{ - RouteStatus: gatev1.RouteStatus{ - Parents: parentStatuses, - }, - } - - if _, err := c.csGateway.GatewayV1alpha2().TLSRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil { - return fmt.Errorf("updating TLSRoute %s/%s status: %w", route.Namespace, route.Name, err) - } return nil } diff --git a/pkg/provider/kubernetes/gateway/httproute.go b/pkg/provider/kubernetes/gateway/httproute.go index 535c4a1b7..fc6c9cf1b 100644 --- a/pkg/provider/kubernetes/gateway/httproute.go +++ b/pkg/provider/kubernetes/gateway/httproute.go @@ -91,7 +91,7 @@ func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewa }, } if err := p.client.UpdateHTTPRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, status); err != nil { - logger.Error(). + logger.Warn(). Err(err). Msg("Unable to update HTTPRoute status") } diff --git a/pkg/provider/kubernetes/gateway/kubernetes.go b/pkg/provider/kubernetes/gateway/kubernetes.go index 6af304e0c..ce1e6d579 100644 --- a/pkg/provider/kubernetes/gateway/kubernetes.go +++ b/pkg/provider/kubernetes/gateway/kubernetes.go @@ -28,6 +28,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + ktypes "k8s.io/apimachinery/pkg/types" "k8s.io/utils/ptr" gatev1 "sigs.k8s.io/gateway-api/apis/v1" gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" @@ -317,7 +318,7 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C gatewayClassNames[gatewayClass.Name] = struct{}{} - err := p.client.UpdateGatewayClassStatus(gatewayClass, metav1.Condition{ + err := p.client.UpdateGatewayClassStatus(ctx, gatewayClass.Name, metav1.Condition{ Type: string(gatev1.GatewayClassConditionStatusAccepted), Status: metav1.ConditionTrue, ObservedGeneration: gatewayClass.Generation, @@ -327,7 +328,7 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C }) if err != nil { log.Ctx(ctx). - Error(). + Warn(). Err(err). Str("gateway_class", gatewayClass.Name). Msg("Unable to update GatewayClass status") @@ -370,17 +371,18 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C } } - gatewayStatus, errG := p.makeGatewayStatus(gateway, listeners, addresses) - if err = p.client.UpdateGatewayStatus(gateway, gatewayStatus); err != nil { + gatewayStatus, err := p.makeGatewayStatus(gateway, listeners, addresses) + if err != nil { logger.Error(). Err(err). - Msg("Unable to update Gateway status") - } - if errG != nil { - logger.Error(). - Err(errG). Msg("Unable to create Gateway status") } + + if err = p.client.UpdateGatewayStatus(ctx, ktypes.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}, gatewayStatus); err != nil { + logger.Warn(). + Err(err). + Msg("Unable to update Gateway status") + } } return conf diff --git a/pkg/provider/kubernetes/gateway/tcproute.go b/pkg/provider/kubernetes/gateway/tcproute.go index c74355d6f..89189ded4 100644 --- a/pkg/provider/kubernetes/gateway/tcproute.go +++ b/pkg/provider/kubernetes/gateway/tcproute.go @@ -80,7 +80,7 @@ func (p *Provider) loadTCPRoutes(ctx context.Context, gatewayListeners []gateway }, } if err := p.client.UpdateTCPRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, routeStatus); err != nil { - logger.Error(). + logger.Warn(). Err(err). Msg("Unable to update TCPRoute status") } diff --git a/pkg/provider/kubernetes/gateway/tlsroute.go b/pkg/provider/kubernetes/gateway/tlsroute.go index 131fb1d92..087b09e83 100644 --- a/pkg/provider/kubernetes/gateway/tlsroute.go +++ b/pkg/provider/kubernetes/gateway/tlsroute.go @@ -82,7 +82,7 @@ func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gateway }, } if err := p.client.UpdateTLSRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, routeStatus); err != nil { - logger.Error(). + logger.Warn(). Err(err). Msg("Unable to update TLSRoute status") }