diff --git a/glide.lock b/glide.lock index cd755e2cf..55d7c44ed 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 03f8c75c2523eaea36cd61abc4d335ce68115c80404de7a10d8d4f0225957fc9 -updated: 2017-11-28T15:59:09.002650953+01:00 +hash: 4595cac0a682ce8e36b78630f12a3cfab75307fc58cb4a1f5e416017d3ae20d6 +updated: 2017-11-29T12:05:49.613148632+01:00 imports: - name: cloud.google.com/go version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c @@ -111,10 +111,6 @@ imports: - pkg/srv - pkg/types - version -- name: github.com/coreos/go-semver - version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6 - subpackages: - - semver - name: github.com/coreos/go-oidc version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d subpackages: @@ -123,6 +119,10 @@ imports: - key - oauth2 - oidc +- name: github.com/coreos/go-semver + version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6 + subpackages: + - semver - name: github.com/coreos/go-systemd version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 subpackages: @@ -645,9 +645,10 @@ imports: subpackages: - googleapis/rpc/status - name: google.golang.org/grpc - version: b8669c35455183da6d5c474ea6e72fbf55183274 + version: b3ddf786825de56a4178401b7e174ee332173b66 subpackages: - codes + - connectivity - credentials - grpclb/grpc_lb_v1 - grpclog diff --git a/glide.yaml b/glide.yaml index e2d86a115..0bf2656a3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -139,7 +139,7 @@ import: - package: golang.org/x/sys version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 - package: golang.org/x/net - version: 242b6b35177ec3909636b6cf6a47e8c2c6324b5d + version: c8c74377599bd978aee1cf3b9b63a8634051cec2 subpackages: - http2 - context @@ -179,7 +179,7 @@ import: subpackages: - proto - package: github.com/golang/protobuf - version: 2bba0603135d7d7f5cb73b2125beeda19c09f4ef + version: 4bd1920723d7b7c925de087aa32e2187708897f7 - package: github.com/rancher/go-rancher version: 52e2f489534007ae843065468c5a1920d542afa4 - package: golang.org/x/oauth2 diff --git a/vendor/github.com/containous/traefik-extra-service-fabric/labels.go b/vendor/github.com/containous/traefik-extra-service-fabric/labels.go new file mode 100644 index 000000000..2dea665ae --- /dev/null +++ b/vendor/github.com/containous/traefik-extra-service-fabric/labels.go @@ -0,0 +1,23 @@ +package servicefabric + +import "strings" + +func hasServiceLabel(service ServiceItemExtended, key string) bool { + _, exists := service.Labels[key] + return exists +} + +func getFuncBoolLabel(labelName string) func(service ServiceItemExtended) bool { + return func(service ServiceItemExtended) bool { + return getBoolLabel(service, labelName) + } +} + +func getBoolLabel(service ServiceItemExtended, labelName string) bool { + value, exists := service.Labels[labelName] + return exists && strings.EqualFold(strings.TrimSpace(value), "true") +} + +func getServiceLabelValue(service ServiceItemExtended, key string) string { + return service.Labels[key] +} diff --git a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go index 487fb6ecc..79ed5ef57 100644 --- a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go +++ b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric.go @@ -92,6 +92,7 @@ func (p *Provider) updateConfig(configurationChan chan<- types.ConfigMessage, po "getServiceLabelsWithPrefix": getServiceLabelsWithPrefix, "getServicesWithLabelValueMap": getServicesWithLabelValueMap, "getServicesWithLabelValue": getServicesWithLabelValue, + "isExposed": getFuncBoolLabel("expose"), } configuration, err := p.GetConfiguration(tmpl, sfFuncMap, templateObjects) @@ -235,15 +236,6 @@ func getValidInstances(sfClient sfClient, app sf.ApplicationItem, service sf.Ser return validInstances } -func hasServiceLabel(service ServiceItemExtended, key string) bool { - _, exists := service.Labels[key] - return exists -} - -func getServiceLabelValue(service ServiceItemExtended, key string) string { - return service.Labels[key] -} - func getServicesWithLabelValueMap(services []ServiceItemExtended, key string) map[string][]ServiceItemExtended { result := map[string][]ServiceItemExtended{} for _, service := range services { diff --git a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_tmpl.go b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_tmpl.go index 64269f8c0..d7af11912 100644 --- a/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_tmpl.go +++ b/vendor/github.com/containous/traefik-extra-service-fabric/servicefabric_tmpl.go @@ -91,13 +91,12 @@ const tmpl = ` {{end}} {{end}} {{range $service := .Services}} - {{if hasServiceLabel $service "expose"}} + {{if isExposed $service}} {{if eq $service.ServiceKind "Stateless"}} [frontends."{{$service.Name}}"] backend = "{{$service.Name}}" - {{if hasServiceLabel $service "frontend.passHostHeader"}} passHostHeader = {{getServiceLabelValue $service "frontend.passHostHeader" }} {{end}} diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index f0b459125..797190f14 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -74,9 +74,6 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran dopts.copts.StatsHandler.HandleRPC(ctx, inPayload) } c.trailerMD = stream.Trailer() - if peer, ok := peer.FromContext(stream.Context()); ok { - c.peer = peer - } return nil } @@ -262,6 +259,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } return toRPCErr(err) } + if peer, ok := peer.FromContext(stream.Context()); ok { + c.peer = peer + } err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts) if err != nil { if put != nil { diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index f21a56852..e3e3140f1 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -20,7 +20,6 @@ package grpc import ( "errors" - "fmt" "net" "strings" "sync" @@ -28,6 +27,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -312,8 +312,10 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, + csMgr: &connectivityStateManager{}, conns: make(map[Address]*addrConn), } + cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr} cc.ctx, cc.cancel = context.WithCancel(context.Background()) for _, opt := range opts { @@ -443,37 +445,95 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return cc, nil } -// ConnectivityState indicates the state of a client connection. -type ConnectivityState int +// connectivityStateEvaluator gets updated by addrConns when their +// states transition, based on which it evaluates the state of +// ClientConn. +// Note: This code will eventually sit in the balancer in the new design. +type connectivityStateEvaluator struct { + csMgr *connectivityStateManager + mu sync.Mutex + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. + numTransientFailure uint64 // Number of addrConns in transientFailure. +} -const ( - // Idle indicates the ClientConn is idle. - Idle ConnectivityState = iota - // Connecting indicates the ClienConn is connecting. - Connecting - // Ready indicates the ClientConn is ready for work. - Ready - // TransientFailure indicates the ClientConn has seen a failure but expects to recover. - TransientFailure - // Shutdown indicates the ClientConn has started shutting down. - Shutdown -) +// recordTransition records state change happening in every addrConn and based on +// that it evaluates what state the ClientConn is in. +// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, +// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection +// before any addrConn is created ClientConn is in idle state. In the end when ClientConn +// closes it is in connectivity.Shutdown state. +// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. +func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { + cse.mu.Lock() + defer cse.mu.Unlock() -func (s ConnectivityState) String() string { - switch s { - case Idle: - return "IDLE" - case Connecting: - return "CONNECTING" - case Ready: - return "READY" - case TransientFailure: - return "TRANSIENT_FAILURE" - case Shutdown: - return "SHUTDOWN" - default: - panic(fmt.Sprintf("unknown connectivity state: %d", s)) + // Update counters. + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. + switch state { + case connectivity.Ready: + cse.numReady += updateVal + case connectivity.Connecting: + cse.numConnecting += updateVal + case connectivity.TransientFailure: + cse.numTransientFailure += updateVal + } } + + // Evaluate. + if cse.numReady > 0 { + cse.csMgr.updateState(connectivity.Ready) + return + } + if cse.numConnecting > 0 { + cse.csMgr.updateState(connectivity.Connecting) + return + } + cse.csMgr.updateState(connectivity.TransientFailure) +} + +// connectivityStateManager keeps the connectivity.State of ClientConn. +// This struct will eventually be exported so the balancers can access it. +type connectivityStateManager struct { + mu sync.Mutex + state connectivity.State + notifyChan chan struct{} +} + +// updateState updates the connectivity.State of ClientConn. +// If there's a change it notifies goroutines waiting on state change to +// happen. +func (csm *connectivityStateManager) updateState(state connectivity.State) { + csm.mu.Lock() + defer csm.mu.Unlock() + if csm.state == connectivity.Shutdown { + return + } + if csm.state == state { + return + } + csm.state = state + if csm.notifyChan != nil { + // There are other goroutines waiting on this channel. + close(csm.notifyChan) + csm.notifyChan = nil + } +} + +func (csm *connectivityStateManager) getState() connectivity.State { + csm.mu.Lock() + defer csm.mu.Unlock() + return csm.state +} + +func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { + csm.mu.Lock() + defer csm.mu.Unlock() + if csm.notifyChan == nil { + csm.notifyChan = make(chan struct{}) + } + return csm.notifyChan } // ClientConn represents a client connection to an RPC server. @@ -484,6 +544,8 @@ type ClientConn struct { target string authority string dopts dialOptions + csMgr *connectivityStateManager + csEvltr *connectivityStateEvaluator // This will eventually be part of balancer. mu sync.RWMutex sc ServiceConfig @@ -492,6 +554,28 @@ type ClientConn struct { mkp keepalive.ClientParameters } +// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or +// ctx expires. A true value is returned in former case and false in latter. +// This is an EXPERIMENTAL API. +func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { + ch := cc.csMgr.getNotifyChan() + if cc.csMgr.getState() != sourceState { + return true + } + select { + case <-ctx.Done(): + return false + case <-ch: + return true + } +} + +// GetState returns the connectivity.State of ClientConn. +// This is an EXPERIMENTAL API. +func (cc *ClientConn) GetState() connectivity.State { + return cc.csMgr.getState() +} + // lbWatcher watches the Notify channel of the balancer in cc and manages // connections accordingly. If doneChan is not nil, it is closed after the // first successfull connection is made. @@ -522,14 +606,18 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { } cc.mu.Unlock() for _, a := range add { + var err error if doneChan != nil { - err := cc.resetAddrConn(a, true, nil) + err = cc.resetAddrConn(a, true, nil) if err == nil { close(doneChan) doneChan = nil } } else { - cc.resetAddrConn(a, false, nil) + err = cc.resetAddrConn(a, false, nil) + } + if err != nil { + grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) } } for _, c := range del { @@ -570,7 +658,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) dopts: cc.dopts, } ac.ctx, ac.cancel = context.WithCancel(cc.ctx) - ac.stateCV = sync.NewCond(&ac.mu) + ac.csEvltr = cc.csEvltr if EnableTracing { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) } @@ -727,6 +815,7 @@ func (cc *ClientConn) Close() error { } conns := cc.conns cc.conns = nil + cc.csMgr.updateState(connectivity.Shutdown) cc.mu.Unlock() if cc.dopts.balancer != nil { cc.dopts.balancer.Close() @@ -747,10 +836,11 @@ type addrConn struct { dopts dialOptions events trace.EventLog - mu sync.Mutex - state ConnectivityState - stateCV *sync.Cond - down func(error) // the handler called when a connection is down. + csEvltr *connectivityStateEvaluator + + mu sync.Mutex + state connectivity.State + down func(error) // the handler called when a connection is down. // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -790,49 +880,13 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { } } -// getState returns the connectivity state of the Conn -func (ac *addrConn) getState() ConnectivityState { - ac.mu.Lock() - defer ac.mu.Unlock() - return ac.state -} - -// waitForStateChange blocks until the state changes to something other than the sourceState. -func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { - ac.mu.Lock() - defer ac.mu.Unlock() - if sourceState != ac.state { - return ac.state, nil - } - done := make(chan struct{}) - var err error - go func() { - select { - case <-ctx.Done(): - ac.mu.Lock() - err = ctx.Err() - ac.stateCV.Broadcast() - ac.mu.Unlock() - case <-done: - } - }() - defer close(done) - for sourceState == ac.state { - ac.stateCV.Wait() - if err != nil { - return ac.state, err - } - } - return ac.state, nil -} - // resetTransport recreates a transport to the address for ac. // For the old transport: // - if drain is true, it will be gracefully closed. // - otherwise, it will be closed. func (ac *addrConn) resetTransport(drain bool) error { ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { ac.mu.Unlock() return errConnClosing } @@ -841,24 +895,21 @@ func (ac *addrConn) resetTransport(drain bool) error { ac.down(downErrorf(false, true, "%v", errNetworkIO)) ac.down = nil } - ac.state = Connecting - ac.stateCV.Broadcast() + oldState := ac.state + ac.state = connectivity.Connecting + ac.csEvltr.recordTransition(oldState, ac.state) t := ac.transport ac.transport = nil ac.mu.Unlock() - if t != nil { - if drain { - t.GracefulClose() - } else { - t.Close() - } + if t != nil && !drain { + t.Close() } ac.cc.mu.RLock() ac.dopts.copts.KeepaliveParams = ac.cc.mkp ac.cc.mu.RUnlock() for retries := 0; ; retries++ { ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing @@ -886,14 +937,15 @@ func (ac *addrConn) resetTransport(drain bool) error { } grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing } ac.errorf("transient failure: %v", err) - ac.state = TransientFailure - ac.stateCV.Broadcast() + oldState = ac.state + ac.state = connectivity.TransientFailure + ac.csEvltr.recordTransition(oldState, ac.state) if ac.ready != nil { close(ac.ready) ac.ready = nil @@ -911,14 +963,15 @@ func (ac *addrConn) resetTransport(drain bool) error { } ac.mu.Lock() ac.printf("ready") - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() newTransport.Close() return errConnClosing } - ac.state = Ready - ac.stateCV.Broadcast() + oldState = ac.state + ac.state = connectivity.Ready + ac.csEvltr.recordTransition(oldState, ac.state) ac.transport = newTransport if ac.ready != nil { close(ac.ready) @@ -988,13 +1041,14 @@ func (ac *addrConn) transportMonitor() { default: } ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac has been shutdown. ac.mu.Unlock() return } - ac.state = TransientFailure - ac.stateCV.Broadcast() + oldState := ac.state + ac.state = connectivity.TransientFailure + ac.csEvltr.recordTransition(oldState, ac.state) ac.mu.Unlock() if err := ac.resetTransport(false); err != nil { grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) @@ -1013,12 +1067,12 @@ func (ac *addrConn) transportMonitor() { } // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or -// iv) transport is in TransientFailure and there is a balancer/failfast is true. +// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { - case ac.state == Shutdown: + case ac.state == connectivity.Shutdown: if failfast || !hasBalancer { // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. err := ac.tearDownErr @@ -1027,11 +1081,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans } ac.mu.Unlock() return nil, errConnClosing - case ac.state == Ready: + case ac.state == connectivity.Ready: ct := ac.transport ac.mu.Unlock() return ct, nil - case ac.state == TransientFailure: + case ac.state == connectivity.TransientFailure: if failfast || hasBalancer { ac.mu.Unlock() return nil, errConnUnavailable @@ -1073,12 +1127,13 @@ func (ac *addrConn) tearDown(err error) { // address removal and GoAway. ac.transport.GracefulClose() } - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { return } - ac.state = Shutdown + oldState := ac.state + ac.state = connectivity.Shutdown ac.tearDownErr = err - ac.stateCV.Broadcast() + ac.csEvltr.recordTransition(oldState, ac.state) if ac.events != nil { ac.events.Finish() ac.events = nil diff --git a/vendor/google.golang.org/grpc/connectivity/connectivity.go b/vendor/google.golang.org/grpc/connectivity/connectivity.go new file mode 100644 index 000000000..568ef5dc6 --- /dev/null +++ b/vendor/google.golang.org/grpc/connectivity/connectivity.go @@ -0,0 +1,72 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package connectivity defines connectivity semantics. +// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md. +// All APIs in this package are experimental. +package connectivity + +import ( + "golang.org/x/net/context" + "google.golang.org/grpc/grpclog" +) + +// State indicates the state of connectivity. +// It can be the state of a ClientConn or SubConn. +type State int + +func (s State) String() string { + switch s { + case Idle: + return "IDLE" + case Connecting: + return "CONNECTING" + case Ready: + return "READY" + case TransientFailure: + return "TRANSIENT_FAILURE" + case Shutdown: + return "SHUTDOWN" + default: + grpclog.Errorf("unknown connectivity state: %d", s) + return "Invalid-State" + } +} + +const ( + // Idle indicates the ClientConn is idle. + Idle State = iota + // Connecting indicates the ClienConn is connecting. + Connecting + // Ready indicates the ClientConn is ready for work. + Ready + // TransientFailure indicates the ClientConn has seen a failure but expects to recover. + TransientFailure + // Shutdown indicates the ClientConn has started shutting down. + Shutdown +) + +// Reporter reports the connectivity states. +type Reporter interface { + // CurrentState returns the current state of the reporter. + CurrentState() State + // WaitForStateChange blocks until the reporter's state is different from the given state, + // and returns true. + // It returns false if <-ctx.Done() can proceed (ctx got timeout or got canceled). + WaitForStateChange(context.Context, State) bool +} diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 0dbf62fe3..ace206b8f 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -519,6 +519,6 @@ const SupportPackageIsVersion3 = true const SupportPackageIsVersion4 = true // Version is the current grpc version. -const Version = "1.5.1" +const Version = "1.5.2" const grpcUA = "grpc-go/" + Version diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go index de9420a39..501eb03c4 100644 --- a/vendor/google.golang.org/grpc/transport/control.go +++ b/vendor/google.golang.org/grpc/transport/control.go @@ -74,6 +74,8 @@ func (*resetStream) item() {} type goAway struct { code http2.ErrCode debugData []byte + headsUp bool + closeConn bool } func (*goAway) item() {} diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go index a62f4e31a..d4fc6815e 100644 --- a/vendor/google.golang.org/grpc/transport/http2_client.go +++ b/vendor/google.golang.org/grpc/transport/http2_client.go @@ -107,8 +107,6 @@ type http2Client struct { maxStreams int // the per-stream outbound flow control window size set by the peer. streamSendQuota uint32 - // goAwayID records the Last-Stream-ID in the GoAway frame from the server. - goAwayID uint32 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. prevGoAwayID uint32 // goAwayReason records the http2.ErrCode and debug data received with the @@ -662,24 +660,6 @@ func (t *http2Client) GracefulClose() error { t.mu.Unlock() return nil } - // Notify the streams which were initiated after the server sent GOAWAY. - select { - case <-t.goAway: - n := t.prevGoAwayID - if n == 0 && t.nextID > 1 { - n = t.nextID - 2 - } - m := t.goAwayID + 2 - if m == 2 { - m = 1 - } - for i := m; i <= n; i += 2 { - if s, ok := t.activeStreams[i]; ok { - close(s.goAway) - } - } - default: - } if t.state == draining { t.mu.Unlock() return nil @@ -987,36 +967,56 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { + t.mu.Lock() + if t.state != reachable && t.state != draining { + t.mu.Unlock() + return + } if f.ErrCode == http2.ErrCodeEnhanceYourCalm { infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") } - t.mu.Lock() - if t.state == reachable || t.state == draining { - if f.LastStreamID > 0 && f.LastStreamID%2 != 1 { - t.mu.Unlock() - t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) - return - } - select { - case <-t.goAway: - id := t.goAwayID - // t.goAway has been closed (i.e.,multiple GoAways). - if id < f.LastStreamID { - t.mu.Unlock() - t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) - return - } - t.prevGoAwayID = id - t.goAwayID = f.LastStreamID - t.mu.Unlock() - return - default: - t.setGoAwayReason(f) - } - t.goAwayID = f.LastStreamID - close(t.goAway) + id := f.LastStreamID + if id > 0 && id%2 != 1 { + t.mu.Unlock() + t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) + return } + // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387). + // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay + // with the ID of the last stream the server will process. + // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we + // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server + // was being sent don't get killed. + select { + case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). + // If there are multiple GoAways the first one should always have an ID greater than the following ones. + if id > t.prevGoAwayID { + t.mu.Unlock() + t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) + return + } + default: + t.setGoAwayReason(f) + close(t.goAway) + t.state = draining + } + // All streams with IDs greater than the GoAwayId + // and smaller than the previous GoAway ID should be killed. + upperLimit := t.prevGoAwayID + if upperLimit == 0 { // This is the first GoAway Frame. + upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. + } + for streamID, stream := range t.activeStreams { + if streamID > id && streamID <= upperLimit { + close(stream.goAway) + } + } + t.prevGoAwayID = id + active := len(t.activeStreams) t.mu.Unlock() + if active == 0 { + t.Close() + } } // setGoAwayReason sets the value of t.goAwayReason based diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index 5bb283ffc..92ab4d9c3 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -68,7 +68,6 @@ type http2Server struct { framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding hEnc *hpack.Encoder // HPACK encoder - // The max number of concurrent streams. maxStreams uint32 // controlBuf delivers all the control related tasks (e.g., window @@ -77,9 +76,7 @@ type http2Server struct { fc *inFlow // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool - - stats stats.Handler - + stats stats.Handler // Flag to keep track of reading activity on transport. // 1 is true and 0 is false. activity uint32 // Accessed atomically. @@ -95,13 +92,19 @@ type http2Server struct { // Flag to signify that number of ping strikes should be reset to 0. // This is set whenever data or header frames are sent. // 1 means yes. - resetPingStrikes uint32 // Accessed atomically. - + resetPingStrikes uint32 // Accessed atomically. initialWindowSize int32 + bdpEst *bdpEstimator - bdpEst *bdpEstimator + mu sync.Mutex // guard the following - mu sync.Mutex // guard the following + // drainChan is initialized when drain(...) is called the first time. + // After which the server writes out the first GoAway(with ID 2^31-1) frame. + // Then an independent goroutine will be launched to later send the second GoAway. + // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. + // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is + // already underway. + drainChan chan struct{} state transportState activeStreams map[uint32]*Stream // the per-stream outbound flow control window size set by the peer. @@ -592,6 +595,10 @@ const ( func (t *http2Server) handlePing(f *http2.PingFrame) { if f.IsAck() { + if f.Data == goAwayPing.data && t.drainChan != nil { + close(t.drainChan) + return + } // Maybe it's a BDP ping. if t.bdpEst != nil { t.bdpEst.calculate(f.Data) @@ -631,7 +638,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) { if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. - t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")}) + t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) } } @@ -956,23 +963,18 @@ func (t *http2Server) keepalive() { continue } val := t.kp.MaxConnectionIdle - time.Since(idle) + t.mu.Unlock() if val <= 0 { // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. - t.state = draining - t.mu.Unlock() - t.Drain() + t.drain(http2.ErrCodeNo, []byte{}) // Reseting the timer so that the clean-up doesn't deadlock. maxIdle.Reset(infinity) return } - t.mu.Unlock() maxIdle.Reset(val) case <-maxAge.C: - t.mu.Lock() - t.state = draining - t.mu.Unlock() - t.Drain() + t.drain(http2.ErrCodeNo, []byte{}) maxAge.Reset(t.kp.MaxConnectionAgeGrace) select { case <-maxAge.C: @@ -1004,6 +1006,8 @@ func (t *http2Server) keepalive() { } } +var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} + // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Server) controller() { @@ -1033,12 +1037,38 @@ func (t *http2Server) controller() { return } sid := t.maxStreamID - t.state = draining - t.mu.Unlock() - t.framer.writeGoAway(true, sid, i.code, i.debugData) - if i.code == http2.ErrCodeEnhanceYourCalm { - t.Close() + if !i.headsUp { + // Stop accepting more streams now. + t.state = draining + t.mu.Unlock() + t.framer.writeGoAway(true, sid, i.code, i.debugData) + if i.closeConn { + // Abruptly close the connection following the GoAway. + t.Close() + } + t.writableChan <- 0 + continue } + t.mu.Unlock() + // For a graceful close, send out a GoAway with stream ID of MaxUInt32, + // Follow that with a ping and wait for the ack to come back or a timer + // to expire. During this time accept new streams since they might have + // originated before the GoAway reaches the client. + // After getting the ack or timer expiration send out another GoAway this + // time with an ID of the max stream server intends to process. + t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{}) + t.framer.writePing(true, false, goAwayPing.data) + go func() { + timer := time.NewTimer(time.Minute) + defer timer.Stop() + select { + case <-t.drainChan: + case <-timer.C: + case <-t.shutdownChan: + return + } + t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData}) + }() case *flushIO: t.framer.flushWrite() case *ping: @@ -1116,7 +1146,17 @@ func (t *http2Server) RemoteAddr() net.Addr { } func (t *http2Server) Drain() { - t.controlBuf.put(&goAway{code: http2.ErrCodeNo}) + t.drain(http2.ErrCodeNo, []byte{}) +} + +func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { + t.mu.Lock() + defer t.mu.Unlock() + if t.drainChan != nil { + return + } + t.drainChan = make(chan struct{}) + t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) } var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))