Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cucumber/godog v0.15.1
github.com/go-logr/logr v1.4.3
github.com/stretchr/testify v1.11.1
go.uber.org/goleak v1.3.0
go.uber.org/mock v0.6.0
golang.org/x/sync v0.19.0
golang.org/x/text v0.33.0
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,12 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE=
golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
14 changes: 11 additions & 3 deletions openfeature/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package openfeature
import (
"context"
"errors"
"math"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -1302,13 +1301,22 @@ func TestRequirement_1_7_5(t *testing.T) {
// The client MUST default, run error hooks, and indicate an error if flag resolution is attempted while the provider
// is in NOT_READY.
func TestRequirement_1_7_6(t *testing.T) {
t.Cleanup(initSingleton)
t.Cleanup(func() {
eventing.(*eventExecutor).shutdown()
initSingleton()
})

ctrl := gomock.NewController(t)
mockHook := NewMockHook(ctrl)
mockHook.EXPECT().Error(gomock.Any(), gomock.Any(), ProviderNotReadyError, gomock.Any())
mockHook.EXPECT().Finally(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any())

// Use a channel that blocks until test cleanup to keep provider in NOT_READY state
blockChan := make(chan struct{})
t.Cleanup(func() {
close(blockChan)
})

notReadyEventingProvider := struct {
FeatureProvider
StateHandler
Expand All @@ -1317,7 +1325,7 @@ func TestRequirement_1_7_6(t *testing.T) {
NoopProvider{},
&stateHandlerForTests{
initF: func(e EvaluationContext) error {
<-time.After(math.MaxInt)
<-blockChan
return nil
},
},
Expand Down
53 changes: 50 additions & 3 deletions openfeature/event_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
scopedRegistry map[string]scopedCallback
eventChan chan eventPayload
once sync.Once
shutdownOnce sync.Once
mu sync.Mutex
done chan struct{}
wg sync.WaitGroup
}

func newEventExecutor() *eventExecutor {
Expand All @@ -37,6 +40,7 @@
apiRegistry: map[EventType][]EventCallback{},
scopedRegistry: map[string]scopedCallback{},
eventChan: make(chan eventPayload, 5),
done: make(chan struct{}),
}

executor.startEventListener()
Expand Down Expand Up @@ -223,17 +227,24 @@
e.activeSubscriptions = append(e.activeSubscriptions, newProvider)

if v, ok := newProvider.featureProvider.(EventHandler); ok {
e.wg.Add(1)
go func() {
defer e.wg.Done()
// event handling of the new feature provider
for {
select {
case event, ok := <-v.EventChannel():
if !ok {
return
}
e.eventChan <- eventPayload{
// Try to send the event, but also watch for shutdown signal
select {
case e.eventChan <- eventPayload{
event: event,
handler: newProvider.featureProvider,
}:
case <-newProvider.shutdownSemaphore:
return
}
case <-newProvider.shutdownSemaphore:
return
Expand Down Expand Up @@ -274,9 +285,19 @@
// startEventListener trigger the event listening of this executor
func (e *eventExecutor) startEventListener() {
e.once.Do(func() {
e.wg.Add(1)
go func() {
for payload := range e.eventChan {
e.triggerEvent(payload.event, payload.handler)
defer e.wg.Done()
for {
select {
case payload, ok := <-e.eventChan:
if !ok {
return
}
e.triggerEvent(payload.event, payload.handler)
case <-e.done:
return
}
}
}()
})
Expand Down Expand Up @@ -353,3 +374,29 @@
return provider.equals(defaultProvider) ||
slices.ContainsFunc(namedProviders, provider.equals)
}

// shutdown stops the event executor's goroutine and waits for it to complete.
// This should be called when the event executor is no longer needed to prevent goroutine leaks.
func (e *eventExecutor) shutdown() {
e.shutdownOnce.Do(func() {
e.mu.Lock()
// Close shutdown semaphores to signal all active provider goroutines to stop
// Closing (rather than sending) ensures all waiting goroutines receive the signal
for _, sub := range e.activeSubscriptions {
close(sub.shutdownSemaphore)
}
e.mu.Unlock()

// Close the done channel to stop the main event listener
close(e.done)

// Wait for all goroutines to finish
e.wg.Wait()

// Drain any remaining events in the channel
for len(e.eventChan) > 0 {
<-e.eventChan
}
Comment on lines +397 to +399
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While the current approach to draining the event channel is likely safe because all producer goroutines should have been stopped and waited for via e.wg.Wait(), using len(e.eventChan) in a loop can be racy if there are concurrent writers. A more robust and idiomatic way to drain a channel non-blockingly is to use a select with a default case. This ensures that the drain operation is safe even if future code changes inadvertently introduce a race condition.

for {
			select {
			case <-e.eventChan:
			default:
				return
			}
		}

})
}

Check failure on line 402 in openfeature/event_executor.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gci)
132 changes: 120 additions & 12 deletions openfeature/event_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"
)

func init() {
Expand Down Expand Up @@ -1142,31 +1143,49 @@ func TestEventHandler_multiSubs(t *testing.T) {

// make sure events are received and count them
globalEvents := make(chan string, 10)
done := make(chan struct{})
t.Cleanup(func() {
close(done)
})

go func() {
for rsp := range rspGlobal {
globalEvents <- rsp.ProviderName
for {
select {
case rsp := <-rspGlobal:
globalEvents <- rsp.ProviderName
case <-done:
return
}
}
}()

clientAEvents := make(chan string, 10)
go func() {
for rsp := range rspClientA {
if rsp.ProviderName != "providerOther" {
t.Errorf("incorrect event provider association, expected %s, got %s", "providerOther", rsp.ProviderName)
for {
select {
case rsp := <-rspClientA:
if rsp.ProviderName != "providerOther" {
t.Errorf("incorrect event provider association, expected %s, got %s", "providerOther", rsp.ProviderName)
}
clientAEvents <- rsp.ProviderName
case <-done:
return
}

clientAEvents <- rsp.ProviderName
}
}()

clientBEvents := make(chan string, 10)
go func() {
for rsp := range rspClientB {
if rsp.ProviderName != "providerOther" {
t.Errorf("incorrect event provider association, expected %s, got %s", "providerOther", rsp.ProviderName)
for {
select {
case rsp := <-rspClientB:
if rsp.ProviderName != "providerOther" {
t.Errorf("incorrect event provider association, expected %s, got %s", "providerOther", rsp.ProviderName)
}
clientBEvents <- rsp.ProviderName
case <-done:
return
}

clientBEvents <- rsp.ProviderName
}
}()

Expand Down Expand Up @@ -1568,6 +1587,7 @@ func TestEventHandler_ChannelClosure(t *testing.T) {
callBack := func(e EventDetails) {
eventCount.Add(1)
}

executor.AddHandler(ProviderReady, &callBack)
// watch for empty events
executor.AddHandler("", &callBack)
Expand All @@ -1585,3 +1605,91 @@ func TestEventHandler_ChannelClosure(t *testing.T) {
afterCount := eventCount.Load()
require.Equal(t, initialCount, afterCount, "goroutine processed events after channel closed - indicates channel closure not detected")
}

// TestBasicShutdown verifies that the shutdown method stops goroutines
func TestBasicShutdown(t *testing.T) {
// Create a new event executor
exec := newEventExecutor()

// Give it a moment to start
time.Sleep(10 * time.Millisecond)

// Shutdown should complete without hanging
done := make(chan struct{})
go func() {
exec.shutdown()
close(done)
}()

select {
case <-done:
// Success - shutdown completed
case <-time.After(2 * time.Second):
t.Fatal("shutdown() hung and did not complete within 2 seconds")
}
}

// TestNoGoroutineLeakWithMultipleProviders verifies that goroutine cleanup
// works correctly even when multiple providers are registered.
func TestNoGoroutineLeakWithMultipleProviders(t *testing.T) {
// Setup: shut down any existing goroutines from previous tests first
if eventing != nil {
eventing.(*eventExecutor).shutdown()
}
initSingleton()

defer goleak.VerifyNone(t,
// Ignore the new event executor's goroutines created by Shutdown() -> initSingleton()
goleak.IgnoreTopFunction("github.com/open-feature/go-sdk/openfeature.(*eventExecutor).startEventListener.func1.1"),
goleak.IgnoreTopFunction("github.com/open-feature/go-sdk/openfeature.(*eventExecutor).startListeningAndShutdownOld.func1"),
)

// Ensure we clean up the event executor at the end, including any reinitialized instance
defer func() {
if eventing != nil {
eventing.(*eventExecutor).shutdown()
}
}()

// Set default provider
defaultProvider := &ProviderEventing{c: make(chan Event, 1)}
err := SetProvider(struct {
FeatureProvider
EventHandler
}{NoopProvider{}, defaultProvider})
if err != nil {
t.Fatal(err)
}

// Set named provider
namedProvider := &ProviderEventing{c: make(chan Event, 1)}
err = SetNamedProvider("test-domain", struct {
FeatureProvider
EventHandler
}{NoopProvider{}, namedProvider})
if err != nil {
t.Fatal(err)
}

// Trigger events
defaultProvider.Invoke(Event{
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{
Message: "Default ready",
},
})

namedProvider.Invoke(Event{
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{
Message: "Named ready",
},
})

time.Sleep(100 * time.Millisecond)

// Shutdown cleans up all goroutines and reinitializes
Shutdown()

// goleak will verify no goroutines are leaked (except the new event executor)
}
19 changes: 19 additions & 0 deletions openfeature/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package openfeature

import (
"os"
"testing"
)

// TestMain provides setup and teardown for the entire test suite
func TestMain(m *testing.M) {
// Run tests
code := m.Run()

// Final cleanup: shut down the global event executor
if eventing != nil {
eventing.(*eventExecutor).shutdown()
}

os.Exit(code)
}
7 changes: 7 additions & 0 deletions openfeature/openfeature.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ func init() {
}

func initSingleton() {
// Shutdown existing event executor to prevent goroutine leaks
if eventing != nil {
eventing.(*eventExecutor).shutdown()
}

exec := newEventExecutor()
eventing = exec

Expand Down Expand Up @@ -149,6 +154,7 @@ func RemoveHandler(eventType EventType, callback EventCallback) {
// hooks, event handlers, and providers.
func Shutdown() {
api.Shutdown()
eventing.(*eventExecutor).shutdown()
initSingleton()
}

Expand All @@ -159,6 +165,7 @@ func Shutdown() {
// Returns an error if any provider shutdown fails or if context is cancelled during shutdown.
func ShutdownWithContext(ctx context.Context) error {
err := api.ShutdownWithContext(ctx)
eventing.(*eventExecutor).shutdown()
initSingleton()
return err
}
Loading
Loading