diff --git a/Dockerfile b/Dockerfile index c169403..d9a1a92 100644 --- a/Dockerfile +++ b/Dockerfile @@ -41,11 +41,16 @@ LABEL org.opencontainers.image.title="Pebblify" \ org.opencontainers.image.vendor="Dockermint" \ org.opencontainers.image.created="${CREATED}" -RUN apk add --no-cache ca-certificates tzdata \ +RUN apk add --no-cache ca-certificates tzdata curl \ && adduser -D -H -u 10000 -s /sbin/nologin appuser COPY --from=builder /build/pebblify /usr/local/bin/pebblify +EXPOSE 8086 9090 + +HEALTHCHECK --interval=10s --timeout=3s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8086/healthz/live || exit 1 + USER 10000:10000 -ENTRYPOINT ["pebblify"] \ No newline at end of file +ENTRYPOINT ["pebblify"] diff --git a/cmd/pebblify/main.go b/cmd/pebblify/main.go index 81f185b..10b0412 100644 --- a/cmd/pebblify/main.go +++ b/cmd/pebblify/main.go @@ -1,14 +1,20 @@ package main import ( + "context" "flag" "fmt" + "net/http" "os" "path/filepath" "runtime" + "time" + "github.com/Dockermint/Pebblify/internal/completion" "github.com/Dockermint/Pebblify/internal/fsutil" + "github.com/Dockermint/Pebblify/internal/health" "github.com/Dockermint/Pebblify/internal/migration" + "github.com/Dockermint/Pebblify/internal/prom" "github.com/Dockermint/Pebblify/internal/state" "github.com/Dockermint/Pebblify/internal/verify" ) @@ -38,6 +44,8 @@ func main() { recoverCmd(os.Args[2:]) case "verify": verifyCmd(os.Args[2:]) + case "completion": + completionCmd(os.Args[2:]) case "-h", "--help", "help": usage() default: @@ -64,6 +72,7 @@ Commands: level-to-pebble Convert a Tendermint/CometBFT data/ directory from LevelDB to PebbleDB recover Resume a previously interrupted conversion verify Verify that converted data matches the source + completion Generate or install shell completion scripts version Show version information Options for level-to-pebble: @@ -85,6 +94,14 @@ Options for verify: --stop-on-error Stop at first mismatch -v, --verbose Show each key being verified +Health probes (opt-in): + --health Enable the HTTP health probe server + --health-port P Port for the health server (default: 8086) + +Prometheus metrics (opt-in): + --metrics Enable the Prometheus metrics server + --metrics-port P Port for the metrics server (default: 9090) + Global flags: -h, --help Show this help -V, --version Show version and exit @@ -99,9 +116,94 @@ Examples: # Verify the converted data pebblify verify ~/.gaia/data ./output/data + # Convert with health probes enabled + pebblify level-to-pebble --health --health-port 8086 ~/.gaia/data ./output + + # Generate bash completion script + pebblify completion bash + + # Install zsh completion + pebblify completion install zsh + `, Version) } +type healthFlags struct { + enabled bool + port int +} + +func addHealthFlags(fs *flag.FlagSet) *healthFlags { + hf := &healthFlags{} + fs.BoolVar(&hf.enabled, "health", false, "enable HTTP health probe server") + fs.IntVar(&hf.port, "health-port", 8086, "port for the health server") + return hf +} + +type metricsFlags struct { + enabled bool + port int +} + +func addMetricsFlags(fs *flag.FlagSet) *metricsFlags { + mf := &metricsFlags{} + fs.BoolVar(&mf.enabled, "metrics", false, "enable Prometheus metrics server") + fs.IntVar(&mf.port, "metrics-port", 9090, "port for the metrics server") + return mf +} + +func startMetricsServer(mf *metricsFlags) *prom.Server { + if !mf.enabled { + return nil + } + + srv := prom.NewServer(mf.port) + + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + fmt.Fprintf(os.Stderr, "metrics server error: %v\n", err) + } + }() + + return srv +} + +func stopMetricsServer(srv *prom.Server) { + if srv == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(ctx) +} + +func startHealthServer(hf *healthFlags) (*health.Server, *health.ProbeState) { + probeState := health.NewProbeState(30 * time.Second) + + if !hf.enabled { + return nil, probeState + } + + srv := health.NewServer(hf.port, probeState) + + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + fmt.Fprintf(os.Stderr, "health server error: %v\n", err) + } + }() + + return srv, probeState +} + +func stopHealthServer(srv *health.Server) { + if srv == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(ctx) +} + func levelToPebbleCmd(args []string) { fs := flag.NewFlagSet("level-to-pebble", flag.ExitOnError) force := fs.Bool("force", false, "overwrite existing temporary state") @@ -112,6 +214,8 @@ func levelToPebbleCmd(args []string) { tmpDir := fs.String("tmp-dir", "", "directory where .pebblify-tmp will be created (default: system temp)") verbose := fs.Bool("verbose", false, "enable verbose output") fs.BoolVar(verbose, "v", false, "alias for --verbose") + hf := addHealthFlags(fs) + mf := addMetricsFlags(fs) if err := fs.Parse(args); err != nil { os.Exit(1) @@ -125,6 +229,12 @@ func levelToPebbleCmd(args []string) { os.Exit(1) } + srv, probeState := startHealthServer(hf) + defer stopHealthServer(srv) + + metricsSrv := startMetricsServer(mf) + defer stopMetricsServer(metricsSrv) + src := rest[0] out := rest[1] @@ -158,13 +268,21 @@ func levelToPebbleCmd(args []string) { } defer unlock() + probeState.SetStarted() + probeState.SetReady() + + ticker := health.NewPingTicker(probeState, 5*time.Second) + defer ticker.Stop() + cfg := &migration.RunConfig{ - Workers: *workers, - BatchMemory: *batchMemory, - Verbose: *verbose, + Workers: *workers, + BatchMemory: *batchMemory, + Verbose: *verbose, + MetricsEnabled: mf.enabled, } if err := migration.RunLevelToPebble(src, out, cfg, tmpRoot); err != nil { + probeState.SetNotReady() fmt.Fprintf(os.Stderr, "error: %v\n", err) os.Exit(1) } @@ -178,6 +296,8 @@ func recoverCmd(args []string) { tmpDir := fs.String("tmp-dir", "", "directory containing .pebblify-tmp (must match conversion)") verbose := fs.Bool("verbose", false, "enable verbose output") fs.BoolVar(verbose, "v", false, "alias for --verbose") + hf := addHealthFlags(fs) + mf := addMetricsFlags(fs) if err := fs.Parse(args); err != nil { os.Exit(1) @@ -189,6 +309,12 @@ func recoverCmd(args []string) { os.Exit(1) } + srv, probeState := startHealthServer(hf) + defer stopHealthServer(srv) + + metricsSrv := startMetricsServer(mf) + defer stopMetricsServer(metricsSrv) + baseTmpDir := os.TempDir() if *tmpDir != "" { baseTmpDir = *tmpDir @@ -210,7 +336,14 @@ func recoverCmd(args []string) { } defer unlock() - if err := migration.RunRecover(*workers, *batchMemory, tmpRoot, *verbose); err != nil { + probeState.SetStarted() + probeState.SetReady() + + ticker := health.NewPingTicker(probeState, 5*time.Second) + defer ticker.Stop() + + if err := migration.RunRecover(*workers, *batchMemory, tmpRoot, *verbose, mf.enabled); err != nil { + probeState.SetNotReady() fmt.Fprintf(os.Stderr, "error: %v\n", err) os.Exit(1) } @@ -249,3 +382,65 @@ func verifyCmd(args []string) { os.Exit(1) } } + +func completionCmd(args []string) { + if len(args) == 0 { + completionUsage() + os.Exit(1) + } + + switch args[0] { + case "bash": + fmt.Print(completion.GenerateBash()) + case "zsh": + fmt.Print(completion.GenerateZsh()) + case "install": + completionInstallCmd(args[1:]) + default: + fmt.Fprintf(os.Stderr, "unknown shell: %s\n\n", args[0]) + completionUsage() + os.Exit(1) + } +} + +func completionInstallCmd(args []string) { + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "missing shell argument\n\n") + completionUsage() + os.Exit(1) + } + + var ( + dest string + err error + hint string + ) + + switch args[0] { + case "bash": + dest, err = completion.InstallBash() + hint = fmt.Sprintf("source %s", dest) + case "zsh": + dest, err = completion.InstallZsh() + hint = "autoload -Uz compinit && compinit" + default: + fmt.Fprintf(os.Stderr, "unknown shell: %s\n\n", args[0]) + completionUsage() + os.Exit(1) + } + + if err != nil { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + } + + fmt.Fprintf(os.Stderr, "completion installed to %s\n", dest) + fmt.Fprintf(os.Stderr, "reload with: %s\n", hint) +} + +func completionUsage() { + fmt.Fprintf(os.Stderr, `Usage: + pebblify completion Print completion script to stdout + pebblify completion install Install completion script +`) +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d33c82f --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +services: + pebblify: + build: + context: . + args: + VERSION: dev + REVISION: local + ports: + - "8086:8086" + - "9090:9090" + volumes: + - ./testdata:/data + entrypoint: ["pebblify", "level-to-pebble", "--health", "--health-port", "8086", "--metrics", "--metrics-port", "9090"] + command: ["/data/source", "/data/output"] + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8086/healthz/live"] + interval: 10s + timeout: 3s + start_period: 5s + retries: 3 diff --git a/go.mod b/go.mod index 8facf08..45d525c 100644 --- a/go.mod +++ b/go.mod @@ -4,13 +4,14 @@ go 1.25.4 require ( github.com/cockroachdb/pebble v1.1.5 + github.com/prometheus/client_golang v1.21.1 github.com/syndtr/goleveldb v1.0.0 ) require ( github.com/DataDog/zstd v1.4.5 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/errors v1.11.3 // indirect github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect @@ -18,20 +19,18 @@ require ( github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/klauspost/compress v1.16.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect - github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.15.0 // indirect - github.com/prometheus/client_model v0.3.0 // indirect - github.com/prometheus/common v0.42.0 // indirect - github.com/prometheus/procfs v0.9.0 // indirect - github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.62.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect - golang.org/x/sys v0.18.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/protobuf v1.36.1 // indirect ) diff --git a/go.sum b/go.sum index 428b690..c635c52 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,10 @@ github.com/DataDog/zstd v1.4.5 h1:EndNeuB0l9syBZhut0wns3gV1hL8zX8LIu6ZiVHWLIQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= +github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f/go.mod h1:a9RdTaap04u637JoCzcUoIcDmvwSUtcUFtT/C3kJlTU= github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I= github.com/cockroachdb/errors v1.11.3/go.mod h1:m4UIW4CDjx+R5cybPsNrRbreomiFqt8o1h1wUVazSd8= github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce h1:giXvy4KSc/6g/esnpM7Geqxka4WSqI1SZc7sMJFd3y4= @@ -17,47 +19,60 @@ github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= -github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= -github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/prometheus/client_golang v1.15.0 h1:5fCgGYogn0hFdhyhLbw7hEsWxufKtY9klyvdNfFlFhM= -github.com/prometheus/client_golang v1.15.0/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= -github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= -github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= -github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI1YM= -github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= -github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= -github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v1.21.1 h1:DOvXXTqVzvkIewV/CDPFdejpMCGeMcbGCQ8YOmu+Ibk= +github.com/prometheus/client_golang v1.21.1/go.mod h1:U9NM32ykUErtVBxdvD3zfi+EuFkkaBvMb09mIfe0Zgg= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= +github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= +github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= +github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -74,21 +89,24 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -97,11 +115,15 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/completion/completion.go b/internal/completion/completion.go new file mode 100644 index 0000000..78fec96 --- /dev/null +++ b/internal/completion/completion.go @@ -0,0 +1,213 @@ +package completion + +import ( + "fmt" + "os" + "path/filepath" + "strings" +) + +func GenerateBash() string { + return `_pebblify() { + local cur prev commands + COMPREPLY=() + cur="${COMP_WORDS[COMP_CWORD]}" + prev="${COMP_WORDS[COMP_CWORD-1]}" + + commands="level-to-pebble recover verify completion version help" + + if [[ ${COMP_CWORD} -eq 1 ]]; then + COMPREPLY=( $(compgen -W "${commands}" -- "${cur}") ) + return 0 + fi + + case "${COMP_WORDS[1]}" in + level-to-pebble) + local opts="-f --force -w --workers --batch-memory --tmp-dir -v --verbose --health --health-port" + case "${prev}" in + --workers|-w|--batch-memory|--health-port) + return 0 + ;; + --tmp-dir) + COMPREPLY=( $(compgen -d -- "${cur}") ) + return 0 + ;; + *) + if [[ "${cur}" == -* ]]; then + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + else + COMPREPLY=( $(compgen -d -- "${cur}") ) + fi + return 0 + ;; + esac + ;; + recover) + local opts="-w --workers --batch-memory --tmp-dir -v --verbose --health --health-port" + case "${prev}" in + --workers|-w|--batch-memory|--health-port) + return 0 + ;; + --tmp-dir) + COMPREPLY=( $(compgen -d -- "${cur}") ) + return 0 + ;; + *) + if [[ "${cur}" == -* ]]; then + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + fi + return 0 + ;; + esac + ;; + verify) + local opts="-s --sample --stop-on-error -v --verbose" + case "${prev}" in + --sample|-s) + return 0 + ;; + *) + if [[ "${cur}" == -* ]]; then + COMPREPLY=( $(compgen -W "${opts}" -- "${cur}") ) + else + COMPREPLY=( $(compgen -d -- "${cur}") ) + fi + return 0 + ;; + esac + ;; + completion) + if [[ ${COMP_CWORD} -eq 2 ]]; then + COMPREPLY=( $(compgen -W "bash zsh install" -- "${cur}") ) + elif [[ ${COMP_CWORD} -eq 3 && "${prev}" == "install" ]]; then + COMPREPLY=( $(compgen -W "bash zsh" -- "${cur}") ) + fi + return 0 + ;; + esac +} + +complete -F _pebblify pebblify +` +} + +func GenerateZsh() string { + return `#compdef pebblify + +_pebblify() { + local -a commands + commands=( + 'level-to-pebble:Convert a Tendermint/CometBFT data/ directory from LevelDB to PebbleDB' + 'recover:Resume a previously interrupted conversion' + 'verify:Verify that converted data matches the source' + 'completion:Generate shell completion scripts' + 'version:Show version information' + 'help:Show help' + ) + + _arguments -C \ + '1:command:->command' \ + '*::arg:->args' + + case "${state}" in + command) + _describe 'command' commands + ;; + args) + case "${words[1]}" in + level-to-pebble) + _arguments \ + '(-f --force)'{-f,--force}'[Overwrite existing temporary state]' \ + '(-w --workers)'{-w,--workers}'[Max concurrent DB conversions]:workers:' \ + '--batch-memory[Target memory per batch in MB]:memory:' \ + '--tmp-dir[Directory where .pebblify-tmp will be created]:directory:_directories' \ + '(-v --verbose)'{-v,--verbose}'[Enable verbose output]' \ + '--health[Enable HTTP health probe server]' \ + '--health-port[Port for the health server]:port:' \ + '1:source directory:_directories' \ + '2:output directory:_directories' + ;; + recover) + _arguments \ + '(-w --workers)'{-w,--workers}'[Max concurrent DB conversions]:workers:' \ + '--batch-memory[Target memory per batch in MB]:memory:' \ + '--tmp-dir[Directory containing .pebblify-tmp]:directory:_directories' \ + '(-v --verbose)'{-v,--verbose}'[Enable verbose output]' \ + '--health[Enable HTTP health probe server]' \ + '--health-port[Port for the health server]:port:' + ;; + verify) + _arguments \ + '(-s --sample)'{-s,--sample}'[Percentage of keys to verify]:percent:' \ + '--stop-on-error[Stop at first mismatch]' \ + '(-v --verbose)'{-v,--verbose}'[Show each key being verified]' \ + '1:source data directory:_directories' \ + '2:destination data directory:_directories' + ;; + completion) + _arguments \ + '1:action:(bash zsh install)' \ + '2:shell:(bash zsh)' + ;; + esac + ;; + esac +} + +_pebblify "$@" +` +} + +func InstallBash() (string, error) { + home, err := os.UserHomeDir() + if err != nil { + return "", fmt.Errorf("cannot determine home directory: %w", err) + } + + dir := filepath.Join(home, ".bash_completion.d") + if err := os.MkdirAll(dir, 0o755); err != nil { + return "", fmt.Errorf("cannot create directory %s: %w", dir, err) + } + + dest := filepath.Join(dir, "pebblify") + if err := os.WriteFile(dest, []byte(GenerateBash()), 0o644); err != nil { + return "", fmt.Errorf("cannot write %s: %w", dest, err) + } + + return dest, nil +} + +func InstallZsh() (string, error) { + dir := zshCompletionDir() + + if err := os.MkdirAll(dir, 0o755); err != nil { + return "", fmt.Errorf("cannot create directory %s: %w", dir, err) + } + + dest := filepath.Join(dir, "_pebblify") + if err := os.WriteFile(dest, []byte(GenerateZsh()), 0o644); err != nil { + return "", fmt.Errorf("cannot write %s: %w", dest, err) + } + + return dest, nil +} + +func zshCompletionDir() string { + if fpath := os.Getenv("FPATH"); fpath != "" { + parts := strings.Split(fpath, ":") + for _, p := range parts { + if strings.Contains(p, "completions") || strings.Contains(p, "zsh") { + if info, err := os.Stat(p); err == nil && info.IsDir() { + return p + } + } + } + } + + home, err := os.UserHomeDir() + if err != nil { + return filepath.Join("/usr/local/share/zsh/site-functions") + } + + return filepath.Join(home, ".zsh", "completions") +} diff --git a/internal/health/server.go b/internal/health/server.go new file mode 100644 index 0000000..0292b98 --- /dev/null +++ b/internal/health/server.go @@ -0,0 +1,95 @@ +package health + +import ( + "context" + "fmt" + "net" + "net/http" + "time" +) + +type Server struct { + httpServer *http.Server + state *ProbeState +} + +func NewServer(port int, state *ProbeState) *Server { + mux := http.NewServeMux() + s := &Server{ + httpServer: &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + }, + state: state, + } + + mux.HandleFunc("/healthz/live", s.handleLiveness) + mux.HandleFunc("/healthz/ready", s.handleReadiness) + mux.HandleFunc("/healthz/startup", s.handleStartup) + + return s +} + +func (s *Server) ListenAndServe() error { + ln, err := net.Listen("tcp", s.httpServer.Addr) + if err != nil { + return err + } + return s.httpServer.Serve(ln) +} + +func (s *Server) Shutdown(ctx context.Context) error { + return s.httpServer.Shutdown(ctx) +} + +func (s *Server) handleLiveness(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if s.state.IsAlive() { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "ok") + return + } + + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "not alive") +} + +func (s *Server) handleReadiness(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if s.state.IsReady() { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "ok") + return + } + + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "not ready") +} + +func (s *Server) handleStartup(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if s.state.IsStarted() { + w.WriteHeader(http.StatusOK) + fmt.Fprintln(w, "ok") + return + } + + w.WriteHeader(http.StatusServiceUnavailable) + fmt.Fprintln(w, "not started") +} diff --git a/internal/health/state.go b/internal/health/state.go new file mode 100644 index 0000000..698da80 --- /dev/null +++ b/internal/health/state.go @@ -0,0 +1,91 @@ +package health + +import ( + "sync" + "time" +) + +type ProbeState struct { + mu sync.RWMutex + started bool + ready bool + lastPing time.Time + livenessGrace time.Duration +} + +func NewProbeState(livenessGrace time.Duration) *ProbeState { + return &ProbeState{ + livenessGrace: livenessGrace, + lastPing: time.Now(), + } +} + +func (p *ProbeState) SetStarted() { + p.mu.Lock() + defer p.mu.Unlock() + p.started = true +} + +func (p *ProbeState) SetReady() { + p.mu.Lock() + defer p.mu.Unlock() + p.ready = true +} + +func (p *ProbeState) SetNotReady() { + p.mu.Lock() + defer p.mu.Unlock() + p.ready = false +} + +func (p *ProbeState) Ping() { + p.mu.Lock() + defer p.mu.Unlock() + p.lastPing = time.Now() +} + +func (p *ProbeState) IsStarted() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.started +} + +func (p *ProbeState) IsReady() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return p.ready +} + +func (p *ProbeState) IsAlive() bool { + p.mu.RLock() + defer p.mu.RUnlock() + return time.Since(p.lastPing) < p.livenessGrace +} + +type PingTicker struct { + stop chan struct{} +} + +func NewPingTicker(state *ProbeState, interval time.Duration) *PingTicker { + pt := &PingTicker{stop: make(chan struct{})} + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + state.Ping() + case <-pt.stop: + return + } + } + }() + + return pt +} + +func (pt *PingTicker) Stop() { + close(pt.stop) +} diff --git a/internal/migration/converter.go b/internal/migration/converter.go index 796c5f8..c73f5cf 100644 --- a/internal/migration/converter.go +++ b/internal/migration/converter.go @@ -16,14 +16,16 @@ import ( "github.com/Dockermint/Pebblify/internal/batcher" "github.com/Dockermint/Pebblify/internal/fsutil" "github.com/Dockermint/Pebblify/internal/metrics" + "github.com/Dockermint/Pebblify/internal/prom" "github.com/Dockermint/Pebblify/internal/progress" "github.com/Dockermint/Pebblify/internal/state" ) type RunConfig struct { - Workers int - BatchMemory int - Verbose bool + Workers int + BatchMemory int + Verbose bool + MetricsEnabled bool } func RunLevelToPebble(src, out string, cfg *RunConfig, tmpRoot string) error { @@ -123,7 +125,7 @@ func RunLevelToPebble(src, out string, cfg *RunConfig, tmpRoot string) error { } startRun := time.Now() - if err := convertAllDBs(statePath, st, workers, batchConfig, m, cfg.Verbose); err != nil { + if err := convertAllDBs(statePath, st, workers, batchConfig, m, cfg.Verbose, cfg.MetricsEnabled); err != nil { close(doneCh) return err } @@ -172,7 +174,7 @@ func RunLevelToPebble(src, out string, cfg *RunConfig, tmpRoot string) error { return nil } -func RunRecover(workers, batchMemory int, tmpRoot string, verbose bool) error { +func RunRecover(workers, batchMemory int, tmpRoot string, verbose bool, metricsEnabled bool) error { statePath := filepath.Join(tmpRoot, state.StateFileName) st, err := state.Read(statePath) @@ -244,7 +246,7 @@ func RunRecover(workers, batchMemory int, tmpRoot string, verbose bool) error { TargetMemoryMB: batchMemory, } - if err := convertAllDBs(statePath, st, workers, batchConfig, m, verbose); err != nil { + if err := convertAllDBs(statePath, st, workers, batchConfig, m, verbose, metricsEnabled); err != nil { close(doneCh) return err } @@ -289,7 +291,7 @@ func finalizeConversion(st *state.ConversionState, tmpRoot string) error { return nil } -func convertAllDBs(statePath string, st *state.ConversionState, workers int, batchConfig *batcher.Config, m *metrics.Metrics, verbose bool) error { +func convertAllDBs(statePath string, st *state.ConversionState, workers int, batchConfig *batcher.Config, m *metrics.Metrics, verbose bool, metricsEnabled bool) error { dbList := state.CollectPendingDBs(st) if len(dbList) == 0 { return nil @@ -302,7 +304,7 @@ func convertAllDBs(statePath string, st *state.ConversionState, workers int, bat for range workers { wg.Go(func() { for dbst := range jobs { - if err := convertSingleDB(statePath, st, dbst, batchConfig, m, verbose); err != nil { + if err := convertSingleDB(statePath, st, dbst, batchConfig, m, verbose, metricsEnabled); err != nil { errCh <- err } } @@ -328,7 +330,7 @@ func convertAllDBs(statePath string, st *state.ConversionState, workers int, bat return nil } -func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DBStatus, batchConfig *batcher.Config, m *metrics.Metrics, verbose bool) error { +func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DBStatus, batchConfig *batcher.Config, m *metrics.Metrics, verbose bool, metricsEnabled bool) error { fmt.Printf("\nConverting DB %s", dbst.Name) isResume := dbst.Status == "in_progress" && dbst.GetLastCheckpointKey() != nil @@ -353,6 +355,10 @@ func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DB return fmt.Errorf("failed to update state: %w", err) } + if metricsEnabled { + updateDBGauges(st) + } + srcDB, err := leveldb.OpenFile(dbst.SourcePath, &levopt.Options{ ErrorIfMissing: true, ReadOnly: true, @@ -419,6 +425,11 @@ func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DB return markDBFailed(statePath, st, dbst, err) } + if metricsEnabled { + prom.BatchCommits.Inc() + prom.Checkpoints.Inc() + } + if err := state.Update(statePath, st, func() { dbst.MigratedKeys = count dbst.SetLastCheckpointKey(lastKey) @@ -432,6 +443,16 @@ func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DB if time.Since(lastMetricsUpdate) >= time.Second { m.RecordKeys(dbst.Name, intervalKeys, intervalBytes, intervalBytes) + + if metricsEnabled { + prom.KeysProcessed.Add(float64(intervalKeys)) + prom.BytesRead.Add(float64(intervalBytes)) + prom.BytesWritten.Add(float64(intervalBytes)) + kps, bps := m.GetCurrentThroughput() + prom.KeysPerSecond.Set(kps) + prom.BytesPerSecond.Set(bps * 1024 * 1024) + } + intervalKeys = 0 intervalBytes = 0 lastMetricsUpdate = time.Now() @@ -445,12 +466,22 @@ func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DB if intervalKeys > 0 { m.RecordKeys(dbst.Name, intervalKeys, intervalBytes, intervalBytes) + + if metricsEnabled { + prom.KeysProcessed.Add(float64(intervalKeys)) + prom.BytesRead.Add(float64(intervalBytes)) + prom.BytesWritten.Add(float64(intervalBytes)) + } } if err := b.Commit(); err != nil { return markDBFailed(statePath, st, dbst, err) } + if metricsEnabled { + prom.BatchCommits.Inc() + } + if err := it.Error(); err != nil { return markDBFailed(statePath, st, dbst, err) } @@ -482,10 +513,29 @@ func convertSingleDB(statePath string, st *state.ConversionState, dbst *state.DB return fmt.Errorf("failed to finalize state: %w", err) } + if metricsEnabled { + updateDBGauges(st) + } + fmt.Printf("\nDB %s converted successfully (%d keys)\n", dbst.Name, count) return nil } +func updateDBGauges(st *state.ConversionState) { + counts := map[string]float64{ + "pending": 0, + "in_progress": 0, + "done": 0, + "failed": 0, + } + for _, db := range st.DBs { + counts[db.Status]++ + } + for status, count := range counts { + prom.Databases.WithLabelValues(status).Set(count) + } +} + func cleanTmp(tmpRoot string) { _ = os.RemoveAll(tmpRoot) } diff --git a/internal/prom/prom.go b/internal/prom/prom.go new file mode 100644 index 0000000..27b6e2e --- /dev/null +++ b/internal/prom/prom.go @@ -0,0 +1,99 @@ +package prom + +import ( + "context" + "fmt" + "net" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +var ( + KeysProcessed = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pebblify", + Name: "keys_processed_total", + }) + + BytesRead = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pebblify", + Name: "bytes_read_total", + }) + + BytesWritten = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pebblify", + Name: "bytes_written_total", + }) + + Databases = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "pebblify", + Name: "databases_total", + }, []string{"status"}) + + KeysPerSecond = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebblify", + Name: "keys_per_second", + }) + + BytesPerSecond = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebblify", + Name: "bytes_per_second", + }) + + BatchCommits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pebblify", + Name: "batch_commits_total", + }) + + Checkpoints = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "pebblify", + Name: "checkpoints_total", + }) +) + +func init() { + prometheus.MustRegister( + KeysProcessed, + BytesRead, + BytesWritten, + Databases, + KeysPerSecond, + BytesPerSecond, + BatchCommits, + Checkpoints, + ) +} + +type Server struct { + httpServer *http.Server +} + +func NewServer(port int) *Server { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + + return &Server{ + httpServer: &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, + }, + } +} + +func (s *Server) ListenAndServe() error { + ln, err := net.Listen("tcp", s.httpServer.Addr) + if err != nil { + return err + } + return s.httpServer.Serve(ln) +} + +func (s *Server) Shutdown(ctx context.Context) error { + return s.httpServer.Shutdown(ctx) +}