fix SSE subscriptions when retries are enabled
This commit is contained in:
parent
52c1909f24
commit
9d00da7285
1 changed files with 16 additions and 2 deletions
|
@ -52,6 +52,13 @@ func (retry *Retry) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||||
recorder.responseWriter = rw
|
recorder.responseWriter = rw
|
||||||
|
|
||||||
retry.next.ServeHTTP(recorder, r.WithContext(newCtx))
|
retry.next.ServeHTTP(recorder, r.WithContext(newCtx))
|
||||||
|
|
||||||
|
// It's a stream request and the body gets already sent to the client.
|
||||||
|
// Therefore we should not send the response a second time.
|
||||||
|
if recorder.streamingResponseStarted {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
if !netErrorOccurred || attempts >= retry.attempts {
|
if !netErrorOccurred || attempts >= retry.attempts {
|
||||||
utils.CopyHeaders(rw.Header(), recorder.Header())
|
utils.CopyHeaders(rw.Header(), recorder.Header())
|
||||||
rw.WriteHeader(recorder.Code)
|
rw.WriteHeader(recorder.Code)
|
||||||
|
@ -114,8 +121,9 @@ type retryResponseRecorder struct {
|
||||||
HeaderMap http.Header // the HTTP response headers
|
HeaderMap http.Header // the HTTP response headers
|
||||||
Body *bytes.Buffer // if non-nil, the bytes.Buffer to append written data to
|
Body *bytes.Buffer // if non-nil, the bytes.Buffer to append written data to
|
||||||
|
|
||||||
responseWriter http.ResponseWriter
|
responseWriter http.ResponseWriter
|
||||||
err error
|
err error
|
||||||
|
streamingResponseStarted bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// newRetryResponseRecorder returns an initialized retryResponseRecorder.
|
// newRetryResponseRecorder returns an initialized retryResponseRecorder.
|
||||||
|
@ -164,6 +172,12 @@ func (rw *retryResponseRecorder) CloseNotify() <-chan bool {
|
||||||
|
|
||||||
// Flush sends any buffered data to the client.
|
// Flush sends any buffered data to the client.
|
||||||
func (rw *retryResponseRecorder) Flush() {
|
func (rw *retryResponseRecorder) Flush() {
|
||||||
|
if !rw.streamingResponseStarted {
|
||||||
|
utils.CopyHeaders(rw.responseWriter.Header(), rw.Header())
|
||||||
|
rw.responseWriter.WriteHeader(rw.Code)
|
||||||
|
rw.streamingResponseStarted = true
|
||||||
|
}
|
||||||
|
|
||||||
_, err := rw.responseWriter.Write(rw.Body.Bytes())
|
_, err := rw.responseWriter.Write(rw.Body.Bytes())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error writing response in retryResponseRecorder: %s", err)
|
log.Errorf("Error writing response in retryResponseRecorder: %s", err)
|
||||||
|
|
Loading…
Reference in a new issue