traefik/pkg/provider/hub/handler.go
2022-11-21 18:36:05 +01:00

147 lines
4 KiB
Go

package hub
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"sync/atomic"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
)
type handler struct {
mux *http.ServeMux
client http.Client
entryPoint string
port int
tlsCfg *TLS
// Accessed atomically.
lastCfgUnixNano int64
cfgChan chan<- dynamic.Message
}
func newHandler(entryPoint string, port int, cfgChan chan<- dynamic.Message, tlsCfg *TLS, client http.Client) http.Handler {
h := &handler{
mux: http.NewServeMux(),
entryPoint: entryPoint,
port: port,
cfgChan: cfgChan,
tlsCfg: tlsCfg,
client: client,
}
h.mux.HandleFunc("/config", h.handleConfig)
h.mux.HandleFunc("/discover-ip", h.handleDiscoverIP)
h.mux.HandleFunc("/state", h.handleState)
return h
}
type configRequest struct {
UnixNano int64 `json:"unixNano"`
Configuration *dynamic.Configuration `json:"configuration"`
}
func (h *handler) handleConfig(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
payload := &configRequest{Configuration: emptyDynamicConfiguration()}
if err := json.NewDecoder(req.Body).Decode(payload); err != nil {
err = fmt.Errorf("decoding config request: %w", err)
log.Error().Err(err).Msg("Handling config")
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
cfg := payload.Configuration
patchDynamicConfiguration(cfg, h.entryPoint, h.port, h.tlsCfg)
// We can safely drop messages here if the other end is not ready to receive them
// as the agent will re-apply the same configuration.
select {
case h.cfgChan <- dynamic.Message{ProviderName: "hub", Configuration: cfg}:
atomic.StoreInt64(&h.lastCfgUnixNano, payload.UnixNano)
default:
}
}
func (h *handler) handleDiscoverIP(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
xff := req.Header.Get("X-Forwarded-For")
port := req.URL.Query().Get("port")
nonce := req.URL.Query().Get("nonce")
if err := h.doDiscoveryReq(req.Context(), xff, port, nonce); err != nil {
err = fmt.Errorf("doing discovery request: %w", err)
log.Error().Err(err).Msg("Handling IP discovery")
http.Error(rw, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
return
}
if err := json.NewEncoder(rw).Encode(xff); err != nil {
err = fmt.Errorf("encoding discover ip response: %w", err)
log.Error().Err(err).Msg("Handling IP discovery")
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
}
func (h *handler) doDiscoveryReq(ctx context.Context, ip, port, nonce string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s", net.JoinHostPort(ip, port)), http.NoBody)
if err != nil {
return fmt.Errorf("creating request: %w", err)
}
q := make(url.Values)
q.Set("nonce", nonce)
req.URL.RawQuery = q.Encode()
req.Host = "agent.traefik"
resp, err := h.client.Do(req)
if err != nil {
return fmt.Errorf("doing request: %w", err)
}
defer func() { _ = resp.Body.Close() }()
return nil
}
type stateResponse struct {
LastConfigUnixNano int64 `json:"lastConfigUnixNano"`
}
func (h *handler) handleState(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
resp := stateResponse{
LastConfigUnixNano: atomic.LoadInt64(&h.lastCfgUnixNano),
}
if err := json.NewEncoder(rw).Encode(resp); err != nil {
err = fmt.Errorf("encoding last config received response: %w", err)
log.Error().Err(err).Msg("Handling state")
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
h.mux.ServeHTTP(rw, req)
}