Don't add TCP proxy when error occurs during creation.
This commit is contained in:
parent
2617de2cdd
commit
c2345c6e9a
6 changed files with 149 additions and 55 deletions
|
@ -75,15 +75,6 @@ func (m *Manager) BuildHandlers(rootCtx context.Context, entryPoints []string, t
|
||||||
return entryPointHandlers
|
return entryPointHandlers
|
||||||
}
|
}
|
||||||
|
|
||||||
func contains(entryPoints []string, entryPointName string) bool {
|
|
||||||
for _, name := range entryPoints {
|
|
||||||
if name == entryPointName {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) filteredRouters(ctx context.Context, entryPoints []string, tls bool) map[string]map[string]*config.Router {
|
func (m *Manager) filteredRouters(ctx context.Context, entryPoints []string, tls bool) map[string]map[string]*config.Router {
|
||||||
entryPointsRouters := make(map[string]map[string]*config.Router)
|
entryPointsRouters := make(map[string]map[string]*config.Router)
|
||||||
|
|
||||||
|
@ -121,11 +112,9 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
|
||||||
}
|
}
|
||||||
|
|
||||||
for routerName, routerConfig := range configs {
|
for routerName, routerConfig := range configs {
|
||||||
ctxRouter := log.With(ctx, log.Str(log.RouterName, routerName))
|
ctxRouter := log.With(internal.AddProviderInContext(ctx, routerName), log.Str(log.RouterName, routerName))
|
||||||
logger := log.FromContext(ctxRouter)
|
logger := log.FromContext(ctxRouter)
|
||||||
|
|
||||||
ctxRouter = internal.AddProviderInContext(ctxRouter, routerName)
|
|
||||||
|
|
||||||
handler, err := m.buildRouterHandler(ctxRouter, routerName)
|
handler, err := m.buildRouterHandler(ctxRouter, routerName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error(err)
|
logger.Error(err)
|
||||||
|
@ -197,3 +186,12 @@ func (m *Manager) buildHTTPHandler(ctx context.Context, router *config.Router, r
|
||||||
|
|
||||||
return alice.New().Extend(*mHandler).Append(tHandler).Then(sHandler)
|
return alice.New().Extend(*mHandler).Append(tHandler).Then(sHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func contains(entryPoints []string, entryPointName string) bool {
|
||||||
|
for _, name := range entryPoints {
|
||||||
|
if name == entryPointName {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
@ -62,14 +62,12 @@ func (m *Manager) BuildHandlers(rootCtx context.Context, entryPoints []string) m
|
||||||
|
|
||||||
func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string]*config.TCPRouter, handlerHTTP http.Handler, handlerHTTPS http.Handler) (*tcp.Router, error) {
|
func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string]*config.TCPRouter, handlerHTTP http.Handler, handlerHTTPS http.Handler) (*tcp.Router, error) {
|
||||||
router := &tcp.Router{}
|
router := &tcp.Router{}
|
||||||
|
|
||||||
router.HTTPHandler(handlerHTTP)
|
router.HTTPHandler(handlerHTTP)
|
||||||
router.HTTPSHandler(handlerHTTPS, m.tlsConfig)
|
router.HTTPSHandler(handlerHTTPS, m.tlsConfig)
|
||||||
for routerName, routerConfig := range configs {
|
|
||||||
ctxRouter := log.With(ctx, log.Str(log.RouterName, routerName))
|
|
||||||
logger := log.FromContext(ctxRouter)
|
|
||||||
|
|
||||||
ctxRouter = internal.AddProviderInContext(ctxRouter, routerName)
|
for routerName, routerConfig := range configs {
|
||||||
|
ctxRouter := log.With(internal.AddProviderInContext(ctx, routerName), log.Str(log.RouterName, routerName))
|
||||||
|
logger := log.FromContext(ctxRouter)
|
||||||
|
|
||||||
handler, err := m.serviceManager.BuildTCP(ctxRouter, routerConfig.Service)
|
handler, err := m.serviceManager.BuildTCP(ctxRouter, routerConfig.Service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -79,18 +77,18 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
|
||||||
|
|
||||||
domains, err := rules.ParseHostSNI(routerConfig.Rule)
|
domains, err := rules.ParseHostSNI(routerConfig.Rule)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithoutContext().Debugf("Unknown rule %s", routerConfig.Rule)
|
logger.Debugf("Unknown rule %s", routerConfig.Rule)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, domain := range domains {
|
for _, domain := range domains {
|
||||||
log.WithoutContext().Debugf("Add route %s on TCP", domain)
|
logger.Debugf("Add route %s on TCP", domain)
|
||||||
switch {
|
switch {
|
||||||
case routerConfig.TLS != nil:
|
case routerConfig.TLS != nil:
|
||||||
if routerConfig.TLS.Passthrough {
|
if routerConfig.TLS.Passthrough {
|
||||||
router.AddRoute(domain, handler)
|
router.AddRoute(domain, handler)
|
||||||
} else {
|
} else {
|
||||||
router.AddRouteTLS(domain, handler, m.tlsConfig)
|
router.AddRouteTLS(domain, handler, m.tlsConfig)
|
||||||
|
|
||||||
}
|
}
|
||||||
case domain == "*":
|
case domain == "*":
|
||||||
router.AddCatchAllNoTLS(handler)
|
router.AddCatchAllNoTLS(handler)
|
||||||
|
@ -103,15 +101,6 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
|
||||||
return router, nil
|
return router, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func contains(entryPoints []string, entryPointName string) bool {
|
|
||||||
for _, name := range entryPoints {
|
|
||||||
if name == entryPointName {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Manager) filteredRouters(ctx context.Context, entryPoints []string) map[string]map[string]*config.TCPRouter {
|
func (m *Manager) filteredRouters(ctx context.Context, entryPoints []string) map[string]map[string]*config.TCPRouter {
|
||||||
entryPointsRouters := make(map[string]map[string]*config.TCPRouter)
|
entryPointsRouters := make(map[string]map[string]*config.TCPRouter)
|
||||||
|
|
||||||
|
@ -120,6 +109,7 @@ func (m *Manager) filteredRouters(ctx context.Context, entryPoints []string) map
|
||||||
if len(eps) == 0 {
|
if len(eps) == 0 {
|
||||||
eps = entryPoints
|
eps = entryPoints
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, entryPointName := range eps {
|
for _, entryPointName := range eps {
|
||||||
if !contains(entryPoints, entryPointName) {
|
if !contains(entryPoints, entryPointName) {
|
||||||
log.FromContext(log.With(ctx, log.Str(log.EntryPointName, entryPointName))).
|
log.FromContext(log.With(ctx, log.Str(log.EntryPointName, entryPointName))).
|
||||||
|
@ -137,3 +127,12 @@ func (m *Manager) filteredRouters(ctx context.Context, entryPoints []string) map
|
||||||
|
|
||||||
return entryPointsRouters
|
return entryPointsRouters
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func contains(entryPoints []string, entryPointName string) bool {
|
||||||
|
for _, name := range entryPoints {
|
||||||
|
if name == entryPointName {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
@ -25,32 +25,41 @@ func NewManager(configs map[string]*config.TCPService) *Manager {
|
||||||
|
|
||||||
// BuildTCP Creates a tcp.Handler for a service configuration.
|
// BuildTCP Creates a tcp.Handler for a service configuration.
|
||||||
func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Handler, error) {
|
func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Handler, error) {
|
||||||
ctx := log.With(rootCtx, log.Str(log.ServiceName, serviceName))
|
serviceQualifiedName := internal.GetQualifiedName(rootCtx, serviceName)
|
||||||
|
ctx := internal.AddProviderInContext(rootCtx, serviceQualifiedName)
|
||||||
|
ctx = log.With(ctx, log.Str(log.ServiceName, serviceName))
|
||||||
|
|
||||||
serviceName = internal.GetQualifiedName(ctx, serviceName)
|
conf, ok := m.configs[serviceQualifiedName]
|
||||||
ctx = internal.AddProviderInContext(ctx, serviceName)
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("the service %q does not exits", serviceQualifiedName)
|
||||||
|
}
|
||||||
|
|
||||||
|
if conf.LoadBalancer == nil {
|
||||||
|
return nil, fmt.Errorf("the service %q doesn't have any TCP load balancer", serviceQualifiedName)
|
||||||
|
}
|
||||||
|
|
||||||
|
logger := log.FromContext(ctx)
|
||||||
|
|
||||||
if conf, ok := m.configs[serviceName]; ok {
|
|
||||||
// FIXME Check if the service is declared multiple times with different types
|
// FIXME Check if the service is declared multiple times with different types
|
||||||
if conf.LoadBalancer != nil {
|
|
||||||
loadBalancer := tcp.NewRRLoadBalancer()
|
loadBalancer := tcp.NewRRLoadBalancer()
|
||||||
|
|
||||||
var handler tcp.Handler
|
|
||||||
for _, server := range conf.LoadBalancer.Servers {
|
for _, server := range conf.LoadBalancer.Servers {
|
||||||
_, err := parseIP(server.Address)
|
if _, err := parseIP(server.Address); err != nil {
|
||||||
if err == nil {
|
logger.Errorf("Invalid IP address for a %q server %q: %v", serviceQualifiedName, server.Address, err)
|
||||||
handler, _ = tcp.NewProxy(server.Address)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
handler, err := tcp.NewProxy(server.Address)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("In service %q server %q: %v", serviceQualifiedName, server.Address, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
loadBalancer.AddServer(handler)
|
loadBalancer.AddServer(handler)
|
||||||
} else {
|
|
||||||
log.FromContext(ctx).Errorf("Invalid IP address for a %s server %s: %v", serviceName, server.Address, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return loadBalancer, nil
|
return loadBalancer, nil
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("the service %q doesn't have any TCP load balancer", serviceName)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("the service %q does not exits", serviceName)
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseIP(s string) (string, error) {
|
func parseIP(s string) (string, error) {
|
||||||
ip, _, err := net.SplitHostPort(s)
|
ip, _, err := net.SplitHostPort(s)
|
||||||
|
|
82
pkg/server/service/tcp/service_test.go
Normal file
82
pkg/server/service/tcp/service_test.go
Normal file
|
@ -0,0 +1,82 @@
|
||||||
|
package tcp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/containous/traefik/pkg/config"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestManager_BuildTCP(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
serviceName string
|
||||||
|
configs map[string]*config.TCPService
|
||||||
|
expectedError string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "without configuration",
|
||||||
|
serviceName: "test",
|
||||||
|
configs: nil,
|
||||||
|
expectedError: `the service "test" does not exits`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "missing lb configuration",
|
||||||
|
serviceName: "test",
|
||||||
|
configs: map[string]*config.TCPService{
|
||||||
|
"test": {},
|
||||||
|
},
|
||||||
|
expectedError: `the service "test" doesn't have any TCP load balancer`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "no such host",
|
||||||
|
serviceName: "test",
|
||||||
|
configs: map[string]*config.TCPService{
|
||||||
|
"test": {
|
||||||
|
LoadBalancer: &config.TCPLoadBalancerService{
|
||||||
|
Servers: []config.TCPServer{
|
||||||
|
{Address: "test:31"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "invalid IP address",
|
||||||
|
serviceName: "test",
|
||||||
|
configs: map[string]*config.TCPService{
|
||||||
|
"test": {
|
||||||
|
LoadBalancer: &config.TCPLoadBalancerService{
|
||||||
|
Servers: []config.TCPServer{
|
||||||
|
{Address: "foobar"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
test := test
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
manager := NewManager(test.configs)
|
||||||
|
|
||||||
|
handler, err := manager.BuildTCP(context.Background(), test.serviceName)
|
||||||
|
|
||||||
|
if test.expectedError != "" {
|
||||||
|
if err == nil {
|
||||||
|
require.Error(t, err)
|
||||||
|
} else {
|
||||||
|
require.EqualError(t, err, test.expectedError)
|
||||||
|
require.Nil(t, handler)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, handler)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,15 +18,15 @@ func NewProxy(address string) (*Proxy, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Proxy{
|
|
||||||
target: tcpAddr,
|
return &Proxy{target: tcpAddr}, nil
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServeTCP forwards the connection to a service
|
// ServeTCP forwards the connection to a service
|
||||||
func (p *Proxy) ServeTCP(conn net.Conn) {
|
func (p *Proxy) ServeTCP(conn net.Conn) {
|
||||||
log.Debugf("Handling connection from %s", conn.RemoteAddr())
|
log.Debugf("Handling connection from %s", conn.RemoteAddr())
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
connBackend, err := net.DialTCP("tcp", nil, p.target)
|
connBackend, err := net.DialTCP("tcp", nil, p.target)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error while connection to backend: %v", err)
|
log.Errorf("Error while connection to backend: %v", err)
|
||||||
|
|
|
@ -21,6 +21,11 @@ func NewRRLoadBalancer() *RRLoadBalancer {
|
||||||
|
|
||||||
// ServeTCP forwards the connection to the right service
|
// ServeTCP forwards the connection to the right service
|
||||||
func (r *RRLoadBalancer) ServeTCP(conn net.Conn) {
|
func (r *RRLoadBalancer) ServeTCP(conn net.Conn) {
|
||||||
|
if len(r.servers) == 0 {
|
||||||
|
log.WithoutContext().Error("no available server")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
r.next().ServeTCP(conn)
|
r.next().ServeTCP(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,6 +43,7 @@ func (r *RRLoadBalancer) next() Handler {
|
||||||
r.current = 0
|
r.current = 0
|
||||||
log.Debugf("Load balancer: going back to the first available server")
|
log.Debugf("Load balancer: going back to the first available server")
|
||||||
}
|
}
|
||||||
|
|
||||||
handler := r.servers[r.current]
|
handler := r.servers[r.current]
|
||||||
r.current++
|
r.current++
|
||||||
return handler
|
return handler
|
||||||
|
|
Loading…
Reference in a new issue