Add flush interval option on backend
This commit is contained in:
parent
c6dd1dccc3
commit
e6e9a86919
43 changed files with 420 additions and 85 deletions
|
@ -143,6 +143,14 @@ var _templatesConsul_catalogTmpl = []byte(`[backends]
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $service.TraefikLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $service.TraefikLabels }}
|
{{ $loadBalancer := getLoadBalancer $service.TraefikLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
@ -620,6 +628,12 @@ var _templatesDockerTmpl = []byte(`{{$backendServers := .Servers}}
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
@ -948,6 +962,12 @@ var _templatesEcsTmpl = []byte(`[backends]
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $firstInstance.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $serviceName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $firstInstance.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $firstInstance.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $serviceName }}".loadBalancer]
|
[backends."backend-{{ $serviceName }}".loadBalancer]
|
||||||
|
@ -1258,6 +1278,11 @@ var _templatesKubernetesTmpl = []byte(`[backends]
|
||||||
expression = "{{ $backend.CircuitBreaker.Expression }}"
|
expression = "{{ $backend.CircuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{if $backend.ResponseForwarding }}
|
||||||
|
[backends."{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $backend.responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
[backends."{{ $backendName }}".loadBalancer]
|
[backends."{{ $backendName }}".loadBalancer]
|
||||||
method = "{{ $backend.LoadBalancer.Method }}"
|
method = "{{ $backend.LoadBalancer.Method }}"
|
||||||
sticky = {{ $backend.LoadBalancer.Sticky }}
|
sticky = {{ $backend.LoadBalancer.Sticky }}
|
||||||
|
@ -1493,6 +1518,12 @@ var _templatesKvTmpl = []byte(`[backends]
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $backend }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.flushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $backend }}
|
{{ $loadBalancer := getLoadBalancer $backend }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."{{ $backendName }}".loadBalancer]
|
[backends."{{ $backendName }}".loadBalancer]
|
||||||
|
@ -1863,6 +1894,12 @@ var _templatesMarathonTmpl = []byte(`{{ $apps := .Applications }}
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $app.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $app.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $app.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."{{ $backendName }}".loadBalancer]
|
[backends."{{ $backendName }}".loadBalancer]
|
||||||
|
@ -2177,6 +2214,12 @@ var _templatesMesosTmpl = []byte(`[backends]
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $app.TraefikLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $app.TraefikLabels }}
|
{{ $loadBalancer := getLoadBalancer $app.TraefikLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
@ -2545,6 +2588,12 @@ var _templatesRancherTmpl = []byte(`{{ $backendServers := .Backends }}
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -105,6 +105,7 @@ Additional settings can be defined using Consul Catalog tags.
|
||||||
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `<prefix>.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend. ex: `NetworkErrorRatio() > 0.` |
|
| `<prefix>.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend. ex: `NetworkErrorRatio() > 0.` |
|
||||||
|
| `<prefix>.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `<prefix>.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
| `<prefix>.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
||||||
| `<prefix>.backend.healthcheck.interval=1s` | Defines the health check interval. |
|
| `<prefix>.backend.healthcheck.interval=1s` | Defines the health check interval. |
|
||||||
| `<prefix>.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
| `<prefix>.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
||||||
|
|
|
@ -225,6 +225,7 @@ Labels can be used on containers to override default behavior.
|
||||||
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
||||||
|
| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
||||||
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. |
|
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. |
|
||||||
| `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
| `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
||||||
|
|
|
@ -150,6 +150,7 @@ Labels can be used on task containers to override default behaviour:
|
||||||
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
||||||
|
| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
||||||
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) |
|
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) |
|
||||||
| `traefik.backend.healthcheck.scheme=http` | Overrides the server URL scheme. |
|
| `traefik.backend.healthcheck.scheme=http` | Overrides the server URL scheme. |
|
||||||
|
|
|
@ -24,6 +24,9 @@ Traefik can be configured with a file.
|
||||||
[backends.backend1.circuitBreaker]
|
[backends.backend1.circuitBreaker]
|
||||||
expression = "NetworkErrorRatio() > 0.5"
|
expression = "NetworkErrorRatio() > 0.5"
|
||||||
|
|
||||||
|
[backends.backend1.responseForwarding]
|
||||||
|
flushInterval = "10ms"
|
||||||
|
|
||||||
[backends.backend1.loadBalancer]
|
[backends.backend1.loadBalancer]
|
||||||
method = "drr"
|
method = "drr"
|
||||||
[backends.backend1.loadBalancer.stickiness]
|
[backends.backend1.loadBalancer.stickiness]
|
||||||
|
|
|
@ -277,6 +277,7 @@ The following annotations are applicable on the Service object associated with a
|
||||||
| `traefik.backend.loadbalancer.sticky: "true"` | Enable backend sticky sessions (DEPRECATED). |
|
| `traefik.backend.loadbalancer.sticky: "true"` | Enable backend sticky sessions (DEPRECATED). |
|
||||||
| `traefik.ingress.kubernetes.io/affinity: "true"` | Enable backend sticky sessions. |
|
| `traefik.ingress.kubernetes.io/affinity: "true"` | Enable backend sticky sessions. |
|
||||||
| `traefik.ingress.kubernetes.io/circuit-breaker-expression: <expression>` | Set the circuit breaker expression for the backend. |
|
| `traefik.ingress.kubernetes.io/circuit-breaker-expression: <expression>` | Set the circuit breaker expression for the backend. |
|
||||||
|
| `traefik.ingress.kubernetes.io/responseforwarding-flushinterval: "10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `traefik.ingress.kubernetes.io/load-balancer-method: drr` | Override the default `wrr` load balancer algorithm. |
|
| `traefik.ingress.kubernetes.io/load-balancer-method: drr` | Override the default `wrr` load balancer algorithm. |
|
||||||
| `traefik.ingress.kubernetes.io/max-conn-amount: "10"` | Sets the maximum number of simultaneous connections to the backend.<br>Must be used in conjunction with the label below to take effect. |
|
| `traefik.ingress.kubernetes.io/max-conn-amount: "10"` | Sets the maximum number of simultaneous connections to the backend.<br>Must be used in conjunction with the label below to take effect. |
|
||||||
| `traefik.ingress.kubernetes.io/max-conn-extractor-func: client.ip` | Set the function to be used against the request to determine what to limit maximum connections to the backend by.<br>Must be used in conjunction with the above label to take effect. |
|
| `traefik.ingress.kubernetes.io/max-conn-extractor-func: client.ip` | Set the function to be used against the request to determine what to limit maximum connections to the backend by.<br>Must be used in conjunction with the above label to take effect. |
|
||||||
|
|
|
@ -208,6 +208,7 @@ The following labels can be defined on Marathon applications. They adjust the be
|
||||||
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
||||||
|
| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
||||||
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) |
|
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) |
|
||||||
| `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
| `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
||||||
|
|
|
@ -122,6 +122,7 @@ The following labels can be defined on Mesos tasks. They adjust the behavior for
|
||||||
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
||||||
|
| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
||||||
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) |
|
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. (Default: 30s) |
|
||||||
| `traefik.backend.healthcheck.scheme=http` | Overrides the server URL scheme. |
|
| `traefik.backend.healthcheck.scheme=http` | Overrides the server URL scheme. |
|
||||||
|
|
|
@ -152,6 +152,7 @@ Labels can be used on task containers to override default behavior:
|
||||||
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.memResponseBodyBytes=0` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
| `traefik.backend.buffering.retryExpression=EXPR` | See [buffering](/configuration/commons/#buffering) section. |
|
||||||
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
| `traefik.backend.circuitbreaker.expression=EXPR` | Creates a [circuit breaker](/basics/#backends) to be used against the backend |
|
||||||
|
| `traefik.backend.responseForwarding.flushInterval=10ms` | Defines the interval between two flushes when forwarding response from backend to client. |
|
||||||
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
| `traefik.backend.healthcheck.path=/health` | Enables health check for the backend, hitting the container at `path`. |
|
||||||
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. |
|
| `traefik.backend.healthcheck.interval=1s` | Defines the health check interval. |
|
||||||
| `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
| `traefik.backend.healthcheck.port=8080` | Sets a different port for the health check. |
|
||||||
|
|
31
integration/fixtures/grpc/config_with_flush.toml
Normal file
31
integration/fixtures/grpc/config_with_flush.toml
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
defaultEntryPoints = ["https"]
|
||||||
|
|
||||||
|
rootCAs = [ """{{ .CertContent }}""" ]
|
||||||
|
|
||||||
|
[entryPoints]
|
||||||
|
[entryPoints.https]
|
||||||
|
address = ":4443"
|
||||||
|
[entryPoints.https.tls]
|
||||||
|
[[entryPoints.https.tls.certificates]]
|
||||||
|
certFile = """{{ .CertContent }}"""
|
||||||
|
keyFile = """{{ .KeyContent }}"""
|
||||||
|
|
||||||
|
|
||||||
|
[api]
|
||||||
|
|
||||||
|
[file]
|
||||||
|
|
||||||
|
[backends]
|
||||||
|
[backends.backend1]
|
||||||
|
[backends.backend1.responseForwarding]
|
||||||
|
flushInterval="1ms"
|
||||||
|
[backends.backend1.servers.server1]
|
||||||
|
url = "https://127.0.0.1:{{ .GRPCServerPort }}"
|
||||||
|
weight = 1
|
||||||
|
|
||||||
|
|
||||||
|
[frontends]
|
||||||
|
[frontends.frontend1]
|
||||||
|
backend = "backend1"
|
||||||
|
[frontends.frontend1.routes.test_1]
|
||||||
|
rule = "Host:127.0.0.1"
|
|
@ -356,3 +356,64 @@ func (s *GRPCSuite) TestGRPCBuffer(c *check.C) {
|
||||||
})
|
})
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *GRPCSuite) TestGRPCBufferWithFlushInterval(c *check.C) {
|
||||||
|
stopStreamExample := make(chan bool)
|
||||||
|
defer func() { stopStreamExample <- true }()
|
||||||
|
lis, err := net.Listen("tcp", ":0")
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
_, port, err := net.SplitHostPort(lis.Addr().String())
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err := startGRPCServer(lis, &myserver{
|
||||||
|
stopStreamExample: stopStreamExample,
|
||||||
|
})
|
||||||
|
c.Log(err)
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
}()
|
||||||
|
|
||||||
|
file := s.adaptFile(c, "fixtures/grpc/config_with_flush.toml", struct {
|
||||||
|
CertContent string
|
||||||
|
KeyContent string
|
||||||
|
GRPCServerPort string
|
||||||
|
}{
|
||||||
|
CertContent: string(LocalhostCert),
|
||||||
|
KeyContent: string(LocalhostKey),
|
||||||
|
GRPCServerPort: port,
|
||||||
|
})
|
||||||
|
|
||||||
|
defer os.Remove(file)
|
||||||
|
cmd, display := s.traefikCmd(withConfigFile(file))
|
||||||
|
defer display(c)
|
||||||
|
|
||||||
|
err = cmd.Start()
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
defer cmd.Process.Kill()
|
||||||
|
|
||||||
|
// wait for Traefik
|
||||||
|
err = try.GetRequest("http://127.0.0.1:8080/api/providers", 1*time.Second, try.BodyContains("Host:127.0.0.1"))
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
var client helloworld.Greeter_StreamExampleClient
|
||||||
|
client, closer, err := callStreamExampleClientGRPC()
|
||||||
|
defer closer()
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
received := make(chan bool)
|
||||||
|
go func() {
|
||||||
|
tr, err := client.Recv()
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
c.Assert(len(tr.Data), check.Equals, 512)
|
||||||
|
received <- true
|
||||||
|
}()
|
||||||
|
|
||||||
|
err = try.Do(time.Millisecond*100, func() error {
|
||||||
|
select {
|
||||||
|
case <-received:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return errors.New("failed to receive stream data")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
c.Assert(err, check.IsNil)
|
||||||
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ func (p *Provider) buildConfigurationV2(catalog []catalogUpdate) *types.Configur
|
||||||
"getMaxConn": label.GetMaxConn,
|
"getMaxConn": label.GetMaxConn,
|
||||||
"getHealthCheck": label.GetHealthCheck,
|
"getHealthCheck": label.GetHealthCheck,
|
||||||
"getBuffering": label.GetBuffering,
|
"getBuffering": label.GetBuffering,
|
||||||
|
"getResponseForwarding": label.GetResponseForwarding,
|
||||||
"getServer": p.getServer,
|
"getServer": p.getServer,
|
||||||
|
|
||||||
// Frontend functions
|
// Frontend functions
|
||||||
|
|
|
@ -405,6 +405,7 @@ func TestProviderBuildConfiguration(t *testing.T) {
|
||||||
label.TraefikBackend + "=foobar",
|
label.TraefikBackend + "=foobar",
|
||||||
|
|
||||||
label.TraefikBackendCircuitBreakerExpression + "=NetworkErrorRatio() > 0.5",
|
label.TraefikBackendCircuitBreakerExpression + "=NetworkErrorRatio() > 0.5",
|
||||||
|
label.TraefikBackendResponseForwardingFlushInterval + "=10ms",
|
||||||
label.TraefikBackendHealthCheckPath + "=/health",
|
label.TraefikBackendHealthCheckPath + "=/health",
|
||||||
label.TraefikBackendHealthCheckScheme + "=http",
|
label.TraefikBackendHealthCheckScheme + "=http",
|
||||||
label.TraefikBackendHealthCheckPort + "=880",
|
label.TraefikBackendHealthCheckPort + "=880",
|
||||||
|
@ -673,6 +674,9 @@ func TestProviderBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Sticky: true,
|
Sticky: true,
|
||||||
|
|
|
@ -38,6 +38,7 @@ func (p *Provider) buildConfigurationV2(containersInspected []dockerData) *types
|
||||||
"getMaxConn": label.GetMaxConn,
|
"getMaxConn": label.GetMaxConn,
|
||||||
"getHealthCheck": label.GetHealthCheck,
|
"getHealthCheck": label.GetHealthCheck,
|
||||||
"getBuffering": label.GetBuffering,
|
"getBuffering": label.GetBuffering,
|
||||||
|
"getResponseForwarding": label.GetResponseForwarding,
|
||||||
"getCircuitBreaker": label.GetCircuitBreaker,
|
"getCircuitBreaker": label.GetCircuitBreaker,
|
||||||
"getLoadBalancer": label.GetLoadBalancer,
|
"getLoadBalancer": label.GetLoadBalancer,
|
||||||
|
|
||||||
|
|
|
@ -434,6 +434,7 @@ func TestDockerBuildConfiguration(t *testing.T) {
|
||||||
label.TraefikBackend: "foobar",
|
label.TraefikBackend: "foobar",
|
||||||
|
|
||||||
label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5",
|
label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5",
|
||||||
|
label.TraefikBackendResponseForwardingFlushInterval: "10ms",
|
||||||
label.TraefikBackendHealthCheckScheme: "http",
|
label.TraefikBackendHealthCheckScheme: "http",
|
||||||
label.TraefikBackendHealthCheckPath: "/health",
|
label.TraefikBackendHealthCheckPath: "/health",
|
||||||
label.TraefikBackendHealthCheckPort: "880",
|
label.TraefikBackendHealthCheckPort: "880",
|
||||||
|
@ -666,6 +667,9 @@ func TestDockerBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Sticky: true,
|
Sticky: true,
|
||||||
|
|
|
@ -383,6 +383,7 @@ func TestSwarmBuildConfiguration(t *testing.T) {
|
||||||
label.TraefikBackend: "foobar",
|
label.TraefikBackend: "foobar",
|
||||||
|
|
||||||
label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5",
|
label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5",
|
||||||
|
label.TraefikBackendResponseForwardingFlushInterval: "10ms",
|
||||||
label.TraefikBackendHealthCheckScheme: "http",
|
label.TraefikBackendHealthCheckScheme: "http",
|
||||||
label.TraefikBackendHealthCheckPath: "/health",
|
label.TraefikBackendHealthCheckPath: "/health",
|
||||||
label.TraefikBackendHealthCheckPort: "880",
|
label.TraefikBackendHealthCheckPort: "880",
|
||||||
|
@ -584,6 +585,9 @@ func TestSwarmBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Sticky: true,
|
Sticky: true,
|
||||||
|
|
|
@ -28,6 +28,8 @@ func (p *Provider) buildConfigurationV2(instances []ecsInstance) (*types.Configu
|
||||||
"getMaxConn": label.GetMaxConn,
|
"getMaxConn": label.GetMaxConn,
|
||||||
"getHealthCheck": label.GetHealthCheck,
|
"getHealthCheck": label.GetHealthCheck,
|
||||||
"getBuffering": label.GetBuffering,
|
"getBuffering": label.GetBuffering,
|
||||||
|
"getResponseForwarding": label.GetResponseForwarding,
|
||||||
|
|
||||||
"getServers": getServers,
|
"getServers": getServers,
|
||||||
|
|
||||||
// Frontend functions
|
// Frontend functions
|
||||||
|
|
|
@ -342,6 +342,7 @@ func TestBuildConfiguration(t *testing.T) {
|
||||||
label.TraefikBackend: aws.String("foobar"),
|
label.TraefikBackend: aws.String("foobar"),
|
||||||
|
|
||||||
label.TraefikBackendCircuitBreakerExpression: aws.String("NetworkErrorRatio() > 0.5"),
|
label.TraefikBackendCircuitBreakerExpression: aws.String("NetworkErrorRatio() > 0.5"),
|
||||||
|
label.TraefikBackendResponseForwardingFlushInterval: aws.String("10ms"),
|
||||||
label.TraefikBackendHealthCheckScheme: aws.String("http"),
|
label.TraefikBackendHealthCheckScheme: aws.String("http"),
|
||||||
label.TraefikBackendHealthCheckPath: aws.String("/health"),
|
label.TraefikBackendHealthCheckPath: aws.String("/health"),
|
||||||
label.TraefikBackendHealthCheckPort: aws.String("880"),
|
label.TraefikBackendHealthCheckPort: aws.String("880"),
|
||||||
|
@ -458,6 +459,9 @@ func TestBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Sticky: true,
|
Sticky: true,
|
||||||
|
|
|
@ -40,6 +40,7 @@ const (
|
||||||
annotationKubernetesRateLimit = "ingress.kubernetes.io/rate-limit"
|
annotationKubernetesRateLimit = "ingress.kubernetes.io/rate-limit"
|
||||||
annotationKubernetesErrorPages = "ingress.kubernetes.io/error-pages"
|
annotationKubernetesErrorPages = "ingress.kubernetes.io/error-pages"
|
||||||
annotationKubernetesBuffering = "ingress.kubernetes.io/buffering"
|
annotationKubernetesBuffering = "ingress.kubernetes.io/buffering"
|
||||||
|
annotationKubernetesResponseForwardingFlushInterval = "ingress.kubernetes.io/responseforwarding-flushinterval"
|
||||||
annotationKubernetesAppRoot = "ingress.kubernetes.io/app-root"
|
annotationKubernetesAppRoot = "ingress.kubernetes.io/app-root"
|
||||||
annotationKubernetesServiceWeights = "ingress.kubernetes.io/service-weights"
|
annotationKubernetesServiceWeights = "ingress.kubernetes.io/service-weights"
|
||||||
annotationKubernetesRequestModifier = "ingress.kubernetes.io/request-modifier"
|
annotationKubernetesRequestModifier = "ingress.kubernetes.io/request-modifier"
|
||||||
|
|
|
@ -93,6 +93,13 @@ func circuitBreaker(exp string) func(*types.Backend) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func responseForwarding(interval string) func(*types.Backend) {
|
||||||
|
return func(b *types.Backend) {
|
||||||
|
b.ResponseForwarding = &types.ResponseForwarding{}
|
||||||
|
b.ResponseForwarding.FlushInterval = interval
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func buffering(opts ...func(*types.Buffering)) func(*types.Backend) {
|
func buffering(opts ...func(*types.Buffering)) func(*types.Backend) {
|
||||||
return func(b *types.Backend) {
|
return func(b *types.Backend) {
|
||||||
if b.Buffering == nil {
|
if b.Buffering == nil {
|
||||||
|
|
|
@ -337,6 +337,7 @@ func (p *Provider) loadIngresses(k8sClient Client) (*types.Configuration, error)
|
||||||
templateObjects.Backends[baseName].LoadBalancer = getLoadBalancer(service)
|
templateObjects.Backends[baseName].LoadBalancer = getLoadBalancer(service)
|
||||||
templateObjects.Backends[baseName].MaxConn = getMaxConn(service)
|
templateObjects.Backends[baseName].MaxConn = getMaxConn(service)
|
||||||
templateObjects.Backends[baseName].Buffering = getBuffering(service)
|
templateObjects.Backends[baseName].Buffering = getBuffering(service)
|
||||||
|
templateObjects.Backends[baseName].ResponseForwarding = getResponseForwarding(service)
|
||||||
|
|
||||||
protocol := label.DefaultProtocol
|
protocol := label.DefaultProtocol
|
||||||
|
|
||||||
|
@ -494,6 +495,7 @@ func (p *Provider) addGlobalBackend(cl Client, i *extensionsv1beta1.Ingress, tem
|
||||||
templateObjects.Backends[defaultBackendName].LoadBalancer = getLoadBalancer(service)
|
templateObjects.Backends[defaultBackendName].LoadBalancer = getLoadBalancer(service)
|
||||||
templateObjects.Backends[defaultBackendName].MaxConn = getMaxConn(service)
|
templateObjects.Backends[defaultBackendName].MaxConn = getMaxConn(service)
|
||||||
templateObjects.Backends[defaultBackendName].Buffering = getBuffering(service)
|
templateObjects.Backends[defaultBackendName].Buffering = getBuffering(service)
|
||||||
|
templateObjects.Backends[defaultBackendName].ResponseForwarding = getResponseForwarding(service)
|
||||||
|
|
||||||
endpoints, exists, err := cl.GetEndpoints(service.Namespace, service.Name)
|
endpoints, exists, err := cl.GetEndpoints(service.Namespace, service.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -951,6 +953,17 @@ func getWhiteList(i *extensionsv1beta1.Ingress) *types.WhiteList {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getResponseForwarding(service *corev1.Service) *types.ResponseForwarding {
|
||||||
|
flushIntervalValue := getStringValue(service.Annotations, annotationKubernetesResponseForwardingFlushInterval, "")
|
||||||
|
if len(flushIntervalValue) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &types.ResponseForwarding{
|
||||||
|
FlushInterval: flushIntervalValue,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func getBuffering(service *corev1.Service) *types.Buffering {
|
func getBuffering(service *corev1.Service) *types.Buffering {
|
||||||
var buffering *types.Buffering
|
var buffering *types.Buffering
|
||||||
|
|
||||||
|
|
|
@ -908,6 +908,9 @@ func TestServiceAnnotations(t *testing.T) {
|
||||||
iRule(
|
iRule(
|
||||||
iHost("max-conn"),
|
iHost("max-conn"),
|
||||||
iPaths(onePath(iBackend("service4", intstr.FromInt(804))))),
|
iPaths(onePath(iBackend("service4", intstr.FromInt(804))))),
|
||||||
|
iRule(
|
||||||
|
iHost("flush"),
|
||||||
|
iPaths(onePath(iBackend("service5", intstr.FromInt(805))))),
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
@ -958,6 +961,15 @@ retryexpression: IsNetworkError() && Attempts() <= 2
|
||||||
clusterIP("10.0.0.4"),
|
clusterIP("10.0.0.4"),
|
||||||
sPorts(sPort(804, "http"))),
|
sPorts(sPort(804, "http"))),
|
||||||
),
|
),
|
||||||
|
buildService(
|
||||||
|
sName("service5"),
|
||||||
|
sNamespace("testing"),
|
||||||
|
sUID("5"),
|
||||||
|
sAnnotation(annotationKubernetesResponseForwardingFlushInterval, "10ms"),
|
||||||
|
sSpec(
|
||||||
|
clusterIP("10.0.0.5"),
|
||||||
|
sPorts(sPort(80, ""))),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
endpoints := []*corev1.Endpoints{
|
endpoints := []*corev1.Endpoints{
|
||||||
|
@ -1005,6 +1017,17 @@ retryexpression: IsNetworkError() && Attempts() <= 2
|
||||||
eAddresses(eAddress("10.4.0.2")),
|
eAddresses(eAddress("10.4.0.2")),
|
||||||
ePorts(ePort(8080, "http"))),
|
ePorts(ePort(8080, "http"))),
|
||||||
),
|
),
|
||||||
|
buildEndpoint(
|
||||||
|
eNamespace("testing"),
|
||||||
|
eName("service5"),
|
||||||
|
eUID("5"),
|
||||||
|
subset(
|
||||||
|
eAddresses(eAddress("10.4.0.1")),
|
||||||
|
ePorts(ePort(8080, "http"))),
|
||||||
|
subset(
|
||||||
|
eAddresses(eAddress("10.4.0.2")),
|
||||||
|
ePorts(ePort(8080, "http"))),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
watchChan := make(chan interface{})
|
watchChan := make(chan interface{})
|
||||||
|
@ -1028,6 +1051,11 @@ retryexpression: IsNetworkError() && Attempts() <= 2
|
||||||
lbMethod("drr"),
|
lbMethod("drr"),
|
||||||
circuitBreaker("NetworkErrorRatio() > 0.5"),
|
circuitBreaker("NetworkErrorRatio() > 0.5"),
|
||||||
),
|
),
|
||||||
|
backend("flush",
|
||||||
|
servers(),
|
||||||
|
lbMethod("wrr"),
|
||||||
|
responseForwarding("10ms"),
|
||||||
|
),
|
||||||
backend("bar",
|
backend("bar",
|
||||||
servers(
|
servers(
|
||||||
server("http://10.15.0.1:8080", weight(1)),
|
server("http://10.15.0.1:8080", weight(1)),
|
||||||
|
@ -1073,6 +1101,10 @@ retryexpression: IsNetworkError() && Attempts() <= 2
|
||||||
passHostHeader(),
|
passHostHeader(),
|
||||||
routes(
|
routes(
|
||||||
route("max-conn", "Host:max-conn"))),
|
route("max-conn", "Host:max-conn"))),
|
||||||
|
frontend("flush",
|
||||||
|
passHostHeader(),
|
||||||
|
routes(
|
||||||
|
route("flush", "Host:flush"))),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package kv
|
||||||
const (
|
const (
|
||||||
pathBackends = "/backends/"
|
pathBackends = "/backends/"
|
||||||
pathBackendCircuitBreakerExpression = "/circuitbreaker/expression"
|
pathBackendCircuitBreakerExpression = "/circuitbreaker/expression"
|
||||||
|
pathBackendResponseForwardingFlushInterval = "/responseforwarding/flushinterval"
|
||||||
pathBackendHealthCheckScheme = "/healthcheck/scheme"
|
pathBackendHealthCheckScheme = "/healthcheck/scheme"
|
||||||
pathBackendHealthCheckPath = "/healthcheck/path"
|
pathBackendHealthCheckPath = "/healthcheck/path"
|
||||||
pathBackendHealthCheckPort = "/healthcheck/port"
|
pathBackendHealthCheckPort = "/healthcheck/port"
|
||||||
|
|
|
@ -59,6 +59,7 @@ func (p *Provider) buildConfiguration() *types.Configuration {
|
||||||
// Backend functions
|
// Backend functions
|
||||||
"getServers": p.getServers,
|
"getServers": p.getServers,
|
||||||
"getCircuitBreaker": p.getCircuitBreaker,
|
"getCircuitBreaker": p.getCircuitBreaker,
|
||||||
|
"getResponseForwarding": p.getResponseForwarding,
|
||||||
"getLoadBalancer": p.getLoadBalancer,
|
"getLoadBalancer": p.getLoadBalancer,
|
||||||
"getMaxConn": p.getMaxConn,
|
"getMaxConn": p.getMaxConn,
|
||||||
"getHealthCheck": p.getHealthCheck,
|
"getHealthCheck": p.getHealthCheck,
|
||||||
|
@ -269,6 +270,20 @@ func (p *Provider) getLoadBalancer(rootPath string) *types.LoadBalancer {
|
||||||
return lb
|
return lb
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Provider) getResponseForwarding(rootPath string) *types.ResponseForwarding {
|
||||||
|
if !p.has(rootPath, pathBackendResponseForwardingFlushInterval) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
value := p.get("", rootPath, pathBackendResponseForwardingFlushInterval)
|
||||||
|
if len(value) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &types.ResponseForwarding{
|
||||||
|
FlushInterval: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Provider) getCircuitBreaker(rootPath string) *types.CircuitBreaker {
|
func (p *Provider) getCircuitBreaker(rootPath string) *types.CircuitBreaker {
|
||||||
if !p.has(rootPath, pathBackendCircuitBreakerExpression) {
|
if !p.has(rootPath, pathBackendCircuitBreakerExpression) {
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -29,6 +29,7 @@ const (
|
||||||
SuffixBackendMaxConnAmount = "backend.maxconn.amount"
|
SuffixBackendMaxConnAmount = "backend.maxconn.amount"
|
||||||
SuffixBackendMaxConnExtractorFunc = "backend.maxconn.extractorfunc"
|
SuffixBackendMaxConnExtractorFunc = "backend.maxconn.extractorfunc"
|
||||||
SuffixBackendBuffering = "backend.buffering"
|
SuffixBackendBuffering = "backend.buffering"
|
||||||
|
SuffixBackendResponseForwardingFlushInterval = "backend.responseForwarding.flushInterval"
|
||||||
SuffixBackendBufferingMaxRequestBodyBytes = SuffixBackendBuffering + ".maxRequestBodyBytes"
|
SuffixBackendBufferingMaxRequestBodyBytes = SuffixBackendBuffering + ".maxRequestBodyBytes"
|
||||||
SuffixBackendBufferingMemRequestBodyBytes = SuffixBackendBuffering + ".memRequestBodyBytes"
|
SuffixBackendBufferingMemRequestBodyBytes = SuffixBackendBuffering + ".memRequestBodyBytes"
|
||||||
SuffixBackendBufferingMaxResponseBodyBytes = SuffixBackendBuffering + ".maxResponseBodyBytes"
|
SuffixBackendBufferingMaxResponseBodyBytes = SuffixBackendBuffering + ".maxResponseBodyBytes"
|
||||||
|
@ -131,6 +132,7 @@ const (
|
||||||
TraefikBackendMaxConnAmount = Prefix + SuffixBackendMaxConnAmount
|
TraefikBackendMaxConnAmount = Prefix + SuffixBackendMaxConnAmount
|
||||||
TraefikBackendMaxConnExtractorFunc = Prefix + SuffixBackendMaxConnExtractorFunc
|
TraefikBackendMaxConnExtractorFunc = Prefix + SuffixBackendMaxConnExtractorFunc
|
||||||
TraefikBackendBuffering = Prefix + SuffixBackendBuffering
|
TraefikBackendBuffering = Prefix + SuffixBackendBuffering
|
||||||
|
TraefikBackendResponseForwardingFlushInterval = Prefix + SuffixBackendResponseForwardingFlushInterval
|
||||||
TraefikBackendBufferingMaxRequestBodyBytes = Prefix + SuffixBackendBufferingMaxRequestBodyBytes
|
TraefikBackendBufferingMaxRequestBodyBytes = Prefix + SuffixBackendBufferingMaxRequestBodyBytes
|
||||||
TraefikBackendBufferingMemRequestBodyBytes = Prefix + SuffixBackendBufferingMemRequestBodyBytes
|
TraefikBackendBufferingMemRequestBodyBytes = Prefix + SuffixBackendBufferingMemRequestBodyBytes
|
||||||
TraefikBackendBufferingMaxResponseBodyBytes = Prefix + SuffixBackendBufferingMaxResponseBodyBytes
|
TraefikBackendBufferingMaxResponseBodyBytes = Prefix + SuffixBackendBufferingMaxResponseBodyBytes
|
||||||
|
|
|
@ -354,6 +354,19 @@ func GetHealthCheck(labels map[string]string) *types.HealthCheck {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetResponseForwarding Create ResponseForwarding from labels
|
||||||
|
func GetResponseForwarding(labels map[string]string) *types.ResponseForwarding {
|
||||||
|
if !HasPrefix(labels, TraefikBackendResponseForwardingFlushInterval) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
value := GetStringValue(labels, TraefikBackendResponseForwardingFlushInterval, "0")
|
||||||
|
|
||||||
|
return &types.ResponseForwarding{
|
||||||
|
FlushInterval: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// GetBuffering Create buffering from labels
|
// GetBuffering Create buffering from labels
|
||||||
func GetBuffering(labels map[string]string) *types.Buffering {
|
func GetBuffering(labels map[string]string) *types.Buffering {
|
||||||
if !HasPrefix(labels, TraefikBackendBuffering) {
|
if !HasPrefix(labels, TraefikBackendBuffering) {
|
||||||
|
|
|
@ -38,6 +38,7 @@ func (p *Provider) buildConfigurationV2(applications *marathon.Applications) *ty
|
||||||
"getMaxConn": label.GetMaxConn,
|
"getMaxConn": label.GetMaxConn,
|
||||||
"getHealthCheck": label.GetHealthCheck,
|
"getHealthCheck": label.GetHealthCheck,
|
||||||
"getBuffering": label.GetBuffering,
|
"getBuffering": label.GetBuffering,
|
||||||
|
"getResponseForwarding": label.GetResponseForwarding,
|
||||||
"getServers": p.getServers,
|
"getServers": p.getServers,
|
||||||
|
|
||||||
// Frontend functions
|
// Frontend functions
|
||||||
|
|
|
@ -357,6 +357,7 @@ func TestBuildConfiguration(t *testing.T) {
|
||||||
withLabel(label.TraefikBackend, "foobar"),
|
withLabel(label.TraefikBackend, "foobar"),
|
||||||
|
|
||||||
withLabel(label.TraefikBackendCircuitBreakerExpression, "NetworkErrorRatio() > 0.5"),
|
withLabel(label.TraefikBackendCircuitBreakerExpression, "NetworkErrorRatio() > 0.5"),
|
||||||
|
withLabel(label.TraefikBackendResponseForwardingFlushInterval, "10ms"),
|
||||||
withLabel(label.TraefikBackendHealthCheckScheme, "http"),
|
withLabel(label.TraefikBackendHealthCheckScheme, "http"),
|
||||||
withLabel(label.TraefikBackendHealthCheckPath, "/health"),
|
withLabel(label.TraefikBackendHealthCheckPath, "/health"),
|
||||||
withLabel(label.TraefikBackendHealthCheckPort, "880"),
|
withLabel(label.TraefikBackendHealthCheckPort, "880"),
|
||||||
|
@ -586,6 +587,9 @@ func TestBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Sticky: true,
|
Sticky: true,
|
||||||
|
|
|
@ -35,6 +35,7 @@ func (p *Provider) buildConfigurationV2(tasks []state.Task) *types.Configuration
|
||||||
"getMaxConn": label.GetMaxConn,
|
"getMaxConn": label.GetMaxConn,
|
||||||
"getHealthCheck": label.GetHealthCheck,
|
"getHealthCheck": label.GetHealthCheck,
|
||||||
"getBuffering": label.GetBuffering,
|
"getBuffering": label.GetBuffering,
|
||||||
|
"getResponseForwarding": label.GetResponseForwarding,
|
||||||
"getServers": p.getServers,
|
"getServers": p.getServers,
|
||||||
"getHost": p.getHost,
|
"getHost": p.getHost,
|
||||||
"getServerPort": p.getServerPort,
|
"getServerPort": p.getServerPort,
|
||||||
|
|
|
@ -314,6 +314,7 @@ func TestBuildConfiguration(t *testing.T) {
|
||||||
withLabel(label.TraefikBackend, "foobar"),
|
withLabel(label.TraefikBackend, "foobar"),
|
||||||
|
|
||||||
withLabel(label.TraefikBackendCircuitBreakerExpression, "NetworkErrorRatio() > 0.5"),
|
withLabel(label.TraefikBackendCircuitBreakerExpression, "NetworkErrorRatio() > 0.5"),
|
||||||
|
withLabel(label.TraefikBackendResponseForwardingFlushInterval, "10ms"),
|
||||||
withLabel(label.TraefikBackendHealthCheckScheme, "http"),
|
withLabel(label.TraefikBackendHealthCheckScheme, "http"),
|
||||||
withLabel(label.TraefikBackendHealthCheckPath, "/health"),
|
withLabel(label.TraefikBackendHealthCheckPath, "/health"),
|
||||||
withLabel(label.TraefikBackendHealthCheckPort, "880"),
|
withLabel(label.TraefikBackendHealthCheckPort, "880"),
|
||||||
|
@ -546,6 +547,9 @@ func TestBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Stickiness: &types.Stickiness{
|
Stickiness: &types.Stickiness{
|
||||||
|
|
|
@ -25,6 +25,7 @@ func (p *Provider) buildConfigurationV2(services []rancherData) *types.Configura
|
||||||
"getMaxConn": label.GetMaxConn,
|
"getMaxConn": label.GetMaxConn,
|
||||||
"getHealthCheck": label.GetHealthCheck,
|
"getHealthCheck": label.GetHealthCheck,
|
||||||
"getBuffering": label.GetBuffering,
|
"getBuffering": label.GetBuffering,
|
||||||
|
"getResponseForwarding": label.GetResponseForwarding,
|
||||||
"getServers": getServers,
|
"getServers": getServers,
|
||||||
|
|
||||||
// Frontend functions
|
// Frontend functions
|
||||||
|
|
|
@ -41,6 +41,7 @@ func TestProviderBuildConfiguration(t *testing.T) {
|
||||||
label.TraefikBackend: "foobar",
|
label.TraefikBackend: "foobar",
|
||||||
|
|
||||||
label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5",
|
label.TraefikBackendCircuitBreakerExpression: "NetworkErrorRatio() > 0.5",
|
||||||
|
label.TraefikBackendResponseForwardingFlushInterval: "10ms",
|
||||||
label.TraefikBackendHealthCheckScheme: "http",
|
label.TraefikBackendHealthCheckScheme: "http",
|
||||||
label.TraefikBackendHealthCheckPath: "/health",
|
label.TraefikBackendHealthCheckPath: "/health",
|
||||||
label.TraefikBackendHealthCheckPort: "880",
|
label.TraefikBackendHealthCheckPort: "880",
|
||||||
|
@ -277,6 +278,9 @@ func TestProviderBuildConfiguration(t *testing.T) {
|
||||||
CircuitBreaker: &types.CircuitBreaker{
|
CircuitBreaker: &types.CircuitBreaker{
|
||||||
Expression: "NetworkErrorRatio() > 0.5",
|
Expression: "NetworkErrorRatio() > 0.5",
|
||||||
},
|
},
|
||||||
|
ResponseForwarding: &types.ResponseForwarding{
|
||||||
|
FlushInterval: "10ms",
|
||||||
|
},
|
||||||
LoadBalancer: &types.LoadBalancer{
|
LoadBalancer: &types.LoadBalancer{
|
||||||
Method: "drr",
|
Method: "drr",
|
||||||
Sticky: true,
|
Sticky: true,
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/containous/flaeg/parse"
|
||||||
"github.com/containous/mux"
|
"github.com/containous/mux"
|
||||||
"github.com/containous/traefik/configuration"
|
"github.com/containous/traefik/configuration"
|
||||||
"github.com/containous/traefik/healthcheck"
|
"github.com/containous/traefik/healthcheck"
|
||||||
|
@ -163,7 +164,7 @@ func (s *Server) loadFrontendConfig(
|
||||||
postConfigs = append(postConfigs, postConfig)
|
postConfigs = append(postConfigs, postConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
fwd, err := s.buildForwarder(entryPointName, entryPoint, frontendName, frontend, responseModifier)
|
fwd, err := s.buildForwarder(entryPointName, entryPoint, frontendName, frontend, responseModifier, backend)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create the forwarder for frontend %s: %v", frontendName, err)
|
return nil, fmt.Errorf("failed to create the forwarder for frontend %s: %v", frontendName, err)
|
||||||
}
|
}
|
||||||
|
@ -216,7 +217,7 @@ func (s *Server) loadFrontendConfig(
|
||||||
|
|
||||||
func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration.EntryPoint,
|
func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration.EntryPoint,
|
||||||
frontendName string, frontend *types.Frontend,
|
frontendName string, frontend *types.Frontend,
|
||||||
responseModifier modifyResponse) (http.Handler, error) {
|
responseModifier modifyResponse, backend *types.Backend) (http.Handler, error) {
|
||||||
|
|
||||||
roundTripper, err := s.getRoundTripper(entryPointName, frontend.PassTLSCert, entryPoint.TLS)
|
roundTripper, err := s.getRoundTripper(entryPointName, frontend.PassTLSCert, entryPoint.TLS)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -228,6 +229,14 @@ func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration
|
||||||
return nil, fmt.Errorf("error creating rewriter for frontend %s: %v", frontendName, err)
|
return nil, fmt.Errorf("error creating rewriter for frontend %s: %v", frontendName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var flushInterval parse.Duration
|
||||||
|
if backend.ResponseForwarding != nil {
|
||||||
|
err := flushInterval.Set(backend.ResponseForwarding.FlushInterval)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error creating flush interval for frontend %s: %v", frontendName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var fwd http.Handler
|
var fwd http.Handler
|
||||||
fwd, err = forward.New(
|
fwd, err = forward.New(
|
||||||
forward.Stream(true),
|
forward.Stream(true),
|
||||||
|
@ -236,6 +245,7 @@ func (s *Server) buildForwarder(entryPointName string, entryPoint *configuration
|
||||||
forward.Rewriter(rewriter),
|
forward.Rewriter(rewriter),
|
||||||
forward.ResponseModifier(responseModifier),
|
forward.ResponseModifier(responseModifier),
|
||||||
forward.BufferPool(s.bufferPool),
|
forward.BufferPool(s.bufferPool),
|
||||||
|
forward.StreamingFlushInterval(time.Duration(flushInterval)),
|
||||||
forward.WebsocketConnectionClosedHook(func(req *http.Request, conn net.Conn) {
|
forward.WebsocketConnectionClosedHook(func(req *http.Request, conn net.Conn) {
|
||||||
server := req.Context().Value(http.ServerContextKey).(*http.Server)
|
server := req.Context().Value(http.ServerContextKey).(*http.Server)
|
||||||
if server != nil {
|
if server != nil {
|
||||||
|
|
|
@ -8,6 +8,14 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $service.TraefikLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $service.TraefikLabels }}
|
{{ $loadBalancer := getLoadBalancer $service.TraefikLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -9,6 +9,12 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -8,6 +8,12 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $firstInstance.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $serviceName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $firstInstance.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $firstInstance.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $serviceName }}".loadBalancer]
|
[backends."backend-{{ $serviceName }}".loadBalancer]
|
||||||
|
|
|
@ -8,6 +8,11 @@
|
||||||
expression = "{{ $backend.CircuitBreaker.Expression }}"
|
expression = "{{ $backend.CircuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{if $backend.ResponseForwarding }}
|
||||||
|
[backends."{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $backend.responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
[backends."{{ $backendName }}".loadBalancer]
|
[backends."{{ $backendName }}".loadBalancer]
|
||||||
method = "{{ $backend.LoadBalancer.Method }}"
|
method = "{{ $backend.LoadBalancer.Method }}"
|
||||||
sticky = {{ $backend.LoadBalancer.Sticky }}
|
sticky = {{ $backend.LoadBalancer.Sticky }}
|
||||||
|
|
|
@ -8,6 +8,12 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $backend }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.flushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $backend }}
|
{{ $loadBalancer := getLoadBalancer $backend }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."{{ $backendName }}".loadBalancer]
|
[backends."{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -11,6 +11,12 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $app.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $app.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $app.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."{{ $backendName }}".loadBalancer]
|
[backends."{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -11,6 +11,12 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $app.TraefikLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $app.TraefikLabels }}
|
{{ $loadBalancer := getLoadBalancer $app.TraefikLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -10,6 +10,12 @@
|
||||||
expression = "{{ $circuitBreaker.Expression }}"
|
expression = "{{ $circuitBreaker.Expression }}"
|
||||||
{{end}}
|
{{end}}
|
||||||
|
|
||||||
|
{{ $responseForwarding := getResponseForwarding $backend.SegmentLabels }}
|
||||||
|
{{if $responseForwarding }}
|
||||||
|
[backends."backend-{{ $backendName }}".responseForwarding]
|
||||||
|
flushInterval = "{{ $responseForwarding.FlushInterval }}"
|
||||||
|
{{end}}
|
||||||
|
|
||||||
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
{{ $loadBalancer := getLoadBalancer $backend.SegmentLabels }}
|
||||||
{{if $loadBalancer }}
|
{{if $loadBalancer }}
|
||||||
[backends."backend-{{ $backendName }}".loadBalancer]
|
[backends."backend-{{ $backendName }}".loadBalancer]
|
||||||
|
|
|
@ -28,6 +28,12 @@ type Backend struct {
|
||||||
MaxConn *MaxConn `json:"maxConn,omitempty"`
|
MaxConn *MaxConn `json:"maxConn,omitempty"`
|
||||||
HealthCheck *HealthCheck `json:"healthCheck,omitempty"`
|
HealthCheck *HealthCheck `json:"healthCheck,omitempty"`
|
||||||
Buffering *Buffering `json:"buffering,omitempty"`
|
Buffering *Buffering `json:"buffering,omitempty"`
|
||||||
|
ResponseForwarding *ResponseForwarding `json:"forwardingResponse,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResponseForwarding holds configuration for the forward of the response
|
||||||
|
type ResponseForwarding struct {
|
||||||
|
FlushInterval string `json:"flushInterval,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxConn holds maximum connection configuration
|
// MaxConn holds maximum connection configuration
|
||||||
|
|
Loading…
Reference in a new issue