diff --git a/README.md b/README.md index d451d01..fa6d42a 100644 --- a/README.md +++ b/README.md @@ -21,10 +21,11 @@ Welcome to the **`Backhaul`** project! This project provides a high-performance - [WSS Multiplexing Configuration](#wss-multiplexing-configuration) 5. [Generating a Self-Signed TLS Certificate with OpenSSL](#generating-a-self-signed-tls-certificate-with-openssl) 6. [Running backhaul as a service](#running-backhaul-as-a-service) -7. [FAQ](#faq) -8. [Benchmark](#benchmark) -9. [License](#license) -10. [Donation](#donation) +7. [Monitoring with prometheus/grafana](#monitoring) +8. [FAQ](#faq) +9. [Benchmark](#benchmark) +10. [License](#license) +11. [Donation](#donation) --- @@ -106,8 +107,8 @@ To start using the solution, you'll need to configure both server and client com mss = 1360 # TCP/TCPMux: Maximum Segment Size in bytes; controls max TCP payload size to avoid fragmentation. (default: system-defined) so_rcvbuf = 4194304 # TCP/TCPMux: Socket receive buffer size (bytes); larger buffer allows higher throughput on receive side. (default: system-defined) so_sndbuf = 1048576 # TCP/TCPMux: Socket send buffer size (bytes); controls send queue size to manage outgoing data flow. (default: system-defined) - - + + metrics = ["default", "prometheus"] # metrics exposed via web_port. only the legacy json ("default") and prometheus ("prometheus") are supported. to achieve backward compatibility without changing the config file, legacy json is enabled by default. ports = [ "443-600", # Listen on all ports in the range 443 to 600 @@ -154,6 +155,8 @@ To start using the solution, you'll need to configure both server and client com mss = 1360 # TCP/TCPMux: Maximum Segment Size in bytes; controls max TCP payload size to avoid fragmentation. (default: system-defined) so_rcvbuf = 1048576 # TCP/TCPMux: Socket receive buffer size (bytes); larger buffer allows higher throughput on receive side. (default: system-defined) so_sndbuf = 4194304 # TCP/TCPMux: Socket send buffer size (bytes); controls send queue size to manage outgoing data flow. (default: system-defined) + + metrics = ["default", "prometheus"] # metrics exposed via web_port. only the legacy json ("default") and prometheus ("prometheus") are supported. to achieve backward compatibility without changing the config file, legacy json is enabled by default. ``` To start the `client`: @@ -571,6 +574,35 @@ sudo systemctl status backhaul.service journalctl -u backhaul.service -e -f ``` +## Monitoring + +setting `web_port` to a non-zero value will enable the monitoring interface. + +you can choose which monitoring interface you want to use by setting `metrics` to `prometheus`, `default` or both. empty array will fallback to `["default"]` + +### Basic Monitoring Setup +you can set up a basic monitoring setup using prometheus and grafana. +![grafana dashboard](monitoring/dashboard.jpg) + +(while not necessary, it is recommended to use docker) +1. install docker, docker-compose + ```bash + sudo apt update && sudo apt install docker.io docker-compose-v2 + # to run docker without sudo + sudo groupadd docker + sudo usermod -aG docker $USER + # If you're running Linux in a virtual machine, it may be necessary to restart the virtual machine for changes to take effect. + ``` +2. create a `docker-compose.yml` like [this example](monitoring/docker-compose.yml) +3. create a prometheus.yaml like [this example](monitoring/prometheus.yml) +4. run the docker-compose file + ```bash + docker compose up -d + ``` +5. visit grafana dashboard at `http://SERVER_IP:3000` (default username/password is `admin`) +6. create a new datasource, choose prometheus, and enter the url `http://prometheus:9090` +7. via `Dashboards > New > import` import the [example dashboard](monitoring/dashboard.jpg) or [create your own](https://grafana.com/tutorials/) + ## FAQ **Q: How do I decide which transport protocol to use?** diff --git a/cmd/cmd.go b/cmd/cmd.go index ef6d799..5dc5670 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -5,6 +5,7 @@ import ( "github.com/musix/backhaul/config" "github.com/musix/backhaul/internal/client" + "github.com/musix/backhaul/internal/web/metrics" "github.com/musix/backhaul/internal/server" "github.com/musix/backhaul/internal/utils" @@ -26,6 +27,8 @@ func Run(configPath string, ctx context.Context) { // Apply default values to the configuration applyDefaults(cfg) + metricHandler := metrics.NewMetricsHandler(ctx, logger, *cfg) + configType := "" if cfg.Server.BindAddr != "" { configType = "server" @@ -45,6 +48,7 @@ func Run(configPath string, ctx context.Context) { srv := server.NewServer(&cfg.Server, ctx) // server go srv.Start() + go metricHandler.Monitor() // Wait for shutdown signal <-ctx.Done() @@ -58,6 +62,7 @@ func Run(configPath string, ctx context.Context) { clnt := client.NewClient(&cfg.Client, ctx) // client go clnt.Start() + go metricHandler.Monitor() // Wait for shutdown signal <-ctx.Done() diff --git a/cmd/defaults.go b/cmd/defaults.go index d6c6980..2d34633 100644 --- a/cmd/defaults.go +++ b/cmd/defaults.go @@ -129,4 +129,13 @@ func applyDefaults(cfg *config.Config) { if cfg.Server.MuxCon < 1 { cfg.Server.MuxCon = defaultMuxCon } + + // keep legacy handler + if len(cfg.Client.MetricCollectors) == 0 { + cfg.Client.MetricCollectors = append(cfg.Client.MetricCollectors, "default") + } + + if len(cfg.Server.MetricCollectors) == 0 { + cfg.Server.MetricCollectors = append(cfg.Server.MetricCollectors, "default") + } } diff --git a/config/config.go b/config/config.go index 14a8672..605a4a6 100644 --- a/config/config.go +++ b/config/config.go @@ -41,6 +41,7 @@ type ServerConfig struct { MSS int `toml:"mss"` SO_RCVBUF int `toml:"so_rcvbuf"` SO_SNDBUF int `toml:"so_sndbuf"` + MetricCollectors []string `toml:"metrics"` } // ClientConfig represents the configuration for the client. @@ -69,6 +70,7 @@ type ClientConfig struct { MSS int `toml:"mss"` SO_RCVBUF int `toml:"so_rcvbuf"` SO_SNDBUF int `toml:"so_sndbuf"` + MetricCollectors []string `toml:"metrics"` } // Config represents the complete configuration, including both server and client settings. @@ -76,3 +78,11 @@ type Config struct { Server ServerConfig `toml:"server"` Client ClientConfig `toml:"client"` } + +func (c *Config) IsServerConfig() bool { + if c.Client.RemoteAddr != "" { + return false + } + + return true +} diff --git a/go.mod b/go.mod index c678e54..364303a 100644 --- a/go.mod +++ b/go.mod @@ -5,18 +5,26 @@ go 1.23.1 require ( github.com/BurntSushi/toml v1.4.0 github.com/gorilla/websocket v1.5.3 + github.com/prometheus/client_golang v1.23.0 github.com/shirou/gopsutil/v4 v4.24.8 github.com/sirupsen/logrus v1.9.3 github.com/xtaci/smux v1.5.27 ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect - github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.65.0 // indirect + github.com/prometheus/procfs v0.16.1 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/tklauser/go-sysconf v0.3.14 // indirect github.com/tklauser/numcpus v0.8.0 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - golang.org/x/sys v0.25.0 // indirect + golang.org/x/sys v0.33.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect ) diff --git a/go.sum b/go.sum index 20b1cee..c72e329 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,39 @@ github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= -github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 h1:o4JXh1EVt9k/+g42oCprj/FisM4qX9L3sZB3upGN2ZU= +github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc= +github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE= +github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= +github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/shirou/gopsutil/v4 v4.24.8 h1:pVQjIenQkIhqO81mwTaXjTzOMT7d3TZkf43PlVFHENI= github.com/shirou/gopsutil/v4 v4.24.8/go.mod h1:wE0OrJtj4dG+hYkxqDH3QiBICdKSf04/npcvLLc/oRg= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= @@ -26,8 +44,8 @@ github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU= github.com/tklauser/go-sysconf v0.3.14/go.mod h1:1ym4lWMLUOhuBOPGtRcJm7tEGX4SCYNEEEtghGG/8uY= github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYgY= @@ -36,12 +54,16 @@ github.com/xtaci/smux v1.5.27 h1:uIU1dpJQQWUCmGxXBgajLfc8cMMb13hCitj+HC5yC/Q= github.com/xtaci/smux v1.5.27/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/internal/client/transport/accept_udp.go b/internal/client/transport/accept_udp.go index e1f777d..a85bcd5 100644 --- a/internal/client/transport/accept_udp.go +++ b/internal/client/transport/accept_udp.go @@ -7,13 +7,13 @@ import ( "net" "time" - "github.com/musix/backhaul/internal/web" + "github.com/musix/backhaul/internal/stats" "github.com/sirupsen/logrus" ) const BufferSize = 16 * 1024 -func UDPDialer(tcp net.Conn, remoteAddr string, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func UDPDialer(tcp net.Conn, remoteAddr string, logger *logrus.Logger, remotePort int) { remoteUDPAddr, err := net.ResolveUDPAddr("udp", remoteAddr) if err != nil { logger.Fatalf("failed to resolve remote address: %v", err) @@ -30,16 +30,16 @@ func UDPDialer(tcp net.Conn, remoteAddr string, logger *logrus.Logger, usage *we done := make(chan struct{}) go func() { - go tcpToUDP(tcp, remoteConn, logger, usage, remotePort, sniffer) + go tcpToUDP(tcp, remoteConn, logger, remotePort) done <- struct{}{} }() - udpToTCP(tcp, remoteConn, logger, usage, remotePort, sniffer) + udpToTCP(tcp, remoteConn, logger, remotePort) <-done } -func tcpToUDP(tcp net.Conn, udp *net.UDPConn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func tcpToUDP(tcp net.Conn, udp *net.UDPConn, logger *logrus.Logger, remotePort int) { buf := make([]byte, BufferSize) lenBuf := make([]byte, 2) // 2-byte header for packet size @@ -83,13 +83,11 @@ func tcpToUDP(tcp net.Conn, udp *net.UDPConn, logger *logrus.Logger, usage *web. logger.Tracef("read %d bytes from TCP, wrote %d bytes to UDP", packetSize, totalWritten) - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(totalWritten)) - } + stats.RecordPortUsage(remotePort, uint64(totalWritten)) } } -func udpToTCP(tcp net.Conn, udp *net.UDPConn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func udpToTCP(tcp net.Conn, udp *net.UDPConn, logger *logrus.Logger, remotePort int) { buf := make([]byte, BufferSize-6) // reserved for 5 bytes header // Pre-allocate headers @@ -146,8 +144,6 @@ func udpToTCP(tcp net.Conn, udp *net.UDPConn, logger *logrus.Logger, usage *web. logger.Tracef("read %d bytes from UDP, wrote %d bytes to TCP", r, totalWritten) - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(totalWritten)) - } + stats.RecordPortUsage(remotePort, uint64(totalWritten)) } } diff --git a/internal/client/transport/tcp.go b/internal/client/transport/tcp.go index 81811ba..bbfed3e 100644 --- a/internal/client/transport/tcp.go +++ b/internal/client/transport/tcp.go @@ -2,17 +2,16 @@ package transport import ( "context" - "fmt" "net" "strings" "sync" "sync/atomic" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" "github.com/sirupsen/logrus" ) @@ -24,7 +23,6 @@ type TcpTransport struct { cancel context.CancelFunc logger *logrus.Logger controlChannel net.Conn - usageMonitor *web.Usage restartMutex sync.Mutex poolConnections int32 loadConnections int32 @@ -34,7 +32,6 @@ type TcpConfig struct { RemoteAddr string Token string SnifferLog string - TunnelStatus string KeepAlive time.Duration RetryInterval time.Duration DialTimeOut time.Duration @@ -60,7 +57,6 @@ func NewTCPClient(parentCtx context.Context, config *TcpConfig, logger *logrus.L cancel: cancel, logger: logger, controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), @@ -70,11 +66,7 @@ func NewTCPClient(parentCtx context.Context, config *TcpConfig, logger *logrus.L } func (c *TcpTransport) Start() { - if c.config.WebPort > 0 { - go c.usageMonitor.Monitor() - } - - c.config.TunnelStatus = "Disconnected (TCP)" + stats.SetDown() go c.channelDialer() } @@ -108,8 +100,6 @@ func (c *TcpTransport) Restart() { // Re-initialize variables c.controlChannel = nil - c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger) - c.config.TunnelStatus = "" c.poolConnections = 0 c.loadConnections = 0 c.controlFlow = make(chan struct{}, 100) @@ -117,6 +107,8 @@ func (c *TcpTransport) Restart() { // set the log level again c.logger.SetLevel(level) + stats.SetDown() + go c.Start() } @@ -170,7 +162,7 @@ func (c *TcpTransport) channelDialer() { c.controlChannel = tunnelTCPConn c.logger.Info("control channel established successfully") - c.config.TunnelStatus = "Connected (TCP)" + stats.SetUp() go c.poolMaintainer() go c.channelHandler() @@ -360,7 +352,7 @@ func (c *TcpTransport) tunnelDialer() { c.localDialer(tcpConn, resolvedAddr, port) case utils.SG_UDP: - UDPDialer(tcpConn, resolvedAddr, c.logger, c.usageMonitor, port, c.config.Sniffer) + UDPDialer(tcpConn, resolvedAddr, c.logger, port) default: c.logger.Error("undefined transport. close the connection.") @@ -390,5 +382,5 @@ func (c *TcpTransport) localDialer(tcpConn net.Conn, resolvedAddr string, port i c.logger.Debugf("connected to local address %s successfully", resolvedAddr) - handlers.TCPConnectionHandler(c.ctx, tcpConn, localConnection, c.logger, c.usageMonitor, port, c.config.Sniffer) + handlers.TCPConnectionHandler(c.ctx, tcpConn, localConnection, c.logger, port) } diff --git a/internal/client/transport/tcpmux.go b/internal/client/transport/tcpmux.go index 829e673..53dce55 100644 --- a/internal/client/transport/tcpmux.go +++ b/internal/client/transport/tcpmux.go @@ -2,18 +2,16 @@ package transport import ( "context" - "fmt" "net" "strings" "sync" "sync/atomic" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" - "github.com/sirupsen/logrus" "github.com/xtaci/smux" ) @@ -26,7 +24,6 @@ type TcpMuxTransport struct { cancel context.CancelFunc logger *logrus.Logger controlChannel net.Conn - usageMonitor *web.Usage restartMutex sync.Mutex poolConnections int32 loadConnections int32 @@ -37,7 +34,6 @@ type TcpMuxConfig struct { RemoteAddr string Token string SnifferLog string - TunnelStatus string Nodelay bool Sniffer bool KeepAlive time.Duration @@ -75,7 +71,6 @@ func NewMuxClient(parentCtx context.Context, config *TcpMuxConfig, logger *logru cancel: cancel, logger: logger, controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), @@ -85,11 +80,7 @@ func NewMuxClient(parentCtx context.Context, config *TcpMuxConfig, logger *logru } func (c *TcpMuxTransport) Start() { - if c.config.WebPort > 0 { - go c.usageMonitor.Monitor() - } - - c.config.TunnelStatus = "Disconnected (TCPMUX)" + stats.SetDown() go c.channelDialer() } @@ -124,8 +115,6 @@ func (c *TcpMuxTransport) Restart() { // Re-initialize variables c.controlChannel = nil - c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger) - c.config.TunnelStatus = "" c.poolConnections = 0 c.loadConnections = 0 c.controlFlow = make(chan struct{}, 100) @@ -133,6 +122,8 @@ func (c *TcpMuxTransport) Restart() { // set the log level again c.logger.SetLevel(level) + stats.SetDown() + go c.Start() } @@ -185,7 +176,7 @@ func (c *TcpMuxTransport) channelDialer() { c.controlChannel = tunnelConn c.logger.Info("control channel established successfully") - c.config.TunnelStatus = "Connected (TCPMux)" + stats.SetUp() go c.poolMaintainer() go c.channelHandler() @@ -411,5 +402,5 @@ func (c *TcpMuxTransport) localDialer(stream *smux.Stream, remoteAddr string) { c.logger.Debugf("connected to local address %s successfully", remoteAddr) - handlers.TCPConnectionHandler(c.ctx, stream, localConnection, c.logger, c.usageMonitor, int(port), c.config.Sniffer) + handlers.TCPConnectionHandler(c.ctx, stream, localConnection, c.logger, int(port)) } diff --git a/internal/client/transport/udp.go b/internal/client/transport/udp.go index cd16c72..a269c97 100644 --- a/internal/client/transport/udp.go +++ b/internal/client/transport/udp.go @@ -2,15 +2,14 @@ package transport import ( "context" - "fmt" "net" "sync" "sync/atomic" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" "github.com/sirupsen/logrus" ) @@ -21,7 +20,6 @@ type UdpTransport struct { cancel context.CancelFunc logger *logrus.Logger controlChannel net.Conn - usageMonitor *web.Usage restartMutex sync.Mutex poolConnections int32 loadConnections int32 @@ -31,7 +29,6 @@ type UdpConfig struct { RemoteAddr string Token string SnifferLog string - TunnelStatus string RetryInterval time.Duration DialTimeOut time.Duration ConnPoolSize int @@ -52,7 +49,6 @@ func NewUDPClient(parentCtx context.Context, config *UdpConfig, logger *logrus.L cancel: cancel, logger: logger, controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), @@ -62,11 +58,7 @@ func NewUDPClient(parentCtx context.Context, config *UdpConfig, logger *logrus.L } func (c *UdpTransport) Start() { - if c.config.WebPort > 0 { - go c.usageMonitor.Monitor() - } - - c.config.TunnelStatus = "Disconnected (UDP)" + stats.SetDown() go c.channelDialer() } @@ -101,8 +93,6 @@ func (c *UdpTransport) Restart() { // Re-initialize variables c.controlChannel = nil - c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger) - c.config.TunnelStatus = "" c.poolConnections = 0 c.loadConnections = 0 c.controlFlow = make(chan struct{}, 100) @@ -110,6 +100,8 @@ func (c *UdpTransport) Restart() { // set the log level again c.logger.SetLevel(level) + stats.SetDown() + go c.Start() } @@ -163,7 +155,7 @@ func (c *UdpTransport) channelDialer() { c.controlChannel = tunnelTCPConn c.logger.Info("control channel established successfully") - c.config.TunnelStatus = "Connected (UDP)" + stats.SetUp() go c.poolMaintainer() go c.channelHandler() @@ -456,7 +448,7 @@ func (c *UdpTransport) udpCopy(srcConn, dstConn *net.UDPConn, port int) { // Optionally update the port usage stats if sniffing is enabled if c.config.Sniffer { - c.usageMonitor.AddOrUpdatePort(port, uint64(totalWritten)) + stats.RecordPortUsage(port, uint64(totalWritten)) } c.logger.Debugf("forwarded %d bytes from %s to %s", n, srcConn.LocalAddr().String(), dstConn.RemoteAddr().String()) diff --git a/internal/client/transport/ws.go b/internal/client/transport/ws.go index 6864925..596debf 100644 --- a/internal/client/transport/ws.go +++ b/internal/client/transport/ws.go @@ -3,17 +3,16 @@ package transport import ( "bytes" "context" - "fmt" "strings" "sync" "sync/atomic" "time" "github.com/musix/backhaul/config" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" @@ -27,7 +26,6 @@ type WsTransport struct { logger *logrus.Logger controlChannel *websocket.Conn restartMutex sync.Mutex - usageMonitor *web.Usage poolConnections int32 loadConnections int32 controlFlow chan struct{} @@ -36,7 +34,6 @@ type WsConfig struct { RemoteAddr string Token string SnifferLog string - TunnelStatus string Nodelay bool Sniffer bool KeepAlive time.Duration @@ -61,7 +58,6 @@ func NewWSClient(parentCtx context.Context, config *WsConfig, logger *logrus.Log cancel: cancel, logger: logger, controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), @@ -71,12 +67,7 @@ func NewWSClient(parentCtx context.Context, config *WsConfig, logger *logrus.Log } func (c *WsTransport) Start() { - // for webui - if c.config.WebPort > 0 { - go c.usageMonitor.Monitor() - } - - c.config.TunnelStatus = fmt.Sprintf("Disconnected (%s)", c.config.Mode) + stats.SetDown() go c.channelDialer() @@ -111,8 +102,6 @@ func (c *WsTransport) Restart() { // Re-initialize variables c.controlChannel = nil - c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger) - c.config.TunnelStatus = "" c.poolConnections = 0 c.loadConnections = 0 c.controlFlow = make(chan struct{}, 100) @@ -120,6 +109,8 @@ func (c *WsTransport) Restart() { // set the log level again c.logger.SetLevel(level) + stats.SetDown() + go c.Start() } @@ -140,7 +131,7 @@ func (c *WsTransport) channelDialer() { c.controlChannel = tunnelWSConn c.logger.Info("control channel established successfully") - c.config.TunnelStatus = fmt.Sprintf("Connected (%s)", c.config.Mode) + stats.SetUp() go c.poolMaintainer() go c.channelHandler() @@ -359,5 +350,5 @@ func (c *WsTransport) localDialer(tunnelCon *websocket.Conn, remoteAddr string, } c.logger.Debugf("connected to local address %s successfully", remoteAddr) - handlers.WSConnectionHandler(c.ctx, tunnelCon, localConnection, c.logger, c.usageMonitor, int(port), c.config.Sniffer) + handlers.WSConnectionHandler(c.ctx, tunnelCon, localConnection, c.logger, int(port)) } diff --git a/internal/client/transport/wsmux.go b/internal/client/transport/wsmux.go index 9633836..36397ff 100644 --- a/internal/client/transport/wsmux.go +++ b/internal/client/transport/wsmux.go @@ -2,17 +2,16 @@ package transport import ( "context" - "fmt" "strings" "sync" "sync/atomic" "time" "github.com/musix/backhaul/config" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" "github.com/xtaci/smux" "github.com/gorilla/websocket" @@ -27,7 +26,6 @@ type WsMuxTransport struct { cancel context.CancelFunc logger *logrus.Logger controlChannel *websocket.Conn - usageMonitor *web.Usage restartMutex sync.Mutex poolConnections int32 loadConnections int32 @@ -37,7 +35,6 @@ type WsMuxConfig struct { RemoteAddr string Token string SnifferLog string - TunnelStatus string Nodelay bool Sniffer bool KeepAlive time.Duration @@ -74,7 +71,6 @@ func NewWSMuxClient(parentCtx context.Context, config *WsMuxConfig, logger *logr cancel: cancel, logger: logger, controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), poolConnections: 0, loadConnections: 0, controlFlow: make(chan struct{}, 100), @@ -84,11 +80,7 @@ func NewWSMuxClient(parentCtx context.Context, config *WsMuxConfig, logger *logr } func (c *WsMuxTransport) Start() { - if c.config.WebPort > 0 { - go c.usageMonitor.Monitor() - } - - c.config.TunnelStatus = fmt.Sprintf("Disconnected (%s)", c.config.Mode) + stats.SetDown() go c.channelDialer() } @@ -123,8 +115,6 @@ func (c *WsMuxTransport) Restart() { // Re-initialize variables c.controlChannel = nil - c.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", c.config.WebPort), ctx, c.config.SnifferLog, c.config.Sniffer, &c.config.TunnelStatus, c.logger) - c.config.TunnelStatus = "" c.poolConnections = 0 c.loadConnections = 0 c.controlFlow = make(chan struct{}, 100) @@ -132,6 +122,8 @@ func (c *WsMuxTransport) Restart() { // set the log level again c.logger.SetLevel(level) + stats.SetDown() + go c.Start() } @@ -153,7 +145,7 @@ func (c *WsMuxTransport) channelDialer() { c.controlChannel = tunnelWSConn c.logger.Info("control channel established successfully") - c.config.TunnelStatus = fmt.Sprintf("Connected (%s)", c.config.Mode) + stats.SetUp() go c.poolMaintainer() go c.channelHandler() @@ -378,5 +370,5 @@ func (c *WsMuxTransport) localDialer(stream *smux.Stream, remoteAddr string) { c.logger.Debugf("connected to local address %s successfully", remoteAddr) - handlers.TCPConnectionHandler(c.ctx, stream, localConnection, c.logger, c.usageMonitor, int(port), c.config.Sniffer) + handlers.TCPConnectionHandler(c.ctx, stream, localConnection, c.logger, int(port)) } diff --git a/internal/server/transport/accept_udp.go b/internal/server/transport/accept_udp.go index 607e985..41a5142 100644 --- a/internal/server/transport/accept_udp.go +++ b/internal/server/transport/accept_udp.go @@ -7,8 +7,8 @@ import ( "sync" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" - "github.com/musix/backhaul/internal/web" "github.com/sirupsen/logrus" ) @@ -146,7 +146,7 @@ func (s *TcpTransport) handleUDPLoop(udpChan chan *LocalAcceptUDPConn, activeCon } // Handle data exchange between connections - go UDPConnectionHandler(localConn, tunnelConn, s.logger, s.usageMonitor, localConn.listener.LocalAddr().(*net.UDPAddr).Port, s.config.Sniffer, s.rtt, activeConnections, mu) + go UDPConnectionHandler(localConn, tunnelConn, s.logger, localConn.listener.LocalAddr().(*net.UDPAddr).Port, s.rtt, activeConnections, mu) s.logger.Debugf("initiate new handler for connection %s with timestamp %d", localConn.clientAddr.String(), localConn.timeCreated) break loop @@ -156,7 +156,7 @@ func (s *TcpTransport) handleUDPLoop(udpChan chan *LocalAcceptUDPConn, activeCon } } -func UDPConnectionHandler(udp *LocalAcceptUDPConn, tcp net.Conn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool, rtt int64, activeConnections *map[string]*LocalAcceptUDPConn, mu *sync.Mutex) { +func UDPConnectionHandler(udp *LocalAcceptUDPConn, tcp net.Conn, logger *logrus.Logger, remotePort int, rtt int64, activeConnections *map[string]*LocalAcceptUDPConn, mu *sync.Mutex) { done := make(chan struct{}) if rtt == 0 { @@ -167,12 +167,12 @@ func UDPConnectionHandler(udp *LocalAcceptUDPConn, tcp net.Conn, logger *logrus. } go func() { - udpToTCP(tcp, udp, logger, usage, remotePort, sniffer) + udpToTCP(tcp, udp, logger, remotePort) tcp.Close() done <- struct{}{} }() - tcpToUDP(tcp, udp, logger, usage, remotePort, sniffer, rtt) + tcpToUDP(tcp, udp, logger, remotePort, rtt) tcp.Close() <-done @@ -186,7 +186,7 @@ func UDPConnectionHandler(udp *LocalAcceptUDPConn, tcp net.Conn, logger *logrus. mu.Unlock() } -func udpToTCP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func udpToTCP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, remotePort int) { // Create a header (2 bytes) to hold the size of the data header := make([]byte, 2) @@ -224,9 +224,7 @@ func udpToTCP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, usag logger.Tracef("received %d bytes, forwarded %d bytes from UDP to TCP", packetSize, totalWritten-2) - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(totalWritten)) - } + stats.RecordPortUsage(remotePort, uint64(totalWritten)) case <-time.After(inactivityTimeout): // Timeout after 30 seconds of inactivity logger.Debugf("connection with timestamp %d and address %s idle for 60 seconds, closing", udp.timeCreated, udp.clientAddr.String()) @@ -235,7 +233,7 @@ func udpToTCP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, usag } } -func tcpToUDP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool, rtt int64) { +func tcpToUDP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, remotePort int, rtt int64) { buf := make([]byte, BufferSize) lenBuf := make([]byte, 2) // Buffer to store the 2-byte packet length timestampBuf := make([]byte, 4) // Buffer for timestamp (4 bytes) @@ -310,9 +308,7 @@ func tcpToUDP(tcp net.Conn, udp *LocalAcceptUDPConn, logger *logrus.Logger, usag totalWritten += w } - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(totalWritten)) - } + stats.RecordPortUsage(remotePort, uint64(totalWritten)) logger.Tracef("read %d bytes from TCP, forwarded %d bytes to UDP", packetSize, totalWritten) } diff --git a/internal/server/transport/tcp.go b/internal/server/transport/tcp.go index 8c0862c..921e13e 100644 --- a/internal/server/transport/tcp.go +++ b/internal/server/transport/tcp.go @@ -10,10 +10,10 @@ import ( "sync" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" "github.com/sirupsen/logrus" ) @@ -29,26 +29,24 @@ type TcpTransport struct { reqNewConnChan chan struct{} controlChannel net.Conn restartMutex sync.Mutex - usageMonitor *web.Usage rtt int64 // in ms, for UDP } type TcpConfig struct { - BindAddr string - Token string - SnifferLog string - TunnelStatus string - Ports []string - Nodelay bool - Sniffer bool - KeepAlive time.Duration - Heartbeat time.Duration // in seconds - ChannelSize int - WebPort int - AcceptUDP bool - MSS int - SO_RCVBUF int - SO_SNDBUF int + BindAddr string + Token string + SnifferLog string + Ports []string + Nodelay bool + Sniffer bool + KeepAlive time.Duration + Heartbeat time.Duration // in seconds + ChannelSize int + WebPort int + AcceptUDP bool + MSS int + SO_RCVBUF int + SO_SNDBUF int } func NewTCPServer(parentCtx context.Context, config *TcpConfig, logger *logrus.Logger) *TcpTransport { @@ -66,7 +64,6 @@ func NewTCPServer(parentCtx context.Context, config *TcpConfig, logger *logrus.L localChannel: make(chan LocalTCPConn, config.ChannelSize), reqNewConnChan: make(chan struct{}, config.ChannelSize), controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), rtt: 0, } @@ -74,18 +71,14 @@ func NewTCPServer(parentCtx context.Context, config *TcpConfig, logger *logrus.L } func (s *TcpTransport) Start() { - s.config.TunnelStatus = "Disconnected (TCP)" - - if s.config.WebPort > 0 { - go s.usageMonitor.Monitor() - } + stats.SetDown() go s.tunnelListener() s.channelHandshake() if s.controlChannel != nil { - s.config.TunnelStatus = "Connected (TCP)" + stats.SetUp() numCPU := runtime.NumCPU() if numCPU > 4 { @@ -134,13 +127,13 @@ func (s *TcpTransport) Restart() { s.tunnelChannel = make(chan net.Conn, s.config.ChannelSize) s.localChannel = make(chan LocalTCPConn, s.config.ChannelSize) s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) - s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) - s.config.TunnelStatus = "" s.controlChannel = nil // set the log level again s.logger.SetLevel(level) + stats.SetDown() + go s.Start() } @@ -553,7 +546,7 @@ func (s *TcpTransport) handleLoop() { } // Handle data exchange between connections - go handlers.TCPConnectionHandler(s.ctx, localConn.conn, tunnelConn, s.logger, s.usageMonitor, localConn.conn.LocalAddr().(*net.TCPAddr).Port, s.config.Sniffer) + go handlers.TCPConnectionHandler(s.ctx, localConn.conn, tunnelConn, s.logger, localConn.conn.LocalAddr().(*net.TCPAddr).Port) break loop } diff --git a/internal/server/transport/tcpmux.go b/internal/server/transport/tcpmux.go index fd3abcd..8d453e0 100644 --- a/internal/server/transport/tcpmux.go +++ b/internal/server/transport/tcpmux.go @@ -11,10 +11,10 @@ import ( "sync/atomic" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" "github.com/musix/backhaul/internal/utils/network" - "github.com/musix/backhaul/internal/web" "github.com/sirupsen/logrus" "github.com/xtaci/smux" @@ -32,7 +32,6 @@ type TcpMuxTransport struct { localChannel chan LocalTCPConn reqNewConnChan chan struct{} controlChannel net.Conn - usageMonitor *web.Usage restartMutex sync.Mutex streamCounter int32 sessionCounter int32 @@ -40,7 +39,6 @@ type TcpMuxTransport struct { type TcpMuxConfig struct { BindAddr string - TunnelStatus string SnifferLog string Token string Ports []string @@ -86,24 +84,20 @@ func NewTcpMuxServer(parentCtx context.Context, config *TcpMuxConfig, logger *lo controlChannel: nil, // will be set when a control connection is established streamCounter: 0, sessionCounter: 0, - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), } return server } func (s *TcpMuxTransport) Start() { - if s.config.WebPort > 0 { - go s.usageMonitor.Monitor() - } - s.config.TunnelStatus = "Disconnected (TCPMux)" + stats.SetDown() go s.tunnelListener() s.channelHandshake() if s.controlChannel != nil { - s.config.TunnelStatus = "Connected (TCPMux)" + stats.SetUp() numCPU := runtime.NumCPU() if numCPU > 4 { @@ -155,14 +149,14 @@ func (s *TcpMuxTransport) Restart() { s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) s.handshakeChannel = make(chan net.Conn) s.controlChannel = nil - s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) - s.config.TunnelStatus = "" s.streamCounter = 0 s.sessionCounter = 0 // set the log level again s.logger.SetLevel(level) + stats.SetDown() + go s.Start() } @@ -605,7 +599,7 @@ func (s *TcpMuxTransport) handleSession(session *smux.Session) { // Handle data exchange between connections go func() { - handlers.TCPConnectionHandler(s.ctx, stream, incomingConn.conn, s.logger, s.usageMonitor, incomingConn.conn.LocalAddr().(*net.TCPAddr).Port, s.config.Sniffer) + handlers.TCPConnectionHandler(s.ctx, stream, incomingConn.conn, s.logger, incomingConn.conn.LocalAddr().(*net.TCPAddr).Port) atomic.AddInt32(&s.streamCounter, -1) <-counter // read signal from the channel }() diff --git a/internal/server/transport/udp.go b/internal/server/transport/udp.go index f938696..020edcd 100644 --- a/internal/server/transport/udp.go +++ b/internal/server/transport/udp.go @@ -9,8 +9,8 @@ import ( "sync" "time" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" - "github.com/musix/backhaul/internal/web" "github.com/sirupsen/logrus" ) @@ -26,20 +26,18 @@ type UdpTransport struct { reqNewConnChan chan struct{} controlChannel net.Conn restartMutex sync.Mutex - usageMonitor *web.Usage rtt int64 // for Fun! } type UdpConfig struct { - BindAddr string - Token string - SnifferLog string - TunnelStatus string - Ports []string - Sniffer bool - Heartbeat time.Duration // in seconds, for udp conn and control channel - ChannelSize int - WebPort int + BindAddr string + Token string + SnifferLog string + Ports []string + Sniffer bool + Heartbeat time.Duration // in seconds, for udp conn and control channel + ChannelSize int + WebPort int } func NewUDPServer(parentCtx context.Context, config *UdpConfig, logger *logrus.Logger) *UdpTransport { @@ -58,19 +56,13 @@ func NewUDPServer(parentCtx context.Context, config *UdpConfig, logger *logrus.L activeMu: sync.Mutex{}, reqNewConnChan: make(chan struct{}, config.ChannelSize), controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), rtt: 0, } return server } func (s *UdpTransport) Start() { - s.config.TunnelStatus = "Disconnected (UDP)" - - if s.config.WebPort > 0 { - go s.usageMonitor.Monitor() - } - + stats.SetDown() go s.channelHandshake() } @@ -105,8 +97,6 @@ func (s *UdpTransport) Restart() { // Re-initialize variables s.tunnelChannel = make(chan *TunnelUDPConn, s.config.ChannelSize) s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) - s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) - s.config.TunnelStatus = "" s.controlChannel = nil s.activeConnections = map[string]*TunnelUDPConn{} s.activeMu = sync.Mutex{} @@ -114,6 +104,8 @@ func (s *UdpTransport) Restart() { // set the log level again s.logger.SetLevel(level) + stats.SetDown() + go s.Start() } @@ -182,6 +174,7 @@ loop: s.controlChannel = conn s.logger.Info("control channel successfully established.") + stats.SetUp() break loop } @@ -653,7 +646,7 @@ func (s *UdpTransport) udpLocalCopy(from *LocalUDPConn, to *TunnelUDPConn) { } if s.config.Sniffer { - s.usageMonitor.AddOrUpdatePort(from.listener.LocalAddr().(*net.UDPAddr).Port, uint64(totalWritten)) + stats.RecordPortUsage(from.listener.LocalAddr().(*net.UDPAddr).Port, uint64(totalWritten)) } s.logger.Debugf("forwarded %d bytes from local connection %s to tunnel", packetSize, from.addr.String()) @@ -689,7 +682,7 @@ func (s *UdpTransport) udpTunnelCopy(from *TunnelUDPConn, to *LocalUDPConn) { } if s.config.Sniffer { - s.usageMonitor.AddOrUpdatePort(to.listener.LocalAddr().(*net.UDPAddr).Port, uint64(totalWritten)) + stats.RecordPortUsage(from.listener.LocalAddr().(*net.UDPAddr).Port, uint64(totalWritten)) } s.logger.Debugf("forwarded %d bytes from local connection %s to tunnel", packetSize, from.addr.String()) diff --git a/internal/server/transport/ws.go b/internal/server/transport/ws.go index 84953dc..cded8e7 100644 --- a/internal/server/transport/ws.go +++ b/internal/server/transport/ws.go @@ -12,9 +12,9 @@ import ( "time" "github.com/musix/backhaul/config" + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" - "github.com/musix/backhaul/internal/web" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" @@ -31,24 +31,22 @@ type WsTransport struct { reqNewConnChan chan struct{} controlChannel *websocket.Conn restartMutex sync.Mutex - usageMonitor *web.Usage } type WsConfig struct { - BindAddr string - SnifferLog string - TLSCertFile string // Path to the TLS certificate file - TLSKeyFile string // Path to the TLS key file - TunnelStatus string - Token string - Ports []string - Nodelay bool - Sniffer bool - KeepAlive time.Duration - Heartbeat time.Duration // in seconds - ChannelSize int - WebPort int - Mode config.TransportType // ws or wss + BindAddr string + SnifferLog string + TLSCertFile string // Path to the TLS certificate file + TLSKeyFile string // Path to the TLS key file + Token string + Ports []string + Nodelay bool + Sniffer bool + KeepAlive time.Duration + Heartbeat time.Duration // in seconds + ChannelSize int + WebPort int + Mode config.TransportType // ws or wss } @@ -67,19 +65,13 @@ func NewWSServer(parentCtx context.Context, config *WsConfig, logger *logrus.Log localChannel: make(chan LocalTCPConn, config.ChannelSize), reqNewConnChan: make(chan struct{}, config.ChannelSize), controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), } return server } func (s *WsTransport) Start() { - // for webui - if s.config.WebPort > 0 { - go s.usageMonitor.Monitor() - } - - s.config.TunnelStatus = fmt.Sprintf("Disconnected (%s)", s.config.Mode) + stats.SetDown() go s.tunnelListener() @@ -116,12 +108,12 @@ func (s *WsTransport) Restart() { s.localChannel = make(chan LocalTCPConn, s.config.ChannelSize) s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) s.controlChannel = nil - s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) - s.config.TunnelStatus = "" // set the log level again s.logger.SetLevel(level) + stats.SetDown() + go s.Start() } @@ -258,8 +250,7 @@ func (s *WsTransport) tunnelListener() { go s.handleLoop() } - s.config.TunnelStatus = fmt.Sprintf("Connected (%s)", s.config.Mode) - + stats.SetUp() } else if strings.HasPrefix(r.URL.Path, "/tunnel") { wsConn := TunnelChannel{ conn: conn, @@ -510,7 +501,7 @@ func (s *WsTransport) handleLoop() { continue loop } // Handle data exchange between connections - go handlers.WSConnectionHandler(s.ctx, tunnelConnection.conn, localConn.conn, s.logger, s.usageMonitor, localConn.conn.LocalAddr().(*net.TCPAddr).Port, s.config.Sniffer) + go handlers.WSConnectionHandler(s.ctx, tunnelConnection.conn, localConn.conn, s.logger, localConn.conn.LocalAddr().(*net.TCPAddr).Port) break loop } } diff --git a/internal/server/transport/wsmux.go b/internal/server/transport/wsmux.go index d77e8e9..f799f40 100644 --- a/internal/server/transport/wsmux.go +++ b/internal/server/transport/wsmux.go @@ -13,9 +13,9 @@ import ( "time" "github.com/musix/backhaul/config" // for mode + "github.com/musix/backhaul/internal/stats" "github.com/musix/backhaul/internal/utils" "github.com/musix/backhaul/internal/utils/handlers" - "github.com/musix/backhaul/internal/web" "github.com/xtaci/smux" "github.com/gorilla/websocket" @@ -33,7 +33,6 @@ type WsMuxTransport struct { localChannel chan LocalTCPConn reqNewConnChan chan struct{} controlChannel *websocket.Conn - usageMonitor *web.Usage restartMutex sync.Mutex streamCounter int32 sessionCounter int32 @@ -45,7 +44,6 @@ type WsMuxConfig struct { SnifferLog string TLSCertFile string // Path to the TLS certificate file TLSKeyFile string // Path to the TLS key file - TunnelStatus string Ports []string Nodelay bool Sniffer bool @@ -87,19 +85,13 @@ func NewWSMuxServer(parentCtx context.Context, config *WsMuxConfig, logger *logr streamCounter: 0, sessionCounter: 0, controlChannel: nil, // will be set when a control connection is established - usageMonitor: web.NewDataStore(fmt.Sprintf(":%v", config.WebPort), ctx, config.SnifferLog, config.Sniffer, &config.TunnelStatus, logger), } return server } func (s *WsMuxTransport) Start() { - // for webui - if s.config.WebPort > 0 { - go s.usageMonitor.Monitor() - } - - s.config.TunnelStatus = fmt.Sprintf("Disconnected (%s)", s.config.Mode) + stats.SetDown() go s.tunnelListener() @@ -138,14 +130,14 @@ func (s *WsMuxTransport) Restart() { s.localChannel = make(chan LocalTCPConn, s.config.ChannelSize) s.reqNewConnChan = make(chan struct{}, s.config.ChannelSize) s.controlChannel = nil - s.usageMonitor = web.NewDataStore(fmt.Sprintf(":%v", s.config.WebPort), ctx, s.config.SnifferLog, s.config.Sniffer, &s.config.TunnelStatus, s.logger) - s.config.TunnelStatus = "" s.streamCounter = 0 s.sessionCounter = 0 // set the log level again s.logger.SetLevel(level) + stats.SetDown() + go s.Start() } @@ -283,8 +275,7 @@ func (s *WsMuxTransport) tunnelListener() { go s.handleLoop() } - s.config.TunnelStatus = fmt.Sprintf("Connected (%s)", s.config.Mode) - + stats.SetUp() } else if strings.HasPrefix(r.URL.Path, "/tunnel") { session, err := smux.Client(conn.NetConn(), s.smuxConfig) if err != nil { @@ -567,7 +558,7 @@ func (s *WsMuxTransport) handleSession(session *smux.Session) { // Handle data exchange between connections go func() { - handlers.TCPConnectionHandler(s.ctx, stream, incomingConn.conn, s.logger, s.usageMonitor, incomingConn.conn.LocalAddr().(*net.TCPAddr).Port, s.config.Sniffer) + handlers.TCPConnectionHandler(s.ctx, stream, incomingConn.conn, s.logger, incomingConn.conn.LocalAddr().(*net.TCPAddr).Port) atomic.AddInt32(&s.streamCounter, -1) <-counter // read signal from the channel }() diff --git a/internal/stats/stats.go b/internal/stats/stats.go new file mode 100644 index 0000000..c28028e --- /dev/null +++ b/internal/stats/stats.go @@ -0,0 +1,208 @@ +package stats + +import ( + "context" + "encoding/json" + "os" + "sync" + "time" + + "github.com/musix/backhaul/config" + "github.com/sirupsen/logrus" +) + +type TunnelSide string + +var ( + ServerSide TunnelSide = "server" + ClientSide TunnelSide = "client" +) + +type statsStorage struct { + ctx context.Context + + side TunnelSide + transport config.TransportType + + location string + logger *logrus.Logger + + stats *stats + up bool +} + +type stats struct { + mu sync.Mutex + PortUsages PortUsages `json:"port_usages"` + TotalUsage uint64 `json:"total_usage"` +} + +type PortUsages map[int]uint64 + +var instance *statsStorage + +func InitClientStats(ctx context.Context, logger *logrus.Logger, cfg config.ClientConfig) { + instance = &statsStorage{ + ctx: ctx, + side: ClientSide, + transport: cfg.Transport, + location: cfg.SnifferLog, + logger: logger, + stats: nil, + } + + if cfg.Sniffer { + instance.init() + } +} + +func InitServerStats(ctx context.Context, logger *logrus.Logger, cfg config.ServerConfig) { + instance = &statsStorage{ + ctx: ctx, + side: ServerSide, + transport: cfg.Transport, + location: cfg.SnifferLog, + logger: logger, + stats: nil, + } + + if cfg.Sniffer { + instance.init() + } +} + +func (s *statsStorage) init() { + s.load() + + go func() { + ticker := time.NewTicker(15 * time.Second) // every 15 seconds + defer ticker.Stop() + + for { + select { + case <-ticker.C: + go s.save() + case <-s.ctx.Done(): + return + } + } + }() +} + +func (s *statsStorage) save() { + if s.stats == nil { + return + } + + s.stats.mu.Lock() + defer s.stats.mu.Unlock() + + data, err := json.MarshalIndent(s.stats, "", " ") + if err != nil { + s.logger.Errorf("Failed to marshal stats: %v", err) + return + } + + if err := os.WriteFile(s.location, data, 0644); err != nil { + s.logger.Errorf("Failed to save stats to file: %v", err) + } +} + +func (s *statsStorage) load() { + data, err := os.ReadFile(s.location) + if err != nil { + if !os.IsNotExist(err) { + s.logger.Errorf("Failed to read stats file: %v", err) + } + + s.stats = &stats{ + PortUsages: make(PortUsages), + TotalUsage: 0, + } + return + } + + var loadedStats stats + if err := json.Unmarshal(data, &loadedStats); err != nil { + s.logger.Errorf("Failed to unmarshal stats: %v", err) + s.stats = &stats{ + PortUsages: make(PortUsages), + TotalUsage: 0, + } + return + } + + s.stats = &loadedStats + if s.stats.PortUsages == nil { + s.stats.PortUsages = make(PortUsages) + } +} + +func RecordPortUsage(port int, usage uint64) { + if instance == nil { + panic("attempt to record usage before initiating stat storage") + } + + if instance.stats == nil { + return + } + + instance.stats.mu.Lock() + defer instance.stats.mu.Unlock() + + existing, exists := instance.stats.PortUsages[port] + if exists { + usage = usage + existing + } + + instance.stats.PortUsages[port] = usage + instance.stats.TotalUsage += usage +} + +func GetPortUsages() PortUsages { + if instance == nil { + return map[int]uint64{} + } + + if instance.stats == nil { + return map[int]uint64{} + } + + return instance.stats.PortUsages +} + +func GetTotalUsage() uint64 { + if instance == nil { + return 0 + } + + if instance.stats == nil { + return 0 + } + + return instance.stats.TotalUsage +} + +func SetDown() { + if instance == nil { + return + } + + instance.up = false +} + +func SetUp() { + if instance == nil { + return + } + + instance.up = true +} + +func IsUp() bool { + if instance == nil { + return false + } + + return instance.up +} diff --git a/internal/utils/handlers/tcp_handler.go b/internal/utils/handlers/tcp_handler.go index 9414af5..3dbfb0a 100644 --- a/internal/utils/handlers/tcp_handler.go +++ b/internal/utils/handlers/tcp_handler.go @@ -6,19 +6,19 @@ import ( "io" "net" - "github.com/musix/backhaul/internal/web" + "github.com/musix/backhaul/internal/stats" "github.com/sirupsen/logrus" ) -func TCPConnectionHandler(ctx context.Context, from net.Conn, to net.Conn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func TCPConnectionHandler(ctx context.Context, from net.Conn, to net.Conn, logger *logrus.Logger, remotePort int) { done := make(chan struct{}) go func() { defer close(done) - transferData(from, to, logger, usage, remotePort, sniffer) + transferData(from, to, logger, remotePort) }() - transferData(to, from, logger, usage, remotePort, sniffer) + transferData(to, from, logger, remotePort) select { case <-ctx.Done(): @@ -30,7 +30,7 @@ func TCPConnectionHandler(ctx context.Context, from net.Conn, to net.Conn, logge } // Using direct Read and Write for transferring data -func transferData(from net.Conn, to net.Conn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func transferData(from net.Conn, to net.Conn, logger *logrus.Logger, remotePort int) { buf := make([]byte, 16*1024) // 16K for { // Read data from the source connection @@ -65,9 +65,6 @@ func transferData(from net.Conn, to net.Conn, logger *logrus.Logger, usage *web. } logger.Tracef("read data: %d bytes, written data: %d bytes", r, totalWritten) - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(totalWritten)) - } + stats.RecordPortUsage(remotePort, uint64(totalWritten)) } - } diff --git a/internal/utils/handlers/ws_handler.go b/internal/utils/handlers/ws_handler.go index e4e9e08..8b6158e 100644 --- a/internal/utils/handlers/ws_handler.go +++ b/internal/utils/handlers/ws_handler.go @@ -7,20 +7,20 @@ import ( "net" "github.com/gorilla/websocket" - "github.com/musix/backhaul/internal/web" + "github.com/musix/backhaul/internal/stats" "github.com/sirupsen/logrus" ) // WebSocketToTCPConnectionHandler handles data transfer between a WebSocket and a TCP connection -func WSConnectionHandler(ctx context.Context, wsConn *websocket.Conn, tcpConn net.Conn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func WSConnectionHandler(ctx context.Context, wsConn *websocket.Conn, tcpConn net.Conn, logger *logrus.Logger, remotePort int) { done := make(chan struct{}) go func() { defer close(done) - transferWebSocketToTCP(wsConn, tcpConn, logger, usage, remotePort, sniffer) + transferWebSocketToTCP(wsConn, tcpConn, logger, remotePort) }() - transferTCPToWebSocket(tcpConn, wsConn, logger, usage, remotePort, sniffer) + transferTCPToWebSocket(tcpConn, wsConn, logger, remotePort) select { case <-ctx.Done(): @@ -32,7 +32,7 @@ func WSConnectionHandler(ctx context.Context, wsConn *websocket.Conn, tcpConn ne } // transferWebSocketToTCP transfers data from a WebSocket connection to a TCP connection -func transferWebSocketToTCP(wsConn *websocket.Conn, tcpConn net.Conn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func transferWebSocketToTCP(wsConn *websocket.Conn, tcpConn net.Conn, logger *logrus.Logger, remotePort int) { for { // Read message from the WebSocket connection messageType, message, err := wsConn.ReadMessage() @@ -58,15 +58,13 @@ func transferWebSocketToTCP(wsConn *websocket.Conn, tcpConn net.Conn, logger *lo return } logger.Tracef("transferred data from WebSocket to TCP: %d bytes", w) - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(w)) - } + stats.RecordPortUsage(remotePort, uint64(w)) } } } // transferTCPToWebSocket transfers data from a TCP connection to a WebSocket connection -func transferTCPToWebSocket(tcpConn net.Conn, wsConn *websocket.Conn, logger *logrus.Logger, usage *web.Usage, remotePort int, sniffer bool) { +func transferTCPToWebSocket(tcpConn net.Conn, wsConn *websocket.Conn, logger *logrus.Logger, remotePort int) { buf := make([]byte, 16*1024) // 16K buffer size for { // Read data from the TCP connection @@ -96,8 +94,6 @@ func transferTCPToWebSocket(tcpConn net.Conn, wsConn *websocket.Conn, logger *lo } logger.Tracef("transferred data from TCP to WebSocket: %d bytes", n) - if sniffer { - usage.AddOrUpdatePort(remotePort, uint64(n)) - } + stats.RecordPortUsage(remotePort, uint64(n)) } } diff --git a/internal/web/index.html b/internal/web/metrics/index.html similarity index 100% rename from internal/web/index.html rename to internal/web/metrics/index.html diff --git a/internal/web/metrics/legacy.go b/internal/web/metrics/legacy.go new file mode 100644 index 0000000..1db7435 --- /dev/null +++ b/internal/web/metrics/legacy.go @@ -0,0 +1,267 @@ +package metrics + +import ( + "context" + "embed" + "encoding/json" + "fmt" + "html/template" + "net/http" + "time" + + "github.com/musix/backhaul/config" + "github.com/musix/backhaul/internal/stats" + "github.com/shirou/gopsutil/v4/cpu" + "github.com/shirou/gopsutil/v4/disk" + "github.com/shirou/gopsutil/v4/mem" + "github.com/shirou/gopsutil/v4/net" + "github.com/sirupsen/logrus" +) + +type SystemStats struct { + TunnelStatus string `json:"tunnelStatus"` + CPUUsage string `json:"cpuUsage"` + RAMUsage string `json:"ramUsage"` + DiskUsage string `json:"diskUsage"` + SwapUsage string `json:"swapUsage"` + NetworkTraffic string `json:"networkTraffic"` + UploadSpeed string `json:"uploadSpeed"` + DownloadSpeed string `json:"downloadSpeed"` + BackhaulTraffic string `json:"backhaulTraffic"` + Sniffer string `json:"sniffer"` + AllConnections string `json:"allConnections"` +} + +func init() { + RegisterCollector("default", func(ctx context.Context, log *logrus.Logger, cfg config.Config) Collector { + var transport config.TransportType + var sniffer bool + if cfg.IsServerConfig() { + transport = cfg.Server.Transport + sniffer = cfg.Server.Sniffer + } else { + transport = cfg.Client.Transport + sniffer = cfg.Client.Sniffer + } + + return &LegacyCollector{ + ctx: ctx, + logger: log, + transport: transport, + portUsageEnabled: sniffer, + } + }) +} + +// LegacyCollector implements Handler +type LegacyCollector struct { + ctx context.Context + logger *logrus.Logger + transport config.TransportType + status *string + portUsageEnabled bool +} + +func (c *LegacyCollector) Bind(srv *http.ServeMux) { + srv.HandleFunc("/", c.handleIndex) // handle index + srv.HandleFunc("/stats", c.statsHandler) + if c.portUsageEnabled { + srv.HandleFunc("/data", c.handleData) // New route for JSON data + } +} + +//go:embed index.html +var indexHTML embed.FS + +func (c *LegacyCollector) handleIndex(w http.ResponseWriter, r *http.Request) { + usageData := stats.GetPortUsages() + readableData := c.usageDataWithReadableUsage(usageData) + + tmpl, err := template.ParseFS(indexHTML, "index.html") + if err != nil { + c.logger.Errorf("error parsing template: %v", err) + return + } + + err = tmpl.Execute(w, readableData) + if err != nil { + c.logger.Errorf("error executing template: %v", err) + } +} + +func (c *LegacyCollector) handleData(w http.ResponseWriter, r *http.Request) { + usageData := stats.GetPortUsages() + readableData := c.usageDataWithReadableUsage(usageData) + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(readableData); err != nil { + c.logger.Errorf("error encoding JSON response: %v", err) + } +} + +func (c *LegacyCollector) statsHandler(w http.ResponseWriter, r *http.Request) { + systemStats, err := c.getSystemStats() + if err != nil { + c.logger.Error("Error fetching system stats:", err) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(systemStats); err != nil { + c.logger.Error("Error encoding JSON:", err) + } +} + +// converts the byte usage to a human-readable format +func (c *LegacyCollector) usageDataWithReadableUsage(usageData stats.PortUsages) []struct { + Port int + ReadableUsage string +} { + var result []struct { + Port int + ReadableUsage string + } + + for port, portUsage := range usageData { + result = append(result, struct { + Port int + ReadableUsage string + }{ + Port: port, + ReadableUsage: c.convertBytesToReadable(portUsage), + }) + } + + return result +} + +// ConvertBytesToReadable converts bytes into a human-readable format (KB, MB, GB) +func (c *LegacyCollector) convertBytesToReadable(bytes uint64) string { + const ( + KB = 1 << (10 * 1) // 1024 bytes + MB = 1 << (10 * 2) // 1024 KB + GB = 1 << (10 * 3) // 1024 MB + TB = 1 << (10 * 4) // 1024 TB + ) + + switch { + case bytes >= TB: + return fmt.Sprintf("%.2f TB", float64(bytes)/float64(TB)) + case bytes >= GB: + return fmt.Sprintf("%.2f GB", float64(bytes)/float64(GB)) + case bytes >= MB: + return fmt.Sprintf("%.2f MB", float64(bytes)/float64(MB)) + case bytes >= KB: + return fmt.Sprintf("%.2f KB", float64(bytes)/float64(KB)) + default: + return fmt.Sprintf("%d B", bytes) // Bytes + } +} + +func (c *LegacyCollector) getSystemStats() (*SystemStats, error) { + + // Get initial network stats + initialStats, err := c.getNetworkStats() + if err != nil { + return nil, err + } + + // Wait for 1 second + time.Sleep(1 * time.Second) + + // Get updated network stats + finalStats, err := c.getNetworkStats() + if err != nil { + return nil, err + } + + // Get CPU usage + cpuPercent, err := cpu.Percent(0, false) + if err != nil { + return nil, err + } + + // Get RAM usage + memStats, err := mem.VirtualMemory() + if err != nil { + return nil, err + } + + // Get Disk usage + diskStats, err := disk.Usage("/") + if err != nil { + return nil, err + } + + // Get Swap usage + swapStats, err := mem.SwapMemory() + if err != nil { + return nil, err + } + + // Get Network traffic + netStats, err := net.IOCounters(false) + if err != nil { + return nil, err + } + + // Get all active network connections (TCP, UDP, etc.) + connections, err := net.Connections("all") + if err != nil { + return nil, err + } + + // Calculate upload and download speeds + uploadSpeed := float64(finalStats.BytesSent - initialStats.BytesSent) + downloadSpeed := float64(finalStats.BytesRecv - initialStats.BytesRecv) + + var tunnelStatus string + if stats.IsUp() { + tunnelStatus = fmt.Sprintf("Connected (%s)", c.transport) + } else { + + tunnelStatus = fmt.Sprintf("Disconnected (%s)", c.transport) + } + + systemStats := &SystemStats{ + TunnelStatus: tunnelStatus, + CPUUsage: c.formatFloat(cpuPercent[0]), + RAMUsage: c.convertBytesToReadable(memStats.Used), + DiskUsage: c.convertBytesToReadable(diskStats.Used), + SwapUsage: c.convertBytesToReadable(swapStats.Used), + NetworkTraffic: c.convertBytesToReadable(netStats[0].BytesSent + netStats[0].BytesRecv), + DownloadSpeed: c.formatSpeed(downloadSpeed), + UploadSpeed: c.formatSpeed(uploadSpeed), + BackhaulTraffic: c.convertBytesToReadable(stats.GetTotalUsage()), + Sniffer: map[bool]string{true: "Running", false: "Not running"}[c.portUsageEnabled], + AllConnections: fmt.Sprintf("%d", len(connections)), + } + + return systemStats, nil +} + +func (c *LegacyCollector) formatSpeed(bytesPerSec float64) string { + if bytesPerSec >= 1e9 { + return fmt.Sprintf("%.2f GB/s", bytesPerSec/1e9) + } else if bytesPerSec >= 1e6 { + return fmt.Sprintf("%.2f MB/s", bytesPerSec/1e6) + } else if bytesPerSec >= 1e3 { + return fmt.Sprintf("%.2f KB/s", bytesPerSec/1e3) + } + return fmt.Sprintf("%.2f B/s", bytesPerSec) +} + +func (c *LegacyCollector) formatFloat(value float64) string { + return fmt.Sprintf("%.2f%%", value) +} + +func (c *LegacyCollector) getNetworkStats() (*net.IOCountersStat, error) { + ioCounters, err := net.IOCounters(false) + if err != nil { + return nil, err + } + if len(ioCounters) == 0 { + return nil, fmt.Errorf("no network IO counters found") + } + return &ioCounters[0], nil +} diff --git a/internal/web/metrics/metrics.go b/internal/web/metrics/metrics.go new file mode 100644 index 0000000..ec0cc32 --- /dev/null +++ b/internal/web/metrics/metrics.go @@ -0,0 +1,109 @@ +package metrics + +import ( + "context" + "errors" + "fmt" + "net/http" + "time" + + "github.com/musix/backhaul/config" + "github.com/musix/backhaul/internal/stats" + "github.com/sirupsen/logrus" +) + +type CollectorFactory func(ctx context.Context, log *logrus.Logger, cfg config.Config) Collector +type Collector interface { + Bind(mux *http.ServeMux) +} + +type Handler struct { + ctx context.Context + srv *http.Server + log *logrus.Logger + + cfg config.Config + + collectors map[string]Collector +} + +var factories = make(map[string]CollectorFactory) + +func NewMetricsHandler(ctx context.Context, log *logrus.Logger, cfg config.Config) *Handler { + var includedCollectors []string + if cfg.IsServerConfig() { + stats.InitServerStats(ctx, log, cfg.Server) + includedCollectors = cfg.Server.MetricCollectors + } else { + stats.InitClientStats(ctx, log, cfg.Client) + includedCollectors = cfg.Client.MetricCollectors + } + + collectors := map[string]Collector{} + + for _, name := range includedCollectors { + factory, ok := factories[name] + if !ok { + log.Errorf("unknown metrics handler: %s", name) + continue + } + + collectors[name] = factory(ctx, log, cfg) + } + + return &Handler{ + ctx: ctx, + log: log, + cfg: cfg, + collectors: collectors, + } +} + +func (m *Handler) Monitor() { + var port int + if m.cfg.IsServerConfig() { + port = m.cfg.Server.WebPort + } else { + port = m.cfg.Client.WebPort + } + + if port <= 0 { + return + } + + srv := http.NewServeMux() + m.bindCollectors(srv) + + bindAddr := fmt.Sprintf(":%d", port) + m.srv = &http.Server{ + Addr: bindAddr, + Handler: srv, + } + + go func() { + <-m.ctx.Done() + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + if err := m.srv.Shutdown(shutdownCtx); err != nil { + m.log.Errorf("sniffer server shutdown error: %v", err) + } + }() + + // Start the server + m.log.Info("collector service listening on port: ", bindAddr) + if err := m.srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + m.log.Errorf("collector server error: %v", err) + } +} + +func (m *Handler) bindCollectors(srv *http.ServeMux) { + for _, collectorImpl := range m.collectors { + collectorImpl.Bind(srv) + } +} + +func RegisterCollector(name string, factory CollectorFactory) { + factories[name] = factory +} diff --git a/internal/web/metrics/prometheus.go b/internal/web/metrics/prometheus.go new file mode 100644 index 0000000..2fd6d20 --- /dev/null +++ b/internal/web/metrics/prometheus.go @@ -0,0 +1,111 @@ +package metrics + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/musix/backhaul/config" + "github.com/musix/backhaul/internal/stats" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +// PrometheusCollector implements Collector +type PrometheusCollector struct { + ctx context.Context + transport config.TransportType + labels prometheus.Labels + + reg *prometheus.Registry +} + +func init() { + RegisterCollector("prometheus", NewPrometheusCollector) +} + +func NewPrometheusCollector(ctx context.Context, log *logrus.Logger, cfg config.Config) Collector { + var transport config.TransportType + var labels prometheus.Labels + if cfg.IsServerConfig() { + transport = cfg.Server.Transport + labels = prometheus.Labels{ + "transport": fmt.Sprintf("%s", transport), + "side": "server", + } + } else { + transport = cfg.Client.Transport + labels = prometheus.Labels{ + "transport": fmt.Sprintf("%s", transport), + "side": "client", + } + } + + instance := &PrometheusCollector{ + ctx: ctx, + transport: transport, + labels: labels, + reg: prometheus.NewRegistry(), + } + + // default collectors + instance.reg.MustRegister(collectors.NewGoCollector()) + instance.reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + + // backhaul tunnel status + instance.customCollectors() + + return instance +} + +func (p *PrometheusCollector) Bind(mux *http.ServeMux) { + mux.Handle("/metrics", promhttp.HandlerFor(p.reg, promhttp.HandlerOpts{EnableOpenMetrics: true})) +} + +func (p *PrometheusCollector) customCollectors() { + statusGauge := promauto.With(p.reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: "backhaul", + Subsystem: "tunnel", + Name: "status", + Help: "is Backhaul tunnel up", + ConstLabels: p.labels, + }, + ) + + usageGauge := promauto.With(p.reg).NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "backhaul", + Subsystem: "tunnel", + Name: "usage", + Help: "backhaul usage per port", + ConstLabels: p.labels, + }, + []string{"port"}, + ) + + go func() { + tick := time.NewTicker(10 * time.Second) + defer tick.Stop() + for { + select { + case <-tick.C: + var status float64 + if stats.IsUp() { + status = 1 + } + statusGauge.Set(status) + + for i, u := range stats.GetPortUsages() { + usageGauge.With(prometheus.Labels{"port": fmt.Sprintf("%d", i)}).Set(float64(u)) + } + case <-p.ctx.Done(): + return + } + } + }() +} diff --git a/internal/web/sniffer.go b/internal/web/sniffer.go deleted file mode 100644 index f5dac22..0000000 --- a/internal/web/sniffer.go +++ /dev/null @@ -1,447 +0,0 @@ -package web - -import ( - "context" - "embed" - "encoding/json" - "fmt" - "html/template" - "net/http" - "os" - "sort" - "sync" - "time" - - "github.com/shirou/gopsutil/v4/cpu" - "github.com/shirou/gopsutil/v4/disk" - "github.com/shirou/gopsutil/v4/mem" - "github.com/shirou/gopsutil/v4/net" - - "github.com/sirupsen/logrus" -) - -type Usage struct { - dataStore sync.Map - listenAddr string - shutdownCtx context.Context - cancelFunc context.CancelFunc - server *http.Server - logger *logrus.Logger - sniffer bool - snifferLog string - mu sync.Mutex - totalTraffic uint64 - tunnelStatus *string -} - -type PortUsage struct { - Port int - Usage uint64 -} - -type SystemStats struct { - TunnelStatus string `json:"tunnelStatus"` - CPUUsage string `json:"cpuUsage"` - RAMUsage string `json:"ramUsage"` - DiskUsage string `json:"diskUsage"` - SwapUsage string `json:"swapUsage"` - NetworkTraffic string `json:"networkTraffic"` - UploadSpeed string `json:"uploadSpeed"` - DownloadSpeed string `json:"downloadSpeed"` - BackhaulTraffic string `json:"backhaulTraffic"` - Sniffer string `json:"sniffer"` - AllConnections string `json:"allConnections"` -} - -func NewDataStore(listenAddr string, shutdownCtx context.Context, snifferLog string, sniffer bool, tunnelStatus *string, logger *logrus.Logger) *Usage { - ctx, cancel := context.WithCancel(shutdownCtx) - u := &Usage{ - listenAddr: listenAddr, - shutdownCtx: ctx, - cancelFunc: cancel, - logger: logger, - sniffer: sniffer, - snifferLog: snifferLog, - tunnelStatus: tunnelStatus, - mu: sync.Mutex{}, - totalTraffic: 0, - } - return u -} - -func (m *Usage) Monitor() { - mux := http.NewServeMux() - mux.HandleFunc("/", m.handleIndex) // handle index - mux.HandleFunc("/stats", m.statsHandler) - if m.sniffer { - mux.HandleFunc("/data", m.handleData) // New route for JSON data - } - m.server = &http.Server{ - Addr: m.listenAddr, - Handler: mux, - } - - go func() { - <-m.shutdownCtx.Done() - - shutdownCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - // Attempt to gracefully shut down the server - if err := m.server.Shutdown(shutdownCtx); err != nil { - m.logger.Errorf("sniffer server shutdown error: %v", err) - } - }() - - // start save data - if m.sniffer { - go func() { - ticker := time.NewTicker(15 * time.Second) // every 5 seconds - defer ticker.Stop() - - for { - select { - case <-ticker.C: - go m.saveUsageData() - case <-m.shutdownCtx.Done(): - return - } - } - }() - } - // Start the server - m.logger.Info("sniffer service listening on port: ", m.listenAddr) - if err := m.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { - m.logger.Errorf("sniffer server error: %v", err) - } -} - -//go:embed index.html -var indexHTML embed.FS - -func (m *Usage) handleIndex(w http.ResponseWriter, r *http.Request) { - usageData := m.getUsageFromFile() - readableData := m.usageDataWithReadableUsage(usageData) - - tmpl, err := template.ParseFS(indexHTML, "index.html") - if err != nil { - m.logger.Errorf("error parsing template: %v", err) - return - } - - err = tmpl.Execute(w, readableData) - if err != nil { - m.logger.Errorf("error executing template: %v", err) - } -} - -func (m *Usage) handleData(w http.ResponseWriter, r *http.Request) { - usageData := m.getUsageFromFile() - readableData := m.usageDataWithReadableUsage(usageData) - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(readableData); err != nil { - m.logger.Errorf("error encoding JSON response: %v", err) - } -} - -func (m *Usage) statsHandler(w http.ResponseWriter, r *http.Request) { - stats, err := m.getSystemStats() - if err != nil { - m.logger.Error("Error fetching system stats:", err) - return - } - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(stats); err != nil { - m.logger.Error("Error encoding JSON:", err) - } -} - -func (m *Usage) AddOrUpdatePort(port int, usage uint64) { - m.mu.Lock() - defer m.mu.Unlock() - - // Retrieve current usage data for the port - value, ok := m.dataStore.Load(port) - if ok { - // Port exists, update usage - portUsage := value.(PortUsage) - portUsage.Usage += usage - m.dataStore.Store(port, portUsage) - } else { - // Port does not exist, create new entry - m.dataStore.Store(port, PortUsage{Port: port, Usage: usage}) - } -} - -func (m *Usage) saveUsageData() { - // Step 1: Load existing usage data from the JSON file - var existingUsageData []PortUsage - file, err := os.Open(m.snifferLog) - if err == nil { - // If the file exists, decode the JSON data into existingUsageData - defer file.Close() - err = json.NewDecoder(file).Decode(&existingUsageData) - if err != nil { - m.logger.Errorf("error decoding JSON data: %v", err) - return - } - } else if !os.IsNotExist(err) { - // Log any error except file not existing - m.logger.Errorf("error opening JSON file: %v", err) - return - } - - // Step 2: Get current usage data from sync.Map - currentUsageData := m.collectUsageDataFromSyncMap() - - // Step 3: Merge the existing and current usage data into a map to avoid duplicates - usageMap := make(map[int]PortUsage) - - // Add existing usage data to the map - for _, usage := range existingUsageData { - usageMap[usage.Port] = usage - } - - // Append or update current usage data in the map - for _, usage := range currentUsageData { - if existing, exists := usageMap[usage.Port]; exists { - // Update existing port usage - existing.Usage += usage.Usage - usageMap[usage.Port] = existing - } else { - // Add new port usage - usageMap[usage.Port] = usage - } - } - - m.totalTraffic = 0 - - // Step 4: Convert the map back to a slice - var mergedUsageData []PortUsage - for _, usage := range usageMap { - mergedUsageData = append(mergedUsageData, usage) - m.totalTraffic += usage.Usage - } - - // Step 5: Convert merged data to JSON - data, err := json.MarshalIndent(mergedUsageData, "", " ") - if err != nil { - m.logger.Errorf("error marshalling usage data: %v", err) - return - } - - // Step 6: Write JSON data to file - err = os.WriteFile(m.snifferLog, data, 0644) - if err != nil { - m.logger.Errorf("error writing usage data to file: %v", err) - } -} - -func (m *Usage) getUsageFromFile() []PortUsage { - // Check if the file exists - if _, err := os.Stat(m.snifferLog); os.IsNotExist(err) { - // If the file does not exist, create it and write "null" - file, err := os.OpenFile(m.snifferLog, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - m.logger.Errorf("error creating file: %v", err) - return nil - } - - // Write "null" to the new file - if _, err := file.Write([]byte("null")); err != nil { - m.logger.Errorf("error writing 'null' to the file: %v", err) - file.Close() - return nil - } - - return nil - } - - var usageData []PortUsage - - // Open the JSON file - file, err := os.Open(m.snifferLog) - if err != nil { - m.logger.Errorf("error opening JSON file: %v", err) - return nil - } - defer file.Close() - - // Decode the JSON file into the usageData slice - err = json.NewDecoder(file).Decode(&usageData) - if err != nil { - m.logger.Errorf("error decoding JSON data: %v", err) - return nil - } - - // Sort usageData by Port in ascending order - sort.Slice(usageData, func(i, j int) bool { - return usageData[i].Port < usageData[j].Port - }) - - return usageData -} - -// converts the byte usage to a human-readable format -func (m *Usage) usageDataWithReadableUsage(usageData []PortUsage) []struct { - Port int - ReadableUsage string -} { - var result []struct { - Port int - ReadableUsage string - } - - for _, portUsage := range usageData { - result = append(result, struct { - Port int - ReadableUsage string - }{ - Port: portUsage.Port, - ReadableUsage: m.convertBytesToReadable(portUsage.Usage), - }) - } - - return result -} - -// collectUsageDataFromSyncMap gathers data from sync.Map -func (m *Usage) collectUsageDataFromSyncMap() []PortUsage { - m.mu.Lock() - defer m.mu.Unlock() - - var usageData []PortUsage - m.dataStore.Range(func(key, value interface{}) bool { - if portUsage, ok := value.(PortUsage); ok { - usageData = append(usageData, portUsage) - m.dataStore.Delete(key) - } - return true - }) - return usageData -} - -// ConvertBytesToReadable converts bytes into a human-readable format (KB, MB, GB) -func (m *Usage) convertBytesToReadable(bytes uint64) string { - const ( - KB = 1 << (10 * 1) // 1024 bytes - MB = 1 << (10 * 2) // 1024 KB - GB = 1 << (10 * 3) // 1024 MB - TB = 1 << (10 * 4) // 1024 TB - ) - - switch { - case bytes >= TB: - return fmt.Sprintf("%.2f TB", float64(bytes)/float64(TB)) - case bytes >= GB: - return fmt.Sprintf("%.2f GB", float64(bytes)/float64(GB)) - case bytes >= MB: - return fmt.Sprintf("%.2f MB", float64(bytes)/float64(MB)) - case bytes >= KB: - return fmt.Sprintf("%.2f KB", float64(bytes)/float64(KB)) - default: - return fmt.Sprintf("%d B", bytes) // Bytes - } -} - -func (m *Usage) getSystemStats() (*SystemStats, error) { - - // Get initial network stats - initialStats, err := m.getNetworkStats() - if err != nil { - return nil, err - } - - // Wait for 1 second - time.Sleep(1 * time.Second) - - // Get updated network stats - finalStats, err := m.getNetworkStats() - if err != nil { - return nil, err - } - - // Get CPU usage - cpuPercent, err := cpu.Percent(0, false) - if err != nil { - return nil, err - } - - // Get RAM usage - memStats, err := mem.VirtualMemory() - if err != nil { - return nil, err - } - - // Get Disk usage - diskStats, err := disk.Usage("/") - if err != nil { - return nil, err - } - - // Get Swap usage - swapStats, err := mem.SwapMemory() - if err != nil { - return nil, err - } - - // Get Network traffic - netStats, err := net.IOCounters(false) - if err != nil { - return nil, err - } - - // Get all active network connections (TCP, UDP, etc.) - connections, err := net.Connections("all") - if err != nil { - return nil, err - } - - // Calculate upload and download speeds - uploadSpeed := float64(finalStats.BytesSent - initialStats.BytesSent) - downloadSpeed := float64(finalStats.BytesRecv - initialStats.BytesRecv) - - stats := &SystemStats{ - TunnelStatus: *m.tunnelStatus, - CPUUsage: m.formatFloat(cpuPercent[0]), - RAMUsage: m.convertBytesToReadable(memStats.Used), - DiskUsage: m.convertBytesToReadable(diskStats.Used), - SwapUsage: m.convertBytesToReadable(swapStats.Used), - NetworkTraffic: m.convertBytesToReadable(netStats[0].BytesSent + netStats[0].BytesRecv), - DownloadSpeed: m.formatSpeed(downloadSpeed), - UploadSpeed: m.formatSpeed(uploadSpeed), - BackhaulTraffic: m.convertBytesToReadable(m.totalTraffic), - Sniffer: map[bool]string{true: "Running", false: "Not running"}[m.sniffer], - AllConnections: fmt.Sprintf("%d", len(connections)), - } - - return stats, nil -} - -func (m *Usage) formatSpeed(bytesPerSec float64) string { - if bytesPerSec >= 1e9 { - return fmt.Sprintf("%.2f GB/s", bytesPerSec/1e9) - } else if bytesPerSec >= 1e6 { - return fmt.Sprintf("%.2f MB/s", bytesPerSec/1e6) - } else if bytesPerSec >= 1e3 { - return fmt.Sprintf("%.2f KB/s", bytesPerSec/1e3) - } - return fmt.Sprintf("%.2f B/s", bytesPerSec) -} - -func (m *Usage) formatFloat(value float64) string { - return fmt.Sprintf("%.2f%%", value) -} - -func (m *Usage) getNetworkStats() (*net.IOCountersStat, error) { - ioCounters, err := net.IOCounters(false) - if err != nil { - return nil, err - } - if len(ioCounters) == 0 { - return nil, fmt.Errorf("no network IO counters found") - } - return &ioCounters[0], nil -} diff --git a/monitoring/backhaul.json b/monitoring/backhaul.json new file mode 100644 index 0000000..31bf679 --- /dev/null +++ b/monitoring/backhaul.json @@ -0,0 +1,284 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "prometheus", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__elements": {}, + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "12.2.0-16818804881" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "state-timeline", + "name": "State timeline", + "version": "" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "fixed" + }, + "custom": { + "axisPlacement": "auto", + "fillOpacity": 70, + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineWidth": 0, + "spanNulls": false + }, + "mappings": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "Down" + }, + "1": { + "color": "green", + "index": 1, + "text": "Up" + } + }, + "type": "value" + }, + { + "options": { + "match": "null+nan", + "result": { + "color": "yellow", + "index": 2, + "text": "No data" + } + }, + "type": "special" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": 0 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 2, + "options": { + "alignValue": "left", + "legend": { + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "mergeValues": true, + "rowHeight": 0.9, + "showValue": "auto", + "tooltip": { + "hideZeros": false, + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "12.2.0-16818804881", + "targets": [ + { + "editorMode": "code", + "exemplar": false, + "expr": "backhaul_tunnel_status", + "instant": false, + "legendFormat": "[{{side}}] {{instance}} {{transport}}", + "range": true, + "refId": "A", + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + } + } + ], + "title": "Tunnel Status", + "type": "state-timeline" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisGridShow": true, + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 44, + "gradientMode": "opacity", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "noValue": "0", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "decbytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 11, + "w": 24, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "12.2.0-16818804881", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "backhaul_tunnel_usage", + "format": "heatmap", + "instant": false, + "legendFormat": "[{{side}}] {{instance}} {{transport}}:{{port}}", + "range": true, + "refId": "A" + } + ], + "title": "Network Usage", + "type": "timeseries" + } + ], + "schemaVersion": 41, + "tags": [], + "templating": { + "list": [] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Backhaul", + "uid": "5d38542f-078b-4d23-953e-fd81ef9955c0", + "version": 4, + "weekStart": "" +} \ No newline at end of file diff --git a/monitoring/dashboard.jpg b/monitoring/dashboard.jpg new file mode 100644 index 0000000..f96c00c Binary files /dev/null and b/monitoring/dashboard.jpg differ diff --git a/monitoring/docker-compose.yml b/monitoring/docker-compose.yml new file mode 100644 index 0000000..51b9b23 --- /dev/null +++ b/monitoring/docker-compose.yml @@ -0,0 +1,18 @@ +services: + prometheus: + image: prom/prometheus:latest + container_name: prometheus + volumes: + - prometheus-data:/prometheus + - ./prometheus.yml:/etc/prometheus/prometheus.yml + grafana: + image: grafana/grafana:latest + container_name: grafana + volumes: + - grafana-data:/var/lib/grafana + ports: + - "3000:3000" + +volumes: + prometheus-data: + grafana-data: \ No newline at end of file diff --git a/monitoring/prometheus.yml b/monitoring/prometheus.yml new file mode 100644 index 0000000..dbea3a2 --- /dev/null +++ b/monitoring/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 30s # Set the scrape interval to every 15 seconds. Default is every 1 minute. + +scrape_configs: + - job_name: 'backhaul' + static_configs: + - targets: ['BACKHAUL_SERVER_IP:WEB_PORT', 'other servers...'] + +alerting: + # you can read more about alerting here: https://prometheus.io/docs/prometheus/latest/configuration/alerting_rules/