From 1d286b880c76c9d84cfc1a641012b2cb97a8bf9f Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Mon, 25 Jul 2022 18:55:02 +0200 Subject: [PATCH 01/22] feat: naive websocket server --- cmd/connect/main.go | 139 ++++++++++++++++++++++++++++++++++++++++++++ go.mod | 5 +- go.sum | 2 + 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 cmd/connect/main.go diff --git a/cmd/connect/main.go b/cmd/connect/main.go new file mode 100644 index 0000000..f2ca3fb --- /dev/null +++ b/cmd/connect/main.go @@ -0,0 +1,139 @@ +package main + +import ( + "fmt" + "log" + "net/http" + "time" + + "github.com/gorilla/websocket" +) + +const ( + port = 8080 + token = "6db67fafc4f5bf965a5a" +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return r.URL.Query().Get("access_token") == token + }, +} + +func main() { + addr := fmt.Sprintf("localhost:%d", port) + + fmt.Printf("listening on http://%s\n", addr) + + http.HandleFunc("/run", run) + http.HandleFunc("/status", status) + + log.Fatal(http.ListenAndServe(addr, nil)) +} + +func run(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + log.Println("connected /run") + + defer ws.Close() + + dr := dummyRun{} + + for { + m, err := readTextMessage(ws) + if err != nil { + log.Println("read error:", err) + break + } + + switch string(m) { + case "run": + if dr.started { + log.Println("already started") + + err = ws.WriteMessage(websocket.TextMessage, []byte("already started")) + if err != nil { + log.Println("cannot write message:", err) + break + } + } else { + log.Println("starting run") + + dr.run() + + err = ws.WriteMessage(websocket.TextMessage, []byte("ack")) + if err != nil { + log.Println("cannot write message:", err) + break + } + } + + default: + log.Printf("<- %s", m) + + err = ws.WriteMessage(websocket.TextMessage, m) + if err != nil { + log.Println("cannot write message:", err) + break + } + } + } +} + +func readTextMessage(ws *websocket.Conn) ([]byte, error) { + messageType, m, err := ws.ReadMessage() + if err != nil { + return nil, fmt.Errorf("cannot read message as text: %s", err) + } + + if messageType != websocket.TextMessage { + return nil, fmt.Errorf("cannot read message as text: message type is not text") + } + + return m, nil +} + +type dummyRun struct { + started bool +} + +func (d *dummyRun) run() { + d.started = true +} + +func status(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + log.Println("connected /status") + + defer ws.Close() + + ds := dummyStatus{} + + for range time.Tick(time.Second * 1) { + ds.inc() + data := []byte(fmt.Sprint(ds.c)) + + err = ws.WriteMessage(websocket.TextMessage, data) + if err != nil { + log.Println("cannot write message:", err) + break + } + log.Printf("-> %s", data) + } +} + +type dummyStatus struct { + c int +} + +func (d *dummyStatus) inc() { + d.c++ +} diff --git a/go.mod b/go.mod index 4de6a8f..30119a4 100644 --- a/go.mod +++ b/go.mod @@ -8,4 +8,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 ) -require github.com/drykit-go/cond v0.1.0 // indirect +require ( + github.com/drykit-go/cond v0.1.0 // indirect + github.com/gorilla/websocket v1.5.0 +) diff --git a/go.sum b/go.sum index 12452b0..9e7eaf9 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/drykit-go/strcase v0.2.0/go.mod h1:cWK0/az2f09UPIbJ42Sb8Iqdv01uENrFX+ github.com/drykit-go/testx v0.1.0/go.mod h1:qGXb49a8CzQ82crBeCVW8R3kGU1KRgWHnI+Q6CNVbz8= github.com/drykit-go/testx v1.2.0 h1:UsH+tFd24z3Xu+mwvwPY+9eBEg9CUyMsUeMYyUprG0o= github.com/drykit-go/testx v1.2.0/go.mod h1:qTzXJgnAg8n31woklBzNTaWzLMJrnFk93x/aeaIpc20= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= From afeffb943413ae534bb037a5dd24ac9ea9dba50a Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 26 Jul 2022 19:41:22 +0200 Subject: [PATCH 02/22] feat: single socket with dynamic event handling --- cmd/connect/main.go | 168 ++++++++++++++++++++++++++++---------------- 1 file changed, 106 insertions(+), 62 deletions(-) diff --git a/cmd/connect/main.go b/cmd/connect/main.go index f2ca3fb..8b03d64 100644 --- a/cmd/connect/main.go +++ b/cmd/connect/main.go @@ -25,23 +25,70 @@ func main() { fmt.Printf("listening on http://%s\n", addr) - http.HandleFunc("/run", run) - http.HandleFunc("/status", status) + http.HandleFunc("/run", handleRun) log.Fatal(http.ListenAndServe(addr, nil)) } -func run(w http.ResponseWriter, r *http.Request) { +type Runner struct { + running bool + progress progress + + isPollingProgress bool + abortPollingProgress chan struct{} +} + +func (r *Runner) run() string { + if !r.running { + r.running = true + r.progress = progress{} + + // Prepare abort signal. + r.abortPollingProgress = make(chan struct{}, 1) + + return "running" + } + + return "error: already running" +} + +func (r *Runner) stop() string { + if !r.running { + return "error: not running" + } + + r.running = false + r.abortPollingProgress <- struct{}{} + r.isPollingProgress = false + r.progress = progress{} + + return "stopped" +} + +type progress struct { + value int +} + +func (p *progress) next() int { + // Go no further than 100. + if p.value <= 100 { + p.value++ + } + + return p.value +} + +func handleRun(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return } - log.Println("connected /run") + log.Printf("websocket connected with client %s", r.Host) defer ws.Close() - dr := dummyRun{} + runner := Runner{} for { m, err := readTextMessage(ws) @@ -50,36 +97,62 @@ func run(w http.ResponseWriter, r *http.Request) { break } + log.Printf("<- %s", m) + switch string(m) { case "run": - if dr.started { - log.Println("already started") - - err = ws.WriteMessage(websocket.TextMessage, []byte("already started")) - if err != nil { - log.Println("cannot write message:", err) - break - } - } else { - log.Println("starting run") - - dr.run() - - err = ws.WriteMessage(websocket.TextMessage, []byte("ack")) - if err != nil { - log.Println("cannot write message:", err) - break - } + status := runner.run() + + err = writeTextMessage(ws, status) + if err != nil { + break } - default: - log.Printf("<- %s", m) + case "stop": + status := runner.stop() + + err = writeTextMessage(ws, status) + if err != nil { + break + } - err = ws.WriteMessage(websocket.TextMessage, m) + case "pull": + err = writeTextMessage(ws, "not implemented") if err != nil { - log.Println("cannot write message:", err) break } + log.Println("^^ not implemented", m) + default: + log.Println("^^ not implemented", m) + } + + if runner.running && !runner.isPollingProgress { + go runner.pollProgress(ws) + } + } +} + +func (r *Runner) pollProgress(ws *websocket.Conn) { + r.isPollingProgress = true + + for range time.Tick(time.Millisecond * 500) { + select { + case <-r.abortPollingProgress: + return + default: + } + + if r.progress.value == 100 { + r.isPollingProgress = false + return + } + + val := r.progress.next() + percent := fmt.Sprintf("%d%%", val) + + err := writeTextMessage(ws, percent) + if err != nil { + break } } } @@ -97,43 +170,14 @@ func readTextMessage(ws *websocket.Conn) ([]byte, error) { return m, nil } -type dummyRun struct { - started bool -} - -func (d *dummyRun) run() { - d.started = true -} - -func status(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) +func writeTextMessage(ws *websocket.Conn, m string) error { + err := ws.WriteMessage(websocket.TextMessage, []byte(m)) if err != nil { - log.Println(err) - return + log.Println("cannot write message:", err) + return err } - log.Println("connected /status") - - defer ws.Close() - - ds := dummyStatus{} - - for range time.Tick(time.Second * 1) { - ds.inc() - data := []byte(fmt.Sprint(ds.c)) - err = ws.WriteMessage(websocket.TextMessage, data) - if err != nil { - log.Println("cannot write message:", err) - break - } - log.Printf("-> %s", data) - } -} - -type dummyStatus struct { - c int -} + log.Printf("-> %s", m) -func (d *dummyStatus) inc() { - d.c++ + return nil } From 2a640fef6f5cc318b246b11e04a9639363aa8c5a Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 26 Jul 2022 19:55:20 +0200 Subject: [PATCH 03/22] chore: rename command to ws (work in progress) --- cmd/{connect => ws}/main.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cmd/{connect => ws}/main.go (100%) diff --git a/cmd/connect/main.go b/cmd/ws/main.go similarity index 100% rename from cmd/connect/main.go rename to cmd/ws/main.go From f23c073a034694b93ec01ade0293a471a1cbb979 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 00:29:26 +0200 Subject: [PATCH 04/22] feat: use websocket to access runner api --- cmd/server/main.go | 10 +-- cmd/ws/main.go | 183 --------------------------------------------- server/handle.go | 66 ++++++++++++++++ server/helper.go | 14 ++++ server/io.go | 39 ++++++++++ server/run.go | 82 +++++++++++++------- server/server.go | 94 ----------------------- server/state.go | 24 ------ server/stop.go | 17 ----- server/upgrade.go | 15 ++++ 10 files changed, 192 insertions(+), 352 deletions(-) delete mode 100644 cmd/ws/main.go create mode 100644 server/handle.go create mode 100644 server/helper.go create mode 100644 server/io.go delete mode 100644 server/server.go delete mode 100644 server/state.go delete mode 100644 server/stop.go create mode 100644 server/upgrade.go diff --git a/cmd/server/main.go b/cmd/server/main.go index c4f915e..3524076 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,6 +2,8 @@ package main import ( "fmt" + "log" + "net/http" "github.com/benchttp/engine/server" ) @@ -9,13 +11,7 @@ import ( const port = "8080" func main() { - if err := run(); err != nil { - fmt.Println(err) - } -} - -func run() error { addr := ":" + port fmt.Println("http://localhost" + addr) - return server.ListenAndServe(addr) + log.Fatal(http.ListenAndServe(addr, &server.Handler{})) } diff --git a/cmd/ws/main.go b/cmd/ws/main.go deleted file mode 100644 index 8b03d64..0000000 --- a/cmd/ws/main.go +++ /dev/null @@ -1,183 +0,0 @@ -package main - -import ( - "fmt" - "log" - "net/http" - "time" - - "github.com/gorilla/websocket" -) - -const ( - port = 8080 - token = "6db67fafc4f5bf965a5a" -) - -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return r.URL.Query().Get("access_token") == token - }, -} - -func main() { - addr := fmt.Sprintf("localhost:%d", port) - - fmt.Printf("listening on http://%s\n", addr) - - http.HandleFunc("/run", handleRun) - - log.Fatal(http.ListenAndServe(addr, nil)) -} - -type Runner struct { - running bool - progress progress - - isPollingProgress bool - abortPollingProgress chan struct{} -} - -func (r *Runner) run() string { - if !r.running { - r.running = true - r.progress = progress{} - - // Prepare abort signal. - r.abortPollingProgress = make(chan struct{}, 1) - - return "running" - } - - return "error: already running" -} - -func (r *Runner) stop() string { - if !r.running { - return "error: not running" - } - - r.running = false - r.abortPollingProgress <- struct{}{} - r.isPollingProgress = false - r.progress = progress{} - - return "stopped" -} - -type progress struct { - value int -} - -func (p *progress) next() int { - // Go no further than 100. - if p.value <= 100 { - p.value++ - } - - return p.value -} - -func handleRun(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - log.Printf("websocket connected with client %s", r.Host) - - defer ws.Close() - - runner := Runner{} - - for { - m, err := readTextMessage(ws) - if err != nil { - log.Println("read error:", err) - break - } - - log.Printf("<- %s", m) - - switch string(m) { - case "run": - status := runner.run() - - err = writeTextMessage(ws, status) - if err != nil { - break - } - - case "stop": - status := runner.stop() - - err = writeTextMessage(ws, status) - if err != nil { - break - } - - case "pull": - err = writeTextMessage(ws, "not implemented") - if err != nil { - break - } - log.Println("^^ not implemented", m) - default: - log.Println("^^ not implemented", m) - } - - if runner.running && !runner.isPollingProgress { - go runner.pollProgress(ws) - } - } -} - -func (r *Runner) pollProgress(ws *websocket.Conn) { - r.isPollingProgress = true - - for range time.Tick(time.Millisecond * 500) { - select { - case <-r.abortPollingProgress: - return - default: - } - - if r.progress.value == 100 { - r.isPollingProgress = false - return - } - - val := r.progress.next() - percent := fmt.Sprintf("%d%%", val) - - err := writeTextMessage(ws, percent) - if err != nil { - break - } - } -} - -func readTextMessage(ws *websocket.Conn) ([]byte, error) { - messageType, m, err := ws.ReadMessage() - if err != nil { - return nil, fmt.Errorf("cannot read message as text: %s", err) - } - - if messageType != websocket.TextMessage { - return nil, fmt.Errorf("cannot read message as text: message type is not text") - } - - return m, nil -} - -func writeTextMessage(ws *websocket.Conn, m string) error { - err := ws.WriteMessage(websocket.TextMessage, []byte(m)) - if err != nil { - log.Println("cannot write message:", err) - return err - } - - log.Printf("-> %s", m) - - return nil -} diff --git a/server/handle.go b/server/handle.go new file mode 100644 index 0000000..22b172c --- /dev/null +++ b/server/handle.go @@ -0,0 +1,66 @@ +package server + +import ( + "fmt" + "log" + "net/http" +) + +type Handler struct{} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/run": + handle(w, r) + default: + http.NotFound(w, r) + } +} + +func handle(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return // Connection is dead. + } + + defer ws.Close() + + log.Println("websocket connected with client") + + run := run{} + + for { + m, err := readMessage(ws) + if err != nil { + log.Println(err) + break // Connection is dead. + } + + switch m { + case "run": + go run.run(ws) + _ = writeMessage(ws, "starting run") + + case "stop": + ok := run.stop() + if ok { + _ = writeMessage(ws, "stopped") + } else { + _ = writeMessage(ws, "not running") + } + + case "pull": + run.pull(ws) + + default: + _ = writeMessage(ws, fmt.Sprintf("unknown command: %s", m)) + } + } + + // Clean up + if run.cancel != nil { + run.cancel() + } + run.flush() +} diff --git a/server/helper.go b/server/helper.go new file mode 100644 index 0000000..d7600a3 --- /dev/null +++ b/server/helper.go @@ -0,0 +1,14 @@ +package server + +import ( + "net/url" + + "github.com/benchttp/engine/runner" +) + +func config() runner.Config { + rqurl, _ := url.ParseRequestURI("https://example.com") + config := runner.DefaultConfig() + config.Request.URL = rqurl + return config +} diff --git a/server/io.go b/server/io.go new file mode 100644 index 0000000..3e7ee66 --- /dev/null +++ b/server/io.go @@ -0,0 +1,39 @@ +package server + +import ( + "fmt" + "log" + + "github.com/gorilla/websocket" +) + +func readMessage(ws *websocket.Conn) (string, error) { + messageType, m, err := ws.ReadMessage() + if err != nil { + return "", fmt.Errorf("cannot read message: %s", err) + } + + if messageType != websocket.TextMessage { + return "", fmt.Errorf("message type is not TextMessage") + } + + ms := string(m) + + log.Printf("<- %s", ms) + + return ms, nil +} + +func writeMessage(ws *websocket.Conn, m string) error { + mb := []byte(m) + + err := ws.WriteMessage(websocket.TextMessage, mb) + if err != nil { + log.Println("cannot write message:", err) + return err + } + + log.Printf("-> %s", m) + + return nil +} diff --git a/server/run.go b/server/run.go index 1b11007..6a3b152 100644 --- a/server/run.go +++ b/server/run.go @@ -1,44 +1,72 @@ package server import ( - "io" - "net/http" + "context" + "fmt" + "sync" - "github.com/benchttp/engine/internal/configparse" + "github.com/benchttp/engine/runner" + "github.com/gorilla/websocket" ) -func (s *server) handleRun(w http.ResponseWriter, r *http.Request) { - // Allow single run at a time - if s.isRequesterRunning() { - http.Error(w, "already running", http.StatusConflict) - return - } - defer s.flush() +type run struct { + mu sync.RWMutex - // Read input config - readBody, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + runner *runner.Runner + output *runner.Report + err error + cancel context.CancelFunc +} + +func (r *run) run(ws *websocket.Conn) { + r.flush() + + ctx, cancel := context.WithCancel(context.Background()) + r.cancel = cancel + + r.runner = runner.New( + func(rp runner.RecordingProgress) { + // Protect from concurrent write to websocket connection. + r.mu.Lock() + defer r.mu.Unlock() + m := fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()) + _ = writeMessage(ws, m) + }, + ) - // Parse json config - cfg, err := configparse.JSON(readBody) + out, err := r.runner.Run(ctx, config()) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) + r.err = err + _ = writeMessage(ws, fmt.Sprintf("done with error: %s", err)) return } - // Start run - out, err := s.doRun(cfg) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + r.output = out + _ = writeMessage(ws, "done without error") +} + +func (r *run) stop() bool { + defer r.flush() + if r.runner == nil { + return false } + r.cancel() + return true +} - // Respond with run output - if _, err := out.WriteJSON(w); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) +func (r *run) pull(ws *websocket.Conn) { + if r.output == nil { + _ = writeMessage(ws, "not done yet") return } + + m := r.output.String() + _ = writeMessage(ws, m) +} + +func (r *run) flush() { + r.runner = nil + r.output = nil + r.err = nil + r.cancel = nil } diff --git a/server/server.go b/server/server.go deleted file mode 100644 index 0e7405d..0000000 --- a/server/server.go +++ /dev/null @@ -1,94 +0,0 @@ -package server - -import ( - "context" - "net/http" - "sync" - - "github.com/benchttp/engine/runner" -) - -func ListenAndServe(addr string) error { - return http.ListenAndServe(addr, &server{}) -} - -type server struct { - mu sync.RWMutex - runner *runner.Runner - stopRun context.CancelFunc -} - -func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/run": - s.handleRun(w, r) - case "/progress": - s.handleProgress(w, r) - case "/stop": - s.handleStop(w, r) - default: - http.NotFound(w, r) - } -} - -func (s *server) doRun(cfg runner.Config) (*runner.Report, error) { - ctx, cancel := context.WithCancel(context.Background()) - - s.setRunner(runner.New(nil)) - s.setStopRun(cancel) - - // Run benchmark - return s.runner.Run(ctx, silentConfig(cfg)) -} - -func (s *server) setRunner(r *runner.Runner) { - s.mu.Lock() - defer s.mu.Unlock() - s.runner = r -} - -func (s *server) setStopRun(cancelFunc context.CancelFunc) { - s.mu.Lock() - defer s.mu.Unlock() - s.stopRun = cancelFunc -} - -func (s *server) flush() { - s.mu.Lock() - defer s.mu.Unlock() - s.runner = nil - s.stopRun = nil -} - -func (s *server) isRequesterRunning() bool { - s.mu.RLock() - defer s.mu.RUnlock() - return s.runner != nil -} - -func (s *server) recordingProgress() (progress runner.RecordingProgress, ok bool) { - s.mu.RLock() - defer s.mu.RUnlock() - if s.runner == nil { - return runner.RecordingProgress{}, false - } - return s.runner.Progress(), true -} - -func (s *server) stopRequester() bool { - s.mu.Lock() - defer s.mu.Unlock() - if s.runner == nil { - return false - } - s.stopRun() - return true -} - -func silentConfig(cfg runner.Config) runner.Config { - cfg.Output = runner.OutputConfig{ - Silent: true, - Template: "", - } - return cfg -} diff --git a/server/state.go b/server/state.go deleted file mode 100644 index 8d153c6..0000000 --- a/server/state.go +++ /dev/null @@ -1,24 +0,0 @@ -package server - -import "net/http" - -func (s *server) handleProgress(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - - progress, ok := s.recordingProgress() - if !ok { - http.Error(w, "not running", http.StatusConflict) - return - } - - jsonProgress, err := progress.JSON() - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Write(jsonProgress) -} diff --git a/server/stop.go b/server/stop.go deleted file mode 100644 index 76158fc..0000000 --- a/server/stop.go +++ /dev/null @@ -1,17 +0,0 @@ -package server - -import "net/http" - -func (s *server) handleStop(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) - return - } - - defer s.flush() - - if ok := s.stopRequester(); !ok { - http.Error(w, "not running", http.StatusConflict) - return - } -} diff --git a/server/upgrade.go b/server/upgrade.go new file mode 100644 index 0000000..72ce2bc --- /dev/null +++ b/server/upgrade.go @@ -0,0 +1,15 @@ +package server + +import ( + "net/http" + + "github.com/gorilla/websocket" +) + +const token = "6db67fafc4f5bf965a5a" //nolint:gosec + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return r.URL.Query().Get("access_token") == token + }, +} From 286ed8905d761d733057fa97f0892b3138a2a6a4 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 09:37:12 +0200 Subject: [PATCH 05/22] docs: write implem documentation --- server/handle.go | 13 +++++-------- server/run.go | 40 +++++++++++++++++++++++++++++----------- server/upgrade.go | 1 + 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/server/handle.go b/server/handle.go index 22b172c..53859ec 100644 --- a/server/handle.go +++ b/server/handle.go @@ -6,6 +6,8 @@ import ( "net/http" ) +// Handler has as single method, Handler.ServeHTTP. +// It serves a websocket server. type Handler struct{} func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -29,6 +31,7 @@ func handle(w http.ResponseWriter, r *http.Request) { log.Println("websocket connected with client") run := run{} + defer run.flush() for { m, err := readMessage(ws) @@ -39,7 +42,7 @@ func handle(w http.ResponseWriter, r *http.Request) { switch m { case "run": - go run.run(ws) + go run.start(ws) _ = writeMessage(ws, "starting run") case "stop": @@ -51,16 +54,10 @@ func handle(w http.ResponseWriter, r *http.Request) { } case "pull": - run.pull(ws) + run.sendOutput(ws) default: _ = writeMessage(ws, fmt.Sprintf("unknown command: %s", m)) } } - - // Clean up - if run.cancel != nil { - run.cancel() - } - run.flush() } diff --git a/server/run.go b/server/run.go index 6a3b152..bf66d93 100644 --- a/server/run.go +++ b/server/run.go @@ -9,6 +9,8 @@ import ( "github.com/gorilla/websocket" ) +// run is a stateful representation of the current run +// performed by the server. type run struct { mu sync.RWMutex @@ -18,21 +20,16 @@ type run struct { cancel context.CancelFunc } -func (r *run) run(ws *websocket.Conn) { +// start starts the run. Any previous state is immediately flushed. +// Once the run is done, the state is updated. start sends message +// through the websocket connection, notifying the client. +func (r *run) start(ws *websocket.Conn) { r.flush() ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel - r.runner = runner.New( - func(rp runner.RecordingProgress) { - // Protect from concurrent write to websocket connection. - r.mu.Lock() - defer r.mu.Unlock() - m := fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()) - _ = writeMessage(ws, m) - }, - ) + r.runner = runner.New(r.sendProgess(ws)) out, err := r.runner.Run(ctx, config()) if err != nil { @@ -45,6 +42,7 @@ func (r *run) run(ws *websocket.Conn) { _ = writeMessage(ws, "done without error") } +// stop stops the run if it is running. The state is always flushed. func (r *run) stop() bool { defer r.flush() if r.runner == nil { @@ -54,7 +52,23 @@ func (r *run) stop() bool { return true } -func (r *run) pull(ws *websocket.Conn) { +// sendProgress sends the current runner.RecordingProgress through +// the websocket connection. As multiple goroutines may invoke sendProgess +// simultaneously as a callback via runner.onRecordingProgress, writing to +// the websocket connection is protected by a lock. +func (r *run) sendProgess(ws *websocket.Conn) func(runner.RecordingProgress) { + return func(rp runner.RecordingProgress) { + r.mu.Lock() + defer r.mu.Unlock() + + m := fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()) + _ = writeMessage(ws, m) + } +} + +// sendOutput sends the output of the run through the websocket connection +// or a error message if there is no output available. +func (r *run) sendOutput(ws *websocket.Conn) { if r.output == nil { _ = writeMessage(ws, "not done yet") return @@ -64,7 +78,11 @@ func (r *run) pull(ws *websocket.Conn) { _ = writeMessage(ws, m) } +// flush clears the state. func (r *run) flush() { + if r.cancel != nil { + r.cancel() + } r.runner = nil r.output = nil r.err = nil diff --git a/server/upgrade.go b/server/upgrade.go index 72ce2bc..097abb6 100644 --- a/server/upgrade.go +++ b/server/upgrade.go @@ -8,6 +8,7 @@ import ( const token = "6db67fafc4f5bf965a5a" //nolint:gosec +// upgrader will upgrade the HTTP server connection to the WebSocket protocol. var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return r.URL.Query().Get("access_token") == token From fd6f6c739445b8980e6cdfe2ff510e072215f51f Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 10:41:21 +0200 Subject: [PATCH 06/22] refactor: interface websocket io away from server --- cmd/server/main.go | 7 +++++- internal/socketio/reader.go | 47 +++++++++++++++++++++++++++++++++++++ internal/socketio/writer.go | 41 ++++++++++++++++++++++++++++++++ server/handle.go | 28 ++++++++++++++-------- server/io.go | 39 ------------------------------ server/run.go | 37 ++++++++++++++--------------- 6 files changed, 130 insertions(+), 69 deletions(-) create mode 100644 internal/socketio/reader.go create mode 100644 internal/socketio/writer.go delete mode 100644 server/io.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 3524076..eaba7ef 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -13,5 +13,10 @@ const port = "8080" func main() { addr := ":" + port fmt.Println("http://localhost" + addr) - log.Fatal(http.ListenAndServe(addr, &server.Handler{})) + + handler := &server.Handler{ + Silent: false, + } + + log.Fatal(http.ListenAndServe(addr, handler)) } diff --git a/internal/socketio/reader.go b/internal/socketio/reader.go new file mode 100644 index 0000000..c343d10 --- /dev/null +++ b/internal/socketio/reader.go @@ -0,0 +1,47 @@ +package socketio + +import ( + "fmt" + "log" + + "github.com/gorilla/websocket" +) + +type Reader interface { + ReadTextMessage() (string, error) + ReadJSON() error +} + +type reader struct { + ws *websocket.Conn + silent bool +} + +// NewReader returns a concrete type Reader that will read from +// the websocket connection. +func NewReader(ws *websocket.Conn, slient bool) Reader { + return &reader{ws, slient} +} + +func (r *reader) ReadTextMessage() (string, error) { + messageType, p, err := r.ws.ReadMessage() + if err != nil { + return "", fmt.Errorf("cannot read message: %s", err) + } + + if messageType != websocket.TextMessage { + return "", fmt.Errorf("message type is not TextMessage") + } + + m := string(p) + + if !r.silent { + log.Printf("<- %s", m) + } + + return m, nil +} + +func (r *reader) ReadJSON() error { + return fmt.Errorf("not implemented") +} diff --git a/internal/socketio/writer.go b/internal/socketio/writer.go new file mode 100644 index 0000000..0f7c428 --- /dev/null +++ b/internal/socketio/writer.go @@ -0,0 +1,41 @@ +package socketio + +import ( + "fmt" + "log" + + "github.com/gorilla/websocket" +) + +type Writer interface { + WriteTextMessage(m string) error + WriteJSON() error +} + +type writer struct { + ws *websocket.Conn + silent bool +} + +// NewWriter returns a concrete type Writer that will write to +// the websocket connection. +func NewWriter(ws *websocket.Conn, silent bool) Writer { + return &writer{ws, silent} +} + +func (w *writer) WriteTextMessage(m string) error { + err := w.ws.WriteMessage(websocket.TextMessage, []byte(m)) + if err != nil { + return fmt.Errorf("cannot write message: %s", err) + } + + if !w.silent { + log.Printf("-> %s", m) + } + + return nil +} + +func (w *writer) WriteJSON() error { + return fmt.Errorf("not implemented") +} diff --git a/server/handle.go b/server/handle.go index 53859ec..ed13d3d 100644 --- a/server/handle.go +++ b/server/handle.go @@ -4,22 +4,26 @@ import ( "fmt" "log" "net/http" + + "github.com/benchttp/engine/internal/socketio" ) // Handler has as single method, Handler.ServeHTTP. // It serves a websocket server. -type Handler struct{} +type Handler struct { + Silent bool +} func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/run": - handle(w, r) + h.handle(w, r) default: http.NotFound(w, r) } } -func handle(w http.ResponseWriter, r *http.Request) { +func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) @@ -28,36 +32,40 @@ func handle(w http.ResponseWriter, r *http.Request) { defer ws.Close() + reader := socketio.NewReader(ws, h.Silent) + writer := socketio.NewWriter(ws, h.Silent) + log.Println("websocket connected with client") run := run{} defer run.flush() for { - m, err := readMessage(ws) + p, err := reader.ReadTextMessage() if err != nil { log.Println(err) break // Connection is dead. } + m := string(p) switch m { case "run": - go run.start(ws) - _ = writeMessage(ws, "starting run") + go run.start(writer) + _ = writer.WriteTextMessage("starting run") case "stop": ok := run.stop() if ok { - _ = writeMessage(ws, "stopped") + _ = writer.WriteTextMessage("stopped") } else { - _ = writeMessage(ws, "not running") + _ = writer.WriteTextMessage("not running") } case "pull": - run.sendOutput(ws) + run.sendOutput(writer) default: - _ = writeMessage(ws, fmt.Sprintf("unknown command: %s", m)) + _ = writer.WriteTextMessage(fmt.Sprintf("unknown command: %s", m)) } } } diff --git a/server/io.go b/server/io.go deleted file mode 100644 index 3e7ee66..0000000 --- a/server/io.go +++ /dev/null @@ -1,39 +0,0 @@ -package server - -import ( - "fmt" - "log" - - "github.com/gorilla/websocket" -) - -func readMessage(ws *websocket.Conn) (string, error) { - messageType, m, err := ws.ReadMessage() - if err != nil { - return "", fmt.Errorf("cannot read message: %s", err) - } - - if messageType != websocket.TextMessage { - return "", fmt.Errorf("message type is not TextMessage") - } - - ms := string(m) - - log.Printf("<- %s", ms) - - return ms, nil -} - -func writeMessage(ws *websocket.Conn, m string) error { - mb := []byte(m) - - err := ws.WriteMessage(websocket.TextMessage, mb) - if err != nil { - log.Println("cannot write message:", err) - return err - } - - log.Printf("-> %s", m) - - return nil -} diff --git a/server/run.go b/server/run.go index bf66d93..3ce9a3d 100644 --- a/server/run.go +++ b/server/run.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" + "github.com/benchttp/engine/internal/socketio" "github.com/benchttp/engine/runner" - "github.com/gorilla/websocket" ) // run is a stateful representation of the current run @@ -21,25 +21,25 @@ type run struct { } // start starts the run. Any previous state is immediately flushed. -// Once the run is done, the state is updated. start sends message -// through the websocket connection, notifying the client. -func (r *run) start(ws *websocket.Conn) { +// Once the run is done, the state is updated. start uses w to notify +// that the run has started and the done status upon completion. +func (r *run) start(w socketio.Writer) { r.flush() ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel - r.runner = runner.New(r.sendProgess(ws)) + r.runner = runner.New(r.sendProgess(w)) out, err := r.runner.Run(ctx, config()) if err != nil { r.err = err - _ = writeMessage(ws, fmt.Sprintf("done with error: %s", err)) + _ = w.WriteTextMessage(fmt.Sprintf("done with error: %s", err)) return } r.output = out - _ = writeMessage(ws, "done without error") + _ = w.WriteTextMessage("done without error") } // stop stops the run if it is running. The state is always flushed. @@ -52,30 +52,29 @@ func (r *run) stop() bool { return true } -// sendProgress sends the current runner.RecordingProgress through -// the websocket connection. As multiple goroutines may invoke sendProgess -// simultaneously as a callback via runner.onRecordingProgress, writing to -// the websocket connection is protected by a lock. -func (r *run) sendProgess(ws *websocket.Conn) func(runner.RecordingProgress) { +// sendProgress sends the current runner.RecordingProgress via w. +// As multiple goroutines may invoke sendProgess simultaneously +// as a callback from runner.onRecordingProgress, writing to w +// is protected by a lock. +func (r *run) sendProgess(w socketio.Writer) func(runner.RecordingProgress) { return func(rp runner.RecordingProgress) { r.mu.Lock() defer r.mu.Unlock() m := fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()) - _ = writeMessage(ws, m) + _ = w.WriteTextMessage(m) } } -// sendOutput sends the output of the run through the websocket connection -// or a error message if there is no output available. -func (r *run) sendOutput(ws *websocket.Conn) { +// sendOutput sends the output of the run via w or an error message +// if there is no output available. +func (r *run) sendOutput(w socketio.Writer) { if r.output == nil { - _ = writeMessage(ws, "not done yet") + _ = w.WriteTextMessage("not done yet") return } - m := r.output.String() - _ = writeMessage(ws, m) + _ = w.WriteTextMessage(r.output.String()) } // flush clears the state. From f12ec2f68c3117f1833605732f6e146ab13756e1 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 11:01:10 +0200 Subject: [PATCH 07/22] chore: rename progress method to match standard --- server/run.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/run.go b/server/run.go index 3ce9a3d..4ba30f7 100644 --- a/server/run.go +++ b/server/run.go @@ -29,7 +29,7 @@ func (r *run) start(w socketio.Writer) { ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel - r.runner = runner.New(r.sendProgess(w)) + r.runner = runner.New(r.sendRecordingProgess(w)) out, err := r.runner.Run(ctx, config()) if err != nil { @@ -52,11 +52,11 @@ func (r *run) stop() bool { return true } -// sendProgress sends the current runner.RecordingProgress via w. -// As multiple goroutines may invoke sendProgess simultaneously +// sendRecordingProgess sends the current runner.RecordingProgress via w. +// As multiple goroutines may invoke run.sendRecordingProgess concurrently // as a callback from runner.onRecordingProgress, writing to w // is protected by a lock. -func (r *run) sendProgess(w socketio.Writer) func(runner.RecordingProgress) { +func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgress) { return func(rp runner.RecordingProgress) { r.mu.Lock() defer r.mu.Unlock() From e86d39bea4400db7f18954a2bf7615a20c1932d4 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 11:54:32 +0200 Subject: [PATCH 08/22] fix: data race on call to run.flush --- server/run.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/server/run.go b/server/run.go index 4ba30f7..65952d8 100644 --- a/server/run.go +++ b/server/run.go @@ -43,13 +43,10 @@ func (r *run) start(w socketio.Writer) { } // stop stops the run if it is running. The state is always flushed. -func (r *run) stop() bool { - defer r.flush() - if r.runner == nil { - return false - } - r.cancel() - return true +func (r *run) stop() (ok bool) { + ok = r.runner != nil + r.flush() + return } // sendRecordingProgess sends the current runner.RecordingProgress via w. @@ -69,6 +66,9 @@ func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgr // sendOutput sends the output of the run via w or an error message // if there is no output available. func (r *run) sendOutput(w socketio.Writer) { + r.mu.RLock() + defer r.mu.RUnlock() + if r.output == nil { _ = w.WriteTextMessage("not done yet") return @@ -77,8 +77,11 @@ func (r *run) sendOutput(w socketio.Writer) { _ = w.WriteTextMessage(r.output.String()) } -// flush clears the state. +// flush clears the state. Calling run.flush locks the run for writing. func (r *run) flush() { + r.mu.Lock() + defer r.mu.Unlock() + if r.cancel != nil { r.cancel() } From 9d360c815a7b87895f27b156e591b7cfe6fba567 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 16:38:54 +0200 Subject: [PATCH 09/22] feat: send json message to client --- internal/socketio/writer.go | 15 ++++++++++++--- server/handle.go | 2 +- server/message.go | 6 ++++++ server/run.go | 22 +++++++++++++++++----- 4 files changed, 36 insertions(+), 9 deletions(-) create mode 100644 server/message.go diff --git a/internal/socketio/writer.go b/internal/socketio/writer.go index 0f7c428..cd4d00d 100644 --- a/internal/socketio/writer.go +++ b/internal/socketio/writer.go @@ -9,7 +9,7 @@ import ( type Writer interface { WriteTextMessage(m string) error - WriteJSON() error + WriteJSON(v interface{}) error } type writer struct { @@ -36,6 +36,15 @@ func (w *writer) WriteTextMessage(m string) error { return nil } -func (w *writer) WriteJSON() error { - return fmt.Errorf("not implemented") +func (w *writer) WriteJSON(v interface{}) error { + err := w.ws.WriteJSON(v) + if err != nil { + return fmt.Errorf("cannot write message: %s", err) + } + + if !w.silent { + log.Printf("-> %v", v) + } + + return nil } diff --git a/server/handle.go b/server/handle.go index ed13d3d..189344a 100644 --- a/server/handle.go +++ b/server/handle.go @@ -51,7 +51,7 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { switch m { case "run": go run.start(writer) - _ = writer.WriteTextMessage("starting run") + _ = writer.WriteJSON(message{Event: "running"}) case "stop": ok := run.stop() diff --git a/server/message.go b/server/message.go new file mode 100644 index 0000000..fd4614c --- /dev/null +++ b/server/message.go @@ -0,0 +1,6 @@ +package server + +type message struct { + Event string `json:"event"` + Data interface{} `json:"data"` +} diff --git a/server/run.go b/server/run.go index 65952d8..f9b35e5 100644 --- a/server/run.go +++ b/server/run.go @@ -34,12 +34,13 @@ func (r *run) start(w socketio.Writer) { out, err := r.runner.Run(ctx, config()) if err != nil { r.err = err - _ = w.WriteTextMessage(fmt.Sprintf("done with error: %s", err)) + _ = w.WriteJSON(message{Event: "done"}) return } r.output = out - _ = w.WriteTextMessage("done without error") + + _ = w.WriteJSON(message{Event: "done"}) } // stop stops the run if it is running. The state is always flushed. @@ -58,8 +59,11 @@ func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgr r.mu.Lock() defer r.mu.Unlock() - m := fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()) - _ = w.WriteTextMessage(m) + m := message{ + Event: "progress", + Data: fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()), + } + _ = w.WriteJSON(m) } } @@ -74,7 +78,15 @@ func (r *run) sendOutput(w socketio.Writer) { return } - _ = w.WriteTextMessage(r.output.String()) + m := message{Event: "output"} + + if r.err != nil { + m.Data = r.err + } else { + m.Data = r.output + } + + _ = w.WriteJSON(m) } // flush clears the state. Calling run.flush locks the run for writing. From 3151e2e9d1187917ab9c05d93920254d0faba86a Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Thu, 28 Jul 2022 17:23:05 +0200 Subject: [PATCH 10/22] feat: accept json message from client --- internal/configparse/json.go | 2 +- internal/configparse/parse.go | 16 ++++++------ internal/configparse/parser.go | 6 ++--- internal/configparse/parser_internal_test.go | 4 +-- internal/socketio/reader.go | 15 ++++++++--- server/handle.go | 26 +++++++++++++++----- server/helper.go | 14 ----------- server/message.go | 11 ++++++++- server/run.go | 14 +++++------ 9 files changed, 63 insertions(+), 45 deletions(-) delete mode 100644 server/helper.go diff --git a/internal/configparse/json.go b/internal/configparse/json.go index 37a5bbb..736c520 100644 --- a/internal/configparse/json.go +++ b/internal/configparse/json.go @@ -8,7 +8,7 @@ import ( func JSON(in []byte) (runner.Config, error) { parser := jsonParser{} - var uconf unmarshaledConfig + var uconf UnmarshaledConfig if err := parser.parse(in, &uconf); err != nil { return runner.Config{}, err } diff --git a/internal/configparse/parse.go b/internal/configparse/parse.go index d8c8c27..9bcb084 100644 --- a/internal/configparse/parse.go +++ b/internal/configparse/parse.go @@ -11,11 +11,11 @@ import ( "github.com/benchttp/engine/runner" ) -// unmarshaledConfig is a raw data model for runner config files. +// UnmarshaledConfig is a raw data model for runner config files. // It serves as a receiver for unmarshaling processes and for that reason // its types are kept simple (certain types are incompatible with certain // unmarshalers). -type unmarshaledConfig struct { +type UnmarshaledConfig struct { Extends *string `yaml:"extends" json:"extends"` Request struct { @@ -47,7 +47,7 @@ type unmarshaledConfig struct { // and returns it or the first non-nil error occurring in the process, // which can be any of the values declared in the package. func Parse(filename string) (cfg runner.Config, err error) { - uconfs, err := parseFileRecursive(filename, []unmarshaledConfig{}, set{}) + uconfs, err := parseFileRecursive(filename, []UnmarshaledConfig{}, set{}) if err != nil { return } @@ -73,9 +73,9 @@ func (s set) add(v string) error { // occurring in the process. func parseFileRecursive( filename string, - uconfs []unmarshaledConfig, + uconfs []UnmarshaledConfig, seen set, -) ([]unmarshaledConfig, error) { +) ([]UnmarshaledConfig, error) { // avoid infinite recursion caused by circular reference if err := seen.add(filename); err != nil { return uconfs, ErrCircularExtends @@ -100,7 +100,7 @@ func parseFileRecursive( // parseFile parses a single config file and returns the result as an // unmarshaledConfig and an appropriate error predeclared in the package. -func parseFile(filename string) (uconf unmarshaledConfig, err error) { +func parseFile(filename string) (uconf UnmarshaledConfig, err error) { b, err := os.ReadFile(filename) switch { case err == nil: @@ -127,7 +127,7 @@ func parseFile(filename string) (uconf unmarshaledConfig, err error) { // as runner.ConfigGlobal and merging them into a single one. // It returns the merged result or the first non-nil error occurring in the // process. -func parseAndMergeConfigs(uconfs []unmarshaledConfig) (cfg runner.Config, err error) { +func parseAndMergeConfigs(uconfs []UnmarshaledConfig) (cfg runner.Config, err error) { if len(uconfs) == 0 { // supposedly catched upstream, should not occur return cfg, errors.New( "an unacceptable error occurred parsing the config file, " + @@ -164,7 +164,7 @@ func (pconf *parsedConfig) add(field string) { // newParsedConfig parses an input raw config as a runner.ConfigGlobal and returns // a parsedConfig or the first non-nil error occurring in the process. -func newParsedConfig(uconf unmarshaledConfig) (parsedConfig, error) { //nolint:gocognit // acceptable complexity for a parsing func +func newParsedConfig(uconf UnmarshaledConfig) (parsedConfig, error) { //nolint:gocognit // acceptable complexity for a parsing func const numField = 12 // should match the number of config Fields (not critical) pconf := parsedConfig{ diff --git a/internal/configparse/parser.go b/internal/configparse/parser.go index eaa036f..c647f88 100644 --- a/internal/configparse/parser.go +++ b/internal/configparse/parser.go @@ -22,7 +22,7 @@ const ( type configParser interface { // parse parses a raw bytes input as a raw config and stores // the resulting value into dst. - parse(in []byte, dst *unmarshaledConfig) error + parse(in []byte, dst *UnmarshaledConfig) error } // newParser returns an appropriate parser according to ext, or a non-nil @@ -43,7 +43,7 @@ type yamlParser struct{} // parse decodes a raw yaml input in strict mode (unknown fields disallowed) // and stores the resulting value into dst. -func (p yamlParser) parse(in []byte, dst *unmarshaledConfig) error { +func (p yamlParser) parse(in []byte, dst *UnmarshaledConfig) error { decoder := yaml.NewDecoder(bytes.NewReader(in)) decoder.KnownFields(true) return p.handleError(decoder.Decode(dst)) @@ -130,7 +130,7 @@ type jsonParser struct{} // parse decodes a raw JSON input in strict mode (unknown fields disallowed) // and stores the resulting value into dst. -func (p jsonParser) parse(in []byte, dst *unmarshaledConfig) error { +func (p jsonParser) parse(in []byte, dst *UnmarshaledConfig) error { decoder := json.NewDecoder(bytes.NewReader(in)) decoder.DisallowUnknownFields() return p.handleError(decoder.Decode(dst)) diff --git a/internal/configparse/parser_internal_test.go b/internal/configparse/parser_internal_test.go index 9a0f407..29aeccd 100644 --- a/internal/configparse/parser_internal_test.go +++ b/internal/configparse/parser_internal_test.go @@ -64,7 +64,7 @@ func TestYAMLParser(t *testing.T) { t.Run(tc.label, func(t *testing.T) { var ( parser yamlParser - rawcfg unmarshaledConfig + rawcfg UnmarshaledConfig yamlErr *yaml.TypeError ) @@ -122,7 +122,7 @@ func TestJSONParser(t *testing.T) { t.Run(tc.label, func(t *testing.T) { var ( parser jsonParser - rawcfg unmarshaledConfig + rawcfg UnmarshaledConfig ) gotErr := parser.parse(tc.in, &rawcfg) diff --git a/internal/socketio/reader.go b/internal/socketio/reader.go index c343d10..05b80c3 100644 --- a/internal/socketio/reader.go +++ b/internal/socketio/reader.go @@ -9,7 +9,7 @@ import ( type Reader interface { ReadTextMessage() (string, error) - ReadJSON() error + ReadJSON(v interface{}) error } type reader struct { @@ -42,6 +42,15 @@ func (r *reader) ReadTextMessage() (string, error) { return m, nil } -func (r *reader) ReadJSON() error { - return fmt.Errorf("not implemented") +func (r *reader) ReadJSON(v interface{}) error { + err := r.ws.ReadJSON(v) + if err != nil { + return fmt.Errorf("cannot read message: %s", err) + } + + if !r.silent { + log.Printf("<- %v", v) + } + + return nil } diff --git a/server/handle.go b/server/handle.go index 189344a..61979a6 100644 --- a/server/handle.go +++ b/server/handle.go @@ -1,10 +1,12 @@ package server import ( + "encoding/json" "fmt" "log" "net/http" + "github.com/benchttp/engine/internal/configparse" "github.com/benchttp/engine/internal/socketio" ) @@ -41,17 +43,29 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { defer run.flush() for { - p, err := reader.ReadTextMessage() + inc := incomingMessage{} + err := reader.ReadJSON(&inc) if err != nil { log.Println(err) break // Connection is dead. } - m := string(p) - switch m { + // TODO Update package configparse for this purpose. + p, err := json.Marshal(inc.Data) + if err != nil { + log.Println(err) + break // Connection is dead. + } + cfg, err := configparse.JSON(p) + if err != nil { + log.Println(err) + break // Connection is dead. + } + + switch inc.Event { case "run": - go run.start(writer) - _ = writer.WriteJSON(message{Event: "running"}) + go run.start(writer, cfg) + _ = writer.WriteJSON(outgoingMessage{Event: "running"}) case "stop": ok := run.stop() @@ -65,7 +79,7 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { run.sendOutput(writer) default: - _ = writer.WriteTextMessage(fmt.Sprintf("unknown command: %s", m)) + _ = writer.WriteTextMessage(fmt.Sprintf("unknown incoming event: %s", inc.Event)) } } } diff --git a/server/helper.go b/server/helper.go deleted file mode 100644 index d7600a3..0000000 --- a/server/helper.go +++ /dev/null @@ -1,14 +0,0 @@ -package server - -import ( - "net/url" - - "github.com/benchttp/engine/runner" -) - -func config() runner.Config { - rqurl, _ := url.ParseRequestURI("https://example.com") - config := runner.DefaultConfig() - config.Request.URL = rqurl - return config -} diff --git a/server/message.go b/server/message.go index fd4614c..985d51d 100644 --- a/server/message.go +++ b/server/message.go @@ -1,6 +1,15 @@ package server -type message struct { +import ( + "github.com/benchttp/engine/internal/configparse" +) + +type incomingMessage struct { + Event string `json:"event"` + Data configparse.UnmarshaledConfig `json:"data"` +} + +type outgoingMessage struct { Event string `json:"event"` Data interface{} `json:"data"` } diff --git a/server/run.go b/server/run.go index f9b35e5..2fffc99 100644 --- a/server/run.go +++ b/server/run.go @@ -23,7 +23,7 @@ type run struct { // start starts the run. Any previous state is immediately flushed. // Once the run is done, the state is updated. start uses w to notify // that the run has started and the done status upon completion. -func (r *run) start(w socketio.Writer) { +func (r *run) start(w socketio.Writer, cfg runner.Config) { r.flush() ctx, cancel := context.WithCancel(context.Background()) @@ -31,16 +31,16 @@ func (r *run) start(w socketio.Writer) { r.runner = runner.New(r.sendRecordingProgess(w)) - out, err := r.runner.Run(ctx, config()) + out, err := r.runner.Run(ctx, cfg) if err != nil { r.err = err - _ = w.WriteJSON(message{Event: "done"}) + _ = w.WriteJSON(outgoingMessage{Event: "done"}) return } r.output = out - _ = w.WriteJSON(message{Event: "done"}) + _ = w.WriteJSON(outgoingMessage{Event: "done"}) } // stop stops the run if it is running. The state is always flushed. @@ -59,7 +59,7 @@ func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgr r.mu.Lock() defer r.mu.Unlock() - m := message{ + m := outgoingMessage{ Event: "progress", Data: fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()), } @@ -67,7 +67,7 @@ func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgr } } -// sendOutput sends the output of the run via w or an error message +// sendOutput sends the output of the run via w or an error outgoingMessage // if there is no output available. func (r *run) sendOutput(w socketio.Writer) { r.mu.RLock() @@ -78,7 +78,7 @@ func (r *run) sendOutput(w socketio.Writer) { return } - m := message{Event: "output"} + m := outgoingMessage{Event: "output"} if r.err != nil { m.Data = r.err From 109486b4c27ac9487f98a351628a6ea3639e4cc6 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Fri, 29 Jul 2022 16:03:49 +0200 Subject: [PATCH 11/22] refactor: remove output state and auto send output --- server/handle.go | 3 --- server/run.go | 33 ++------------------------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/server/handle.go b/server/handle.go index 61979a6..a177f60 100644 --- a/server/handle.go +++ b/server/handle.go @@ -75,9 +75,6 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { _ = writer.WriteTextMessage("not running") } - case "pull": - run.sendOutput(writer) - default: _ = writer.WriteTextMessage(fmt.Sprintf("unknown incoming event: %s", inc.Event)) } diff --git a/server/run.go b/server/run.go index 2fffc99..302ac02 100644 --- a/server/run.go +++ b/server/run.go @@ -15,8 +15,6 @@ type run struct { mu sync.RWMutex runner *runner.Runner - output *runner.Report - err error cancel context.CancelFunc } @@ -33,14 +31,11 @@ func (r *run) start(w socketio.Writer, cfg runner.Config) { out, err := r.runner.Run(ctx, cfg) if err != nil { - r.err = err - _ = w.WriteJSON(outgoingMessage{Event: "done"}) + _ = w.WriteJSON(outgoingMessage{Event: "done", Data: err}) return } - r.output = out - - _ = w.WriteJSON(outgoingMessage{Event: "done"}) + _ = w.WriteJSON(outgoingMessage{Event: "done", Data: out}) } // stop stops the run if it is running. The state is always flushed. @@ -67,28 +62,6 @@ func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgr } } -// sendOutput sends the output of the run via w or an error outgoingMessage -// if there is no output available. -func (r *run) sendOutput(w socketio.Writer) { - r.mu.RLock() - defer r.mu.RUnlock() - - if r.output == nil { - _ = w.WriteTextMessage("not done yet") - return - } - - m := outgoingMessage{Event: "output"} - - if r.err != nil { - m.Data = r.err - } else { - m.Data = r.output - } - - _ = w.WriteJSON(m) -} - // flush clears the state. Calling run.flush locks the run for writing. func (r *run) flush() { r.mu.Lock() @@ -98,7 +71,5 @@ func (r *run) flush() { r.cancel() } r.runner = nil - r.output = nil - r.err = nil r.cancel = nil } From 888abbc34b6c0d27393ec82373894f738f687984 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Fri, 29 Jul 2022 18:20:46 +0200 Subject: [PATCH 12/22] refactor: typed message structures --- server/handle.go | 46 ++++++++++++++++++++++------------------------ server/message.go | 25 +++++++++++++++++++------ server/run.go | 6 +++--- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/server/handle.go b/server/handle.go index a177f60..19436d7 100644 --- a/server/handle.go +++ b/server/handle.go @@ -2,6 +2,7 @@ package server import ( "encoding/json" + "errors" "fmt" "log" "net/http" @@ -29,54 +30,51 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) - return // Connection is dead. + return } defer ws.Close() + log.Println("websocket connected with client") + reader := socketio.NewReader(ws, h.Silent) writer := socketio.NewWriter(ws, h.Silent) - log.Println("websocket connected with client") - run := run{} defer run.flush() for { - inc := incomingMessage{} + inc := messageProcedure{} err := reader.ReadJSON(&inc) if err != nil { log.Println(err) - break // Connection is dead. - } - - // TODO Update package configparse for this purpose. - p, err := json.Marshal(inc.Data) - if err != nil { - log.Println(err) - break // Connection is dead. - } - cfg, err := configparse.JSON(p) - if err != nil { - log.Println(err) - break // Connection is dead. + break } - switch inc.Event { + switch inc.Procedure { case "run": + // TODO Update package configparse for this purpose. + p, err := json.Marshal(inc.Data) + if err != nil { + log.Println(err) + break + } + cfg, err := configparse.JSON(p) + if err != nil { + log.Println(err) + break + } + go run.start(writer, cfg) - _ = writer.WriteJSON(outgoingMessage{Event: "running"}) case "stop": ok := run.stop() - if ok { - _ = writer.WriteTextMessage("stopped") - } else { - _ = writer.WriteTextMessage("not running") + if !ok { + _ = writer.WriteJSON(messageError{Event: "error", Error: errors.New("not running")}) } default: - _ = writer.WriteTextMessage(fmt.Sprintf("unknown incoming event: %s", inc.Event)) + _ = writer.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", inc.Procedure)) } } } diff --git a/server/message.go b/server/message.go index 985d51d..9c8e10c 100644 --- a/server/message.go +++ b/server/message.go @@ -2,14 +2,27 @@ package server import ( "github.com/benchttp/engine/internal/configparse" + "github.com/benchttp/engine/runner" ) -type incomingMessage struct { - Event string `json:"event"` - Data configparse.UnmarshaledConfig `json:"data"` +type messageProcedure struct { + Procedure string `json:"procedure"` + // Data is non-empty if MessageProcedure.Procedure is "start". + Data configparse.UnmarshaledConfig `json:"data"` } -type outgoingMessage struct { - Event string `json:"event"` - Data interface{} `json:"data"` +type messageProgress struct { + Event string `json:"event"` + // Data runner.RecordingProgress `json:"data"` + Data string `json:"data"` +} + +type messageDone struct { + Event string `json:"event"` + Data runner.Report `json:"data"` +} + +type messageError struct { + Event string `json:"event"` + Error error `json:"error"` } diff --git a/server/run.go b/server/run.go index 302ac02..4dbe19e 100644 --- a/server/run.go +++ b/server/run.go @@ -31,11 +31,11 @@ func (r *run) start(w socketio.Writer, cfg runner.Config) { out, err := r.runner.Run(ctx, cfg) if err != nil { - _ = w.WriteJSON(outgoingMessage{Event: "done", Data: err}) + _ = w.WriteJSON(messageError{Event: "done", Error: err}) return } - _ = w.WriteJSON(outgoingMessage{Event: "done", Data: out}) + _ = w.WriteJSON(messageDone{Event: "done", Data: *out}) } // stop stops the run if it is running. The state is always flushed. @@ -54,7 +54,7 @@ func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgr r.mu.Lock() defer r.mu.Unlock() - m := outgoingMessage{ + m := messageProgress{ Event: "progress", Data: fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()), } From 3bfc899c5c37b520e29cd73817e8153bf21a109a Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Sat, 30 Jul 2022 17:57:46 +0200 Subject: [PATCH 13/22] misc: rename structs --- server/handle.go | 18 ++++++------ server/message.go | 20 ++++++------- server/run.go | 75 ----------------------------------------------- server/service.go | 73 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 94 deletions(-) delete mode 100644 server/run.go create mode 100644 server/service.go diff --git a/server/handle.go b/server/handle.go index 19436d7..9c58114 100644 --- a/server/handle.go +++ b/server/handle.go @@ -40,18 +40,18 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { reader := socketio.NewReader(ws, h.Silent) writer := socketio.NewWriter(ws, h.Silent) - run := run{} - defer run.flush() + srv := &service{} + defer srv.flush() for { - inc := messageProcedure{} + inc := clientMessage{} err := reader.ReadJSON(&inc) if err != nil { log.Println(err) break } - switch inc.Procedure { + switch inc.Action { case "run": // TODO Update package configparse for this purpose. p, err := json.Marshal(inc.Data) @@ -65,16 +65,16 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { break } - go run.start(writer, cfg) + go srv.doRun(writer, cfg) - case "stop": - ok := run.stop() + case "cancel": + ok := srv.cancelRun() if !ok { - _ = writer.WriteJSON(messageError{Event: "error", Error: errors.New("not running")}) + _ = writer.WriteJSON(errorMessage{Event: "error", Error: "not running"}) } default: - _ = writer.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", inc.Procedure)) + _ = writer.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", inc.Action)) } } } diff --git a/server/message.go b/server/message.go index 9c8e10c..41c03fe 100644 --- a/server/message.go +++ b/server/message.go @@ -5,24 +5,24 @@ import ( "github.com/benchttp/engine/runner" ) -type messageProcedure struct { - Procedure string `json:"procedure"` - // Data is non-empty if MessageProcedure.Procedure is "start". +type clientMessage struct { + Action string `json:"action"` + // Data is non-empty if MessageProcedure.Action is "start". Data configparse.UnmarshaledConfig `json:"data"` } -type messageProgress struct { - Event string `json:"event"` +type progressMessage struct { + Event string `json:"state"` // Data runner.RecordingProgress `json:"data"` Data string `json:"data"` } -type messageDone struct { - Event string `json:"event"` +type doneMessage struct { + Event string `json:"state"` Data runner.Report `json:"data"` } -type messageError struct { - Event string `json:"event"` - Error error `json:"error"` +type errorMessage struct { + Event string `json:"state"` + Error string `json:"error"` } diff --git a/server/run.go b/server/run.go deleted file mode 100644 index 4dbe19e..0000000 --- a/server/run.go +++ /dev/null @@ -1,75 +0,0 @@ -package server - -import ( - "context" - "fmt" - "sync" - - "github.com/benchttp/engine/internal/socketio" - "github.com/benchttp/engine/runner" -) - -// run is a stateful representation of the current run -// performed by the server. -type run struct { - mu sync.RWMutex - - runner *runner.Runner - cancel context.CancelFunc -} - -// start starts the run. Any previous state is immediately flushed. -// Once the run is done, the state is updated. start uses w to notify -// that the run has started and the done status upon completion. -func (r *run) start(w socketio.Writer, cfg runner.Config) { - r.flush() - - ctx, cancel := context.WithCancel(context.Background()) - r.cancel = cancel - - r.runner = runner.New(r.sendRecordingProgess(w)) - - out, err := r.runner.Run(ctx, cfg) - if err != nil { - _ = w.WriteJSON(messageError{Event: "done", Error: err}) - return - } - - _ = w.WriteJSON(messageDone{Event: "done", Data: *out}) -} - -// stop stops the run if it is running. The state is always flushed. -func (r *run) stop() (ok bool) { - ok = r.runner != nil - r.flush() - return -} - -// sendRecordingProgess sends the current runner.RecordingProgress via w. -// As multiple goroutines may invoke run.sendRecordingProgess concurrently -// as a callback from runner.onRecordingProgress, writing to w -// is protected by a lock. -func (r *run) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgress) { - return func(rp runner.RecordingProgress) { - r.mu.Lock() - defer r.mu.Unlock() - - m := messageProgress{ - Event: "progress", - Data: fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()), - } - _ = w.WriteJSON(m) - } -} - -// flush clears the state. Calling run.flush locks the run for writing. -func (r *run) flush() { - r.mu.Lock() - defer r.mu.Unlock() - - if r.cancel != nil { - r.cancel() - } - r.runner = nil - r.cancel = nil -} diff --git a/server/service.go b/server/service.go new file mode 100644 index 0000000..a592c2e --- /dev/null +++ b/server/service.go @@ -0,0 +1,73 @@ +package server + +import ( + "context" + "fmt" + "sync" + + "github.com/benchttp/engine/internal/socketio" + "github.com/benchttp/engine/runner" +) + +type service struct { + mu sync.RWMutex + + runner *runner.Runner + cancel context.CancelFunc +} + +// doRun calls to runner.Run. Any previous state is immediately flushed. +// Once the doRun is done, the state is updated. doRun uses w to notify +// that the doRun has started and the done status upon completion. +func (s *service) doRun(w socketio.Writer, cfg runner.Config) { + s.flush() + + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + + s.runner = runner.New(s.sendRecordingProgess(w)) + + out, err := s.runner.Run(ctx, cfg) + if err != nil { + _ = w.WriteJSON(errorMessage{Event: "done", Error: err.Error()}) + return + } + + _ = w.WriteJSON(doneMessage{Event: "done", Data: *out}) +} + +// cancelRun stops the run if it is running. The state is always flushed. +func (s *service) cancelRun() (ok bool) { + ok = s.runner != nil + s.flush() + return +} + +// sendRecordingProgess sends the current runner.RecordingProgress via w. +// As multiple goroutines may invoke run.sendRecordingProgess concurrently +// as a callback from runner.onRecordingProgress, writing to w +// is protected by a lock. +func (s *service) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgress) { + return func(rp runner.RecordingProgress) { + s.mu.Lock() + defer s.mu.Unlock() + + m := progressMessage{ + Event: "progress", + Data: fmt.Sprintf("%s: %d/%d %d", rp.Status(), rp.DoneCount, rp.MaxCount, rp.Percent()), + } + _ = w.WriteJSON(m) + } +} + +// flush clears the state. Calling run.flush locks the run for writing. +func (s *service) flush() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cancel != nil { + s.cancel() + } + s.runner = nil + s.cancel = nil +} From 5827cd414cb64756e268cbef30437ae8ecb407a7 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Sat, 30 Jul 2022 18:51:19 +0200 Subject: [PATCH 14/22] refactor: move state around struct - Handler manages the websocket connection and interprets the messages - service (private) interfaces package runner --- cmd/server/main.go | 9 ++-- server/handle.go | 80 --------------------------------- server/handler.go | 109 +++++++++++++++++++++++++++++++++++++++++++++ server/message.go | 5 +-- server/service.go | 39 ++++++++-------- server/upgrade.go | 16 ------- 6 files changed, 135 insertions(+), 123 deletions(-) delete mode 100644 server/handle.go create mode 100644 server/handler.go delete mode 100644 server/upgrade.go diff --git a/cmd/server/main.go b/cmd/server/main.go index eaba7ef..29625de 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -8,15 +8,16 @@ import ( "github.com/benchttp/engine/server" ) -const port = "8080" +const ( + port = "8080" + token = "6db67fafc4f5bf965a5a" //nolint:gosec +) func main() { addr := ":" + port fmt.Println("http://localhost" + addr) - handler := &server.Handler{ - Silent: false, - } + handler := server.NewHandler(false, token) log.Fatal(http.ListenAndServe(addr, handler)) } diff --git a/server/handle.go b/server/handle.go deleted file mode 100644 index 9c58114..0000000 --- a/server/handle.go +++ /dev/null @@ -1,80 +0,0 @@ -package server - -import ( - "encoding/json" - "errors" - "fmt" - "log" - "net/http" - - "github.com/benchttp/engine/internal/configparse" - "github.com/benchttp/engine/internal/socketio" -) - -// Handler has as single method, Handler.ServeHTTP. -// It serves a websocket server. -type Handler struct { - Silent bool -} - -func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/run": - h.handle(w, r) - default: - http.NotFound(w, r) - } -} - -func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - - defer ws.Close() - - log.Println("websocket connected with client") - - reader := socketio.NewReader(ws, h.Silent) - writer := socketio.NewWriter(ws, h.Silent) - - srv := &service{} - defer srv.flush() - - for { - inc := clientMessage{} - err := reader.ReadJSON(&inc) - if err != nil { - log.Println(err) - break - } - - switch inc.Action { - case "run": - // TODO Update package configparse for this purpose. - p, err := json.Marshal(inc.Data) - if err != nil { - log.Println(err) - break - } - cfg, err := configparse.JSON(p) - if err != nil { - log.Println(err) - break - } - - go srv.doRun(writer, cfg) - - case "cancel": - ok := srv.cancelRun() - if !ok { - _ = writer.WriteJSON(errorMessage{Event: "error", Error: "not running"}) - } - - default: - _ = writer.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", inc.Action)) - } - } -} diff --git a/server/handler.go b/server/handler.go new file mode 100644 index 0000000..43bd446 --- /dev/null +++ b/server/handler.go @@ -0,0 +1,109 @@ +package server + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/benchttp/engine/internal/configparse" + "github.com/benchttp/engine/internal/socketio" + "github.com/benchttp/engine/runner" + "github.com/gorilla/websocket" +) + +// Handler has as single method, Handler.ServeHTTP. +// It serves a websocket server allowing remote +// manipulation of runner.Runner. +type Handler struct { + Silent bool + Token string + srv *service + upgrader websocket.Upgrader +} + +func NewHandler(silent bool, token string) *Handler { + return &Handler{ + Silent: silent, + Token: token, + srv: &service{}, + upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return r.URL.Query().Get("access_token") == token + }, + }, + } +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/run": + h.handle(w, r) + default: + http.NotFound(w, r) + } +} + +func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { + ws, err := h.upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + defer ws.Close() + // The client is gone, flush all the state. + // TODO Handle reconnect? + defer h.srv.flush() + + log.Println("connected with client via websocket") + + reader := socketio.NewReader(ws, h.Silent) + writer := socketio.NewWriter(ws, h.Silent) + + for { + m := clientMessage{} + err := reader.ReadJSON(&m) + if err != nil { + log.Println(err) + break + } + + switch m.Action { + case "run": + cfg, err := parseConfig(m.Data) + if err != nil { + log.Println(err) + break + } + + go h.srv.doRun(writer, cfg) + + case "cancel": + ok := h.srv.cancelRun() + if !ok { + _ = writer.WriteJSON(errorMessage{Event: "error", Error: "not running"}) + } + + default: + _ = writer.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", m.Action)) + } + } +} + +// TODO Update package configparse for this purpose. + +func parseConfig(data configparse.UnmarshaledConfig) (runner.Config, error) { + p, err := json.Marshal(data) + if err != nil { + return runner.Config{}, err + } + + cfg, err := configparse.JSON(p) + if err != nil { + log.Println(err) + return runner.Config{}, err + } + + return cfg, nil +} diff --git a/server/message.go b/server/message.go index 41c03fe..15a4ac9 100644 --- a/server/message.go +++ b/server/message.go @@ -7,14 +7,13 @@ import ( type clientMessage struct { Action string `json:"action"` - // Data is non-empty if MessageProcedure.Action is "start". + // Data is non-empty if field Action is "run". Data configparse.UnmarshaledConfig `json:"data"` } type progressMessage struct { Event string `json:"state"` - // Data runner.RecordingProgress `json:"data"` - Data string `json:"data"` + Data string `json:"data"` // TODO runner.RecordingProgress } type doneMessage struct { diff --git a/server/service.go b/server/service.go index a592c2e..702380e 100644 --- a/server/service.go +++ b/server/service.go @@ -10,18 +10,15 @@ import ( ) type service struct { - mu sync.RWMutex - + mu sync.RWMutex runner *runner.Runner cancel context.CancelFunc } -// doRun calls to runner.Run. Any previous state is immediately flushed. -// Once the doRun is done, the state is updated. doRun uses w to notify -// that the doRun has started and the done status upon completion. +// doRun calls runner.Runner.Run. The service state is overwritten. +// The return value of runner.Runner.Run is send to the client via +// w. The run progress is streamed through w. func (s *service) doRun(w socketio.Writer, cfg runner.Config) { - s.flush() - ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel @@ -36,18 +33,23 @@ func (s *service) doRun(w socketio.Writer, cfg runner.Config) { _ = w.WriteJSON(doneMessage{Event: "done", Data: *out}) } -// cancelRun stops the run if it is running. The state is always flushed. +// cancelRun cancels the run of the current runner. +// If the runner is nil, cancelRun is noop. func (s *service) cancelRun() (ok bool) { - ok = s.runner != nil - s.flush() - return + s.mu.Lock() + defer s.mu.Unlock() + if s.runner == nil { + return false + } + s.cancel() + return true } -// sendRecordingProgess sends the current runner.RecordingProgress via w. -// As multiple goroutines may invoke run.sendRecordingProgess concurrently -// as a callback from runner.onRecordingProgress, writing to w -// is protected by a lock. +// sendRecordingProgess returns a callback +// to send the current runner progress via w. func (s *service) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgress) { + // The callback is invoked from a goroutine spawned by Recorder.Record. + // Protect w from concurrent write with a lock. return func(rp runner.RecordingProgress) { s.mu.Lock() defer s.mu.Unlock() @@ -60,14 +62,11 @@ func (s *service) sendRecordingProgess(w socketio.Writer) func(runner.RecordingP } } -// flush clears the state. Calling run.flush locks the run for writing. +// flush clears the service state. +// Calling service.flush locks it for writing. func (s *service) flush() { s.mu.Lock() defer s.mu.Unlock() - - if s.cancel != nil { - s.cancel() - } s.runner = nil s.cancel = nil } diff --git a/server/upgrade.go b/server/upgrade.go deleted file mode 100644 index 097abb6..0000000 --- a/server/upgrade.go +++ /dev/null @@ -1,16 +0,0 @@ -package server - -import ( - "net/http" - - "github.com/gorilla/websocket" -) - -const token = "6db67fafc4f5bf965a5a" //nolint:gosec - -// upgrader will upgrade the HTTP server connection to the WebSocket protocol. -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return r.URL.Query().Get("access_token") == token - }, -} From ce6af4f21d6c23f5b015fb89cdcffb42cf6d355f Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 00:14:08 +0200 Subject: [PATCH 15/22] chore: rename member srv to service --- server/handler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/handler.go b/server/handler.go index 43bd446..705638d 100644 --- a/server/handler.go +++ b/server/handler.go @@ -18,15 +18,15 @@ import ( type Handler struct { Silent bool Token string - srv *service + service *service upgrader websocket.Upgrader } func NewHandler(silent bool, token string) *Handler { return &Handler{ - Silent: silent, - Token: token, - srv: &service{}, + Silent: silent, + Token: token, + service: &service{}, upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return r.URL.Query().Get("access_token") == token @@ -54,7 +54,7 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { defer ws.Close() // The client is gone, flush all the state. // TODO Handle reconnect? - defer h.srv.flush() + defer h.service.flush() log.Println("connected with client via websocket") @@ -77,10 +77,10 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { break } - go h.srv.doRun(writer, cfg) + go h.service.doRun(writer, cfg) case "cancel": - ok := h.srv.cancelRun() + ok := h.service.cancelRun() if !ok { _ = writer.WriteJSON(errorMessage{Event: "error", Error: "not running"}) } From d5444b14afe7888422df6e354aa091c87ef6f42a Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 00:15:19 +0200 Subject: [PATCH 16/22] fix: close the connection before flushing --- server/handler.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/handler.go b/server/handler.go index 705638d..73d9cab 100644 --- a/server/handler.go +++ b/server/handler.go @@ -51,10 +51,12 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { return } - defer ws.Close() - // The client is gone, flush all the state. - // TODO Handle reconnect? - defer h.service.flush() + defer func() { + ws.Close() + // The client is gone, flush all the state. + // TODO Handle reconnect? + h.service.flush() + }() log.Println("connected with client via websocket") From 3b1a0e21f44c25d4600d2306cf83582bfef32eda Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 00:18:22 +0200 Subject: [PATCH 17/22] docs: Handler struct --- server/handler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/handler.go b/server/handler.go index 73d9cab..7037d81 100644 --- a/server/handler.go +++ b/server/handler.go @@ -12,9 +12,9 @@ import ( "github.com/gorilla/websocket" ) -// Handler has as single method, Handler.ServeHTTP. -// It serves a websocket server allowing remote -// manipulation of runner.Runner. +// Handler implements http.Handler. +// It serves a websocket server allowing +// remote manipulation of runner.Runner. type Handler struct { Silent bool Token string From 84dbced3d59b7bb96986ea853c77404fc64f526c Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 00:33:31 +0200 Subject: [PATCH 18/22] refactor: merge Reader and Writer interface and rename package --- internal/socketio/reader.go | 56 ---------------------- internal/socketio/writer.go | 50 -------------------- internal/websocketio/io.go | 92 +++++++++++++++++++++++++++++++++++++ server/handler.go | 13 +++--- server/service.go | 6 +-- 5 files changed, 101 insertions(+), 116 deletions(-) delete mode 100644 internal/socketio/reader.go delete mode 100644 internal/socketio/writer.go create mode 100644 internal/websocketio/io.go diff --git a/internal/socketio/reader.go b/internal/socketio/reader.go deleted file mode 100644 index 05b80c3..0000000 --- a/internal/socketio/reader.go +++ /dev/null @@ -1,56 +0,0 @@ -package socketio - -import ( - "fmt" - "log" - - "github.com/gorilla/websocket" -) - -type Reader interface { - ReadTextMessage() (string, error) - ReadJSON(v interface{}) error -} - -type reader struct { - ws *websocket.Conn - silent bool -} - -// NewReader returns a concrete type Reader that will read from -// the websocket connection. -func NewReader(ws *websocket.Conn, slient bool) Reader { - return &reader{ws, slient} -} - -func (r *reader) ReadTextMessage() (string, error) { - messageType, p, err := r.ws.ReadMessage() - if err != nil { - return "", fmt.Errorf("cannot read message: %s", err) - } - - if messageType != websocket.TextMessage { - return "", fmt.Errorf("message type is not TextMessage") - } - - m := string(p) - - if !r.silent { - log.Printf("<- %s", m) - } - - return m, nil -} - -func (r *reader) ReadJSON(v interface{}) error { - err := r.ws.ReadJSON(v) - if err != nil { - return fmt.Errorf("cannot read message: %s", err) - } - - if !r.silent { - log.Printf("<- %v", v) - } - - return nil -} diff --git a/internal/socketio/writer.go b/internal/socketio/writer.go deleted file mode 100644 index cd4d00d..0000000 --- a/internal/socketio/writer.go +++ /dev/null @@ -1,50 +0,0 @@ -package socketio - -import ( - "fmt" - "log" - - "github.com/gorilla/websocket" -) - -type Writer interface { - WriteTextMessage(m string) error - WriteJSON(v interface{}) error -} - -type writer struct { - ws *websocket.Conn - silent bool -} - -// NewWriter returns a concrete type Writer that will write to -// the websocket connection. -func NewWriter(ws *websocket.Conn, silent bool) Writer { - return &writer{ws, silent} -} - -func (w *writer) WriteTextMessage(m string) error { - err := w.ws.WriteMessage(websocket.TextMessage, []byte(m)) - if err != nil { - return fmt.Errorf("cannot write message: %s", err) - } - - if !w.silent { - log.Printf("-> %s", m) - } - - return nil -} - -func (w *writer) WriteJSON(v interface{}) error { - err := w.ws.WriteJSON(v) - if err != nil { - return fmt.Errorf("cannot write message: %s", err) - } - - if !w.silent { - log.Printf("-> %v", v) - } - - return nil -} diff --git a/internal/websocketio/io.go b/internal/websocketio/io.go new file mode 100644 index 0000000..2c02a62 --- /dev/null +++ b/internal/websocketio/io.go @@ -0,0 +1,92 @@ +package websocketio + +import ( + "fmt" + "log" + + "github.com/gorilla/websocket" +) + +type Reader interface { + ReadTextMessage() (string, error) + ReadJSON(v interface{}) error +} + +type Writer interface { + WriteTextMessage(m string) error + WriteJSON(v interface{}) error +} + +type ReadWriter interface { + Reader + Writer +} + +type readWriter struct { + ws *websocket.Conn + silent bool +} + +// NewReadWriter returns a concrete type ReadWriter +// reading from and writing to the websocket connection. +func NewReadWriter(ws *websocket.Conn, silent bool) ReadWriter { + return &readWriter{ws, silent} +} + +func (rw *readWriter) ReadTextMessage() (string, error) { + messageType, p, err := rw.ws.ReadMessage() + if err != nil { + return "", fmt.Errorf("cannot read message: %s", err) + } + + if messageType != websocket.TextMessage { + return "", fmt.Errorf("message type is not TextMessage") + } + + m := string(p) + + if !rw.silent { + log.Printf("<- %s", m) + } + + return m, nil +} + +func (rw *readWriter) ReadJSON(v interface{}) error { + err := rw.ws.ReadJSON(v) + if err != nil { + return fmt.Errorf("cannot read message: %s", err) + } + + if !rw.silent { + log.Printf("<- %v", v) + } + + return nil +} + +func (rw *readWriter) WriteTextMessage(m string) error { + err := rw.ws.WriteMessage(websocket.TextMessage, []byte(m)) + if err != nil { + return fmt.Errorf("cannot write message: %s", err) + } + + if !rw.silent { + log.Printf("-> %s", m) + } + + return nil +} + +func (rw *readWriter) WriteJSON(v interface{}) error { + err := rw.ws.WriteJSON(v) + if err != nil { + return fmt.Errorf("cannot write message: %s", err) + } + + if !rw.silent { + log.Printf("-> %v", v) + } + + return nil +} diff --git a/server/handler.go b/server/handler.go index 7037d81..3eeecfd 100644 --- a/server/handler.go +++ b/server/handler.go @@ -7,7 +7,7 @@ import ( "net/http" "github.com/benchttp/engine/internal/configparse" - "github.com/benchttp/engine/internal/socketio" + "github.com/benchttp/engine/internal/websocketio" "github.com/benchttp/engine/runner" "github.com/gorilla/websocket" ) @@ -60,12 +60,11 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { log.Println("connected with client via websocket") - reader := socketio.NewReader(ws, h.Silent) - writer := socketio.NewWriter(ws, h.Silent) + rw := websocketio.NewReadWriter(ws, h.Silent) for { m := clientMessage{} - err := reader.ReadJSON(&m) + err := rw.ReadJSON(&m) if err != nil { log.Println(err) break @@ -79,16 +78,16 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { break } - go h.service.doRun(writer, cfg) + go h.service.doRun(rw, cfg) case "cancel": ok := h.service.cancelRun() if !ok { - _ = writer.WriteJSON(errorMessage{Event: "error", Error: "not running"}) + _ = rw.WriteJSON(errorMessage{Event: "error", Error: "not running"}) } default: - _ = writer.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", m.Action)) + _ = rw.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", m.Action)) } } } diff --git a/server/service.go b/server/service.go index 702380e..124ead0 100644 --- a/server/service.go +++ b/server/service.go @@ -5,7 +5,7 @@ import ( "fmt" "sync" - "github.com/benchttp/engine/internal/socketio" + "github.com/benchttp/engine/internal/websocketio" "github.com/benchttp/engine/runner" ) @@ -18,7 +18,7 @@ type service struct { // doRun calls runner.Runner.Run. The service state is overwritten. // The return value of runner.Runner.Run is send to the client via // w. The run progress is streamed through w. -func (s *service) doRun(w socketio.Writer, cfg runner.Config) { +func (s *service) doRun(w websocketio.Writer, cfg runner.Config) { ctx, cancel := context.WithCancel(context.Background()) s.cancel = cancel @@ -47,7 +47,7 @@ func (s *service) cancelRun() (ok bool) { // sendRecordingProgess returns a callback // to send the current runner progress via w. -func (s *service) sendRecordingProgess(w socketio.Writer) func(runner.RecordingProgress) { +func (s *service) sendRecordingProgess(w websocketio.Writer) func(runner.RecordingProgress) { // The callback is invoked from a goroutine spawned by Recorder.Record. // Protect w from concurrent write with a lock. return func(rp runner.RecordingProgress) { From 66fc8aa64bc48a5b782b407d6adfc368ed747d82 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 00:39:00 +0200 Subject: [PATCH 19/22] refactor: extract upgrader --- server/handler.go | 16 +++++----------- server/upgrader.go | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 11 deletions(-) create mode 100644 server/upgrader.go diff --git a/server/handler.go b/server/handler.go index 3eeecfd..a3ab599 100644 --- a/server/handler.go +++ b/server/handler.go @@ -9,17 +9,15 @@ import ( "github.com/benchttp/engine/internal/configparse" "github.com/benchttp/engine/internal/websocketio" "github.com/benchttp/engine/runner" - "github.com/gorilla/websocket" ) // Handler implements http.Handler. // It serves a websocket server allowing // remote manipulation of runner.Runner. type Handler struct { - Silent bool - Token string - service *service - upgrader websocket.Upgrader + Silent bool + Token string + service *service } func NewHandler(silent bool, token string) *Handler { @@ -27,11 +25,6 @@ func NewHandler(silent bool, token string) *Handler { Silent: silent, Token: token, service: &service{}, - upgrader: websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { - return r.URL.Query().Get("access_token") == token - }, - }, } } @@ -45,7 +38,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { - ws, err := h.upgrader.Upgrade(w, r, nil) + upgrader := secureUpgrader(h.Token) + ws, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Println(err) return diff --git a/server/upgrader.go b/server/upgrader.go new file mode 100644 index 0000000..033eb65 --- /dev/null +++ b/server/upgrader.go @@ -0,0 +1,15 @@ +package server + +import ( + "net/http" + + "github.com/gorilla/websocket" +) + +func secureUpgrader(token string) websocket.Upgrader { + return websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return r.URL.Query().Get("access_token") == token + }, + } +} From 60d6b947dd6468a1640b09b97ce4d52dc3ade51f Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 00:42:09 +0200 Subject: [PATCH 20/22] docs: service.cancelRun panics --- server/service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/service.go b/server/service.go index 124ead0..c56ebb7 100644 --- a/server/service.go +++ b/server/service.go @@ -35,6 +35,8 @@ func (s *service) doRun(w websocketio.Writer, cfg runner.Config) { // cancelRun cancels the run of the current runner. // If the runner is nil, cancelRun is noop. +// cancelRun panics if cancelRun is invoked while +// service.runner is non-nil yet service.cancel is nil. func (s *service) cancelRun() (ok bool) { s.mu.Lock() defer s.mu.Unlock() From 88ce51419c50a1a4408ce0fa77c523547830774e Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 08:45:07 +0200 Subject: [PATCH 21/22] chore: smoother error handling --- server/handler.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/handler.go b/server/handler.go index a3ab599..3195fb2 100644 --- a/server/handler.go +++ b/server/handler.go @@ -41,7 +41,7 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { upgrader := secureUpgrader(h.Token) ws, err := upgrader.Upgrade(w, r, nil) if err != nil { - log.Println(err) + http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -58,8 +58,7 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { for { m := clientMessage{} - err := rw.ReadJSON(&m) - if err != nil { + if err := rw.ReadJSON(&m); err != nil { log.Println(err) break } @@ -75,13 +74,12 @@ func (h *Handler) handle(w http.ResponseWriter, r *http.Request) { go h.service.doRun(rw, cfg) case "cancel": - ok := h.service.cancelRun() - if !ok { - _ = rw.WriteJSON(errorMessage{Event: "error", Error: "not running"}) + if ok := h.service.cancelRun(); !ok { + rw.WriteJSON(errorMessage{Event: "error", Error: "not running"}) //nolint:errcheck } default: - _ = rw.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", m.Action)) + rw.WriteTextMessage(fmt.Sprintf("unknown procedure: %s", m.Action)) //nolint:errcheck } } } From 850a1be0a7eb3f71a3c6fe38ec48e150d66e1916 Mon Sep 17 00:00:00 2001 From: moreirathomas Date: Tue, 2 Aug 2022 20:06:28 +0200 Subject: [PATCH 22/22] docs: document token --- cmd/server/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 29625de..1c30e1e 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -9,7 +9,8 @@ import ( ) const ( - port = "8080" + port = "8080" + // token is a dummy token used for development only. token = "6db67fafc4f5bf965a5a" //nolint:gosec )