Merge pull request #408 from errm/k8s-endpoints
Build backend config using the K8S endpoint resource.
This commit is contained in:
commit
7804787e9e
4 changed files with 284 additions and 79 deletions
|
@ -16,12 +16,14 @@ const (
|
|||
APIEndpoint = "/api/v1"
|
||||
extentionsEndpoint = "/apis/extensions/v1beta1"
|
||||
defaultIngress = "/ingresses"
|
||||
namespaces = "/namespaces/"
|
||||
)
|
||||
|
||||
// Client is a client for the Kubernetes master.
|
||||
type Client interface {
|
||||
GetIngresses(predicate func(Ingress) bool) ([]Ingress, error)
|
||||
GetServices(predicate func(Service) bool) ([]Service, error)
|
||||
GetService(name, namespace string) (Service, error)
|
||||
GetEndpoints(name, namespace string) (Endpoints, error)
|
||||
WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error)
|
||||
}
|
||||
|
||||
|
@ -76,26 +78,20 @@ func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan
|
|||
return c.watch(getURL, stopCh)
|
||||
}
|
||||
|
||||
// GetServices returns all services in the cluster
|
||||
func (c *clientImpl) GetServices(predicate func(Service) bool) ([]Service, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + "/services"
|
||||
// GetService returns the named service from the named namespace
|
||||
func (c *clientImpl) GetService(name, namespace string) (Service, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name
|
||||
|
||||
body, err := c.do(c.request(getURL))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err)
|
||||
return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err)
|
||||
}
|
||||
|
||||
var serviceList ServiceList
|
||||
if err := json.Unmarshal(body, &serviceList); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode list of services resources: %v", err)
|
||||
var service Service
|
||||
if err := json.Unmarshal(body, &service); err != nil {
|
||||
return Service{}, fmt.Errorf("failed to decode service resource: %v", err)
|
||||
}
|
||||
services := serviceList.Items[:0]
|
||||
for _, service := range serviceList.Items {
|
||||
if predicate(service) {
|
||||
services = append(services, service)
|
||||
}
|
||||
}
|
||||
return services, nil
|
||||
return service, nil
|
||||
}
|
||||
|
||||
// WatchServices returns all services in the cluster
|
||||
|
@ -104,21 +100,26 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e
|
|||
return c.watch(getURL, stopCh)
|
||||
}
|
||||
|
||||
// WatchEvents returns events in the cluster
|
||||
func (c *clientImpl) WatchEvents(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + "/events"
|
||||
return c.watch(getURL, stopCh)
|
||||
// GetEndpoints returns the named Endpoints
|
||||
// Endpoints have the same name as the coresponding service
|
||||
func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name
|
||||
|
||||
body, err := c.do(c.request(getURL))
|
||||
if err != nil {
|
||||
return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err)
|
||||
}
|
||||
|
||||
var endpoints Endpoints
|
||||
if err := json.Unmarshal(body, &endpoints); err != nil {
|
||||
return Endpoints{}, fmt.Errorf("failed to decode endpoints resources: %v", err)
|
||||
}
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
// WatchPods returns pods in the cluster
|
||||
func (c *clientImpl) WatchPods(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + "/pods"
|
||||
return c.watch(getURL, stopCh)
|
||||
}
|
||||
|
||||
// WatchReplicationControllers returns ReplicationControllers in the cluster
|
||||
func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + "/replicationcontrollers"
|
||||
// WatchEndpoints returns endpoints in the cluster
|
||||
func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
getURL := c.endpointURL + APIEndpoint + "/endpoints"
|
||||
return c.watch(getURL, stopCh)
|
||||
}
|
||||
|
||||
|
@ -137,13 +138,8 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error,
|
|||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||
}
|
||||
stopPods := make(chan bool)
|
||||
chanPods, chanPodsErr, err := c.WatchPods(stopPods)
|
||||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||
}
|
||||
stopReplicationControllers := make(chan bool)
|
||||
chanReplicationControllers, chanReplicationControllersErr, err := c.WatchReplicationControllers(stopReplicationControllers)
|
||||
stopEndpoints := make(chan bool)
|
||||
chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints)
|
||||
if err != nil {
|
||||
return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err)
|
||||
}
|
||||
|
@ -152,32 +148,26 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error,
|
|||
defer close(errCh)
|
||||
defer close(stopIngresses)
|
||||
defer close(stopServices)
|
||||
defer close(stopPods)
|
||||
defer close(stopReplicationControllers)
|
||||
defer close(stopEndpoints)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stopCh:
|
||||
stopIngresses <- true
|
||||
stopServices <- true
|
||||
stopPods <- true
|
||||
stopReplicationControllers <- true
|
||||
stopEndpoints <- true
|
||||
return
|
||||
case err := <-chanIngressesErr:
|
||||
errCh <- err
|
||||
case err := <-chanServicesErr:
|
||||
errCh <- err
|
||||
case err := <-chanPodsErr:
|
||||
errCh <- err
|
||||
case err := <-chanReplicationControllersErr:
|
||||
case err := <-chanEndpointsErr:
|
||||
errCh <- err
|
||||
case event := <-chanIngresses:
|
||||
watchCh <- event
|
||||
case event := <-chanServices:
|
||||
watchCh <- event
|
||||
case event := <-chanPods:
|
||||
watchCh <- event
|
||||
case event := <-chanReplicationControllers:
|
||||
case event := <-chanEndpoints:
|
||||
watchCh <- event
|
||||
}
|
||||
}
|
||||
|
|
84
provider/k8s/endpoints.go
Normal file
84
provider/k8s/endpoints.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package k8s
|
||||
|
||||
// Endpoints is a collection of endpoints that implement the actual service. Example:
|
||||
// Name: "mysvc",
|
||||
// Subsets: [
|
||||
// {
|
||||
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
|
||||
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
|
||||
// },
|
||||
// {
|
||||
// Addresses: [{"ip": "10.10.3.3"}],
|
||||
// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}]
|
||||
// },
|
||||
// ]
|
||||
type Endpoints struct {
|
||||
TypeMeta `json:",inline"`
|
||||
ObjectMeta `json:"metadata,omitempty"`
|
||||
|
||||
// The set of all endpoints is the union of all subsets.
|
||||
Subsets []EndpointSubset
|
||||
}
|
||||
|
||||
// EndpointSubset is a group of addresses with a common set of ports. The
|
||||
// expanded set of endpoints is the Cartesian product of Addresses x Ports.
|
||||
// For example, given:
|
||||
// {
|
||||
// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}],
|
||||
// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}]
|
||||
// }
|
||||
// The resulting set of endpoints can be viewed as:
|
||||
// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],
|
||||
// b: [ 10.10.1.1:309, 10.10.2.2:309 ]
|
||||
type EndpointSubset struct {
|
||||
Addresses []EndpointAddress
|
||||
NotReadyAddresses []EndpointAddress
|
||||
Ports []EndpointPort
|
||||
}
|
||||
|
||||
// EndpointAddress is a tuple that describes single IP address.
|
||||
type EndpointAddress struct {
|
||||
// The IP of this endpoint.
|
||||
// IPv6 is also accepted but not fully supported on all platforms. Also, certain
|
||||
// kubernetes components, like kube-proxy, are not IPv6 ready.
|
||||
// TODO: This should allow hostname or IP, see #4447.
|
||||
IP string
|
||||
// Optional: Hostname of this endpoint
|
||||
// Meant to be used by DNS servers etc.
|
||||
Hostname string `json:"hostname,omitempty"`
|
||||
// Optional: The kubernetes object related to the entry point.
|
||||
TargetRef *ObjectReference
|
||||
}
|
||||
|
||||
// EndpointPort is a tuple that describes a single port.
|
||||
type EndpointPort struct {
|
||||
// The name of this port (corresponds to ServicePort.Name). Optional
|
||||
// if only one port is defined. Must be a DNS_LABEL.
|
||||
Name string
|
||||
|
||||
// The port number.
|
||||
Port int32
|
||||
|
||||
// The IP protocol for this port.
|
||||
Protocol Protocol
|
||||
}
|
||||
|
||||
// ObjectReference contains enough information to let you inspect or modify the referred object.
|
||||
type ObjectReference struct {
|
||||
Kind string `json:"kind,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
Name string `json:"name,omitempty"`
|
||||
UID UID `json:"uid,omitempty"`
|
||||
APIVersion string `json:"apiVersion,omitempty"`
|
||||
ResourceVersion string `json:"resourceVersion,omitempty"`
|
||||
|
||||
// Optional. If referring to a piece of an object instead of an entire object, this string
|
||||
// should contain information to identify the sub-object. For example, if the object
|
||||
// reference is to a container within a pod, this would take on a value like:
|
||||
// "spec.containers{name}" (where "name" refers to the name of the container that triggered
|
||||
// the event) or if no container name is specified "spec.containers[2]" (container with
|
||||
// index 2 in this pod). This syntax is chosen only to have some well-defined way of
|
||||
// referencing a part of an object.
|
||||
// TODO: this design is not final and this field is subject to change in the future.
|
||||
FieldPath string `json:"fieldPath,omitempty"`
|
||||
}
|
|
@ -190,31 +190,42 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
|
|||
Rule: ruleType + ":" + pa.Path,
|
||||
}
|
||||
}
|
||||
services, err := k8sClient.GetServices(func(service k8s.Service) bool {
|
||||
return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName
|
||||
})
|
||||
service, err := k8sClient.GetService(pa.Backend.ServiceName, i.ObjectMeta.Namespace)
|
||||
if err != nil {
|
||||
log.Warnf("Error retrieving services: %v", err)
|
||||
continue
|
||||
}
|
||||
if len(services) == 0 {
|
||||
// no backends found, delete frontend...
|
||||
delete(templateObjects.Frontends, r.Host+pa.Path)
|
||||
log.Warnf("Error retrieving services %s", pa.Backend.ServiceName)
|
||||
continue
|
||||
}
|
||||
for _, service := range services {
|
||||
protocol := "http"
|
||||
for _, port := range service.Spec.Ports {
|
||||
if equalPorts(port, pa.Backend.ServicePort) {
|
||||
if port.Port == 443 {
|
||||
protocol = "https"
|
||||
}
|
||||
protocol := "http"
|
||||
for _, port := range service.Spec.Ports {
|
||||
if equalPorts(port, pa.Backend.ServicePort) {
|
||||
if port.Port == 443 {
|
||||
protocol = "https"
|
||||
}
|
||||
endpoints, err := k8sClient.GetEndpoints(service.ObjectMeta.Name, service.ObjectMeta.Namespace)
|
||||
if err != nil {
|
||||
log.Errorf("Error retrieving endpoints: %v", err)
|
||||
continue
|
||||
}
|
||||
if len(endpoints.Subsets) == 0 {
|
||||
log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name)
|
||||
templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{
|
||||
URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port),
|
||||
Weight: 1,
|
||||
}
|
||||
break
|
||||
} else {
|
||||
for _, subset := range endpoints.Subsets {
|
||||
for _, address := range subset.Addresses {
|
||||
url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports))
|
||||
templateObjects.Backends[r.Host+pa.Path].Servers[url] = types.Server{
|
||||
URL: url,
|
||||
Weight: 1,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -223,6 +234,20 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur
|
|||
return &templateObjects, nil
|
||||
}
|
||||
|
||||
func endpointPortNumber(servicePort k8s.ServicePort, endpointPorts []k8s.EndpointPort) int {
|
||||
if len(endpointPorts) > 0 {
|
||||
//name is optional if there is only one port
|
||||
port := endpointPorts[0]
|
||||
for _, endpointPort := range endpointPorts {
|
||||
if servicePort.Name == endpointPort.Name {
|
||||
port = endpointPort
|
||||
}
|
||||
}
|
||||
return int(port.Port)
|
||||
}
|
||||
return servicePort.Port
|
||||
}
|
||||
|
||||
func equalPorts(servicePort k8s.ServicePort, ingressPort k8s.IntOrString) bool {
|
||||
if servicePort.Port == ingressPort.IntValue() {
|
||||
return true
|
||||
|
|
|
@ -10,6 +10,9 @@ import (
|
|||
|
||||
func TestLoadIngresses(t *testing.T) {
|
||||
ingresses := []k8s.Ingress{{
|
||||
ObjectMeta: k8s.ObjectMeta{
|
||||
Namespace: "testing",
|
||||
},
|
||||
Spec: k8s.IngressSpec{
|
||||
Rules: []k8s.IngressRule{
|
||||
{
|
||||
|
@ -21,7 +24,7 @@ func TestLoadIngresses(t *testing.T) {
|
|||
Path: "/bar",
|
||||
Backend: k8s.IngressBackend{
|
||||
ServiceName: "service1",
|
||||
ServicePort: k8s.FromString("http"),
|
||||
ServicePort: k8s.FromInt(80),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -36,7 +39,7 @@ func TestLoadIngresses(t *testing.T) {
|
|||
{
|
||||
Backend: k8s.IngressBackend{
|
||||
ServiceName: "service3",
|
||||
ServicePort: k8s.FromInt(443),
|
||||
ServicePort: k8s.FromString("https"),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -55,23 +58,24 @@ func TestLoadIngresses(t *testing.T) {
|
|||
services := []k8s.Service{
|
||||
{
|
||||
ObjectMeta: k8s.ObjectMeta{
|
||||
Name: "service1",
|
||||
UID: "1",
|
||||
Name: "service1",
|
||||
UID: "1",
|
||||
Namespace: "testing",
|
||||
},
|
||||
Spec: k8s.ServiceSpec{
|
||||
ClusterIP: "10.0.0.1",
|
||||
Ports: []k8s.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 801,
|
||||
Port: 80,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: k8s.ObjectMeta{
|
||||
Name: "service2",
|
||||
UID: "2",
|
||||
Name: "service2",
|
||||
UID: "2",
|
||||
Namespace: "testing",
|
||||
},
|
||||
Spec: k8s.ServiceSpec{
|
||||
ClusterIP: "10.0.0.2",
|
||||
|
@ -84,24 +88,108 @@ func TestLoadIngresses(t *testing.T) {
|
|||
},
|
||||
{
|
||||
ObjectMeta: k8s.ObjectMeta{
|
||||
Name: "service3",
|
||||
UID: "3",
|
||||
Name: "service3",
|
||||
UID: "3",
|
||||
Namespace: "testing",
|
||||
},
|
||||
Spec: k8s.ServiceSpec{
|
||||
ClusterIP: "10.0.0.3",
|
||||
Ports: []k8s.ServicePort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 80,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Port: 443,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
endpoints := []k8s.Endpoints{
|
||||
{
|
||||
ObjectMeta: k8s.ObjectMeta{
|
||||
Name: "service1",
|
||||
UID: "1",
|
||||
Namespace: "testing",
|
||||
},
|
||||
Subsets: []k8s.EndpointSubset{
|
||||
{
|
||||
Addresses: []k8s.EndpointAddress{
|
||||
{
|
||||
IP: "10.10.0.1",
|
||||
},
|
||||
},
|
||||
Ports: []k8s.EndpointPort{
|
||||
{
|
||||
Port: 8080,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Addresses: []k8s.EndpointAddress{
|
||||
{
|
||||
IP: "10.21.0.1",
|
||||
},
|
||||
},
|
||||
Ports: []k8s.EndpointPort{
|
||||
{
|
||||
Port: 8080,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: k8s.ObjectMeta{
|
||||
Name: "service3",
|
||||
UID: "3",
|
||||
Namespace: "testing",
|
||||
},
|
||||
Subsets: []k8s.EndpointSubset{
|
||||
{
|
||||
Addresses: []k8s.EndpointAddress{
|
||||
{
|
||||
IP: "10.15.0.1",
|
||||
},
|
||||
},
|
||||
Ports: []k8s.EndpointPort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 8080,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Port: 8443,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Addresses: []k8s.EndpointAddress{
|
||||
{
|
||||
IP: "10.15.0.2",
|
||||
},
|
||||
},
|
||||
Ports: []k8s.EndpointPort{
|
||||
{
|
||||
Name: "http",
|
||||
Port: 9080,
|
||||
},
|
||||
{
|
||||
Name: "https",
|
||||
Port: 9443,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
watchChan := make(chan interface{})
|
||||
client := clientMock{
|
||||
ingresses: ingresses,
|
||||
services: services,
|
||||
endpoints: endpoints,
|
||||
watchChan: watchChan,
|
||||
}
|
||||
provider := Kubernetes{}
|
||||
|
@ -114,8 +202,12 @@ func TestLoadIngresses(t *testing.T) {
|
|||
Backends: map[string]*types.Backend{
|
||||
"foo/bar": {
|
||||
Servers: map[string]types.Server{
|
||||
"1": {
|
||||
URL: "http://10.0.0.1:801",
|
||||
"http://10.10.0.1:8080": {
|
||||
URL: "http://10.10.0.1:8080",
|
||||
Weight: 1,
|
||||
},
|
||||
"http://10.21.0.1:8080": {
|
||||
URL: "http://10.21.0.1:8080",
|
||||
Weight: 1,
|
||||
},
|
||||
},
|
||||
|
@ -128,8 +220,12 @@ func TestLoadIngresses(t *testing.T) {
|
|||
URL: "http://10.0.0.2:802",
|
||||
Weight: 1,
|
||||
},
|
||||
"3": {
|
||||
URL: "https://10.0.0.3:443",
|
||||
"https://10.15.0.1:8443": {
|
||||
URL: "https://10.15.0.1:8443",
|
||||
Weight: 1,
|
||||
},
|
||||
"https://10.15.0.2:9443": {
|
||||
URL: "https://10.15.0.2:9443",
|
||||
Weight: 1,
|
||||
},
|
||||
},
|
||||
|
@ -1150,6 +1246,7 @@ func TestHostlessIngress(t *testing.T) {
|
|||
type clientMock struct {
|
||||
ingresses []k8s.Ingress
|
||||
services []k8s.Service
|
||||
endpoints []k8s.Endpoints
|
||||
watchChan chan interface{}
|
||||
}
|
||||
|
||||
|
@ -1165,15 +1262,24 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres
|
|||
func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
return c.watchChan, make(chan error), nil
|
||||
}
|
||||
func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service, error) {
|
||||
var services []k8s.Service
|
||||
func (c clientMock) GetService(name, namespace string) (k8s.Service, error) {
|
||||
for _, service := range c.services {
|
||||
if predicate(service) {
|
||||
services = append(services, service)
|
||||
if service.Namespace == namespace && service.Name == name {
|
||||
return service, nil
|
||||
}
|
||||
}
|
||||
return services, nil
|
||||
return k8s.Service{}, nil
|
||||
}
|
||||
|
||||
func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error) {
|
||||
for _, endpoints := range c.endpoints {
|
||||
if endpoints.Namespace == namespace && endpoints.Name == name {
|
||||
return endpoints, nil
|
||||
}
|
||||
}
|
||||
return k8s.Endpoints{}, nil
|
||||
}
|
||||
|
||||
func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) {
|
||||
return c.watchChan, make(chan error), nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue