-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcustomprocesses.go
More file actions
187 lines (157 loc) · 5.75 KB
/
customprocesses.go
File metadata and controls
187 lines (157 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// Actress Copyright (C) 2024 Bjørn Tore Svinningen
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package actress
import (
"context"
"log"
"log/slog"
"sync"
)
// Holds information about what process functions who belongs to what
// event, and also a map of the started processes.
type customProcesses struct {
procMap map[EventName]*Process
mu sync.Mutex
}
// Add a new Event and it's process to the processes map.
//
// *******************************************************************
// TODO: Consider if we still need this function. It is not in use.
// *******************************************************************
func (p *customProcesses) Add(et EventName, proc *Process) {
// Check if a process for the same event is defined, and if so we
// cancel the current process before we replace it with a new one.
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.procMap[et]; ok {
p.procMap[et].Cancel()
}
p.procMap[et] = proc
}
// Delete an Event and it's process from the processes map.
func (p *customProcesses) Delete(et EventName) {
p.mu.Lock()
defer p.mu.Unlock()
p.procMap[et].Cancel()
delete(p.procMap, et)
log.Printf("deleted process %v\n", et)
}
// Checks if the event is defined in the processes map, and returns true if it is.
func (p *customProcesses) IsEventDefined(ev EventName) bool {
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.procMap[ev]; !ok {
return false
}
return true
}
// Prepare and return a new *customProcesses structure.
func newCustomProcesses() *customProcesses {
p := customProcesses{
procMap: make(map[EventName]*Process),
}
return &p
}
// ------------------------------------------------------------------------------
// Events and event functions.
// ------------------------------------------------------------------------------
// Router for custom events.
const ECRouter EventName = "ECRouter"
// Process function for routing and handling events. Will check
// and route the event to the correct process.
func ecRouterFn(ctx context.Context, p *Process) func() {
fn := func() {
for {
select {
case ev := <-p.CustomEventCh:
// If there is a next event defined, we make a copy of all the fields of the current event,
// and put that as the previousEvent on the next event. We can use this information later
// if need to check something in the previous event.
if ev.NextEvent != nil {
// Keep the information about the current event, so we are able to check for things
// like ackTimeout and what node to reply back to if ack should be given.
ev.NextEvent.PreviousEvent = CopyEventFields(&ev)
}
// Custom processes can take a little longer to start up and be
// registered in the map. We check here if process is registred,
// and if it is not we retry.
// The checking is done in a go routine so the router don't block
// here waiting, and we continue with the next event in the queue.
p.CustomProcesses.mu.Lock()
_, ok := p.CustomProcesses.procMap[ev.Name]
p.CustomProcesses.mu.Unlock()
if !ok {
go func(ev Event) {
// Try to 3 times to deliver the message.
for i := 0; i < 3; i++ {
p.CustomProcesses.mu.Lock()
_, ok := p.CustomProcesses.procMap[ev.Name]
p.CustomProcesses.mu.Unlock()
if !ok {
slog.Error("ecRouterFn", "on", p.Config.NodeName, "found no process registered for the event type", ev.Name, "ev.DstNode", ev.DstNode)
continue
}
// Process is now registred, so we can safely put
//the event on the InCh of the process.
p.CustomProcesses.mu.Lock()
p.CustomProcesses.procMap[ev.Name].InCh <- ev
p.CustomProcesses.mu.Unlock()
return
}
}(ev)
// The above go routine will wait, and check if the process becomes
// available, and send the event if it the process eventually found,
// so we can continue with the next event in the queue.
continue
}
// Process was registered. Deliver the event to the process InCh.
p.CustomProcesses.mu.Lock()
inCh := p.CustomProcesses.procMap[ev.Name].InCh
p.CustomProcesses.mu.Unlock()
slog.Debug("ecRouterFn", "on", p.Config.NodeName, "Routing event", p.Event, "node", p.Config.NodeName, "name", ev.Name, "Inch", inCh)
inCh <- ev
case <-p.Ctx.Done():
slog.Debug("ecRouterFn", "got ctx.Done, on", p.Config.NodeName)
return
}
}
}
return fn
}
// Primarily used for testing to check that the ECRouter properly routes events, and that
// custom processes start up correctly.
const ECGeneralDelivery EventName = "ECGeneralDelivery"
// Primarily used for testing to check that the ECRouter properly routes events, and that
// custom processes start up correctly.
func ecGeneralDeliveryFn(ctx context.Context, p *Process) func() {
fn := func() {
p.SignalReady()
for {
select {
case ev := <-p.InCh:
// Primarily used for tests. Will just forward the event data to defined NextEvent.
if ev.NextEvent != nil {
nextEv := ev.NextEvent
nextEv.Data = ev.Data
p.AddEvent(*nextEv)
}
case <-p.Ctx.Done():
slog.Info("ecGeneralDeliveryFn", "got ctx.Done, on", p.Config.NodeName)
return
}
}
}
return fn
}