Adds Kubernetes provider support

Co-authored-by: Julien Salleyron <julien@containo.us>
This commit is contained in:
Jean-Baptiste Doumenjou 2019-02-21 23:08:05 +01:00 committed by Traefiker Bot
parent 2c0bf335ba
commit 848e45c22c
13 changed files with 3773 additions and 3 deletions

View file

@ -14,13 +14,13 @@ import (
"github.com/containous/traefik/old/provider/ecs"
"github.com/containous/traefik/old/provider/etcd"
"github.com/containous/traefik/old/provider/eureka"
"github.com/containous/traefik/old/provider/kubernetes"
"github.com/containous/traefik/old/provider/mesos"
"github.com/containous/traefik/old/provider/rancher"
"github.com/containous/traefik/old/provider/zk"
"github.com/containous/traefik/ping"
"github.com/containous/traefik/provider/docker"
"github.com/containous/traefik/provider/file"
"github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/provider/marathon"
"github.com/containous/traefik/provider/rest"
"github.com/containous/traefik/tracing/datadog"

View file

@ -27,9 +27,9 @@ import (
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/old/provider/ecs"
"github.com/containous/traefik/old/provider/kubernetes"
oldtypes "github.com/containous/traefik/old/types"
"github.com/containous/traefik/provider/aggregator"
"github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/server"
"github.com/containous/traefik/server/router"

View file

@ -15,7 +15,6 @@ import (
"github.com/containous/traefik/old/provider/ecs"
"github.com/containous/traefik/old/provider/etcd"
"github.com/containous/traefik/old/provider/eureka"
"github.com/containous/traefik/old/provider/kubernetes"
"github.com/containous/traefik/old/provider/mesos"
"github.com/containous/traefik/old/provider/rancher"
"github.com/containous/traefik/old/provider/zk"
@ -23,6 +22,7 @@ import (
acmeprovider "github.com/containous/traefik/provider/acme"
"github.com/containous/traefik/provider/docker"
"github.com/containous/traefik/provider/file"
"github.com/containous/traefik/provider/kubernetes"
"github.com/containous/traefik/provider/marathon"
"github.com/containous/traefik/provider/rest"
"github.com/containous/traefik/tls"

View file

@ -35,6 +35,10 @@ func NewProviderAggregator(conf static.Providers) ProviderAggregator {
p.quietAddProvider(conf.Rest)
}
if conf.Kubernetes != nil {
p.quietAddProvider(conf.Kubernetes)
}
return p
}

View file

@ -0,0 +1,158 @@
package kubernetes
import (
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
func buildEndpoint(opts ...func(*corev1.Endpoints)) *corev1.Endpoints {
e := &corev1.Endpoints{}
for _, opt := range opts {
opt(e)
}
return e
}
func eNamespace(value string) func(*corev1.Endpoints) {
return func(i *corev1.Endpoints) {
i.Namespace = value
}
}
func eName(value string) func(*corev1.Endpoints) {
return func(i *corev1.Endpoints) {
i.Name = value
}
}
func eUID(value types.UID) func(*corev1.Endpoints) {
return func(i *corev1.Endpoints) {
i.UID = value
}
}
func subset(opts ...func(*corev1.EndpointSubset)) func(*corev1.Endpoints) {
return func(e *corev1.Endpoints) {
s := &corev1.EndpointSubset{}
for _, opt := range opts {
opt(s)
}
e.Subsets = append(e.Subsets, *s)
}
}
func eAddresses(opts ...func(*corev1.EndpointAddress)) func(*corev1.EndpointSubset) {
return func(subset *corev1.EndpointSubset) {
for _, opt := range opts {
a := &corev1.EndpointAddress{}
opt(a)
subset.Addresses = append(subset.Addresses, *a)
}
}
}
func eAddress(ip string) func(*corev1.EndpointAddress) {
return func(address *corev1.EndpointAddress) {
address.IP = ip
}
}
func eAddressWithTargetRef(targetRef, ip string) func(*corev1.EndpointAddress) {
return func(address *corev1.EndpointAddress) {
address.TargetRef = &corev1.ObjectReference{Name: targetRef}
address.IP = ip
}
}
func ePorts(opts ...func(port *corev1.EndpointPort)) func(*corev1.EndpointSubset) {
return func(spec *corev1.EndpointSubset) {
for _, opt := range opts {
p := &corev1.EndpointPort{}
opt(p)
spec.Ports = append(spec.Ports, *p)
}
}
}
func ePort(port int32, name string) func(*corev1.EndpointPort) {
return func(sp *corev1.EndpointPort) {
sp.Port = port
sp.Name = name
}
}
// Test
func TestBuildEndpoint(t *testing.T) {
actual := buildEndpoint(
eNamespace("testing"),
eName("service3"),
eUID("3"),
subset(
eAddresses(eAddress("10.15.0.1")),
ePorts(
ePort(8080, "http"),
ePort(8443, "https"),
),
),
subset(
eAddresses(eAddress("10.15.0.2")),
ePorts(
ePort(9080, "http"),
ePort(9443, "https"),
),
),
)
assert.EqualValues(t, sampleEndpoint1(), actual)
}
func sampleEndpoint1() *corev1.Endpoints {
return &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "service3",
UID: "3",
Namespace: "testing",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: "10.15.0.1",
},
},
Ports: []corev1.EndpointPort{
{
Name: "http",
Port: 8080,
},
{
Name: "https",
Port: 8443,
},
},
},
{
Addresses: []corev1.EndpointAddress{
{
IP: "10.15.0.2",
},
},
Ports: []corev1.EndpointPort{
{
Name: "http",
Port: 9080,
},
{
Name: "https",
Port: 9443,
},
},
},
},
}
}

View file

@ -0,0 +1,223 @@
package kubernetes
import (
"testing"
"github.com/stretchr/testify/assert"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
func buildIngress(opts ...func(*extensionsv1beta1.Ingress)) *extensionsv1beta1.Ingress {
i := &extensionsv1beta1.Ingress{}
for _, opt := range opts {
opt(i)
}
return i
}
func iNamespace(value string) func(*extensionsv1beta1.Ingress) {
return func(i *extensionsv1beta1.Ingress) {
i.Namespace = value
}
}
func iAnnotation(name string, value string) func(*extensionsv1beta1.Ingress) {
return func(i *extensionsv1beta1.Ingress) {
if i.Annotations == nil {
i.Annotations = make(map[string]string)
}
i.Annotations[name] = value
}
}
func iRules(opts ...func(*extensionsv1beta1.IngressSpec)) func(*extensionsv1beta1.Ingress) {
return func(i *extensionsv1beta1.Ingress) {
s := &extensionsv1beta1.IngressSpec{}
for _, opt := range opts {
opt(s)
}
i.Spec = *s
}
}
func iSpecBackends(opts ...func(*extensionsv1beta1.IngressSpec)) func(*extensionsv1beta1.Ingress) {
return func(i *extensionsv1beta1.Ingress) {
s := &extensionsv1beta1.IngressSpec{}
for _, opt := range opts {
opt(s)
}
i.Spec = *s
}
}
func iSpecBackend(opts ...func(*extensionsv1beta1.IngressBackend)) func(*extensionsv1beta1.IngressSpec) {
return func(s *extensionsv1beta1.IngressSpec) {
p := &extensionsv1beta1.IngressBackend{}
for _, opt := range opts {
opt(p)
}
s.Backend = p
}
}
func iIngressBackend(name string, port intstr.IntOrString) func(*extensionsv1beta1.IngressBackend) {
return func(p *extensionsv1beta1.IngressBackend) {
p.ServiceName = name
p.ServicePort = port
}
}
func iRule(opts ...func(*extensionsv1beta1.IngressRule)) func(*extensionsv1beta1.IngressSpec) {
return func(spec *extensionsv1beta1.IngressSpec) {
r := &extensionsv1beta1.IngressRule{}
for _, opt := range opts {
opt(r)
}
spec.Rules = append(spec.Rules, *r)
}
}
func iHost(name string) func(*extensionsv1beta1.IngressRule) {
return func(rule *extensionsv1beta1.IngressRule) {
rule.Host = name
}
}
func iPaths(opts ...func(*extensionsv1beta1.HTTPIngressRuleValue)) func(*extensionsv1beta1.IngressRule) {
return func(rule *extensionsv1beta1.IngressRule) {
rule.HTTP = &extensionsv1beta1.HTTPIngressRuleValue{}
for _, opt := range opts {
opt(rule.HTTP)
}
}
}
func onePath(opts ...func(*extensionsv1beta1.HTTPIngressPath)) func(*extensionsv1beta1.HTTPIngressRuleValue) {
return func(irv *extensionsv1beta1.HTTPIngressRuleValue) {
p := &extensionsv1beta1.HTTPIngressPath{}
for _, opt := range opts {
opt(p)
}
irv.Paths = append(irv.Paths, *p)
}
}
func iPath(name string) func(*extensionsv1beta1.HTTPIngressPath) {
return func(p *extensionsv1beta1.HTTPIngressPath) {
p.Path = name
}
}
func iBackend(name string, port intstr.IntOrString) func(*extensionsv1beta1.HTTPIngressPath) {
return func(p *extensionsv1beta1.HTTPIngressPath) {
p.Backend = extensionsv1beta1.IngressBackend{
ServiceName: name,
ServicePort: port,
}
}
}
func iTLSes(opts ...func(*extensionsv1beta1.IngressTLS)) func(*extensionsv1beta1.Ingress) {
return func(i *extensionsv1beta1.Ingress) {
for _, opt := range opts {
iTLS := extensionsv1beta1.IngressTLS{}
opt(&iTLS)
i.Spec.TLS = append(i.Spec.TLS, iTLS)
}
}
}
func iTLS(secret string, hosts ...string) func(*extensionsv1beta1.IngressTLS) {
return func(i *extensionsv1beta1.IngressTLS) {
i.SecretName = secret
i.Hosts = hosts
}
}
// Test
func TestBuildIngress(t *testing.T) {
i := buildIngress(
iNamespace("testing"),
iRules(
iRule(iHost("foo"), iPaths(
onePath(iPath("/bar"), iBackend("service1", intstr.FromInt(80))),
onePath(iPath("/namedthing"), iBackend("service4", intstr.FromString("https")))),
),
iRule(iHost("bar"), iPaths(
onePath(iBackend("service3", intstr.FromString("https"))),
onePath(iBackend("service2", intstr.FromInt(802))),
),
),
),
iTLSes(
iTLS("tls-secret", "foo"),
),
)
assert.EqualValues(t, sampleIngress(), i)
}
func sampleIngress() *extensionsv1beta1.Ingress {
return &extensionsv1beta1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testing",
},
Spec: extensionsv1beta1.IngressSpec{
Rules: []extensionsv1beta1.IngressRule{
{
Host: "foo",
IngressRuleValue: extensionsv1beta1.IngressRuleValue{
HTTP: &extensionsv1beta1.HTTPIngressRuleValue{
Paths: []extensionsv1beta1.HTTPIngressPath{
{
Path: "/bar",
Backend: extensionsv1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(80),
},
},
{
Path: "/namedthing",
Backend: extensionsv1beta1.IngressBackend{
ServiceName: "service4",
ServicePort: intstr.FromString("https"),
},
},
},
},
},
},
{
Host: "bar",
IngressRuleValue: extensionsv1beta1.IngressRuleValue{
HTTP: &extensionsv1beta1.HTTPIngressRuleValue{
Paths: []extensionsv1beta1.HTTPIngressPath{
{
Backend: extensionsv1beta1.IngressBackend{
ServiceName: "service3",
ServicePort: intstr.FromString("https"),
},
},
{
Backend: extensionsv1beta1.IngressBackend{
ServiceName: "service2",
ServicePort: intstr.FromInt(802),
},
},
},
},
},
},
},
TLS: []extensionsv1beta1.IngressTLS{
{
Hosts: []string{"foo"},
SecretName: "tls-secret",
},
},
},
}
}

View file

@ -0,0 +1,232 @@
package kubernetes
import (
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
func buildService(opts ...func(*corev1.Service)) *corev1.Service {
s := &corev1.Service{}
for _, opt := range opts {
opt(s)
}
return s
}
func sNamespace(value string) func(*corev1.Service) {
return func(i *corev1.Service) {
i.Namespace = value
}
}
func sName(value string) func(*corev1.Service) {
return func(i *corev1.Service) {
i.Name = value
}
}
func sUID(value types.UID) func(*corev1.Service) {
return func(i *corev1.Service) {
i.UID = value
}
}
func sAnnotation(name string, value string) func(*corev1.Service) {
return func(s *corev1.Service) {
if s.Annotations == nil {
s.Annotations = make(map[string]string)
}
s.Annotations[name] = value
}
}
func sSpec(opts ...func(*corev1.ServiceSpec)) func(*corev1.Service) {
return func(s *corev1.Service) {
spec := &corev1.ServiceSpec{}
for _, opt := range opts {
opt(spec)
}
s.Spec = *spec
}
}
func sLoadBalancerStatus(opts ...func(*corev1.LoadBalancerStatus)) func(service *corev1.Service) {
return func(s *corev1.Service) {
loadBalancer := &corev1.LoadBalancerStatus{}
for _, opt := range opts {
if opt != nil {
opt(loadBalancer)
}
}
s.Status = corev1.ServiceStatus{
LoadBalancer: *loadBalancer,
}
}
}
func sLoadBalancerIngress(ip string, hostname string) func(*corev1.LoadBalancerStatus) {
return func(status *corev1.LoadBalancerStatus) {
ingress := corev1.LoadBalancerIngress{
IP: ip,
Hostname: hostname,
}
status.Ingress = append(status.Ingress, ingress)
}
}
func clusterIP(ip string) func(*corev1.ServiceSpec) {
return func(spec *corev1.ServiceSpec) {
spec.ClusterIP = ip
}
}
func sType(value corev1.ServiceType) func(*corev1.ServiceSpec) {
return func(spec *corev1.ServiceSpec) {
spec.Type = value
}
}
func sExternalName(name string) func(*corev1.ServiceSpec) {
return func(spec *corev1.ServiceSpec) {
spec.ExternalName = name
}
}
func sPorts(opts ...func(*corev1.ServicePort)) func(*corev1.ServiceSpec) {
return func(spec *corev1.ServiceSpec) {
for _, opt := range opts {
p := &corev1.ServicePort{}
opt(p)
spec.Ports = append(spec.Ports, *p)
}
}
}
func sPort(port int32, name string) func(*corev1.ServicePort) {
return func(sp *corev1.ServicePort) {
sp.Port = port
sp.Name = name
}
}
// Test
func TestBuildService(t *testing.T) {
actual1 := buildService(
sName("service1"),
sNamespace("testing"),
sUID("1"),
sSpec(
clusterIP("10.0.0.1"),
sPorts(sPort(80, "")),
),
)
assert.EqualValues(t, sampleService1(), actual1)
actual2 := buildService(
sName("service2"),
sNamespace("testing"),
sUID("2"),
sSpec(
clusterIP("10.0.0.2"),
sType("ExternalName"),
sExternalName("example.com"),
sPorts(
sPort(80, "http"),
sPort(443, "https"),
),
),
)
assert.EqualValues(t, sampleService2(), actual2)
actual3 := buildService(
sName("service3"),
sNamespace("testing"),
sUID("3"),
sSpec(
clusterIP("10.0.0.3"),
sType("ExternalName"),
sExternalName("example.com"),
sPorts(
sPort(8080, "http"),
sPort(8443, "https"),
),
),
)
assert.EqualValues(t, sampleService3(), actual3)
}
func sampleService1() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service1",
UID: "1",
Namespace: "testing",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []corev1.ServicePort{
{
Port: 80,
},
},
},
}
}
func sampleService2() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service2",
UID: "2",
Namespace: "testing",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.2",
Type: "ExternalName",
ExternalName: "example.com",
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 80,
},
{
Name: "https",
Port: 443,
},
},
},
}
}
func sampleService3() *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service3",
UID: "3",
Namespace: "testing",
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.0.0.3",
Type: "ExternalName",
ExternalName: "example.com",
Ports: []corev1.ServicePort{
{
Name: "http",
Port: 8080,
},
{
Name: "https",
Port: 8443,
},
},
},
}
}

View file

@ -0,0 +1,292 @@
package kubernetes
import (
"errors"
"fmt"
"io/ioutil"
"time"
"github.com/containous/traefik/old/log"
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
kubeerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
const resyncPeriod = 10 * time.Minute
type resourceEventHandler struct {
ev chan<- interface{}
}
func (reh *resourceEventHandler) OnAdd(obj interface{}) {
eventHandlerFunc(reh.ev, obj)
}
func (reh *resourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
eventHandlerFunc(reh.ev, newObj)
}
func (reh *resourceEventHandler) OnDelete(obj interface{}) {
eventHandlerFunc(reh.ev, obj)
}
// Client is a client for the Provider master.
// WatchAll starts the watch of the Provider resources and updates the stores.
// The stores can then be accessed via the Get* functions.
type Client interface {
WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error)
GetIngresses() []*extensionsv1beta1.Ingress
GetService(namespace, name string) (*corev1.Service, bool, error)
GetSecret(namespace, name string) (*corev1.Secret, bool, error)
GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error)
UpdateIngressStatus(namespace, name, ip, hostname string) error
}
type clientImpl struct {
clientset *kubernetes.Clientset
factories map[string]informers.SharedInformerFactory
ingressLabelSelector labels.Selector
isNamespaceAll bool
watchedNamespaces Namespaces
}
func newClientImpl(clientset *kubernetes.Clientset) *clientImpl {
return &clientImpl{
clientset: clientset,
factories: make(map[string]informers.SharedInformerFactory),
}
}
// newInClusterClient returns a new Provider client that is expected to run
// inside the cluster.
func newInClusterClient(endpoint string) (*clientImpl, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create in-cluster configuration: %s", err)
}
if endpoint != "" {
config.Host = endpoint
}
return createClientFromConfig(config)
}
// newExternalClusterClient returns a new Provider client that may run outside
// of the cluster.
// The endpoint parameter must not be empty.
func newExternalClusterClient(endpoint, token, caFilePath string) (*clientImpl, error) {
if endpoint == "" {
return nil, errors.New("endpoint missing for external cluster client")
}
config := &rest.Config{
Host: endpoint,
BearerToken: token,
}
if caFilePath != "" {
caData, err := ioutil.ReadFile(caFilePath)
if err != nil {
return nil, fmt.Errorf("failed to read CA file %s: %s", caFilePath, err)
}
config.TLSClientConfig = rest.TLSClientConfig{CAData: caData}
}
return createClientFromConfig(config)
}
func createClientFromConfig(c *rest.Config) (*clientImpl, error) {
clientset, err := kubernetes.NewForConfig(c)
if err != nil {
return nil, err
}
return newClientImpl(clientset), nil
}
// WatchAll starts namespace-specific controllers for all relevant kinds.
func (c *clientImpl) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) {
eventCh := make(chan interface{}, 1)
if len(namespaces) == 0 {
namespaces = Namespaces{metav1.NamespaceAll}
c.isNamespaceAll = true
}
c.watchedNamespaces = namespaces
eventHandler := c.newResourceEventHandler(eventCh)
for _, ns := range namespaces {
factory := informers.NewFilteredSharedInformerFactory(c.clientset, resyncPeriod, ns, nil)
factory.Extensions().V1beta1().Ingresses().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Services().Informer().AddEventHandler(eventHandler)
factory.Core().V1().Endpoints().Informer().AddEventHandler(eventHandler)
c.factories[ns] = factory
}
for _, ns := range namespaces {
c.factories[ns].Start(stopCh)
}
for _, ns := range namespaces {
for t, ok := range c.factories[ns].WaitForCacheSync(stopCh) {
if !ok {
return nil, fmt.Errorf("timed out waiting for controller caches to sync %s in namespace %q", t.String(), ns)
}
}
}
// Do not wait for the Secrets store to get synced since we cannot rely on
// users having granted RBAC permissions for this object.
// https://github.com/containous/traefik/issues/1784 should improve the
// situation here in the future.
for _, ns := range namespaces {
c.factories[ns].Core().V1().Secrets().Informer().AddEventHandler(eventHandler)
c.factories[ns].Start(stopCh)
}
return eventCh, nil
}
// GetIngresses returns all Ingresses for observed namespaces in the cluster.
func (c *clientImpl) GetIngresses() []*extensionsv1beta1.Ingress {
var result []*extensionsv1beta1.Ingress
for ns, factory := range c.factories {
ings, err := factory.Extensions().V1beta1().Ingresses().Lister().List(c.ingressLabelSelector)
if err != nil {
log.Errorf("Failed to list ingresses in namespace %s: %s", ns, err)
}
result = append(result, ings...)
}
return result
}
// UpdateIngressStatus updates an Ingress with a provided status.
func (c *clientImpl) UpdateIngressStatus(namespace, name, ip, hostname string) error {
if !c.isWatchedNamespace(namespace) {
return fmt.Errorf("failed to get ingress %s/%s: namespace is not within watched namespaces", namespace, name)
}
ing, err := c.factories[c.lookupNamespace(namespace)].Extensions().V1beta1().Ingresses().Lister().Ingresses(namespace).Get(name)
if err != nil {
return fmt.Errorf("failed to get ingress %s/%s: %v", namespace, name, err)
}
if len(ing.Status.LoadBalancer.Ingress) > 0 {
if ing.Status.LoadBalancer.Ingress[0].Hostname == hostname && ing.Status.LoadBalancer.Ingress[0].IP == ip {
// If status is already set, skip update
log.Debugf("Skipping status update on ingress %s/%s", ing.Namespace, ing.Name)
return nil
}
}
ingCopy := ing.DeepCopy()
ingCopy.Status = extensionsv1beta1.IngressStatus{LoadBalancer: corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{{IP: ip, Hostname: hostname}}}}
_, err = c.clientset.ExtensionsV1beta1().Ingresses(ingCopy.Namespace).UpdateStatus(ingCopy)
if err != nil {
return fmt.Errorf("failed to update ingress status %s/%s: %v", namespace, name, err)
}
log.Infof("Updated status on ingress %s/%s", namespace, name)
return nil
}
// GetService returns the named service from the given namespace.
func (c *clientImpl) GetService(namespace, name string) (*corev1.Service, bool, error) {
if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get service %s/%s: namespace is not within watched namespaces", namespace, name)
}
service, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Services().Lister().Services(namespace).Get(name)
exist, err := translateNotFoundError(err)
return service, exist, err
}
// GetEndpoints returns the named endpoints from the given namespace.
func (c *clientImpl) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) {
if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get endpoints %s/%s: namespace is not within watched namespaces", namespace, name)
}
endpoint, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Endpoints().Lister().Endpoints(namespace).Get(name)
exist, err := translateNotFoundError(err)
return endpoint, exist, err
}
// GetSecret returns the named secret from the given namespace.
func (c *clientImpl) GetSecret(namespace, name string) (*corev1.Secret, bool, error) {
if !c.isWatchedNamespace(namespace) {
return nil, false, fmt.Errorf("failed to get secret %s/%s: namespace is not within watched namespaces", namespace, name)
}
secret, err := c.factories[c.lookupNamespace(namespace)].Core().V1().Secrets().Lister().Secrets(namespace).Get(name)
exist, err := translateNotFoundError(err)
return secret, exist, err
}
// lookupNamespace returns the lookup namespace key for the given namespace.
// When listening on all namespaces, it returns the client-go identifier ("")
// for all-namespaces. Otherwise, it returns the given namespace.
// The distinction is necessary because we index all informers on the special
// identifier iff all-namespaces are requested but receive specific namespace
// identifiers from the Kubernetes API, so we have to bridge this gap.
func (c *clientImpl) lookupNamespace(ns string) string {
if c.isNamespaceAll {
return metav1.NamespaceAll
}
return ns
}
func (c *clientImpl) newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler {
return &cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
// Ignore Ingresses that do not match our custom label selector.
if ing, ok := obj.(*extensionsv1beta1.Ingress); ok {
lbls := labels.Set(ing.GetLabels())
return c.ingressLabelSelector.Matches(lbls)
}
return true
},
Handler: &resourceEventHandler{ev: events},
}
}
// eventHandlerFunc will pass the obj on to the events channel or drop it.
// This is so passing the events along won't block in the case of high volume.
// The events are only used for signaling anyway so dropping a few is ok.
func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
select {
case events <- obj:
default:
}
}
// translateNotFoundError will translate a "not found" error to a boolean return
// value which indicates if the resource exists and a nil error.
func translateNotFoundError(err error) (bool, error) {
if kubeerror.IsNotFound(err) {
return false, nil
}
return err == nil, err
}
// isWatchedNamespace checks to ensure that the namespace is being watched before we request
// it to ensure we don't panic by requesting an out-of-watch object.
func (c *clientImpl) isWatchedNamespace(ns string) bool {
if c.isNamespaceAll {
return true
}
for _, watchedNamespace := range c.watchedNamespaces {
if watchedNamespace == ns {
return true
}
}
return false
}

View file

@ -0,0 +1,71 @@
package kubernetes
import (
corev1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
)
type clientMock struct {
ingresses []*extensionsv1beta1.Ingress
services []*corev1.Service
secrets []*corev1.Secret
endpoints []*corev1.Endpoints
watchChan chan interface{}
apiServiceError error
apiSecretError error
apiEndpointsError error
apiIngressStatusError error
}
func (c clientMock) GetIngresses() []*extensionsv1beta1.Ingress {
return c.ingresses
}
func (c clientMock) GetService(namespace, name string) (*corev1.Service, bool, error) {
if c.apiServiceError != nil {
return nil, false, c.apiServiceError
}
for _, service := range c.services {
if service.Namespace == namespace && service.Name == name {
return service, true, nil
}
}
return nil, false, c.apiServiceError
}
func (c clientMock) GetEndpoints(namespace, name string) (*corev1.Endpoints, bool, error) {
if c.apiEndpointsError != nil {
return nil, false, c.apiEndpointsError
}
for _, endpoints := range c.endpoints {
if endpoints.Namespace == namespace && endpoints.Name == name {
return endpoints, true, nil
}
}
return &corev1.Endpoints{}, false, nil
}
func (c clientMock) GetSecret(namespace, name string) (*corev1.Secret, bool, error) {
if c.apiSecretError != nil {
return nil, false, c.apiSecretError
}
for _, secret := range c.secrets {
if secret.Namespace == namespace && secret.Name == name {
return secret, true, nil
}
}
return nil, false, nil
}
func (c clientMock) WatchAll(namespaces Namespaces, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil
}
func (c clientMock) UpdateIngressStatus(namespace, name, ip, hostname string) error {
return c.apiIngressStatusError
}

View file

@ -0,0 +1,49 @@
package kubernetes
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
kubeerror "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func TestTranslateNotFoundError(t *testing.T) {
testCases := []struct {
desc string
err error
expectedExists bool
expectedError error
}{
{
desc: "kubernetes not found error",
err: kubeerror.NewNotFound(schema.GroupResource{}, "foo"),
expectedExists: false,
expectedError: nil,
},
{
desc: "nil error",
err: nil,
expectedExists: true,
expectedError: nil,
},
{
desc: "not a kubernetes not found error",
err: fmt.Errorf("bar error"),
expectedExists: false,
expectedError: fmt.Errorf("bar error"),
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
exists, err := translateNotFoundError(test.err)
assert.Equal(t, test.expectedExists, exists)
assert.Equal(t, test.expectedError, err)
})
}
}

View file

@ -0,0 +1,428 @@
package kubernetes
import (
"context"
"flag"
"fmt"
"math"
"os"
"reflect"
"sort"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff"
"github.com/containous/traefik/config"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/tls"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
)
var _ provider.Provider = (*Provider)(nil)
const (
annotationKubernetesIngressClass = "kubernetes.io/ingress.class"
traefikDefaultIngressClass = "traefik"
)
// IngressEndpoint holds the endpoint information for the Kubernetes provider.
type IngressEndpoint struct {
IP string `description:"IP used for Kubernetes Ingress endpoints"`
Hostname string `description:"Hostname used for Kubernetes Ingress endpoints"`
PublishedService string `description:"Published Kubernetes Service to copy status from"`
}
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash" export:"true"`
Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)"`
Token string `description:"Kubernetes bearer token (not needed for in-cluster client)"`
CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers" export:"true"`
EnablePassTLSCert bool `description:"Kubernetes enable Pass TLS Client Certs" export:"true"` // Deprecated
Namespaces Namespaces `description:"Kubernetes namespaces" export:"true"`
LabelSelector string `description:"Kubernetes Ingress label selector to use" export:"true"`
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for" export:"true"`
IngressEndpoint *IngressEndpoint `description:"Kubernetes Ingress Endpoint"`
lastConfiguration safe.Safe
}
func (p *Provider) newK8sClient(ctx context.Context, ingressLabelSelector string) (Client, error) {
ingLabelSel, err := labels.Parse(ingressLabelSelector)
if err != nil {
return nil, fmt.Errorf("invalid ingress label selector: %q", ingressLabelSelector)
}
log.FromContext(ctx).Infof("ingress label selector is: %q", ingLabelSel)
withEndpoint := ""
if p.Endpoint != "" {
withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint)
}
var cl *clientImpl
if os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "" {
log.FromContext(ctx).Infof("Creating in-cluster Provider client%s", withEndpoint)
cl, err = newInClusterClient(p.Endpoint)
} else {
log.FromContext(ctx).Infof("Creating cluster-external Provider client%s", withEndpoint)
cl, err = newExternalClusterClient(p.Endpoint, p.Token, p.CertAuthFilePath)
}
if err == nil {
cl.ingressLabelSelector = ingLabelSel
}
return cl, err
}
// Init the provider.
func (p *Provider) Init() error {
return p.BaseProvider.Init()
}
// Provide allows the k8s provider to provide configurations to traefik
// using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- config.Message, pool *safe.Pool) error {
ctxLog := log.With(context.Background(), log.Str(log.ProviderName, "docker"))
logger := log.FromContext(ctxLog)
// Tell glog (used by client-go) to log into STDERR. Otherwise, we risk
// certain kinds of API errors getting logged into a directory not
// available in a `FROM scratch` Docker container, causing glog to abort
// hard with an exit code > 0.
err := flag.Set("logtostderr", "true")
if err != nil {
return err
}
logger.Debugf("Using Ingress label selector: %q", p.LabelSelector)
k8sClient, err := p.newK8sClient(ctxLog, p.LabelSelector)
if err != nil {
return err
}
pool.Go(func(stop chan bool) {
operation := func() error {
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
if err != nil {
logger.Errorf("Error watching kubernetes events: %v", err)
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
return err
case <-stop:
return nil
}
}
for {
select {
case <-stop:
return nil
case event := <-eventsChan:
conf := p.loadConfigurationFromIngresses(ctxLog, k8sClient)
if reflect.DeepEqual(p.lastConfiguration.Get(), conf) {
logger.Debugf("Skipping Kubernetes event kind %T", event)
} else {
p.lastConfiguration.Set(conf)
configurationChan <- config.Message{
ProviderName: "kubernetes",
Configuration: conf,
}
}
}
}
}
notify := func(err error, time time.Duration) {
logger.Errorf("Provider connection error: %s; retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
logger.Errorf("Cannot connect to Provider: %s", err)
}
})
return nil
}
func checkStringQuoteValidity(value string) error {
_, err := strconv.Unquote(`"` + value + `"`)
return err
}
func loadService(client Client, namespace string, backend v1beta1.IngressBackend) (*config.Service, error) {
service, exists, err := client.GetService(namespace, backend.ServiceName)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.New("service not found")
}
var servers []config.Server
var portName string
var portSpec corev1.ServicePort
var match bool
for _, p := range service.Spec.Ports {
if (backend.ServicePort.Type == intstr.Int && backend.ServicePort.IntVal == p.Port) ||
(backend.ServicePort.Type == intstr.String && backend.ServicePort.StrVal == p.Name) {
portName = p.Name
portSpec = p
match = true
break
}
}
if !match {
return nil, errors.New("service port not found")
}
if service.Spec.Type == corev1.ServiceTypeExternalName {
servers = append(servers, config.Server{
URL: fmt.Sprintf("http://%s:%d", service.Spec.ExternalName, portSpec.Port),
Weight: 1,
})
} else {
endpoints, endpointsExists, endpointsErr := client.GetEndpoints(namespace, backend.ServiceName)
if endpointsErr != nil {
return nil, endpointsErr
}
if !endpointsExists {
return nil, errors.New("endpoints not found")
}
if len(endpoints.Subsets) == 0 {
return nil, errors.New("subset not found")
}
var port int32
for _, subset := range endpoints.Subsets {
for _, p := range subset.Ports {
if portName == p.Name {
port = p.Port
break
}
}
if port == 0 {
return nil, errors.New("cannot define a port")
}
protocol := "http"
if port == 443 || portName == "https" {
protocol = "https"
}
for _, addr := range subset.Addresses {
servers = append(servers, config.Server{
URL: fmt.Sprintf("%s://%s:%d", protocol, addr.IP, port),
Weight: 1,
})
}
}
}
return &config.Service{
LoadBalancer: &config.LoadBalancerService{
Servers: servers,
Method: "wrr",
PassHostHeader: true,
},
}, nil
}
func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Client) *config.Configuration {
conf := &config.Configuration{
Routers: map[string]*config.Router{},
Middlewares: map[string]*config.Middleware{},
Services: map[string]*config.Service{},
}
ingresses := client.GetIngresses()
tlsConfigs := make(map[string]*tls.Configuration)
for _, ingress := range ingresses {
ctx = log.With(ctx, log.Str("ingress", ingress.Name), log.Str("namespace", ingress.Namespace))
if !shouldProcessIngress(p.IngressClass, ingress.Annotations[annotationKubernetesIngressClass]) {
continue
}
err := getTLS(ctx, ingress, client, tlsConfigs)
if err != nil {
log.FromContext(ctx).Errorf("Error configuring TLS: %v", err)
}
if len(ingress.Spec.Rules) == 0 {
if ingress.Spec.Backend != nil {
if _, ok := conf.Services["default-backend"]; ok {
log.FromContext(ctx).Error("The default backend already exists.")
continue
}
service, err := loadService(client, ingress.Namespace, *ingress.Spec.Backend)
if err != nil {
log.FromContext(ctx).
WithField("serviceName", ingress.Spec.Backend.ServiceName).
WithField("servicePort", ingress.Spec.Backend.ServicePort.String()).
Errorf("Cannot create service: %v", err)
continue
}
conf.Routers["/"] = &config.Router{
Rule: "PathPrefix(`/`)",
Priority: math.MinInt32,
Service: "default-backend",
}
conf.Services["default-backend"] = service
}
}
for _, rule := range ingress.Spec.Rules {
if err := checkStringQuoteValidity(rule.Host); err != nil {
log.FromContext(ctx).Errorf("Invalid syntax for host: %s", rule.Host)
continue
}
for _, p := range rule.HTTP.Paths {
service, err := loadService(client, ingress.Namespace, p.Backend)
if err != nil {
log.FromContext(ctx).
WithField("serviceName", p.Backend.ServiceName).
WithField("servicePort", p.Backend.ServicePort.String()).
Errorf("Cannot create service: %v", err)
continue
}
if err = checkStringQuoteValidity(p.Path); err != nil {
log.FromContext(ctx).Errorf("Invalid syntax for path: %s", p.Path)
continue
}
serviceName := ingress.Namespace + "/" + p.Backend.ServiceName + "/" + p.Backend.ServicePort.String()
var rules []string
if len(rule.Host) > 0 {
rules = []string{"Host(`" + rule.Host + "`)"}
}
if len(p.Path) > 0 {
rules = append(rules, "PathPrefix(`"+p.Path+"`)")
}
conf.Routers[strings.Replace(rule.Host, ".", "-", -1)+p.Path] = &config.Router{
Rule: strings.Join(rules, " && "),
Service: serviceName,
}
conf.Services[serviceName] = service
}
}
}
conf.TLS = getTLSConfig(tlsConfigs)
return conf
}
func shouldProcessIngress(ingressClass string, ingressClassAnnotation string) bool {
return ingressClass == ingressClassAnnotation ||
(len(ingressClass) == 0 && ingressClassAnnotation == traefikDefaultIngressClass)
}
func getTLS(ctx context.Context, ingress *v1beta1.Ingress, k8sClient Client, tlsConfigs map[string]*tls.Configuration) error {
for _, t := range ingress.Spec.TLS {
if t.SecretName == "" {
log.FromContext(ctx).Debugf("Skipping TLS sub-section: No secret name provided")
continue
}
configKey := ingress.Namespace + "/" + t.SecretName
if _, tlsExists := tlsConfigs[configKey]; !tlsExists {
secret, exists, err := k8sClient.GetSecret(ingress.Namespace, t.SecretName)
if err != nil {
return fmt.Errorf("failed to fetch secret %s/%s: %v", ingress.Namespace, t.SecretName, err)
}
if !exists {
return fmt.Errorf("secret %s/%s does not exist", ingress.Namespace, t.SecretName)
}
cert, key, err := getCertificateBlocks(secret, ingress.Namespace, t.SecretName)
if err != nil {
return err
}
tlsConfigs[configKey] = &tls.Configuration{
Certificate: &tls.Certificate{
CertFile: tls.FileOrContent(cert),
KeyFile: tls.FileOrContent(key),
},
}
}
}
return nil
}
func getTLSConfig(tlsConfigs map[string]*tls.Configuration) []*tls.Configuration {
var secretNames []string
for secretName := range tlsConfigs {
secretNames = append(secretNames, secretName)
}
sort.Strings(secretNames)
var configs []*tls.Configuration
for _, secretName := range secretNames {
configs = append(configs, tlsConfigs[secretName])
}
return configs
}
func getCertificateBlocks(secret *corev1.Secret, namespace, secretName string) (string, string, error) {
var missingEntries []string
tlsCrtData, tlsCrtExists := secret.Data["tls.crt"]
if !tlsCrtExists {
missingEntries = append(missingEntries, "tls.crt")
}
tlsKeyData, tlsKeyExists := secret.Data["tls.key"]
if !tlsKeyExists {
missingEntries = append(missingEntries, "tls.key")
}
if len(missingEntries) > 0 {
return "", "", fmt.Errorf("secret %s/%s is missing the following TLS data entries: %s",
namespace, secretName, strings.Join(missingEntries, ", "))
}
cert := string(tlsCrtData)
if cert == "" {
missingEntries = append(missingEntries, "tls.crt")
}
key := string(tlsKeyData)
if key == "" {
missingEntries = append(missingEntries, "tls.key")
}
if len(missingEntries) > 0 {
return "", "", fmt.Errorf("secret %s/%s contains the following empty TLS data entries: %s",
namespace, secretName, strings.Join(missingEntries, ", "))
}
return cert, key, nil
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,32 @@
package kubernetes
import (
"fmt"
"strings"
)
// Namespaces holds kubernetes namespaces.
type Namespaces []string
// Set adds strings elem into the the parser
// it splits str on , and ;.
func (ns *Namespaces) Set(str string) error {
fargs := func(c rune) bool {
return c == ',' || c == ';'
}
// get function
slice := strings.FieldsFunc(str, fargs)
*ns = append(*ns, slice...)
return nil
}
// Get []string.
func (ns *Namespaces) Get() interface{} { return *ns }
// String return slice in a string.
func (ns *Namespaces) String() string { return fmt.Sprintf("%v", *ns) }
// SetValue sets []string into the parser.
func (ns *Namespaces) SetValue(val interface{}) {
*ns = val.(Namespaces)
}