Retry on Gateway API resource status update
Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
parent
173a18fdc1
commit
58dcbb43f9
6 changed files with 175 additions and 120 deletions
|
@ -207,13 +207,6 @@ func (s *K8sConformanceSuite) TestK8sGatewayAPIConformance() {
|
|||
features.SupportHTTPRoutePathRewrite,
|
||||
features.SupportHTTPRoutePathRedirect,
|
||||
),
|
||||
ExemptFeatures: sets.New(
|
||||
features.SupportHTTPRouteRequestTimeout,
|
||||
features.SupportHTTPRouteBackendTimeout,
|
||||
features.SupportHTTPRouteResponseHeaderModification,
|
||||
features.SupportHTTPRouteRequestMirror,
|
||||
features.SupportHTTPRouteRequestMultipleMirrors,
|
||||
),
|
||||
})
|
||||
require.NoError(s.T(), err)
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
kclientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/util/retry"
|
||||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
gatev1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
|
||||
gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
|
||||
|
@ -51,8 +52,8 @@ func (reh *resourceEventHandler) OnDelete(obj interface{}) {
|
|||
// The stores can then be accessed via the Get* functions.
|
||||
type Client interface {
|
||||
WatchAll(namespaces []string, stopCh <-chan struct{}) (<-chan interface{}, error)
|
||||
UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error
|
||||
UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error
|
||||
UpdateGatewayStatus(ctx context.Context, gateway ktypes.NamespacedName, status gatev1.GatewayStatus) error
|
||||
UpdateGatewayClassStatus(ctx context.Context, name string, condition metav1.Condition) error
|
||||
UpdateHTTPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1.HTTPRouteStatus) error
|
||||
UpdateTCPRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TCPRouteStatus) error
|
||||
UpdateTLSRouteStatus(ctx context.Context, route ktypes.NamespacedName, status gatev1alpha2.TLSRouteStatus) error
|
||||
|
@ -377,11 +378,18 @@ func (c *clientWrapper) ListGatewayClasses() ([]*gatev1.GatewayClass, error) {
|
|||
return c.factoryGatewayClass.Gateway().V1().GatewayClasses().Lister().List(labels.Everything())
|
||||
}
|
||||
|
||||
func (c *clientWrapper) UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayClass, condition metav1.Condition) error {
|
||||
gc := gatewayClass.DeepCopy()
|
||||
func (c *clientWrapper) UpdateGatewayClassStatus(ctx context.Context, name string, condition metav1.Condition) error {
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
currentGatewayClass, err := c.factoryGatewayClass.Gateway().V1().GatewayClasses().Lister().Get(name)
|
||||
if err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
currentGatewayClass = currentGatewayClass.DeepCopy()
|
||||
var newConditions []metav1.Condition
|
||||
for _, cond := range gc.Status.Conditions {
|
||||
for _, cond := range currentGatewayClass.Status.Conditions {
|
||||
// No update for identical condition.
|
||||
if cond.Type == condition.Type && cond.Status == condition.Status && cond.ObservedGeneration == condition.ObservedGeneration {
|
||||
return nil
|
||||
|
@ -395,35 +403,51 @@ func (c *clientWrapper) UpdateGatewayClassStatus(gatewayClass *gatev1.GatewayCla
|
|||
|
||||
// Append the condition to update.
|
||||
newConditions = append(newConditions, condition)
|
||||
gc.Status.Conditions = newConditions
|
||||
currentGatewayClass.Status.Conditions = newConditions
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if _, err = c.csGateway.GatewayV1().GatewayClasses().UpdateStatus(ctx, currentGatewayClass, metav1.UpdateOptions{}); err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := c.csGateway.GatewayV1().GatewayClasses().UpdateStatus(ctx, gc, metav1.UpdateOptions{})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update GatewayClass %q status: %w", gatewayClass.Name, err)
|
||||
return fmt.Errorf("failed to update GatewayClass %q status: %w", name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) UpdateGatewayStatus(gateway *gatev1.Gateway, gatewayStatus gatev1.GatewayStatus) error {
|
||||
func (c *clientWrapper) UpdateGatewayStatus(ctx context.Context, gateway ktypes.NamespacedName, status gatev1.GatewayStatus) error {
|
||||
if !c.isWatchedNamespace(gateway.Namespace) {
|
||||
return fmt.Errorf("cannot update Gateway status %s/%s: namespace is not within watched namespaces", gateway.Namespace, gateway.Name)
|
||||
}
|
||||
|
||||
if gatewayStatusEquals(gateway.Status, gatewayStatus) {
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
currentGateway, err := c.factoriesGateway[c.lookupNamespace(gateway.Namespace)].Gateway().V1().Gateways().Lister().Gateways(gateway.Namespace).Get(gateway.Name)
|
||||
if err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
if gatewayStatusEquals(currentGateway.Status, status) {
|
||||
return nil
|
||||
}
|
||||
|
||||
g := gateway.DeepCopy()
|
||||
g.Status = gatewayStatus
|
||||
currentGateway = currentGateway.DeepCopy()
|
||||
currentGateway.Status = status
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if _, err = c.csGateway.GatewayV1().Gateways(gateway.Namespace).UpdateStatus(ctx, currentGateway, metav1.UpdateOptions{}); err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
_, err := c.csGateway.GatewayV1().Gateways(gateway.Namespace).UpdateStatus(ctx, g, metav1.UpdateOptions{})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update Gateway %q status: %w", gateway.Name, err)
|
||||
}
|
||||
|
@ -436,9 +460,12 @@ func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, route ktypes.
|
|||
return fmt.Errorf("updating HTTPRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name)
|
||||
}
|
||||
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1().HTTPRoutes().Lister().HTTPRoutes(route.Namespace).Get(route.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting HTTPRoute %s/%s: %w", route.Namespace, route.Name, err)
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: keep statuses for gateways managed by other Traefik instances.
|
||||
|
@ -459,9 +486,18 @@ func (c *clientWrapper) UpdateHTTPRouteStatus(ctx context.Context, route ktypes.
|
|||
},
|
||||
}
|
||||
|
||||
if _, err := c.csGateway.GatewayV1().HTTPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
|
||||
return fmt.Errorf("updating HTTPRoute %s/%s status: %w", route.Namespace, route.Name, err)
|
||||
if _, err = c.csGateway.GatewayV1().HTTPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update HTTPRoute %q status: %w", route.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -470,9 +506,12 @@ func (c *clientWrapper) UpdateTCPRouteStatus(ctx context.Context, route ktypes.N
|
|||
return fmt.Errorf("updating TCPRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name)
|
||||
}
|
||||
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TCPRoutes().Lister().TCPRoutes(route.Namespace).Get(route.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting TCPRoute %s/%s: %w", route.Namespace, route.Name, err)
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: keep statuses for gateways managed by other Traefik instances.
|
||||
|
@ -493,9 +532,18 @@ func (c *clientWrapper) UpdateTCPRouteStatus(ctx context.Context, route ktypes.N
|
|||
},
|
||||
}
|
||||
|
||||
if _, err := c.csGateway.GatewayV1alpha2().TCPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
|
||||
return fmt.Errorf("updating TCPRoute %s/%s status: %w", route.Namespace, route.Name, err)
|
||||
if _, err = c.csGateway.GatewayV1alpha2().TCPRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update TCPRoute %q status: %w", route.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -504,9 +552,12 @@ func (c *clientWrapper) UpdateTLSRouteStatus(ctx context.Context, route ktypes.N
|
|||
return fmt.Errorf("updating TLSRoute status %s/%s: namespace is not within watched namespaces", route.Namespace, route.Name)
|
||||
}
|
||||
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
currentRoute, err := c.factoriesGateway[c.lookupNamespace(route.Namespace)].Gateway().V1alpha2().TLSRoutes().Lister().TLSRoutes(route.Namespace).Get(route.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting TLSRoute %s/%s: %w", route.Namespace, route.Name, err)
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: keep statuses for gateways managed by other Traefik instances.
|
||||
|
@ -527,9 +578,18 @@ func (c *clientWrapper) UpdateTLSRouteStatus(ctx context.Context, route ktypes.N
|
|||
},
|
||||
}
|
||||
|
||||
if _, err := c.csGateway.GatewayV1alpha2().TLSRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
|
||||
return fmt.Errorf("updating TLSRoute %s/%s status: %w", route.Namespace, route.Name, err)
|
||||
if _, err = c.csGateway.GatewayV1alpha2().TLSRoutes(route.Namespace).UpdateStatus(ctx, currentRoute, metav1.UpdateOptions{}); err != nil {
|
||||
// We have to return err itself here (not wrapped inside another error)
|
||||
// so that RetryOnConflict can identify it correctly.
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update TLSRoute %q status: %w", route.Name, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -91,7 +91,7 @@ func (p *Provider) loadHTTPRoutes(ctx context.Context, gatewayListeners []gatewa
|
|||
},
|
||||
}
|
||||
if err := p.client.UpdateHTTPRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, status); err != nil {
|
||||
logger.Error().
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update HTTPRoute status")
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
ktypes "k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/utils/ptr"
|
||||
gatev1 "sigs.k8s.io/gateway-api/apis/v1"
|
||||
gatev1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
|
||||
|
@ -317,7 +318,7 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
|
||||
gatewayClassNames[gatewayClass.Name] = struct{}{}
|
||||
|
||||
err := p.client.UpdateGatewayClassStatus(gatewayClass, metav1.Condition{
|
||||
err := p.client.UpdateGatewayClassStatus(ctx, gatewayClass.Name, metav1.Condition{
|
||||
Type: string(gatev1.GatewayClassConditionStatusAccepted),
|
||||
Status: metav1.ConditionTrue,
|
||||
ObservedGeneration: gatewayClass.Generation,
|
||||
|
@ -327,7 +328,7 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
})
|
||||
if err != nil {
|
||||
log.Ctx(ctx).
|
||||
Error().
|
||||
Warn().
|
||||
Err(err).
|
||||
Str("gateway_class", gatewayClass.Name).
|
||||
Msg("Unable to update GatewayClass status")
|
||||
|
@ -370,17 +371,18 @@ func (p *Provider) loadConfigurationFromGateways(ctx context.Context) *dynamic.C
|
|||
}
|
||||
}
|
||||
|
||||
gatewayStatus, errG := p.makeGatewayStatus(gateway, listeners, addresses)
|
||||
if err = p.client.UpdateGatewayStatus(gateway, gatewayStatus); err != nil {
|
||||
gatewayStatus, err := p.makeGatewayStatus(gateway, listeners, addresses)
|
||||
if err != nil {
|
||||
logger.Error().
|
||||
Err(err).
|
||||
Msg("Unable to update Gateway status")
|
||||
}
|
||||
if errG != nil {
|
||||
logger.Error().
|
||||
Err(errG).
|
||||
Msg("Unable to create Gateway status")
|
||||
}
|
||||
|
||||
if err = p.client.UpdateGatewayStatus(ctx, ktypes.NamespacedName{Name: gateway.Name, Namespace: gateway.Namespace}, gatewayStatus); err != nil {
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update Gateway status")
|
||||
}
|
||||
}
|
||||
|
||||
return conf
|
||||
|
|
|
@ -80,7 +80,7 @@ func (p *Provider) loadTCPRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
},
|
||||
}
|
||||
if err := p.client.UpdateTCPRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, routeStatus); err != nil {
|
||||
logger.Error().
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update TCPRoute status")
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ func (p *Provider) loadTLSRoutes(ctx context.Context, gatewayListeners []gateway
|
|||
},
|
||||
}
|
||||
if err := p.client.UpdateTLSRouteStatus(ctx, ktypes.NamespacedName{Namespace: route.Namespace, Name: route.Name}, routeStatus); err != nil {
|
||||
logger.Error().
|
||||
logger.Warn().
|
||||
Err(err).
|
||||
Msg("Unable to update TLSRoute status")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue