Upgrade go-marathon to dd6cbd4.

Fixes a problem with UnreachableStrategy being available now in two
type-incompatible formats (object and string).

We also upgrade the transitive dependency
github.com/donovanhide/eventsource.
This commit is contained in:
Timo Reimann 2017-05-19 14:24:28 +02:00 committed by Ludovic Fernandez
parent 7a34303593
commit 9fbe21c534
9 changed files with 206 additions and 48 deletions

8
glide.lock generated
View file

@ -1,5 +1,5 @@
hash: 6e206389bc4f381387be8b5f0cc5a41224329752654ddb3c2a805adde0333217
updated: 2017-06-17T14:30:19.890844996+02:00
hash: 9a62fe4058ef43ed185d96638322dd3d44ae21f7f65fae14c4c6d404876c2872
updated: 2017-06-28T15:47:14.848940186+02:00
imports:
- name: cloud.google.com/go
version: 2e6a95edb1071d750f6d7db777bf66cd2997af6c
@ -178,7 +178,7 @@ imports:
- store/etcd
- store/zookeeper
- name: github.com/donovanhide/eventsource
version: fd1de70867126402be23c306e1ce32828455d85b
version: 441a03aa37b3329bbb79f43de81914ea18724718
- name: github.com/eapache/channels
version: 47238d5aae8c0fefd518ef2bee46290909cf8263
- name: github.com/eapache/queue
@ -201,7 +201,7 @@ imports:
- name: github.com/fatih/color
version: 9131ab34cf20d2f6d83fdc67168a5430d1c7dc23
- name: github.com/gambol99/go-marathon
version: 15ea23e360abb8b25071e677aed344f31838e403
version: dd6cbd4c2d71294a19fb89158f2a00d427f174ab
- name: github.com/ghodss/yaml
version: 73d445a93680fa1a78ae23a5839bad48f32ba1ee
- name: github.com/go-ini/ini

View file

@ -93,7 +93,7 @@ import:
- package: k8s.io/client-go
version: v2.0.0
- package: github.com/gambol99/go-marathon
version: d672c6fbb499596869d95146a26e7d0746c06c54
version: dd6cbd4c2d71294a19fb89158f2a00d427f174ab
- package: github.com/ArthurHlt/go-eureka-client
subpackages:
- eureka

View file

@ -4,6 +4,7 @@ import (
"log"
"net/http"
"strings"
"sync"
)
type subscription struct {
@ -32,6 +33,8 @@ type Server struct {
subs chan *subscription
unregister chan *subscription
quit chan bool
isClosed bool
isClosedMutex sync.RWMutex
}
// Create a new Server ready for handler creation and publishing events
@ -51,6 +54,7 @@ func NewServer() *Server {
// Stop handling publishing
func (srv *Server) Close() {
srv.quit <- true
srv.markServerClosed()
}
// Create a new handler for serving a specified channel
@ -69,6 +73,12 @@ func (srv *Server) Handler(channel string) http.HandlerFunc {
}
w.WriteHeader(http.StatusOK)
// If the Handler is still active even though the server is closed, stop here.
// Otherwise the Handler will block while publishing to srv.subs indefinitely.
if srv.isServerClosed() {
return
}
sub := &subscription{
channel: channel,
lastEventId: req.Header.Get("Last-Event-ID"),
@ -165,3 +175,15 @@ func (srv *Server) run() {
}
}
}
func (srv *Server) isServerClosed() bool {
srv.isClosedMutex.RLock()
defer srv.isClosedMutex.RUnlock()
return srv.isClosed
}
func (srv *Server) markServerClosed() {
srv.isClosedMutex.Lock()
defer srv.isClosedMutex.Unlock()
srv.isClosed = true
}

View file

@ -7,6 +7,7 @@ import (
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
@ -27,6 +28,10 @@ type Stream struct {
Errors chan error
// Logger is a logger that, when set, will be used for logging debug messages
Logger *log.Logger
// isClosed is a marker that the stream is/should be closed
isClosed bool
// isClosedMutex is a mutex protecting concurrent read/write access of isClosed
isClosedMutex sync.RWMutex
}
type SubscriptionError struct {
@ -61,7 +66,7 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques
c: client,
req: request,
lastEventId: lastEventId,
retry: (time.Millisecond * 3000),
retry: time.Millisecond * 3000,
Events: make(chan Event),
Errors: make(chan error),
}
@ -75,6 +80,29 @@ func SubscribeWith(lastEventId string, client *http.Client, request *http.Reques
return stream, nil
}
// Close will close the stream. It is safe for concurrent access and can be called multiple times.
func (stream *Stream) Close() {
if stream.isStreamClosed() {
return
}
stream.markStreamClosed()
close(stream.Errors)
close(stream.Events)
}
func (stream *Stream) isStreamClosed() bool {
stream.isClosedMutex.RLock()
defer stream.isClosedMutex.RUnlock()
return stream.isClosed
}
func (stream *Stream) markStreamClosed() {
stream.isClosedMutex.Lock()
defer stream.isClosedMutex.Unlock()
stream.isClosed = true
}
// Go's http package doesn't copy headers across when it encounters
// redirects so we need to do that manually.
func checkRedirect(req *http.Request, via []*http.Request) error {
@ -112,15 +140,27 @@ func (stream *Stream) connect() (r io.ReadCloser, err error) {
func (stream *Stream) stream(r io.ReadCloser) {
defer r.Close()
// receives events until an error is encountered
stream.receiveEvents(r)
// tries to reconnect and start the stream again
stream.retryRestartStream()
}
func (stream *Stream) receiveEvents(r io.ReadCloser) {
dec := NewDecoder(r)
for {
ev, err := dec.Decode()
if stream.isStreamClosed() {
return
}
if err != nil {
stream.Errors <- err
// respond to all errors by reconnecting and trying again
break
return
}
pub := ev.(*publication)
if pub.Retry() > 0 {
stream.retry = time.Duration(pub.Retry()) * time.Millisecond
@ -130,20 +170,25 @@ func (stream *Stream) stream(r io.ReadCloser) {
}
stream.Events <- ev
}
}
func (stream *Stream) retryRestartStream() {
backoff := stream.retry
for {
time.Sleep(backoff)
if stream.Logger != nil {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}
time.Sleep(backoff)
if stream.isStreamClosed() {
return
}
// NOTE: because of the defer we're opening the new connection
// before closing the old one. Shouldn't be a problem in practice,
// but something to be aware of.
next, err := stream.connect()
r, err := stream.connect()
if err == nil {
go stream.stream(next)
break
go stream.stream(r)
return
}
stream.Errors <- err
backoff *= 2

View file

@ -150,8 +150,6 @@ type Marathon interface {
}
var (
// ErrInvalidResponse is thrown when marathon responds with invalid or error response
ErrInvalidResponse = errors.New("invalid response from Marathon")
// ErrMarathonDown is thrown when all the marathon endpoints are down
ErrMarathonDown = errors.New("all the Marathon hosts are presently down")
// ErrTimeoutError is thrown when the operation has timed out
@ -190,6 +188,11 @@ type httpClient struct {
config Config
}
// newRequestError signals that creating a new http.Request failed
type newRequestError struct {
error
}
// NewClient creates a new marathon client
// config: the configuration to use
func NewClient(config Config) (Marathon, error) {
@ -298,8 +301,7 @@ func (r *marathonClient) apiCall(method, path string, body, result interface{})
if response.StatusCode >= 200 && response.StatusCode <= 299 {
if result != nil {
if err := json.Unmarshal(respBody, result); err != nil {
r.debugLog.Printf("apiCall(): failed to unmarshall the response from marathon, error: %s\n", err)
return ErrInvalidResponse
return fmt.Errorf("failed to unmarshal response from Marathon: %s", err)
}
}
return nil
@ -317,7 +319,8 @@ func (r *marathonClient) apiCall(method, path string, body, result interface{})
}
}
// buildAPIRequest creates a default API request
// buildAPIRequest creates a default API request.
// It fails when there is no available member in the cluster anymore or when the request can not be built.
func (r *marathonClient) buildAPIRequest(method, path string, reader io.Reader) (request *http.Request, member string, err error) {
// Grab a member from the cluster
member, err = r.hosts.getMember()
@ -328,7 +331,7 @@ func (r *marathonClient) buildAPIRequest(method, path string, reader io.Reader)
// Build the HTTP request to Marathon
request, err = r.client.buildMarathonRequest(method, member, path, reader)
if err != nil {
return nil, member, err
return nil, member, newRequestError{err}
}
return request, member, nil
}

View file

@ -209,7 +209,9 @@ func (r *marathonClient) WaitOnGroup(name string, timeout time.Duration) error {
func (r *marathonClient) DeleteGroup(name string, force bool) (*DeploymentID, error) {
version := new(DeploymentID)
path := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name))
path = buildPathWithForceParam(path, force)
if force {
path += "?force=true"
}
if err := r.apiDelete(path, nil, version); err != nil {
return nil, err
}
@ -224,7 +226,9 @@ func (r *marathonClient) DeleteGroup(name string, force bool) (*DeploymentID, er
func (r *marathonClient) UpdateGroup(name string, group *Group, force bool) (*DeploymentID, error) {
deploymentID := new(DeploymentID)
path := fmt.Sprintf("%s/%s", marathonAPIGroups, trimRootPath(name))
path = buildPathWithForceParam(path, force)
if force {
path += "?force=true"
}
if err := r.apiPut(path, group, deploymentID); err != nil {
return nil, err
}

View file

@ -27,7 +27,7 @@ type HealthCheck struct {
GracePeriodSeconds int `json:"gracePeriodSeconds,omitempty"`
IntervalSeconds int `json:"intervalSeconds,omitempty"`
TimeoutSeconds int `json:"timeoutSeconds,omitempty"`
IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,ommitempty"`
IgnoreHTTP1xx *bool `json:"ignoreHttp1xx,omitempty"`
}
// SetCommand sets the given command on the health check.

View file

@ -103,7 +103,8 @@ func (r *marathonClient) registerSubscription() error {
case EventsTransportCallback:
return r.registerCallbackSubscription()
case EventsTransportSSE:
return r.registerSSESubscription()
r.registerSSESubscription()
return nil
default:
return fmt.Errorf("the events transport: %d is not supported", r.config.EventsTransport)
}
@ -162,40 +163,81 @@ func (r *marathonClient) registerCallbackSubscription() error {
return nil
}
func (r *marathonClient) registerSSESubscription() error {
// Prevent multiple SSE subscriptions
// registerSSESubscription starts a go routine that continously tries to
// connect to the SSE stream and to process the received events. To establish
// the connection it tries the active cluster members until no more member is
// active. When this happens it will retry to get a connection every 5 seconds.
func (r *marathonClient) registerSSESubscription() {
if r.subscribedToSSE {
return nil
}
request, _, err := r.buildAPIRequest("GET", marathonAPIEventStream, nil)
if err != nil {
return err
}
// Try to connect to stream, reusing the http client settings
stream, err := eventsource.SubscribeWith("", r.config.HTTPClient, request)
if err != nil {
return err
return
}
go func() {
for {
select {
case ev := <-stream.Events:
if err := r.handleEvent(ev.Data()); err != nil {
// TODO let the user handle this error instead of logging it here
r.debugLog.Printf("registerSSESubscription(): failed to handle event: %v\n", err)
}
case err := <-stream.Errors:
// TODO let the user handle this error instead of logging it here
r.debugLog.Printf("registerSSESubscription(): failed to receive event: %v\n", err)
stream, err := r.connectToSSE()
if err != nil {
r.debugLog.Printf("Error connecting SSE subscription: %s", err)
<-time.After(5 * time.Second)
continue
}
err = r.listenToSSE(stream)
stream.Close()
r.debugLog.Printf("Error on SSE subscription: %s", err)
}
}()
r.subscribedToSSE = true
return nil
}
// connectToSSE tries to establish an *eventsource.Stream to any of the Marathon cluster members, marking the
// member as down on connection failure, until there is no more active member in the cluster.
// Given the http request can not be built, it will panic as this case should never happen.
func (r *marathonClient) connectToSSE() (*eventsource.Stream, error) {
for {
request, member, err := r.buildAPIRequest("GET", marathonAPIEventStream, nil)
if err != nil {
switch err.(type) {
case newRequestError:
panic(fmt.Sprintf("Requests for SSE subscriptions should never fail to be created: %s", err.Error()))
default:
return nil, err
}
}
// The event source library manipulates the HTTPClient. So we create a new one and copy
// its underlying fields for performance reasons. See note that at least the Transport
// should be reused here: https://golang.org/pkg/net/http/#Client
httpClient := &http.Client{
Transport: r.config.HTTPClient.Transport,
CheckRedirect: r.config.HTTPClient.CheckRedirect,
Jar: r.config.HTTPClient.Jar,
Timeout: r.config.HTTPClient.Timeout,
}
stream, err := eventsource.SubscribeWith("", httpClient, request)
if err != nil {
r.debugLog.Printf("Error subscribing to Marathon event stream: %s", err)
r.hosts.markDown(member)
continue
}
return stream, nil
}
}
func (r *marathonClient) listenToSSE(stream *eventsource.Stream) error {
for {
select {
case ev := <-stream.Events:
if err := r.handleEvent(ev.Data()); err != nil {
r.debugLog.Printf("listenToSSE(): failed to handle event: %v", err)
}
case err := <-stream.Errors:
return err
}
}
}
// Subscribe adds a URL to Marathon's callback facility

View file

@ -16,12 +16,54 @@ limitations under the License.
package marathon
import (
"encoding/json"
"fmt"
)
const UnreachableStrategyAbsenceReasonDisabled = "disabled"
// UnreachableStrategy is the unreachable strategy applied to an application.
type UnreachableStrategy struct {
EnabledUnreachableStrategy
AbsenceReason string
}
// EnabledUnreachableStrategy covers parameters pertaining to present unreachable strategies.
type EnabledUnreachableStrategy struct {
InactiveAfterSeconds *float64 `json:"inactiveAfterSeconds,omitempty"`
ExpungeAfterSeconds *float64 `json:"expungeAfterSeconds,omitempty"`
}
type unreachableStrategy UnreachableStrategy
// UnmarshalJSON unmarshals the given JSON into an UnreachableStrategy. It
// populates parameters for present strategies, and otherwise only sets the
// absence reason.
func (us *UnreachableStrategy) UnmarshalJSON(b []byte) error {
var u unreachableStrategy
var errEnabledUS, errNonEnabledUS error
if errEnabledUS = json.Unmarshal(b, &u); errEnabledUS == nil {
*us = UnreachableStrategy(u)
return nil
}
if errNonEnabledUS = json.Unmarshal(b, &us.AbsenceReason); errNonEnabledUS == nil {
return nil
}
return fmt.Errorf("failed to unmarshal unreachable strategy: unmarshaling into enabled returned error '%s'; unmarshaling into non-enabled returned error '%s'", errEnabledUS, errNonEnabledUS)
}
// MarshalJSON marshals the unreachable strategy.
func (us *UnreachableStrategy) MarshalJSON() ([]byte, error) {
if us.AbsenceReason == "" {
return json.Marshal(us.EnabledUnreachableStrategy)
}
return json.Marshal(us.AbsenceReason)
}
// SetInactiveAfterSeconds sets the period after which instance will be marked as inactive.
func (us UnreachableStrategy) SetInactiveAfterSeconds(cap float64) UnreachableStrategy {
us.InactiveAfterSeconds = &cap