From 24192a37970ea99e934fff299a045879000ce838 Mon Sep 17 00:00:00 2001 From: mpl Date: Mon, 20 Jan 2020 17:42:05 +0100 Subject: [PATCH] fix memleak in safe.Pool Co-authored-by: Julien Salleyron --- pkg/safe/routine.go | 57 +++--------------- pkg/safe/routine_test.go | 58 ------------------- .../service/loadbalancer/mirror/mirror.go | 2 +- 3 files changed, 8 insertions(+), 109 deletions(-) diff --git a/pkg/safe/routine.go b/pkg/safe/routine.go index f02cfda9b..c1f81e5c3 100644 --- a/pkg/safe/routine.go +++ b/pkg/safe/routine.go @@ -19,14 +19,13 @@ type routineCtx func(ctx context.Context) // Pool is a pool of go routines type Pool struct { - routines []routine - routinesCtx []routineCtx - waitGroup sync.WaitGroup - lock sync.Mutex - baseCtx context.Context - baseCancel context.CancelFunc - ctx context.Context - cancel context.CancelFunc + routines []routine + waitGroup sync.WaitGroup + lock sync.Mutex + baseCtx context.Context + baseCancel context.CancelFunc + ctx context.Context + cancel context.CancelFunc } // NewPool creates a Pool @@ -46,17 +45,9 @@ func (p *Pool) Ctx() context.Context { return p.baseCtx } -// AddGoCtx adds a recoverable goroutine with a context without starting it -func (p *Pool) AddGoCtx(goroutine routineCtx) { - p.lock.Lock() - p.routinesCtx = append(p.routinesCtx, goroutine) - p.lock.Unlock() -} - // GoCtx starts a recoverable goroutine with a context func (p *Pool) GoCtx(goroutine routineCtx) { p.lock.Lock() - p.routinesCtx = append(p.routinesCtx, goroutine) p.waitGroup.Add(1) Go(func() { defer p.waitGroup.Done() @@ -65,17 +56,6 @@ func (p *Pool) GoCtx(goroutine routineCtx) { p.lock.Unlock() } -// addGo adds a recoverable goroutine, and can be stopped with stop chan -func (p *Pool) addGo(goroutine func(stop chan bool)) { - p.lock.Lock() - newRoutine := routine{ - goroutine: goroutine, - stop: make(chan bool, 1), - } - p.routines = append(p.routines, newRoutine) - p.lock.Unlock() -} - // Go starts a recoverable goroutine, and can be stopped with stop chan func (p *Pool) Go(goroutine func(stop chan bool)) { p.lock.Lock() @@ -114,29 +94,6 @@ func (p *Pool) Cleanup() { p.baseCancel() } -// Start starts all stopped routines -func (p *Pool) Start() { - p.lock.Lock() - defer p.lock.Unlock() - p.ctx, p.cancel = context.WithCancel(p.baseCtx) - for i := range p.routines { - p.waitGroup.Add(1) - p.routines[i].stop = make(chan bool, 1) - Go(func() { - defer p.waitGroup.Done() - p.routines[i].goroutine(p.routines[i].stop) - }) - } - - for _, routine := range p.routinesCtx { - p.waitGroup.Add(1) - Go(func() { - defer p.waitGroup.Done() - routine(p.ctx) - }) - } -} - // Go starts a recoverable goroutine func Go(goroutine func()) { GoWithRecover(goroutine, defaultRecoverGoroutine) diff --git a/pkg/safe/routine_test.go b/pkg/safe/routine_test.go index f9e8a6ab9..caeef93ca 100644 --- a/pkg/safe/routine_test.go +++ b/pkg/safe/routine_test.go @@ -67,13 +67,6 @@ func TestPoolWithCtx(t *testing.T) { p.GoCtx(testRoutine.routineCtx) }, }, - { - desc: "AddGoCtx()", - fn: func(p *Pool) { - p.AddGoCtx(testRoutine.routineCtx) - p.Start() - }, - }, } for _, test := range testCases { @@ -87,9 +80,6 @@ func TestPoolWithCtx(t *testing.T) { test.fn(p) defer p.Cleanup() - if len(p.routinesCtx) != 1 { - t.Fatalf("After %s, Pool did have %d goroutineCtxs, expected 1", test.desc, len(p.routinesCtx)) - } testDone := make(chan bool, 1) go func() { @@ -140,40 +130,6 @@ func TestPoolWithStopChan(t *testing.T) { } } -func TestPoolStartWithStopChan(t *testing.T) { - testRoutine := newFakeRoutine() - - p := NewPool(context.Background()) - - timer := time.NewTimer(500 * time.Millisecond) - defer timer.Stop() - - // Insert the stopped test goroutine via private fields into the Pool. - // There currently is no way to insert a routine via exported funcs that is not started immediately. - p.lock.Lock() - newRoutine := routine{ - goroutine: testRoutine.routine, - } - p.routines = append(p.routines, newRoutine) - p.lock.Unlock() - p.Start() - - testDone := make(chan bool, 1) - go func() { - <-testRoutine.startSig - p.Cleanup() - testDone <- true - }() - select { - case <-timer.C: - testRoutine.Lock() - defer testRoutine.Unlock() - t.Fatalf("Pool.Start() did not complete in time, goroutine started equals '%t'", testRoutine.started) - case <-testDone: - return - } -} - func TestPoolCleanupWithGoPanicking(t *testing.T) { testRoutine := func(stop chan bool) { panic("BOOM") @@ -193,26 +149,12 @@ func TestPoolCleanupWithGoPanicking(t *testing.T) { p.Go(testRoutine) }, }, - { - desc: "addGo() and Start()", - fn: func(p *Pool) { - p.addGo(testRoutine) - p.Start() - }, - }, { desc: "GoCtx()", fn: func(p *Pool) { p.GoCtx(testCtxRoutine) }, }, - { - desc: "AddGoCtx() and Start()", - fn: func(p *Pool) { - p.AddGoCtx(testCtxRoutine) - p.Start() - }, - }, } for _, test := range testCases { diff --git a/pkg/server/service/loadbalancer/mirror/mirror.go b/pkg/server/service/loadbalancer/mirror/mirror.go index 27d8e3230..60545090a 100644 --- a/pkg/server/service/loadbalancer/mirror/mirror.go +++ b/pkg/server/service/loadbalancer/mirror/mirror.go @@ -100,7 +100,7 @@ type blackholeResponseWriter struct{} func (b blackholeResponseWriter) Flush() {} func (b blackholeResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { - return nil, nil, errors.New("you can hijack connection on blackholeResponseWriter") + return nil, nil, errors.New("connection on blackholeResponseWriter cannot be hijacked") } func (b blackholeResponseWriter) Header() http.Header {