-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhandler.go
More file actions
104 lines (94 loc) · 2.8 KB
/
handler.go
File metadata and controls
104 lines (94 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package events
import (
"context"
"fmt"
"reflect"
"sync/atomic"
)
// Reflected interface types that handler signatures are checked against.
var (
// contextType is the reflect.Type of the context.Context interface.
contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
// errorType is the reflect.Type of the built-in error interface.
errorType = reflect.TypeOf((*error)(nil)).Elem()
)
// registeredHandler is the reflection-based description of a single handler
// function, as built by newRegisteredHandler.
type registeredHandler struct {
// id is the value taken from nextHandlerID when the handler was built.
id uint64
// topic is the topic resolved from a sample value of eventType.
topic string
// eventType is the type of the handler's event parameter.
eventType reflect.Type
// fn is the handler function itself; it is called through reflection.
fn reflect.Value
// expectsCtx is true when the handler takes a leading context argument.
expectsCtx bool
// returnsError is true when the handler declares a single error result.
returnsError bool
}
// nextHandlerID is the atomic counter newRegisteredHandler advances with
// Add(1) to obtain the id stored in each registeredHandler.
var nextHandlerID atomic.Uint64
// newRegisteredHandler validates handler through reflection and returns the
// registeredHandler that describes it.
//
// handler must be a non-nil, non-variadic function with one of these shapes:
//
//	func(E)
//	func(E) R
//	func(C, E)
//	func(C, E) R
//
// where C is a type implementing context.Context, E is a non-interface event
// type, and R is a type implementing error. The handler's topic is obtained
// by passing a sample value of E (see sampleEventValue) to resolveTopic.
//
// Every failure wraps ErrInvalidHandler; an untyped nil handler returns
// ErrInvalidHandler itself. An id is drawn from nextHandlerID only after all
// validation has succeeded, so rejected handlers do not consume ids.
func newRegisteredHandler(handler any) (registeredHandler, error) {
	if handler == nil {
		return registeredHandler{}, ErrInvalidHandler
	}

	fn := reflect.ValueOf(handler)
	typ := fn.Type()
	if typ.Kind() != reflect.Func {
		return registeredHandler{}, fmt.Errorf("%w: handler must be a function", ErrInvalidHandler)
	}
	// A typed nil func stored in the interface is not caught by the
	// handler == nil test above, and calling it later would panic.
	if fn.IsNil() {
		return registeredHandler{}, fmt.Errorf("%w: handler must not be a nil function", ErrInvalidHandler)
	}
	if typ.NumIn() < 1 || typ.NumIn() > 2 {
		return registeredHandler{}, fmt.Errorf("%w: handler must accept 1 or 2 arguments", ErrInvalidHandler)
	}
	if typ.NumOut() > 1 {
		return registeredHandler{}, fmt.Errorf("%w: handler must return zero or one value", ErrInvalidHandler)
	}
	// For a variadic func the last parameter type is a slice, but
	// reflect.Value.Call treats trailing arguments as individual elements,
	// so a decoded slice value cannot be passed to it as the event.
	if typ.IsVariadic() {
		return registeredHandler{}, fmt.Errorf("%w: handler must not be variadic", ErrInvalidHandler)
	}

	h := registeredHandler{fn: fn}

	eventIndex := 0
	if typ.NumIn() == 2 {
		// NOTE(review): this admits any type implementing context.Context,
		// while invoke passes along whatever ctx it is given; a parameter
		// type narrower than context.Context may therefore be rejected by
		// Call at dispatch time. Consider requiring context.Context exactly.
		if !typ.In(0).Implements(contextType) {
			return registeredHandler{}, fmt.Errorf("%w: first argument must implement context.Context", ErrInvalidHandler)
		}
		h.expectsCtx = true
		eventIndex = 1
	}

	h.eventType = typ.In(eventIndex)
	if h.eventType.Kind() == reflect.Interface {
		return registeredHandler{}, fmt.Errorf("%w: event argument must be a concrete type", ErrInvalidHandler)
	}

	if typ.NumOut() == 1 {
		if !typ.Out(0).Implements(errorType) {
			return registeredHandler{}, fmt.Errorf("%w: return value must be error", ErrInvalidHandler)
		}
		h.returnsError = true
	}

	sample := sampleEventValue(h.eventType)
	topic, _, err := resolveTopic(sample.Interface())
	if err != nil {
		return registeredHandler{}, fmt.Errorf("%w: resolve handler topic: %v", ErrInvalidHandler, err)
	}
	h.topic = topic

	// Allocate the id last so that failed registrations leave the counter
	// untouched.
	h.id = nextHandlerID.Add(1)
	return h, nil
}
// invoke calls the handler function through reflection with value as its
// event argument.
//
// When the handler expects a context, ctx is passed as the first argument; a
// nil ctx is replaced by context.Background(). The result is nil when the
// handler declares no error result or returns a nil one; otherwise the
// returned value is handed back as an error.
//
// newRegisteredHandler accepts any result type that implements error, which
// includes non-nillable kinds such as structs. reflect.Value.IsNil panics on
// those, so the nil test is applied only to kinds that can be nil; a
// non-nillable result is always reported as a non-nil error.
func (h registeredHandler) invoke(ctx context.Context, value reflect.Value) error {
	args := make([]reflect.Value, 0, 2)
	if h.expectsCtx {
		if ctx == nil {
			ctx = context.Background()
		}
		args = append(args, reflect.ValueOf(ctx))
	}
	args = append(args, value)

	out := h.fn.Call(args)
	if !h.returnsError || len(out) == 0 {
		return nil
	}

	result := out[0]
	switch result.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Pointer, reflect.Slice:
		if result.IsNil() {
			return nil
		}
	}
	return result.Interface().(error)
}
// decodePayload unmarshals payload with codec into a newly allocated value
// and returns it shaped for the handler's event parameter.
//
// The allocation is reflect.New(indirectType(h.eventType)), so the codec
// always receives a pointer. If the event parameter is itself a pointer type
// that pointer is returned; otherwise the value it points to is returned. A
// codec error is returned unchanged together with the zero reflect.Value.
//
// NOTE(review): indirectType is defined outside this view; it presumably
// strips pointer indirection from the type — confirm.
func (h registeredHandler) decodePayload(codec Codec, payload []byte) (reflect.Value, error) {
	ptr := reflect.New(indirectType(h.eventType))
	err := codec.Unmarshal(payload, ptr.Interface())

	switch {
	case err != nil:
		return reflect.Value{}, err
	case h.eventType.Kind() == reflect.Pointer:
		return ptr, nil
	default:
		return ptr.Elem(), nil
	}
}
// sampleEventValue builds a placeholder value for the event type typ.
//
// Non-pointer types yield their zero value. Pointer types yield a pointer to
// a freshly allocated value of indirectType(typ) rather than a nil pointer.
func sampleEventValue(typ reflect.Type) reflect.Value {
	if typ.Kind() != reflect.Pointer {
		return reflect.Zero(typ)
	}
	elem := indirectType(typ)
	return reflect.New(elem)
}