From 9fbe21c5346e39db987d77ff3b9226ba5059d033 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Fri, 19 May 2017 14:24:28 +0200 Subject: [PATCH] Upgrade go-marathon to dd6cbd4. Fixes a problem with UnreachableStrategy being available now in two type-incompatible formats (object and string). We also upgrade the transitive dependency github.com/donovanhide/eventsource. --- glide.lock | 8 +- glide.yaml | 2 +- .../donovanhide/eventsource/server.go | 22 +++++ .../donovanhide/eventsource/stream.go | 63 +++++++++++-- .../github.com/gambol99/go-marathon/client.go | 15 +-- .../github.com/gambol99/go-marathon/group.go | 8 +- .../github.com/gambol99/go-marathon/health.go | 2 +- .../gambol99/go-marathon/subscription.go | 92 ++++++++++++++----- .../go-marathon/unreachable_strategy.go | 42 +++++++++ 9 files changed, 206 insertions(+), 48 deletions(-) diff --git a/glide.lock b/glide.lock index 02202d874..f35a851c0 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 6e206389bc4f381387be8b5f0cc5a41224329752654ddb3c2a805adde0333217 -updated: 2017-06-17T14:30:19.890844996+02:00 +hash: 9a62fe4058ef43ed185d96638322dd3d44ae21f7f65fae14c4c6d404876c2872 +updated: 2017-06-28T15:47:14.848940186+02:00 imports: - name: cloud.google.com/go version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c @@ -178,7 +178,7 @@ imports: - store/etcd - store/zookeeper - name: github.com/donovanhide/eventsource - version: fd1de70867126402be23c306e1ce32828455d85b + version: 441a03aa37b3329bbb79f43de81914ea18724718 - name: github.com/eapache/channels version: 47238d5aae8c0fefd518ef2bee46290909cf8263 - name: github.com/eapache/queue @@ -201,7 +201,7 @@ imports: - name: github.com/fatih/color version: 9131ab34cf20d2f6d83fdc67168a5430d1c7dc23 - name: github.com/gambol99/go-marathon - version: 15ea23e360abb8b25071e677aed344f31838e403 + version: dd6cbd4c2d71294a19fb89158f2a00d427f174ab - name: github.com/ghodss/yaml version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee - name: github.com/go-ini/ini diff --git a/glide.yaml b/glide.yaml index face98d36..b6323c01f 100644 --- a/glide.yaml +++ b/glide.yaml @@ -93,7 +93,7 @@ import: - package: k8s.io/client-go version: v2.0.0 - package: github.com/gambol99/go-marathon - version: d672c6fbb499596869d95146a26e7d0746c06c54 + version: dd6cbd4c2d71294a19fb89158f2a00d427f174ab - package: github.com/ArthurHlt/go-eureka-client subpackages: - eureka diff --git a/vendor/github.com/donovanhide/eventsource/server.go b/vendor/github.com/donovanhide/eventsource/server.go index 5090be05e..82cee69dd 100644 --- a/vendor/github.com/donovanhide/eventsource/server.go +++ b/vendor/github.com/donovanhide/eventsource/server.go @@ -4,6 +4,7 @@ import ( "log" "net/http" "strings" + "sync" ) type subscription struct { @@ -32,6 +33,8 @@ type Server struct { subs chan *subscription unregister chan *subscription quit chan bool + isClosed bool + isClosedMutex sync.RWMutex } // Create a new Server ready for handler creation and publishing events @@ -51,6 +54,7 @@ func NewServer() *Server { // Stop handling publishing func (srv *Server) Close() { srv.quit <- true + srv.markServerClosed() } // Create a new handler for serving a specified channel @@ -69,6 +73,12 @@ func (srv *Server) Handler(channel string) http.HandlerFunc { } w.WriteHeader(http.StatusOK) + // If the Handler is still active even though the server is closed, stop here. + // Otherwise the Handler will block while publishing to srv.subs indefinitely. + if srv.isServerClosed() { + return + } + sub := &subscription{ channel: channel, lastEventId: req.Header.Get("Last-Event-ID"), @@ -165,3 +175,15 @@ func (srv *Server) run() { } } } + +func (srv *Server) isServerClosed() bool { + srv.isClosedMutex.RLock() + defer srv.isClosedMutex.RUnlock() + return srv.isClosed +} + +func (srv *Server) markServerClosed() { + srv.isClosedMutex.Lock() + defer srv.isClosedMutex.Unlock() + srv.isClosed = true +} diff --git a/vendor/github.com/donovanhide/eventsource/stream.go b/vendor/github.com/donovanhide/eventsource/stream.go index b016619ed..597087701 100644 --- a/vendor/github.com/donovanhide/eventsource/stream.go +++ b/vendor/github.com/donovanhide/eventsource/stream.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "log" "net/http" + "sync" "time" ) @@ -27,6 +28,10 @@ type Stream struct { Errors chan error // Logger is a logger that, when set, will be used for logging debug messages Logger *log.Logger + // isClosed is a marker that the stream is/should be closed + isClosed bool + // isClosedMutex is a mutex protecting concurrent read/write access of isClosed + isClosedMutex sync.RWMutex } type SubscriptionError struct { @@ -61,7 +66,7 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques c: client, req: request, lastEventId: lastEventId, - retry: (time.Millisecond * 3000), + retry: time.Millisecond * 3000, Events: make(chan Event), Errors: make(chan error), } @@ -75,6 +80,29 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques return stream, nil } +// Close will close the stream. It is safe for concurrent access and can be called multiple times. +func (stream *Stream) Close() { + if stream.isStreamClosed() { + return + } + + stream.markStreamClosed() + close(stream.Errors) + close(stream.Events) +} + +func (stream *Stream) isStreamClosed() bool { + stream.isClosedMutex.RLock() + defer stream.isClosedMutex.RUnlock() + return stream.isClosed +} + +func (stream *Stream) markStreamClosed() { + stream.isClosedMutex.Lock() + defer stream.isClosedMutex.Unlock() + stream.isClosed = true +} + // Go's http package doesn't copy headers across when it encounters // redirects so we need to do that manually. func checkRedirect(req *http.Request, via []*http.Request) error { @@ -112,15 +140,27 @@ func (stream *Stream) connect() (r io.ReadCloser, err error) { func (stream *Stream) stream(r io.ReadCloser) { defer r.Close() + + // receives events until an error is encountered + stream.receiveEvents(r) + + // tries to reconnect and start the stream again + stream.retryRestartStream() +} + +func (stream *Stream) receiveEvents(r io.ReadCloser) { dec := NewDecoder(r) + for { ev, err := dec.Decode() - + if stream.isStreamClosed() { + return + } if err != nil { stream.Errors <- err - // respond to all errors by reconnecting and trying again - break + return } + pub := ev.(*publication) if pub.Retry() > 0 { stream.retry = time.Duration(pub.Retry()) * time.Millisecond @@ -130,20 +170,25 @@ func (stream *Stream) stream(r io.ReadCloser) { } stream.Events <- ev } +} + +func (stream *Stream) retryRestartStream() { backoff := stream.retry for { - time.Sleep(backoff) if stream.Logger != nil { stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds()) } - + time.Sleep(backoff) + if stream.isStreamClosed() { + return + } // NOTE: because of the defer we're opening the new connection // before closing the old one. Shouldn't be a problem in practice, // but something to be aware of. - next, err := stream.connect() + r, err := stream.connect() if err == nil { - go stream.stream(next) - break + go stream.stream(r) + return } stream.Errors <- err backoff *= 2 diff --git a/vendor/github.com/gambol99/go-marathon/client.go b/vendor/github.com/gambol99/go-marathon/client.go index d3ca78f8c..a042c560f 100644 --- a/vendor/github.com/gambol99/go-marathon/client.go +++ b/vendor/github.com/gambol99/go-marathon/client.go @@ -150,8 +150,6 @@ type Marathon interface { } var ( - // ErrInvalidResponse is thrown when marathon responds with invalid or error response - ErrInvalidResponse = errors.New("invalid response from Marathon") // ErrMarathonDown is thrown when all the marathon endpoints are down ErrMarathonDown = errors.New("all the Marathon hosts are presently down") // ErrTimeoutError is thrown when the operation has timed out @@ -190,6 +188,11 @@ type httpClient struct { config Config } +// newRequestError signals that creating a new http.Request failed +type newRequestError struct { + error +} + // NewClient creates a new marathon client // config: the configuration to use func NewClient(config Config) (Marathon, error) { @@ -298,8 +301,7 @@ func (r *marathonClient) apiCall(method, path string, body, result interface{}) if response.StatusCode >= 200 && response.StatusCode <= 299 { if result != nil { if err := json.Unmarshal(respBody, result); err != nil { - r.debugLog.Printf("apiCall(): failed to unmarshall the response from marathon, error: %s\n", err) - return ErrInvalidResponse + return fmt.Errorf("failed to unmarshal response from Marathon: %s", err) } } return nil @@ -317,7 +319,8 @@ func (r *marathonClient) apiCall(method, path string, body, result interface{}) } } -// buildAPIRequest creates a default API request +// buildAPIRequest creates a default API request. +// It fails when there is no available member in the cluster anymore or when the request can not be built. func (r *marathonClient) buildAPIRequest(method, path string, reader io.Reader) (request *http.Request, member string, err error) { // Grab a member from the cluster member, err = r.hosts.getMember() @@ -328,7 +331,7 @@ func (r *marathonClient) buildAPIRequest(method, path string, reader io.Reader) // Build the HTTP request to Marathon request, err = r.client.buildMarathonRequest(method, member, path, reader) if err != nil { - return nil, member, err + return nil, member, newRequestError{err} } return request, member, nil } diff --git a/vendor/github.com/gambol99/go-marathon/group.go b/vendor/github.com/gambol99/go-marathon/group.go index b95e40328..401916e3f 100644 --- a/vendor/github.com/gambol99/go-marathon/group.go +++ b/vendor/github.com/gambol99/go-marathon/group.go @@ -209,7 +209,9 @@ func (r *marathonClient) WaitOnGroup(name string, timeout time.Duration) error { func (r *marathonClient) DeleteGroup(name string, force bool) (*DeploymentID, error) { version := new(DeploymentID) path := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)) - path = buildPathWithForceParam(path, force) + if force { + path += "?force=true" + } if err := r.apiDelete(path, nil, version); err != nil { return nil, err } @@ -224,7 +226,9 @@ func (r *marathonClient) DeleteGroup(name string, force bool) (*DeploymentID, er func (r *marathonClient) UpdateGroup(name string, group *Group, force bool) (*DeploymentID, error) { deploymentID := new(DeploymentID) path := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name)) - path = buildPathWithForceParam(path, force) + if force { + path += "?force=true" + } if err := r.apiPut(path, group, deploymentID); err != nil { return nil, err } diff --git a/vendor/github.com/gambol99/go-marathon/health.go b/vendor/github.com/gambol99/go-marathon/health.go index c264416b0..11c68e64d 100644 --- a/vendor/github.com/gambol99/go-marathon/health.go +++ b/vendor/github.com/gambol99/go-marathon/health.go @@ -27,7 +27,7 @@ type HealthCheck struct { GracePeriodSeconds int `json:"gracePeriodSeconds,omitempty"` IntervalSeconds int `json:"intervalSeconds,omitempty"` TimeoutSeconds int `json:"timeoutSeconds,omitempty"` - IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,ommitempty"` + IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,omitempty"` } // SetCommand sets the given command on the health check. diff --git a/vendor/github.com/gambol99/go-marathon/subscription.go b/vendor/github.com/gambol99/go-marathon/subscription.go index 7a625a973..fa70b1488 100644 --- a/vendor/github.com/gambol99/go-marathon/subscription.go +++ b/vendor/github.com/gambol99/go-marathon/subscription.go @@ -103,7 +103,8 @@ func (r *marathonClient) registerSubscription() error { case EventsTransportCallback: return r.registerCallbackSubscription() case EventsTransportSSE: - return r.registerSSESubscription() + r.registerSSESubscription() + return nil default: return fmt.Errorf("the events transport: %d is not supported", r.config.EventsTransport) } @@ -162,40 +163,81 @@ func (r *marathonClient) registerCallbackSubscription() error { return nil } -func (r *marathonClient) registerSSESubscription() error { - // Prevent multiple SSE subscriptions +// registerSSESubscription starts a go routine that continously tries to +// connect to the SSE stream and to process the received events. To establish +// the connection it tries the active cluster members until no more member is +// active. When this happens it will retry to get a connection every 5 seconds. +func (r *marathonClient) registerSSESubscription() { if r.subscribedToSSE { - return nil - } - - request, _, err := r.buildAPIRequest("GET", marathonAPIEventStream, nil) - if err != nil { - return err - } - - // Try to connect to stream, reusing the http client settings - stream, err := eventsource.SubscribeWith("", r.config.HTTPClient, request) - if err != nil { - return err + return } go func() { for { - select { - case ev := <-stream.Events: - if err := r.handleEvent(ev.Data()); err != nil { - // TODO let the user handle this error instead of logging it here - r.debugLog.Printf("registerSSESubscription(): failed to handle event: %v\n", err) - } - case err := <-stream.Errors: - // TODO let the user handle this error instead of logging it here - r.debugLog.Printf("registerSSESubscription(): failed to receive event: %v\n", err) + stream, err := r.connectToSSE() + if err != nil { + r.debugLog.Printf("Error connecting SSE subscription: %s", err) + <-time.After(5 * time.Second) + continue } + + err = r.listenToSSE(stream) + stream.Close() + r.debugLog.Printf("Error on SSE subscription: %s", err) } }() r.subscribedToSSE = true - return nil +} + +// connectToSSE tries to establish an *eventsource.Stream to any of the Marathon cluster members, marking the +// member as down on connection failure, until there is no more active member in the cluster. +// Given the http request can not be built, it will panic as this case should never happen. +func (r *marathonClient) connectToSSE() (*eventsource.Stream, error) { + for { + request, member, err := r.buildAPIRequest("GET", marathonAPIEventStream, nil) + if err != nil { + switch err.(type) { + case newRequestError: + panic(fmt.Sprintf("Requests for SSE subscriptions should never fail to be created: %s", err.Error())) + default: + return nil, err + } + } + + // The event source library manipulates the HTTPClient. So we create a new one and copy + // its underlying fields for performance reasons. See note that at least the Transport + // should be reused here: https://golang.org/pkg/net/http/#Client + httpClient := &http.Client{ + Transport: r.config.HTTPClient.Transport, + CheckRedirect: r.config.HTTPClient.CheckRedirect, + Jar: r.config.HTTPClient.Jar, + Timeout: r.config.HTTPClient.Timeout, + } + + stream, err := eventsource.SubscribeWith("", httpClient, request) + if err != nil { + r.debugLog.Printf("Error subscribing to Marathon event stream: %s", err) + r.hosts.markDown(member) + continue + } + + return stream, nil + } +} + +func (r *marathonClient) listenToSSE(stream *eventsource.Stream) error { + for { + select { + case ev := <-stream.Events: + if err := r.handleEvent(ev.Data()); err != nil { + r.debugLog.Printf("listenToSSE(): failed to handle event: %v", err) + } + case err := <-stream.Errors: + return err + + } + } } // Subscribe adds a URL to Marathon's callback facility diff --git a/vendor/github.com/gambol99/go-marathon/unreachable_strategy.go b/vendor/github.com/gambol99/go-marathon/unreachable_strategy.go index a15f7afe6..a77ff6936 100644 --- a/vendor/github.com/gambol99/go-marathon/unreachable_strategy.go +++ b/vendor/github.com/gambol99/go-marathon/unreachable_strategy.go @@ -16,12 +16,54 @@ limitations under the License. package marathon +import ( + "encoding/json" + "fmt" +) + +const UnreachableStrategyAbsenceReasonDisabled = "disabled" + // UnreachableStrategy is the unreachable strategy applied to an application. type UnreachableStrategy struct { + EnabledUnreachableStrategy + AbsenceReason string +} + +// EnabledUnreachableStrategy covers parameters pertaining to present unreachable strategies. +type EnabledUnreachableStrategy struct { InactiveAfterSeconds *float64 `json:"inactiveAfterSeconds,omitempty"` ExpungeAfterSeconds *float64 `json:"expungeAfterSeconds,omitempty"` } +type unreachableStrategy UnreachableStrategy + +// UnmarshalJSON unmarshals the given JSON into an UnreachableStrategy. It +// populates parameters for present strategies, and otherwise only sets the +// absence reason. +func (us *UnreachableStrategy) UnmarshalJSON(b []byte) error { + var u unreachableStrategy + var errEnabledUS, errNonEnabledUS error + if errEnabledUS = json.Unmarshal(b, &u); errEnabledUS == nil { + *us = UnreachableStrategy(u) + return nil + } + + if errNonEnabledUS = json.Unmarshal(b, &us.AbsenceReason); errNonEnabledUS == nil { + return nil + } + + return fmt.Errorf("failed to unmarshal unreachable strategy: unmarshaling into enabled returned error '%s'; unmarshaling into non-enabled returned error '%s'", errEnabledUS, errNonEnabledUS) +} + +// MarshalJSON marshals the unreachable strategy. +func (us *UnreachableStrategy) MarshalJSON() ([]byte, error) { + if us.AbsenceReason == "" { + return json.Marshal(us.EnabledUnreachableStrategy) + } + + return json.Marshal(us.AbsenceReason) +} + // SetInactiveAfterSeconds sets the period after which instance will be marked as inactive. func (us UnreachableStrategy) SetInactiveAfterSeconds(cap float64) UnreachableStrategy { us.InactiveAfterSeconds = &cap