Build backend config using the K8S endpoint resource.

* Potentialy saves a network hop
* Ability to configure LB algothim (given some work to expose an
anotation etc...)
* K8s config Watch is triggered far less often
This commit is contained in:
Ed Robinson 2016-05-20 17:34:57 +01:00
parent b79535f369
commit e948a013cd
No known key found for this signature in database
GPG key ID: EC501FCA6421CCF0
4 changed files with 210 additions and 42 deletions

View file

@ -22,6 +22,7 @@ const (
type Client interface { type Client interface {
GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error)
GetServices(predicate func(Service) bool) ([]Service, error) GetServices(predicate func(Service) bool) ([]Service, error)
GetEndpoints(name, namespace string) (Endpoints, error)
WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error)
} }
@ -104,21 +105,26 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e
return c.watch(getURL, stopCh) return c.watch(getURL, stopCh)
} }
// WatchEvents returns events in the cluster // GetEndpoints returns the named Endpoints
func (c *clientImpl) WatchEvents(stopCh <-chan bool) (chan interface{}, chan error, error) { // Endpoints have the same name as the coresponding service
getURL := c.endpointURL + APIEndpoint + "/events" func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
return c.watch(getURL, stopCh) getURL := c.endpointURL + APIEndpoint + "/namespaces/" + namespace + "/endpoints/" + name
body, err := c.do(c.request(getURL))
if err != nil {
return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err)
} }
// WatchPods returns pods in the cluster var endpoints Endpoints
func (c *clientImpl) WatchPods(stopCh <-chan bool) (chan interface{}, chan error, error) { if err := json.Unmarshal(body, &endpoints); err != nil {
getURL := c.endpointURL + APIEndpoint + "/pods" return Endpoints{}, fmt.Errorf("failed to decode endpoints resources: %v", err)
return c.watch(getURL, stopCh) }
return endpoints, nil
} }
// WatchReplicationControllers returns ReplicationControllers in the cluster // WatchEndpoints returns endpoints in the cluster
func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) {
getURL := c.endpointURL + APIEndpoint + "/replicationcontrollers" getURL := c.endpointURL + APIEndpoint + "/endpoints"
return c.watch(getURL, stopCh) return c.watch(getURL, stopCh)
} }
@ -137,13 +143,8 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error,
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
stopPods := make(chan bool) stopEndpoints := make(chan bool)
chanPods, chanPodsErr, err := c.WatchPods(stopPods) chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints)
if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
}
stopReplicationControllers := make(chan bool)
chanReplicationControllers, chanReplicationControllersErr, err := c.WatchReplicationControllers(stopReplicationControllers)
if err != nil { if err != nil {
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
} }
@ -152,32 +153,26 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error,
defer close(errCh) defer close(errCh)
defer close(stopIngresses) defer close(stopIngresses)
defer close(stopServices) defer close(stopServices)
defer close(stopPods) defer close(stopEndpoints)
defer close(stopReplicationControllers)
for { for {
select { select {
case <-stopCh: case <-stopCh:
stopIngresses <- true stopIngresses <- true
stopServices <- true stopServices <- true
stopPods <- true stopEndpoints <- true
stopReplicationControllers <- true
return return
case err := <-chanIngressesErr: case err := <-chanIngressesErr:
errCh <- err errCh <- err
case err := <-chanServicesErr: case err := <-chanServicesErr:
errCh <- err errCh <- err
case err := <-chanPodsErr: case err := <-chanEndpointsErr:
errCh <- err
case err := <-chanReplicationControllersErr:
errCh <- err errCh <- err
case event := <-chanIngresses: case event := <-chanIngresses:
watchCh <- event watchCh <- event
case event := <-chanServices: case event := <-chanServices:
watchCh <- event watchCh <- event
case event := <-chanPods: case event := <-chanEndpoints:
watchCh <- event
case event := <-chanReplicationControllers:
watchCh <- event watchCh <- event
} }
} }

84
provider/k8s/endpoints.go Normal file
View file

@ -0,0 +1,84 @@
package k8s
// Endpoints is a collection of endpoints that implement the actual service. Example:
// Name: "mysvc",
// Subsets: [
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// },
// {
// Addresses: [{"ip": "10.10.3.3"}],
// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}]
// },
// ]
type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
// The set of all endpoints is the union of all subsets.
Subsets []EndpointSubset
}
// EndpointSubset is a group of addresses with a common set of ports. The
// expanded set of endpoints is the Cartesian product of Addresses x Ports.
// For example, given:
// {
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
// }
// The resulting set of endpoints can be viewed as:
// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
// b: [ 10.10.1.1:309, 10.10.2.2:309 ]
type EndpointSubset struct {
Addresses []EndpointAddress
NotReadyAddresses []EndpointAddress
Ports []EndpointPort
}
// EndpointAddress is a tuple that describes single IP address.
type EndpointAddress struct {
// The IP of this endpoint.
// IPv6 is also accepted but not fully supported on all platforms. Also, certain
// kubernetes components, like kube-proxy, are not IPv6 ready.
// TODO: This should allow hostname or IP, see #4447.
IP string
// Optional: Hostname of this endpoint
// Meant to be used by DNS servers etc.
Hostname string `json:"hostname,omitempty"`
// Optional: The kubernetes object related to the entry point.
TargetRef *ObjectReference
}
// EndpointPort is a tuple that describes a single port.
type EndpointPort struct {
// The name of this port (corresponds to ServicePort.Name). Optional
// if only one port is defined. Must be a DNS_LABEL.
Name string
// The port number.
Port int32
// The IP protocol for this port.
Protocol Protocol
}
// ObjectReference contains enough information to let you inspect or modify the referred object.
type ObjectReference struct {
Kind string `json:"kind,omitempty"`
Namespace string `json:"namespace,omitempty"`
Name string `json:"name,omitempty"`
UID UID `json:"uid,omitempty"`
APIVersion string `json:"apiVersion,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty"`
// Optional. If referring to a piece of an object instead of an entire object, this string
// should contain information to identify the sub-object. For example, if the object
// reference is to a container within a pod, this would take on a value like:
// "spec.containers{name}" (where "name" refers to the name of the container that triggered
// the event) or if no container name is specified "spec.containers[2]" (container with
// index 2 in this pod). This syntax is chosen only to have some well-defined way of
// referencing a part of an object.
// TODO: this design is not final and this field is subject to change in the future.
FieldPath string `json:"fieldPath,omitempty"`
}

View file

@ -209,10 +209,28 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
if port.Port == 443 { if port.Port == 443 {
protocol = "https" protocol = "https"
} }
endpoints, err := k8sClient.GetEndpoints(service.ObjectMeta.Name, service.ObjectMeta.Namespace)
if err != nil {
log.Errorf("Error retrieving endpoints: %v", err)
continue
}
if len(endpoints.Subsets) == 0 {
log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{
URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port), URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port),
Weight: 1, Weight: 1,
} }
} else {
for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports))
templateObjects.Backends[r.Host+pa.Path].Servers[url] = types.Server{
URL: url,
Weight: 1,
}
}
}
}
break break
} }
} }
@ -223,6 +241,20 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
return &templateObjects, nil return &templateObjects, nil
} }
func endpointPortNumber(servicePort k8s.ServicePort, endpointPorts []k8s.EndpointPort) int {
if len(endpointPorts) > 0 {
//name is optional if there is only one port
port := endpointPorts[0]
for _, endpointPort := range endpointPorts {
if servicePort.Name == endpointPort.Name {
port = endpointPort
}
}
return int(port.Port)
}
return servicePort.Port
}
func equalPorts(servicePort k8s.ServicePort, ingressPort k8s.IntOrString) bool { func equalPorts(servicePort k8s.ServicePort, ingressPort k8s.IntOrString) bool {
if servicePort.Port == ingressPort.IntValue() { if servicePort.Port == ingressPort.IntValue() {
return true return true

View file

@ -10,6 +10,9 @@ import (
func TestLoadIngresses(t *testing.T) { func TestLoadIngresses(t *testing.T) {
ingresses := []k8s.Ingress{{ ingresses := []k8s.Ingress{{
ObjectMeta: k8s.ObjectMeta{
Namespace: "testing",
},
Spec: k8s.IngressSpec{ Spec: k8s.IngressSpec{
Rules: []k8s.IngressRule{ Rules: []k8s.IngressRule{
{ {
@ -57,13 +60,14 @@ func TestLoadIngresses(t *testing.T) {
ObjectMeta: k8s.ObjectMeta{ ObjectMeta: k8s.ObjectMeta{
Name: "service1", Name: "service1",
UID: "1", UID: "1",
Namespace: "testing",
}, },
Spec: k8s.ServiceSpec{ Spec: k8s.ServiceSpec{
ClusterIP: "10.0.0.1", ClusterIP: "10.0.0.1",
Ports: []k8s.ServicePort{ Ports: []k8s.ServicePort{
{ {
Name: "http", Name: "http",
Port: 801, Port: 80,
}, },
}, },
}, },
@ -72,6 +76,7 @@ func TestLoadIngresses(t *testing.T) {
ObjectMeta: k8s.ObjectMeta{ ObjectMeta: k8s.ObjectMeta{
Name: "service2", Name: "service2",
UID: "2", UID: "2",
Namespace: "testing",
}, },
Spec: k8s.ServiceSpec{ Spec: k8s.ServiceSpec{
ClusterIP: "10.0.0.2", ClusterIP: "10.0.0.2",
@ -86,6 +91,7 @@ func TestLoadIngresses(t *testing.T) {
ObjectMeta: k8s.ObjectMeta{ ObjectMeta: k8s.ObjectMeta{
Name: "service3", Name: "service3",
UID: "3", UID: "3",
Namespace: "testing",
}, },
Spec: k8s.ServiceSpec{ Spec: k8s.ServiceSpec{
ClusterIP: "10.0.0.3", ClusterIP: "10.0.0.3",
@ -98,10 +104,46 @@ func TestLoadIngresses(t *testing.T) {
}, },
}, },
} }
endpoints := []k8s.Endpoints{
{
ObjectMeta: k8s.ObjectMeta{
Name: "service1",
UID: "1",
Namespace: "testing",
},
Subsets: []k8s.EndpointSubset{
{
Addresses: []k8s.EndpointAddress{
{
IP: "10.10.0.1",
},
},
Ports: []k8s.EndpointPort{
{
Port: 8080,
},
},
},
{
Addresses: []k8s.EndpointAddress{
{
IP: "10.21.0.1",
},
},
Ports: []k8s.EndpointPort{
{
Port: 8080,
},
},
},
},
},
}
watchChan := make(chan interface{}) watchChan := make(chan interface{})
client := clientMock{ client := clientMock{
ingresses: ingresses, ingresses: ingresses,
services: services, services: services,
endpoints: endpoints,
watchChan: watchChan, watchChan: watchChan,
} }
provider := Kubernetes{} provider := Kubernetes{}
@ -114,8 +156,12 @@ func TestLoadIngresses(t *testing.T) {
Backends: map[string]*types.Backend{ Backends: map[string]*types.Backend{
"foo/bar": { "foo/bar": {
Servers: map[string]types.Server{ Servers: map[string]types.Server{
"1": { "http://10.10.0.1:8080": {
URL: "http://10.0.0.1:801", URL: "http://10.10.0.1:8080",
Weight: 1,
},
"http://10.21.0.1:8080": {
URL: "http://10.21.0.1:8080",
Weight: 1, Weight: 1,
}, },
}, },
@ -1150,6 +1196,7 @@ func TestHostlessIngress(t *testing.T) {
type clientMock struct { type clientMock struct {
ingresses []k8s.Ingress ingresses []k8s.Ingress
services []k8s.Service services []k8s.Service
endpoints []k8s.Endpoints
watchChan chan interface{} watchChan chan interface{}
} }
@ -1174,6 +1221,16 @@ func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service
} }
return services, nil return services, nil
} }
func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error) {
for _, endpoints := range c.endpoints {
if endpoints.Namespace == namespace && endpoints.Name == name {
return endpoints, nil
}
}
return k8s.Endpoints{}, nil
}
func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
return c.watchChan, make(chan error), nil return c.watchChan, make(chan error), nil
} }