diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index a153395..bf7123c 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -12,8 +12,8 @@ jobs: with: go-version-file: go.mod - name: golangci-lint - uses: golangci/golangci-lint-action@v3 + uses: golangci/golangci-lint-action@v7 with: - version: v1.55.2 + version: v2.0.2 - name: go mod tidy check uses: katexochen/go-tidy-check@v2 \ No newline at end of file diff --git a/README.md b/README.md index 99988e4..d8658e7 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,14 @@ This Source Code Form is subject to the terms of the Apache Public License, vers - Resize queue length - Dequeue work in queue - View work in queue and it's current state, priority and position in queue. +- Publication package provides a generic publish-subscribe (pub/sub) mechanism for Go applications. It allows you to create publications to which multiple subscribers can listen. When a message is published, it's distributed to all relevant subscribers. + - **Generic:** Supports publishing and subscribing to messages of any type. + - **Filtering:** Subscribers can define filters to receive only messages that meet specific criteria. + - **Buffered Channels:** Subscribers receive messages through buffered channels, allowing for asynchronous message handling. + - **Concurrency-Safe:** Designed for concurrent use, ensuring safe message delivery in multi-threaded environments. + - **Timeout Support:** Control the maximum time spent attempting to deliver a message to a subscriber. + - **Clean Shutdown:** Gracefully close publications and subscriber channels. + - **Unsubscribe:** Subscribers can unsubscribe from a publication. - ValidationError - Facilitates error reflecting validation issues to a user. - Supports warnings and errors @@ -39,6 +47,8 @@ This Source Code Form is subject to the terms of the Apache Public License, vers - A starting point for setting up and managing HTTP and/or gRPC services. - Serve both HTTP and gRPC services from single server. - Ability to configuration via configuration builder with chaining. +- Generic package contains standard library types that are not yet type safe using generics and wraps them in types that are type safe using generics. + - SyncMap wraps a sync.Map with a generic. - Additional tools to come! ## Contribution diff --git a/generic/syncmap.go b/generic/syncmap.go new file mode 100644 index 0000000..0bccf4d --- /dev/null +++ b/generic/syncmap.go @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2025 by Randy Bell. All rights reserved. + * + * This Source Code Form is subject to the terms of the Apache Public License, version 2.0. If a copy of the APL was not distributed with this file, you can obtain one at https://www.apache.org/licenses/LICENSE-2.0.txt. + */ + +package generic + +import ( + "iter" + "sync" +) + +// SyncMap is a generic wrapper around sync.Map that provides type safety +type SyncMap[K comparable, V any] struct { + *sync.Map +} + +func NewSyncMap[K comparable, V any]() *SyncMap[K, V] { + return &SyncMap[K, V]{ + Map: &sync.Map{}, + } +} + +// Load returns the value stored in the map for a key, or the zero value if no +// value is present. The ok result indicates whether value was found in the map. +func (m *SyncMap[K, V]) Load(key K) (value V, ok bool) { + v, ok := m.Map.Load(key) + if !ok { + return + } + + return v.(V), ok +} + +// Store sets the value for a key. +func (m *SyncMap[K, V]) Store(key K, value V) { + m.Map.Store(key, value) +} + +// Swap swaps the value for a key and returns the previous value if any. The loaded result reports whether the key was present. +func (m *SyncMap[K, V]) Swap(key K, value V) (previous V, loaded bool) { + v, l := m.Map.Swap(key, value) + if !l { + return + } + return v.(V), l +} + +// Delete deletes the value for a key. +func (m *SyncMap[K, V]) Delete(key K) { + m.Map.Delete(key) +} + +// LoadOrStore returns the existing value for the key if present. +// Otherwise, it stores and returns the given value. The loaded result is true if the value was loaded, false if stored. +func (m *SyncMap[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + v, l := m.Map.LoadOrStore(key, value) + return v.(V), l +} + +// LoadAndDelete deletes the value for a key, returning the previous value if any. +// The loaded result reports whether the key was present. +func (m *SyncMap[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + v, l := m.Map.LoadAndDelete(key) + if !l { + return + } + return v.(V), l +} + +// CompareAndDelete deletes the entry for key if its value is equal to old. +// The old value must be of a comparable type. +func (m *SyncMap[K, V]) CompareAndDelete(key K, old V) bool { + return m.Map.CompareAndDelete(key, old) +} + +// CompareAndSwap swaps the old and new values for key if the value stored in the map is equal to old. +func (m *SyncMap[K, V]) CompareAndSwap(key K, old V, new V) bool { + return m.Map.CompareAndSwap(key, old, new) +} + +// Range calls f sequentially for each key and value present in the map. If f returns false, range stops the iteration. +func (m *SyncMap[K, V]) Range(f func(key K, value V) bool) { + m.Map.Range(func(k any, v any) bool { + return f(k.(K), v.(V)) + }) +} + +// Iterate returns an iterator that can be used to iterate over the map. +func (m *SyncMap[K, V]) Iterate() iter.Seq2[K, V] { + return func(yield func(K, V) bool) { + for anyKey, anyValue := range m.Map.Range { + if !yield( + anyKey.(K), + anyValue.(V), + ) { + break + } + } + } +} diff --git a/generic/syncmap_test.go b/generic/syncmap_test.go new file mode 100644 index 0000000..b4c1ffc --- /dev/null +++ b/generic/syncmap_test.go @@ -0,0 +1,179 @@ +/* + * Copyright (c) 2025 by Randy Bell. All rights reserved. + * + * This Source Code Form is subject to the terms of the Apache Public License, version 2.0. If a copy of the APL was not distributed with this file, you can obtain one at https://www.apache.org/licenses/LICENSE-2.0.txt. + */ + +package generic + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSyncMap_Load(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + value, ok := m.Load("key1") + assert.True(t, ok) + assert.Equal(t, 1, value) + + value, ok = m.Load("key2") + assert.False(t, ok) + assert.Equal(t, 0, value) +} + +func TestSyncMap_Store(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + value, ok := m.Load("key1") + assert.True(t, ok) + assert.Equal(t, 1, value) + + m.Store("key1", 2) + value, ok = m.Load("key1") + assert.True(t, ok) + assert.Equal(t, 2, value) +} + +func TestSyncMap_Swap(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + previous, loaded := m.Swap("key1", 2) + assert.True(t, loaded) + assert.Equal(t, 1, previous) + + value, ok := m.Load("key1") + assert.True(t, ok) + assert.Equal(t, 2, value) + + previous, loaded = m.Swap("key2", 3) + assert.False(t, loaded) + assert.Equal(t, 0, previous) + + value, ok = m.Load("key2") + assert.True(t, ok) + assert.Equal(t, 3, value) +} + +func TestSyncMap_Delete(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + m.Delete("key1") + _, ok := m.Load("key1") + assert.False(t, ok) +} + +func TestSyncMap_LoadOrStore(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + actual, loaded := m.LoadOrStore("key1", 1) + assert.False(t, loaded) + assert.Equal(t, 1, actual) + + actual, loaded = m.LoadOrStore("key1", 2) + assert.True(t, loaded) + assert.Equal(t, 1, actual) +} + +func TestSyncMap_LoadAndDelete(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + value, loaded := m.LoadAndDelete("key1") + assert.True(t, loaded) + assert.Equal(t, 1, value) + + _, ok := m.Load("key1") + assert.False(t, ok) + + value, loaded = m.LoadAndDelete("key2") + assert.False(t, loaded) + assert.Equal(t, 0, value) +} + +func TestSyncMap_CompareAndDelete(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + deleted := m.CompareAndDelete("key1", 1) + assert.True(t, deleted) + _, ok := m.Load("key1") + assert.False(t, ok) + + m.Store("key2", 2) + deleted = m.CompareAndDelete("key2", 3) + assert.False(t, deleted) + value, ok := m.Load("key2") + assert.True(t, ok) + assert.Equal(t, 2, value) +} + +func TestSyncMap_CompareAndSwap(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + swapped := m.CompareAndSwap("key1", 1, 2) + assert.True(t, swapped) + value, ok := m.Load("key1") + assert.True(t, ok) + assert.Equal(t, 2, value) + + swapped = m.CompareAndSwap("key1", 2, 2) + assert.True(t, swapped) + value, ok = m.Load("key1") + assert.True(t, ok) + assert.Equal(t, 2, value) +} + +func TestSyncMap_Range(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + m.Store("key2", 2) + m.Store("key3", 3) + + count := 0 + m.Range(func(key string, value int) bool { + count++ + assert.Contains(t, []string{"key1", "key2", "key3"}, key) + assert.Contains(t, []int{1, 2, 3}, value) + return true + }) + assert.Equal(t, 3, count) + + count = 0 + m.Range(func(key string, value int) bool { + count++ + return key != "key2" + }) + assert.Equal(t, 2, count) +} + +func TestSyncMap_Iterate(t *testing.T) { + m := SyncMap[string, int]{Map: &sync.Map{}} + m.Store("key1", 1) + m.Store("key2", 2) + m.Store("key3", 3) + + count := 0 + it := m.Iterate() + it(func(key string, value int) bool { + count++ + assert.Contains(t, []string{"key1", "key2", "key3"}, key) + assert.Contains(t, []int{1, 2, 3}, value) + return true + }) + assert.Equal(t, 3, count) + + brokeEarly := false + fmt.Println("test") + it = m.Iterate() + it(func(key string, value int) bool { + count++ + if key == "key2" { + brokeEarly = true + return false + } + return true + }) + assert.True(t, brokeEarly) +} diff --git a/go.mod b/go.mod index db3a7a2..c2d1851 100644 --- a/go.mod +++ b/go.mod @@ -1,27 +1,27 @@ module github.com/rbell/toolchest -go 1.22.1 - -toolchain go1.22.2 +go 1.23.7 require ( - github.com/google/btree v1.1.2 + github.com/google/btree v1.1.3 github.com/google/uuid v1.6.0 - github.com/richardwilkes/toolbox v1.112.0 - github.com/stretchr/testify v1.9.0 - google.golang.org/grpc v1.63.2 - google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002 + github.com/richardwilkes/toolbox v1.122.1 + github.com/stretchr/testify v1.10.0 + google.golang.org/grpc v1.71.0 + google.golang.org/protobuf v1.36.5 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/stretchr/objx v0.5.2 // indirect - golang.org/x/net v0.24.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 57faab6..f07cbee 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,16 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= -github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -19,26 +23,39 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/richardwilkes/toolbox v1.112.0 h1:MOcaE67wJrUn3bIar5a3kiPe7i6HLB6xWq0owgofMws= -github.com/richardwilkes/toolbox v1.112.0/go.mod h1:NkBik7tpAvCuxCipWW19yyURNcrBdddqGRCS+FkFoO0= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/richardwilkes/toolbox v1.122.1 h1:s87vBsQIoOkYjAm8CML7kKLq8q7xosH/Xcg7eQ3nSCU= +github.com/richardwilkes/toolbox v1.122.1/go.mod h1:+1cPcLlDiIkfQkwmAVro69v7liuquC919u9iG1CU9Ls= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= -google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= -google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= -google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002 h1:V7Da7qt0MkY3noVANIMVBk28nOnijADeOR3i5Hcvpj4= -google.golang.org/protobuf v1.33.1-0.20240408130810-98873a205002/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 h1:iK2jbkWL86DXjEx0qiHcRE9dE4/Ahua5k6V8OWFb//c= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/publisher/README.md b/publisher/README.md new file mode 100644 index 0000000..7816f01 --- /dev/null +++ b/publisher/README.md @@ -0,0 +1,122 @@ +# Publication Package + +The `publication` package provides a generic publish-subscribe (pub/sub) mechanism for Go applications. It allows you to create publications to which multiple subscribers can listen. When a message is published, it's distributed to all relevant subscribers. + +## Key Features + +* **Generic:** Supports publishing and subscribing to messages of any type. +* **Filtering:** Subscribers can define filters to receive only messages that meet specific criteria. +* **Buffered Channels:** Subscribers receive messages through buffered channels, allowing for asynchronous message handling. +* **Concurrency-Safe:** Designed for concurrent use, ensuring safe message delivery in multi-threaded environments. +* **Timeout Support:** Control the maximum time spent attempting to deliver a message to a subscriber. +* **Clean Shutdown:** Gracefully close publications and subscriber channels. +* **Unsubscribe:** Subscribers can unsubscribe from a publication. + +## Core Components + +### `Publication[T]` + +The `Publication` struct is the central component of the pub/sub system. It manages the list of subscribers and handles message distribution. + +* **`NewPublicationT *Publication[T]`:** Creates a new publication for messages of type `T`. +* **`Subscribe(buffer int, filter func(T) bool) *Subscriber[T]`:** Adds a new subscriber to the publication. + * `buffer`: The size of the buffer for the subscriber's receive channel. + * `filter`: An optional function that determines whether a subscriber should receive a message. If `nil`, all messages are received. +* **`Publish(message T, timeout *time.Duration)`:** Publishes a message to all subscribers. + * `message`: The message to publish. + * `timeout`: An optional timeout for sending the message to each subscriber. If `nil`, a default timeout of 10 seconds is used. +* **`Close()`:** Closes the publication and all subscriber channels. + +### `Subscriber[T]` + +The `Subscriber` struct represents a single subscriber to a publication. + +* **`Close()`:** Closes the subscriber's receive channel and unsubscribes them from the publication. +* **`Receive() <-chan T`:** Returns the subscriber's receive channel, from which messages can be read. + +## Usage Examples + +### Basic Publication and Subscription +```go +package main + +import ( + "fmt" + "time" + "github.com/rbell/toolchest/publisher" // Replace with your actual import path +) + +func main() { + // Create a new publication for integers. + pub := publisher.NewPublication[int]() + + // Subscribe to the publication with a buffer of 10. + sub := pub.Subscribe(10) + + // Publish some messages. + pub.Publish(1) + pub.Publish(2) + pub.Publish(3) + + // Receive messages from the subscriber. + for i := 0; i < 3; i++ { + select { + case msg := <-sub.Receive(): + fmt.Println("Received:", msg) + case <-time.After(time.Second): + fmt.Println("Timeout waiting for message") + return + } + } + // Close the publication. + pub.Close() +} +``` + +### Filtering Messages +```go +package main +import ( + "fmt" + "time" + "github.com/rbell/toolchest/publisher" // Replace with your actual import path +) + +func main() { + // Create a new publication for integers. + pub := publisher.NewPublication[int]() + + // Subscribe to even numbers only. + evenSub := pub.Subscribe(10, publisher.WithFilter(func(i int) bool { return i%2 == 0 })) + + // Subscribe to odd numbers only. + oddSub := pub.Subscribe(10, publisher.WithFilter(func(i int) bool { return i%2 != 0 })) + + pub.Publish(1) + pub.Publish(2) + pub.Publish(3) + pub.Publish(4) + + // Receive even messages. + for i := 0; i < 2; i++ { + select { + case msg := <-evenSub.Receive(): + fmt.Println("Even Received:", msg) + case <-time.After(time.Second): + fmt.Println("Timeout waiting for even message") + return + } + } + // Receive odd messages. + for i := 0; i < 2; i++ { + select { + case msg := <-oddSub.Receive(): + fmt.Println("Odd Received:", msg) + case <-time.After(time.Second): + fmt.Println("Timeout waiting for odd message") + return + } + } + pub.Close() +} +``` \ No newline at end of file diff --git a/publisher/publication.go b/publisher/publication.go new file mode 100644 index 0000000..1816046 --- /dev/null +++ b/publisher/publication.go @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2025 by Randy Bell. All rights reserved. + * + * This Source Code Form is subject to the terms of the Apache Public License, version 2.0. If a copy of the APL was not distributed with this file, you can obtain one at https://www.apache.org/licenses/LICENSE-2.0.txt. + */ + +package publisher + +import ( + "sync/atomic" + "time" + + "github.com/rbell/toolchest/generic" +) + +// defaultTimeout is the default timeout used when publishing a message. +var defaultTimeout = 10 * time.Second + +// Publication is a generic struct that manages the publication of messages to subscribers. +type Publication[T any] struct { + subscribers *generic.SyncMap[uint64, *Subscriber[T]] + subscriberCount atomic.Uint64 +} + +// Subscriber is a struct that represents a subscriber to a publication. +type Subscriber[T any] struct { + subscriberID uint64 + filter func(T) bool + onFiltered func(T) + receiveCh chan T + publisher *Publication[T] + timeout time.Duration + onTimeout func(T) +} + +type SubscriberOption[T any] func(sub *Subscriber[T]) + +// NewPublication creates a new Publication. +func NewPublication[T any]() *Publication[T] { + return &Publication[T]{ + subscribers: generic.NewSyncMap[uint64, *Subscriber[T]](), + subscriberCount: atomic.Uint64{}, + } +} + +// Subscribe creates a new subscriber to the publication. +// +// buffer: the size of the buffer for the subscriber's receive channel. +// options: optional options for the subscriber. +// +// Returns a reference to the new subscriber. +func (p *Publication[T]) Subscribe(buffer int, opts ...SubscriberOption[T]) *Subscriber[T] { + sub := &Subscriber[T]{ + subscriberID: p.subscriberCount.Add(1), + receiveCh: make(chan T, buffer), + publisher: p, + timeout: defaultTimeout, + } + + for _, opt := range opts { + opt(sub) + } + + p.subscribers.Store(sub.subscriberID, sub) + return sub +} + +// Publish publishes a message to all subscribers. +// +// message: the message to publish. +// timeout: the timeout for sending the message to each subscriber. If nil, the default timeout is used. +func (p *Publication[T]) Publish(message T) { + for _, sub := range p.subscribers.Iterate() { + if sub.filter == nil || sub.filter(message) { + go func() { + select { + case sub.receiveCh <- message: + // continue + case <-time.After(sub.timeout): + // continue + } + }() + } + } +} + +// Close closes the publication and all subscriber channels. +func (p *Publication[T]) Close() { + for _, listener := range p.subscribers.Iterate() { + close(listener.receiveCh) + } + p.subscribers.Clear() +} + +// unsubscribe removes a subscriber from the publication. +func (p *Publication[T]) unsubscribe(subscriberID uint64) { + if s, ok := p.subscribers.Load(subscriberID); ok { + close(s.receiveCh) + p.subscribers.Delete(subscriberID) + } +} + +// Close closes the subscriber's receive channel and unsubscribes them from the publication. +func (s *Subscriber[T]) Close() { + s.publisher.unsubscribe(s.subscriberID) +} + +// Receive returns the subscriber's receive channel. +func (s *Subscriber[T]) Receive() <-chan T { + return s.receiveCh +} + +//region Subscriber Options + +func WithFilter[T any](filter func(T) bool) SubscriberOption[T] { + return func(sub *Subscriber[T]) { + sub.filter = filter + } +} + +func WithTimeout[T any](timeout time.Duration) SubscriberOption[T] { + return func(sub *Subscriber[T]) { + sub.timeout = timeout + } +} + +func OnTimeout[T any](onTimeout func(T)) SubscriberOption[T] { + return func(sub *Subscriber[T]) { + sub.onTimeout = onTimeout + } +} + +func OnFiltered[T any](onFiltered func(T)) SubscriberOption[T] { + return func(sub *Subscriber[T]) { + sub.onFiltered = onFiltered + } +} + +//endregion diff --git a/publisher/publication_test.go b/publisher/publication_test.go new file mode 100644 index 0000000..e80fd81 --- /dev/null +++ b/publisher/publication_test.go @@ -0,0 +1,322 @@ +/* + * Copyright (c) 2025 by Randy Bell. All rights reserved. + * + * This Source Code Form is subject to the terms of the Apache Public License, version 2.0. If a copy of the APL was not distributed with this file, you can obtain one at https://www.apache.org/licenses/LICENSE-2.0.txt. + */ + +package publisher + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewPublication(t *testing.T) { + p := NewPublication[int]() + assert.NotNil(t, p) + assert.NotNil(t, p.subscribers) + assert.Equal(t, uint64(0), p.subscriberCount.Load()) +} + +func TestPublication_Subscribe(t *testing.T) { + p := NewPublication[int]() + sub := p.Subscribe(10) + assert.NotNil(t, sub) + assert.Equal(t, uint64(1), sub.subscriberID) + assert.Nil(t, sub.filter) + assert.NotNil(t, sub.receiveCh) + assert.Equal(t, p, sub.publisher) + + sub2 := p.Subscribe(5, WithFilter(func(i int) bool { return i > 5 })) + assert.NotNil(t, sub2) + assert.Equal(t, uint64(2), sub2.subscriberID) + assert.NotNil(t, sub2.filter) + assert.NotNil(t, sub2.receiveCh) + assert.Equal(t, p, sub2.publisher) +} + +func TestPublication_Publish(t *testing.T) { + p := NewPublication[int]() + sub := p.Subscribe(10) + sub2 := p.Subscribe(5, WithFilter(func(i int) bool { return i > 5 })) + + p.Publish(10) + + assert.Eventually(t, func() bool { + select { + case msg := <-sub.receiveCh: + assert.Equal(t, 10, msg) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + + assert.Eventually(t, func() bool { + select { + case msg := <-sub2.receiveCh: + assert.Equal(t, 10, msg) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + +} + +func TestPublication_Close(t *testing.T) { + p := NewPublication[int]() + sub := p.Subscribe(10) + sub2 := p.Subscribe(5, WithFilter(func(i int) bool { return i > 5 })) + + p.Close() + + _, ok := <-sub.receiveCh + assert.False(t, ok) + _, ok = <-sub2.receiveCh + assert.False(t, ok) + + // verify subscribers are removed + var count int + p.subscribers.Range(func(key uint64, value *Subscriber[int]) bool { + count++ + return true + }) + assert.Equal(t, 0, count) +} + +func TestPublication_unsubscribe(t *testing.T) { + p := NewPublication[int]() + sub := p.Subscribe(10) + sub2 := p.Subscribe(5, WithFilter(func(i int) bool { return i > 5 })) + + p.unsubscribe(sub.subscriberID) + + // verify subscriber is removed + var count int + p.subscribers.Range(func(key uint64, value *Subscriber[int]) bool { + count++ + return true + }) + assert.Equal(t, 1, count) + + // verify channel is closed + _, ok := <-sub.receiveCh + assert.False(t, ok) + + // verify other channel is still open + p.Publish(10) + assert.Eventually(t, func() bool { + select { + case msg := <-sub2.receiveCh: + assert.Equal(t, 10, msg) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) +} + +func TestSubscriber_Close(t *testing.T) { + p := NewPublication[int]() + sub := p.Subscribe(1) + sub2 := p.Subscribe(5, WithFilter(func(i int) bool { return i > 5 })) + + sub.Close() + + // verify subscriber is removed + var count int + p.subscribers.Range(func(key uint64, value *Subscriber[int]) bool { + count++ + return true + }) + assert.Equal(t, 1, count) + + // verify channel is closed + _, ok := <-sub.receiveCh + assert.False(t, ok) + + // verify other channel is still open + p.Publish(10) + assert.Eventually(t, func() bool { + select { + case msg := <-sub2.receiveCh: + assert.Equal(t, 10, msg) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) +} + +func TestPublication_Publish_Concurrent(t *testing.T) { + p := NewPublication[int]() + numSubscribers := 100 + numMessages := 1000 + var wg sync.WaitGroup + wg.Add(numSubscribers) + + for i := 0; i < numSubscribers; i++ { + sub := p.Subscribe(10) + go func() { + defer wg.Done() + for j := 0; j < numMessages; j++ { + select { + case <-sub.Receive(): + // Received message + case <-time.After(time.Second): + assert.Fail(t, "timeout waiting for message") + return + } + } + }() + } + + for i := 0; i < numMessages; i++ { + p.Publish(i) + } + + wg.Wait() + p.Close() +} + +func TestPublication_Subscribe_Concurrent(t *testing.T) { + p := NewPublication[int]() + numSubscribers := 100 + var wg sync.WaitGroup + wg.Add(numSubscribers) + + for i := 0; i < numSubscribers; i++ { + go func() { + defer wg.Done() + p.Subscribe(10) + }() + } + + wg.Wait() + assert.Equal(t, uint64(numSubscribers), p.subscriberCount.Load()) + p.Close() +} + +func TestPublication_Publish_WithFilter(t *testing.T) { + p := NewPublication[int]() + sub1 := p.Subscribe(10, WithFilter(func(i int) bool { return i%2 == 0 })) + sub2 := p.Subscribe(10, WithFilter(func(i int) bool { return i%2 != 0 })) + + p.Publish(2) + p.Publish(3) + + assert.Eventually(t, func() bool { + select { + case msg := <-sub1.Receive(): + assert.Equal(t, 2, msg) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + + select { + case <-sub1.Receive(): + assert.Fail(t, "should not have received message") + default: + // continue + } + + assert.Eventually(t, func() bool { + select { + case msg := <-sub2.Receive(): + assert.Equal(t, 3, msg) + return true + default: + return false + } + }, time.Second, 10*time.Millisecond) + + select { + case <-sub2.Receive(): + assert.Fail(t, "should not have received message") + default: + // continue + } + p.Close() +} + +func TestPublication_Publish_WithFilter_Concurrent(t *testing.T) { + p := NewPublication[int]() + numSubscribers := 100 + numMessages := 1000 + var wg sync.WaitGroup + wg.Add(numSubscribers) + + for i := 0; i < numSubscribers; i++ { + sub := p.Subscribe(10, WithFilter(func(i int) bool { return i%2 == 0 })) + go func() { + defer wg.Done() + for j := 0; j < numMessages; j++ { + select { + case msg := <-sub.Receive(): + assert.Equal(t, 0, msg%2) + case <-time.After(time.Second): + assert.Fail(t, "timeout waiting for message") + return + } + } + }() + } + + for i := 0; i < numMessages; i++ { + p.Publish(i * 2) + } + + wg.Wait() + p.Close() +} + +func TestPublication_Publish_WithFilter_NoMatch(t *testing.T) { + p := NewPublication[int]() + sub := p.Subscribe(10, WithFilter(func(i int) bool { return i > 100 })) + + p.Publish(1) + + select { + case <-sub.Receive(): + assert.Fail(t, "should not have received message") + default: + // continue + } + p.Close() +} + +func TestPublication_Publish_WithFilter_NoMatch_Concurrent(t *testing.T) { + p := NewPublication[int]() + numSubscribers := 100 + numMessages := 1000 + var wg sync.WaitGroup + wg.Add(numSubscribers) + + for i := 0; i < numSubscribers; i++ { + sub := p.Subscribe(10, WithFilter(func(i int) bool { return i < 0 })) + go func() { + defer wg.Done() + for j := 0; j < numMessages; j++ { + select { + case <-sub.Receive(): + assert.Fail(t, "should not have received message") + case <-time.After(time.Millisecond * 10): + // continue + } + } + }() + } + + for i := 0; i < numMessages; i++ { + p.Publish(i) + } + + wg.Wait() + p.Close() +} diff --git a/server/httpMiddleware/logResponse.go b/server/httpMiddleware/logResponse.go index a443f7a..5195786 100644 --- a/server/httpMiddleware/logResponse.go +++ b/server/httpMiddleware/logResponse.go @@ -54,7 +54,7 @@ type ResponseWriterWrapper struct { // NewResponseWriterWrapper static function creates a wrapper for the http.ResponseWriter func newResponseWriterWrapper(w http.ResponseWriter) ResponseWriterWrapper { var buf bytes.Buffer - var statusCode int = 200 + var statusCode = 200 return ResponseWriterWrapper{ w: &w, body: &buf, diff --git a/sliceOps/sliceOps.go b/sliceOps/sliceOps.go index 8e40362..7b1e764 100644 --- a/sliceOps/sliceOps.go +++ b/sliceOps/sliceOps.go @@ -100,7 +100,7 @@ func Intersection[T comparable](slices ...[]T) []T { result := []T{} for _, s := range slices { for _, e := range s { - //nolint:gosimple // This is more readable than the suggested alternative. + //nolint:gosimple,staticcheck // This is more readable than the suggested alternative if _, exists := intersectionMap[e]; exists { intersectionMap[e]++ } else { diff --git a/stacktrace/stackTrace.go b/stacktrace/stackTrace.go index c2006f5..6ab5cd8 100644 --- a/stacktrace/stackTrace.go +++ b/stacktrace/stackTrace.go @@ -176,6 +176,7 @@ func (s *stack) Format(st fmt.State, verb rune) { case st.Flag('+'): for _, pc := range *s { f := frame(pc) + //nolint:errcheck // skip error fmt.Fprintf(st, "\n%+v", f) } } diff --git a/storage/orderedBTree.go b/storage/orderedBTree.go index d230311..f5f429b 100644 --- a/storage/orderedBTree.go +++ b/storage/orderedBTree.go @@ -2,8 +2,9 @@ package storage import ( "cmp" - "github.com/google/btree" "sync" + + "github.com/google/btree" ) type orderedItem[K cmp.Ordered, V any] struct { @@ -34,7 +35,7 @@ func (t *OrderedBTree[K, V]) Set(key K, value *V) { t.writeMux.Lock() defer t.writeMux.Unlock() orderedKey := orderedItem[K, V]{key, value} - t.BTree.ReplaceOrInsert(orderedKey) + t.ReplaceOrInsert(orderedKey) } // Get returns the value of type V for the key of type K. If the key is not found, ok is returned as false. diff --git a/workqueue/examples/dequeueExample/main.go b/workqueue/examples/dequeueExample/main.go index 58ab1be..dedebec 100644 --- a/workqueue/examples/dequeueExample/main.go +++ b/workqueue/examples/dequeueExample/main.go @@ -58,6 +58,7 @@ func main() { fmt.Println("After Queing Temprary Work") printItems(q) + //nolint:errcheck // skip error q.Dequeue(id) fmt.Println("After Dequeue Temprary Work") diff --git a/workqueue/examples/errorMonitoring/main.go b/workqueue/examples/errorMonitoring/main.go index 06aff98..f546432 100644 --- a/workqueue/examples/errorMonitoring/main.go +++ b/workqueue/examples/errorMonitoring/main.go @@ -45,7 +45,7 @@ func main() { time.Sleep(time.Millisecond * 100) // emulate doing some processing // emulate error thrown every 20th index if index%20 == 0 { - return fmt.Errorf("Error on index %v", index) + return fmt.Errorf("error on index %v", index) } else { fmt.Printf("Doing some work! %v\n", index) // emulate logging } diff --git a/workqueue/workHeap_test.go b/workqueue/workHeap_test.go index 9a06608..30eb3e7 100644 --- a/workqueue/workHeap_test.go +++ b/workqueue/workHeap_test.go @@ -112,9 +112,9 @@ func TestWorkHeap_Priority_Pop(t *testing.T) { // assert // Popped results should be in priority order - assert.Equal(t, "work2", result1.(*workItem).QueuedWork.name) - assert.Equal(t, "work3", result2.(*workItem).QueuedWork.name) - assert.Equal(t, "work1", result3.(*workItem).QueuedWork.name) + assert.Equal(t, "work2", result1.(*workItem).name) + assert.Equal(t, "work3", result2.(*workItem).name) + assert.Equal(t, "work1", result3.(*workItem).name) } func TestWorkHeap_AdjustPriorities_ChangesPriorities(t *testing.T) {