-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsupervisorprocesses.go
More file actions
174 lines (131 loc) · 4.88 KB
/
supervisorprocesses.go
File metadata and controls
174 lines (131 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package actress
import (
"context"
"log/slog"
"sync"
"github.com/fxamacker/cbor/v2"
)
type supervisorProcesses struct {
mu sync.Mutex
procMap map[EventName]*Process
}
// Prepare and return a new *processes structure.
func newsuperVisorProcesses() *supervisorProcesses {
p := supervisorProcesses{
procMap: make(map[EventName]*Process),
}
return &p
}
// ------------------------------------------------------------------------------
// Events and event functions, ESRouter
// ------------------------------------------------------------------------------
// Router for supervisor events.
const ESRouter EventName = "ESRouter"
// Process function for routing and handling supervisor events. Will check
// and route the event to the correct process.
func esRouterFn(ctx context.Context, p *Process) func() {
fn := func() {
for {
select {
case ev := <-p.SupervisorEventCh:
// If there is a next event defined, we make a copy of all the fields of the current event,
// and put that as the previousEvent on the next event. We can use this information later
// if need to check something in the previous event.
if ev.NextEvent != nil {
// Keep the information about the current event, so we are able to check for things
// like ackTimeout and what node to reply back to if ack should be given.
ev.NextEvent.PreviousEvent = CopyEventFields(&ev)
}
// Check if process is registred and valid.
p.supervisorProcesses.mu.Lock()
_, ok := p.supervisorProcesses.procMap[ev.Name]
p.supervisorProcesses.mu.Unlock()
if !ok {
slog.Error("esRouterFn", "on", p.Config.NodeName, "found no process registered for the event type", ev.Name)
}
// // Process was registered. Deliver the event to the process InCh.
p.supervisorProcesses.mu.Lock()
inCh := p.supervisorProcesses.procMap[ev.Name].InCh
p.supervisorProcesses.mu.Unlock()
slog.Debug("esRouterFn", "on", p.Config.NodeName, "Routing event", p.Event, "node", p.Config.NodeName, "name", ev.Name, "Inch", inCh)
inCh <- ev
case <-p.Ctx.Done():
slog.Debug("esRouterFn", "got ctx.Done, on", p.Config.NodeName)
return
}
}
}
return fn
}
// ------------------------------------------------------------------------------
// Events and event functions, Process handling
// ------------------------------------------------------------------------------
// Handles information about the currently running processes in the local Actress system.
const ESProcesses EventName = "ESProcesses"
// Will instruct to get all information about all processes.
const InstructionESProcessesAdd Instruction = "InstructionESProcessesAdd"
const InstructionESProcessesDelete Instruction = "InstructionESProcessesDelete"
const InstructionESProcessesGetAll Instruction = "InstructionESProcessesGetAll"
type esProcessesMapDataIn struct {
Name EventName
}
type ESProcessesMap map[EventName]string
// ETFunc for handling information about the currently running processes in the local Actress system.
func esProcessesFn() ETFunc {
ETfn := func(ctx context.Context, p *Process) func() {
fn := func() {
// The map of all the running processes.
processMap := make(ESProcessesMap)
for {
p.SignalReady()
select {
case ev := <-p.InCh:
switch ev.Instruction {
// Add The received data about a process to the map.
case InstructionESProcessesAdd:
md := esProcessesMapDataIn{}
err := cbor.Unmarshal(ev.Data, &md)
if err != nil {
slog.Error("esProcessesFn", "failed to unmarshal esProcesses map in data", err)
}
processMap[md.Name] = string(md.Name)
slog.Debug("esProcessesFn", "on", p.Config.NodeName, "processesMap", processMap)
// Nothing to output are produced so we just add for the .NextEvent if defined.
if ev.NextEvent != nil {
p.AddEvent(*ev.NextEvent)
}
case InstructionESProcessesDelete:
md := esProcessesMapDataIn{}
err := cbor.Unmarshal(ev.Data, &md)
if err != nil {
slog.Error("esProcessesFn", "failed to unmarshal esProcesses map in data", err)
}
delete(processMap, p.Event)
// Nothing to output are produced so we just add for the .NextEvent if defined.
if ev.NextEvent != nil {
p.AddEvent(*ev.NextEvent)
}
// Dump the content of the whole processes map, and send it with .NextEvent.
case InstructionESProcessesGetAll:
b, err := cbor.Marshal(processMap)
if err != nil {
slog.Error("esProcessesFn", "failed to marshal esProcesses for push all", err)
}
nEv := ev.NextEvent
nEv.Data = b
if ev.NextEvent != nil {
p.AddEvent(*nEv)
}
default:
slog.Error("esProcessesFn", "not a defined instruction", ev.Instruction)
}
case <-p.Ctx.Done():
slog.Debug("esProcessesFn", "got ctx.Done, on", p.Config.NodeName)
return
}
}
}
return fn
}
return ETfn
}