Enable retry on websocket
This commit is contained in:
parent
efc6560d83
commit
f400292be7
3 changed files with 125 additions and 34 deletions
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
"github.com/containous/traefik/integration/try"
|
"github.com/containous/traefik/integration/try"
|
||||||
"github.com/go-check/check"
|
"github.com/go-check/check"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
checker "github.com/vdemeester/shakers"
|
checker "github.com/vdemeester/shakers"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -38,3 +39,29 @@ func (s *RetrySuite) TestRetry(c *check.C) {
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
c.Assert(response.StatusCode, checker.Equals, http.StatusOK)
|
c.Assert(response.StatusCode, checker.Equals, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *RetrySuite) TestRetryWebsocket(c *check.C) {
|
||||||
|
whoamiEndpoint := s.composeProject.Container(c, "whoami").NetworkSettings.IPAddress
|
||||||
|
file := s.adaptFile(c, "fixtures/retry/simple.toml", struct {
|
||||||
|
WhoamiEndpoint string
|
||||||
|
}{whoamiEndpoint})
|
||||||
|
defer os.Remove(file)
|
||||||
|
|
||||||
|
cmd, display := s.traefikCmd(withConfigFile(file))
|
||||||
|
defer display(c)
|
||||||
|
err := cmd.Start()
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
defer cmd.Process.Kill()
|
||||||
|
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8080/api/providers", 60*time.Second, try.BodyContains("PathPrefix:/"))
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
|
||||||
|
// This simulates a DialTimeout when connecting to the backend server.
|
||||||
|
_, response, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:8000/echo", nil)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
c.Assert(response.StatusCode, checker.Equals, http.StatusSwitchingProtocols)
|
||||||
|
|
||||||
|
_, response, err = websocket.DefaultDialer.Dial("ws://127.0.0.1:8000/echo", nil)
|
||||||
|
c.Assert(err, checker.IsNil)
|
||||||
|
c.Assert(response.StatusCode, checker.Equals, http.StatusSwitchingProtocols)
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package middlewares
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -41,11 +42,8 @@ func (retry *Retry) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||||
attempts := 1
|
attempts := 1
|
||||||
for {
|
for {
|
||||||
attemptsExhausted := attempts >= retry.attempts
|
attemptsExhausted := attempts >= retry.attempts
|
||||||
// Websocket requests can't be retried at this point in time.
|
|
||||||
// This is due to the fact that gorilla/websocket doesn't use the request
|
shouldRetry := !attemptsExhausted
|
||||||
// context and so we don't get httptrace information.
|
|
||||||
// Websocket clients should however retry on their own anyway.
|
|
||||||
shouldRetry := !attemptsExhausted && !isWebsocketRequest(r)
|
|
||||||
retryResponseWriter := newRetryResponseWriter(rw, shouldRetry)
|
retryResponseWriter := newRetryResponseWriter(rw, shouldRetry)
|
||||||
|
|
||||||
// Disable retries when the backend already received request data
|
// Disable retries when the backend already received request data
|
||||||
|
@ -150,7 +148,11 @@ func (rr *retryResponseWriterWithoutCloseNotify) WriteHeader(code int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rr *retryResponseWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
func (rr *retryResponseWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||||
return rr.responseWriter.(http.Hijacker).Hijack()
|
hijacker, ok := rr.responseWriter.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
return nil, nil, fmt.Errorf("%T is not a http.Hijacker", rr.responseWriter)
|
||||||
|
}
|
||||||
|
return hijacker.Hijack()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rr *retryResponseWriterWithoutCloseNotify) Flush() {
|
func (rr *retryResponseWriterWithoutCloseNotify) Flush() {
|
||||||
|
|
|
@ -3,9 +3,13 @@ package middlewares
|
||||||
import (
|
import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/containous/traefik/testhelpers"
|
"github.com/containous/traefik/testhelpers"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/vulcand/oxy/forward"
|
"github.com/vulcand/oxy/forward"
|
||||||
"github.com/vulcand/oxy/roundrobin"
|
"github.com/vulcand/oxy/roundrobin"
|
||||||
)
|
)
|
||||||
|
@ -17,7 +21,6 @@ func TestRetry(t *testing.T) {
|
||||||
wantRetryAttempts int
|
wantRetryAttempts int
|
||||||
wantResponseStatus int
|
wantResponseStatus int
|
||||||
amountFaultyEndpoints int
|
amountFaultyEndpoints int
|
||||||
isWebsocketHandshakeRequest bool
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "no retry on success",
|
desc: "no retry on success",
|
||||||
|
@ -54,14 +57,6 @@ func TestRetry(t *testing.T) {
|
||||||
wantResponseStatus: http.StatusInternalServerError,
|
wantResponseStatus: http.StatusInternalServerError,
|
||||||
amountFaultyEndpoints: 3,
|
amountFaultyEndpoints: 3,
|
||||||
},
|
},
|
||||||
{
|
|
||||||
desc: "websocket request should not be retried",
|
|
||||||
maxRequestAttempts: 3,
|
|
||||||
wantRetryAttempts: 0,
|
|
||||||
wantResponseStatus: http.StatusBadGateway,
|
|
||||||
amountFaultyEndpoints: 1,
|
|
||||||
isWebsocketHandshakeRequest: true,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
backendServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
backendServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
@ -74,10 +69,10 @@ func TestRetry(t *testing.T) {
|
||||||
t.Fatalf("Error creating forwarder: %s", err)
|
t.Fatalf("Error creating forwarder: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, test := range testCases {
|
||||||
tc := tc
|
test := test
|
||||||
|
|
||||||
t.Run(tc.desc, func(t *testing.T) {
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
loadBalancer, err := roundrobin.New(forwarder)
|
loadBalancer, err := roundrobin.New(forwarder)
|
||||||
|
@ -86,7 +81,7 @@ func TestRetry(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
basePort := 33444
|
basePort := 33444
|
||||||
for i := 0; i < tc.amountFaultyEndpoints; i++ {
|
for i := 0; i < test.amountFaultyEndpoints; i++ {
|
||||||
// 192.0.2.0 is a non-routable IP for testing purposes.
|
// 192.0.2.0 is a non-routable IP for testing purposes.
|
||||||
// See: https://stackoverflow.com/questions/528538/non-routable-ip-address/18436928#18436928
|
// See: https://stackoverflow.com/questions/528538/non-routable-ip-address/18436928#18436928
|
||||||
// We only use the port specification here because the URL is used as identifier
|
// We only use the port specification here because the URL is used as identifier
|
||||||
|
@ -98,24 +93,91 @@ func TestRetry(t *testing.T) {
|
||||||
loadBalancer.UpsertServer(testhelpers.MustParseURL(backendServer.URL))
|
loadBalancer.UpsertServer(testhelpers.MustParseURL(backendServer.URL))
|
||||||
|
|
||||||
retryListener := &countingRetryListener{}
|
retryListener := &countingRetryListener{}
|
||||||
retry := NewRetry(tc.maxRequestAttempts, loadBalancer, retryListener)
|
retry := NewRetry(test.maxRequestAttempts, loadBalancer, retryListener)
|
||||||
|
|
||||||
recorder := httptest.NewRecorder()
|
recorder := httptest.NewRecorder()
|
||||||
req := httptest.NewRequest(http.MethodGet, "http://localhost:3000/ok", nil)
|
req := httptest.NewRequest(http.MethodGet, "http://localhost:3000/ok", nil)
|
||||||
|
|
||||||
if tc.isWebsocketHandshakeRequest {
|
|
||||||
req.Header.Add("Connection", "Upgrade")
|
|
||||||
req.Header.Add("Upgrade", "websocket")
|
|
||||||
}
|
|
||||||
|
|
||||||
retry.ServeHTTP(recorder, req)
|
retry.ServeHTTP(recorder, req)
|
||||||
|
|
||||||
if tc.wantResponseStatus != recorder.Code {
|
assert.Equal(t, test.wantResponseStatus, recorder.Code)
|
||||||
t.Errorf("got status code %d, want %d", recorder.Code, tc.wantResponseStatus)
|
assert.Equal(t, test.wantRetryAttempts, retryListener.timesCalled)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if tc.wantRetryAttempts != retryListener.timesCalled {
|
|
||||||
t.Errorf("retry listener called %d time(s), want %d time(s)", retryListener.timesCalled, tc.wantRetryAttempts)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetryWebsocket(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
desc string
|
||||||
|
maxRequestAttempts int
|
||||||
|
expectedRetryAttempts int
|
||||||
|
expectedResponseStatus int
|
||||||
|
expectedError bool
|
||||||
|
amountFaultyEndpoints int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "Switching ok after 2 retries",
|
||||||
|
maxRequestAttempts: 3,
|
||||||
|
expectedRetryAttempts: 2,
|
||||||
|
amountFaultyEndpoints: 2,
|
||||||
|
expectedResponseStatus: http.StatusSwitchingProtocols,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "Switching failed",
|
||||||
|
maxRequestAttempts: 2,
|
||||||
|
expectedRetryAttempts: 1,
|
||||||
|
amountFaultyEndpoints: 2,
|
||||||
|
expectedResponseStatus: http.StatusBadGateway,
|
||||||
|
expectedError: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
forwarder, err := forward.New()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error creating forwarder: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
backendServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
upgrader := websocket.Upgrader{}
|
||||||
|
upgrader.Upgrade(rw, req, nil)
|
||||||
|
}))
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
test := test
|
||||||
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
loadBalancer, err := roundrobin.New(forwarder)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error creating load balancer: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
basePort := 33444
|
||||||
|
for i := 0; i < test.amountFaultyEndpoints; i++ {
|
||||||
|
// 192.0.2.0 is a non-routable IP for testing purposes.
|
||||||
|
// See: https://stackoverflow.com/questions/528538/non-routable-ip-address/18436928#18436928
|
||||||
|
// We only use the port specification here because the URL is used as identifier
|
||||||
|
// in the load balancer and using the exact same URL would not add a new server.
|
||||||
|
loadBalancer.UpsertServer(testhelpers.MustParseURL("http://192.0.2.0:" + string(basePort+i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// add the functioning server to the end of the load balancer list
|
||||||
|
loadBalancer.UpsertServer(testhelpers.MustParseURL(backendServer.URL))
|
||||||
|
|
||||||
|
retryListener := &countingRetryListener{}
|
||||||
|
retry := NewRetry(test.maxRequestAttempts, loadBalancer, retryListener)
|
||||||
|
|
||||||
|
retryServer := httptest.NewServer(retry)
|
||||||
|
|
||||||
|
url := strings.Replace(retryServer.URL, "http", "ws", 1)
|
||||||
|
_, response, err := websocket.DefaultDialer.Dial(url, nil)
|
||||||
|
|
||||||
|
if !test.expectedError {
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.Equal(t, test.expectedResponseStatus, response.StatusCode)
|
||||||
|
assert.Equal(t, test.expectedRetryAttempts, retryListener.timesCalled)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue