From 027093a5a53730fc2b29b0a2577277f228250dd5 Mon Sep 17 00:00:00 2001 From: SALLEYRON Julien Date: Wed, 11 Jul 2018 09:08:03 +0200 Subject: [PATCH] Create init method on provider interface --- Gopkg.lock | 24 +- Gopkg.toml | 2 +- cmd/traefik/traefik.go | 6 +- configuration/provider_aggregator.go | 70 +-- provider/acme/provider.go | 75 ++-- provider/boltdb/boltdb.go | 19 +- provider/consul/consul.go | 19 +- provider/consulcatalog/consul_catalog.go | 17 +- provider/docker/docker.go | 8 +- provider/dynamodb/dynamodb.go | 9 +- provider/ecs/ecs.go | 10 +- provider/etcd/etcd.go | 19 +- provider/eureka/eureka.go | 7 +- provider/file/file.go | 7 +- provider/file/file_test.go | 6 +- provider/kubernetes/kubernetes.go | 8 +- provider/kv/kv.go | 3 +- provider/marathon/marathon.go | 8 +- provider/mesos/mesos.go | 7 +- provider/provider.go | 9 +- provider/rancher/api.go | 3 +- provider/rancher/metadata.go | 4 +- provider/rancher/rancher.go | 11 +- provider/rest/rest.go | 7 +- provider/zk/zk.go | 19 +- server/server.go | 10 +- vendor/code.cloudfoundry.org/clock/LICENSE | 201 +++++++++ vendor/code.cloudfoundry.org/clock/NOTICE | 20 + vendor/code.cloudfoundry.org/clock/clock.go | 53 +++ vendor/code.cloudfoundry.org/clock/package.go | 1 + vendor/code.cloudfoundry.org/clock/ticker.go | 20 + vendor/code.cloudfoundry.org/clock/timer.go | 25 ++ .../Microsoft/ApplicationInsights-Go/LICENSE | 22 + .../appinsights/bond.go | 122 ++++++ .../appinsights/client.go | 132 ++++++ .../appinsights/clock.go | 11 + .../appinsights/concurrentrandom.go | 45 ++ .../appinsights/configuration.go | 19 + .../appinsights/datacontracts.go | 228 ++++++++++ .../appinsights/diagnostics.go | 64 +++ .../appinsights/inmemorychannel.go | 408 ++++++++++++++++++ .../appinsights/jsonserializer.go | 45 ++ .../appinsights/package.go | 8 + .../appinsights/telemetrychannel.go | 47 ++ .../appinsights/telemetrycontext.go | 400 +++++++++++++++++ .../appinsights/throttle.go | 144 +++++++ .../appinsights/transmitter.go | 237 ++++++++++ .../servicefabric.go | 45 +- .../servicefabric_config.go | 2 +- .../jjcollinge/logrus-appinsights/LICENSE | 21 + .../jjcollinge/logrus-appinsights/config.go | 11 + .../jjcollinge/logrus-appinsights/hook.go | 173 ++++++++ 52 files changed, 2760 insertions(+), 131 deletions(-) create mode 100644 vendor/code.cloudfoundry.org/clock/LICENSE create mode 100644 vendor/code.cloudfoundry.org/clock/NOTICE create mode 100644 vendor/code.cloudfoundry.org/clock/clock.go create mode 100644 vendor/code.cloudfoundry.org/clock/package.go create mode 100644 vendor/code.cloudfoundry.org/clock/ticker.go create mode 100644 vendor/code.cloudfoundry.org/clock/timer.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/LICENSE create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/bond.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/client.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/clock.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/concurrentrandom.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/configuration.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/datacontracts.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/diagnostics.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/inmemorychannel.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/jsonserializer.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/package.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrychannel.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrycontext.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/throttle.go create mode 100644 vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/transmitter.go create mode 100644 vendor/github.com/jjcollinge/logrus-appinsights/LICENSE create mode 100644 vendor/github.com/jjcollinge/logrus-appinsights/config.go create mode 100644 vendor/github.com/jjcollinge/logrus-appinsights/hook.go diff --git a/Gopkg.lock b/Gopkg.lock index 62fe7fc3f..d12e5d5d0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -7,6 +7,12 @@ revision = "056a55f54a6cc77b440b31a56a5e7c3982d32811" version = "v0.22.0" +[[projects]] + branch = "master" + name = "code.cloudfoundry.org/clock" + packages = ["."] + revision = "02e53af36e6c978af692887ed449b74026d76fec" + [[projects]] branch = "master" name = "github.com/ArthurHlt/go-eureka-client" @@ -80,6 +86,11 @@ packages = ["."] revision = "e039e20e500c2c025d9145be375e27cf42a94174" +[[projects]] + name = "github.com/Microsoft/ApplicationInsights-Go" + packages = ["appinsights"] + revision = "98ac7ca026c26818888600ea0d966987aa56f043" + [[projects]] name = "github.com/Microsoft/go-winio" packages = ["."] @@ -265,10 +276,10 @@ version = "v3.1.1" [[projects]] + branch = "init-provider" name = "github.com/containous/traefik-extra-service-fabric" packages = ["."] - revision = "08668650856571f3529bde394a36a5c9cf16a991" - version = "v1.2.0" + revision = "eb4d5cf161b3213bf45be611dc1f56e8b2161e46" [[projects]] name = "github.com/coreos/bbolt" @@ -764,6 +775,13 @@ version = "v1.3.7" [[projects]] + branch = "master" + name = "github.com/jjcollinge/logrus-appinsights" + packages = ["."] + revision = "9b66602d496a139e4722bdde32f0f1ac1c12d4a8" + +[[projects]] + branch = "master" name = "github.com/jjcollinge/servicefabric" packages = ["."] revision = "8eebe170fa1ba25d3dfb928b3f86a7313b13b9fe" @@ -1738,6 +1756,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "a3429eccf578f09c7c521a32f81df1af5f0a24cb8c9702b7b768f3db34153216" + inputs-digest = "c228c6029e36e15b6c74bdfa587ee0fa39787af0dc0d4047752d80ee2fb690c1" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index ad60b9905..a5ade8e33 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -66,7 +66,7 @@ [[constraint]] name = "github.com/containous/traefik-extra-service-fabric" - version = "1.2.0" + branch = "init-provider" [[constraint]] name = "github.com/coreos/go-systemd" diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go index e721ff397..2647523f0 100644 --- a/cmd/traefik/traefik.go +++ b/cmd/traefik/traefik.go @@ -184,7 +184,11 @@ func runCmd(globalConfiguration *configuration.GlobalConfiguration, configFile s acmeprovider := globalConfiguration.InitACMEProvider() if acmeprovider != nil { - providerAggregator.AddProvider(acmeprovider) + err := providerAggregator.AddProvider(acmeprovider) + if err != nil { + log.Errorf("Error initializing provider ACME: %v", err) + acmeprovider = nil + } } entryPoints := map[string]server.EntryPoint{} diff --git a/configuration/provider_aggregator.go b/configuration/provider_aggregator.go index 20661176e..ce36dc384 100644 --- a/configuration/provider_aggregator.go +++ b/configuration/provider_aggregator.go @@ -2,7 +2,6 @@ package configuration import ( "encoding/json" - "reflect" "github.com/containous/traefik/log" "github.com/containous/traefik/provider" @@ -12,82 +11,101 @@ import ( // ProviderAggregator aggregate providers type ProviderAggregator struct { - providers []provider.Provider + providers []provider.Provider + constraints types.Constraints } // NewProviderAggregator return an aggregate of all the providers configured in GlobalConfiguration func NewProviderAggregator(gc *GlobalConfiguration) ProviderAggregator { - provider := ProviderAggregator{} + provider := ProviderAggregator{ + constraints: gc.Constraints, + } if gc.Docker != nil { - provider.providers = append(provider.providers, gc.Docker) + provider.quietAddProvider(gc.Docker) } if gc.Marathon != nil { - provider.providers = append(provider.providers, gc.Marathon) + provider.quietAddProvider(gc.Marathon) } if gc.File != nil { - provider.providers = append(provider.providers, gc.File) + provider.quietAddProvider(gc.File) } if gc.Rest != nil { - provider.providers = append(provider.providers, gc.Rest) + provider.quietAddProvider(gc.Rest) } if gc.Consul != nil { - provider.providers = append(provider.providers, gc.Consul) + provider.quietAddProvider(gc.Consul) } if gc.ConsulCatalog != nil { - provider.providers = append(provider.providers, gc.ConsulCatalog) + provider.quietAddProvider(gc.ConsulCatalog) } if gc.Etcd != nil { - provider.providers = append(provider.providers, gc.Etcd) + provider.quietAddProvider(gc.Etcd) } if gc.Zookeeper != nil { - provider.providers = append(provider.providers, gc.Zookeeper) + provider.quietAddProvider(gc.Zookeeper) } if gc.Boltdb != nil { - provider.providers = append(provider.providers, gc.Boltdb) + provider.quietAddProvider(gc.Boltdb) } if gc.Kubernetes != nil { - provider.providers = append(provider.providers, gc.Kubernetes) + provider.quietAddProvider(gc.Kubernetes) } if gc.Mesos != nil { - provider.providers = append(provider.providers, gc.Mesos) + provider.quietAddProvider(gc.Mesos) } if gc.Eureka != nil { - provider.providers = append(provider.providers, gc.Eureka) + provider.quietAddProvider(gc.Eureka) } if gc.ECS != nil { - provider.providers = append(provider.providers, gc.ECS) + provider.quietAddProvider(gc.ECS) } if gc.Rancher != nil { - provider.providers = append(provider.providers, gc.Rancher) + provider.quietAddProvider(gc.Rancher) } if gc.DynamoDB != nil { - provider.providers = append(provider.providers, gc.DynamoDB) + provider.quietAddProvider(gc.DynamoDB) } if gc.ServiceFabric != nil { - provider.providers = append(provider.providers, gc.ServiceFabric) + provider.quietAddProvider(gc.ServiceFabric) } return provider } +func (p *ProviderAggregator) quietAddProvider(provider provider.Provider) { + err := p.AddProvider(provider) + if err != nil { + log.Errorf("Error initializing provider %T: %v", provider, err) + } +} + // AddProvider add a provider in the providers map -func (p *ProviderAggregator) AddProvider(provider provider.Provider) { +func (p *ProviderAggregator) AddProvider(provider provider.Provider) error { + err := provider.Init(p.constraints) + if err != nil { + return err + } p.providers = append(p.providers, provider) + return nil +} + +// Init the provider +func (p ProviderAggregator) Init(_ types.Constraints) error { + return nil } // Provide call the provide method of every providers -func (p ProviderAggregator) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p ProviderAggregator) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { for _, p := range p.providers { - providerType := reflect.TypeOf(p) jsonConf, err := json.Marshal(p) if err != nil { - log.Debugf("Unable to marshal provider conf %v with error: %v", providerType, err) + log.Debugf("Unable to marshal provider conf %T with error: %v", p, err) } - log.Infof("Starting provider %v %s", providerType, jsonConf) + log.Infof("Starting provider %T %s", p, jsonConf) currentProvider := p safe.Go(func() { - err := currentProvider.Provide(configurationChan, pool, constraints) + err := currentProvider.Provide(configurationChan, pool) if err != nil { - log.Errorf("Error starting provider %v: %s", providerType, err) + log.Errorf("Error starting provider %T: %v", p, err) } }) } diff --git a/provider/acme/provider.go b/provider/acme/provider.go index 0f4dadb99..7c6beb97a 100644 --- a/provider/acme/provider.go +++ b/provider/acme/provider.go @@ -110,14 +110,45 @@ func (p *Provider) ListenRequest(domain string) (*tls.Certificate, error) { return &certificate, err } +// Init for compatibility reason the BaseProvider implements an empty Init +func (p *Provider) Init(_ types.Constraints) error { + acme.UserAgent = fmt.Sprintf("containous-traefik/%s", version.Version) + if p.ACMELogging { + legolog.Logger = fmtlog.New(log.WriterLevel(logrus.InfoLevel), "legolog: ", 0) + } else { + legolog.Logger = fmtlog.New(ioutil.Discard, "", 0) + } + + if p.Store == nil { + return errors.New("no store found for the ACME provider") + } + + var err error + p.account, err = p.Store.GetAccount() + if err != nil { + return fmt.Errorf("unable to get ACME account : %v", err) + } + + // Reset Account if caServer changed, thus registration URI can be updated + if p.account != nil && p.account.Registration != nil && !strings.HasPrefix(p.account.Registration.URI, p.CAServer) { + p.account = nil + } + + p.certificates, err = p.Store.GetCertificates() + if err != nil { + return fmt.Errorf("unable to get ACME certificates : %v", err) + } + + return nil +} + // Provide allows the file provider to provide configurations to traefik // using the given Configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { p.pool = pool - err := p.init() - if err != nil { - return err - } + + p.watchCertificate() + p.watchNewDomains() p.configurationChan = configurationChan p.refreshCertificates() @@ -150,40 +181,6 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s return nil } -func (p *Provider) init() error { - acme.UserAgent = fmt.Sprintf("containous-traefik/%s", version.Version) - if p.ACMELogging { - legolog.Logger = fmtlog.New(log.WriterLevel(logrus.InfoLevel), "legolog: ", 0) - } else { - legolog.Logger = fmtlog.New(ioutil.Discard, "", 0) - } - - if p.Store == nil { - return errors.New("no store found for the ACME provider") - } - - var err error - p.account, err = p.Store.GetAccount() - if err != nil { - return fmt.Errorf("unable to get ACME account : %v", err) - } - - // Reset Account if caServer changed, thus registration URI can be updated - if p.account != nil && p.account.Registration != nil && !strings.HasPrefix(p.account.Registration.URI, p.CAServer) { - p.account = nil - } - - p.certificates, err = p.Store.GetCertificates() - if err != nil { - return fmt.Errorf("unable to get ACME certificates : %v", err) - } - - p.watchCertificate() - p.watchNewDomains() - - return nil -} - func (p *Provider) getClient() (*acme.Client, error) { p.clientMutex.Lock() defer p.clientMutex.Unlock() diff --git a/provider/boltdb/boltdb.go b/provider/boltdb/boltdb.go index f476d7729..df6a0cb19 100644 --- a/provider/boltdb/boltdb.go +++ b/provider/boltdb/boltdb.go @@ -18,15 +18,26 @@ type Provider struct { kv.Provider `mapstructure:",squash" export:"true"` } -// Provide allows the boltdb provider to Provide configurations to traefik -// using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + err := p.Provider.Init(constraints) + if err != nil { + return err + } + store, err := p.CreateStore() if err != nil { return fmt.Errorf("failed to Connect to KV store: %v", err) } + p.SetKVClient(store) - return p.Provider.Provide(configurationChan, pool, constraints) + return nil +} + +// Provide allows the boltdb provider to Provide configurations to traefik +// using the given configuration channel. +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { + return p.Provider.Provide(configurationChan, pool) } // CreateStore creates the KV store diff --git a/provider/consul/consul.go b/provider/consul/consul.go index 87d203314..3004e1b5d 100644 --- a/provider/consul/consul.go +++ b/provider/consul/consul.go @@ -18,15 +18,26 @@ type Provider struct { kv.Provider `mapstructure:",squash" export:"true"` } -// Provide allows the consul provider to provide configurations to traefik -// using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + err := p.Provider.Init(constraints) + if err != nil { + return err + } + store, err := p.CreateStore() if err != nil { return fmt.Errorf("failed to Connect to KV store: %v", err) } + p.SetKVClient(store) - return p.Provider.Provide(configurationChan, pool, constraints) + return nil +} + +// Provide allows the consul provider to provide configurations to traefik +// using the given configuration channel. +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { + return p.Provider.Provide(configurationChan, pool) } // CreateStore creates the KV store diff --git a/provider/consulcatalog/consul_catalog.go b/provider/consulcatalog/consul_catalog.go index e1642ae96..312fd6da5 100644 --- a/provider/consulcatalog/consul_catalog.go +++ b/provider/consulcatalog/consul_catalog.go @@ -89,18 +89,27 @@ func (a nodeSorter) Less(i int, j int) bool { return lEntry.Service.Port < rEntry.Service.Port } -// Provide allows the consul catalog provider to provide configurations to traefik -// using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + err := p.BaseProvider.Init(constraints) + if err != nil { + return err + } + client, err := p.createClient() if err != nil { return err } p.client = client - p.Constraints = append(p.Constraints, constraints...) p.setupFrontEndRuleTemplate() + return nil +} + +// Provide allows the consul catalog provider to provide configurations to traefik +// using the given configuration channel. +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { pool.Go(func(stop chan bool) { notify := func(err error, time time.Duration) { log.Errorf("Consul connection error %+v, retrying in %s", err, time) diff --git a/provider/docker/docker.go b/provider/docker/docker.go index c3b9c825a..250430998 100644 --- a/provider/docker/docker.go +++ b/provider/docker/docker.go @@ -48,6 +48,11 @@ type Provider struct { Network string `description:"Default Docker network used" export:"true"` } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // dockerData holds the need data to the Provider p type dockerData struct { ServiceName string @@ -115,8 +120,7 @@ func (p *Provider) createClient() (client.APIClient, error) { // Provide allows the docker provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - p.Constraints = append(p.Constraints, constraints...) +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { // TODO register this routine in pool, and watch for stop channel safe.Go(func() { operation := func() error { diff --git a/provider/dynamodb/dynamodb.go b/provider/dynamodb/dynamodb.go index cf2f79a74..e2876da02 100644 --- a/provider/dynamodb/dynamodb.go +++ b/provider/dynamodb/dynamodb.go @@ -37,6 +37,11 @@ type dynamoClient struct { db dynamodbiface.DynamoDBAPI } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // createClient configures aws credentials and creates a dynamoClient func (p *Provider) createClient() (*dynamoClient, error) { log.Info("Creating Provider client...") @@ -145,9 +150,7 @@ func (p *Provider) buildConfiguration(client *dynamoClient) (*types.Configuratio // Provide provides the configuration to traefik via the configuration channel // if watch is enabled it polls dynamodb -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - log.Debugf("Providing Provider...") - p.Constraints = append(p.Constraints, constraints...) +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { handleCanceled := func(ctx context.Context, err error) error { if ctx.Err() == context.Canceled || err == context.Canceled { return nil diff --git a/provider/ecs/ecs.go b/provider/ecs/ecs.go index fc8a0c953..fb35e0372 100644 --- a/provider/ecs/ecs.go +++ b/provider/ecs/ecs.go @@ -65,11 +65,17 @@ type awsClient struct { ec2 *ec2.EC2 } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + func (p *Provider) createClient() (*awsClient, error) { sess, err := session.NewSession() if err != nil { return nil, err } + ec2meta := ec2metadata.New(sess) if p.Region == "" { log.Infoln("No EC2 region provided, querying instance metadata endpoint...") @@ -110,9 +116,7 @@ func (p *Provider) createClient() (*awsClient, error) { // Provide allows the ecs provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - p.Constraints = append(p.Constraints, constraints...) - +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { handleCanceled := func(ctx context.Context, err error) error { if ctx.Err() == context.Canceled || err == context.Canceled { return nil diff --git a/provider/etcd/etcd.go b/provider/etcd/etcd.go index 4d1bb4f74..cc78951ef 100644 --- a/provider/etcd/etcd.go +++ b/provider/etcd/etcd.go @@ -21,15 +21,26 @@ type Provider struct { UseAPIV3 bool `description:"Use ETCD API V3" export:"true"` } -// Provide allows the etcd provider to Provide configurations to traefik -// using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + err := p.Provider.Init(constraints) + if err != nil { + return err + } + store, err := p.CreateStore() if err != nil { return fmt.Errorf("failed to Connect to KV store: %v", err) } + p.SetKVClient(store) - return p.Provider.Provide(configurationChan, pool, constraints) + return nil +} + +// Provide allows the etcd provider to Provide configurations to traefik +// using the given configuration channel. +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { + return p.Provider.Provide(configurationChan, pool) } // CreateStore creates the KV store diff --git a/provider/eureka/eureka.go b/provider/eureka/eureka.go index cb64f5d53..b8d45553e 100644 --- a/provider/eureka/eureka.go +++ b/provider/eureka/eureka.go @@ -22,9 +22,14 @@ type Provider struct { RefreshSeconds flaeg.Duration `description:"Override default configuration time between refresh" export:"true"` } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // Provide allows the eureka provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, _ types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { eureka.GetLogger().SetOutput(ioutil.Discard) operation := func() error { diff --git a/provider/file/file.go b/provider/file/file.go index 5a241f92b..b15236768 100644 --- a/provider/file/file.go +++ b/provider/file/file.go @@ -27,9 +27,14 @@ type Provider struct { TraefikFile string } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // Provide allows the file provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { configuration, err := p.BuildConfiguration() if err != nil { diff --git a/provider/file/file_test.go b/provider/file/file_test.go index 074a11dba..1e62175ae 100644 --- a/provider/file/file_test.go +++ b/provider/file/file_test.go @@ -198,7 +198,7 @@ func TestProvideWithoutWatch(t *testing.T) { configChan := make(chan types.ConfigMessage) go func() { - err := provider.Provide(configChan, safe.NewPool(context.Background()), types.Constraints{}) + err := provider.Provide(configChan, safe.NewPool(context.Background())) assert.NoError(t, err) }() @@ -226,7 +226,7 @@ func TestProvideWithWatch(t *testing.T) { configChan := make(chan types.ConfigMessage) go func() { - err := provider.Provide(configChan, safe.NewPool(context.Background()), types.Constraints{}) + err := provider.Provide(configChan, safe.NewPool(context.Background())) assert.NoError(t, err) }() @@ -276,7 +276,7 @@ func TestErrorWhenEmptyConfig(t *testing.T) { configChan := make(chan types.ConfigMessage) errorChan := make(chan struct{}) go func() { - err := provider.Provide(configChan, safe.NewPool(context.Background()), types.Constraints{}) + err := provider.Provide(configChan, safe.NewPool(context.Background())) assert.Error(t, err) close(errorChan) }() diff --git a/provider/kubernetes/kubernetes.go b/provider/kubernetes/kubernetes.go index e2b6fa939..c89e244f4 100644 --- a/provider/kubernetes/kubernetes.go +++ b/provider/kubernetes/kubernetes.go @@ -95,9 +95,14 @@ func (p *Provider) newK8sClient(ingressLabelSelector string) (Client, error) { return cl, err } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // Provide allows the k8s provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { // Tell glog (used by client-go) to log into STDERR. Otherwise, we risk // certain kinds of API errors getting logged into a directory not // available in a `FROM scratch` Docker container, causing glog to abort @@ -112,7 +117,6 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s if err != nil { return err } - p.Constraints = append(p.Constraints, constraints...) pool.Go(func(stop chan bool) { operation := func() error { diff --git a/provider/kv/kv.go b/provider/kv/kv.go index c7223946c..23c70e382 100644 --- a/provider/kv/kv.go +++ b/provider/kv/kv.go @@ -97,8 +97,7 @@ func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix } // Provide provides the configuration to traefik via the configuration channel -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - p.Constraints = append(p.Constraints, constraints...) +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { operation := func() error { if _, err := p.kvClient.Exists(p.Prefix+"/qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj", nil); err != nil { return fmt.Errorf("failed to test KV store connection: %v", err) diff --git a/provider/marathon/marathon.go b/provider/marathon/marathon.go index 193e67777..1685590ee 100644 --- a/provider/marathon/marathon.go +++ b/provider/marathon/marathon.go @@ -72,10 +72,14 @@ type Basic struct { HTTPBasicPassword string `description:"Basic authentication Password"` } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // Provide allows the marathon provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - p.Constraints = append(p.Constraints, constraints...) +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { operation := func() error { config := marathon.NewDefaultConfig() config.URL = p.Endpoint diff --git a/provider/mesos/mesos.go b/provider/mesos/mesos.go index 0428d71bc..85a8beaaf 100644 --- a/provider/mesos/mesos.go +++ b/provider/mesos/mesos.go @@ -38,9 +38,14 @@ type Provider struct { Masters []string } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // Provide allows the mesos provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { operation := func() error { // initialize logging diff --git a/provider/provider.go b/provider/provider.go index 6bd9688a9..f6fa1ef70 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -19,7 +19,8 @@ import ( type Provider interface { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. - Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error + Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error + Init(constraints types.Constraints) error } // BaseProvider should be inherited by providers @@ -32,6 +33,12 @@ type BaseProvider struct { DebugLogGeneratedTemplate bool `description:"Enable debug logging of generated configuration template." export:"true"` } +// Init for compatibility reason the BaseProvider implements an empty Init +func (p *BaseProvider) Init(constraints types.Constraints) error { + p.Constraints = append(p.Constraints, constraints...) + return nil +} + // MatchConstraints must match with EVERY single constraint // returns first constraint that do not match or nil func (p *BaseProvider) MatchConstraints(tags []string) (bool, *types.Constraint) { diff --git a/provider/rancher/api.go b/provider/rancher/api.go index 3f831c579..a3e81b6cb 100644 --- a/provider/rancher/api.go +++ b/provider/rancher/api.go @@ -55,8 +55,7 @@ func getenv(key, fallback string) string { return value } -func (p *Provider) apiProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - p.Constraints = append(p.Constraints, constraints...) +func (p *Provider) apiProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { if p.API == nil { p.API = &APIConfiguration{} diff --git a/provider/rancher/metadata.go b/provider/rancher/metadata.go index 4e8a42608..76e91e550 100644 --- a/provider/rancher/metadata.go +++ b/provider/rancher/metadata.go @@ -22,9 +22,7 @@ type MetadataConfiguration struct { Prefix string `description:"Prefix used for accessing the Rancher metadata service"` } -func (p *Provider) metadataProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { - p.Constraints = append(p.Constraints, constraints...) - +func (p *Provider) metadataProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { metadataServiceURL := fmt.Sprintf("http://rancher-metadata.rancher.internal/%s", p.Metadata.Prefix) safe.Go(func() { diff --git a/provider/rancher/rancher.go b/provider/rancher/rancher.go index 7952fc55e..3fe96b21c 100644 --- a/provider/rancher/rancher.go +++ b/provider/rancher/rancher.go @@ -51,13 +51,18 @@ func (r rancherData) String() string { return fmt.Sprintf("{name:%s, labels:%v, containers: %v, health: %s, state: %s}", r.Name, r.Labels, r.Containers, r.Health, r.State) } +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + return p.BaseProvider.Init(constraints) +} + // Provide allows either the Rancher API or metadata service provider to // seed configuration into Traefik using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { if p.Metadata == nil { - return p.apiProvide(configurationChan, pool, constraints) + return p.apiProvide(configurationChan, pool) } - return p.metadataProvide(configurationChan, pool, constraints) + return p.metadataProvide(configurationChan, pool) } func containerFilter(name, healthState, state string) bool { diff --git a/provider/rest/rest.go b/provider/rest/rest.go index 5041e874f..f3ffe5136 100644 --- a/provider/rest/rest.go +++ b/provider/rest/rest.go @@ -21,6 +21,11 @@ type Provider struct { var templatesRenderer = render.New(render.Options{Directory: "nowhere"}) +// Init the provider +func (p *Provider) Init(_ types.Constraints) error { + return nil +} + // AddRoutes add rest provider routes on a router func (p *Provider) AddRoutes(systemRouter *mux.Router) { systemRouter. @@ -57,7 +62,7 @@ func (p *Provider) AddRoutes(systemRouter *mux.Router) { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, _ types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { p.configurationChan = configurationChan return nil } diff --git a/provider/zk/zk.go b/provider/zk/zk.go index b889c9eb1..7edc48c29 100644 --- a/provider/zk/zk.go +++ b/provider/zk/zk.go @@ -18,15 +18,26 @@ type Provider struct { kv.Provider `mapstructure:",squash" export:"true"` } -// Provide allows the zk provider to Provide configurations to traefik -// using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + err := p.Provider.Init(constraints) + if err != nil { + return err + } + store, err := p.CreateStore() if err != nil { return fmt.Errorf("failed to Connect to KV store: %v", err) } + p.SetKVClient(store) - return p.Provider.Provide(configurationChan, pool, constraints) + return nil +} + +// Provide allows the zk provider to Provide configurations to traefik +// using the given configuration channel. +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { + return p.Provider.Provide(configurationChan, pool) } // CreateStore creates the KV store diff --git a/server/server.go b/server/server.go index 62399d0b9..0cd9e6b95 100644 --- a/server/server.go +++ b/server/server.go @@ -14,7 +14,6 @@ import ( "net/url" "os" "os/signal" - "reflect" "sync" "time" @@ -308,17 +307,16 @@ func (s *serverEntryPoint) getCertificate(clientHello *tls.ClientHelloInfo) (*tl func (s *Server) startProvider() { // start providers - providerType := reflect.TypeOf(s.provider) jsonConf, err := json.Marshal(s.provider) if err != nil { - log.Debugf("Unable to marshal provider conf %v with error: %v", providerType, err) + log.Debugf("Unable to marshal provider conf %T with error: %v", s.provider, err) } - log.Infof("Starting provider %v %s", providerType, jsonConf) + log.Infof("Starting provider %T %s", s.provider, jsonConf) currentProvider := s.provider safe.Go(func() { - err := currentProvider.Provide(s.configurationChan, s.routinesPool, s.globalConfiguration.Constraints) + err := currentProvider.Provide(s.configurationChan, s.routinesPool) if err != nil { - log.Errorf("Error starting provider %v: %s", providerType, err) + log.Errorf("Error starting provider %T: %s", s.provider, err) } }) } diff --git a/vendor/code.cloudfoundry.org/clock/LICENSE b/vendor/code.cloudfoundry.org/clock/LICENSE new file mode 100644 index 000000000..f49a4e16e --- /dev/null +++ b/vendor/code.cloudfoundry.org/clock/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. \ No newline at end of file diff --git a/vendor/code.cloudfoundry.org/clock/NOTICE b/vendor/code.cloudfoundry.org/clock/NOTICE new file mode 100644 index 000000000..29c0e5ff0 --- /dev/null +++ b/vendor/code.cloudfoundry.org/clock/NOTICE @@ -0,0 +1,20 @@ +Copyright (c) 2015-Present CloudFoundry.org Foundation, Inc. All Rights Reserved. + +This project contains software that is Copyright (c) 2015 Pivotal Software, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This project may include a number of subcomponents with separate +copyright notices and license terms. Your use of these subcomponents +is subject to the terms and conditions of each subcomponent's license, +as noted in the LICENSE file. diff --git a/vendor/code.cloudfoundry.org/clock/clock.go b/vendor/code.cloudfoundry.org/clock/clock.go new file mode 100644 index 000000000..6b091d99a --- /dev/null +++ b/vendor/code.cloudfoundry.org/clock/clock.go @@ -0,0 +1,53 @@ +package clock + +import "time" + +type Clock interface { + Now() time.Time + Sleep(d time.Duration) + Since(t time.Time) time.Duration + // After waits for the duration to elapse and then sends the current time + // on the returned channel. + // It is equivalent to clock.NewTimer(d).C. + // The underlying Timer is not recovered by the garbage collector + // until the timer fires. If efficiency is a concern, use clock.NewTimer + // instead and call Timer.Stop if the timer is no longer needed. + After(d time.Duration) <-chan time.Time + + NewTimer(d time.Duration) Timer + NewTicker(d time.Duration) Ticker +} + +type realClock struct{} + +func NewClock() Clock { + return &realClock{} +} + +func (clock *realClock) Now() time.Time { + return time.Now() +} + +func (clock *realClock) Since(t time.Time) time.Duration { + return time.Now().Sub(t) +} + +func (clock *realClock) Sleep(d time.Duration) { + <-clock.NewTimer(d).C() +} + +func (clock *realClock) After(d time.Duration) <-chan time.Time { + return clock.NewTimer(d).C() +} + +func (clock *realClock) NewTimer(d time.Duration) Timer { + return &realTimer{ + t: time.NewTimer(d), + } +} + +func (clock *realClock) NewTicker(d time.Duration) Ticker { + return &realTicker{ + t: time.NewTicker(d), + } +} diff --git a/vendor/code.cloudfoundry.org/clock/package.go b/vendor/code.cloudfoundry.org/clock/package.go new file mode 100644 index 000000000..349f67c82 --- /dev/null +++ b/vendor/code.cloudfoundry.org/clock/package.go @@ -0,0 +1 @@ +package clock // import "code.cloudfoundry.org/clock" diff --git a/vendor/code.cloudfoundry.org/clock/ticker.go b/vendor/code.cloudfoundry.org/clock/ticker.go new file mode 100644 index 000000000..f25129e1c --- /dev/null +++ b/vendor/code.cloudfoundry.org/clock/ticker.go @@ -0,0 +1,20 @@ +package clock + +import "time" + +type Ticker interface { + C() <-chan time.Time + Stop() +} + +type realTicker struct { + t *time.Ticker +} + +func (t *realTicker) C() <-chan time.Time { + return t.t.C +} + +func (t *realTicker) Stop() { + t.t.Stop() +} diff --git a/vendor/code.cloudfoundry.org/clock/timer.go b/vendor/code.cloudfoundry.org/clock/timer.go new file mode 100644 index 000000000..cf8c22125 --- /dev/null +++ b/vendor/code.cloudfoundry.org/clock/timer.go @@ -0,0 +1,25 @@ +package clock + +import "time" + +type Timer interface { + C() <-chan time.Time + Reset(d time.Duration) bool + Stop() bool +} + +type realTimer struct { + t *time.Timer +} + +func (t *realTimer) C() <-chan time.Time { + return t.t.C +} + +func (t *realTimer) Reset(d time.Duration) bool { + return t.t.Reset(d) +} + +func (t *realTimer) Stop() bool { + return t.t.Stop() +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/LICENSE b/vendor/github.com/Microsoft/ApplicationInsights-Go/LICENSE new file mode 100644 index 000000000..01d022c22 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015-2017 Microsoft + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/bond.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/bond.go new file mode 100644 index 000000000..1af6f5949 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/bond.go @@ -0,0 +1,122 @@ +package appinsights + +type Domain interface { +} + +type domain struct { + Ver int `json:"ver"` + Properties map[string]string `json:"properties"` +} + +type data struct { + BaseType string `json:"baseType"` + BaseData Domain `json:"baseData"` +} + +type envelope struct { + Name string `json:"name"` + Time string `json:"time"` + IKey string `json:"iKey"` + Tags map[string]string `json:"tags"` + Data *data `json:"data"` +} + +type DataPointType int + +const ( + Measurement DataPointType = iota + Aggregation +) + +type DataPoint struct { + Name string `json:"name"` + Kind DataPointType `json:"kind"` + Value float32 `json:"value"` + Count int `json:"count"` + min float32 `json:"min"` + max float32 `json:"max"` + stdDev float32 `json:"stdDev"` +} + +type metricData struct { + domain + Metrics []*DataPoint `json:"metrics"` +} + +type eventData struct { + domain + Name string `json:"name"` + Measurements map[string]float32 `json:"measurements"` +} + +type SeverityLevel int + +const ( + Verbose SeverityLevel = iota + Information + Warning + Error + Critical +) + +type messageData struct { + domain + Message string `json:"message"` + SeverityLevel SeverityLevel `json:"severityLevel"` +} + +type requestData struct { + domain + Id string `json:"id"` + Name string `json:"name"` + StartTime string `json:"startTime"` // yyyy-mm-ddThh:mm:ss.fffffff-hh:mm + Duration string `json:"duration"` // d:hh:mm:ss.fffffff + ResponseCode string `json:"responseCode"` + Success bool `json:"success"` + HttpMethod string `json:"httpMethod"` + Url string `json:"url"` + Measurements map[string]float32 `json:"measurements"` +} + +type ContextTagKeys string + +const ( + ApplicationVersion ContextTagKeys = "ai.application.ver" + ApplicationBuild = "ai.application.build" + CloudRole = "ai.cloud.role" + CloudRoleInstance = "ai.cloud.roleInstance" + DeviceId = "ai.device.id" + DeviceIp = "ai.device.ip" + DeviceLanguage = "ai.device.language" + DeviceLocale = "ai.device.locale" + DeviceModel = "ai.device.model" + DeviceNetwork = "ai.device.network" + DeviceOEMName = "ai.device.oemName" + DeviceOS = "ai.device.os" + DeviceOSVersion = "ai.device.osVersion" + DeviceRoleInstance = "ai.device.roleInstance" + DeviceRoleName = "ai.device.roleName" + DeviceScreenResolution = "ai.device.screenResolution" + DeviceType = "ai.device.type" + DeviceMachineName = "ai.device.machineName" + LocationIp = "ai.location.ip" + OperationCorrelationVector = "ai.operation.correlationVector" + OperationId = "ai.operation.id" + OperationName = "ai.operation.name" + OperationParentId = "ai.operation.parentId" + OperationRootId = "ai.operation.rootId" + OperationSyntheticSource = "ai.operation.syntheticSource" + OperationIsSynthetic = "ai.operation.isSynthetic" + SessionId = "ai.session.id" + SessionIsFirst = "ai.session.isFirst" + SessionIsNew = "ai.session.isNew" + UserAccountAcquisitionDate = "ai.user.accountAcquisitionDate" + UserAccountId = "ai.user.accountId" + UserAgent = "ai.user.userAgent" + UserAuthUserId = "ai.user.authUserId" + UserId = "ai.user.id" + UserStoreRegion = "ai.user.storeRegion" + SampleRate = "ai.sample.sampleRate" + InternalSdkVersion = "ai.internal.sdkVersion" + InternalAgentVersion = "ai.internal.agentVersion" +) diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/client.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/client.go new file mode 100644 index 000000000..c7d56161e --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/client.go @@ -0,0 +1,132 @@ +package appinsights + +import "time" + +type TelemetryClient interface { + Context() TelemetryContext + InstrumentationKey() string + Channel() TelemetryChannel + IsEnabled() bool + SetIsEnabled(bool) + Track(Telemetry) + TrackEvent(string) + TrackEventTelemetry(*EventTelemetry) + TrackMetric(string, float32) + TrackMetricTelemetry(*MetricTelemetry) + TrackTrace(string) + TrackTraceTelemetry(*TraceTelemetry) + TrackRequest(string, string, string, time.Time, time.Duration, string, bool) + TrackRequestTelemetry(*RequestTelemetry) +} + +type telemetryClient struct { + TelemetryConfiguration *TelemetryConfiguration + channel TelemetryChannel + context TelemetryContext + isEnabled bool +} + +func NewTelemetryClient(iKey string) TelemetryClient { + return NewTelemetryClientFromConfig(NewTelemetryConfiguration(iKey)) +} + +func NewTelemetryClientFromConfig(config *TelemetryConfiguration) TelemetryClient { + channel := NewInMemoryChannel(config) + context := NewClientTelemetryContext() + return &telemetryClient{ + TelemetryConfiguration: config, + channel: channel, + context: context, + isEnabled: true, + } +} + +func (tc *telemetryClient) Context() TelemetryContext { + return tc.context +} + +func (tc *telemetryClient) Channel() TelemetryChannel { + return tc.channel +} + +func (tc *telemetryClient) InstrumentationKey() string { + return tc.TelemetryConfiguration.InstrumentationKey +} + +func (tc *telemetryClient) IsEnabled() bool { + return tc.isEnabled +} + +func (tc *telemetryClient) SetIsEnabled(isEnabled bool) { + tc.isEnabled = isEnabled +} + +func (tc *telemetryClient) Track(item Telemetry) { + if tc.isEnabled { + iKey := tc.context.InstrumentationKey() + if len(iKey) == 0 { + iKey = tc.TelemetryConfiguration.InstrumentationKey + } + + itemContext := item.Context().(*telemetryContext) + itemContext.iKey = iKey + + clientContext := tc.context.(*telemetryContext) + + for tagkey, tagval := range clientContext.tags { + if itemContext.tags[tagkey] == "" { + itemContext.tags[tagkey] = tagval + } + } + + tc.channel.Send(item) + } +} + +func (tc *telemetryClient) TrackEvent(name string) { + item := NewEventTelemetry(name) + tc.TrackEventTelemetry(item) +} + +func (tc *telemetryClient) TrackEventTelemetry(event *EventTelemetry) { + var item Telemetry + item = event + + tc.Track(item) +} + +func (tc *telemetryClient) TrackMetric(name string, value float32) { + item := NewMetricTelemetry(name, value) + tc.TrackMetricTelemetry(item) +} + +func (tc *telemetryClient) TrackMetricTelemetry(metric *MetricTelemetry) { + var item Telemetry + item = metric + + tc.Track(item) +} + +func (tc *telemetryClient) TrackTrace(message string) { + item := NewTraceTelemetry(message, Information) + tc.TrackTraceTelemetry(item) +} + +func (tc *telemetryClient) TrackTraceTelemetry(trace *TraceTelemetry) { + var item Telemetry + item = trace + + tc.Track(item) +} + +func (tc *telemetryClient) TrackRequest(name, method, url string, timestamp time.Time, duration time.Duration, responseCode string, success bool) { + item := NewRequestTelemetry(name, method, url, timestamp, duration, responseCode, success) + tc.TrackRequestTelemetry(item) +} + +func (tc *telemetryClient) TrackRequestTelemetry(request *RequestTelemetry) { + var item Telemetry + item = request + + tc.Track(item) +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/clock.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/clock.go new file mode 100644 index 000000000..1178b9eaa --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/clock.go @@ -0,0 +1,11 @@ +package appinsights + +// We need to mock out the clock for tests; we'll use this to do it. + +import "code.cloudfoundry.org/clock" + +var currentClock clock.Clock + +func init() { + currentClock = clock.NewClock() +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/concurrentrandom.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/concurrentrandom.go new file mode 100644 index 000000000..552d498fd --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/concurrentrandom.go @@ -0,0 +1,45 @@ +package appinsights + +import ( + "encoding/base64" + "math/rand" + "sync/atomic" + "time" + "unsafe" +) + +type concurrentRandom struct { + channel chan string + random *rand.Rand +} + +var randomGenerator *concurrentRandom + +func newConcurrentRandom() *concurrentRandom { + source := rand.NewSource(time.Now().UnixNano()) + return &concurrentRandom{ + channel: make(chan string, 4), + random: rand.New(source), + } +} + +func (generator *concurrentRandom) run() { + buf := make([]byte, 8) + for { + generator.random.Read(buf) + generator.channel <- base64.StdEncoding.EncodeToString(buf) + } +} + +func randomId() string { + if randomGenerator == nil { + r := newConcurrentRandom() + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&randomGenerator)), unsafe.Pointer(nil), unsafe.Pointer(r)) { + go r.run() + } else { + close(r.channel) + } + } + + return <-randomGenerator.channel +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/configuration.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/configuration.go new file mode 100644 index 000000000..37c602ed6 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/configuration.go @@ -0,0 +1,19 @@ +package appinsights + +import "time" + +type TelemetryConfiguration struct { + InstrumentationKey string + EndpointUrl string + MaxBatchSize int + MaxBatchInterval time.Duration +} + +func NewTelemetryConfiguration(instrumentationKey string) *TelemetryConfiguration { + return &TelemetryConfiguration{ + InstrumentationKey: instrumentationKey, + EndpointUrl: "https://dc.services.visualstudio.com/v2/track", + MaxBatchSize: 1024, + MaxBatchInterval: time.Duration(10) * time.Second, + } +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/datacontracts.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/datacontracts.go new file mode 100644 index 000000000..136a86e48 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/datacontracts.go @@ -0,0 +1,228 @@ +package appinsights + +import ( + "fmt" + "time" +) + +type Telemetry interface { + Timestamp() time.Time + Context() TelemetryContext + baseTypeName() string + baseData() Domain + SetProperty(string, string) +} + +type BaseTelemetry struct { + timestamp time.Time + context TelemetryContext +} + +type TraceTelemetry struct { + BaseTelemetry + data *messageData +} + +func NewTraceTelemetry(message string, severityLevel SeverityLevel) *TraceTelemetry { + now := time.Now() + data := &messageData{ + Message: message, + SeverityLevel: severityLevel, + } + + data.Ver = 2 + + item := &TraceTelemetry{ + data: data, + } + + item.timestamp = now + item.context = NewItemTelemetryContext() + + return item +} + +func (item *TraceTelemetry) Timestamp() time.Time { + return item.timestamp +} + +func (item *TraceTelemetry) Context() TelemetryContext { + return item.context +} + +func (item *TraceTelemetry) baseTypeName() string { + return "Message" +} + +func (item *TraceTelemetry) baseData() Domain { + return item.data +} + +func (item *TraceTelemetry) SetProperty(key, value string) { + if item.data.Properties == nil { + item.data.Properties = make(map[string]string) + } + item.data.Properties[key] = value +} + +type EventTelemetry struct { + BaseTelemetry + data *eventData +} + +func NewEventTelemetry(name string) *EventTelemetry { + now := time.Now() + data := &eventData{ + Name: name, + } + + data.Ver = 2 + + item := &EventTelemetry{ + data: data, + } + + item.timestamp = now + item.context = NewItemTelemetryContext() + + return item +} + +func (item *EventTelemetry) Timestamp() time.Time { + return item.timestamp +} + +func (item *EventTelemetry) Context() TelemetryContext { + return item.context +} + +func (item *EventTelemetry) baseTypeName() string { + return "Event" +} + +func (item *EventTelemetry) baseData() Domain { + return item.data +} + +func (item *EventTelemetry) SetProperty(key, value string) { + if item.data.Properties == nil { + item.data.Properties = make(map[string]string) + } + item.data.Properties[key] = value +} + +type MetricTelemetry struct { + BaseTelemetry + data *metricData +} + +func NewMetricTelemetry(name string, value float32) *MetricTelemetry { + now := time.Now() + metric := &DataPoint{ + Name: name, + Value: value, + Count: 1, + } + + data := &metricData{ + Metrics: make([]*DataPoint, 1), + } + + data.Ver = 2 + data.Metrics[0] = metric + + item := &MetricTelemetry{ + data: data, + } + + item.timestamp = now + item.context = NewItemTelemetryContext() + + return item +} + +func (item *MetricTelemetry) Timestamp() time.Time { + return item.timestamp +} + +func (item *MetricTelemetry) Context() TelemetryContext { + return item.context +} + +func (item *MetricTelemetry) baseTypeName() string { + return "Metric" +} + +func (item *MetricTelemetry) baseData() Domain { + return item.data +} + +func (item *MetricTelemetry) SetProperty(key, value string) { + if item.data.Properties == nil { + item.data.Properties = make(map[string]string) + } + item.data.Properties[key] = value +} + +type RequestTelemetry struct { + BaseTelemetry + data *requestData +} + +func NewRequestTelemetry(name, httpMethod, url string, timestamp time.Time, duration time.Duration, responseCode string, success bool) *RequestTelemetry { + now := time.Now() + data := &requestData{ + Name: name, + StartTime: timestamp.Format(time.RFC3339Nano), + Duration: formatDuration(duration), + ResponseCode: responseCode, + Success: success, + HttpMethod: httpMethod, + Url: url, + Id: randomId(), + } + + data.Ver = 2 + + item := &RequestTelemetry{ + data: data, + } + + item.timestamp = now + item.context = NewItemTelemetryContext() + + return item +} + +func (item *RequestTelemetry) Timestamp() time.Time { + return item.timestamp +} + +func (item *RequestTelemetry) Context() TelemetryContext { + return item.context +} + +func (item *RequestTelemetry) baseTypeName() string { + return "Request" +} + +func (item *RequestTelemetry) baseData() Domain { + return item.data +} + +func (item *RequestTelemetry) SetProperty(key, value string) { + if item.data.Properties == nil { + item.data.Properties = make(map[string]string) + } + item.data.Properties[key] = value +} + +func formatDuration(d time.Duration) string { + ticks := int64(d/(time.Nanosecond*100)) % 10000000 + seconds := int64(d/time.Second) % 60 + minutes := int64(d/time.Minute) % 60 + hours := int64(d/time.Hour) % 24 + days := int64(d / (time.Hour * 24)) + + return fmt.Sprintf("%d.%02d:%02d:%02d.%07d", days, hours, minutes, seconds, ticks) +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/diagnostics.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/diagnostics.go new file mode 100644 index 000000000..7d609719e --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/diagnostics.go @@ -0,0 +1,64 @@ +package appinsights + +import "fmt" + +type DiagnosticsMessageWriter interface { + Write(string) + appendListener(*diagnosticsMessageListener) +} + +type diagnosticsMessageWriter struct { + listeners []chan string +} + +type DiagnosticsMessageProcessor func(string) + +type DiagnosticsMessageListener interface { + ProcessMessages(DiagnosticsMessageProcessor) +} + +type diagnosticsMessageListener struct { + channel chan string +} + +var diagnosticsWriter *diagnosticsMessageWriter = &diagnosticsMessageWriter{ + listeners: make([]chan string, 0), +} + +func NewDiagnosticsMessageListener() DiagnosticsMessageListener { + listener := &diagnosticsMessageListener{ + channel: make(chan string), + } + + diagnosticsWriter.appendListener(listener) + + return listener +} + +func (writer *diagnosticsMessageWriter) appendListener(listener *diagnosticsMessageListener) { + writer.listeners = append(writer.listeners, listener.channel) +} + +func (writer *diagnosticsMessageWriter) Write(message string) { + for _, c := range writer.listeners { + c <- message + } +} + +func (writer *diagnosticsMessageWriter) Printf(message string, args ...interface{}) { + // Don't bother with Sprintf if nobody is listening + if writer.hasListeners() { + writer.Write(fmt.Sprintf(message, args...)) + } +} + +func (writer *diagnosticsMessageWriter) hasListeners() bool { + return len(writer.listeners) > 0 +} + +func (listener *diagnosticsMessageListener) ProcessMessages(process DiagnosticsMessageProcessor) { + for { + message := <-listener.channel + process(message) + } +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/inmemorychannel.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/inmemorychannel.go new file mode 100644 index 000000000..272170187 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/inmemorychannel.go @@ -0,0 +1,408 @@ +package appinsights + +import ( + "sync" + "time" + + "code.cloudfoundry.org/clock" +) + +var ( + submit_retries = []time.Duration{time.Duration(10 * time.Second), time.Duration(30 * time.Second), time.Duration(60 * time.Second)} +) + +type TelemetryBufferItems []Telemetry + +type InMemoryChannel struct { + endpointAddress string + isDeveloperMode bool + collectChan chan Telemetry + controlChan chan *inMemoryChannelControl + batchSize int + batchInterval time.Duration + waitgroup sync.WaitGroup + throttle *throttleManager + transmitter transmitter +} + +type inMemoryChannelControl struct { + // If true, flush the buffer. + flush bool + + // If true, stop listening on the channel. (Flush is required if any events are to be sent) + stop bool + + // If stopping and flushing, this specifies whether to retry submissions on error. + retry bool + + // If retrying, what is the max time to wait before finishing up? + timeout time.Duration + + // If specified, a message will be sent on this channel when all pending telemetry items have been submitted + callback chan struct{} +} + +func NewInMemoryChannel(config *TelemetryConfiguration) *InMemoryChannel { + channel := &InMemoryChannel{ + endpointAddress: config.EndpointUrl, + collectChan: make(chan Telemetry), + controlChan: make(chan *inMemoryChannelControl), + batchSize: config.MaxBatchSize, + batchInterval: config.MaxBatchInterval, + throttle: newThrottleManager(), + transmitter: newTransmitter(config.EndpointUrl), + } + + go channel.acceptLoop() + + return channel +} + +func (channel *InMemoryChannel) EndpointAddress() string { + return channel.endpointAddress +} + +func (channel *InMemoryChannel) Send(item Telemetry) { + if item != nil && channel.collectChan != nil { + channel.collectChan <- item + } +} + +func (channel *InMemoryChannel) Flush() { + if channel.controlChan != nil { + channel.controlChan <- &inMemoryChannelControl{ + flush: true, + } + } +} + +func (channel *InMemoryChannel) Stop() { + if channel.controlChan != nil { + channel.controlChan <- &inMemoryChannelControl{ + stop: true, + } + } +} + +func (channel *InMemoryChannel) IsThrottled() bool { + return channel.throttle != nil && channel.throttle.IsThrottled() +} + +func (channel *InMemoryChannel) Close(timeout ...time.Duration) <-chan struct{} { + if channel.controlChan != nil { + callback := make(chan struct{}) + + ctl := &inMemoryChannelControl{ + stop: true, + flush: true, + retry: false, + callback: callback, + } + + if len(timeout) > 0 { + ctl.retry = true + ctl.timeout = timeout[0] + } + + channel.controlChan <- ctl + + return callback + } else { + return nil + } +} + +func (channel *InMemoryChannel) acceptLoop() { + channelState := newInMemoryChannelState(channel) + + for !channelState.stopping { + channelState.start() + } + + channelState.stop() +} + +// Data shared between parts of a channel +type inMemoryChannelState struct { + channel *InMemoryChannel + stopping bool + buffer TelemetryBufferItems + retry bool + retryTimeout time.Duration + callback chan struct{} + timer clock.Timer +} + +func newInMemoryChannelState(channel *InMemoryChannel) *inMemoryChannelState { + return &inMemoryChannelState{ + channel: channel, + buffer: make(TelemetryBufferItems, 0, 16), + stopping: false, + timer: currentClock.NewTimer(channel.batchInterval), + } +} + +// Part of channel accept loop: Initialize buffer and accept first message, handle controls. +func (state *inMemoryChannelState) start() bool { + if len(state.buffer) > 16 { + // Start out with the size of the previous buffer + state.buffer = make(TelemetryBufferItems, 0, cap(state.buffer)) + } else if len(state.buffer) > 0 { + // Start out with at least 16 slots + state.buffer = make(TelemetryBufferItems, 0, 16) + } + + // Wait for an event + select { + case event := <-state.channel.collectChan: + if event == nil { + // Channel closed? Not intercepted by Send()? + panic("Received nil event") + } + + state.buffer = append(state.buffer, event) + + case ctl := <-state.channel.controlChan: + // The buffer is empty, so there would be no point in flushing + state.channel.signalWhenDone(ctl.callback) + + if ctl.stop { + state.stopping = true + return false + } + } + + if len(state.buffer) == 0 { + return true + } + + return state.waitToSend() +} + +// Part of channel accept loop: Wait for buffer to fill, timeout to expire, or flush +func (state *inMemoryChannelState) waitToSend() bool { + // Things that are used by the sender if we receive a control message + state.retryTimeout = 0 + state.retry = true + state.callback = nil + + // Delay until timeout passes or buffer fills up + state.timer.Reset(state.channel.batchInterval) + for { + select { + case event := <-state.channel.collectChan: + if event == nil { + // Channel closed? Not intercepted by Send()? + panic("Received nil event") + } + + state.buffer = append(state.buffer, event) + if len(state.buffer) >= state.channel.batchSize { + return state.send() + } + + case ctl := <-state.channel.controlChan: + if ctl.stop { + state.stopping = true + state.retry = ctl.retry + if !ctl.flush { + // No flush? Just exit. + state.channel.signalWhenDone(ctl.callback) + return false + } + } + + if ctl.flush { + state.retryTimeout = ctl.timeout + state.callback = ctl.callback + return state.send() + } + + case _ = <-state.timer.C(): + // Timeout expired + return state.send() + } + } +} + +// Part of channel accept loop: Check and wait on throttle, submit pending telemetry +func (state *inMemoryChannelState) send() bool { + // Hold up transmission if we're being throttled + if !state.stopping && state.channel.throttle.IsThrottled() { + if !state.waitThrottle() { + // Stopped + return false + } + } + + // Send + if len(state.buffer) > 0 { + state.channel.waitgroup.Add(1) + + // If we have a callback, wait on the waitgroup now that it's + // incremented. + state.channel.signalWhenDone(state.callback) + + go func(buffer TelemetryBufferItems, retry bool, retryTimeout time.Duration) { + defer state.channel.waitgroup.Done() + state.channel.transmitRetry(buffer, retry, retryTimeout) + }(state.buffer, state.retry, state.retryTimeout) + } else if state.callback != nil { + state.channel.signalWhenDone(state.callback) + } + + return true +} + +// Part of channel accept loop: Wait for throttle to expire while dropping messages +func (state *inMemoryChannelState) waitThrottle() bool { + // Channel is currently throttled. Once the buffer fills, messages will + // be lost... If we're exiting, then we'll just try to submit anyway. That + // request may be throttled and transmitRetry will perform the backoff correctly. + + diagnosticsWriter.Write("Channel is throttled, events may be dropped.") + throttleDone := state.channel.throttle.NotifyWhenReady() + dropped := 0 + + defer diagnosticsWriter.Printf("Channel dropped %d events while throttled", dropped) + + for { + select { + case <-throttleDone: + close(throttleDone) + return true + + case event := <-state.channel.collectChan: + // If there's still room in the buffer, then go ahead and add it. + if len(state.buffer) < state.channel.batchSize { + state.buffer = append(state.buffer, event) + } else { + if dropped == 0 { + diagnosticsWriter.Write("Buffer is full, dropping further events.") + } + + dropped++ + } + + case ctl := <-state.channel.controlChan: + if ctl.stop { + state.stopping = true + state.retry = ctl.retry + if !ctl.flush { + state.channel.signalWhenDone(ctl.callback) + return false + } else { + // Make an exception when stopping + return true + } + } + + // Cannot flush + // TODO: Figure out what to do about callback? + if ctl.flush { + state.channel.signalWhenDone(ctl.callback) + } + } + } +} + +// Part of channel accept loop: Clean up and close telemetry channel +func (state *inMemoryChannelState) stop() { + close(state.channel.collectChan) + close(state.channel.controlChan) + + state.channel.collectChan = nil + state.channel.controlChan = nil + + // Throttle can't close until transmitters are done using it. + state.channel.waitgroup.Wait() + state.channel.throttle.Stop() + + state.channel.throttle = nil +} + +func (channel *InMemoryChannel) transmitRetry(items TelemetryBufferItems, retry bool, retryTimeout time.Duration) { + payload := items.serialize() + retryTimeRemaining := retryTimeout + + for _, wait := range submit_retries { + result, err := channel.transmitter.Transmit(payload, items) + if err == nil && result != nil && result.IsSuccess() { + return + } + + if !retry { + diagnosticsWriter.Write("Refusing to retry telemetry submission (retry==false)") + return + } + + // Check for success, determine if we need to retry anything + if result != nil { + if result.CanRetry() { + // Filter down to failed items + payload, items = result.GetRetryItems(payload, items) + if len(payload) == 0 || len(items) == 0 { + return + } + } else { + diagnosticsWriter.Write("Cannot retry telemetry submission") + return + } + + // Check for throttling + if result.IsThrottled() { + if result.retryAfter != nil { + diagnosticsWriter.Printf("Channel is throttled until %s", *result.retryAfter) + channel.throttle.RetryAfter(*result.retryAfter) + } else { + // TODO: Pick a time + } + } + } + + if retryTimeout > 0 { + // We're on a time schedule here. Make sure we don't try longer + // than we have been allowed. + if retryTimeRemaining < wait { + // One more chance left -- we'll wait the max time we can + // and then retry on the way out. + currentClock.Sleep(retryTimeRemaining) + break + } else { + // Still have time left to go through the rest of the regular + // retry schedule + retryTimeRemaining -= wait + } + } + + diagnosticsWriter.Printf("Waiting %s to retry submission", wait) + currentClock.Sleep(wait) + + // Wait if the channel is throttled and we're not on a schedule + if channel.IsThrottled() && retryTimeout == 0 { + diagnosticsWriter.Printf("Channel is throttled; extending wait time.") + ch := channel.throttle.NotifyWhenReady() + result := <-ch + close(ch) + + if !result { + return + } + } + } + + // One final try + _, err := channel.transmitter.Transmit(payload, items) + if err != nil { + diagnosticsWriter.Write("Gave up transmitting payload; exhausted retries") + } +} + +func (channel *InMemoryChannel) signalWhenDone(callback chan struct{}) { + if callback != nil { + go func() { + channel.waitgroup.Wait() + close(callback) + }() + } +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/jsonserializer.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/jsonserializer.go new file mode 100644 index 000000000..326812310 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/jsonserializer.go @@ -0,0 +1,45 @@ +package appinsights + +import ( + "bytes" + "encoding/json" + "fmt" + "time" +) + +func (items TelemetryBufferItems) serialize() []byte { + var result bytes.Buffer + encoder := json.NewEncoder(&result) + + for _, item := range items { + end := result.Len() + if err := encoder.Encode(prepare(item)); err != nil { + diagnosticsWriter.Write(fmt.Sprintf("Telemetry item failed to serialize: %s", err.Error())) + result.Truncate(end) + } + } + + return result.Bytes() +} + +func prepare(item Telemetry) *envelope { + data := &data{ + BaseType: item.baseTypeName() + "Data", + BaseData: item.baseData(), + } + + context := item.Context() + + envelope := &envelope{ + Name: "Microsoft.ApplicationInsights." + item.baseTypeName(), + Time: item.Timestamp().Format(time.RFC3339), + IKey: context.InstrumentationKey(), + Data: data, + } + + if tcontext, ok := context.(*telemetryContext); ok { + envelope.Tags = tcontext.tags + } + + return envelope +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/package.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/package.go new file mode 100644 index 000000000..f498c1be7 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/package.go @@ -0,0 +1,8 @@ +// Package appinsights provides an interface to submit telemetry to Application Insights. +// See more at https://azure.microsoft.com/en-us/services/application-insights/ +package appinsights + +const ( + sdkName = "go" + Version = "0.3.1-pre" +) diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrychannel.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrychannel.go new file mode 100644 index 000000000..01b3914a0 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrychannel.go @@ -0,0 +1,47 @@ +package appinsights + +import "time" + +// Implementations of TelemetryChannel are responsible for queueing and +// periodically submitting telemetry items. +type TelemetryChannel interface { + // The address of the endpoint to which telemetry is sent + EndpointAddress() string + + // Queues a single telemetry item + Send(Telemetry) + + // Forces the current queue to be sent + Flush() + + // Tears down the submission goroutines, closes internal channels. + // Any telemetry waiting to be sent is discarded. Further calls to + // Send() have undefined behavior. This is a more abrupt version of + // Close(). + Stop() + + // Returns true if this channel has been throttled by the data + // collector. + IsThrottled() bool + + // Flushes and tears down the submission goroutine and closes + // internal channels. Returns a channel that is closed when all + // pending telemetry items have been submitted and it is safe to + // shut down without losing telemetry. + // + // If retryTimeout is specified and non-zero, then failed + // submissions will be retried until one succeeds or the timeout + // expires, whichever occurs first. A retryTimeout of zero + // indicates that failed submissions will be retried as usual. An + // omitted retryTimeout indicates that submissions should not be + // retried if they fail. + // + // Note that the returned channel may not be closed before + // retryTimeout even if it is specified. This is because + // retryTimeout only applies to the latest telemetry buffer. This + // may be typical for applications that submit a large amount of + // telemetry or are prone to being throttled. When exiting, you + // should select on the result channel and your own timer to avoid + // long delays. + Close(retryTimeout ...time.Duration) <-chan struct{} +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrycontext.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrycontext.go new file mode 100644 index 000000000..7a081e033 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/telemetrycontext.go @@ -0,0 +1,400 @@ +package appinsights + +import ( + "os" + "runtime" + "strconv" +) + +type TelemetryContext interface { + InstrumentationKey() string + loadDeviceContext() + + Component() ComponentContext + Device() DeviceContext + Cloud() CloudContext + Session() SessionContext + User() UserContext + Operation() OperationContext + Location() LocationContext +} + +type telemetryContext struct { + iKey string + tags map[string]string +} + +type ComponentContext interface { + GetVersion() string + SetVersion(string) +} + +type DeviceContext interface { + GetType() string + SetType(string) + GetId() string + SetId(string) + GetOperatingSystem() string + SetOperatingSystem(string) + GetOemName() string + SetOemName(string) + GetModel() string + SetModel(string) + GetNetworkType() string + SetNetworkType(string) + GetScreenResolution() string + SetScreenResolution(string) + GetLanguage() string + SetLanguage(string) +} + +type CloudContext interface { + GetRoleName() string + SetRoleName(string) + GetRoleInstance() string + SetRoleInstance(string) +} + +type SessionContext interface { + GetId() string + SetId(string) + GetIsFirst() bool + SetIsFirst(bool) +} + +type UserContext interface { + GetId() string + SetId(string) + GetAccountId() string + SetAccountId(string) + GetUserAgent() string + SetUserAgent(string) + GetAuthenticatedUserId() string + SetAuthenticatedUserId(string) +} + +type OperationContext interface { + GetId() string + SetId(string) + GetParentId() string + SetParentId(string) + GetCorrelationVector() string + SetCorrelationVector(string) + GetName() string + SetName(string) + GetSyntheticSource() string + SetSyntheticSource(string) +} + +type LocationContext interface { + GetIp() string + SetIp(string) +} + +func NewItemTelemetryContext() TelemetryContext { + context := &telemetryContext{ + tags: make(map[string]string), + } + return context +} + +func NewClientTelemetryContext() TelemetryContext { + context := &telemetryContext{ + tags: make(map[string]string), + } + context.loadDeviceContext() + context.loadInternalContext() + return context +} + +func (context *telemetryContext) InstrumentationKey() string { + return context.iKey +} + +func (context *telemetryContext) loadDeviceContext() { + hostname, err := os.Hostname() + if err == nil { + context.tags[DeviceId] = hostname + context.tags[DeviceMachineName] = hostname + context.tags[DeviceRoleInstance] = hostname + } + context.tags[DeviceOS] = runtime.GOOS +} + +func (context *telemetryContext) loadInternalContext() { + context.tags[InternalSdkVersion] = sdkName + ":" + Version +} + +func (context *telemetryContext) Component() ComponentContext { + return &componentContext{context: context} +} + +func (context *telemetryContext) Device() DeviceContext { + return &deviceContext{context: context} +} + +func (context *telemetryContext) Cloud() CloudContext { + return &cloudContext{context: context} +} + +func (context *telemetryContext) Session() SessionContext { + return &sessionContext{context: context} +} + +func (context *telemetryContext) User() UserContext { + return &userContext{context: context} +} + +func (context *telemetryContext) Operation() OperationContext { + return &operationContext{context: context} +} + +func (context *telemetryContext) Location() LocationContext { + return &locationContext{context: context} +} + +func (context *telemetryContext) getTagString(key ContextTagKeys) string { + if val, ok := context.tags[string(key)]; ok { + return val + } + + return "" +} + +func (context *telemetryContext) setTagString(key ContextTagKeys, value string) { + if value != "" { + context.tags[string(key)] = value + } else { + delete(context.tags, string(key)) + } +} + +func (context *telemetryContext) getTagBool(key ContextTagKeys) bool { + if val, ok := context.tags[string(key)]; ok { + if b, err := strconv.ParseBool(val); err != nil { + return b + } + } + + return false +} + +func (context *telemetryContext) setTagBool(key ContextTagKeys, value bool) { + if value { + context.tags[string(key)] = "true" + } else { + delete(context.tags, string(key)) + } +} + +type componentContext struct { + context *telemetryContext +} + +type deviceContext struct { + context *telemetryContext +} + +type cloudContext struct { + context *telemetryContext +} + +type sessionContext struct { + context *telemetryContext +} + +type userContext struct { + context *telemetryContext +} + +type operationContext struct { + context *telemetryContext +} + +type locationContext struct { + context *telemetryContext +} + +func (context *componentContext) GetVersion() string { + return context.context.getTagString(ApplicationVersion) +} + +func (context *componentContext) SetVersion(value string) { + context.context.setTagString(ApplicationVersion, value) +} + +func (context *deviceContext) GetType() string { + return context.context.getTagString(DeviceType) +} + +func (context *deviceContext) SetType(value string) { + context.context.setTagString(DeviceType, value) +} + +func (context *deviceContext) GetId() string { + return context.context.getTagString(DeviceId) +} + +func (context *deviceContext) SetId(value string) { + context.context.setTagString(DeviceId, value) +} + +func (context *deviceContext) GetOperatingSystem() string { + return context.context.getTagString(DeviceOSVersion) +} + +func (context *deviceContext) SetOperatingSystem(value string) { + context.context.setTagString(DeviceOSVersion, value) +} + +func (context *deviceContext) GetOemName() string { + return context.context.getTagString(DeviceOEMName) +} + +func (context *deviceContext) SetOemName(value string) { + context.context.setTagString(DeviceOEMName, value) +} + +func (context *deviceContext) GetModel() string { + return context.context.getTagString(DeviceModel) +} + +func (context *deviceContext) SetModel(value string) { + context.context.setTagString(DeviceModel, value) +} + +func (context *deviceContext) GetNetworkType() string { + return context.context.getTagString(DeviceNetwork) +} + +func (context *deviceContext) SetNetworkType(value string) { + context.context.setTagString(DeviceNetwork, value) +} + +func (context *deviceContext) GetScreenResolution() string { + return context.context.getTagString(DeviceScreenResolution) +} + +func (context *deviceContext) SetScreenResolution(value string) { + context.context.setTagString(DeviceScreenResolution, value) +} + +func (context *deviceContext) GetLanguage() string { + return context.context.getTagString(DeviceLanguage) +} + +func (context *deviceContext) SetLanguage(value string) { + context.context.setTagString(DeviceLanguage, value) +} + +func (context *cloudContext) GetRoleName() string { + return context.context.getTagString(CloudRole) +} + +func (context *cloudContext) SetRoleName(value string) { + context.context.setTagString(CloudRole, value) +} + +func (context *cloudContext) GetRoleInstance() string { + return context.context.getTagString(CloudRoleInstance) +} + +func (context *cloudContext) SetRoleInstance(value string) { + context.context.setTagString(CloudRoleInstance, value) +} + +func (context *sessionContext) GetId() string { + return context.context.getTagString(SessionId) +} + +func (context *sessionContext) SetId(value string) { + context.context.setTagString(SessionId, value) +} + +func (context *sessionContext) GetIsFirst() bool { + return context.context.getTagBool(SessionIsFirst) +} + +func (context *sessionContext) SetIsFirst(value bool) { + context.context.setTagBool(SessionIsFirst, value) +} + +func (context *userContext) GetId() string { + return context.context.getTagString(UserId) +} + +func (context *userContext) SetId(value string) { + context.context.setTagString(UserId, value) +} + +func (context *userContext) GetAccountId() string { + return context.context.getTagString(UserAccountId) +} + +func (context *userContext) SetAccountId(value string) { + context.context.setTagString(UserAccountId, value) +} + +func (context *userContext) GetUserAgent() string { + return context.context.getTagString(UserAgent) +} + +func (context *userContext) SetUserAgent(value string) { + context.context.setTagString(UserAgent, value) +} + +func (context *userContext) GetAuthenticatedUserId() string { + return context.context.getTagString(UserAuthUserId) +} + +func (context *userContext) SetAuthenticatedUserId(value string) { + context.context.setTagString(UserAuthUserId, value) +} + +func (context *operationContext) GetId() string { + return context.context.getTagString(OperationId) +} + +func (context *operationContext) SetId(value string) { + context.context.setTagString(OperationId, value) +} + +func (context *operationContext) GetParentId() string { + return context.context.getTagString(OperationParentId) +} + +func (context *operationContext) SetParentId(value string) { + context.context.setTagString(OperationParentId, value) +} + +func (context *operationContext) GetCorrelationVector() string { + return context.context.getTagString(OperationCorrelationVector) +} + +func (context *operationContext) SetCorrelationVector(value string) { + context.context.setTagString(OperationCorrelationVector, value) +} + +func (context *operationContext) GetName() string { + return context.context.getTagString(OperationName) +} + +func (context *operationContext) SetName(value string) { + context.context.setTagString(OperationName, value) +} + +func (context *operationContext) GetSyntheticSource() string { + return context.context.getTagString(OperationSyntheticSource) +} + +func (context *operationContext) SetSyntheticSource(value string) { + context.context.setTagString(OperationSyntheticSource, value) +} + +func (context *locationContext) GetIp() string { + return context.context.getTagString(LocationIp) +} + +func (context *locationContext) SetIp(value string) { + context.context.setTagString(LocationIp, value) +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/throttle.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/throttle.go new file mode 100644 index 000000000..2c85800d1 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/throttle.go @@ -0,0 +1,144 @@ +package appinsights + +import ( + "time" +) + +type throttleManager struct { + msgs chan *throttleMessage +} + +type throttleMessage struct { + query bool + wait bool + throttle bool + stop bool + timestamp time.Time + result chan bool +} + +func newThrottleManager() *throttleManager { + result := &throttleManager{ + msgs: make(chan *throttleMessage), + } + + go result.run() + return result +} + +func (throttle *throttleManager) RetryAfter(t time.Time) { + throttle.msgs <- &throttleMessage{ + throttle: true, + timestamp: t, + } +} + +func (throttle *throttleManager) IsThrottled() bool { + ch := make(chan bool) + throttle.msgs <- &throttleMessage{ + query: true, + result: ch, + } + + result := <-ch + close(ch) + return result +} + +func (throttle *throttleManager) NotifyWhenReady() chan bool { + result := make(chan bool, 1) + throttle.msgs <- &throttleMessage{ + wait: true, + result: result, + } + + return result +} + +func (throttle *throttleManager) Stop() { + result := make(chan bool) + throttle.msgs <- &throttleMessage{ + stop: true, + result: result, + } + + <-result + close(result) +} + +func (throttle *throttleManager) run() { + for { + throttledUntil, ok := throttle.waitForThrottle() + if !ok { + break + } + + if !throttle.waitForReady(throttledUntil) { + break + } + } + + close(throttle.msgs) +} + +func (throttle *throttleManager) waitForThrottle() (time.Time, bool) { + for { + msg := <-throttle.msgs + if msg.query { + msg.result <- false + } else if msg.wait { + msg.result <- true + } else if msg.stop { + return time.Time{}, false + } else if msg.throttle { + return msg.timestamp, true + } + } +} + +func (throttle *throttleManager) waitForReady(throttledUntil time.Time) bool { + duration := throttledUntil.Sub(currentClock.Now()) + if duration <= 0 { + return true + } + + var notify []chan bool + + // --- Throttled and waiting --- + t := currentClock.NewTimer(duration) + + for { + select { + case <-t.C(): + for _, n := range notify { + n <- true + } + + return true + case msg := <-throttle.msgs: + if msg.query { + msg.result <- true + } else if msg.wait { + notify = append(notify, msg.result) + } else if msg.stop { + for _, n := range notify { + n <- false + } + + msg.result <- true + + return false + } else if msg.throttle { + if msg.timestamp.After(throttledUntil) { + throttledUntil = msg.timestamp + + if !t.Stop() { + <-t.C() + } + + t.Reset(throttledUntil.Sub(currentClock.Now())) + } + } + } + } +} diff --git a/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/transmitter.go b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/transmitter.go new file mode 100644 index 000000000..0aac35265 --- /dev/null +++ b/vendor/github.com/Microsoft/ApplicationInsights-Go/appinsights/transmitter.go @@ -0,0 +1,237 @@ +package appinsights + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "io/ioutil" + "net/http" + "sort" + "time" +) + +type transmitter interface { + Transmit(payload []byte, items TelemetryBufferItems) (*transmissionResult, error) +} + +type httpTransmitter struct { + endpoint string +} + +type transmissionResult struct { + statusCode int + retryAfter *time.Time + response *backendResponse +} + +// Structures returned by data collector +type backendResponse struct { + ItemsReceived int `json:"itemsReceived"` + ItemsAccepted int `json:"itemsAccepted"` + Errors itemTransmissionResults `json:"errors"` +} + +// This needs to be its own type because it implements sort.Interface +type itemTransmissionResults []*itemTransmissionResult + +type itemTransmissionResult struct { + Index int `json:"index"` + StatusCode int `json:"statusCode"` + Message string `json:"message"` +} + +const ( + successResponse = 200 + partialSuccessResponse = 206 + requestTimeoutResponse = 408 + tooManyRequestsResponse = 429 + tooManyRequestsOverExtendedTimeResponse = 439 + errorResponse = 500 + serviceUnavailableResponse = 503 +) + +func newTransmitter(endpointAddress string) transmitter { + return &httpTransmitter{endpointAddress} +} + +func (transmitter *httpTransmitter) Transmit(payload []byte, items TelemetryBufferItems) (*transmissionResult, error) { + diagnosticsWriter.Printf("----------- Transmitting %d items ---------", len(items)) + startTime := time.Now() + + // Compress the payload + var postBody bytes.Buffer + gzipWriter := gzip.NewWriter(&postBody) + if _, err := gzipWriter.Write(payload); err != nil { + diagnosticsWriter.Printf("Failed to compress the payload: %s", err.Error()) + gzipWriter.Close() + return nil, err + } + + gzipWriter.Close() + + req, err := http.NewRequest("POST", transmitter.endpoint, &postBody) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("Content-Type", "application/x-json-stream") + req.Header.Set("Accept-Encoding", "gzip, deflate") + + client := http.DefaultClient + resp, err := client.Do(req) + if err != nil { + diagnosticsWriter.Printf("Failed to transmit telemetry: %s", err.Error()) + return nil, err + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + diagnosticsWriter.Printf("Failed to read response from server: %s", err.Error()) + return nil, err + } + + duration := time.Since(startTime) + + result := &transmissionResult{statusCode: resp.StatusCode} + + // Grab Retry-After header + if retryAfterValue, ok := resp.Header[http.CanonicalHeaderKey("Retry-After")]; ok && len(retryAfterValue) == 1 { + if retryAfterTime, err := time.Parse(time.RFC1123, retryAfterValue[0]); err == nil { + result.retryAfter = &retryAfterTime + } + } + + // Parse body, if possible + response := &backendResponse{} + if err := json.Unmarshal(body, &response); err == nil { + result.response = response + } + + // Write diagnostics + if diagnosticsWriter.hasListeners() { + diagnosticsWriter.Printf("Telemetry transmitted in %s", duration) + diagnosticsWriter.Printf("Response: %d", result.statusCode) + if result.response != nil { + diagnosticsWriter.Printf("Items accepted/received: %d/%d", result.response.ItemsAccepted, result.response.ItemsReceived) + if len(result.response.Errors) > 0 { + diagnosticsWriter.Printf("Errors:") + for _, err := range result.response.Errors { + if err.Index < len(items) { + diagnosticsWriter.Printf("#%d - %d %s", err.Index, err.StatusCode, err.Message) + diagnosticsWriter.Printf("Telemetry item:\n\t%s", err.Index, string(items[err.Index:err.Index+1].serialize())) + } + } + } + } + } + + return result, nil +} + +func (result *transmissionResult) IsSuccess() bool { + return result.statusCode == successResponse || + // Partial response but all items accepted + (result.statusCode == partialSuccessResponse && + result.response != nil && + result.response.ItemsReceived == result.response.ItemsAccepted) +} + +func (result *transmissionResult) IsFailure() bool { + return result.statusCode != successResponse && result.statusCode != partialSuccessResponse +} + +func (result *transmissionResult) CanRetry() bool { + if result.IsSuccess() { + return false + } + + return result.statusCode == partialSuccessResponse || + result.retryAfter != nil || + (result.statusCode == requestTimeoutResponse || + result.statusCode == serviceUnavailableResponse || + result.statusCode == errorResponse || + result.statusCode == tooManyRequestsResponse || + result.statusCode == tooManyRequestsOverExtendedTimeResponse) +} + +func (result *transmissionResult) IsPartialSuccess() bool { + return result.statusCode == partialSuccessResponse && + result.response != nil && + result.response.ItemsReceived != result.response.ItemsAccepted +} + +func (result *transmissionResult) IsThrottled() bool { + return result.statusCode == tooManyRequestsResponse || + result.statusCode == tooManyRequestsOverExtendedTimeResponse || + result.retryAfter != nil +} + +func (result *itemTransmissionResult) CanRetry() bool { + return result.StatusCode == requestTimeoutResponse || + result.StatusCode == serviceUnavailableResponse || + result.StatusCode == errorResponse || + result.StatusCode == tooManyRequestsResponse || + result.StatusCode == tooManyRequestsOverExtendedTimeResponse +} + +func (result *transmissionResult) GetRetryItems(payload []byte, items TelemetryBufferItems) ([]byte, TelemetryBufferItems) { + if result.statusCode == partialSuccessResponse && result.response != nil { + // Make sure errors are ordered by index + sort.Sort(result.response.Errors) + + var resultPayload bytes.Buffer + resultItems := make(TelemetryBufferItems, 0) + ptr := 0 + idx := 0 + + // Find each retryable error + for _, responseResult := range result.response.Errors { + if responseResult.CanRetry() { + // Advance ptr to start of desired line + for ; idx < responseResult.Index && ptr < len(payload); ptr++ { + if payload[ptr] == '\n' { + idx++ + } + } + + startPtr := ptr + + // Read to end of line + for ; idx == responseResult.Index && ptr < len(payload); ptr++ { + if payload[ptr] == '\n' { + idx++ + } + } + + // Copy item into output buffer + resultPayload.Write(payload[startPtr:ptr]) + resultItems = append(resultItems, items[responseResult.Index]) + } + } + + return resultPayload.Bytes(), resultItems + } else if result.CanRetry() { + return payload, items + } else { + return payload[:0], items[:0] + } +} + +// sort.Interface implementation for Errors[] list + +func (results itemTransmissionResults) Len() int { + return len(results) +} + +func (results itemTransmissionResults) Less(i, j int) bool { + return results[i].Index < results[j].Index +} + +func (results itemTransmissionResults) Swap(i, j int) { + tmp := results[i] + results[i] = results[j] + results[j] = tmp +} diff --git a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go index ebdb89f69..8ae139884 100644 --- a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go +++ b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go @@ -8,12 +8,14 @@ import ( "time" "github.com/cenk/backoff" + "github.com/containous/flaeg" "github.com/containous/traefik/job" "github.com/containous/traefik/log" "github.com/containous/traefik/provider" "github.com/containous/traefik/provider/label" "github.com/containous/traefik/safe" "github.com/containous/traefik/types" + "github.com/jjcollinge/logrus-appinsights" sf "github.com/jjcollinge/servicefabric" ) @@ -31,13 +33,23 @@ type Provider struct { provider.BaseProvider `mapstructure:",squash"` ClusterManagementURL string `description:"Service Fabric API endpoint"` APIVersion string `description:"Service Fabric API version" export:"true"` - RefreshSeconds int `description:"Polling interval (in seconds)" export:"true"` + RefreshSeconds flaeg.Duration `description:"Polling interval (in seconds)" export:"true"` TLS *types.ClientTLS `description:"Enable TLS support" export:"true"` + AppInsightsClientName string `description:"The client name, Identifies the cloud instance"` + AppInsightsKey string `description:"Application Insights Instrumentation Key"` + AppInsightsBatchSize int `description:"Number of trace lines per batch, optional"` + AppInsightsInterval flaeg.Duration `description:"The interval for sending data to Application Insights, optional"` +} + +// Init the provider +func (p *Provider) Init(constraints types.Constraints) error { + p.BaseProvider.Init(constraints) + return nil } // Provide allows the ServiceFabric provider to provide configurations to traefik // using the given configuration channel. -func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { +func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { if p.APIVersion == "" { p.APIVersion = sf.DefaultAPIVersion } @@ -53,10 +65,20 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s } if p.RefreshSeconds <= 0 { - p.RefreshSeconds = 10 + p.RefreshSeconds = flaeg.Duration(10 * time.Second) } - return p.updateConfig(configurationChan, pool, sfClient, time.Duration(p.RefreshSeconds)*time.Second) + if p.AppInsightsClientName != "" && p.AppInsightsKey != "" { + if p.AppInsightsBatchSize == 0 { + p.AppInsightsBatchSize = 10 + } + if p.AppInsightsInterval == 0 { + p.AppInsightsInterval = flaeg.Duration(5 * time.Second) + } + createAppInsightsHook(p.AppInsightsClientName, p.AppInsightsKey, p.AppInsightsBatchSize, p.AppInsightsInterval) + } + + return p.updateConfig(configurationChan, pool, sfClient, time.Duration(p.RefreshSeconds)) } func (p *Provider) updateConfig(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, sfClient sfClient, pollInterval time.Duration) error { @@ -263,3 +285,18 @@ func getLabels(sfClient sfClient, service *sf.ServiceItem, app *sf.ApplicationIt } return labels, nil } + +func createAppInsightsHook(appInsightsClientName string, instrumentationKey string, maxBatchSize int, interval flaeg.Duration) { + hook, err := logrus_appinsights.New(appInsightsClientName, logrus_appinsights.Config{ + InstrumentationKey: instrumentationKey, + MaxBatchSize: maxBatchSize, // optional + MaxBatchInterval: time.Duration(interval), // optional + }) + if err != nil || hook == nil { + panic(err) + } + + // ignore fields + hook.AddIgnore("private") + log.AddHook(hook) +} diff --git a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_config.go b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_config.go index 8c6b1e90f..b3baa8346 100644 --- a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_config.go +++ b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_config.go @@ -76,7 +76,7 @@ func getDefaultEndpoint(instance replicaInstance) string { id, data := instance.GetReplicaData() endpoint, err := getReplicaDefaultEndpoint(data) if err != nil { - log.Warnf("No default endpoint for replica %s in service %s endpointData: %s", id, data.Address) + log.Warnf("No default endpoint for replica %s in service %s endpointData: %s", id, data.Address, err) return "" } return endpoint diff --git a/vendor/github.com/jjcollinge/logrus-appinsights/LICENSE b/vendor/github.com/jjcollinge/logrus-appinsights/LICENSE new file mode 100644 index 000000000..73897a54d --- /dev/null +++ b/vendor/github.com/jjcollinge/logrus-appinsights/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2018 jjcollinge + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/jjcollinge/logrus-appinsights/config.go b/vendor/github.com/jjcollinge/logrus-appinsights/config.go new file mode 100644 index 000000000..5e297a559 --- /dev/null +++ b/vendor/github.com/jjcollinge/logrus-appinsights/config.go @@ -0,0 +1,11 @@ +package logrus_appinsights + +import "time" + +// Config for Application Insights settings +type Config struct { + InstrumentationKey string + EndpointUrl string + MaxBatchSize int + MaxBatchInterval time.Duration +} diff --git a/vendor/github.com/jjcollinge/logrus-appinsights/hook.go b/vendor/github.com/jjcollinge/logrus-appinsights/hook.go new file mode 100644 index 000000000..f288124cd --- /dev/null +++ b/vendor/github.com/jjcollinge/logrus-appinsights/hook.go @@ -0,0 +1,173 @@ +package logrus_appinsights + +import ( + "encoding/json" + "fmt" + + "github.com/Microsoft/ApplicationInsights-Go/appinsights" + "github.com/sirupsen/logrus" +) + +var defaultLevels = []logrus.Level{ + logrus.PanicLevel, + logrus.FatalLevel, + logrus.ErrorLevel, + logrus.WarnLevel, + logrus.InfoLevel, +} + +var levelMap = map[logrus.Level]appinsights.SeverityLevel{ + logrus.PanicLevel: appinsights.Critical, + logrus.FatalLevel: appinsights.Critical, + logrus.ErrorLevel: appinsights.Error, + logrus.WarnLevel: appinsights.Warning, + logrus.InfoLevel: appinsights.Information, +} + +// AppInsightsHook is a logrus hook for Application Insights +type AppInsightsHook struct { + client appinsights.TelemetryClient + + async bool + levels []logrus.Level + ignoreFields map[string]struct{} + filters map[string]func(interface{}) interface{} +} + +// New returns an initialised logrus hook for Application Insights +func New(name string, conf Config) (*AppInsightsHook, error) { + if conf.InstrumentationKey == "" { + return nil, fmt.Errorf("InstrumentationKey is required and missing from configuration") + } + telemetryConf := appinsights.NewTelemetryConfiguration(conf.InstrumentationKey) + if conf.MaxBatchSize != 0 { + telemetryConf.MaxBatchSize = conf.MaxBatchSize + } + if conf.MaxBatchInterval != 0 { + telemetryConf.MaxBatchInterval = conf.MaxBatchInterval + } + if conf.EndpointUrl != "" { + telemetryConf.EndpointUrl = conf.EndpointUrl + } + telemetryClient := appinsights.NewTelemetryClientFromConfig(telemetryConf) + if name != "" { + telemetryClient.Context().Cloud().SetRoleName(name) + } + return &AppInsightsHook{ + client: telemetryClient, + levels: defaultLevels, + ignoreFields: make(map[string]struct{}), + filters: make(map[string]func(interface{}) interface{}), + }, nil +} + +// NewWithAppInsightsConfig returns an initialised logrus hook for Application Insights +func NewWithAppInsightsConfig(name string, conf *appinsights.TelemetryConfiguration) (*AppInsightsHook, error) { + if conf == nil { + return nil, fmt.Errorf("Nil configuration provided") + } + if conf.InstrumentationKey == "" { + return nil, fmt.Errorf("InstrumentationKey is required in configuration") + } + telemetryClient := appinsights.NewTelemetryClientFromConfig(conf) + if name != "" { + telemetryClient.Context().Cloud().SetRoleName(name) + } + return &AppInsightsHook{ + client: telemetryClient, + levels: defaultLevels, + ignoreFields: make(map[string]struct{}), + filters: make(map[string]func(interface{}) interface{}), + }, nil +} + +// Levels returns logging level to fire this hook. +func (hook *AppInsightsHook) Levels() []logrus.Level { + return hook.levels +} + +// SetLevels sets logging level to fire this hook. +func (hook *AppInsightsHook) SetLevels(levels []logrus.Level) { + hook.levels = levels +} + +// SetAsync sets async flag for sending logs asynchronously. +// If use this true, Fire() does not return error. +func (hook *AppInsightsHook) SetAsync(async bool) { + hook.async = async +} + +// AddIgnore adds field name to ignore. +func (hook *AppInsightsHook) AddIgnore(name string) { + hook.ignoreFields[name] = struct{}{} +} + +// AddFilter adds a custom filter function. +func (hook *AppInsightsHook) AddFilter(name string, fn func(interface{}) interface{}) { + hook.filters[name] = fn +} + +// Fire is invoked by logrus and sends log data to Application Insights. +func (hook *AppInsightsHook) Fire(entry *logrus.Entry) error { + if !hook.async { + return hook.fire(entry) + } + // async - fire and forget + go hook.fire(entry) + return nil +} + +func (hook *AppInsightsHook) fire(entry *logrus.Entry) error { + trace, err := hook.buildTrace(entry) + if err != nil { + return err + } + hook.client.TrackTraceTelemetry(trace) + return nil +} + +func (hook *AppInsightsHook) buildTrace(entry *logrus.Entry) (*appinsights.TraceTelemetry, error) { + // Add the message as a field if it isn't already + if _, ok := entry.Data["message"]; !ok { + entry.Data["message"] = entry.Message + } + + level := levelMap[entry.Level] + trace := appinsights.NewTraceTelemetry(entry.Message, level) + if trace == nil { + return nil, fmt.Errorf("Could not create telemetry trace with entry %+v", entry) + } + for k, v := range entry.Data { + if _, ok := hook.ignoreFields[k]; ok { + continue + } + if fn, ok := hook.filters[k]; ok { + v = fn(v) // apply custom filter + } else { + v = formatData(v) // use default formatter + } + vStr := fmt.Sprintf("%v", v) + trace.SetProperty(k, vStr) + } + trace.SetProperty("source_level", entry.Level.String()) + trace.SetProperty("source_timestamp", entry.Time.String()) + return trace, nil +} + +// formatData returns value as a suitable format. +func formatData(value interface{}) (formatted interface{}) { + switch value := value.(type) { + case json.Marshaler: + return value + case error: + return value.Error() + case fmt.Stringer: + return value.String() + default: + return value + } +} + +func stringPtr(str string) *string { + return &str +}