From 4a315440247b6f68322b958d766515848d60562a Mon Sep 17 00:00:00 2001 From: Julien Salleyron Date: Mon, 4 May 2020 11:40:46 +0200 Subject: [PATCH] feat: Traefik Pilot integration. Co-authored-by: Ludovic Fernandez --- cmd/traefik/traefik.go | 25 ++- pkg/config/static/static_config.go | 12 ++ pkg/pilot/pilot.go | 246 +++++++++++++++++++++++++++++ pkg/pilot/pilot_test.go | 123 +++++++++++++++ pkg/server/routerfactory.go | 5 +- pkg/server/routerfactory_test.go | 7 +- pkg/version/version.go | 17 +- 7 files changed, 421 insertions(+), 14 deletions(-) create mode 100644 pkg/pilot/pilot.go create mode 100644 pkg/pilot/pilot_test.go diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go index 0551ddabe..dcb4a394d 100644 --- a/cmd/traefik/traefik.go +++ b/cmd/traefik/traefik.go @@ -18,10 +18,12 @@ import ( "github.com/containous/traefik/v2/pkg/cli" "github.com/containous/traefik/v2/pkg/collector" "github.com/containous/traefik/v2/pkg/config/dynamic" + "github.com/containous/traefik/v2/pkg/config/runtime" "github.com/containous/traefik/v2/pkg/config/static" "github.com/containous/traefik/v2/pkg/log" "github.com/containous/traefik/v2/pkg/metrics" "github.com/containous/traefik/v2/pkg/middlewares/accesslog" + "github.com/containous/traefik/v2/pkg/pilot" "github.com/containous/traefik/v2/pkg/provider/acme" "github.com/containous/traefik/v2/pkg/provider/aggregator" "github.com/containous/traefik/v2/pkg/provider/traefik" @@ -224,7 +226,16 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err metricsRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix())) }) - watcher.AddListener(switchRouter(routerFactory, acmeProviders, serverEntryPointsTCP, serverEntryPointsUDP)) + var aviator *pilot.Pilot + if staticConfiguration.Experimental != nil && staticConfiguration.Experimental.Pilot != nil && + staticConfiguration.Experimental.Pilot.Token != "" { + aviator = pilot.New(staticConfiguration.Experimental.Pilot.Token, routinesPool) + routinesPool.GoCtx(func(ctx context.Context) { + aviator.Tick(ctx) + }) + } + + watcher.AddListener(switchRouter(routerFactory, acmeProviders, serverEntryPointsTCP, serverEntryPointsUDP, aviator)) watcher.AddListener(func(conf dynamic.Configuration) { if metricsRegistry.IsEpEnabled() || metricsRegistry.IsSvcEnabled() { @@ -258,9 +269,12 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err return server.NewServer(routinesPool, serverEntryPointsTCP, serverEntryPointsUDP, watcher, chainBuilder, accessLog), nil } -func switchRouter(routerFactory *server.RouterFactory, acmeProviders []*acme.Provider, serverEntryPointsTCP server.TCPEntryPoints, serverEntryPointsUDP server.UDPEntryPoints) func(conf dynamic.Configuration) { +func switchRouter(routerFactory *server.RouterFactory, acmeProviders []*acme.Provider, serverEntryPointsTCP server.TCPEntryPoints, serverEntryPointsUDP server.UDPEntryPoints, aviator *pilot.Pilot) func(conf dynamic.Configuration) { return func(conf dynamic.Configuration) { - routers, udpRouters := routerFactory.CreateRouters(conf) + rtConf := runtime.NewConfig(conf) + + routers, udpRouters := routerFactory.CreateRouters(rtConf) + for entryPointName, rt := range routers { for _, p := range acmeProviders { if p != nil && p.HTTPChallenge != nil && p.HTTPChallenge.EntryPoint == entryPointName { @@ -269,6 +283,11 @@ func switchRouter(routerFactory *server.RouterFactory, acmeProviders []*acme.Pro } } } + + if aviator != nil { + aviator.SetRuntimeConfiguration(rtConf) + } + serverEntryPointsTCP.Switch(routers) serverEntryPointsUDP.Switch(udpRouters) } diff --git a/pkg/config/static/static_config.go b/pkg/config/static/static_config.go index d23402c14..9e035b602 100644 --- a/pkg/config/static/static_config.go +++ b/pkg/config/static/static_config.go @@ -70,6 +70,18 @@ type Configuration struct { HostResolver *types.HostResolverConfig `description:"Enable CNAME Flattening." json:"hostResolver,omitempty" toml:"hostResolver,omitempty" yaml:"hostResolver,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"` CertificatesResolvers map[string]CertificateResolver `description:"Certificates resolvers configuration." json:"certificatesResolvers,omitempty" toml:"certificatesResolvers,omitempty" yaml:"certificatesResolvers,omitempty" export:"true"` + + Experimental *Experimental `description:"experimental features." json:"experimental,omitempty" toml:"experimental,omitempty" yaml:"experimental,omitempty"` +} + +// Experimental the experimental feature configuration. +type Experimental struct { + Pilot *PilotConfiguration `description:"Pilot configuration." json:"pilot,omitempty" toml:"pilot,omitempty" yaml:"pilot,omitempty" export:"true"` +} + +// PilotConfiguration holds pilot configuration. +type PilotConfiguration struct { + Token string `description:"Pilot token." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" export:"true"` } // CertificateResolver contains the configuration for the different types of certificates resolver. diff --git a/pkg/pilot/pilot.go b/pkg/pilot/pilot.go new file mode 100644 index 000000000..5be1d6fe1 --- /dev/null +++ b/pkg/pilot/pilot.go @@ -0,0 +1,246 @@ +package pilot + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/containous/traefik/v2/pkg/config/runtime" + "github.com/containous/traefik/v2/pkg/log" + "github.com/containous/traefik/v2/pkg/safe" + "github.com/containous/traefik/v2/pkg/version" +) + +const baseURL = "https://instance-info.pilot.traefik.io/public" + +const tokenHeader = "X-Token" + +const ( + pilotTimer = 5 * time.Minute + maxElapsedTime = 4 * time.Minute +) + +// RunTimeRepresentation is the configuration information exposed by the API handler. +type RunTimeRepresentation struct { + Routers map[string]*runtime.RouterInfo `json:"routers,omitempty"` + Middlewares map[string]*runtime.MiddlewareInfo `json:"middlewares,omitempty"` + Services map[string]*serviceInfoRepresentation `json:"services,omitempty"` + TCPRouters map[string]*runtime.TCPRouterInfo `json:"tcpRouters,omitempty"` + TCPServices map[string]*runtime.TCPServiceInfo `json:"tcpServices,omitempty"` + UDPRouters map[string]*runtime.UDPRouterInfo `json:"udpRouters,omitempty"` + UDPServices map[string]*runtime.UDPServiceInfo `json:"udpServices,omitempty"` +} + +type serviceInfoRepresentation struct { + *runtime.ServiceInfo + ServerStatus map[string]string `json:"serverStatus,omitempty"` +} + +type instanceInfo struct { + ID string `json:"id,omitempty"` + Configuration RunTimeRepresentation `json:"configuration,omitempty"` +} + +// New creates a new Pilot. +func New(token string, pool *safe.Pool) *Pilot { + return &Pilot{ + rtConfChan: make(chan *runtime.Configuration), + client: &client{ + token: token, + httpClient: &http.Client{Timeout: 5 * time.Second}, + baseURL: baseURL, + }, + routinesPool: pool, + } +} + +// Pilot connector with Pilot. +type Pilot struct { + routinesPool *safe.Pool + client *client + + rtConf *runtime.Configuration + rtConfChan chan *runtime.Configuration +} + +// SetRuntimeConfiguration stores the runtime configuration. +func (p *Pilot) SetRuntimeConfiguration(rtConf *runtime.Configuration) { + p.rtConfChan <- rtConf +} + +func (p *Pilot) getRepresentation() RunTimeRepresentation { + if p.rtConf == nil { + return RunTimeRepresentation{} + } + + siRepr := make(map[string]*serviceInfoRepresentation, len(p.rtConf.Services)) + for k, v := range p.rtConf.Services { + siRepr[k] = &serviceInfoRepresentation{ + ServiceInfo: v, + ServerStatus: v.GetAllStatus(), + } + } + + result := RunTimeRepresentation{ + Routers: p.rtConf.Routers, + Middlewares: p.rtConf.Middlewares, + Services: siRepr, + TCPRouters: p.rtConf.TCPRouters, + TCPServices: p.rtConf.TCPServices, + UDPRouters: p.rtConf.UDPRouters, + UDPServices: p.rtConf.UDPServices, + } + + return result +} + +func (p *Pilot) sendData(ctx context.Context, conf RunTimeRepresentation) { + err := p.client.SendData(ctx, conf) + if err != nil { + log.WithoutContext().Error(err) + } +} + +// Tick sends data periodically. +func (p *Pilot) Tick(ctx context.Context) { + select { + case rtConf := <-p.rtConfChan: + p.rtConf = rtConf + break + case <-ctx.Done(): + return + } + + conf := p.getRepresentation() + + p.routinesPool.GoCtx(func(ctxRt context.Context) { + p.sendData(ctxRt, conf) + }) + + ticker := time.NewTicker(pilotTimer) + for { + select { + case tick := <-ticker.C: + log.WithoutContext().Debugf("Send to pilot: %s", tick) + + conf := p.getRepresentation() + + p.routinesPool.GoCtx(func(ctxRt context.Context) { + p.sendData(ctxRt, conf) + }) + case rtConf := <-p.rtConfChan: + p.rtConf = rtConf + case <-ctx.Done(): + return + } + } +} + +type client struct { + httpClient *http.Client + baseURL string + token string + uuid string +} + +func (c *client) createUUID() (string, error) { + data := []byte(`{"version":"` + version.Version + `","codeName":"` + version.Codename + `"}`) + req, err := http.NewRequest(http.MethodPost, c.baseURL+"/", bytes.NewBuffer(data)) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set(tokenHeader, c.token) + + resp, err := c.httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("failed call Pilot: %w", err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed read response body: %w", err) + } + + if resp.StatusCode/100 != 2 { + return "", fmt.Errorf("wrong status code while sending configuration: %d: %s", resp.StatusCode, body) + } + + created := instanceInfo{} + err = json.Unmarshal(body, &created) + if err != nil { + return "", fmt.Errorf("failed to unmarshal response body: %w", err) + } + + return created.ID, nil +} + +// SendData sends data to Pilot. +func (c *client) SendData(ctx context.Context, rtConf RunTimeRepresentation) error { + exponentialBackOff := backoff.NewExponentialBackOff() + exponentialBackOff.MaxElapsedTime = maxElapsedTime + + return backoff.RetryNotify( + func() error { + return c.sendData(rtConf) + }, + backoff.WithContext(exponentialBackOff, ctx), + func(err error, duration time.Duration) { + log.WithoutContext().Errorf("retry in %s due to: %v ", duration, err) + }) +} + +func (c *client) sendData(_ RunTimeRepresentation) error { + if len(c.uuid) == 0 { + var err error + c.uuid, err = c.createUUID() + if err != nil { + return fmt.Errorf("failed to create UUID: %w", err) + } + + version.UUID = c.uuid + } + + info := instanceInfo{ + ID: c.uuid, + } + + b, err := json.Marshal(info) + if err != nil { + return fmt.Errorf("failed to marshall request body: %w", err) + } + + request, err := http.NewRequest(http.MethodPost, c.baseURL+"/command", bytes.NewBuffer(b)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + request.Header.Set("Content-Type", "application/json") + request.Header.Set(tokenHeader, c.token) + + resp, err := c.httpClient.Do(request) + if err != nil { + return fmt.Errorf("failed to call Pilot: %w", err) + } + + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("wrong status code while sending configuration: %d: %s", resp.StatusCode, body) + } + + return nil +} diff --git a/pkg/pilot/pilot_test.go b/pkg/pilot/pilot_test.go new file mode 100644 index 000000000..8ba61ec94 --- /dev/null +++ b/pkg/pilot/pilot_test.go @@ -0,0 +1,123 @@ +package pilot + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/containous/traefik/v2/pkg/config/runtime" + "github.com/containous/traefik/v2/pkg/safe" + "github.com/stretchr/testify/require" +) + +func TestTick(t *testing.T) { + receivedConfig := make(chan bool) + + mux := http.NewServeMux() + server := httptest.NewServer(mux) + t.Cleanup(server.Close) + + mux.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + http.Error(rw, "invalid method", http.StatusMethodNotAllowed) + return + } + + err := json.NewEncoder(rw).Encode(instanceInfo{ID: "123"}) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + }) + + mux.HandleFunc("/command", func(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + http.Error(rw, "invalid method", http.StatusMethodNotAllowed) + return + } + + receivedConfig <- true + }) + + pilot := New("token", safe.NewPool(context.Background())) + pilot.client.baseURL = server.URL + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + go pilot.Tick(ctx) + + pilot.SetRuntimeConfiguration(&runtime.Configuration{}) + pilot.SetRuntimeConfiguration(&runtime.Configuration{}) + + select { + case <-time.Tick(10 * time.Second): + t.Fatal("Timeout") + case <-receivedConfig: + return + } +} + +func TestClient_SendConfiguration(t *testing.T) { + myToken := "myToken" + + mux := http.NewServeMux() + server := httptest.NewServer(mux) + t.Cleanup(server.Close) + + mux.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + http.Error(rw, "invalid method", http.StatusMethodNotAllowed) + return + } + + tk := req.Header.Get(tokenHeader) + if tk != myToken { + http.Error(rw, fmt.Sprintf("invalid token: %s", tk), http.StatusUnauthorized) + } + + err := json.NewEncoder(rw).Encode(instanceInfo{ID: "123"}) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + }) + + mux.HandleFunc("/command", func(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + http.Error(rw, "invalid method", http.StatusMethodNotAllowed) + return + } + + tk := req.Header.Get(tokenHeader) + if tk != myToken { + http.Error(rw, fmt.Sprintf("invalid token: %s", tk), http.StatusUnauthorized) + } + + defer req.Body.Close() + + info := &instanceInfo{} + err := json.NewDecoder(req.Body).Decode(info) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + return + } + + if info.ID != "123" { + http.Error(rw, fmt.Sprintf("invalid ID: %s", info.ID), http.StatusBadRequest) + } + }) + + client := client{ + baseURL: server.URL, + httpClient: http.DefaultClient, + token: myToken, + } + + err := client.SendData(context.Background(), RunTimeRepresentation{}) + require.NoError(t, err) +} diff --git a/pkg/server/routerfactory.go b/pkg/server/routerfactory.go index 1a2dfdc1c..02ef786ca 100644 --- a/pkg/server/routerfactory.go +++ b/pkg/server/routerfactory.go @@ -3,7 +3,6 @@ package server import ( "context" - "github.com/containous/traefik/v2/pkg/config/dynamic" "github.com/containous/traefik/v2/pkg/config/runtime" "github.com/containous/traefik/v2/pkg/config/static" "github.com/containous/traefik/v2/pkg/log" @@ -58,11 +57,9 @@ func NewRouterFactory(staticConfiguration static.Configuration, managerFactory * } // CreateRouters creates new TCPRouters and UDPRouters. -func (f *RouterFactory) CreateRouters(conf dynamic.Configuration) (map[string]*tcpCore.Router, map[string]udpCore.Handler) { +func (f *RouterFactory) CreateRouters(rtConf *runtime.Configuration) (map[string]*tcpCore.Router, map[string]udpCore.Handler) { ctx := context.Background() - rtConf := runtime.NewConfig(conf) - // HTTP serviceManager := f.managerFactory.Build(rtConf) diff --git a/pkg/server/routerfactory_test.go b/pkg/server/routerfactory_test.go index 54ea8c73b..9b96f66c5 100644 --- a/pkg/server/routerfactory_test.go +++ b/pkg/server/routerfactory_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/containous/traefik/v2/pkg/config/dynamic" + "github.com/containous/traefik/v2/pkg/config/runtime" "github.com/containous/traefik/v2/pkg/config/static" "github.com/containous/traefik/v2/pkg/metrics" "github.com/containous/traefik/v2/pkg/server/middleware" @@ -52,7 +53,7 @@ func TestReuseService(t *testing.T) { factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil)) - entryPointsHandlers, _ := factory.CreateRouters(dynamic.Configuration{HTTP: dynamicConfigs}) + entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs})) // Test that the /ok path returns a status 200. responseRecorderOk := &httptest.ResponseRecorder{} @@ -186,7 +187,7 @@ func TestServerResponseEmptyBackend(t *testing.T) { factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil)) - entryPointsHandlers, _ := factory.CreateRouters(dynamic.Configuration{HTTP: test.config(testServer.URL)}) + entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: test.config(testServer.URL)})) responseRecorder := &httptest.ResponseRecorder{} request := httptest.NewRequest(http.MethodGet, testServer.URL+requestPath, nil) @@ -225,7 +226,7 @@ func TestInternalServices(t *testing.T) { factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil)) - entryPointsHandlers, _ := factory.CreateRouters(dynamic.Configuration{HTTP: dynamicConfigs}) + entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs})) // Test that the /ok path returns a status 200. responseRecorderOk := &httptest.ResponseRecorder{} diff --git a/pkg/version/version.go b/pkg/version/version.go index e01f38ec4..5e5e0ff0c 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "net/url" + "time" "github.com/containous/traefik/v2/pkg/log" "github.com/google/go-github/v28/github" @@ -19,6 +20,10 @@ var ( Codename = "cheddar" // beta cheese // BuildDate holds the build date of traefik. BuildDate = "I don't remember exactly" + // StartDate holds the start date of traefik. + StartDate = time.Now() + // UUID instance uuid. + UUID string ) // Handler expose version routes. @@ -33,11 +38,15 @@ func (v Handler) Append(router *mux.Router) { router.Methods(http.MethodGet).Path("/api/version"). HandlerFunc(func(response http.ResponseWriter, request *http.Request) { v := struct { - Version string - Codename string + Version string + Codename string + StartDate time.Time `json:"startDate"` + UUID string `json:"uuid"` }{ - Version: Version, - Codename: Codename, + Version: Version, + Codename: Codename, + StartDate: StartDate, + UUID: UUID, } if err := templatesRenderer.JSON(response, http.StatusOK, v); err != nil {