diff --git a/diagnostic/build-62be7990.json b/diagnostic/build-62be7990.json new file mode 100644 index 00000000..b1a3bf18 --- /dev/null +++ b/diagnostic/build-62be7990.json @@ -0,0 +1,87 @@ +{ + "generated_at": "2026-06-17T10:56:21.264928+00:00", + "commit": "62be7990", + "diagnostic_logd": "diagnostic/build-62be7990.logd", + "diagnostic_logd_error": null, + "message_blocker": null, + "chunked": false, + "chunk_size_bytes": null, + "password": "7472742f4fd4512dffdf", + "decrypt_command": "encryptly unpack diagnostic/build-62be7990.logd --password 7472742f4fd4512dffdf", + "total_modules": 10, + "passed": 2, + "failed": 8, + "modules": [ + { + "name": "backend", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'cargo'" + }, + { + "name": "frontend", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'npm'" + }, + { + "name": "market", + "status": "PASS", + "elapsed_seconds": 14.686, + "artifact": "/src/market/market", + "output": "go: downloading go.uber.org/zap v1.27.0\ngo: downloading github.com/shopspring/decimal v1.4.0\ngo: downloading github.com/gorilla/websocket v1.5.3\ngo: downloading github.com/google/uuid v1.6.0\ngo: downloading github.com/prometheus/client_golang v1.23.2\ngo: downloading go.uber.org/multierr v1.10.0\ngo: downloading github.com/prometheus/common v0.66.1\ngo: downloading github.com/cespare/xxhash/v2 v2.3.0\ngo: downloading google.golang.org/protobuf v1.36.8\ngo: downloading github.com/prometheus/client_model v0.6.2\ngo: downloading github.com/beorn7/perks v1.0.1\ngo: downloading github.com/prometheus/procfs v0.16.1\ngo: downloading go.yaml.in/yaml/v2 v2.4.2\ngo: downloading github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822\ngo: downloading golang.org/x/sys v0.35.0" + }, + { + "name": "frailbox", + "status": "PASS", + "elapsed_seconds": 0.042, + "artifact": "/src/frailbox/frailbox", + "output": "make: Nothing to be done for 'all'." + }, + { + "name": "engine", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'cmake'" + }, + { + "name": "compliance", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'javac'" + }, + { + "name": "v2-market-stream", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'ruby'" + }, + { + "name": "nfc-scanner", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'luac'" + }, + { + "name": "openapi-haskell", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'ghc'" + }, + { + "name": "openapi-tools", + "status": "FAIL", + "elapsed_seconds": 0, + "artifact": null, + "output": "Command not found: [Errno 2] No such file or directory: 'luac'" + } + ], + "pr_note": "Include the encrypted diagnostic logd artifact(s): diagnostic/build-62be7990.logd. The encrypted .logd is the required diagnostic content for PR review; this JSON file is metadata. Maintainers may ask you to remove these diagnostic artifacts before merging." +} diff --git a/diagnostic/build-62be7990.logd b/diagnostic/build-62be7990.logd new file mode 100644 index 00000000..590f957b Binary files /dev/null and b/diagnostic/build-62be7990.logd differ diff --git a/market/analytics/collector.go b/market/analytics/collector.go index 65d29693..2723f5d9 100644 --- a/market/analytics/collector.go +++ b/market/analytics/collector.go @@ -15,12 +15,9 @@ package analytics import ( "context" "encoding/csv" - "encoding/json" "fmt" "math" "math/rand" - "os" - "path/filepath" "sort" "strconv" "strings" diff --git a/market/analytics/prometheus.go b/market/analytics/prometheus.go new file mode 100644 index 00000000..a040c5dc --- /dev/null +++ b/market/analytics/prometheus.go @@ -0,0 +1,148 @@ +package analytics + +import ( + "context" + "errors" + "fmt" + "net/http" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +type PrometheusSnapshot struct { + ActiveConnections float64 + BidDepth float64 + AskDepth float64 +} + +type PrometheusExporter struct { + registry *prometheus.Registry + orders *prometheus.CounterVec + trades prometheus.Counter + activeConnections prometheus.Gauge + orderBookDepth *prometheus.GaugeVec + matchingLatency prometheus.Histogram + snapshot func() PrometheusSnapshot + server *http.Server + mu sync.Mutex +} + +func NewPrometheusExporter(snapshot func() PrometheusSnapshot) *PrometheusExporter { + exporter := &PrometheusExporter{ + registry: prometheus.NewRegistry(), + orders: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "market_orders_total", + Help: "Total market orders by order type and side.", + }, []string{"type", "side"}), + trades: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "market_trades_total", + Help: "Total market trades matched by the engine.", + }), + activeConnections: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "market_active_connections", + Help: "Current active market gateway WebSocket connections.", + }), + orderBookDepth: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "market_orderbook_depth", + Help: "Current aggregate order book depth by side.", + }, []string{"side"}), + matchingLatency: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "market_matching_latency_seconds", + Help: "Latency for market matching engine order placement.", + Buckets: []float64{0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1}, + }), + snapshot: snapshot, + } + + exporter.registry.MustRegister( + exporter.orders, + exporter.trades, + exporter.activeConnections, + exporter.orderBookDepth, + exporter.matchingLatency, + ) + + for _, orderType := range []string{"limit", "market", "stop_limit", "stop_market", "trailing_stop", "iceberg"} { + for _, side := range []string{"buy", "sell"} { + exporter.orders.WithLabelValues(orderType, side) + } + } + for _, side := range []string{"bids", "asks"} { + exporter.orderBookDepth.WithLabelValues(side) + } + + return exporter +} + +func (e *PrometheusExporter) Handler() http.Handler { + handler := promhttp.HandlerFor(e.registry, promhttp.HandlerOpts{}) + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + e.syncSnapshot() + handler.ServeHTTP(w, r) + }) +} + +func (e *PrometheusExporter) Start(port int) error { + e.mu.Lock() + e.server = &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: e.Handler(), + ReadHeaderTimeout: 5 * time.Second, + } + server := e.server + e.mu.Unlock() + + err := server.ListenAndServe() + if err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + return nil +} + +func (e *PrometheusExporter) Stop(ctx context.Context) error { + e.mu.Lock() + server := e.server + e.mu.Unlock() + if server == nil { + return nil + } + return server.Shutdown(ctx) +} + +func (e *PrometheusExporter) RecordOrder(orderType string, side string) { + if orderType == "" || side == "" { + return + } + e.orders.WithLabelValues(orderType, side).Inc() +} + +func (e *PrometheusExporter) RecordTrades(count int) { + if count <= 0 { + return + } + e.trades.Add(float64(count)) +} + +func (e *PrometheusExporter) ObserveMatchingLatency(duration time.Duration) { + if duration < 0 { + return + } + e.matchingLatency.Observe(duration.Seconds()) +} + +func (e *PrometheusExporter) syncSnapshot() { + if e.snapshot == nil { + return + } + snapshot := e.snapshot() + e.activeConnections.Set(snapshot.ActiveConnections) + e.orderBookDepth.WithLabelValues("bids").Set(snapshot.BidDepth) + e.orderBookDepth.WithLabelValues("asks").Set(snapshot.AskDepth) +} diff --git a/market/go.mod b/market/go.mod index 18fc5b58..3905a0e0 100644 --- a/market/go.mod +++ b/market/go.mod @@ -5,8 +5,20 @@ go 1.26 require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 + github.com/prometheus/client_golang v1.23.2 github.com/shopspring/decimal v1.4.0 go.uber.org/zap v1.27.0 ) -require go.uber.org/multierr v1.10.0 // indirect +require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.66.1 // indirect + github.com/prometheus/procfs v0.16.1 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.yaml.in/yaml/v2 v2.4.2 // indirect + golang.org/x/sys v0.35.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect +) diff --git a/market/go.sum b/market/go.sum index 0e58551e..3e62ecb5 100644 --- a/market/go.sum +++ b/market/go.sum @@ -1,20 +1,55 @@ +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= +github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= +github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= +go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/market/main.go b/market/main.go index 39e96d19..a971dbc3 100644 --- a/market/main.go +++ b/market/main.go @@ -1,12 +1,17 @@ package main import ( + "context" "flag" "fmt" + "net/http" "os" "os/signal" + "strconv" "syscall" + "time" + "github.com/tent-of-trials/market/analytics" "github.com/tent-of-trials/market/matching" "github.com/tent-of-trials/market/orderbook" "github.com/tent-of-trials/market/types" @@ -19,6 +24,7 @@ var ( symbols = flag.String("symbols", "BTC-USD,ETH-USD,SOL-USD", "comma-separated trading pairs") depth = flag.Int("depth", 100, "order book depth per side") rateLimit = flag.Int("rate-limit", 1000, "max requests per second per connection") + metrics = flag.Bool("metrics", true, "enable Prometheus metrics endpoint") ) // The market entrypoint. I don't fucking know anymore. @@ -32,20 +38,21 @@ func main() { zap.Int("port", *port), zap.String("symbols", *symbols), zap.Int("depth", *depth), + zap.Bool("metrics", *metrics), ) bookConfig := orderbook.Config{ - MaxDepth: *depth, - PriceDecimals: 8, + MaxDepth: *depth, + PriceDecimals: 8, VolumeDecimals: 8, } engineConfig := matching.EngineConfig{ - OrderTimeoutMs: 30000, + OrderTimeoutMs: 30000, MaxPendingOrders: 10000, - EnableShorting: true, - FeeRate: "0.001", - MakerFeeRate: "0.0005", + EnableShorting: true, + FeeRate: "0.001", + MakerFeeRate: "0.0005", } books := make(map[types.Symbol]*orderbook.OrderBook) @@ -57,18 +64,46 @@ func main() { logger.Info("order book initialized", zap.String("symbol", string(sym))) } + hub := ws.NewHub(logger) + var metricsExporter *analytics.PrometheusExporter + if *metrics { + metricsExporter = analytics.NewPrometheusExporter(func() analytics.PrometheusSnapshot { + var bidDepth float64 + var askDepth float64 + for _, book := range books { + bidDepth += float64(len(book.GetBids())) + askDepth += float64(len(book.GetAsks())) + } + return analytics.PrometheusSnapshot{ + ActiveConnections: float64(hub.ActiveConnections()), + BidDepth: bidDepth, + AskDepth: askDepth, + } + }) + engineConfig.Metrics = metricsExporter + } + engine := matching.NewMatchingEngine(engineConfig, books) logger.Info("matching engine initialized", zap.Int("symbols", len(parsedSymbols)), ) - hub := ws.NewHub(logger) go hub.Run() + if metricsExporter != nil { + metricsPort := parseMetricsPort(logger) + go func() { + logger.Info("starting metrics server", zap.Int("port", metricsPort)) + if err := metricsExporter.Start(metricsPort); err != nil { + logger.Error("failed to start metrics server", zap.Error(err)) + } + }() + } + server := ws.NewServer(hub, engine, logger, *port) go func() { logger.Info("starting WebSocket server", zap.Int("port", *port)) - if err := server.Start(); err != nil { + if err := server.Start(); err != nil && err != http.ErrServerClosed { logger.Fatal("failed to start server", zap.Error(err)) } }() @@ -84,6 +119,14 @@ func main() { server.Stop() logger.Info("server stopped") + if metricsExporter != nil { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := metricsExporter.Stop(ctx); err != nil { + logger.Warn("metrics server shutdown failed", zap.Error(err)) + } + cancel() + } + for sym := range books { book := books[sym] book.Close() @@ -112,3 +155,20 @@ func parseSymbols(s string) []types.Symbol { fmt.Printf("market: configured symbols %v\n", result) return result } + +func parseMetricsPort(logger *zap.Logger) int { + const defaultMetricsPort = 9090 + value := os.Getenv("METRICS_PORT") + if value == "" { + return defaultMetricsPort + } + port, err := strconv.Atoi(value) + if err != nil || port <= 0 || port > 65535 { + logger.Warn("invalid METRICS_PORT, using default", + zap.String("value", value), + zap.Int("default", defaultMetricsPort), + ) + return defaultMetricsPort + } + return port +} diff --git a/market/matching/engine.go b/market/matching/engine.go index 203de286..8ea31d4f 100644 --- a/market/matching/engine.go +++ b/market/matching/engine.go @@ -17,6 +17,13 @@ type EngineConfig struct { EnableShorting bool FeeRate string MakerFeeRate string + Metrics MetricsRecorder +} + +type MetricsRecorder interface { + RecordOrder(orderType string, side string) + RecordTrades(count int) + ObserveMatchingLatency(duration time.Duration) } type MatchingEngine struct { @@ -24,18 +31,27 @@ type MatchingEngine struct { books map[types.Symbol]*orderbook.OrderBook trades []*types.Trade tradeCount atomic.Int64 + metrics MetricsRecorder mu sync.RWMutex } func NewMatchingEngine(config EngineConfig, books map[types.Symbol]*orderbook.OrderBook) *MatchingEngine { return &MatchingEngine{ - config: config, - books: books, - trades: make([]*types.Trade, 0, 10000), + config: config, + books: books, + trades: make([]*types.Trade, 0, 10000), + metrics: config.Metrics, } } func (e *MatchingEngine) PlaceOrder(order *types.Order) ([]*types.Trade, error) { + start := time.Now() + defer func() { + if e.metrics != nil { + e.metrics.ObserveMatchingLatency(time.Since(start)) + } + }() + if order.ID == "" { order.ID = uuid.New().String() } @@ -53,6 +69,11 @@ func (e *MatchingEngine) PlaceOrder(order *types.Order) ([]*types.Trade, error) return nil, err } + if e.metrics != nil { + e.metrics.RecordOrder(order.Type.String(), order.Side.String()) + e.metrics.RecordTrades(len(trades)) + } + order.Status = types.Filled order.FilledQty = order.Quantity order.RemainingQty = decimal.Zero @@ -112,9 +133,9 @@ func (e *MatchingEngine) ValidateOrder(order *types.Order) error { } var ( - ErrSymbolNotFound = &EngineError{"symbol not found"} - ErrInvalidQuantity = &EngineError{"invalid quantity"} - ErrInvalidPrice = &EngineError{"invalid price"} + ErrSymbolNotFound = &EngineError{"symbol not found"} + ErrInvalidQuantity = &EngineError{"invalid quantity"} + ErrInvalidPrice = &EngineError{"invalid price"} ErrShortingDisabled = &EngineError{"shorting disabled"} ) diff --git a/market/ws/server.go b/market/ws/server.go index b87cea9f..4ad741d2 100644 --- a/market/ws/server.go +++ b/market/ws/server.go @@ -21,12 +21,12 @@ var upgrader = websocket.Upgrader{ } type Client struct { - hub *Hub - conn *websocket.Conn - send chan []byte - subs map[types.Symbol]struct{} - remote string - mu sync.Mutex + hub *Hub + conn *websocket.Conn + send chan []byte + subs map[types.Symbol]struct{} + remote string + mu sync.Mutex } type Hub struct { @@ -62,22 +62,25 @@ func (h *Hub) Run() { case client := <-h.register: h.mu.Lock() h.clients[client] = struct{}{} + total := len(h.clients) h.mu.Unlock() h.logger.Info("client connected", zap.String("remote", client.remote), - zap.Int("total", len(h.clients)), + zap.Int("total", total), ) case client := <-h.unregister: h.mu.Lock() + total := len(h.clients) if _, ok := h.clients[client]; ok { delete(h.clients, client) close(client.send) + total = len(h.clients) } h.mu.Unlock() h.logger.Info("client disconnected", zap.String("remote", client.remote), - zap.Int("total", len(h.clients)), + zap.Int("total", total), ) case message := <-h.broadcast: @@ -95,6 +98,12 @@ func (h *Hub) Run() { } } +func (h *Hub) ActiveConnections() int { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) +} + func NewServer(hub *Hub, engine *matching.MatchingEngine, logger *zap.Logger, port int) *Server { return &Server{ hub: hub,