Fixes #363: Allow for kubernetes label selectors

This commit is contained in:
Parham Negahdar 2016-07-12 01:25:01 -04:00
parent 40c0ed092e
commit 9f6484a328
6 changed files with 74 additions and 42 deletions

View file

@ -273,6 +273,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
var defaultKubernetes provider.Kubernetes var defaultKubernetes provider.Kubernetes
defaultKubernetes.Watch = true defaultKubernetes.Watch = true
defaultKubernetes.Endpoint = "" defaultKubernetes.Endpoint = ""
defaultKubernetes.LabelSelector = ""
defaultKubernetes.Constraints = []types.Constraint{} defaultKubernetes.Constraints = []types.Constraint{}
defaultConfiguration := GlobalConfiguration{ defaultConfiguration := GlobalConfiguration{

View file

@ -665,6 +665,10 @@ Træfɪk can be configured to use Kubernetes Ingress as a backend configuration:
# #
# endpoint = "http://localhost:8080" # endpoint = "http://localhost:8080"
# namespaces = ["default","production"] # namespaces = ["default","production"]
#
# See: http://kubernetes.io/docs/user-guide/labels/#list-and-watch-filtering
# labelselector = "A and not B"
#
``` ```
Annotations can be used on containers to override default behaviour for the whole Ingress resource: Annotations can be used on containers to override default behaviour for the whole Ingress resource:

View file

@ -5,6 +5,7 @@ import (
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
"fmt" "fmt"
log "github.com/Sirupsen/logrus"
"github.com/parnurzeal/gorequest" "github.com/parnurzeal/gorequest"
"net/http" "net/http"
"net/url" "net/url"
@ -21,10 +22,10 @@ const (
// Client is a client for the Kubernetes master. // Client is a client for the Kubernetes master.
type Client interface { type Client interface {
GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) GetIngresses(labelSelector string, predicate func(Ingress) bool) ([]Ingress, error)
GetService(name, namespace string) (Service, error) GetService(name, namespace string) (Service, error)
GetEndpoints(name, namespace string) (Endpoints, error) GetEndpoints(name, namespace string) (Endpoints, error)
WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error)
} }
type clientImpl struct { type clientImpl struct {
@ -50,11 +51,26 @@ func NewClient(baseURL string, caCert []byte, token string) (Client, error) {
}, nil }, nil
} }
// GetIngresses returns all ingresses in the cluster func makeQueryString(baseParams map[string]string, labelSelector string) (string, error) {
func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) { if labelSelector != "" {
getURL := c.endpointURL + extentionsEndpoint + defaultIngress baseParams["labelSelector"] = labelSelector
}
queryData, err := json.Marshal(baseParams)
if err != nil {
return "", err
}
return string(queryData), nil
}
body, err := c.do(c.request(getURL)) // GetIngresses returns all ingresses in the cluster
func (c *clientImpl) GetIngresses(labelSelector string, predicate func(Ingress) bool) ([]Ingress, error) {
getURL := c.endpointURL + extentionsEndpoint + defaultIngress
queryParams := map[string]string{}
queryData, err := makeQueryString(queryParams, labelSelector)
if err != nil {
return nil, fmt.Errorf("Had problems constructing query string %s : %v", queryParams, err)
}
body, err := c.do(c.request(getURL, queryData))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create ingresses request: GET %q : %v", getURL, err) return nil, fmt.Errorf("failed to create ingresses request: GET %q : %v", getURL, err)
} }
@ -73,16 +89,16 @@ func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, erro
} }
// WatchIngresses returns all ingresses in the cluster // WatchIngresses returns all ingresses in the cluster
func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchIngresses(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + extentionsEndpoint + defaultIngress getURL := c.endpointURL + extentionsEndpoint + defaultIngress
return c.watch(getURL, stopCh) return c.watch(getURL, labelSelector, stopCh)
} }
// GetService returns the named service from the named namespace // GetService returns the named service from the named namespace
func (c *clientImpl) GetService(name, namespace string) (Service, error) { func (c *clientImpl) GetService(name, namespace string) (Service, error) {
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name
body, err := c.do(c.request(getURL)) body, err := c.do(c.request(getURL, ""))
if err != nil { if err != nil {
return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err) return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err)
} }
@ -95,9 +111,9 @@ func (c *clientImpl) GetService(name, namespace string) (Service, error) {
} }
// WatchServices returns all services in the cluster // WatchServices returns all services in the cluster
func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchServices(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/services" getURL := c.endpointURL + APIEndpoint + "/services"
return c.watch(getURL, stopCh) return c.watch(getURL, labelSelector, stopCh)
} }
// GetEndpoints returns the named Endpoints // GetEndpoints returns the named Endpoints
@ -105,7 +121,7 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e
func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) { func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name
body, err := c.do(c.request(getURL)) body, err := c.do(c.request(getURL, ""))
if err != nil { if err != nil {
return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err) return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err)
} }
@ -118,28 +134,28 @@ func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
} }
// WatchEndpoints returns endpoints in the cluster // WatchEndpoints returns endpoints in the cluster
func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchEndpoints(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/endpoints" getURL := c.endpointURL + APIEndpoint + "/endpoints"
return c.watch(getURL, stopCh) return c.watch(getURL, labelSelector, stopCh)
} }
// WatchAll returns events in the cluster // WatchAll returns events in the cluster
func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{}, 10) watchCh := make(chan interface{}, 10)
errCh := make(chan error, 10) errCh := make(chan error, 10)
stopIngresses := make(chan bool) stopIngresses := make(chan bool)
chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses) chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
stopServices := make(chan bool) stopServices := make(chan bool)
chanServices, chanServicesErr, err := c.WatchServices(stopServices) chanServices, chanServicesErr, err := c.WatchServices(labelSelector, stopServices)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
stopEndpoints := make(chan bool) stopEndpoints := make(chan bool)
chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints) chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(labelSelector, stopEndpoints)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
@ -188,22 +204,26 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
return body, nil return body, nil
} }
func (c *clientImpl) request(url string) *gorequest.SuperAgent { func (c *clientImpl) request(reqURL string, queryContent interface{}) *gorequest.SuperAgent {
// Make request to Kubernetes API // Make request to Kubernetes API
request := gorequest.New().Get(url) parsedURL, parseErr := url.Parse(reqURL)
if parseErr != nil {
log.Errorf("Had issues parsing url %s. Trying anyway.", reqURL)
}
request := gorequest.New().Get(reqURL)
request.Transport.DisableKeepAlives = true request.Transport.DisableKeepAlives = true
if strings.HasPrefix(url, "http://") { if parsedURL.Scheme == "https" {
return request
}
if len(c.token) > 0 {
request.Header["Authorization"] = "Bearer " + c.token
pool := x509.NewCertPool() pool := x509.NewCertPool()
pool.AppendCertsFromPEM(c.caCert) pool.AppendCertsFromPEM(c.caCert)
c.tls = &tls.Config{RootCAs: pool} c.tls = &tls.Config{RootCAs: pool}
request.TLSClientConfig(c.tls)
} }
return request.TLSClientConfig(c.tls) if len(c.token) > 0 {
request.Header["Authorization"] = "Bearer " + c.token
}
request.Query(queryContent)
return request
} }
// GenericObject generic object // GenericObject generic object
@ -212,12 +232,12 @@ type GenericObject struct {
ListMeta `json:"metadata,omitempty"` ListMeta `json:"metadata,omitempty"`
} }
func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
watchCh := make(chan interface{}, 10) watchCh := make(chan interface{}, 10)
errCh := make(chan error, 10) errCh := make(chan error, 10)
// get version // get version
body, err := c.do(c.request(url)) body, err := c.do(c.request(url, ""))
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to do version request: GET %q : %v", url, err) return watchCh, errCh, fmt.Errorf("failed to do version request: GET %q : %v", url, err)
} }
@ -227,10 +247,12 @@ func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, ch
return watchCh, errCh, fmt.Errorf("failed to decode version %v", err) return watchCh, errCh, fmt.Errorf("failed to decode version %v", err)
} }
resourceVersion := generic.ResourceVersion resourceVersion := generic.ResourceVersion
queryParams := map[string]string{"watch": "", "resourceVersion": resourceVersion}
url = url + "?watch&resourceVersion=" + resourceVersion queryData, err := makeQueryString(queryParams, labelSelector)
// Make request to Kubernetes API if err != nil {
request := c.request(url) return watchCh, errCh, fmt.Errorf("Unable to construct query args")
}
request := c.request(url, queryData)
req, err := request.MakeRequest() req, err := request.MakeRequest()
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err) return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)

View file

@ -20,7 +20,7 @@ import (
const ( const (
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token" serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
defaultKubeEndpoint = "http://127.0.0.1:8080" defaultKubeEndpoint = "http://127.0.0.1:8080"
) )
// Namespaces holds kubernetes namespaces // Namespaces holds kubernetes namespaces
@ -55,6 +55,7 @@ type Kubernetes struct {
Endpoint string `description:"Kubernetes server endpoint"` Endpoint string `description:"Kubernetes server endpoint"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"` DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
Namespaces Namespaces `description:"Kubernetes namespaces"` Namespaces Namespaces `description:"Kubernetes namespaces"`
LabelSelector string `description:"Kubernetes api label selector to use"`
lastConfiguration safe.Safe lastConfiguration safe.Safe
} }
@ -103,7 +104,8 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
for { for {
stopWatch := make(chan bool, 5) stopWatch := make(chan bool, 5)
defer close(stopWatch) defer close(stopWatch)
eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch) log.Debugf("Using lable selector: %s", provider.LabelSelector)
eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
if err != nil { if err != nil {
log.Errorf("Error watching kubernetes events: %v", err) log.Errorf("Error watching kubernetes events: %v", err)
timer := time.NewTimer(1 * time.Second) timer := time.NewTimer(1 * time.Second)
@ -174,7 +176,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
} }
func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) { func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) {
ingresses, err := k8sClient.GetIngresses(func(ingress k8s.Ingress) bool { ingresses, err := k8sClient.GetIngresses(provider.LabelSelector, func(ingress k8s.Ingress) bool {
if len(provider.Namespaces) == 0 { if len(provider.Namespaces) == 0 {
return true return true
} }

View file

@ -1250,7 +1250,7 @@ type clientMock struct {
watchChan chan interface{} watchChan chan interface{}
} }
func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingress, error) { func (c clientMock) GetIngresses(labelString string, predicate func(k8s.Ingress) bool) ([]k8s.Ingress, error) {
var ingresses []k8s.Ingress var ingresses []k8s.Ingress
for _, ingress := range c.ingresses { for _, ingress := range c.ingresses {
if predicate(ingress) { if predicate(ingress) {
@ -1259,7 +1259,7 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres
} }
return ingresses, nil return ingresses, nil
} }
func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { func (c clientMock) WatchIngresses(labelString string, predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) {
return c.watchChan, make(chan error), nil return c.watchChan, make(chan error), nil
} }
func (c clientMock) GetService(name, namespace string) (k8s.Service, error) { func (c clientMock) GetService(name, namespace string) (k8s.Service, error) {
@ -1280,6 +1280,6 @@ func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error)
return k8s.Endpoints{}, nil return k8s.Endpoints{}, nil
} }
func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, chan error, error) {
return c.watchChan, make(chan error), nil return c.watchChan, make(chan error), nil
} }

View file

@ -75,7 +75,7 @@
# storageFile = "acme.json" # storageFile = "acme.json"
# Entrypoint to proxy acme challenge to. # Entrypoint to proxy acme challenge to.
# WARNING, must point to an entrypoint on port 443 # WARNING, must point to an entrypoint on port 443
# #
# Required # Required
# #
@ -340,11 +340,14 @@
# and KUBERNETES_SERVICE_PORT_HTTPS as endpoint # and KUBERNETES_SERVICE_PORT_HTTPS as endpoint
# Secure token will be found in /var/run/secrets/kubernetes.io/serviceaccount/token # Secure token will be found in /var/run/secrets/kubernetes.io/serviceaccount/token
# and SSL CA cert in /var/run/secrets/kubernetes.io/serviceaccount/ca.crt # and SSL CA cert in /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
# #
# Optional # Optional
# #
# endpoint = "http://localhost:8080" # endpoint = "http://localhost:8080"
# namespaces = ["default"] # namespaces = ["default"]
#
# See: http://kubernetes.io/docs/user-guide/labels/#list-and-watch-filtering
# labelselector = "A and not B"
################################################################ ################################################################
# Consul KV configuration backend # Consul KV configuration backend
@ -545,4 +548,4 @@
# [frontends.frontend3] # [frontends.frontend3]
# entrypoints = ["http", "https"] # overrides defaultEntryPoints # entrypoints = ["http", "https"] # overrides defaultEntryPoints
# backend = "backend2" # backend = "backend2"
# rule = "Path: /test, /other" # rule = "Path: /test, /other"