traefik/provider/kubernetes/weight_allocator.go

190 lines
5.6 KiB
Go

package kubernetes
import (
"fmt"
"sort"
"strings"
"github.com/containous/traefik/provider/label"
"gopkg.in/yaml.v2"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
)
// weightAllocator yields the weight of a backend, which is identified by the
// host and path of its ingress rule together with the name of its service.
type weightAllocator interface {
	// getWeight returns the weight of serviceName under the given host and path.
	getWeight(host string, path string, serviceName string) int
}
// Compile-time checks that both allocator types implement weightAllocator.
var (
	_ weightAllocator = (*defaultWeightAllocator)(nil)
	_ weightAllocator = (*fractionalWeightAllocator)(nil)
)
// defaultWeightAllocator is the weightAllocator that hands out the same
// weight to every service.
type defaultWeightAllocator struct{}

// getWeight disregards host, path and serviceName and always yields
// label.DefaultWeight.
func (*defaultWeightAllocator) getWeight(host, path, serviceName string) int {
	return label.DefaultWeight
}
// ingressService identifies one backend of an ingress: the service referenced
// by a given path of a given host rule. The type is comparable and serves as
// a map key.
type ingressService struct {
	host, path, service string
}
// fractionalWeightAllocator maps every (host, path, service) backend of an
// ingress to the weight allocated to it.
type fractionalWeightAllocator map[ingressService]int

// String returns a string representation as service name / percentage tuples
// sorted by service names. Entries sharing a service name are further ordered
// by host and then by path, so the result does not depend on map iteration
// order.
// Example: [bar-svc: 70.000% foo-svc: 30.000%]
func (f *fractionalWeightAllocator) String() string {
	weights := map[ingressService]int(*f)

	sorted := make([]ingressService, 0, len(weights))
	for ingServ := range weights {
		sorted = append(sorted, ingServ)
	}

	// sort.Slice is not stable and the keys come out of a map in random
	// order, so every field takes part in the comparison.
	sort.Slice(sorted, func(i, j int) bool {
		a, b := sorted[i], sorted[j]
		if a.service != b.service {
			return a.service < b.service
		}
		if a.host != b.host {
			return a.host < b.host
		}
		return a.path < b.path
	})

	res := make([]string, 0, len(sorted))
	for _, ingServ := range sorted {
		res = append(res, fmt.Sprintf("%s: %s", ingServ.service, percentageValue(weights[ingServ])))
	}

	return fmt.Sprintf("[%s]", strings.Join(res, " "))
}
// newFractionalWeightAllocator computes the weight of every
// (host, path, service) backend declared by ingress.
//
// Services named in the service-weights annotation receive their annotated
// percentage, converted to a weight by computeWeight with the service's own
// instance count. Per path, the percentage left unassigned goes to the
// services that have no annotation, converted by computeWeight with their
// combined instance count.
//
// It returns an error when the annotation cannot be parsed, when the instance
// counts cannot be retrieved, when the annotated percentages of a path exceed
// 100%, or when they stay below 100% while no unannotated service is left on
// that path to take the remainder.
func newFractionalWeightAllocator(ingress *extensionsv1beta1.Ingress, client Client) (*fractionalWeightAllocator, error) {
	servicePercentageWeights, err := getServicesPercentageWeights(ingress)
	if err != nil {
		return nil, err
	}

	serviceInstanceCounts, err := getServiceInstanceCounts(ingress, client)
	if err != nil {
		return nil, err
	}

	serviceWeights := map[ingressService]int{}

	for _, rule := range ingress.Spec.Rules {
		// The HTTP section of a rule is an optional pointer; a rule without
		// one declares no path, so this loop passes over it instead of
		// dereferencing nil.
		if rule.HTTP == nil {
			continue
		}

		// key: rule path string
		// value: service names
		fractionalPathServices := map[string][]string{}

		// key: rule path string
		// value: fractional percentage weight
		fractionalPathWeights := map[string]percentageValue{}

		for _, pa := range rule.HTTP.Paths {
			// Every path starts out with the full share still to be distributed.
			if _, ok := fractionalPathWeights[pa.Path]; !ok {
				fractionalPathWeights[pa.Path] = newPercentageValueFromFloat64(1)
			}

			weight, ok := servicePercentageWeights[pa.Backend.ServiceName]
			if !ok {
				// No annotated percentage: the service shares whatever is
				// left on this path once all annotated services are handled.
				fractionalPathServices[pa.Path] = append(fractionalPathServices[pa.Path], pa.Backend.ServiceName)
				continue
			}

			ingSvc := ingressService{
				host:    rule.Host,
				path:    pa.Path,
				service: pa.Backend.ServiceName,
			}
			serviceWeights[ingSvc] = weight.computeWeight(serviceInstanceCounts[ingSvc])

			fractionalPathWeights[pa.Path] -= weight
			if fractionalPathWeights[pa.Path].toFloat64() < 0 {
				assignedWeight := newPercentageValueFromFloat64(1) - fractionalPathWeights[pa.Path]
				return nil, fmt.Errorf("percentage value %s must not exceed 100%%", assignedWeight.String())
			}
		}

		for pa, fractionalWeight := range fractionalPathWeights {
			fractionalServices := fractionalPathServices[pa]

			if len(fractionalServices) == 0 {
				// Nobody is left to absorb a remainder, so the annotated
				// percentages of this path must already add up to 100%.
				if fractionalWeight > 0 {
					assignedWeight := newPercentageValueFromFloat64(1) - fractionalWeight
					return nil, fmt.Errorf("the sum of weights(%s) in the path %s%s must be 100%% when no omitted fractional service left", assignedWeight.String(), rule.Host, pa)
				}
				continue
			}

			totalFractionalInstanceCount := 0
			for _, svc := range fractionalServices {
				totalFractionalInstanceCount += serviceInstanceCounts[ingressService{
					host:    rule.Host,
					path:    pa,
					service: svc,
				}]
			}

			for _, svc := range fractionalServices {
				ingSvc := ingressService{
					host:    rule.Host,
					path:    pa,
					service: svc,
				}
				serviceWeights[ingSvc] = fractionalWeight.computeWeight(totalFractionalInstanceCount)
			}
		}
	}

	allocator := fractionalWeightAllocator(serviceWeights)
	return &allocator, nil
}
// getWeight looks up the weight recorded for serviceName under the given host
// and path. A backend that has no entry yields 0.
func (f *fractionalWeightAllocator) getWeight(host, path, serviceName string) int {
	key := ingressService{host: host, path: path, service: serviceName}
	return (*f)[key]
}
// getServicesPercentageWeights reads the service-weights annotation of
// ingress, decodes it as a YAML mapping from service name to percentage
// string, and returns the parsed percentage of every listed service. The
// annotation key is resolved through getAnnotationName.
//
// It returns an error when the annotation is not a valid YAML mapping of
// strings or when one of the percentage strings cannot be parsed.
func getServicesPercentageWeights(ingress *extensionsv1beta1.Ingress) (map[string]percentageValue, error) {
	rawWeights := make(map[string]string)

	annotationPercentageWeights := getAnnotationName(ingress.Annotations, annotationKubernetesServiceWeights)
	if err := yaml.Unmarshal([]byte(ingress.Annotations[annotationPercentageWeights]), rawWeights); err != nil {
		return nil, err
	}

	servicesPercentageWeights := make(map[string]percentageValue, len(rawWeights))
	for serviceName, percentageStr := range rawWeights {
		// The local is deliberately not called percentageValue, which would
		// shadow the type of the same name.
		parsed, err := newPercentageValueFromString(percentageStr)
		if err != nil {
			// Keep the parser's diagnosis and name the service at fault.
			return nil, fmt.Errorf("invalid percentage value %q for service %q: %v", percentageStr, serviceName, err)
		}
		servicesPercentageWeights[serviceName] = parsed
	}

	return servicesPercentageWeights, nil
}
// getServiceInstanceCounts returns, for every (host, path, service) backend
// of ingress, the number of endpoint addresses of the service. Endpoints are
// fetched through client in the namespace of the ingress and the addresses of
// all their subsets are summed. A backend that is listed more than once
// accumulates its count.
//
// It returns an error when the endpoints of a service cannot be retrieved or
// do not exist.
func getServiceInstanceCounts(ingress *extensionsv1beta1.Ingress, client Client) (map[ingressService]int, error) {
	serviceInstanceCounts := map[ingressService]int{}

	for _, rule := range ingress.Spec.Rules {
		// The HTTP section of a rule is an optional pointer; a rule without
		// one references no service, so this loop passes over it instead of
		// dereferencing nil.
		if rule.HTTP == nil {
			continue
		}

		for _, pa := range rule.HTTP.Paths {
			endpoints, exists, err := client.GetEndpoints(ingress.Namespace, pa.Backend.ServiceName)
			if err != nil {
				return nil, fmt.Errorf("failed to get endpoints %s/%s: %v", ingress.Namespace, pa.Backend.ServiceName, err)
			}
			if !exists {
				return nil, fmt.Errorf("endpoints not found for %s/%s", ingress.Namespace, pa.Backend.ServiceName)
			}

			count := 0
			for _, subset := range endpoints.Subsets {
				count += len(subset.Addresses)
			}

			serviceInstanceCounts[ingressService{
				host:    rule.Host,
				path:    pa.Path,
				service: pa.Backend.ServiceName,
			}] += count
		}
	}

	return serviceInstanceCounts, nil
}