Merge pull request #516 from pnegahdar/selector
Implement Kubernetes Selectors, minor kube endpoint fix
This commit is contained in:
commit
94fa95d747
6 changed files with 82 additions and 43 deletions
|
@ -272,7 +272,8 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
|
||||||
//default Kubernetes
|
//default Kubernetes
|
||||||
var defaultKubernetes provider.Kubernetes
|
var defaultKubernetes provider.Kubernetes
|
||||||
defaultKubernetes.Watch = true
|
defaultKubernetes.Watch = true
|
||||||
defaultKubernetes.Endpoint = "http://127.0.0.1:8080"
|
defaultKubernetes.Endpoint = ""
|
||||||
|
defaultKubernetes.LabelSelector = ""
|
||||||
defaultKubernetes.Constraints = []types.Constraint{}
|
defaultKubernetes.Constraints = []types.Constraint{}
|
||||||
|
|
||||||
defaultConfiguration := GlobalConfiguration{
|
defaultConfiguration := GlobalConfiguration{
|
||||||
|
|
|
@ -665,6 +665,10 @@ Træfɪk can be configured to use Kubernetes Ingress as a backend configuration:
|
||||||
#
|
#
|
||||||
# endpoint = "http://localhost:8080"
|
# endpoint = "http://localhost:8080"
|
||||||
# namespaces = ["default","production"]
|
# namespaces = ["default","production"]
|
||||||
|
#
|
||||||
|
# See: http://kubernetes.io/docs/user-guide/labels/#list-and-watch-filtering
|
||||||
|
# labelselector = "A and not B"
|
||||||
|
#
|
||||||
```
|
```
|
||||||
|
|
||||||
Annotations can be used on containers to override default behaviour for the whole Ingress resource:
|
Annotations can be used on containers to override default behaviour for the whole Ingress resource:
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
log "github.com/Sirupsen/logrus"
|
||||||
"github.com/parnurzeal/gorequest"
|
"github.com/parnurzeal/gorequest"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -21,10 +22,10 @@ const (
|
||||||
|
|
||||||
// Client is a client for the Kubernetes master.
|
// Client is a client for the Kubernetes master.
|
||||||
type Client interface {
|
type Client interface {
|
||||||
GetIngresses(predicate func(Ingress) bool) ([]Ingress, error)
|
GetIngresses(labelSelector string, predicate func(Ingress) bool) ([]Ingress, error)
|
||||||
GetService(name, namespace string) (Service, error)
|
GetService(name, namespace string) (Service, error)
|
||||||
GetEndpoints(name, namespace string) (Endpoints, error)
|
GetEndpoints(name, namespace string) (Endpoints, error)
|
||||||
WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error)
|
WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type clientImpl struct {
|
type clientImpl struct {
|
||||||
|
@ -50,11 +51,26 @@ func NewClient(baseURL string, caCert []byte, token string) (Client, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetIngresses returns all ingresses in the cluster
|
func makeQueryString(baseParams map[string]string, labelSelector string) (string, error) {
|
||||||
func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) {
|
if labelSelector != "" {
|
||||||
getURL := c.endpointURL + extentionsEndpoint + defaultIngress
|
baseParams["labelSelector"] = labelSelector
|
||||||
|
}
|
||||||
|
queryData, err := json.Marshal(baseParams)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return string(queryData), nil
|
||||||
|
}
|
||||||
|
|
||||||
body, err := c.do(c.request(getURL))
|
// GetIngresses returns all ingresses in the cluster
|
||||||
|
func (c *clientImpl) GetIngresses(labelSelector string, predicate func(Ingress) bool) ([]Ingress, error) {
|
||||||
|
getURL := c.endpointURL + extentionsEndpoint + defaultIngress
|
||||||
|
queryParams := map[string]string{}
|
||||||
|
queryData, err := makeQueryString(queryParams, labelSelector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Had problems constructing query string %s : %v", queryParams, err)
|
||||||
|
}
|
||||||
|
body, err := c.do(c.request(getURL, queryData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create ingresses request: GET %q : %v", getURL, err)
|
return nil, fmt.Errorf("failed to create ingresses request: GET %q : %v", getURL, err)
|
||||||
}
|
}
|
||||||
|
@ -73,16 +89,16 @@ func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchIngresses returns all ingresses in the cluster
|
// WatchIngresses returns all ingresses in the cluster
|
||||||
func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c *clientImpl) WatchIngresses(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
getURL := c.endpointURL + extentionsEndpoint + defaultIngress
|
getURL := c.endpointURL + extentionsEndpoint + defaultIngress
|
||||||
return c.watch(getURL, stopCh)
|
return c.watch(getURL, labelSelector, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetService returns the named service from the named namespace
|
// GetService returns the named service from the named namespace
|
||||||
func (c *clientImpl) GetService(name, namespace string) (Service, error) {
|
func (c *clientImpl) GetService(name, namespace string) (Service, error) {
|
||||||
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name
|
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name
|
||||||
|
|
||||||
body, err := c.do(c.request(getURL))
|
body, err := c.do(c.request(getURL, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err)
|
return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err)
|
||||||
}
|
}
|
||||||
|
@ -95,9 +111,9 @@ func (c *clientImpl) GetService(name, namespace string) (Service, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchServices returns all services in the cluster
|
// WatchServices returns all services in the cluster
|
||||||
func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c *clientImpl) WatchServices(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
getURL := c.endpointURL + APIEndpoint + "/services"
|
getURL := c.endpointURL + APIEndpoint + "/services"
|
||||||
return c.watch(getURL, stopCh)
|
return c.watch(getURL, labelSelector, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEndpoints returns the named Endpoints
|
// GetEndpoints returns the named Endpoints
|
||||||
|
@ -105,7 +121,7 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e
|
||||||
func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
|
func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
|
||||||
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name
|
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name
|
||||||
|
|
||||||
body, err := c.do(c.request(getURL))
|
body, err := c.do(c.request(getURL, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err)
|
return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err)
|
||||||
}
|
}
|
||||||
|
@ -118,28 +134,28 @@ func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchEndpoints returns endpoints in the cluster
|
// WatchEndpoints returns endpoints in the cluster
|
||||||
func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c *clientImpl) WatchEndpoints(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
getURL := c.endpointURL + APIEndpoint + "/endpoints"
|
getURL := c.endpointURL + APIEndpoint + "/endpoints"
|
||||||
return c.watch(getURL, stopCh)
|
return c.watch(getURL, labelSelector, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchAll returns events in the cluster
|
// WatchAll returns events in the cluster
|
||||||
func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
watchCh := make(chan interface{}, 10)
|
watchCh := make(chan interface{}, 10)
|
||||||
errCh := make(chan error, 10)
|
errCh := make(chan error, 10)
|
||||||
|
|
||||||
stopIngresses := make(chan bool)
|
stopIngresses := make(chan bool)
|
||||||
chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses)
|
chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||||
}
|
}
|
||||||
stopServices := make(chan bool)
|
stopServices := make(chan bool)
|
||||||
chanServices, chanServicesErr, err := c.WatchServices(stopServices)
|
chanServices, chanServicesErr, err := c.WatchServices(labelSelector, stopServices)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||||
}
|
}
|
||||||
stopEndpoints := make(chan bool)
|
stopEndpoints := make(chan bool)
|
||||||
chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints)
|
chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(labelSelector, stopEndpoints)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -188,22 +204,26 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) {
|
||||||
return body, nil
|
return body, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientImpl) request(url string) *gorequest.SuperAgent {
|
func (c *clientImpl) request(reqURL string, queryContent interface{}) *gorequest.SuperAgent {
|
||||||
// Make request to Kubernetes API
|
// Make request to Kubernetes API
|
||||||
request := gorequest.New().Get(url)
|
parsedURL, parseErr := url.Parse(reqURL)
|
||||||
|
if parseErr != nil {
|
||||||
|
log.Errorf("Had issues parsing url %s. Trying anyway.", reqURL)
|
||||||
|
}
|
||||||
|
request := gorequest.New().Get(reqURL)
|
||||||
request.Transport.DisableKeepAlives = true
|
request.Transport.DisableKeepAlives = true
|
||||||
|
|
||||||
if strings.HasPrefix(url, "http://") {
|
if parsedURL.Scheme == "https" {
|
||||||
return request
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(c.token) > 0 {
|
|
||||||
request.Header["Authorization"] = "Bearer " + c.token
|
|
||||||
pool := x509.NewCertPool()
|
pool := x509.NewCertPool()
|
||||||
pool.AppendCertsFromPEM(c.caCert)
|
pool.AppendCertsFromPEM(c.caCert)
|
||||||
c.tls = &tls.Config{RootCAs: pool}
|
c.tls = &tls.Config{RootCAs: pool}
|
||||||
|
request.TLSClientConfig(c.tls)
|
||||||
}
|
}
|
||||||
return request.TLSClientConfig(c.tls)
|
if len(c.token) > 0 {
|
||||||
|
request.Header["Authorization"] = "Bearer " + c.token
|
||||||
|
}
|
||||||
|
request.Query(queryContent)
|
||||||
|
return request
|
||||||
}
|
}
|
||||||
|
|
||||||
// GenericObject generic object
|
// GenericObject generic object
|
||||||
|
@ -212,12 +232,12 @@ type GenericObject struct {
|
||||||
ListMeta `json:"metadata,omitempty"`
|
ListMeta `json:"metadata,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
watchCh := make(chan interface{}, 10)
|
watchCh := make(chan interface{}, 10)
|
||||||
errCh := make(chan error, 10)
|
errCh := make(chan error, 10)
|
||||||
|
|
||||||
// get version
|
// get version
|
||||||
body, err := c.do(c.request(url))
|
body, err := c.do(c.request(url, ""))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return watchCh, errCh, fmt.Errorf("failed to do version request: GET %q : %v", url, err)
|
return watchCh, errCh, fmt.Errorf("failed to do version request: GET %q : %v", url, err)
|
||||||
}
|
}
|
||||||
|
@ -227,10 +247,12 @@ func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, ch
|
||||||
return watchCh, errCh, fmt.Errorf("failed to decode version %v", err)
|
return watchCh, errCh, fmt.Errorf("failed to decode version %v", err)
|
||||||
}
|
}
|
||||||
resourceVersion := generic.ResourceVersion
|
resourceVersion := generic.ResourceVersion
|
||||||
|
queryParams := map[string]string{"watch": "", "resourceVersion": resourceVersion}
|
||||||
url = url + "?watch&resourceVersion=" + resourceVersion
|
queryData, err := makeQueryString(queryParams, labelSelector)
|
||||||
// Make request to Kubernetes API
|
if err != nil {
|
||||||
request := c.request(url)
|
return watchCh, errCh, fmt.Errorf("Unable to construct query args")
|
||||||
|
}
|
||||||
|
request := c.request(url, queryData)
|
||||||
req, err := request.MakeRequest()
|
req, err := request.MakeRequest()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)
|
return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err)
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
const (
|
const (
|
||||||
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token"
|
||||||
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
||||||
|
defaultKubeEndpoint = "http://127.0.0.1:8080"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Namespaces holds kubernetes namespaces
|
// Namespaces holds kubernetes namespaces
|
||||||
|
@ -54,6 +55,7 @@ type Kubernetes struct {
|
||||||
Endpoint string `description:"Kubernetes server endpoint"`
|
Endpoint string `description:"Kubernetes server endpoint"`
|
||||||
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
|
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
|
||||||
Namespaces Namespaces `description:"Kubernetes namespaces"`
|
Namespaces Namespaces `description:"Kubernetes namespaces"`
|
||||||
|
LabelSelector string `description:"Kubernetes api label selector to use"`
|
||||||
lastConfiguration safe.Safe
|
lastConfiguration safe.Safe
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,9 +76,15 @@ func (provider *Kubernetes) createClient() (k8s.Client, error) {
|
||||||
}
|
}
|
||||||
kubernetesHost := os.Getenv("KUBERNETES_SERVICE_HOST")
|
kubernetesHost := os.Getenv("KUBERNETES_SERVICE_HOST")
|
||||||
kubernetesPort := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")
|
kubernetesPort := os.Getenv("KUBERNETES_SERVICE_PORT_HTTPS")
|
||||||
if len(kubernetesPort) > 0 && len(kubernetesHost) > 0 {
|
// Prioritize user provided kubernetes endpoint since kube container runtime will almost always have it
|
||||||
|
if provider.Endpoint == "" && len(kubernetesPort) > 0 && len(kubernetesHost) > 0 {
|
||||||
|
log.Debugf("Using environment provided kubernetes endpoint")
|
||||||
provider.Endpoint = "https://" + kubernetesHost + ":" + kubernetesPort
|
provider.Endpoint = "https://" + kubernetesHost + ":" + kubernetesPort
|
||||||
}
|
}
|
||||||
|
if provider.Endpoint == "" {
|
||||||
|
log.Debugf("Using default kubernetes api endpoint")
|
||||||
|
provider.Endpoint = defaultKubeEndpoint
|
||||||
|
}
|
||||||
log.Debugf("Kubernetes endpoint: %s", provider.Endpoint)
|
log.Debugf("Kubernetes endpoint: %s", provider.Endpoint)
|
||||||
return k8s.NewClient(provider.Endpoint, caCert, token)
|
return k8s.NewClient(provider.Endpoint, caCert, token)
|
||||||
}
|
}
|
||||||
|
@ -96,7 +104,8 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
||||||
for {
|
for {
|
||||||
stopWatch := make(chan bool, 5)
|
stopWatch := make(chan bool, 5)
|
||||||
defer close(stopWatch)
|
defer close(stopWatch)
|
||||||
eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch)
|
log.Debugf("Using lable selector: %s", provider.LabelSelector)
|
||||||
|
eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error watching kubernetes events: %v", err)
|
log.Errorf("Error watching kubernetes events: %v", err)
|
||||||
timer := time.NewTimer(1 * time.Second)
|
timer := time.NewTimer(1 * time.Second)
|
||||||
|
@ -167,7 +176,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) {
|
func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) {
|
||||||
ingresses, err := k8sClient.GetIngresses(func(ingress k8s.Ingress) bool {
|
ingresses, err := k8sClient.GetIngresses(provider.LabelSelector, func(ingress k8s.Ingress) bool {
|
||||||
if len(provider.Namespaces) == 0 {
|
if len(provider.Namespaces) == 0 {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -1250,7 +1250,7 @@ type clientMock struct {
|
||||||
watchChan chan interface{}
|
watchChan chan interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingress, error) {
|
func (c clientMock) GetIngresses(labelString string, predicate func(k8s.Ingress) bool) ([]k8s.Ingress, error) {
|
||||||
var ingresses []k8s.Ingress
|
var ingresses []k8s.Ingress
|
||||||
for _, ingress := range c.ingresses {
|
for _, ingress := range c.ingresses {
|
||||||
if predicate(ingress) {
|
if predicate(ingress) {
|
||||||
|
@ -1259,7 +1259,7 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres
|
||||||
}
|
}
|
||||||
return ingresses, nil
|
return ingresses, nil
|
||||||
}
|
}
|
||||||
func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c clientMock) WatchIngresses(labelString string, predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
return c.watchChan, make(chan error), nil
|
return c.watchChan, make(chan error), nil
|
||||||
}
|
}
|
||||||
func (c clientMock) GetService(name, namespace string) (k8s.Service, error) {
|
func (c clientMock) GetService(name, namespace string) (k8s.Service, error) {
|
||||||
|
@ -1280,6 +1280,6 @@ func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error)
|
||||||
return k8s.Endpoints{}, nil
|
return k8s.Endpoints{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||||
return c.watchChan, make(chan error), nil
|
return c.watchChan, make(chan error), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -345,6 +345,9 @@
|
||||||
#
|
#
|
||||||
# endpoint = "http://localhost:8080"
|
# endpoint = "http://localhost:8080"
|
||||||
# namespaces = ["default"]
|
# namespaces = ["default"]
|
||||||
|
#
|
||||||
|
# See: http://kubernetes.io/docs/user-guide/labels/#list-and-watch-filtering
|
||||||
|
# labelselector = "A and not B"
|
||||||
|
|
||||||
################################################################
|
################################################################
|
||||||
# Consul KV configuration backend
|
# Consul KV configuration backend
|
||||||
|
|
Loading…
Reference in a new issue