-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbroadcast_test.go
More file actions
129 lines (123 loc) · 2.74 KB
/
broadcast_test.go
File metadata and controls
129 lines (123 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package broadcast
import (
"context"
"sync"
"testing"
"time"
)
// TestBroadCaster_Back ensures that a message will not be received by the sender itself.
func TestBroadCaster_Back(t *testing.T) {
const N = 10
broadcaster := New()
wg := sync.WaitGroup{}
wg.Add(N)
for i := 0; i < N; i++ {
go func(i int) {
defer wg.Done()
sender, receiver := broadcaster.Join(time.Second)
go func() {
sender <- i
close(sender)
}()
for m := range receiver {
if m.(int) == i {
t.Errorf("receiver %d: received a same number %v", i, m)
return
}
t.Logf("receiver %d: %v", i, m)
}
}(i)
}
wg.Wait()
}
// TestBroadCaster_Duplicate ensures that a receiver will not receive a message twice.
func TestBroadCaster_Duplicate(t *testing.T) {
const N = 10
broadcaster := New()
wg := sync.WaitGroup{}
wg.Add(N)
for i := 0; i < N; i++ {
sender, receiver := broadcaster.Join(time.Second)
go func(i int) {
defer wg.Done()
received := make(map[interface{}]struct{})
for m := range receiver {
if _, ok := received[m]; ok {
t.Errorf("receiver %d: received %v twice", i, m)
return
}
received[m] = struct{}{}
t.Logf("receiver %d: %v", i, m)
if len(received) == N {
close(sender)
}
}
}(i)
}
sender, _ := broadcaster.Join(time.Second)
for i := 0; i < N; i++ {
sender <- i
}
close(sender)
wg.Wait()
}
// TestBroadCaster_Timeout ensures the correctness of package in some edge cases.
func TestBroadCaster_Timeout(t *testing.T) {
const N = 10000
for _, timeout := range [...]time.Duration{-1, 0, 1} {
broadcaster := New()
sender, _ := broadcaster.Join(timeout)
_, receiver := broadcaster.Join(timeout)
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
defer cancel()
for i := 0; i < N; i++ {
select {
case <-ctx.Done():
return
case sender <- struct{}{}:
}
}
}()
go func() {
defer wg.Done()
defer cancel()
for i := 0; i < N; i++ {
select {
case <-ctx.Done():
return
case <-receiver:
}
}
}()
wg.Wait()
}
}
func TestBroadCaster_Block(t *testing.T) {
broadcaster := New()
sender, _ := broadcaster.Join(time.Second)
_, _ = broadcaster.Join(time.Second)
startTime := time.Now()
sender <- struct{}{}
sender <- struct{}{}
if duration := time.Since(startTime); duration < time.Second {
t.Errorf("the operation should be blocked at least 1 second, but we got %v", duration)
}
}
func BenchmarkBroadCaster(b *testing.B) {
broadcaster := New()
sender, _ := broadcaster.Join(time.Second)
for i := 0; i < 1024; i++ {
go func() {
_, receiver := broadcaster.Join(time.Second)
for range receiver {
}
}()
}
for i := 0; i < b.N; i++ {
sender <- i
}
}