diff --git a/publisher/publication.go b/publisher/publication.go index 1816046..ff3e9ca 100644 --- a/publisher/publication.go +++ b/publisher/publication.go @@ -72,14 +72,18 @@ func (p *Publication[T]) Subscribe(buffer int, opts ...SubscriberOption[T]) *Sub func (p *Publication[T]) Publish(message T) { for _, sub := range p.subscribers.Iterate() { if sub.filter == nil || sub.filter(message) { - go func() { + go func(sub *Subscriber[T]) { select { case sub.receiveCh <- message: // continue case <-time.After(sub.timeout): - // continue + if sub.onTimeout != nil { + sub.onTimeout(message) + } } - }() + }(sub) + } else if sub.filter != nil && sub.onFiltered != nil { + sub.onFiltered(message) } } }