336 lines
9.7 KiB
Go
336 lines
9.7 KiB
Go
package servicefabric
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"net/http"
|
|
"strings"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/cenk/backoff"
|
|
"github.com/containous/traefik/job"
|
|
"github.com/containous/traefik/log"
|
|
"github.com/containous/traefik/provider"
|
|
"github.com/containous/traefik/safe"
|
|
"github.com/containous/traefik/types"
|
|
sf "github.com/jjcollinge/servicefabric"
|
|
)
|
|
|
|
var _ provider.Provider = (*Provider)(nil)
|
|
|
|
const traefikLabelPrefix = "traefik"
|
|
|
|
// Provider holds for configuration for the provider
|
|
type Provider struct {
|
|
provider.BaseProvider `mapstructure:",squash"`
|
|
ClusterManagementURL string `description:"Service Fabric API endpoint"`
|
|
APIVersion string `description:"Service Fabric API version" export:"true"`
|
|
RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"`
|
|
TLS *types.ClientTLS `description:"Enable TLS support" export:"true"`
|
|
}
|
|
|
|
// Provide allows the ServiceFabric provider to provide configurations to traefik
|
|
// using the given configuration channel.
|
|
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
|
|
if p.APIVersion == "" {
|
|
p.APIVersion = sf.DefaultAPIVersion
|
|
}
|
|
|
|
tlsConfig, err := p.TLS.CreateTLSConfig()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sfClient, err := sf.NewClient(http.DefaultClient, p.ClusterManagementURL, p.APIVersion, tlsConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if p.RefreshSeconds <= 0 {
|
|
p.RefreshSeconds = 10
|
|
}
|
|
|
|
return p.updateConfig(configurationChan, pool, sfClient, time.Duration(p.RefreshSeconds)*time.Second)
|
|
}
|
|
|
|
func (p *Provider) updateConfig(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, sfClient sfClient, pollInterval time.Duration) error {
|
|
pool.Go(func(stop chan bool) {
|
|
operation := func() error {
|
|
ticker := time.NewTicker(pollInterval)
|
|
for range ticker.C {
|
|
select {
|
|
case shouldStop := <-stop:
|
|
if shouldStop {
|
|
ticker.Stop()
|
|
return nil
|
|
}
|
|
default:
|
|
log.Info("Checking service fabric config")
|
|
}
|
|
|
|
configuration, err := p.buildConfiguration(sfClient)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
configurationChan <- types.ConfigMessage{
|
|
ProviderName: "servicefabric",
|
|
Configuration: configuration,
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
notify := func(err error, time time.Duration) {
|
|
log.Errorf("Provider connection error: %v; retrying in %s", err, time)
|
|
}
|
|
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
|
if err != nil {
|
|
log.Errorf("Cannot connect to Provider: %v", err)
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (p *Provider) buildConfiguration(sfClient sfClient) (*types.Configuration, error) {
|
|
var sfFuncMap = template.FuncMap{
|
|
"getServices": getServices,
|
|
"hasLabel": hasFuncService,
|
|
"getLabelValue": getFuncServiceStringLabel,
|
|
"getLabelsWithPrefix": getServiceLabelsWithPrefix,
|
|
"isPrimary": isPrimary,
|
|
"isExposed": getFuncBoolLabel("expose", false),
|
|
"getBackendName": getBackendName,
|
|
"getDefaultEndpoint": getDefaultEndpoint,
|
|
"getNamedEndpoint": getNamedEndpoint, // FIXME unused
|
|
"getApplicationParameter": getApplicationParameter, // FIXME unused
|
|
"doesAppParamContain": doesAppParamContain, // FIXME unused
|
|
"filterServicesByLabelValue": filterServicesByLabelValue, // FIXME unused
|
|
}
|
|
|
|
services, err := getClusterServices(sfClient)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
templateObjects := struct {
|
|
Services []ServiceItemExtended
|
|
}{
|
|
Services: services,
|
|
}
|
|
|
|
return p.GetConfiguration(tmpl, sfFuncMap, templateObjects)
|
|
}
|
|
|
|
func getDefaultEndpoint(instance replicaInstance) string {
|
|
id, data := instance.GetReplicaData()
|
|
endpoint, err := getReplicaDefaultEndpoint(data)
|
|
if err != nil {
|
|
log.Warnf("No default endpoint for replica %s in service %s endpointData: %s", id, data.Address)
|
|
return ""
|
|
}
|
|
return endpoint
|
|
}
|
|
|
|
func getReplicaDefaultEndpoint(replicaData *sf.ReplicaItemBase) (string, error) {
|
|
endpoints, err := decodeEndpointData(replicaData.Address)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var defaultHTTPEndpoint string
|
|
for _, v := range endpoints {
|
|
if strings.Contains(v, "http") {
|
|
defaultHTTPEndpoint = v
|
|
break
|
|
}
|
|
}
|
|
|
|
if len(defaultHTTPEndpoint) == 0 {
|
|
return "", errors.New("no default endpoint found")
|
|
}
|
|
return defaultHTTPEndpoint, nil
|
|
}
|
|
|
|
func getNamedEndpoint(instance replicaInstance, endpointName string) string {
|
|
id, data := instance.GetReplicaData()
|
|
endpoint, err := getReplicaNamedEndpoint(data, endpointName)
|
|
if err != nil {
|
|
log.Warnf("No names endpoint of %s for replica %s in endpointData: %s. Error: %v", endpointName, id, data.Address, err)
|
|
return ""
|
|
}
|
|
return endpoint
|
|
}
|
|
|
|
func getReplicaNamedEndpoint(replicaData *sf.ReplicaItemBase, endpointName string) (string, error) {
|
|
endpoints, err := decodeEndpointData(replicaData.Address)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
endpoint, exists := endpoints[endpointName]
|
|
if !exists {
|
|
return "", errors.New("endpoint doesn't exist")
|
|
}
|
|
return endpoint, nil
|
|
}
|
|
|
|
func doesAppParamContain(app sf.ApplicationItem, key, shouldContain string) bool {
|
|
value := getApplicationParameter(app, key)
|
|
return strings.Contains(value, shouldContain)
|
|
}
|
|
|
|
func getApplicationParameter(app sf.ApplicationItem, key string) string {
|
|
for _, param := range app.Parameters {
|
|
if param.Key == key {
|
|
return param.Value
|
|
}
|
|
}
|
|
log.Errorf("Parameter %s doesn't exist in app %s", key, app.Name)
|
|
return ""
|
|
}
|
|
|
|
func getClusterServices(sfClient sfClient) ([]ServiceItemExtended, error) {
|
|
apps, err := sfClient.GetApplications()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var results []ServiceItemExtended
|
|
for _, app := range apps.Items {
|
|
services, err := sfClient.GetServices(app.ID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, service := range services.Items {
|
|
item := ServiceItemExtended{
|
|
ServiceItem: service,
|
|
Application: app,
|
|
}
|
|
|
|
if labels, err := sfClient.GetServiceLabels(&service, &app, traefikLabelPrefix); err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
item.Labels = labels
|
|
}
|
|
|
|
if partitions, err := sfClient.GetPartitions(app.ID, service.ID); err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
for _, partition := range partitions.Items {
|
|
partitionExt := PartitionItemExtended{PartitionItem: partition}
|
|
|
|
if partition.ServiceKind == "Stateful" {
|
|
partitionExt.Replicas = getValidReplicas(sfClient, app, service, partition)
|
|
} else if partition.ServiceKind == "Stateless" {
|
|
partitionExt.Instances = getValidInstances(sfClient, app, service, partition)
|
|
} else {
|
|
log.Errorf("Unsupported service kind %s in service %s", partition.ServiceKind, service.Name)
|
|
continue
|
|
}
|
|
|
|
item.Partitions = append(item.Partitions, partitionExt)
|
|
}
|
|
}
|
|
|
|
results = append(results, item)
|
|
}
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
func getValidReplicas(sfClient sfClient, app sf.ApplicationItem, service sf.ServiceItem, partition sf.PartitionItem) []sf.ReplicaItem {
|
|
var validReplicas []sf.ReplicaItem
|
|
|
|
if replicas, err := sfClient.GetReplicas(app.ID, service.ID, partition.PartitionInformation.ID); err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
for _, instance := range replicas.Items {
|
|
if isHealthy(instance.ReplicaItemBase) && hasHTTPEndpoint(instance.ReplicaItemBase) {
|
|
validReplicas = append(validReplicas, instance)
|
|
}
|
|
}
|
|
}
|
|
return validReplicas
|
|
}
|
|
|
|
func getValidInstances(sfClient sfClient, app sf.ApplicationItem, service sf.ServiceItem, partition sf.PartitionItem) []sf.InstanceItem {
|
|
var validInstances []sf.InstanceItem
|
|
|
|
if instances, err := sfClient.GetInstances(app.ID, service.ID, partition.PartitionInformation.ID); err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
for _, instance := range instances.Items {
|
|
if isHealthy(instance.ReplicaItemBase) && hasHTTPEndpoint(instance.ReplicaItemBase) {
|
|
validInstances = append(validInstances, instance)
|
|
}
|
|
}
|
|
}
|
|
return validInstances
|
|
}
|
|
|
|
func getServices(services []ServiceItemExtended, key string) map[string][]ServiceItemExtended {
|
|
result := map[string][]ServiceItemExtended{}
|
|
for _, service := range services {
|
|
if value, exists := service.Labels[key]; exists {
|
|
if matchingServices, hasKeyAlready := result[value]; hasKeyAlready {
|
|
result[value] = append(matchingServices, service)
|
|
} else {
|
|
result[value] = []ServiceItemExtended{service}
|
|
}
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func filterServicesByLabelValue(services []ServiceItemExtended, key, expectedValue string) []ServiceItemExtended {
|
|
var srvWithLabel []ServiceItemExtended
|
|
for _, service := range services {
|
|
value, exists := service.Labels[key]
|
|
if exists && value == expectedValue {
|
|
srvWithLabel = append(srvWithLabel, service)
|
|
}
|
|
}
|
|
return srvWithLabel
|
|
}
|
|
|
|
func isPrimary(instance replicaInstance) bool {
|
|
_, data := instance.GetReplicaData()
|
|
return data.ReplicaRole == "Primary"
|
|
}
|
|
|
|
func isHealthy(instanceData *sf.ReplicaItemBase) bool {
|
|
return instanceData != nil && (instanceData.ReplicaStatus == "Ready" || instanceData.HealthState != "Error")
|
|
}
|
|
|
|
func hasHTTPEndpoint(instanceData *sf.ReplicaItemBase) bool {
|
|
_, err := getReplicaDefaultEndpoint(instanceData)
|
|
return err == nil
|
|
}
|
|
|
|
func decodeEndpointData(endpointData string) (map[string]string, error) {
|
|
var endpointsMap map[string]map[string]string
|
|
|
|
if endpointData == "" {
|
|
return nil, errors.New("endpoint data is empty")
|
|
}
|
|
|
|
err := json.Unmarshal([]byte(endpointData), &endpointsMap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
endpoints, endpointsExist := endpointsMap["Endpoints"]
|
|
if !endpointsExist {
|
|
return nil, errors.New("endpoint doesn't exist in endpoint data")
|
|
}
|
|
|
|
return endpoints, nil
|
|
}
|
|
|
|
func getBackendName(service ServiceItemExtended, partition PartitionItemExtended) string {
|
|
return provider.Normalize(service.Name + partition.PartitionInformation.ID)
|
|
}
|