Sync vendor and glide.

This commit is contained in:
Ludovic Fernandez 2017-11-29 13:26:03 +01:00 committed by Traefiker
parent 9fe6a0a894
commit 7081f3df58
12 changed files with 374 additions and 190 deletions

15
glide.lock generated
View file

@ -1,5 +1,5 @@
hash: 03f8c75c2523eaea36cd61abc4d335ce68115c80404de7a10d8d4f0225957fc9 hash: 4595cac0a682ce8e36b78630f12a3cfab75307fc58cb4a1f5e416017d3ae20d6
updated: 2017-11-28T15:59:09.002650953+01:00 updated: 2017-11-29T12:05:49.613148632+01:00
imports: imports:
- name: cloud.google.com/go - name: cloud.google.com/go
version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c
@ -111,10 +111,6 @@ imports:
- pkg/srv - pkg/srv
- pkg/types - pkg/types
- version - version
- name: github.com/coreos/go-semver
version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6
subpackages:
- semver
- name: github.com/coreos/go-oidc - name: github.com/coreos/go-oidc
version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d version: 5644a2f50e2d2d5ba0b474bc5bc55fea1925936d
subpackages: subpackages:
@ -123,6 +119,10 @@ imports:
- key - key
- oauth2 - oauth2
- oidc - oidc
- name: github.com/coreos/go-semver
version: 8ab6407b697782a06568d4b7f1db25550ec2e4c6
subpackages:
- semver
- name: github.com/coreos/go-systemd - name: github.com/coreos/go-systemd
version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6
subpackages: subpackages:
@ -645,9 +645,10 @@ imports:
subpackages: subpackages:
- googleapis/rpc/status - googleapis/rpc/status
- name: google.golang.org/grpc - name: google.golang.org/grpc
version: b8669c35455183da6d5c474ea6e72fbf55183274 version: b3ddf786825de56a4178401b7e174ee332173b66
subpackages: subpackages:
- codes - codes
- connectivity
- credentials - credentials
- grpclb/grpc_lb_v1 - grpclb/grpc_lb_v1
- grpclog - grpclog

View file

@ -139,7 +139,7 @@ import:
- package: golang.org/x/sys - package: golang.org/x/sys
version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9 version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9
- package: golang.org/x/net - package: golang.org/x/net
version: 242b6b35177ec3909636b6cf6a47e8c2c6324b5d version: c8c74377599bd978aee1cf3b9b63a8634051cec2
subpackages: subpackages:
- http2 - http2
- context - context
@ -179,7 +179,7 @@ import:
subpackages: subpackages:
- proto - proto
- package: github.com/golang/protobuf - package: github.com/golang/protobuf
version: 2bba0603135d7d7f5cb73b2125beeda19c09f4ef version: 4bd1920723d7b7c925de087aa32e2187708897f7
- package: github.com/rancher/go-rancher - package: github.com/rancher/go-rancher
version: 52e2f489534007ae843065468c5a1920d542afa4 version: 52e2f489534007ae843065468c5a1920d542afa4
- package: golang.org/x/oauth2 - package: golang.org/x/oauth2

View file

@ -0,0 +1,23 @@
package servicefabric
import "strings"
func hasServiceLabel(service ServiceItemExtended, key string) bool {
_, exists := service.Labels[key]
return exists
}
func getFuncBoolLabel(labelName string) func(service ServiceItemExtended) bool {
return func(service ServiceItemExtended) bool {
return getBoolLabel(service, labelName)
}
}
func getBoolLabel(service ServiceItemExtended, labelName string) bool {
value, exists := service.Labels[labelName]
return exists && strings.EqualFold(strings.TrimSpace(value), "true")
}
func getServiceLabelValue(service ServiceItemExtended, key string) string {
return service.Labels[key]
}

View file

@ -92,6 +92,7 @@ func (p *Provider) updateConfig(configurationChan chan<- types.ConfigMessage, po
"getServiceLabelsWithPrefix": getServiceLabelsWithPrefix, "getServiceLabelsWithPrefix": getServiceLabelsWithPrefix,
"getServicesWithLabelValueMap": getServicesWithLabelValueMap, "getServicesWithLabelValueMap": getServicesWithLabelValueMap,
"getServicesWithLabelValue": getServicesWithLabelValue, "getServicesWithLabelValue": getServicesWithLabelValue,
"isExposed": getFuncBoolLabel("expose"),
} }
configuration, err := p.GetConfiguration(tmpl, sfFuncMap, templateObjects) configuration, err := p.GetConfiguration(tmpl, sfFuncMap, templateObjects)
@ -235,15 +236,6 @@ func getValidInstances(sfClient sfClient, app sf.ApplicationItem, service sf.Ser
return validInstances return validInstances
} }
func hasServiceLabel(service ServiceItemExtended, key string) bool {
_, exists := service.Labels[key]
return exists
}
func getServiceLabelValue(service ServiceItemExtended, key string) string {
return service.Labels[key]
}
func getServicesWithLabelValueMap(services []ServiceItemExtended, key string) map[string][]ServiceItemExtended { func getServicesWithLabelValueMap(services []ServiceItemExtended, key string) map[string][]ServiceItemExtended {
result := map[string][]ServiceItemExtended{} result := map[string][]ServiceItemExtended{}
for _, service := range services { for _, service := range services {

View file

@ -91,13 +91,12 @@ const tmpl = `
{{end}} {{end}}
{{end}} {{end}}
{{range $service := .Services}} {{range $service := .Services}}
{{if hasServiceLabel $service "expose"}} {{if isExposed $service}}
{{if eq $service.ServiceKind "Stateless"}} {{if eq $service.ServiceKind "Stateless"}}
[frontends."{{$service.Name}}"] [frontends."{{$service.Name}}"]
backend = "{{$service.Name}}" backend = "{{$service.Name}}"
{{if hasServiceLabel $service "frontend.passHostHeader"}} {{if hasServiceLabel $service "frontend.passHostHeader"}}
passHostHeader = {{getServiceLabelValue $service "frontend.passHostHeader" }} passHostHeader = {{getServiceLabelValue $service "frontend.passHostHeader" }}
{{end}} {{end}}

View file

@ -74,9 +74,6 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload) dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
} }
c.trailerMD = stream.Trailer() c.trailerMD = stream.Trailer()
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
return nil return nil
} }
@ -262,6 +259,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
} }
return toRPCErr(err) return toRPCErr(err)
} }
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts) err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
if err != nil { if err != nil {
if put != nil { if put != nil {

View file

@ -20,7 +20,6 @@ package grpc
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"strings" "strings"
"sync" "sync"
@ -28,6 +27,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/trace" "golang.org/x/net/trace"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
@ -312,8 +312,10 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{ cc := &ClientConn{
target: target, target: target,
csMgr: &connectivityStateManager{},
conns: make(map[Address]*addrConn), conns: make(map[Address]*addrConn),
} }
cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts { for _, opt := range opts {
@ -443,37 +445,95 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return cc, nil return cc, nil
} }
// ConnectivityState indicates the state of a client connection. // connectivityStateEvaluator gets updated by addrConns when their
type ConnectivityState int // states transition, based on which it evaluates the state of
// ClientConn.
// Note: This code will eventually sit in the balancer in the new design.
type connectivityStateEvaluator struct {
csMgr *connectivityStateManager
mu sync.Mutex
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
const ( // recordTransition records state change happening in every addrConn and based on
// Idle indicates the ClientConn is idle. // that it evaluates what state the ClientConn is in.
Idle ConnectivityState = iota // It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
// Connecting indicates the ClienConn is connecting. // Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
Connecting // before any addrConn is created ClientConn is in idle state. In the end when ClientConn
// Ready indicates the ClientConn is ready for work. // closes it is in connectivity.Shutdown state.
Ready // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
// TransientFailure indicates the ClientConn has seen a failure but expects to recover. func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
TransientFailure cse.mu.Lock()
// Shutdown indicates the ClientConn has started shutting down. defer cse.mu.Unlock()
Shutdown
)
func (s ConnectivityState) String() string { // Update counters.
switch s { for idx, state := range []connectivity.State{oldState, newState} {
case Idle: updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
return "IDLE" switch state {
case Connecting: case connectivity.Ready:
return "CONNECTING" cse.numReady += updateVal
case Ready: case connectivity.Connecting:
return "READY" cse.numConnecting += updateVal
case TransientFailure: case connectivity.TransientFailure:
return "TRANSIENT_FAILURE" cse.numTransientFailure += updateVal
case Shutdown: }
return "SHUTDOWN"
default:
panic(fmt.Sprintf("unknown connectivity state: %d", s))
} }
// Evaluate.
if cse.numReady > 0 {
cse.csMgr.updateState(connectivity.Ready)
return
}
if cse.numConnecting > 0 {
cse.csMgr.updateState(connectivity.Connecting)
return
}
cse.csMgr.updateState(connectivity.TransientFailure)
}
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
mu sync.Mutex
state connectivity.State
notifyChan chan struct{}
}
// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
csm.mu.Lock()
defer csm.mu.Unlock()
if csm.state == connectivity.Shutdown {
return
}
if csm.state == state {
return
}
csm.state = state
if csm.notifyChan != nil {
// There are other goroutines waiting on this channel.
close(csm.notifyChan)
csm.notifyChan = nil
}
}
func (csm *connectivityStateManager) getState() connectivity.State {
csm.mu.Lock()
defer csm.mu.Unlock()
return csm.state
}
func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
csm.mu.Lock()
defer csm.mu.Unlock()
if csm.notifyChan == nil {
csm.notifyChan = make(chan struct{})
}
return csm.notifyChan
} }
// ClientConn represents a client connection to an RPC server. // ClientConn represents a client connection to an RPC server.
@ -484,6 +544,8 @@ type ClientConn struct {
target string target string
authority string authority string
dopts dialOptions dopts dialOptions
csMgr *connectivityStateManager
csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
mu sync.RWMutex mu sync.RWMutex
sc ServiceConfig sc ServiceConfig
@ -492,6 +554,28 @@ type ClientConn struct {
mkp keepalive.ClientParameters mkp keepalive.ClientParameters
} }
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
ch := cc.csMgr.getNotifyChan()
if cc.csMgr.getState() != sourceState {
return true
}
select {
case <-ctx.Done():
return false
case <-ch:
return true
}
}
// GetState returns the connectivity.State of ClientConn.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
// lbWatcher watches the Notify channel of the balancer in cc and manages // lbWatcher watches the Notify channel of the balancer in cc and manages
// connections accordingly. If doneChan is not nil, it is closed after the // connections accordingly. If doneChan is not nil, it is closed after the
// first successfull connection is made. // first successfull connection is made.
@ -522,14 +606,18 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
} }
cc.mu.Unlock() cc.mu.Unlock()
for _, a := range add { for _, a := range add {
var err error
if doneChan != nil { if doneChan != nil {
err := cc.resetAddrConn(a, true, nil) err = cc.resetAddrConn(a, true, nil)
if err == nil { if err == nil {
close(doneChan) close(doneChan)
doneChan = nil doneChan = nil
} }
} else { } else {
cc.resetAddrConn(a, false, nil) err = cc.resetAddrConn(a, false, nil)
}
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
} }
} }
for _, c := range del { for _, c := range del {
@ -570,7 +658,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
dopts: cc.dopts, dopts: cc.dopts,
} }
ac.ctx, ac.cancel = context.WithCancel(cc.ctx) ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu) ac.csEvltr = cc.csEvltr
if EnableTracing { if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
} }
@ -727,6 +815,7 @@ func (cc *ClientConn) Close() error {
} }
conns := cc.conns conns := cc.conns
cc.conns = nil cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock() cc.mu.Unlock()
if cc.dopts.balancer != nil { if cc.dopts.balancer != nil {
cc.dopts.balancer.Close() cc.dopts.balancer.Close()
@ -747,10 +836,11 @@ type addrConn struct {
dopts dialOptions dopts dialOptions
events trace.EventLog events trace.EventLog
mu sync.Mutex csEvltr *connectivityStateEvaluator
state ConnectivityState
stateCV *sync.Cond mu sync.Mutex
down func(error) // the handler called when a connection is down. state connectivity.State
down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed // ready is closed and becomes nil when a new transport is up or failed
// due to timeout. // due to timeout.
ready chan struct{} ready chan struct{}
@ -790,49 +880,13 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
} }
} }
// getState returns the connectivity state of the Conn
func (ac *addrConn) getState() ConnectivityState {
ac.mu.Lock()
defer ac.mu.Unlock()
return ac.state
}
// waitForStateChange blocks until the state changes to something other than the sourceState.
func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
ac.mu.Lock()
defer ac.mu.Unlock()
if sourceState != ac.state {
return ac.state, nil
}
done := make(chan struct{})
var err error
go func() {
select {
case <-ctx.Done():
ac.mu.Lock()
err = ctx.Err()
ac.stateCV.Broadcast()
ac.mu.Unlock()
case <-done:
}
}()
defer close(done)
for sourceState == ac.state {
ac.stateCV.Wait()
if err != nil {
return ac.state, err
}
}
return ac.state, nil
}
// resetTransport recreates a transport to the address for ac. // resetTransport recreates a transport to the address for ac.
// For the old transport: // For the old transport:
// - if drain is true, it will be gracefully closed. // - if drain is true, it will be gracefully closed.
// - otherwise, it will be closed. // - otherwise, it will be closed.
func (ac *addrConn) resetTransport(drain bool) error { func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock() ac.mu.Lock()
if ac.state == Shutdown { if ac.state == connectivity.Shutdown {
ac.mu.Unlock() ac.mu.Unlock()
return errConnClosing return errConnClosing
} }
@ -841,24 +895,21 @@ func (ac *addrConn) resetTransport(drain bool) error {
ac.down(downErrorf(false, true, "%v", errNetworkIO)) ac.down(downErrorf(false, true, "%v", errNetworkIO))
ac.down = nil ac.down = nil
} }
ac.state = Connecting oldState := ac.state
ac.stateCV.Broadcast() ac.state = connectivity.Connecting
ac.csEvltr.recordTransition(oldState, ac.state)
t := ac.transport t := ac.transport
ac.transport = nil ac.transport = nil
ac.mu.Unlock() ac.mu.Unlock()
if t != nil { if t != nil && !drain {
if drain { t.Close()
t.GracefulClose()
} else {
t.Close()
}
} }
ac.cc.mu.RLock() ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock() ac.cc.mu.RUnlock()
for retries := 0; ; retries++ { for retries := 0; ; retries++ {
ac.mu.Lock() ac.mu.Lock()
if ac.state == Shutdown { if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked. // ac.tearDown(...) has been invoked.
ac.mu.Unlock() ac.mu.Unlock()
return errConnClosing return errConnClosing
@ -886,14 +937,15 @@ func (ac *addrConn) resetTransport(drain bool) error {
} }
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock() ac.mu.Lock()
if ac.state == Shutdown { if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked. // ac.tearDown(...) has been invoked.
ac.mu.Unlock() ac.mu.Unlock()
return errConnClosing return errConnClosing
} }
ac.errorf("transient failure: %v", err) ac.errorf("transient failure: %v", err)
ac.state = TransientFailure oldState = ac.state
ac.stateCV.Broadcast() ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
if ac.ready != nil { if ac.ready != nil {
close(ac.ready) close(ac.ready)
ac.ready = nil ac.ready = nil
@ -911,14 +963,15 @@ func (ac *addrConn) resetTransport(drain bool) error {
} }
ac.mu.Lock() ac.mu.Lock()
ac.printf("ready") ac.printf("ready")
if ac.state == Shutdown { if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked. // ac.tearDown(...) has been invoked.
ac.mu.Unlock() ac.mu.Unlock()
newTransport.Close() newTransport.Close()
return errConnClosing return errConnClosing
} }
ac.state = Ready oldState = ac.state
ac.stateCV.Broadcast() ac.state = connectivity.Ready
ac.csEvltr.recordTransition(oldState, ac.state)
ac.transport = newTransport ac.transport = newTransport
if ac.ready != nil { if ac.ready != nil {
close(ac.ready) close(ac.ready)
@ -988,13 +1041,14 @@ func (ac *addrConn) transportMonitor() {
default: default:
} }
ac.mu.Lock() ac.mu.Lock()
if ac.state == Shutdown { if ac.state == connectivity.Shutdown {
// ac has been shutdown. // ac has been shutdown.
ac.mu.Unlock() ac.mu.Unlock()
return return
} }
ac.state = TransientFailure oldState := ac.state
ac.stateCV.Broadcast() ac.state = connectivity.TransientFailure
ac.csEvltr.recordTransition(oldState, ac.state)
ac.mu.Unlock() ac.mu.Unlock()
if err := ac.resetTransport(false); err != nil { if err := ac.resetTransport(false); err != nil {
grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
@ -1013,12 +1067,12 @@ func (ac *addrConn) transportMonitor() {
} }
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
// iv) transport is in TransientFailure and there is a balancer/failfast is true. // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for { for {
ac.mu.Lock() ac.mu.Lock()
switch { switch {
case ac.state == Shutdown: case ac.state == connectivity.Shutdown:
if failfast || !hasBalancer { if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr err := ac.tearDownErr
@ -1027,11 +1081,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
} }
ac.mu.Unlock() ac.mu.Unlock()
return nil, errConnClosing return nil, errConnClosing
case ac.state == Ready: case ac.state == connectivity.Ready:
ct := ac.transport ct := ac.transport
ac.mu.Unlock() ac.mu.Unlock()
return ct, nil return ct, nil
case ac.state == TransientFailure: case ac.state == connectivity.TransientFailure:
if failfast || hasBalancer { if failfast || hasBalancer {
ac.mu.Unlock() ac.mu.Unlock()
return nil, errConnUnavailable return nil, errConnUnavailable
@ -1073,12 +1127,13 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway. // address removal and GoAway.
ac.transport.GracefulClose() ac.transport.GracefulClose()
} }
if ac.state == Shutdown { if ac.state == connectivity.Shutdown {
return return
} }
ac.state = Shutdown oldState := ac.state
ac.state = connectivity.Shutdown
ac.tearDownErr = err ac.tearDownErr = err
ac.stateCV.Broadcast() ac.csEvltr.recordTransition(oldState, ac.state)
if ac.events != nil { if ac.events != nil {
ac.events.Finish() ac.events.Finish()
ac.events = nil ac.events = nil

View file

@ -0,0 +1,72 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package connectivity defines connectivity semantics.
// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
// All APIs in this package are experimental.
package connectivity
import (
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
)
// State indicates the state of connectivity.
// It can be the state of a ClientConn or SubConn.
type State int
func (s State) String() string {
switch s {
case Idle:
return "IDLE"
case Connecting:
return "CONNECTING"
case Ready:
return "READY"
case TransientFailure:
return "TRANSIENT_FAILURE"
case Shutdown:
return "SHUTDOWN"
default:
grpclog.Errorf("unknown connectivity state: %d", s)
return "Invalid-State"
}
}
const (
// Idle indicates the ClientConn is idle.
Idle State = iota
// Connecting indicates the ClienConn is connecting.
Connecting
// Ready indicates the ClientConn is ready for work.
Ready
// TransientFailure indicates the ClientConn has seen a failure but expects to recover.
TransientFailure
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
// Reporter reports the connectivity states.
type Reporter interface {
// CurrentState returns the current state of the reporter.
CurrentState() State
// WaitForStateChange blocks until the reporter's state is different from the given state,
// and returns true.
// It returns false if <-ctx.Done() can proceed (ctx got timeout or got canceled).
WaitForStateChange(context.Context, State) bool
}

View file

@ -519,6 +519,6 @@ const SupportPackageIsVersion3 = true
const SupportPackageIsVersion4 = true const SupportPackageIsVersion4 = true
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.5.1" const Version = "1.5.2"
const grpcUA = "grpc-go/" + Version const grpcUA = "grpc-go/" + Version

View file

@ -74,6 +74,8 @@ func (*resetStream) item() {}
type goAway struct { type goAway struct {
code http2.ErrCode code http2.ErrCode
debugData []byte debugData []byte
headsUp bool
closeConn bool
} }
func (*goAway) item() {} func (*goAway) item() {}

View file

@ -107,8 +107,6 @@ type http2Client struct {
maxStreams int maxStreams int
// the per-stream outbound flow control window size set by the peer. // the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32 streamSendQuota uint32
// goAwayID records the Last-Stream-ID in the GoAway frame from the server.
goAwayID uint32
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32 prevGoAwayID uint32
// goAwayReason records the http2.ErrCode and debug data received with the // goAwayReason records the http2.ErrCode and debug data received with the
@ -662,24 +660,6 @@ func (t *http2Client) GracefulClose() error {
t.mu.Unlock() t.mu.Unlock()
return nil return nil
} }
// Notify the streams which were initiated after the server sent GOAWAY.
select {
case <-t.goAway:
n := t.prevGoAwayID
if n == 0 && t.nextID > 1 {
n = t.nextID - 2
}
m := t.goAwayID + 2
if m == 2 {
m = 1
}
for i := m; i <= n; i += 2 {
if s, ok := t.activeStreams[i]; ok {
close(s.goAway)
}
}
default:
}
if t.state == draining { if t.state == draining {
t.mu.Unlock() t.mu.Unlock()
return nil return nil
@ -987,36 +967,56 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
} }
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Lock()
if t.state != reachable && t.state != draining {
t.mu.Unlock()
return
}
if f.ErrCode == http2.ErrCodeEnhanceYourCalm { if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
} }
t.mu.Lock() id := f.LastStreamID
if t.state == reachable || t.state == draining { if id > 0 && id%2 != 1 {
if f.LastStreamID > 0 && f.LastStreamID%2 != 1 { t.mu.Unlock()
t.mu.Unlock() t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) return
return
}
select {
case <-t.goAway:
id := t.goAwayID
// t.goAway has been closed (i.e.,multiple GoAways).
if id < f.LastStreamID {
t.mu.Unlock()
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
return
}
t.prevGoAwayID = id
t.goAwayID = f.LastStreamID
t.mu.Unlock()
return
default:
t.setGoAwayReason(f)
}
t.goAwayID = f.LastStreamID
close(t.goAway)
} }
// A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
// The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
// with the ID of the last stream the server will process.
// Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
// close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server
// was being sent don't get killed.
select {
case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
return
}
default:
t.setGoAwayReason(f)
close(t.goAway)
t.state = draining
}
// All streams with IDs greater than the GoAwayId
// and smaller than the previous GoAway ID should be killed.
upperLimit := t.prevGoAwayID
if upperLimit == 0 { // This is the first GoAway Frame.
upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
}
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
close(stream.goAway)
}
}
t.prevGoAwayID = id
active := len(t.activeStreams)
t.mu.Unlock() t.mu.Unlock()
if active == 0 {
t.Close()
}
} }
// setGoAwayReason sets the value of t.goAwayReason based // setGoAwayReason sets the value of t.goAwayReason based

View file

@ -68,7 +68,6 @@ type http2Server struct {
framer *framer framer *framer
hBuf *bytes.Buffer // the buffer for HPACK encoding hBuf *bytes.Buffer // the buffer for HPACK encoding
hEnc *hpack.Encoder // HPACK encoder hEnc *hpack.Encoder // HPACK encoder
// The max number of concurrent streams. // The max number of concurrent streams.
maxStreams uint32 maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window // controlBuf delivers all the control related tasks (e.g., window
@ -77,9 +76,7 @@ type http2Server struct {
fc *inFlow fc *inFlow
// sendQuotaPool provides flow control to outbound message. // sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool sendQuotaPool *quotaPool
stats stats.Handler
stats stats.Handler
// Flag to keep track of reading activity on transport. // Flag to keep track of reading activity on transport.
// 1 is true and 0 is false. // 1 is true and 0 is false.
activity uint32 // Accessed atomically. activity uint32 // Accessed atomically.
@ -95,13 +92,19 @@ type http2Server struct {
// Flag to signify that number of ping strikes should be reset to 0. // Flag to signify that number of ping strikes should be reset to 0.
// This is set whenever data or header frames are sent. // This is set whenever data or header frames are sent.
// 1 means yes. // 1 means yes.
resetPingStrikes uint32 // Accessed atomically. resetPingStrikes uint32 // Accessed atomically.
initialWindowSize int32 initialWindowSize int32
bdpEst *bdpEstimator
bdpEst *bdpEstimator mu sync.Mutex // guard the following
mu sync.Mutex // guard the following // drainChan is initialized when drain(...) is called the first time.
// After which the server writes out the first GoAway(with ID 2^31-1) frame.
// Then an independent goroutine will be launched to later send the second GoAway.
// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
// already underway.
drainChan chan struct{}
state transportState state transportState
activeStreams map[uint32]*Stream activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer. // the per-stream outbound flow control window size set by the peer.
@ -592,6 +595,10 @@ const (
func (t *http2Server) handlePing(f *http2.PingFrame) { func (t *http2Server) handlePing(f *http2.PingFrame) {
if f.IsAck() { if f.IsAck() {
if f.Data == goAwayPing.data && t.drainChan != nil {
close(t.drainChan)
return
}
// Maybe it's a BDP ping. // Maybe it's a BDP ping.
if t.bdpEst != nil { if t.bdpEst != nil {
t.bdpEst.calculate(f.Data) t.bdpEst.calculate(f.Data)
@ -631,7 +638,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
if t.pingStrikes > maxPingStrikes { if t.pingStrikes > maxPingStrikes {
// Send goaway and close the connection. // Send goaway and close the connection.
t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings")}) t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
} }
} }
@ -956,23 +963,18 @@ func (t *http2Server) keepalive() {
continue continue
} }
val := t.kp.MaxConnectionIdle - time.Since(idle) val := t.kp.MaxConnectionIdle - time.Since(idle)
t.mu.Unlock()
if val <= 0 { if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection. // Gracefully close the connection.
t.state = draining t.drain(http2.ErrCodeNo, []byte{})
t.mu.Unlock()
t.Drain()
// Reseting the timer so that the clean-up doesn't deadlock. // Reseting the timer so that the clean-up doesn't deadlock.
maxIdle.Reset(infinity) maxIdle.Reset(infinity)
return return
} }
t.mu.Unlock()
maxIdle.Reset(val) maxIdle.Reset(val)
case <-maxAge.C: case <-maxAge.C:
t.mu.Lock() t.drain(http2.ErrCodeNo, []byte{})
t.state = draining
t.mu.Unlock()
t.Drain()
maxAge.Reset(t.kp.MaxConnectionAgeGrace) maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select { select {
case <-maxAge.C: case <-maxAge.C:
@ -1004,6 +1006,8 @@ func (t *http2Server) keepalive() {
} }
} }
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// controller running in a separate goroutine takes charge of sending control // controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server. // frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Server) controller() { func (t *http2Server) controller() {
@ -1033,12 +1037,38 @@ func (t *http2Server) controller() {
return return
} }
sid := t.maxStreamID sid := t.maxStreamID
t.state = draining if !i.headsUp {
t.mu.Unlock() // Stop accepting more streams now.
t.framer.writeGoAway(true, sid, i.code, i.debugData) t.state = draining
if i.code == http2.ErrCodeEnhanceYourCalm { t.mu.Unlock()
t.Close() t.framer.writeGoAway(true, sid, i.code, i.debugData)
if i.closeConn {
// Abruptly close the connection following the GoAway.
t.Close()
}
t.writableChan <- 0
continue
} }
t.mu.Unlock()
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
// Follow that with a ping and wait for the ack to come back or a timer
// to expire. During this time accept new streams since they might have
// originated before the GoAway reaches the client.
// After getting the ack or timer expiration send out another GoAway this
// time with an ID of the max stream server intends to process.
t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
t.framer.writePing(true, false, goAwayPing.data)
go func() {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-t.drainChan:
case <-timer.C:
case <-t.shutdownChan:
return
}
t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
}()
case *flushIO: case *flushIO:
t.framer.flushWrite() t.framer.flushWrite()
case *ping: case *ping:
@ -1116,7 +1146,17 @@ func (t *http2Server) RemoteAddr() net.Addr {
} }
func (t *http2Server) Drain() { func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{code: http2.ErrCodeNo}) t.drain(http2.ErrCodeNo, []byte{})
}
func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainChan != nil {
return
}
t.drainChan = make(chan struct{})
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
} }
var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))