Only listen to configured k8s namespaces.

This commit is contained in:
Timo Reimann 2017-10-10 16:26:03 +02:00 committed by Traefiker
parent cf508b6d48
commit a5c644e719
4 changed files with 220 additions and 656 deletions

docs/user-guide/kubernetes.md

@@ -15,12 +15,13 @@ on your machine, as it is the quickest way to get a local Kubernetes cluster set
### Role Based Access Control configuration (Kubernetes 1.6+ only)
Kubernetes introduces [Role Based Access Control (RBAC)](https://kubernetes.io/docs/admin/authorization/rbac/) in 1.6+ to allow fine-grained control of Kubernetes resources and api.
Kubernetes introduces [Role Based Access Control (RBAC)](https://kubernetes.io/docs/admin/authorization/rbac/) in 1.6+ to allow fine-grained control of Kubernetes resources and API.
If your cluster is configured with RBAC, you may need to authorize Træfik to use the Kubernetes API using ClusterRole and ClusterRoleBinding resources:
If your cluster is configured with RBAC, you will need to authorize Træfik to use the Kubernetes API. There are two ways to set up the proper permissions: via namespace-specific RoleBindings or a single, global ClusterRoleBinding.
!!! note
    Your cluster may have suitable ClusterRoles already set up, but the following should work everywhere.
Namespace-specific RoleBindings restrict the granted permissions to exactly the namespaces Træfik is watching, thereby following the principle of least privilege. This is the preferred approach if Træfik is not supposed to watch all namespaces and the set of watched namespaces does not change dynamically. Otherwise, a single ClusterRoleBinding must be employed.
For the sake of simplicity, this guide will use a ClusterRoleBinding:
```yaml
---
@@ -69,6 +70,8 @@ subjects:
kubectl apply -f https://raw.githubusercontent.com/containous/traefik/master/examples/k8s/traefik-rbac.yaml
```
For namespaced restrictions, one RoleBinding is required per watched namespace along with a corresponding configuration of Træfik's `kubernetes.namespaces` parameter.
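For illustration, the per-namespace variant could look like the following sketch. The `staging` namespace is a placeholder, the rules mirror those of the ClusterRole above, and the ServiceAccount is assumed to live in `kube-system`:

```yaml
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: traefik-ingress-controller
  namespace: staging
rules:
  - apiGroups:
      - ""
    resources:
      - services
      - endpoints
      - secrets
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - extensions
    resources:
      - ingresses
    verbs:
      - get
      - list
      - watch
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
  name: traefik-ingress-controller
  namespace: staging
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: traefik-ingress-controller
subjects:
  - kind: ServiceAccount
    name: traefik-ingress-controller
    namespace: kube-system
```

Træfik would then be restricted to the same namespace on its side, e.g. via `--kubernetes.namespaces=staging`.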
## Deploy Træfik using a Deployment or DaemonSet
It is possible to use Træfik with a [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/) or a [DaemonSet](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/) object,

provider/kubernetes/client.go

@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"sync"
"time"
"github.com/containous/traefik/safe"
@@ -19,31 +20,71 @@ import (
"k8s.io/client-go/tools/cache"
)
const resyncPeriod = time.Minute * 5
const resyncPeriod = 10 * time.Minute
const (
kindIngresses = "ingresses"
kindServices = "services"
kindEndpoints = "endpoints"
kindSecrets = "secrets"
)
type resourceEventHandler struct {
ev chan<- interface{}
}
func (reh *resourceEventHandler) OnAdd(obj interface{}) {
eventHandlerFunc(reh.ev, obj)
}
func (reh *resourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
eventHandlerFunc(reh.ev, newObj)
}
func (reh *resourceEventHandler) OnDelete(obj interface{}) {
eventHandlerFunc(reh.ev, obj)
}
type informerManager struct {
informers []cache.SharedInformer
syncFuncs []cache.InformerSynced
}
func (im *informerManager) extend(informer cache.SharedInformer, withSyncFunc bool) {
im.informers = append(im.informers, informer)
if withSyncFunc {
im.syncFuncs = append(im.syncFuncs, informer.HasSynced)
}
}
// Client is a client for the Provider master.
type Client interface {
GetIngresses(namespaces Namespaces) []*v1beta1.Ingress
GetService(namespace, name string) (*v1.Service, bool, error)
GetSecret(namespace, name string) (*v1.Secret, bool, error)
GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error)
WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error)
}
// Client is a client for the Provider master.
// WatchAll starts the watch of the Provider resources and updates the stores.
// The stores can then be accessed via the Get* functions.
type Client interface {
WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error)
GetIngresses() []*v1beta1.Ingress
GetService(namespace, name string) (*v1.Service, bool, error)
GetSecret(namespace, name string) (*v1.Secret, bool, error)
GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error)
}
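The interface change is easiest to see from the consumer side. Below is a minimal in-package sketch of driving the new signatures; the namespace names and the empty label selector are illustrative, and the loop mirrors what the provider does:

```go
// watchLoop is an illustrative consumer of the new Client interface.
// Every event is merely a wake-up signal: the caller re-reads the stores,
// which now only ever contain objects from the watched namespaces.
func watchLoop(client Client, stop <-chan struct{}) error {
	events, err := client.WatchAll(Namespaces{"team-a", "team-b"}, "", stop)
	if err != nil {
		return err
	}
	for {
		select {
		case <-stop:
			return nil
		case _, ok := <-events:
			if !ok {
				return nil // event channel closed after stop
			}
			for _, ing := range client.GetIngresses() {
				_ = ing // rebuild the routing configuration here
			}
		}
	}
}
```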
type clientImpl struct {
ingController *cache.Controller
svcController *cache.Controller
epController *cache.Controller
secController *cache.Controller
ingStore cache.Store
svcStore cache.Store
epStore cache.Store
secStore cache.Store
clientset *kubernetes.Clientset
}
type clientImpl struct {
clientset *kubernetes.Clientset
ingStores []cache.Store
svcStores map[string]cache.Store
epStores map[string]cache.Store
secStores map[string]cache.Store
isNamespaceAll bool
}
func newClientImpl(clientset *kubernetes.Clientset) Client {
return &clientImpl{
clientset: clientset,
ingStores: []cache.Store{},
svcStores: map[string]cache.Store{},
epStores: map[string]cache.Store{},
secStores: map[string]cache.Store{},
}
}
// NewInClusterClient returns a new Provider client that is expected to run
@@ -92,67 +133,120 @@ func createClientFromConfig(c *rest.Config) (Client, error) {
return nil, err
}
return &clientImpl{
clientset: clientset,
}, nil
return newClientImpl(clientset), nil
}
// GetIngresses returns all ingresses in the cluster
func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
ingList := c.ingStore.List()
result := make([]*v1beta1.Ingress, 0, len(ingList))
for _, obj := range ingList {
ingress := obj.(*v1beta1.Ingress)
if HasNamespace(ingress, namespaces) {
result = append(result, ingress)
}
}
return result
}
// WatchAll starts namespace-specific controllers for all relevant kinds.
func (c *clientImpl) WatchAll(namespaces Namespaces, labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
eventCh := make(chan interface{}, 1)
kubeLabelSelector, err := labels.Parse(labelSelector)
if err != nil {
return nil, err
}
if len(namespaces) == 0 {
namespaces = Namespaces{api.NamespaceAll}
c.isNamespaceAll = true
}
var informManager informerManager
for _, ns := range namespaces {
ns := ns
informManager.extend(c.WatchIngresses(ns, kubeLabelSelector, eventCh), true)
informManager.extend(c.WatchObjects(ns, kindServices, &v1.Service{}, c.svcStores, eventCh), true)
informManager.extend(c.WatchObjects(ns, kindEndpoints, &v1.Endpoints{}, c.epStores, eventCh), true)
// Do not wait for the Secrets store to get synced since we cannot rely on
// users having granted RBAC permissions for this object.
// https://github.com/containous/traefik/issues/1784 should improve the
// situation here in the future.
informManager.extend(c.WatchObjects(ns, kindSecrets, &v1.Secret{}, c.secStores, eventCh), false)
}
var wg sync.WaitGroup
for _, informer := range informManager.informers {
informer := informer
// Increment before launching so that the shutdown goroutine below
// can never observe a transiently zero counter.
wg.Add(1)
safe.Go(func() {
informer.Run(stopCh)
wg.Done()
})
}
if !cache.WaitForCacheSync(stopCh, informManager.syncFuncs...) {
return nil, fmt.Errorf("timed out waiting for controller caches to sync")
}
safe.Go(func() {
<-stopCh
wg.Wait()
close(eventCh)
})
return eventCh, nil
}
// WatchIngresses sets up a watch on Ingress objects and returns a corresponding shared informer.
func (c *clientImpl) WatchIngresses(namespace string, labelSelector labels.Selector, watchCh chan<- interface{}) cache.SharedInformer {
listWatch := newListWatchFromClientWithLabelSelector(
c.clientset.ExtensionsV1beta1().RESTClient(),
kindIngresses,
namespace,
fields.Everything(),
labelSelector)
informer := loadInformer(listWatch, &v1beta1.Ingress{}, watchCh)
c.ingStores = append(c.ingStores, informer.GetStore())
return informer
}
// WatchObjects sets up a watch on objects and returns a corresponding shared informer.
func (c *clientImpl) WatchObjects(namespace, kind string, object runtime.Object, storeMap map[string]cache.Store, watchCh chan<- interface{}) cache.SharedInformer {
listWatch := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),
kind,
namespace,
fields.Everything())
informer := loadInformer(listWatch, object, watchCh)
storeMap[namespace] = informer.GetStore()
return informer
}
func loadInformer(listWatch *cache.ListWatch, object runtime.Object, watchCh chan<- interface{}) cache.SharedInformer {
informer := cache.NewSharedInformer(
listWatch,
object,
resyncPeriod,
)
if err := informer.AddEventHandler(newResourceEventHandler(watchCh)); err != nil {
// This should only ever fail if we add an event handler after the
// informer has already been started, which would be a programming bug.
panic(err)
}
return informer
}
// GetIngresses returns all Ingresses for observed namespaces in the cluster.
func (c *clientImpl) GetIngresses() []*v1beta1.Ingress {
var result []*v1beta1.Ingress
for _, store := range c.ingStores {
for _, obj := range store.List() {
ing := obj.(*v1beta1.Ingress)
result = append(result, ing)
}
}
return result
}
// WatchIngresses starts the watch of Provider Ingresses resources and updates the corresponding store
func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := NewListWatchFromClient(
c.clientset.ExtensionsV1beta1().RESTClient(),
"ingresses",
api.NamespaceAll,
fields.Everything(),
labelSelector)
c.ingStore, c.ingController = cache.NewInformer(
source,
&v1beta1.Ingress{},
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
safe.Go(func() {
c.ingController.Run(stopCh)
})
}
// eventHandlerFunc will pass the obj on to the events channel or drop it
// This is so passing the events along won't block in the case of high volume
// The events are only used for signalling anyway so dropping a few is ok
func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
select {
case events <- obj:
default:
}
}
func newResourceEventHandlerFuncs(events chan<- interface{}) cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
UpdateFunc: func(old, new interface{}) { eventHandlerFunc(events, new) },
DeleteFunc: func(obj interface{}) { eventHandlerFunc(events, obj) },
}
}
// GetService returns the named service from the named namespace
// GetService returns the named service from the given namespace.
func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, error) {
var service *v1.Service
item, exists, err := c.svcStore.GetByKey(namespace + "/" + name)
item, exists, err := c.svcStores[c.lookupNamespace(namespace)].GetByKey(namespace + "/" + name)
if item != nil {
service = item.(*v1.Service)
}
@@ -160,39 +254,10 @@ func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, erro
return service, exists, err
}
func (c *clientImpl) GetSecret(namespace, name string) (*v1.Secret, bool, error) {
var secret *v1.Secret
item, exists, err := c.secStore.GetByKey(namespace + "/" + name)
if err == nil && item != nil {
secret = item.(*v1.Secret)
}
return secret, exists, err
}
// WatchServices starts the watch of Provider Service resources and updates the corresponding store
func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),
"services",
api.NamespaceAll,
fields.Everything())
c.svcStore, c.svcController = cache.NewInformer(
source,
&v1.Service{},
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
safe.Go(func() {
c.svcController.Run(stopCh)
})
}
// GetEndpoints returns the named Endpoints
// Endpoints have the same name as the corresponding service
// GetEndpoints returns the named endpoints from the given namespace.
func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) {
var endpoint *v1.Endpoints
item, exists, err := c.epStore.GetByKey(namespace + "/" + name)
item, exists, err := c.epStores[c.lookupNamespace(namespace)].GetByKey(namespace + "/" + name)
if item != nil {
endpoint = item.(*v1.Endpoints)
@@ -201,99 +266,33 @@ func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool,
return endpoint, exists, err
}
// WatchEndpoints starts the watch of Provider Endpoints resources and updates the corresponding store
func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),
"endpoints",
api.NamespaceAll,
fields.Everything())
c.epStore, c.epController = cache.NewInformer(
source,
&v1.Endpoints{},
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
safe.Go(func() {
c.epController.Run(stopCh)
})
}
func (c *clientImpl) WatchSecrets(watchCh chan<- interface{}, stopCh <-chan struct{}) {
source := cache.NewListWatchFromClient(
c.clientset.CoreV1().RESTClient(),
"secrets",
api.NamespaceAll,
fields.Everything())
c.secStore, c.secController = cache.NewInformer(
source,
&v1.Secret{},
resyncPeriod,
newResourceEventHandlerFuncs(watchCh))
safe.Go(func() {
c.secController.Run(stopCh)
})
}
// WatchAll returns events in the cluster and updates the stores via informer
// Filters ingresses by labelSelector
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-chan interface{}, error) {
watchCh := make(chan interface{}, 1)
eventCh := make(chan interface{}, 1)
kubeLabelSelector, err := labels.Parse(labelSelector)
if err != nil {
return nil, err
}
c.WatchIngresses(kubeLabelSelector, eventCh, stopCh)
c.WatchServices(eventCh, stopCh)
c.WatchEndpoints(eventCh, stopCh)
c.WatchSecrets(eventCh, stopCh)
safe.Go(func() {
defer close(watchCh)
defer close(eventCh)
for {
select {
case <-stopCh:
return
case event := <-eventCh:
c.fireEvent(event, watchCh)
}
}
})
return watchCh, nil
}
// GetSecret returns the named secret from the given namespace.
func (c *clientImpl) GetSecret(namespace, name string) (*v1.Secret, bool, error) {
var secret *v1.Secret
item, exists, err := c.secStores[c.lookupNamespace(namespace)].GetByKey(namespace + "/" + name)
if err == nil && item != nil {
secret = item.(*v1.Secret)
}
return secret, exists, err
}
// fireEvent checks if all controllers have synced before firing
// Used after startup or a reconnect
func (c *clientImpl) fireEvent(event interface{}, eventCh chan interface{}) {
if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() {
return
}
eventHandlerFunc(eventCh, event)
}
// lookupNamespace returns the lookup namespace key for the given namespace.
// When listening on all namespaces, it returns the client-go identifier ("")
// for all-namespaces. Otherwise, it returns the given namespace.
// The distinction is necessary because we index all informers on the special
// identifier iff all-namespaces are requested but receive specific namespace
// identifiers from the Kubernetes API, so we have to bridge this gap.
func (c *clientImpl) lookupNamespace(ns string) string {
if c.isNamespaceAll {
return api.NamespaceAll
}
return ns
}
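A small sketch of the gap this bridges, using this package's types (the store contents and namespace are hypothetical): when all namespaces are watched there is exactly one store per kind, indexed under the empty-string identifier, while object keys keep their concrete namespace.

```go
// exampleLookup is a hypothetical illustration of the key-scheme bridging.
func exampleLookup() {
	store := cache.NewStore(cache.MetaNamespaceKeyFunc)
	c := &clientImpl{
		isNamespaceAll: true,
		svcStores:      map[string]cache.Store{api.NamespaceAll: store},
	}

	s := c.svcStores[c.lookupNamespace("team-a")] // resolves to the single "" store
	_, exists, _ := s.GetByKey("team-a/service1") // object keys keep the real namespace
	_ = exists
}
```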
// HasNamespace checks if the ingress is in one of the namespaces
func HasNamespace(ingress *v1beta1.Ingress, namespaces Namespaces) bool {
if len(namespaces) == 0 {
return true
}
for _, n := range namespaces {
if ingress.ObjectMeta.Namespace == n {
return true
}
}
return false
}
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, field selector and label selector.
// Extends cache.NewListWatchFromClient to support labelSelector
func NewListWatchFromClient(c cache.Getter, resource string, namespace string, fieldSelector fields.Selector, labelSelector labels.Selector) *cache.ListWatch {
// newListWatchFromClientWithLabelSelector creates a new ListWatch from the given parameters.
// It extends cache.NewListWatchFromClient to support label selectors.
func newListWatchFromClientWithLabelSelector(c cache.Getter, resource string, namespace string, fieldSelector fields.Selector, labelSelector labels.Selector) *cache.ListWatch {
listFunc := func(options api.ListOptions) (runtime.Object, error) {
return c.Get().
Namespace(namespace).
@@ -316,3 +315,17 @@ func NewListWatchFromClient(c cache.Getter, resource string, namespace string, f
}
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
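Calling it matches cache.NewListWatchFromClient with one extra selector argument. A hypothetical in-package example, with a made-up selector string:

```go
// exampleIngressListWatch is illustrative only: a label-filtered ListWatch
// for Ingresses across all namespaces ("app=web" is a made-up selector).
func (c *clientImpl) exampleIngressListWatch() (*cache.ListWatch, error) {
	sel, err := labels.Parse("app=web")
	if err != nil {
		return nil, err
	}
	return newListWatchFromClientWithLabelSelector(
		c.clientset.ExtensionsV1beta1().RESTClient(),
		kindIngresses,
		api.NamespaceAll,
		fields.Everything(),
		sel,
	), nil
}
```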
func newResourceEventHandler(events chan<- interface{}) cache.ResourceEventHandler {
return &resourceEventHandler{events}
}
// eventHandlerFunc will pass the obj on to the events channel or drop it.
// This is so passing the events along won't block in the case of high volume.
// The events are only used for signalling anyway so dropping a few is ok.
func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
select {
case events <- obj:
default:
}
}
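Dropping is safe because the channel is a level-triggered wake-up rather than a queue of state: a consumer that wakes up once and re-reads the stores sees the same result no matter how many notifications were coalesced. A standalone illustration (hypothetical, outside this package):

```go
package main

import "fmt"

// notify has the same shape as eventHandlerFunc above: forward the event
// or drop it, but never block the informer's callback goroutine.
func notify(events chan<- interface{}, obj interface{}) {
	select {
	case events <- obj:
	default:
	}
}

func main() {
	events := make(chan interface{}, 1)
	notify(events, "add")    // accepted: buffer has room
	notify(events, "update") // dropped: buffer full, consumer not ready yet
	fmt.Println(len(events)) // 1 — a single pending wake-up is all that's needed
}
```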

provider/kubernetes/kubernetes.go

@@ -88,7 +88,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
log.Debugf("Using label selector: '%s'", p.LabelSelector)
eventsChan, err := k8sClient.WatchAll(p.LabelSelector, stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, p.LabelSelector, stopWatch)
if err != nil {
log.Errorf("Error watching kubernetes events: %v", err)
timer := time.NewTimer(1 * time.Second)
@@ -104,13 +104,13 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
case <-stop:
return nil
case event := <-eventsChan:
log.Debugf("Received event from kubernetes %+v", event)
log.Debugf("Received Kubernetes event kind %T", event)
templateObjects, err := p.loadIngresses(k8sClient)
if err != nil {
return err
}
if reflect.DeepEqual(p.lastConfiguration.Get(), templateObjects) {
log.Debugf("Skipping event from kubernetes %+v", event)
log.Debugf("Skipping Kubernetes event kind %T", event)
} else {
p.lastConfiguration.Set(templateObjects)
configurationChan <- types.ConfigMessage{
@@ -136,7 +136,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
}
func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error) {
ingresses := k8sClient.GetIngresses(p.Namespaces)
ingresses := k8sClient.GetIngresses()
templateObjects := types.Configuration{
Backends: map[string]*types.Backend{},

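The net effect on the provider is that namespace filtering moves from read time to watch time. A condensed sketch of the changed call shape, using identifiers from this diff:

```go
// reloadSketch condenses the provider loop above. Before this commit,
// every read filtered by namespace: k8sClient.GetIngresses(p.Namespaces).
// Now the namespaces are bound once, when the watch is established.
func reloadSketch(p *Provider, k8sClient Client, stopWatch chan struct{}) error {
	eventsChan, err := k8sClient.WatchAll(p.Namespaces, p.LabelSelector, stopWatch)
	if err != nil {
		return err
	}
	<-eventsChan                          // any event is just a wake-up signal
	ingresses := k8sClient.GetIngresses() // already restricted to watched namespaces
	_ = ingresses
	return nil
}
```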
provider/kubernetes/kubernetes_test.go

@@ -612,451 +612,6 @@ func TestOnlyReferencesServicesFromOwnNamespace(t *testing.T) {
assert.Equal(t, expected, actual)
}
func TestLoadNamespacedIngresses(t *testing.T) {
ingresses := []*v1beta1.Ingress{
{
ObjectMeta: v1.ObjectMeta{
Namespace: "awesome",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "foo",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/bar",
Backend: v1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(801),
},
},
},
},
},
},
{
Host: "bar",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Backend: v1beta1.IngressBackend{
ServiceName: "service3",
ServicePort: intstr.FromInt(443),
},
},
{
Backend: v1beta1.IngressBackend{
ServiceName: "service2",
ServicePort: intstr.FromInt(802),
},
},
},
},
},
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Namespace: "not-awesome",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "baz",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/baz",
Backend: v1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(801),
},
},
},
},
},
},
},
},
},
}
services := []*v1.Service{
{
ObjectMeta: v1.ObjectMeta{
Namespace: "awesome",
Name: "service1",
UID: "1",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 801,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Name: "service1",
Namespace: "not-awesome",
UID: "1",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 801,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Name: "service2",
Namespace: "awesome",
UID: "2",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.2",
Ports: []v1.ServicePort{
{
Port: 802,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Name: "service3",
Namespace: "awesome",
UID: "3",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.3",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 443,
},
},
},
},
}
watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
watchChan: watchChan,
}
provider := Provider{
Namespaces: []string{"awesome"},
}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
}
expected := &types.Configuration{
Backends: map[string]*types.Backend{
"foo/bar": {
Servers: map[string]types.Server{},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Sticky: false,
Method: "wrr",
},
},
"bar": {
Servers: map[string]types.Server{},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Sticky: false,
Method: "wrr",
},
},
},
Frontends: map[string]*types.Frontend{
"foo/bar": {
Backend: "foo/bar",
PassHostHeader: true,
Routes: map[string]types.Route{
"/bar": {
Rule: "PathPrefix:/bar",
},
"foo": {
Rule: "Host:foo",
},
},
},
"bar": {
Backend: "bar",
PassHostHeader: true,
Routes: map[string]types.Route{
"bar": {
Rule: "Host:bar",
},
},
},
},
}
assert.Equal(t, expected, actual)
}
func TestLoadMultipleNamespacedIngresses(t *testing.T) {
ingresses := []*v1beta1.Ingress{
{
ObjectMeta: v1.ObjectMeta{
Namespace: "awesome",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "foo",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/bar",
Backend: v1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(801),
},
},
},
},
},
},
{
Host: "bar",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Backend: v1beta1.IngressBackend{
ServiceName: "service3",
ServicePort: intstr.FromInt(443),
},
},
{
Backend: v1beta1.IngressBackend{
ServiceName: "service2",
ServicePort: intstr.FromInt(802),
},
},
},
},
},
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Namespace: "somewhat-awesome",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "awesome",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/quix",
Backend: v1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(801),
},
},
},
},
},
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Namespace: "not-awesome",
},
Spec: v1beta1.IngressSpec{
Rules: []v1beta1.IngressRule{
{
Host: "baz",
IngressRuleValue: v1beta1.IngressRuleValue{
HTTP: &v1beta1.HTTPIngressRuleValue{
Paths: []v1beta1.HTTPIngressPath{
{
Path: "/baz",
Backend: v1beta1.IngressBackend{
ServiceName: "service1",
ServicePort: intstr.FromInt(801),
},
},
},
},
},
},
},
},
},
}
services := []*v1.Service{
{
ObjectMeta: v1.ObjectMeta{
Name: "service1",
Namespace: "awesome",
UID: "1",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.1",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 801,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Namespace: "somewhat-awesome",
Name: "service1",
UID: "17",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.4",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 801,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Namespace: "awesome",
Name: "service2",
UID: "2",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.2",
Ports: []v1.ServicePort{
{
Port: 802,
},
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Namespace: "awesome",
Name: "service3",
UID: "3",
},
Spec: v1.ServiceSpec{
ClusterIP: "10.0.0.3",
Ports: []v1.ServicePort{
{
Name: "http",
Port: 443,
},
},
},
},
}
watchChan := make(chan interface{})
client := clientMock{
ingresses: ingresses,
services: services,
watchChan: watchChan,
}
provider := Provider{
Namespaces: []string{"awesome", "somewhat-awesome"},
}
actual, err := provider.loadIngresses(client)
if err != nil {
t.Fatalf("error %+v", err)
}
expected := &types.Configuration{
Backends: map[string]*types.Backend{
"foo/bar": {
Servers: map[string]types.Server{},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Sticky: false,
Method: "wrr",
},
},
"bar": {
Servers: map[string]types.Server{},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Sticky: false,
Method: "wrr",
},
},
"awesome/quix": {
Servers: map[string]types.Server{},
CircuitBreaker: nil,
LoadBalancer: &types.LoadBalancer{
Sticky: false,
Method: "wrr",
},
},
},
Frontends: map[string]*types.Frontend{
"foo/bar": {
Backend: "foo/bar",
PassHostHeader: true,
Routes: map[string]types.Route{
"/bar": {
Rule: "PathPrefix:/bar",
},
"foo": {
Rule: "Host:foo",
},
},
},
"bar": {
Backend: "bar",
PassHostHeader: true,
Routes: map[string]types.Route{
"bar": {
Rule: "Host:bar",
},
},
},
"awesome/quix": {
Backend: "awesome/quix",
PassHostHeader: true,
Routes: map[string]types.Route{
"/quix": {
Rule: "PathPrefix:/quix",
},
"awesome": {
Rule: "Host:awesome",
},
},
},
},
}
assert.Equal(t, expected, actual)
}
func TestHostlessIngress(t *testing.T) {
ingresses := []*v1beta1.Ingress{{
ObjectMeta: v1.ObjectMeta{
@@ -2352,15 +1907,8 @@ type clientMock struct {
apiEndpointsError error
}
func (c clientMock) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
result := make([]*v1beta1.Ingress, 0, len(c.ingresses))
for _, ingress := range c.ingresses {
if HasNamespace(ingress, namespaces) {
result = append(result, ingress)
}
}
return result
}
func (c clientMock) GetIngresses() []*v1beta1.Ingress {
return c.ingresses
}
func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error) {
@@ -2376,19 +1924,6 @@ func (c clientMock) GetService(namespace, name string) (*v1.Service, bool, error
return nil, false, nil
}
func (c clientMock) GetSecret(namespace, name string) (*v1.Secret, bool, error) {
if c.apiSecretError != nil {
return nil, false, c.apiSecretError
}
for _, secret := range c.secrets {
if secret.Namespace == namespace && secret.Name == name {
return secret, true, nil
}
}
return nil, false, nil
}
func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) {
if c.apiEndpointsError != nil {
return nil, false, c.apiEndpointsError
@@ -2403,6 +1938,19 @@ func (c clientMock) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, e
return &v1.Endpoints{}, false, nil
}
func (c clientMock) WatchAll(labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil
}
func (c clientMock) GetSecret(namespace, name string) (*v1.Secret, bool, error) {
if c.apiSecretError != nil {
return nil, false, c.apiSecretError
}
for _, secret := range c.secrets {
if secret.Namespace == namespace && secret.Name == name {
return secret, true, nil
}
}
return nil, false, nil
}
func (c clientMock) WatchAll(namespaces Namespaces, labelString string, stopCh <-chan struct{}) (<-chan interface{}, error) {
return c.watchChan, nil
}