Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 0 additions & 20 deletions go.mod

This file was deleted.

92 changes: 0 additions & 92 deletions go.sum

This file was deleted.

92 changes: 72 additions & 20 deletions stathat/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"encoding/json"
"errors"
"io/ioutil"
"io"
"math"
"net/http"
"time"
Expand All @@ -24,6 +24,40 @@ var (
ErrNaN = errors.New("value is not a number (NaN)")
)

// Option configures a Batcher.
type Option func(*Batcher)

// WithHTTPClient sets a custom HTTP client for the Batcher.
// If not provided, http.DefaultClient is used.
func WithHTTPClient(c *http.Client) Option {
return func(b *Batcher) {
if c != nil {
b.client = c
}
}
}

// WithRetries sets the number of retries for failed requests.
// A value of 0 means no retries (single attempt).
// If not provided, defaults to 1 retry (2 total attempts).
func WithRetries(n int) Option {
return func(b *Batcher) {
if n >= 0 {
b.retries = n
}
}
}

// WithEZKey sets the StatHat EZ key for authentication.
// This allows overriding the key passed to NewBatcher (e.g., from SecretsManager).
func WithEZKey(key string) Option {
return func(b *Batcher) {
if key != "" {
b.EZKey = key
}
}
}

type Stat struct {
Stat string `json:"stat"`
Count *float64 `json:"count,omitempty"`
Expand All @@ -38,20 +72,35 @@ type BulkStat struct {

type Batcher struct {
Stats chan Stat
stop chan interface{}
stop chan any
EZKey string
flushInterval time.Duration
client *http.Client
retries int
}

func NewBatcher(ezKey string, d time.Duration) (Batcher, error) {
func NewBatcher(ezKey string, d time.Duration, opts ...Option) (Batcher, error) {
if d < 0 {
return Batcher{}, ErrInvalidFlushInterval
}

c := make(chan Stat, 10000)
st := make(chan interface{})
st := make(chan any)

b := Batcher{
EZKey: ezKey,
flushInterval: d,
stop: st,
Stats: c,
client: http.DefaultClient,
retries: 1,
}

return Batcher{EZKey: ezKey, flushInterval: d, stop: st, Stats: c}, nil
for _, opt := range opts {
opt(&b)
}

return b, nil
}

// PostEZCount enqueues a given integer value to be added to a StatHat counter stat. 0 values will
Expand All @@ -71,7 +120,7 @@ func (b Batcher) PostEZCount(statName string, count int) error {
return b.PostEZCountTime(statName, count, time.Now().Unix())
}

// PostEZCount enqueues a given integer value to be added to a StatHat counter stat with a specific
// PostEZCountTime enqueues a given integer value to be added to a StatHat counter stat with a specific
// timestamp. 0 values will be dropped.
func (b Batcher) PostEZCountTime(statName string, count int, timestamp int64) error {
// If a caller is sending a 0, they probably intend for it to be a no-op — to essentially
Expand Down Expand Up @@ -167,15 +216,15 @@ func (b Batcher) flush(stats []*Stat) {

req.Header.Add("Content-Type", "application/json")

send(req, 2)
b.send(req)
}
}

func chunks(stats []*Stat) chan []*Stat {
c := make(chan []*Stat)
go func() {
chunks := len(stats) / 1000
for i := 0; i <= chunks; i++ {
cnks := len(stats) / 1000
for i := 0; i <= cnks; i++ {

start := i * 1000
end := start + 1000
Expand All @@ -193,18 +242,21 @@ func chunks(stats []*Stat) chan []*Stat {
return c
}

func send(req *http.Request, retries int) {
for i := 0; i < retries; i++ {
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Warn(logID, "error posting data to stathat", "error", err.Error())
continue
func (b Batcher) send(req *http.Request) {
var attempt int
for {
resp, err := b.client.Do(req)
if err == nil {
body, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
log.Debug(logID, "Flushed", "status", resp.Status, "resp", string(body))
return
}

b, _ := ioutil.ReadAll(resp.Body)
defer resp.Body.Close()

log.Debug(logID, "Flushed", "status", resp.Status, "resp", string(b))
break
log.Warn(logID, "error posting data to stathat", "error", err.Error())
attempt++
if attempt > b.retries {
return
}
}
}
43 changes: 40 additions & 3 deletions stathat/batcher_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package stathat_test

import (
"bytes"
"encoding/json"
"io/ioutil"
"io"
"math"
"net/http"
"net/http/httptest"
"sync/atomic"
"time"

. "github.com/onsi/ginkgo"
Expand All @@ -14,6 +16,13 @@ import (
"github.com/timehop/batchhat/stathat"
)

// RoundTripFunc allows creating http.RoundTripper from a function
type RoundTripFunc func(req *http.Request) (*http.Response, error)

func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
return f(req)
}

var _ = Describe("Batcher", func() {
Describe(".NewBatcher", func() {
Describe("with valid parameters", func() {
Expand All @@ -31,6 +40,34 @@ var _ = Describe("Batcher", func() {
Expect(err).To(Equal(stathat.ErrInvalidFlushInterval))
})
})

Describe("with custom HTTP client", func() {
It("should use the provided client", func() {
var customClientUsed atomic.Bool
customClient := &http.Client{
Transport: RoundTripFunc(func(req *http.Request) (*http.Response, error) {
customClientUsed.Store(true)
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewBufferString(`{"status":200,"msg":"ok"}`)),
}, nil
}),
}

b, err := stathat.NewBatcher("ezkey", 10*time.Millisecond, stathat.WithHTTPClient(customClient))
Expect(err).To(BeNil())

go b.Start()
defer b.Stop()

stathat.APIURL = "http://test.local/ez"
b.PostEZCount("test", 1)

Eventually(func() bool {
return customClientUsed.Load()
}).Should(BeTrue())
})
})
})

AssertEZCall := func(action func(b stathat.Batcher), verify func(stats []*stathat.Stat)) {
Expand All @@ -52,7 +89,7 @@ var _ = Describe("Batcher", func() {
It("should send the stat", func() {
done := make(chan struct{})
ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bts, err := ioutil.ReadAll(r.Body)
bts, err := io.ReadAll(r.Body)
defer r.Body.Close()

Expect(err).To(BeNil())
Expand Down Expand Up @@ -332,7 +369,7 @@ var _ = Describe("Batcher", func() {
done := make(chan struct{})

ts = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
bts, err := ioutil.ReadAll(r.Body)
bts, err := io.ReadAll(r.Body)
defer r.Body.Close()

Expect(err).To(BeNil())
Expand Down
2 changes: 1 addition & 1 deletion stathat/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.0
github.com/timehop/golog v0.0.0-20171219220921-cbf2afa06faa
github.com/timehop/golog v1.0.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions stathat/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ github.com/onsi/gomega v1.27.0/go.mod h1:i189pavgK95OSIipFBa74gC2V4qrQuvjuyGEr3G
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/timehop/golog v0.0.0-20171219220921-cbf2afa06faa h1:XmauaQKjqFdruqQn3i1ZLROecSxcTncTEnweyZjMNAw=
github.com/timehop/golog v0.0.0-20171219220921-cbf2afa06faa/go.mod h1:5snQTM41D/oFFJvpy4wLBsYt9HgGJ8bL8F8dgmtt13E=
github.com/timehop/golog v1.0.0 h1:EU4AUqPGyQ7anhplWto/k48/Uv0d1As90NgD/OGlS5M=
github.com/timehop/golog v1.0.0/go.mod h1:5snQTM41D/oFFJvpy4wLBsYt9HgGJ8bL8F8dgmtt13E=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand Down