Enable Ingress Status updates

This commit is contained in:
Daniel Tomcej 2018-05-18 06:12:03 -06:00 committed by Traefiker Bot
parent c37b040217
commit 9227d32d57
6 changed files with 289 additions and 14 deletions

View file

@ -81,6 +81,20 @@ See also [Kubernetes user guide](/user-guide/kubernetes).
# Default: <built-in template> # Default: <built-in template>
# #
# filename = "kubernetes.tmpl" # filename = "kubernetes.tmpl"
# Enable IngressEndpoint configuration.
# This will allow Traefik to update the status section of ingress objects, if desired.
#
# Optional
#
# [kubernetes.ingressEndpoint]
#
# At least one must be configured.
# `publishedservice` will override the `hostname` and `ip` settings if configured.
#
# hostname = "localhost"
# ip = "127.0.0.1"
# publishedService = "namespace/servicename"
``` ```
### `endpoint` ### `endpoint`
@ -105,6 +119,12 @@ A label selector can be defined to filter on specific Ingress objects only.
See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details. See [label-selectors](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors) for details.
### `ingressEndpoint`
You can configure a static hostname or IP address that Traefik will add to the status section of Ingress objects that it manages.
If you prefer, you can provide a service, which traefik will copy the status spec from.
This will give more flexibility in cloud/dynamic environments.
### TLS communication between Traefik and backend pods ### TLS communication between Traefik and backend pods
Traefik automatically requests endpoint information based on the service provided in the ingress spec. Traefik automatically requests endpoint information based on the service provided in the ingress spec.

View file

@ -45,12 +45,36 @@ func sAnnotation(name string, value string) func(*corev1.Service) {
} }
func sSpec(opts ...func(*corev1.ServiceSpec)) func(*corev1.Service) { func sSpec(opts ...func(*corev1.ServiceSpec)) func(*corev1.Service) {
return func(i *corev1.Service) { return func(s *corev1.Service) {
spec := &corev1.ServiceSpec{} spec := &corev1.ServiceSpec{}
for _, opt := range opts { for _, opt := range opts {
opt(spec) opt(spec)
} }
i.Spec = *spec s.Spec = *spec
}
}
func sLoadBalancerStatus(opts ...func(*corev1.LoadBalancerStatus)) func(service *corev1.Service) {
return func(s *corev1.Service) {
loadBalancer := &corev1.LoadBalancerStatus{}
for _, opt := range opts {
if opt != nil {
opt(loadBalancer)
}
}
s.Status = corev1.ServiceStatus{
LoadBalancer: *loadBalancer,
}
}
}
func sLoadBalancerIngress(ip string, hostname string) func(*corev1.LoadBalancerStatus) {
return func(status *corev1.LoadBalancerStatus) {
ingress := corev1.LoadBalancerIngress{
IP: ip,
Hostname: hostname,
}
status.Ingress = append(status.Ingress, ingress)
} }
} }

View file

@ -44,6 +44,7 @@ type Client interface {
GetService(namespace, name string) (*corev1.Service, bool, error) GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error) GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
UpdateIngressStatus(namespace, name, ip, hostname string) error
} }
type clientImpl struct { type clientImpl struct {
@ -166,6 +167,35 @@ func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress {
return result return result
} }
// UpdateIngressStatus updates an Ingress with a provided status.
func (c *clientImpl) UpdateIngressStatus(namespace, name, ip, hostname string) error {
keyName := namespace + "/" + name
item, exists, err := c.factories[c.lookupNamespace(namespace)].Extensions().V1beta1().Ingresses().Informer().GetStore().GetByKey(keyName)
if err != nil {
return fmt.Errorf("failed to get ingress %s with error: %v", keyName, err)
}
if !exists {
return fmt.Errorf("failed to update ingress %s because it does not exist", keyName)
}
ing := item.(*extensionsv1beta1.Ingress)
if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing.Status.LoadBalancer.Ingress[0].IP == ip {
// If status is already set, skip update
log.Debugf("Skipping status update on ingress %s/%s", ing.Namespace, ing.Name)
} else {
ingCopy := ing.DeepCopy()
ingCopy.Status = extensionsv1beta1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{{IP: ip, Hostname: hostname}}}}
_, err := c.clientset.ExtensionsV1beta1().Ingresses(ingCopy.Namespace).UpdateStatus(ingCopy)
if err != nil {
return fmt.Errorf("failed to update ingress status %s with error: %v", keyName, err)
}
log.Infof("Updated status on ingress %s", keyName)
}
return nil
}
// GetService returns the named service from the given namespace. // GetService returns the named service from the given namespace.
func (c *clientImpl) GetService(namespace, name string) (*corev1.Service, bool, error) { func (c *clientImpl) GetService(namespace, name string) (*corev1.Service, bool, error) {
var service *corev1.Service var service *corev1.Service

View file

@ -12,9 +12,10 @@ type clientMock struct {
endpoints []*corev1.Endpoints endpoints []*corev1.Endpoints
watchChan chan interface{} watchChan chan interface{}
apiServiceError error apiServiceError error
apiSecretError error apiSecretError error
apiEndpointsError error apiEndpointsError error
apiIngressStatusError error
} }
func (c clientMock) GetIngresses() []*extensionsv1beta1.Ingress { func (c clientMock) GetIngresses() []*extensionsv1beta1.Ingress {
@ -31,7 +32,7 @@ func (c clientMock) GetService(namespace, name string) (*corev1.Service, bool, e
return service, true, nil return service, true, nil
} }
} }
return nil, false, nil return nil, false, c.apiServiceError
} }
func (c clientMock) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) { func (c clientMock) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) {
@ -64,3 +65,7 @@ func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, err
func (c clientMock) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) { func (c clientMock) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil return c.watchChan, nil
} }
func (c clientMock) UpdateIngressStatus(namespace, name, ip, hostname string) error {
return c.apiIngressStatusError
}

View file

@ -36,17 +36,25 @@ const (
traefikDefaultIngressClass = "traefik" traefikDefaultIngressClass = "traefik"
) )
// IngressEndpoint holds the endpoint information for the Kubernetes provider
type IngressEndpoint struct {
IP string `description:"IP used for Kubernetes Ingress endpoints"`
Hostname string `description:"Hostname used for Kubernetes Ingress endpoints"`
PublishedService string `description:"Published Kubernetes Service to copy status from"`
}
// Provider holds configurations of the provider. // Provider holds configurations of the provider.
type Provider struct { type Provider struct {
provider.BaseProvider `mapstructure:",squash" export:"true"` provider.BaseProvider `mapstructure:",squash" export:"true"`
Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"` Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"`
Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"` Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"`
CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"` CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"` DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"`
EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"` EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"`
Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"` Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"`
LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"` LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"`
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"` IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"`
IngressEndpoint *IngressEndpoint `description:"Kubernetes Ingress Endpoint"`
lastConfiguration safe.Safe lastConfiguration safe.Safe
} }
@ -125,10 +133,12 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
return nil return nil
case event := <-eventsChan: case event := <-eventsChan:
log.Debugf("Received Kubernetes event kind %T", event) log.Debugf("Received Kubernetes event kind %T", event)
templateObjects, err := p.loadIngresses(k8sClient) templateObjects, err := p.loadIngresses(k8sClient)
if err != nil { if err != nil {
return err return err
} }
if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) { if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) {
log.Debugf("Skipping Kubernetes event kind %T", event) log.Debugf("Skipping Kubernetes event kind %T", event)
} else { } else {
@ -326,10 +336,53 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error)
} }
} }
} }
err = p.updateIngressStatus(i, k8sClient)
if err != nil {
log.Errorf("Cannot update Ingress %s/%s due to error: %v", i.Namespace, i.Name, err)
}
} }
return &templateObjects, nil return &templateObjects, nil
} }
func (p *Provider) updateIngressStatus(i *extensionsv1beta1.Ingress, k8sClient Client) error {
// Only process if an IngressEndpoint has been configured
if p.IngressEndpoint == nil {
return nil
}
if len(p.IngressEndpoint.PublishedService) == 0 {
if len(p.IngressEndpoint.IP) == 0 && len(p.IngressEndpoint.Hostname) == 0 {
return errors.New("publishedService or ip or hostname must be defined")
}
return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, p.IngressEndpoint.IP, p.IngressEndpoint.Hostname)
}
serviceInfo := strings.Split(p.IngressEndpoint.PublishedService, "/")
if len(serviceInfo) != 2 {
return fmt.Errorf("invalid publishedService format (expected 'namespace/service' format): %s", p.IngressEndpoint.PublishedService)
}
serviceNamespace, serviceName := serviceInfo[0], serviceInfo[1]
service, exists, err := k8sClient.GetService(serviceNamespace, serviceName)
if err != nil {
return fmt.Errorf("cannot get service %s, received error: %s", p.IngressEndpoint.PublishedService, err)
}
if exists && service.Status.LoadBalancer.Ingress == nil {
// service exists, but has no Load Balancer status
log.Debugf("Skipping updating Ingress %s/%s due to service %s having no status set", i.Namespace, i.Name, p.IngressEndpoint.PublishedService)
return nil
}
if !exists {
return fmt.Errorf("missing service: %s", p.IngressEndpoint.PublishedService)
}
return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname)
}
func (p *Provider) loadConfig(templateObjects types.Configuration) *types.Configuration { func (p *Provider) loadConfig(templateObjects types.Configuration) *types.Configuration {
var FuncMap = template.FuncMap{} var FuncMap = template.FuncMap{}
configuration, err := p.GetConfiguration("templates/kubernetes.tmpl", FuncMap, templateObjects) configuration, err := p.GetConfiguration("templates/kubernetes.tmpl", FuncMap, templateObjects)

View file

@ -2020,3 +2020,146 @@ func TestMultiPortServices(t *testing.T) {
assert.Equal(t, expected, actual) assert.Equal(t, expected, actual)
} }
func TestProviderUpdateIngressStatus(t *testing.T) {
testCases := []struct {
desc string
ingressEndpoint *IngressEndpoint
apiServiceError error
apiIngressStatusError error
expectedError bool
}{
{
desc: "without IngressEndpoint configuration",
expectedError: false,
},
{
desc: "without any IngressEndpoint option",
ingressEndpoint: &IngressEndpoint{},
expectedError: true,
},
{
desc: "PublishedService - invalid format",
ingressEndpoint: &IngressEndpoint{
PublishedService: "foo",
},
expectedError: true,
},
{
desc: "PublishedService - missing service",
ingressEndpoint: &IngressEndpoint{
PublishedService: "foo/bar",
},
expectedError: true,
},
{
desc: "PublishedService - get service error",
ingressEndpoint: &IngressEndpoint{
PublishedService: "foo/bar",
},
apiServiceError: errors.New("error"),
expectedError: true,
},
{
desc: "PublishedService - Skipping empty LoadBalancerIngress",
ingressEndpoint: &IngressEndpoint{
PublishedService: "testing/service-empty-status",
},
expectedError: false,
},
{
desc: "PublishedService - with update error",
ingressEndpoint: &IngressEndpoint{
PublishedService: "testing/service",
},
apiIngressStatusError: errors.New("error"),
expectedError: true,
},
{
desc: "PublishedService - right service",
ingressEndpoint: &IngressEndpoint{
PublishedService: "testing/service",
},
expectedError: false,
},
{
desc: "IP - valid",
ingressEndpoint: &IngressEndpoint{
IP: "127.0.0.1",
},
expectedError: false,
},
{
desc: "IP - with update error",
ingressEndpoint: &IngressEndpoint{
IP: "127.0.0.1",
},
apiIngressStatusError: errors.New("error"),
expectedError: true,
},
{
desc: "hostname - valid",
ingressEndpoint: &IngressEndpoint{
Hostname: "foo",
},
expectedError: false,
},
{
desc: "hostname - with update error",
ingressEndpoint: &IngressEndpoint{
Hostname: "foo",
},
apiIngressStatusError: errors.New("error"),
expectedError: true,
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
p := &Provider{
IngressEndpoint: test.ingressEndpoint,
}
services := []*corev1.Service{
buildService(
sName("service-empty-status"),
sNamespace("testing"),
sLoadBalancerStatus(),
sUID("1"),
sSpec(
clusterIP("10.0.0.1"),
sPorts(sPort(80, ""))),
),
buildService(
sName("service"),
sNamespace("testing"),
sLoadBalancerStatus(sLoadBalancerIngress("127.0.0.1", "")),
sUID("2"),
sSpec(
clusterIP("10.0.0.2"),
sPorts(sPort(80, ""))),
),
}
client := clientMock{
services: services,
apiServiceError: test.apiServiceError,
apiIngressStatusError: test.apiIngressStatusError,
}
i := &extensionsv1beta1.Ingress{}
err := p.updateIngressStatus(i, client)
if test.expectedError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}