From 6a187ddddc3d53b93b561a617150b8e10a5896fa Mon Sep 17 00:00:00 2001 From: Bibo Hao Date: Wed, 18 Feb 2026 20:22:27 +0000 Subject: [PATCH 1/7] catch up upstream --- doc/doc-debug.md | 17 + src/pidproxy/signal.go | 1 + src/supervisord/confApi.go | 47 -- src/supervisord/config/config.go | 5 + src/supervisord/content_checker.go | 4 +- src/supervisord/ctl.go | 6 +- src/supervisord/logtail.go | 2 +- src/supervisord/main.go | 13 +- src/supervisord/process/process.go | 155 +++- .../process/process_manager_test.go | 2 +- src/supervisord/rest-rpc.go | 3 +- src/supervisord/version.go | 9 +- src/supervisord/xmlrpc.go | 4 +- src/supervisord/xmlrpcclient/xmlrpc-client.go | 22 +- src/webgui/index.html | 665 ++++++++++-------- 15 files changed, 564 insertions(+), 391 deletions(-) delete mode 100644 src/supervisord/confApi.go diff --git a/doc/doc-debug.md b/doc/doc-debug.md index 18d0e1c7..342947bb 100644 --- a/doc/doc-debug.md +++ b/doc/doc-debug.md @@ -15,3 +15,20 @@ docker run --rm -it \ mkdir -pv /var/log/supervisord /opt/supervisord/supervisord -c ./supervisord.conf ``` + +## Replace names and organize imports in code + +```bash +function replace_str_in_file() { + local file_path="$1" + local name_pattern="$2" + local str_search="$3" + local str_replace="$4" + # Find all .go files in the target directory recursively + find "${file_path}" -type f \( -name "${name_pattern}" \) | while read -r FILE; do + echo "Replace ${str_search} with ${str_replace} in: $FILE" && sed -i "s/${str_search}/${str_replace}/" "$FILE" + done +} + +replace_str_in_file "./" "*.go" "github.com\/ochinchina\/supervisord" "supervisord" +``` diff --git a/src/pidproxy/signal.go b/src/pidproxy/signal.go index 1c0cc089..720e64d7 100644 --- a/src/pidproxy/signal.go +++ b/src/pidproxy/signal.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package main diff --git a/src/supervisord/confApi.go b/src/supervisord/confApi.go deleted file mode 100644 index 9370d3b7..00000000 --- a/src/supervisord/confApi.go +++ /dev/null @@ -1,47 +0,0 @@ -package main - -import ( - "net/http" - - "github.com/gorilla/mux" -) - -type ConfApi struct { - router *mux.Router - supervisor *Supervisor -} - -// NewLogtail creates a Logtail object -func NewConfApi(supervisor *Supervisor) *ConfApi { - return &ConfApi{router: mux.NewRouter(), supervisor: supervisor} -} - -// CreateHandler creates http handlers to process the program stdout and stderr through http interface -func (ca *ConfApi) CreateHandler() http.Handler { - ca.router.HandleFunc("/conf/{program}", ca.getProgramConfFile).Methods("GET") - return ca.router -} - -func (ca *ConfApi) getProgramConfFile(writer http.ResponseWriter, request *http.Request) { - vars := mux.Vars(request) - if vars == nil { - writer.WriteHeader(http.StatusNotFound) - return - } - - programName := vars["program"] - programConfigPath := getProgramConfigPath(programName, ca.supervisor) - if programConfigPath == "" { - writer.WriteHeader(http.StatusNotFound) - return - } - - b, err := readFile(programConfigPath) - if err != nil { - writer.WriteHeader(http.StatusNotFound) - return - } - - writer.WriteHeader(http.StatusOK) - writer.Write(b) -} diff --git a/src/supervisord/config/config.go b/src/supervisord/config/config.go index bea82bab..143a151b 100644 --- a/src/supervisord/config/config.go +++ b/src/supervisord/config/config.go @@ -345,6 +345,11 @@ func parseEnv(s string) *map[string]string { for i = start; i < n && s[i] != '='; { i++ } + + if start >= n || i+1 >= n { + break + } + key := s[start:i] start = i + 1 if s[start] == '"' { diff --git a/src/supervisord/content_checker.go b/src/supervisord/content_checker.go index 22372875..afef1785 100644 --- a/src/supervisord/content_checker.go +++ b/src/supervisord/content_checker.go @@ -39,7 +39,7 @@ func (bc *BaseChecker) Write(b []byte) (int, error) { func (bc *BaseChecker) isReady() bool { for _, include := range bc.includes { - if strings.Index(bc.data, include) == -1 { + if !strings.Contains(bc.data, include) { return false } } @@ -109,7 +109,7 @@ func (tc *TCPChecker) start() { b := make([]byte, 1024) var err error for { - tc.conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", tc.host, tc.port)) + tc.conn, err = net.Dial("tcp", net.JoinHostPort(tc.host, fmt.Sprintf("%d", tc.port))) if err == nil || tc.baseChecker.timeoutTime.Before(time.Now()) { break } diff --git a/src/supervisord/ctl.go b/src/supervisord/ctl.go index f3d76fc6..aa8609e9 100644 --- a/src/supervisord/ctl.go +++ b/src/supervisord/ctl.go @@ -2,13 +2,14 @@ package main import ( "fmt" - "github.com/jessevdk/go-flags" "net/http" "os" "strings" "supervisord/config" "supervisord/types" "supervisord/xmlrpcclient" + + "github.com/jessevdk/go-flags" ) // CtlCommand the entry of ctl command @@ -186,6 +187,7 @@ func (x *CtlCommand) status(rpcc *xmlrpcclient.XMLRPCClient, processes []string) if reply, err := rpcc.GetAllProcessInfo(); err == nil { x.showProcessInfo(&reply, processesMap) } else { + fmt.Fprintf(os.Stderr, "%s\n", err) os.Exit(1) } } @@ -245,6 +247,7 @@ func (x *CtlCommand) shutdown(rpcc *xmlrpcclient.XMLRPCClient) { fmt.Printf("Hmmm! Something gone wrong?!\n") } } else { + fmt.Fprintf(os.Stderr, "%s\n", err) os.Exit(1) } } @@ -263,6 +266,7 @@ func (x *CtlCommand) reload(rpcc *xmlrpcclient.XMLRPCClient) { fmt.Printf("Removed Groups: %s\n", strings.Join(reply.RemovedGroup, ",")) } } else { + fmt.Fprintf(os.Stderr, "%s\n", err) os.Exit(1) } } diff --git a/src/supervisord/logtail.go b/src/supervisord/logtail.go index 7cec4d09..a55e8d57 100644 --- a/src/supervisord/logtail.go +++ b/src/supervisord/logtail.go @@ -45,7 +45,7 @@ func (lt *Logtail) getLog(logType string, w http.ResponseWriter, req *http.Reque return } - var ok = false + var ok bool = false var compositeLogger *logger.CompositeLogger = nil if logType == "stdout" { compositeLogger, ok = proc.StdoutLog.(*logger.CompositeLogger) diff --git a/src/supervisord/main.go b/src/supervisord/main.go index 959416bd..4e128c36 100644 --- a/src/supervisord/main.go +++ b/src/supervisord/main.go @@ -11,13 +11,16 @@ import ( "syscall" "unicode" + "supervisord/config" + "supervisord/logger" + "github.com/jessevdk/go-flags" "github.com/ochinchina/go-ini" log "github.com/sirupsen/logrus" - "supervisord/config" - "supervisord/logger" ) +var BuildVersion string = "" + // Options the command line options type Options struct { Configuration string `short:"c" long:"configuration" description:"the configuration file"` @@ -111,7 +114,8 @@ func findSupervisordConf() (string, error) { "/etc/supervisord.conf", "/etc/supervisor/supervisord.conf", "../etc/supervisord.conf", - "../supervisord.conf"} + "../supervisord.conf", + "./supervisord.conf"} for _, file := range possibleSupervisordConf { if _, err := os.Stat(file); err == nil { @@ -162,6 +166,9 @@ func getSupervisordLogFile(configFile string) string { } func main() { + if BuildVersion != "" { + VERSION = BuildVersion + } ReapZombie() // when execute `supervisord` without sub-command, it should start the server diff --git a/src/supervisord/process/process.go b/src/supervisord/process/process.go index 73a17079..4fbca28c 100644 --- a/src/supervisord/process/process.go +++ b/src/supervisord/process/process.go @@ -15,13 +15,15 @@ import ( "syscall" "time" - "github.com/ochinchina/filechangemonitor" - "github.com/robfig/cron/v3" - log "github.com/sirupsen/logrus" "supervisord/config" "supervisord/events" "supervisord/logger" "supervisord/signals" + + "github.com/mitchellh/go-ps" + "github.com/ochinchina/filechangemonitor" + "github.com/robfig/cron/v3" + log "github.com/sirupsen/logrus" ) // State the state of process @@ -162,8 +164,8 @@ func (p *Process) Start(wait bool) { } go func() { - for { + // we'll do retry start if it sets. p.run(func() { if wait { runCond.L.Lock() @@ -176,7 +178,7 @@ func (p *Process) Start(wait bool) { time.Sleep(5 * time.Second) } if p.stopByUser { - log.WithFields(log.Fields{"program": p.GetName()}).Info("Stopped by user, don't start it again") + log.WithFields(log.Fields{"program": p.GetName()}).Info("program stopped by user, don't start it again") break } if !p.isAutoRestart() { @@ -225,7 +227,9 @@ func (p *Process) GetDescription() string { } return fmt.Sprintf("pid %d, uptime %d:%02d:%02d", p.cmd.Process.Pid, hours%24, minutes%60, seconds%60) } else if p.state != Stopped { - return p.stopTime.String() + if p.stopTime.Unix() > 0 { + return p.stopTime.String() + } } return "" } @@ -396,7 +400,7 @@ func (p *Process) getExitCodes() []int { func (p *Process) isRunning() bool { if p.cmd != nil && p.cmd.Process != nil { if runtime.GOOS == "windows" { - proc, err := os.FindProcess(p.cmd.Process.Pid) + proc, err := ps.FindProcess(p.cmd.Process.Pid) return proc != nil && err == nil } return p.cmd.Process.Signal(syscall.Signal(0)) == nil @@ -496,6 +500,16 @@ func (p *Process) waitForExit(startSecs int64) { p.lock.Lock() defer p.lock.Unlock() p.stopTime = time.Now() + + // FIXME: we didn't set eventlistener logger + // since it's stdout/stderr has been specifically managed. + if p.StdoutLog != nil { + p.StdoutLog.Close() + } + if p.StderrLog != nil { + p.StderrLog.Close() + } + } // fail to start the program @@ -522,6 +536,13 @@ func (p *Process) monitorProgramIsRunning(endTime time.Time, monitorExited *int3 } } +// 这个函数可能有以下几种执行完成的情况: +// +// 1. 程序正在运行中,因此函数直接返回。 +// 2. 程序尚未运行,函数开始尝试多次启动程序,直到启动成功。 +// 3. 程序成功启动并正在运行中,函数启动了一个后台监视程序来监视程序运行情况,并向 `finishCb` 函数传递一个标记告知程序已停止,函数直接返回。 +// 4. 程序启动失败,超出了尝试次数,函数将程序状态标记为 `FATAL`,并向 `finishCb` 函数传递一个标记告知程序已停止,函数直接返回。 +// 5. 程序被终止或运行失败,超出了重试次数,函数将程序状态标记为 `EXITED`,并向 `finishCb` 函数传递一个标记告知程序已停止,函数直接返回。 func (p *Process) run(finishCb func()) { p.lock.Lock() defer p.lock.Unlock() @@ -531,8 +552,8 @@ func (p *Process) run(finishCb func()) { log.WithFields(log.Fields{"program": p.GetName()}).Info("Don't start program because it is running") finishCb() return - } + p.startTime = time.Now() atomic.StoreInt32(p.retryTimes, 0) startSecs := p.getStartSeconds() @@ -543,7 +564,8 @@ func (p *Process) run(finishCb func()) { finishCbWrapper := func() { once.Do(finishCb) } - // process is not expired and not stoped by user + + //process is not expired and not stoped by user for !p.stopByUser { if restartPause > 0 && atomic.LoadInt32(p.retryTimes) != 0 { // pause @@ -607,6 +629,7 @@ func (p *Process) run(finishCb func()) { // Set startsec to 0 to indicate that the program needn't stay // running for any particular amount of time. if startSecs <= 0 { + atomic.StoreInt32(&monitorExited, 1) log.WithFields(log.Fields{"program": p.GetName()}).Info("success to start program") p.changeStateTo(Running) go finishCbWrapper() @@ -616,7 +639,7 @@ func (p *Process) run(finishCb func()) { finishCbWrapper() }() } - log.WithFields(log.Fields{"program": p.GetName()}).Debug("wait program exit") + log.WithFields(log.Fields{"program": p.GetName()}).Debug("check program is starting and wait if it exit") p.lock.Unlock() procExitC := make(chan struct{}) @@ -646,10 +669,17 @@ func (p *Process) run(finishCb func()) { p.lock.Lock() - // if the program still in running after startSecs - if p.state == Running { - p.changeStateTo(Exited) - log.WithFields(log.Fields{"program": p.GetName()}).Info("program exited") + // we break the restartRetry loop if: + // 1. process still in running after startSecs (although it's exited right now) + // 2. it's stopping by user (we unlocked before waitForExit, so the flag stopByUser will have a chance to change). + if p.state == Running || p.state == Stopping { + if !p.stopByUser { + p.changeStateTo(Exited) + log.WithFields(log.Fields{"program": p.GetName()}).Info("program exited") + } else { + p.changeStateTo(Stopped) + log.WithFields(log.Fields{"program": p.GetName()}).Info("program stopped by user") + } break } else { p.changeStateTo(Backoff) @@ -742,10 +772,35 @@ func (p *Process) setEnv() { envFromFiles := p.config.GetEnvFromFiles("envFiles") env := p.config.GetEnv("environment") if len(env)+len(envFromFiles) != 0 { - p.cmd.Env = append(append(os.Environ(), envFromFiles...), env...) + p.cmd.Env = mergeKeyValueArrays(p.cmd.Env, append(append(os.Environ(), envFromFiles...), env...)) } else { - p.cmd.Env = os.Environ() + p.cmd.Env = mergeKeyValueArrays(p.cmd.Env, os.Environ()) + } +} + +// 辅助函数:带覆盖的环境变量追加 +func mergeKeyValueArrays(arr1, arr2 []string) []string { + keySet := make(map[string]bool) + result := make([]string, 0, len(arr1)+len(arr2)) + + // 处理第一个数组,保留所有元素 + for _, item := range arr1 { + if key := strings.SplitN(item, "=", 2)[0]; key != "" { + keySet[key] = true + } + result = append(result, item) + } + + // 处理第二个数组,跳过已存在的键 + for _, item := range arr2 { + if key := strings.SplitN(item, "=", 2)[0]; key != "" { + if !keySet[key] { + result = append(result, item) + } + } } + + return result } func (p *Process) setDir() { @@ -929,9 +984,68 @@ func (p *Process) setUser() error { } } setUserID(p.cmd.SysProcAttr, uint32(uid), uint32(gid)) + + // 强制设置关键环境变量 + p.cmd.Env = appendEnvWithOverride(p.cmd.Env, + "HOME", u.HomeDir, // 强制HOME目录 + "USER", u.Username, // 用户名 + "LOGNAME", u.Username, // 登录名 + "PATH", defaultPath(u), // 安全PATH + ) + + // 删除root残留的环境变量 + filterRootEnv(&p.cmd.Env) + return nil } +// 辅助函数:带覆盖的环境变量追加 +func appendEnvWithOverride(env []string, pairs ...string) []string { + newEnv := make([]string, 0, len(env)+len(pairs)/2) + set := make(map[string]bool) + + // 先添加新变量 + for i := 0; i < len(pairs); i += 2 { + key := pairs[i] + value := pairs[i+1] + newEnv = append(newEnv, fmt.Sprintf("%s=%s", key, value)) + set[key] = true + } + + // 保留未覆盖的旧变量 + for _, e := range env { + parts := strings.SplitN(e, "=", 2) + if len(parts) < 2 || set[parts[0]] { + continue + } + newEnv = append(newEnv, e) + } + + return newEnv +} + +// 辅助函数:生成安全PATH +func defaultPath(u *user.User) string { + // 根据用户类型返回不同PATH + if u.Uid == "0" { + return "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" + } + return "/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games" +} + +// 辅助函数:过滤危险的环境变量 +func filterRootEnv(env *[]string) { + filtered := make([]string, 0, len(*env)) + for _, e := range *env { + if strings.HasPrefix(e, "SUDO_") || + strings.HasPrefix(e, "XDG_RUNTIME_DIR=") { + continue + } + filtered = append(filtered, e) + } + *env = filtered +} + // Stop sends signal to process to make it quit func (p *Process) Stop(wait bool) { p.lock.Lock() @@ -942,8 +1056,10 @@ func (p *Process) Stop(wait bool) { log.WithFields(log.Fields{"program": p.GetName()}).Info("program is not running") return } - log.WithFields(log.Fields{"program": p.GetName()}).Info("stop the program") - sigs := strings.Fields(p.config.GetString("stopsignal", "SIGTERM")) + + log.WithFields(log.Fields{"program": p.GetName()}).Info("stopping the program") + p.changeStateTo(Stopping) + sigs := strings.Fields(p.config.GetString("stopsignal", "TERM")) waitsecs := time.Duration(p.config.GetInt("stopwaitsecs", 10)) * time.Second killwaitsecs := time.Duration(p.config.GetInt("killwaitsecs", 2)) * time.Second stopasgroup := p.config.GetBool("stopasgroup", false) @@ -997,6 +1113,9 @@ func (p *Process) Stop(wait bool) { // GetStatus returns status of program as a string func (p *Process) GetStatus() string { + if p.cmd.ProcessState == nil { + return "" + } if p.cmd.ProcessState.Exited() { return p.cmd.ProcessState.String() } diff --git a/src/supervisord/process/process_manager_test.go b/src/supervisord/process/process_manager_test.go index 644113f9..97ecee6e 100644 --- a/src/supervisord/process/process_manager_test.go +++ b/src/supervisord/process/process_manager_test.go @@ -5,7 +5,7 @@ import ( "testing" ) -var procs = NewManager() +var procs *Manager = NewManager() func TestProcessMgrAdd(t *testing.T) { entry := &config.Entry{ConfigDir: ".", Group: "test", Name: "program:test1"} diff --git a/src/supervisord/rest-rpc.go b/src/supervisord/rest-rpc.go index 7fb08fd8..890fb5fc 100644 --- a/src/supervisord/rest-rpc.go +++ b/src/supervisord/rest-rpc.go @@ -5,8 +5,9 @@ import ( "io/ioutil" "net/http" - "github.com/gorilla/mux" "supervisord/types" + + "github.com/gorilla/mux" ) // SupervisorRestful the restful interface to control the programs defined in configuration file diff --git a/src/supervisord/version.go b/src/supervisord/version.go index b93cb07f..b03041e9 100644 --- a/src/supervisord/version.go +++ b/src/supervisord/version.go @@ -5,7 +5,11 @@ import ( ) // VERSION the version of supervisor -const VERSION = "v0.0.0" + +var ( + VERSION = "v0.0.0" + COMMIT = "" +) // VersionCommand implement the flags.Commander interface type VersionCommand struct { @@ -15,7 +19,8 @@ var versionCommand VersionCommand // Execute implement Execute() method defined in flags.Commander interface, executes the given command func (v VersionCommand) Execute(args []string) error { - fmt.Println(VERSION) + fmt.Println("Version:", VERSION) + fmt.Println(" Commit:", COMMIT) return nil } diff --git a/src/supervisord/xmlrpc.go b/src/supervisord/xmlrpc.go index 886d140d..120934b9 100644 --- a/src/supervisord/xmlrpc.go +++ b/src/supervisord/xmlrpc.go @@ -11,12 +11,13 @@ import ( "path/filepath" "strings" + "supervisord/process" + "github.com/gorilla/rpc" "github.com/ochinchina/gorilla-xmlrpc/xml" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" log "github.com/sirupsen/logrus" - "supervisord/process" ) // XMLRPC mange the XML RPC servers @@ -188,7 +189,6 @@ func (p *XMLRPC) startHTTPServer(user string, password string, protocol string, continue } dir := filepath.Dir(filePath) - // fmt.Println(dir) mux.Handle("/log/"+realName+"/", http.StripPrefix("/log/"+realName+"/", http.FileServer(http.Dir(dir)))) } diff --git a/src/supervisord/xmlrpcclient/xmlrpc-client.go b/src/supervisord/xmlrpcclient/xmlrpc-client.go index c15e25a3..d27e7b14 100644 --- a/src/supervisord/xmlrpcclient/xmlrpc-client.go +++ b/src/supervisord/xmlrpcclient/xmlrpc-client.go @@ -10,9 +10,8 @@ import ( "net" "net/http" "net/url" - "time" - "supervisord/types" + "time" "github.com/ochinchina/gorilla-xmlrpc/xml" ) @@ -111,6 +110,7 @@ func (r *XMLRPCClient) processResponse(resp *http.Response, processBody func(io. func (r *XMLRPCClient) postInetHTTP(method string, url string, data interface{}, processBody func(io.ReadCloser, error)) { req, err := r.createHTTPRequest(method, url, data) if err != nil { + processBody(emptyReader, err) return } @@ -122,9 +122,7 @@ func (r *XMLRPCClient) postInetHTTP(method string, url string, data interface{}, resp, err := http.DefaultClient.Do(req) if err != nil { - if r.verbose { - fmt.Println("Fail to send request to supervisord:", err) - } + processBody(emptyReader, fmt.Errorf("Fail to send http request to supervisord: %s", err)) return } r.processResponse(resp, processBody) @@ -140,35 +138,31 @@ func (r *XMLRPCClient) postUnixHTTP(method string, path string, data interface{} conn, err = net.Dial("unix", path) } if err != nil { - if r.verbose { - fmt.Printf("Fail to connect unix socket path: %s\n", r.serverurl) - } + processBody(emptyReader, fmt.Errorf("Fail to connect unix socket path: %s. %s", r.serverurl, err)) return } defer conn.Close() if r.timeout > 0 { if err := conn.SetDeadline(time.Now().Add(r.timeout)); err != nil { + processBody(emptyReader, err) return } } req, err := r.createHTTPRequest(method, "/RPC2", data) if err != nil { + processBody(emptyReader, fmt.Errorf("Fail to create http request. %s", err)) return } err = req.Write(conn) if err != nil { - if r.verbose { - fmt.Printf("Fail to write to unix socket %s\n", r.serverurl) - } + processBody(emptyReader, fmt.Errorf("Fail to write to unix socket %s", r.serverurl)) return } resp, err := http.ReadResponse(bufio.NewReader(conn), req) if err != nil { - if r.verbose { - fmt.Printf("Fail to read response %s\n", err) - } + processBody(emptyReader, fmt.Errorf("Fail to read response %s", err)) return } r.processResponse(resp, processBody) diff --git a/src/webgui/index.html b/src/webgui/index.html index 5cc6cac7..ddf11905 100644 --- a/src/webgui/index.html +++ b/src/webgui/index.html @@ -1,351 +1,418 @@ - + - + Go-Supervisor - - - - - - - - - + + + + + + + + + - -

Go-Supervisor

+

Go-Supervisor

-

Programs

-
-
- - - - -
+

Programs

+
+
+ + + + +
- - - - - - - - -
ProgramStateDescriptionAction
+ + + + + + + + +
ProgramStateDescriptionAction
-