From da94d6d371ce92629ec72cfe19fb1ffef00686e1 Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Wed, 18 Dec 2024 15:15:36 +0530 Subject: [PATCH 1/8] instead of flow based control added valid transition map to control state updates --- lib/workerclient.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/workerclient.go b/lib/workerclient.go index 5edbb77f..8d181186 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -54,6 +54,17 @@ const ( MaxWorkerState = 7 ) +var validStateTransitionMap map[HeraWorkerStatus][]HeraWorkerStatus = map[HeraWorkerStatus][]HeraWorkerStatus{ + wsUnset: {wsSchd, wsInit}, + wsSchd: {wsInit, wsUnset}, + wsInit: {wsSchd, wsAcpt, wsUnset}, + wsAcpt: {wsBusy}, + wsBusy: {wsWait, wsQuce, wsFnsh}, + wsWait: {wsQuce, wsFnsh}, + wsFnsh: {wsAcpt}, + wsQuce: {wsInit, wsFnsh}, //Forceful termination target state "wsInit", Graceful termination "wsFnsh" +} + const bfChannelSize = 30 // workerMsg is used to communicate with the coordinator, it contains the control message metadata plus the actual payload From bc63e126307b94ea82534acc4c44f4f8471ecb64 Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Wed, 18 Dec 2024 15:35:12 +0530 Subject: [PATCH 2/8] added missing transtion from finish state to scheduled state --- lib/workerclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workerclient.go b/lib/workerclient.go index 8d181186..cc0ed70c 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -61,7 +61,7 @@ var validStateTransitionMap map[HeraWorkerStatus][]HeraWorkerStatus = map[HeraWo wsAcpt: {wsBusy}, wsBusy: {wsWait, wsQuce, wsFnsh}, wsWait: {wsQuce, wsFnsh}, - wsFnsh: {wsAcpt}, + wsFnsh: {wsAcpt, wsSchd}, wsQuce: {wsInit, wsFnsh}, //Forceful termination target state "wsInit", Graceful termination "wsFnsh" } From d1c94c51ea3c117019a949b1fad36852921d5f7b Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Thu, 19 Dec 2024 13:59:44 +0530 Subject: [PATCH 3/8] move process reaping login go from C --- lib/workerclient.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/lib/workerclient.go b/lib/workerclient.go index cc0ed70c..5edbb77f 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -54,17 +54,6 @@ const ( MaxWorkerState = 7 ) -var validStateTransitionMap map[HeraWorkerStatus][]HeraWorkerStatus = map[HeraWorkerStatus][]HeraWorkerStatus{ - wsUnset: {wsSchd, wsInit}, - wsSchd: {wsInit, wsUnset}, - wsInit: {wsSchd, wsAcpt, wsUnset}, - wsAcpt: {wsBusy}, - wsBusy: {wsWait, wsQuce, wsFnsh}, - wsWait: {wsQuce, wsFnsh}, - wsFnsh: {wsAcpt, wsSchd}, - wsQuce: {wsInit, wsFnsh}, //Forceful termination target state "wsInit", Graceful termination "wsFnsh" -} - const bfChannelSize = 30 // workerMsg is used to communicate with the coordinator, it contains the control message metadata plus the actual payload From 912e18b84a8e8de81b86fc8802a73cd38c438cf0 Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Fri, 17 Jan 2025 11:17:31 +0530 Subject: [PATCH 4/8] fixing race condition related to worker recovery issue --- lib/workerbroker.go | 1 + lib/workerclient.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/workerbroker.go b/lib/workerbroker.go index 05265754..b5201e57 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -326,6 +326,7 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") } workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long + time.Sleep(5 * time.Second) pool.RestartWorker(workerclient) } } else { diff --git a/lib/workerclient.go b/lib/workerclient.go index 5edbb77f..fff56c49 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -591,7 +591,7 @@ func (worker *WorkerClient) initiateRecover(param int, p *WorkerPool, prior Hera param = common.StrandedSkipBreakHiLoad } } else { - rv = time.After(time.Millisecond * time.Duration(GetConfig().StrandedWorkerTimeoutMs)) + rv = time.After(time.Millisecond * 100000) } buff := []byte{byte(param), byte((worker.rqId & 0xFF000000) >> 24), byte((worker.rqId & 0x00FF0000) >> 16), byte((worker.rqId & 0x0000FF00) >> 8), byte((worker.rqId & 0x000000FF))} From b184043b45f8cda46fd721c35fc8c0eeadda1c07 Mon Sep 17 00:00:00 2001 From: Rajesh Samala Date: Mon, 20 Jan 2025 22:07:19 +0530 Subject: [PATCH 5/8] removed unwanted changes related to induced delays during execution --- lib/workerbroker.go | 1 - lib/workerclient.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/workerbroker.go b/lib/workerbroker.go index b5201e57..05265754 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -326,7 +326,6 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") } workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long - time.Sleep(5 * time.Second) pool.RestartWorker(workerclient) } } else { diff --git a/lib/workerclient.go b/lib/workerclient.go index fff56c49..5edbb77f 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -591,7 +591,7 @@ func (worker *WorkerClient) initiateRecover(param int, p *WorkerPool, prior Hera param = common.StrandedSkipBreakHiLoad } } else { - rv = time.After(time.Millisecond * 100000) + rv = time.After(time.Millisecond * time.Duration(GetConfig().StrandedWorkerTimeoutMs)) } buff := []byte{byte(param), byte((worker.rqId & 0xFF000000) >> 24), byte((worker.rqId & 0x00FF0000) >> 16), byte((worker.rqId & 0x0000FF00) >> 8), byte((worker.rqId & 0x000000FF))} From 6d78baf58824044b0ff569c0a19b3eaf60b1acb1 Mon Sep 17 00:00:00 2001 From: Rajesh S <105205300+rasamala83@users.noreply.github.com> Date: Tue, 4 Feb 2025 12:03:50 +0530 Subject: [PATCH 6/8] Revert "first version of changes to avoid accidental worker state update during worker recovery" (#410) * Revert "fixin functional test comparision" This reverts commit 44bebff635ec1b1a0372bcf23d3ab1902730d680. * Revert "removed unwanted changes related to induced delays during execution" This reverts commit c00b147fda8a5a20c82806cebe46e54aa4a3973e. * Revert "fixing race condition related to worker recovery issue" This reverts commit 02c150059269fc4f72eef69570f8a71508f0ecfd. * Revert "move process reaping login go from C" This reverts commit 8db5c701b0b0dfae6773be3c371ced75adea41c1. * Revert "remove restart worker logic as it will taken care worker monitor" This reverts commit 957b0de56a460cd8a8beb91f3ab03f4389a8c81a. * Revert "added missing transtion from finish state to scheduled state" This reverts commit eeb29433ac062549fde267683495c0763c2835dd. * Revert "update test code based on state transition map" This reverts commit cd92e68dcc1101c1482564297529eea6d4b877bc. * Revert "instead of flow based control added valid transition map to control state updates" This reverts commit 24e20391fa3a1fb86d3361702a4a00e6caf6f237. * Revert "remove unwanted log message" This reverts commit 363564d2aaadd21965577bf9cebcda42d99464a5. * Revert "first version of changes to avoid accidental worker state update during worker recovery" This reverts commit 873992083dd32e76af3e44aeaeba8c91caab1bb0. --- lib/adaptivequemgr.go | 47 +++--- lib/util.go | 18 +-- lib/workerbroker.go | 116 +++++++-------- lib/workerclient.go | 85 +++-------- lib/workerpool.go | 1 + lib/workerpool_test.go | 7 - .../dmldisconnect/main_test.go | 97 ++++++------ tests/unittest/sqlEvict/main_test.go | 140 +----------------- 8 files changed, 159 insertions(+), 352 deletions(-) diff --git a/lib/adaptivequemgr.go b/lib/adaptivequemgr.go index 4fdd2860..8e49e2e0 100644 --- a/lib/adaptivequemgr.go +++ b/lib/adaptivequemgr.go @@ -21,8 +21,8 @@ import ( "errors" "fmt" "math/rand" - "strings" "sync/atomic" + "strings" "time" "github.com/paypal/hera/cal" @@ -118,7 +118,7 @@ type BindCount struct { Workers map[string]*WorkerClient // lookup by ticket } -func bindEvictNameOk(bindName string) bool { +func bindEvictNameOk(bindName string) (bool) { commaNames := GetConfig().BindEvictionNames if len(commaNames) == 0 { // for tests, allow all names to be subject to bind eviction @@ -126,7 +126,7 @@ func bindEvictNameOk(bindName string) bool { } commaNames = strings.ToLower(commaNames) bindName = strings.ToLower(bindName) - for _, okSubname := range strings.Split(commaNames, ",") { + for _, okSubname := range strings.Split(commaNames,",") { if strings.Contains(bindName, okSubname) { return true } @@ -134,15 +134,12 @@ func bindEvictNameOk(bindName string) bool { return false } -/* - A bad query with multiple binds will add independent bind throttles to all - -bind name and values -*/ -func (mgr *adaptiveQueueManager) doBindEviction() int { +/* A bad query with multiple binds will add independent bind throttles to all +bind name and values */ +func (mgr *adaptiveQueueManager) doBindEviction() (int) { throttleCount := 0 GetBindEvict().lock.Lock() - for _, keyValues := range GetBindEvict().BindThrottle { + for _,keyValues := range GetBindEvict().BindThrottle { throttleCount += len(keyValues) } GetBindEvict().lock.Unlock() @@ -175,14 +172,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() int { } continue } - contextBinds := parseBinds(request) - sqlsrcPrefix := worker.clientHostPrefix.Load().(string) - sqlsrcApp := worker.clientApp.Load().(string) + contextBinds := parseBinds(request) + sqlsrcPrefix := worker.clientHostPrefix.Load().(string) + sqlsrcApp := worker.clientApp.Load().(string) if sqlsrcPrefix != "" { contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp) if logger.GetLogger().V(logger.Debug) { - msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey]) - logger.GetLogger().Log(logger.Debug, msg) + msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey]) + logger.GetLogger().Log(logger.Debug, msg) } } for bindName0, bindValue := range contextBinds { @@ -203,8 +200,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() int { } concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue) if logger.GetLogger().V(logger.Debug) { - msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey) - logger.GetLogger().Log(logger.Debug, msg) + msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey) + logger.GetLogger().Log(logger.Debug, msg) } entry, ok := bindCounts[concatKey] if !ok { @@ -213,7 +210,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int { Name: bindName, Value: bindValue, Workers: make(map[string]*WorkerClient), - } + } bindCounts[concatKey] = entry } @@ -230,7 +227,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int { bindName := entry.Name bindValue := entry.Value - if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) { + if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) { continue } // evict sqlhash, bindvalue @@ -244,7 +241,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() int { if mgr.dispatchedWorkers[worker] != ticket || worker.Status == wsFnsh || - atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ { + worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ { continue } @@ -277,10 +274,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() int { throttle.incrAllowEveryX() } else { throttle := BindThrottle{ - Name: bindName, - Value: bindValue, - Sqlhash: sqlhash, - AllowEveryX: 3*len(entry.Workers) + 1, + Name: bindName, + Value: bindValue, + Sqlhash: sqlhash, + AllowEveryX: 3*len(entry.Workers) + 1, } now := time.Now() throttle.RecentAttempt.Store(&now) @@ -467,7 +464,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) { } } } else { - if worker != nil && worker.Status == wsFnsh { + if worker != nil && worker.Status == wsFnsh { if logger.GetLogger().V(logger.Warning) { logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid) } diff --git a/lib/util.go b/lib/util.go index 1ba6930a..1852810d 100644 --- a/lib/util.go +++ b/lib/util.go @@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) { } /* -1st return value: the number -2nd return value: the number of digits + 1st return value: the number + 2nd return value: the number of digits */ func atoi(bf []byte) (int, int) { sz := len(bf) @@ -96,8 +96,8 @@ func atoi(bf []byte) (int, int) { } /* -1st return value: the number -2nd return value: the number of digits + 1st return value: the number + 2nd return value: the number of digits */ func atoui(str string) (uint64, int) { sz := len(str) @@ -164,13 +164,3 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) { } return 0, false } - -// Contains This is utility method to check whether value present in list or not -func Contains[T comparable](slice []T, value T) bool { - for _, val := range slice { - if val == value { - return true - } - } - return false -} diff --git a/lib/workerbroker.go b/lib/workerbroker.go index 05265754..0f9df171 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -19,11 +19,13 @@ package lib import ( "errors" - "github.com/paypal/hera/utility/logger" "os" "os/signal" "sync" "syscall" + + "github.com/paypal/hera/utility" + "github.com/paypal/hera/utility/logger" ) // HeraWorkerType defines the possible worker type @@ -62,7 +64,7 @@ type WorkerBroker struct { // and restart the stopped workers. // pidworkermap map[int32]*WorkerClient - lock sync.Mutex + lock sync.Mutex // // loaded from cfg once and used later. @@ -202,9 +204,7 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor // GetWorkerPool get the worker pool object for the type and id // ids holds optional paramenters. -// -// ids[0] == instance id; ids[1] == shard id. -// +// ids[0] == instance id; ids[1] == shard id. // if a particular id is not set, it defaults to 0. // TODO: interchange sid <--> instId since instId is not yet used func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) { @@ -273,69 +273,59 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { // we can get all the pids in this call. double the size in case we // get none-hera defunct processes. +1 in case racing casue mapsize=0. // - defunctPids := make([]int32, 0) - for { - var status syscall.WaitStatus - - //Reap exited children in non-blocking mode - pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil) - if pid > 0 { - if logger.GetLogger().V(logger.Verbose) { - logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status) - } - defunctPids = append(defunctPids, int32(pid)) - } else if pid == 0 { - break - } else { - if errors.Is(err, syscall.ECHILD) { - break - } else { - logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err) - } - } + var arraySize = 2*len(broker.pidworkermap) + 1 + var defunctPids = make([]int32, arraySize) + if logger.GetLogger().V(logger.Verbose) { + logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap) } - - if len(defunctPids) > 0 { - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids) + if arraySize > 0 { + utility.ReapDefunctPids(defunctPids) + } + if logger.GetLogger().V(logger.Info) { + logger.GetLogger().Log(logger.Info, "exited worker", defunctPids) + } + broker.lock.Lock() + for i := 0; i < arraySize; i++ { + // + // last valid entry in stoppedpids is followed by one or more zeros. + // + if defunctPids[i] == 0 { + break } - broker.lock.Lock() - for _, pid := range defunctPids { - var workerclient = broker.pidworkermap[pid] - if workerclient != nil { - delete(broker.pidworkermap, pid) - pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) - if err != nil { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) - } - } else { - // - // a worker could be terminated while serving a request. - // in these cases, doRead() in workerclient will get an - // EOF and exit. doSession() in coordinator will get the - // worker outCh closed event and exit, at which point - // coordinator itself calls returnworker to set connstate - // from assign to idle. - // no need to publish the following event again. - // - //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { - // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) - //} - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") - } - workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long - pool.RestartWorker(workerclient) + var workerclient = broker.pidworkermap[defunctPids[i]] + if workerclient != nil { + delete(broker.pidworkermap, defunctPids[i]) + pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) + if err != nil { + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) } } else { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found") + // + // a worker could be terminated while serving a request. + // in these cases, doRead() in workerclient will get an + // EOF and exit. doSession() in coordinator will get the + // worker outCh closed event and exit, at which point + // coordinator itself calls returnworker to set connstate + // from assign to idle. + // no need to publish the following event again. + // + //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { + // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) + //} + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") } + workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long + pool.RestartWorker(workerclient) + } + } else { + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found") } } - broker.lock.Unlock() } + broker.lock.Unlock() case syscall.SIGTERM: if logger.GetLogger().V(logger.Debug) { logger.GetLogger().Log(logger.Debug, "Got SIGTERM") @@ -375,8 +365,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { } /* -resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of -the number of workers changed + resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of + the number of workers changed */ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) { broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers @@ -391,7 +381,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha } /* -changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools + changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools */ func (broker *WorkerBroker) changeMaxWorkers() { wW := GetNumWWorkers(0) diff --git a/lib/workerclient.go b/lib/workerclient.go index 5edbb77f..1a86d619 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -21,21 +21,20 @@ import ( "bytes" "errors" "fmt" - "github.com/paypal/hera/cal" - "github.com/paypal/hera/common" - "github.com/paypal/hera/utility/encoding/netstring" - "github.com/paypal/hera/utility/logger" "math/rand" "net" "os" "path/filepath" - "runtime" "strconv" "strings" - "sync" "sync/atomic" "syscall" "time" + + "github.com/paypal/hera/cal" + "github.com/paypal/hera/common" + "github.com/paypal/hera/utility/encoding/netstring" + "github.com/paypal/hera/utility/logger" ) // HeraWorkerStatus defines the posible states the worker can be in @@ -148,15 +147,12 @@ type WorkerClient struct { rqId uint32 // - // under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state + // under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state. // isUnderRecovery int32 // Throtle workers lifecycle thr Throttler - - //mutex lock to update state from single go-routine - stateLock sync.Mutex } type strandedCalInfo struct { @@ -204,7 +200,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam } // TODO worker.racID = -1 - atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0) + worker.isUnderRecovery = 0 if worker.ctrlCh != nil { close(worker.ctrlCh) } @@ -214,7 +210,6 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam // msg. if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker // worker.ctrlCh = make(chan *workerMsg, 5) - return worker } @@ -638,12 +633,15 @@ type WorkerClientRecoverParam struct { func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int) { if atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) { if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "begin recover worker Id: ", worker.ID, " process Id: ", worker.pid) + logger.GetLogger().Log(logger.Debug, "begin recover worker: ", worker.pid) } } else { if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid) + logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.pid) } + // + // defer will not be called. + // return } defer func() { @@ -667,9 +665,6 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor return } priorWorkerStatus := worker.Status - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, fmt.Sprintf("about to recover worker Id: %d, worker process Id: %d as part of reconvery process, setting worker state to Quece", worker.ID, worker.pid)) - } worker.setState(wsQuce) killparam := common.StrandedClientClose if len(param) > 0 { @@ -682,12 +677,8 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor case <-workerRecoverTimeout: worker.thr.CanRun() worker.setState(wsInit) // Set the worker state to INIT when we decide to Terminate the worker - GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: worker.Status}) worker.Terminate() worker.callogStranded("RECYCLED", info) - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d and process: %d recovered as part of workerRecoverTimeout set status to INIT", worker.ID, worker.pid)) - } return case msg, ok := <-worker.channel(): if !ok { @@ -723,10 +714,8 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid) } worker.callogStranded("RECOVERED", info) + worker.setState(wsFnsh) - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid)) - } p.ReturnWorker(worker, ticket) // // donot set state to ACPT since worker could already be picked up by another @@ -915,7 +904,7 @@ func (worker *WorkerClient) doRead() { worker.setState(wsWait) } if eor != common.EORMoreIncomingRequests { - worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)} + worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId} payload = nil } else { // buffer data to avoid race condition @@ -952,13 +941,8 @@ func (worker *WorkerClient) doRead() { // Write sends a message to the worker func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error { - if atomic.LoadInt32(&worker.isUnderRecovery) == 1 { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.") - } - return ErrWorkerFail - } worker.setState(wsBusy) + worker.rqId += uint32(nsCount) // @@ -982,19 +966,16 @@ func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error // setState updates the worker state func (worker *WorkerClient) setState(status HeraWorkerStatus) { - currentStatus := worker.Status - if currentStatus == status { + if worker.Status == status { return } - if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) { - logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status) - if logger.GetLogger().V(logger.Debug) { - worker.printCallStack() - } - return + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker pid=", worker.pid, " changing status from", worker.Status, "to", status) } - //This checks whether state transition is valid or not + + // TODO: sync atomic set worker.Status = status + GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status}) } @@ -1021,27 +1002,3 @@ func (worker *WorkerClient) isProcessRunning() bool { } return true } - -func (worker *WorkerClient) printCallStack() { - // Define a large enough buffer to capture the stack. - const depth = 64 - pcs := make([]uintptr, depth) - - // Collect the stack trace. - n := runtime.Callers(2, pcs) // Skip the first 2 callers (runtime and printCallStack itself). - frames := runtime.CallersFrames(pcs[:n]) - indent := 0 - // Iterate through the frames and print function names and line numbers. - var builder strings.Builder - builder.WriteString(fmt.Sprintf("worker Id= %d Process Id= %d Call Stack:", worker.ID, worker.pid)) - for { - frame, more := frames.Next() - builder.WriteString(fmt.Sprintf("%s - %s\n", strings.Repeat(" ", indent), frame.Function)) - builder.WriteString(fmt.Sprintf("%s at %s:%d\n", strings.Repeat(" ", indent), frame.File, frame.Line)) - indent++ - if !more { - break - } - } - logger.GetLogger().Log(logger.Debug, builder.String()) -} diff --git a/lib/workerpool.go b/lib/workerpool.go index 37d96855..50aab16f 100644 --- a/lib/workerpool.go +++ b/lib/workerpool.go @@ -193,6 +193,7 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) { } pool.activeQ.Remove(worker) pool.poolCond.L.Unlock() + go pool.spawnWorker(worker.ID) return nil } diff --git a/lib/workerpool_test.go b/lib/workerpool_test.go index 14b8cf03..4e3db96d 100644 --- a/lib/workerpool_test.go +++ b/lib/workerpool_test.go @@ -76,13 +76,6 @@ func TestPoolDempotency(t *testing.T) { wd := NewWorker(3, wtypeRW, 0, 0, "cloc", nil) we := NewWorker(4, wtypeRW, 0, 0, "cloc", nil) wf := NewWorker(5, wtypeRW, 0, 0, "cloc", nil) - wa.setState(wsInit) - wb.setState(wsInit) - wc.setState(wsInit) - wd.setState(wsInit) - we.setState(wsInit) - wf.setState(wsInit) - wa.setState(wsAcpt) wb.setState(wsAcpt) wc.setState(wsAcpt) diff --git a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go index f2430977..e261b35a 100644 --- a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go +++ b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go @@ -1,14 +1,13 @@ -package main - +package main import ( "context" "database/sql" "fmt" - "github.com/paypal/hera/tests/functionaltest/testutil" - "github.com/paypal/hera/utility/logger" "os" "testing" "time" + "github.com/paypal/hera/tests/functionaltest/testutil" + "github.com/paypal/hera/utility/logger" ) /* @@ -18,6 +17,7 @@ No setup needed */ + var mx testutil.Mux var tableName string @@ -29,8 +29,8 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["log_level"] = "5" appcfg["log_file"] = "hera.log" appcfg["rac_sql_interval"] = "0" - appcfg["opscfg.default.server.idle_timeout_ms"] = "3000" - appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000" + appcfg["opscfg.default.server.idle_timeout_ms"] = "3000" + appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000" appcfg["child.executable"] = "mysqlworker" appcfg["database_type"] = "mysql" @@ -43,16 +43,18 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { func setupDb() error { testutil.RunDML("DROP TABLE IF EXISTS test_simple_table_2") - return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))") + return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))") } + func TestMain(m *testing.M) { os.Exit(testutil.UtilMain(m, cfg, setupDb)) } + /* ########################################################################################## - # Perform an insert without commit - # While the query is in transaction, close connection + # Perform an insert without commit + # While the query is in transaction, close connection # Verify worker get stranded and recovered ########################################################################################## */ @@ -61,53 +63,54 @@ func TestDmlDisconnect(t *testing.T) { logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") hostname := testutil.GetHostname() - fmt.Println("Hostname: ", hostname) - db, err := sql.Open("hera", hostname+":31002") - if err != nil { - t.Fatal("Error starting Mux:", err) - return - } + fmt.Println ("Hostname: ", hostname); + db, err := sql.Open("hera", hostname + ":31002") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } db.SetMaxIdleConns(0) defer db.Close() - fmt.Println("Open new connection") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - conn, err := db.Conn(ctx) - if err != nil { - t.Fatalf("Error getting connection %s\n", err.Error()) - } + fmt.Println ("Open new connection"); + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } - fmt.Println("Perform an insert without commit") - stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')") + fmt.Println ("Perform an insert without commit"); + stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')") _, err = stmt.Exec() if err != nil { - t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) + t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) + } + stmt.Close() + cancel() + fmt.Println ("Close connection while insert query is in transaction"); + conn.Close() + + time.Sleep(1 * time.Second); + fmt.Println ("Verify worker get stranded and recovered"); + if ( testutil.RegexCount("begin recover worker:") < 1) { + t.Fatalf ("Error: should have worker recovered"); + } + + if ( testutil.RegexCount("stranded conn recovered") < 1) { + t.Fatalf ("Error: should have stranded conn recovered"); + } + + fmt.Println ("Verify worker recovery is seen in CALlog") + count := testutil.RegexCountFile ("RECOVER.*dedicated.*0", "cal.log") + if (count < 1) { + t.Fatalf ("Error: should see worker recovery event"); } - stmt.Close() - cancel() - fmt.Println("Close connection while insert query is in transaction") - conn.Close() - - time.Sleep(1 * time.Second) - fmt.Println("Verify worker get stranded and recovered") - if testutil.RegexCount("begin recover worker") < 1 { - t.Fatalf("Error: should have worker recovered") - } - - if testutil.RegexCount("stranded conn recovered") < 1 { - t.Fatalf("Error: should have stranded conn recovered") - } - - fmt.Println("Verify worker recovery is seen in CALlog") - count := testutil.RegexCountFile("RECOVER.*dedicated.*0", "cal.log") - if count < 1 { - t.Fatalf("Error: should see worker recovery event") - } - count = testutil.RegexCountFile("STRANDED.*RECOVERED.*0", "cal.log") - if count < 1 { - t.Fatalf("Error: should see worker recovery event") + count = testutil.RegexCountFile ("STRANDED.*RECOVERED.*0", "cal.log") + if (count < 1) { + t.Fatalf ("Error: should see worker recovery event"); } logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect done -------------------------------------------------------------") } + diff --git a/tests/unittest/sqlEvict/main_test.go b/tests/unittest/sqlEvict/main_test.go index 5a704796..5cf84296 100644 --- a/tests/unittest/sqlEvict/main_test.go +++ b/tests/unittest/sqlEvict/main_test.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "fmt" - "math/rand" "os" "testing" "time" @@ -34,11 +33,11 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["bind_eviction_threshold_pct"] = "50" appcfg["request_backlog_timeout"] = "1000" - appcfg["soft_eviction_probability"] = "10" + appcfg["soft_eviction_probability"] = "100" opscfg := make(map[string]string) - max_conn = 50 - opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", 10) + max_conn = 25 + opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", int(max_conn)) opscfg["opscfg.default.server.log_level"] = "5" opscfg["opscfg.default.server.saturation_recover_threshold"] = "10" @@ -100,56 +99,6 @@ func sleepyQ(conn *sql.Conn, delayRow int) error { return nil } -func sleepyDmlQ(conn *sql.Conn, delayRow int) error { - inserQuery := "insert into sleep_info (id,seconds) values (:id, sleep_option(:seconds))" - updateQuery := "update sleep_info set seconds = sleep_option(:seconds) where id=:id" - defer func(conn *sql.Conn) { - err := conn.Close() - if err != nil { - fmt.Printf("Error closing conn %s\n", err.Error()) - } - }(conn) - tx, _ := conn.BeginTx(context.Background(), nil) - inst1, err := conn.PrepareContext(context.Background(), inserQuery) - if err != nil { - fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error()) - return err - } - defer func(inst1 *sql.Stmt) { - err := inst1.Close() - if err != nil { - fmt.Printf("Error closing insert statement sleepyDmlQ %s\n", err.Error()) - } - }(inst1) - _, err = inst1.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow)) - if err != nil { - fmt.Printf("Error query sleepyDmlQ %s\n", err.Error()) - return err - } - updateStmt, err := conn.PrepareContext(context.Background(), updateQuery) - if err != nil { - fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error()) - return err - } - defer func(updateStmt *sql.Stmt) { - err := updateStmt.Close() - if err != nil { - fmt.Printf("Error closing update statement sleepyDmlQ %s\n", err.Error()) - } - }(updateStmt) - _, err = updateStmt.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow)) - if err != nil { - fmt.Printf("Error query sleepyDmlQ %s\n", err.Error()) - return err - } - err = tx.Commit() - if err != nil { - fmt.Printf("Error committing sleepyDmlQ %s\n", err.Error()) - return err - } - return nil -} - func simpleEvict() { db, err := sql.Open("hera", "127.0.0.1:31002") if err != nil { @@ -208,7 +157,10 @@ func TestSqlEvict(t *testing.T) { simpleEvict() if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 { t.Fatal("backlog timeout was not triggered") - } + } // */ + /* if (testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-102: backlog eviction", "hera.log") == 0) { + t.Fatal("backlog eviction was not triggered") + } // */ if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 { t.Fatal("soft eviction was not triggered") } @@ -216,81 +168,5 @@ func TestSqlEvict(t *testing.T) { t.Fatal("eviction was not triggered") } logger.GetLogger().Log(logger.Debug, "TestSqlEvict stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") - time.Sleep(10 * time.Second) + time.Sleep(2 * time.Second) } // */ - -func TestSqlEvictDML(t *testing.T) { - logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") - dmlEvict() - if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 { - t.Fatal("backlog timeout was not triggered") - } - if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 { - t.Fatal("soft eviction was not triggered") - } - if testutil.RegexCountFile("coordinator dispatchrequest: stranded conn HERA-101: saturation kill", "hera.log") == 0 { - t.Fatal("eviction was not triggered") - } - logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") - time.Sleep(10 * time.Second) -} - -func dmlEvict() { - db, err := sql.Open("hera", "127.0.0.1:31002") - if err != nil { - fmt.Printf("Error db %s\n", err.Error()) - return - } - db.SetConnMaxLifetime(2 * time.Second) - db.SetMaxIdleConns(0) - db.SetMaxOpenConns(22111) - defer func(db *sql.DB) { - err := db.Close() - if err != nil { - fmt.Printf("Error closing db %s\n", err.Error()) - } - }(db) - - conn, err := db.Conn(context.Background()) - if err != nil { - fmt.Printf("Error conn %s\n", err.Error()) - return - } - err = sleepyDmlQ(conn, 1600) - if err != nil { - fmt.Printf("Error Executing first sleepyDmlQ %s\n", err.Error()) - return - } - - for i := 0; i < int(max_conn)+1; i++ { - conn, err := db.Conn(context.Background()) - if err != nil { - fmt.Printf("Error #%d conn %s\n", i, err.Error()) - continue - } - time.Sleep(time.Millisecond * 100) - fmt.Printf("connection count %d\n", i) - go func(index int) { - err := sleepyDmlQ(conn, 1600) - if err != nil { - fmt.Printf("Long query Request Id: %d Error executing the sleepyDmlQ %s\n", index, err.Error()) - } - }(i) - } - - for i := 0; i < 50; i++ { - conn, err := db.Conn(context.Background()) - if err != nil { - fmt.Printf("Error #%d conn %s\n", i, err.Error()) - continue - } - time.Sleep(time.Millisecond * 100) - fmt.Printf("connection count %d\n", i) - go func(index int) { - err := sleepyDmlQ(conn, 1600) - if err != nil { - fmt.Printf("Request id: %d Error executing the sleepyDmlQ %s\n", index, err.Error()) - } - }(i) - } -} From 0e8b98c59b69f9f05d888ac4c851339649444956 Mon Sep 17 00:00:00 2001 From: Rajesh S <105205300+rasamala83@users.noreply.github.com> Date: Wed, 5 Feb 2025 11:44:25 +0530 Subject: [PATCH 7/8] Non graceful worker recovery issues and workflow issue (#411) * changes for adding atomic checks while sending and relading data during worker recovery * fixing the workflow issues --- lib/adaptivequemgr.go | 47 +++--- lib/util.go | 18 ++- lib/workerbroker.go | 116 ++++++++------- lib/workerclient.go | 85 ++++++++--- lib/workerpool.go | 1 - lib/workerpool_test.go | 7 + .../dmldisconnect/main_test.go | 97 ++++++------ tests/unittest/sqlEvict/main_test.go | 140 +++++++++++++++++- 8 files changed, 352 insertions(+), 159 deletions(-) diff --git a/lib/adaptivequemgr.go b/lib/adaptivequemgr.go index 8e49e2e0..4fdd2860 100644 --- a/lib/adaptivequemgr.go +++ b/lib/adaptivequemgr.go @@ -21,8 +21,8 @@ import ( "errors" "fmt" "math/rand" - "sync/atomic" "strings" + "sync/atomic" "time" "github.com/paypal/hera/cal" @@ -118,7 +118,7 @@ type BindCount struct { Workers map[string]*WorkerClient // lookup by ticket } -func bindEvictNameOk(bindName string) (bool) { +func bindEvictNameOk(bindName string) bool { commaNames := GetConfig().BindEvictionNames if len(commaNames) == 0 { // for tests, allow all names to be subject to bind eviction @@ -126,7 +126,7 @@ func bindEvictNameOk(bindName string) (bool) { } commaNames = strings.ToLower(commaNames) bindName = strings.ToLower(bindName) - for _, okSubname := range strings.Split(commaNames,",") { + for _, okSubname := range strings.Split(commaNames, ",") { if strings.Contains(bindName, okSubname) { return true } @@ -134,12 +134,15 @@ func bindEvictNameOk(bindName string) (bool) { return false } -/* A bad query with multiple binds will add independent bind throttles to all -bind name and values */ -func (mgr *adaptiveQueueManager) doBindEviction() (int) { +/* + A bad query with multiple binds will add independent bind throttles to all + +bind name and values +*/ +func (mgr *adaptiveQueueManager) doBindEviction() int { throttleCount := 0 GetBindEvict().lock.Lock() - for _,keyValues := range GetBindEvict().BindThrottle { + for _, keyValues := range GetBindEvict().BindThrottle { throttleCount += len(keyValues) } GetBindEvict().lock.Unlock() @@ -172,14 +175,14 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { } continue } - contextBinds := parseBinds(request) - sqlsrcPrefix := worker.clientHostPrefix.Load().(string) - sqlsrcApp := worker.clientApp.Load().(string) + contextBinds := parseBinds(request) + sqlsrcPrefix := worker.clientHostPrefix.Load().(string) + sqlsrcApp := worker.clientApp.Load().(string) if sqlsrcPrefix != "" { contextBinds[SrcPrefixAppKey] = fmt.Sprintf("%s%s", sqlsrcPrefix, sqlsrcApp) if logger.GetLogger().V(logger.Debug) { - msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey]) - logger.GetLogger().Log(logger.Debug, msg) + msg := fmt.Sprintf("Req info: Add AZ+App to contextBinds: %s", contextBinds[SrcPrefixAppKey]) + logger.GetLogger().Log(logger.Debug, msg) } } for bindName0, bindValue := range contextBinds { @@ -200,8 +203,8 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { } concatKey := fmt.Sprintf("%d|%s|%s", sqlhash, bindName, bindValue) if logger.GetLogger().V(logger.Debug) { - msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey) - logger.GetLogger().Log(logger.Debug, msg) + msg := fmt.Sprintf("Req info: lookup concatKey = %s in bindCounts", concatKey) + logger.GetLogger().Log(logger.Debug, msg) } entry, ok := bindCounts[concatKey] if !ok { @@ -210,7 +213,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { Name: bindName, Value: bindValue, Workers: make(map[string]*WorkerClient), - } + } bindCounts[concatKey] = entry } @@ -227,7 +230,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { bindName := entry.Name bindValue := entry.Value - if len(entry.Workers) < int( float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers) ) { + if len(entry.Workers) < int(float64(GetConfig().BindEvictionThresholdPct)/100.*float64(numDispatchedWorkers)) { continue } // evict sqlhash, bindvalue @@ -241,7 +244,7 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { if mgr.dispatchedWorkers[worker] != ticket || worker.Status == wsFnsh || - worker.isUnderRecovery == 1 /* Recover() uses compare & swap */ { + atomic.LoadInt32(&worker.isUnderRecovery) == 1 /* Recover() uses compare & swap */ { continue } @@ -274,10 +277,10 @@ func (mgr *adaptiveQueueManager) doBindEviction() (int) { throttle.incrAllowEveryX() } else { throttle := BindThrottle{ - Name: bindName, - Value: bindValue, - Sqlhash: sqlhash, - AllowEveryX: 3*len(entry.Workers) + 1, + Name: bindName, + Value: bindValue, + Sqlhash: sqlhash, + AllowEveryX: 3*len(entry.Workers) + 1, } now := time.Now() throttle.RecentAttempt.Store(&now) @@ -464,7 +467,7 @@ func (mgr *adaptiveQueueManager) getWorkerToRecover() (*WorkerClient, bool) { } } } else { - if worker != nil && worker.Status == wsFnsh { + if worker != nil && worker.Status == wsFnsh { if logger.GetLogger().V(logger.Warning) { logger.GetLogger().Log(logger.Warning, "worker.pid state is in FNSH, so skipping", worker.pid) } diff --git a/lib/util.go b/lib/util.go index 1852810d..1ba6930a 100644 --- a/lib/util.go +++ b/lib/util.go @@ -79,8 +79,8 @@ func IsPidRunning(pid int) (isRunning bool) { } /* - 1st return value: the number - 2nd return value: the number of digits +1st return value: the number +2nd return value: the number of digits */ func atoi(bf []byte) (int, int) { sz := len(bf) @@ -96,8 +96,8 @@ func atoi(bf []byte) (int, int) { } /* - 1st return value: the number - 2nd return value: the number of digits +1st return value: the number +2nd return value: the number of digits */ func atoui(str string) (uint64, int) { sz := len(str) @@ -164,3 +164,13 @@ func ExtractSQLHash(request *netstring.Netstring) (uint32, bool) { } return 0, false } + +// Contains This is utility method to check whether value present in list or not +func Contains[T comparable](slice []T, value T) bool { + for _, val := range slice { + if val == value { + return true + } + } + return false +} diff --git a/lib/workerbroker.go b/lib/workerbroker.go index 0f9df171..05265754 100644 --- a/lib/workerbroker.go +++ b/lib/workerbroker.go @@ -19,13 +19,11 @@ package lib import ( "errors" + "github.com/paypal/hera/utility/logger" "os" "os/signal" "sync" "syscall" - - "github.com/paypal/hera/utility" - "github.com/paypal/hera/utility/logger" ) // HeraWorkerType defines the possible worker type @@ -64,7 +62,7 @@ type WorkerBroker struct { // and restart the stopped workers. // pidworkermap map[int32]*WorkerClient - lock sync.Mutex + lock sync.Mutex // // loaded from cfg once and used later. @@ -204,7 +202,9 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor // GetWorkerPool get the worker pool object for the type and id // ids holds optional paramenters. -// ids[0] == instance id; ids[1] == shard id. +// +// ids[0] == instance id; ids[1] == shard id. +// // if a particular id is not set, it defaults to 0. // TODO: interchange sid <--> instId since instId is not yet used func (broker *WorkerBroker) GetWorkerPool(wType HeraWorkerType, ids ...int) (workerbroker *WorkerPool, err error) { @@ -273,59 +273,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { // we can get all the pids in this call. double the size in case we // get none-hera defunct processes. +1 in case racing casue mapsize=0. // - var arraySize = 2*len(broker.pidworkermap) + 1 - var defunctPids = make([]int32, arraySize) - if logger.GetLogger().V(logger.Verbose) { - logger.GetLogger().Log(logger.Verbose, "Wait SIGCHLD len=", arraySize-1, ", pwmap:", broker.pidworkermap) - } - if arraySize > 0 { - utility.ReapDefunctPids(defunctPids) - } - if logger.GetLogger().V(logger.Info) { - logger.GetLogger().Log(logger.Info, "exited worker", defunctPids) - } - broker.lock.Lock() - for i := 0; i < arraySize; i++ { - // - // last valid entry in stoppedpids is followed by one or more zeros. - // - if defunctPids[i] == 0 { + defunctPids := make([]int32, 0) + for { + var status syscall.WaitStatus + + //Reap exited children in non-blocking mode + pid, err := syscall.Wait4(-1, &status, syscall.WNOHANG, nil) + if pid > 0 { + if logger.GetLogger().V(logger.Verbose) { + logger.GetLogger().Log(logger.Verbose, "received worker exit signal for pid:", pid, " status: ", status) + } + defunctPids = append(defunctPids, int32(pid)) + } else if pid == 0 { break + } else { + if errors.Is(err, syscall.ECHILD) { + break + } else { + logger.GetLogger().Log(logger.Warning, "error in wait signal: ", err) + } } - var workerclient = broker.pidworkermap[defunctPids[i]] - if workerclient != nil { - delete(broker.pidworkermap, defunctPids[i]) - pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) - if err != nil { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) + } + + if len(defunctPids) > 0 { + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker exit signal received from pids :", defunctPids) + } + broker.lock.Lock() + for _, pid := range defunctPids { + var workerclient = broker.pidworkermap[pid] + if workerclient != nil { + delete(broker.pidworkermap, pid) + pool, err := GetWorkerBrokerInstance().GetWorkerPool(workerclient.Type, workerclient.instID, workerclient.shardID) + if err != nil { + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Can't get pool for", workerclient, ":", err) + } + } else { + // + // a worker could be terminated while serving a request. + // in these cases, doRead() in workerclient will get an + // EOF and exit. doSession() in coordinator will get the + // worker outCh closed event and exit, at which point + // coordinator itself calls returnworker to set connstate + // from assign to idle. + // no need to publish the following event again. + // + //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { + // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) + //} + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, "worker (id=", workerclient.ID, "pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") + } + workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long + pool.RestartWorker(workerclient) } } else { - // - // a worker could be terminated while serving a request. - // in these cases, doRead() in workerclient will get an - // EOF and exit. doSession() in coordinator will get the - // worker outCh closed event and exit, at which point - // coordinator itself calls returnworker to set connstate - // from assign to idle. - // no need to publish the following event again. - // - //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) { - // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle}) - //} - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker (pid=", workerclient.pid, ") received signal. transits from state ", workerclient.Status, " to terminated.") + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "Exited worker pid =", pid, " not found") } - workerclient.setState(wsUnset) // Set the state to UNSET to make sure worker does not stay in FNSH state so long - pool.RestartWorker(workerclient) - } - } else { - if logger.GetLogger().V(logger.Alert) { - logger.GetLogger().Log(logger.Alert, "Exited worker pid =", defunctPids[i], " not found") } } + broker.lock.Unlock() } - broker.lock.Unlock() case syscall.SIGTERM: if logger.GetLogger().V(logger.Debug) { logger.GetLogger().Log(logger.Debug, "Got SIGTERM") @@ -365,8 +375,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) { } /* - resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of - the number of workers changed +resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of +the number of workers changed */ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, shardID int) { broker.poolCfgs[0][wType].maxWorkerCnt = maxWorkers @@ -381,7 +391,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha } /* - changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools +changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools */ func (broker *WorkerBroker) changeMaxWorkers() { wW := GetNumWWorkers(0) diff --git a/lib/workerclient.go b/lib/workerclient.go index 1a86d619..5edbb77f 100644 --- a/lib/workerclient.go +++ b/lib/workerclient.go @@ -21,20 +21,21 @@ import ( "bytes" "errors" "fmt" + "github.com/paypal/hera/cal" + "github.com/paypal/hera/common" + "github.com/paypal/hera/utility/encoding/netstring" + "github.com/paypal/hera/utility/logger" "math/rand" "net" "os" "path/filepath" + "runtime" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" - - "github.com/paypal/hera/cal" - "github.com/paypal/hera/common" - "github.com/paypal/hera/utility/encoding/netstring" - "github.com/paypal/hera/utility/logger" ) // HeraWorkerStatus defines the posible states the worker can be in @@ -147,12 +148,15 @@ type WorkerClient struct { rqId uint32 // - // under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state. + // under recovery. 0: no; 1: yes. use atomic.CompareAndSwapInt32 to check state and use atomic.LoadInt32 to read state // isUnderRecovery int32 // Throtle workers lifecycle thr Throttler + + //mutex lock to update state from single go-routine + stateLock sync.Mutex } type strandedCalInfo struct { @@ -200,7 +204,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam } // TODO worker.racID = -1 - worker.isUnderRecovery = 0 + atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 1, 0) if worker.ctrlCh != nil { close(worker.ctrlCh) } @@ -210,6 +214,7 @@ func NewWorker(wid int, wType HeraWorkerType, instID int, shardID int, moduleNam // msg. if adaptiveqmgr blocks on a non-buffered channel, there is a deadlock when return worker // worker.ctrlCh = make(chan *workerMsg, 5) + return worker } @@ -633,15 +638,12 @@ type WorkerClientRecoverParam struct { func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam WorkerClientRecoverParam, info *strandedCalInfo, param ...int) { if atomic.CompareAndSwapInt32(&worker.isUnderRecovery, 0, 1) { if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "begin recover worker: ", worker.pid) + logger.GetLogger().Log(logger.Debug, "begin recover worker Id: ", worker.ID, " process Id: ", worker.pid) } } else { if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.pid) + logger.GetLogger().Log(logger.Debug, "worker already underrecovery: ", worker.ID, " process Id: ", worker.pid) } - // - // defer will not be called. - // return } defer func() { @@ -665,6 +667,9 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor return } priorWorkerStatus := worker.Status + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, fmt.Sprintf("about to recover worker Id: %d, worker process Id: %d as part of reconvery process, setting worker state to Quece", worker.ID, worker.pid)) + } worker.setState(wsQuce) killparam := common.StrandedClientClose if len(param) > 0 { @@ -677,8 +682,12 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor case <-workerRecoverTimeout: worker.thr.CanRun() worker.setState(wsInit) // Set the worker state to INIT when we decide to Terminate the worker + GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: worker.Status}) worker.Terminate() worker.callogStranded("RECYCLED", info) + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d and process: %d recovered as part of workerRecoverTimeout set status to INIT", worker.ID, worker.pid)) + } return case msg, ok := <-worker.channel(): if !ok { @@ -714,8 +723,10 @@ func (worker *WorkerClient) Recover(p *WorkerPool, ticket string, recovParam Wor logger.GetLogger().Log(logger.Info, "stranded conn recovered", worker.Type, worker.pid) } worker.callogStranded("RECOVERED", info) - worker.setState(wsFnsh) + if logger.GetLogger().V(logger.Debug) { + logger.GetLogger().Log(logger.Debug, fmt.Sprintf("worker Id: %d, worker process: %d recovered as part of message from channel set status to FINSH", worker.ID, worker.pid)) + } p.ReturnWorker(worker, ticket) // // donot set state to ACPT since worker could already be picked up by another @@ -904,7 +915,7 @@ func (worker *WorkerClient) doRead() { worker.setState(wsWait) } if eor != common.EORMoreIncomingRequests { - worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: rqId} + worker.outCh <- &workerMsg{data: payload, eor: true, free: (eor == common.EORFree), inTransaction: ((eor == common.EORInTransaction) || (eor == common.EORInCursorInTransaction)), rqId: uint32(rqId)} payload = nil } else { // buffer data to avoid race condition @@ -941,8 +952,13 @@ func (worker *WorkerClient) doRead() { // Write sends a message to the worker func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error { + if atomic.LoadInt32(&worker.isUnderRecovery) == 1 { + if logger.GetLogger().V(logger.Alert) { + logger.GetLogger().Log(logger.Alert, "workerclient write error: worker is under recovery.") + } + return ErrWorkerFail + } worker.setState(wsBusy) - worker.rqId += uint32(nsCount) // @@ -966,16 +982,19 @@ func (worker *WorkerClient) Write(ns *netstring.Netstring, nsCount uint16) error // setState updates the worker state func (worker *WorkerClient) setState(status HeraWorkerStatus) { - if worker.Status == status { + currentStatus := worker.Status + if currentStatus == status { return } - if logger.GetLogger().V(logger.Debug) { - logger.GetLogger().Log(logger.Debug, "worker pid=", worker.pid, " changing status from", worker.Status, "to", status) + if atomic.LoadInt32(&worker.isUnderRecovery) == 1 && (status == wsWait || status == wsBusy) { + logger.GetLogger().Log(logger.Warning, "worker : ", worker.ID, "processId: ", worker.pid, " seeing invalid state transition from ", currentStatus, " to ", status) + if logger.GetLogger().V(logger.Debug) { + worker.printCallStack() + } + return } - - // TODO: sync atomic set + //This checks whether state transition is valid or not worker.Status = status - GetStateLog().PublishStateEvent(StateEvent{eType: WorkerStateEvt, shardID: worker.shardID, wType: worker.Type, instID: worker.instID, workerID: worker.ID, newWState: status}) } @@ -1002,3 +1021,27 @@ func (worker *WorkerClient) isProcessRunning() bool { } return true } + +func (worker *WorkerClient) printCallStack() { + // Define a large enough buffer to capture the stack. + const depth = 64 + pcs := make([]uintptr, depth) + + // Collect the stack trace. + n := runtime.Callers(2, pcs) // Skip the first 2 callers (runtime and printCallStack itself). + frames := runtime.CallersFrames(pcs[:n]) + indent := 0 + // Iterate through the frames and print function names and line numbers. + var builder strings.Builder + builder.WriteString(fmt.Sprintf("worker Id= %d Process Id= %d Call Stack:", worker.ID, worker.pid)) + for { + frame, more := frames.Next() + builder.WriteString(fmt.Sprintf("%s - %s\n", strings.Repeat(" ", indent), frame.Function)) + builder.WriteString(fmt.Sprintf("%s at %s:%d\n", strings.Repeat(" ", indent), frame.File, frame.Line)) + indent++ + if !more { + break + } + } + logger.GetLogger().Log(logger.Debug, builder.String()) +} diff --git a/lib/workerpool.go b/lib/workerpool.go index 50aab16f..37d96855 100644 --- a/lib/workerpool.go +++ b/lib/workerpool.go @@ -193,7 +193,6 @@ func (pool *WorkerPool) RestartWorker(worker *WorkerClient) (err error) { } pool.activeQ.Remove(worker) pool.poolCond.L.Unlock() - go pool.spawnWorker(worker.ID) return nil } diff --git a/lib/workerpool_test.go b/lib/workerpool_test.go index 4e3db96d..14b8cf03 100644 --- a/lib/workerpool_test.go +++ b/lib/workerpool_test.go @@ -76,6 +76,13 @@ func TestPoolDempotency(t *testing.T) { wd := NewWorker(3, wtypeRW, 0, 0, "cloc", nil) we := NewWorker(4, wtypeRW, 0, 0, "cloc", nil) wf := NewWorker(5, wtypeRW, 0, 0, "cloc", nil) + wa.setState(wsInit) + wb.setState(wsInit) + wc.setState(wsInit) + wd.setState(wsInit) + we.setState(wsInit) + wf.setState(wsInit) + wa.setState(wsAcpt) wb.setState(wsAcpt) wc.setState(wsAcpt) diff --git a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go index e261b35a..f2430977 100644 --- a/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go +++ b/tests/functionaltest/strandedchild_tests/dmldisconnect/main_test.go @@ -1,13 +1,14 @@ -package main +package main + import ( "context" "database/sql" "fmt" + "github.com/paypal/hera/tests/functionaltest/testutil" + "github.com/paypal/hera/utility/logger" "os" "testing" "time" - "github.com/paypal/hera/tests/functionaltest/testutil" - "github.com/paypal/hera/utility/logger" ) /* @@ -17,7 +18,6 @@ No setup needed */ - var mx testutil.Mux var tableName string @@ -29,8 +29,8 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["log_level"] = "5" appcfg["log_file"] = "hera.log" appcfg["rac_sql_interval"] = "0" - appcfg["opscfg.default.server.idle_timeout_ms"] = "3000" - appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000" + appcfg["opscfg.default.server.idle_timeout_ms"] = "3000" + appcfg["opscfg.default.server.transaction_idle_timeout_ms"] = "5000" appcfg["child.executable"] = "mysqlworker" appcfg["database_type"] = "mysql" @@ -43,18 +43,16 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { func setupDb() error { testutil.RunDML("DROP TABLE IF EXISTS test_simple_table_2") - return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))") + return testutil.RunDML("CREATE TABLE test_simple_table_2 (accountID VARCHAR(64) PRIMARY KEY, NAME VARCHAR(64), STATUS VARCHAR(64), CONDN VARCHAR(64))") } - func TestMain(m *testing.M) { os.Exit(testutil.UtilMain(m, cfg, setupDb)) } - /* ########################################################################################## - # Perform an insert without commit - # While the query is in transaction, close connection + # Perform an insert without commit + # While the query is in transaction, close connection # Verify worker get stranded and recovered ########################################################################################## */ @@ -63,54 +61,53 @@ func TestDmlDisconnect(t *testing.T) { logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") hostname := testutil.GetHostname() - fmt.Println ("Hostname: ", hostname); - db, err := sql.Open("hera", hostname + ":31002") - if err != nil { - t.Fatal("Error starting Mux:", err) - return - } + fmt.Println("Hostname: ", hostname) + db, err := sql.Open("hera", hostname+":31002") + if err != nil { + t.Fatal("Error starting Mux:", err) + return + } db.SetMaxIdleConns(0) defer db.Close() - fmt.Println ("Open new connection"); - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - conn, err := db.Conn(ctx) - if err != nil { - t.Fatalf("Error getting connection %s\n", err.Error()) - } + fmt.Println("Open new connection") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + conn, err := db.Conn(ctx) + if err != nil { + t.Fatalf("Error getting connection %s\n", err.Error()) + } - fmt.Println ("Perform an insert without commit"); - stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')") + fmt.Println("Perform an insert without commit") + stmt, _ := conn.PrepareContext(ctx, "/*TestBasic*/ insert into test_simple_table_2 (accountID, Name, Status) VALUES (12345, 'Linda Smith' , '111')") _, err = stmt.Exec() if err != nil { - t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) - } - stmt.Close() - cancel() - fmt.Println ("Close connection while insert query is in transaction"); - conn.Close() - - time.Sleep(1 * time.Second); - fmt.Println ("Verify worker get stranded and recovered"); - if ( testutil.RegexCount("begin recover worker:") < 1) { - t.Fatalf ("Error: should have worker recovered"); - } - - if ( testutil.RegexCount("stranded conn recovered") < 1) { - t.Fatalf ("Error: should have stranded conn recovered"); - } - - fmt.Println ("Verify worker recovery is seen in CALlog") - count := testutil.RegexCountFile ("RECOVER.*dedicated.*0", "cal.log") - if (count < 1) { - t.Fatalf ("Error: should see worker recovery event"); + t.Fatalf("Error preparing test (create row in table) %s\n", err.Error()) } - count = testutil.RegexCountFile ("STRANDED.*RECOVERED.*0", "cal.log") - if (count < 1) { - t.Fatalf ("Error: should see worker recovery event"); + stmt.Close() + cancel() + fmt.Println("Close connection while insert query is in transaction") + conn.Close() + + time.Sleep(1 * time.Second) + fmt.Println("Verify worker get stranded and recovered") + if testutil.RegexCount("begin recover worker") < 1 { + t.Fatalf("Error: should have worker recovered") + } + + if testutil.RegexCount("stranded conn recovered") < 1 { + t.Fatalf("Error: should have stranded conn recovered") + } + + fmt.Println("Verify worker recovery is seen in CALlog") + count := testutil.RegexCountFile("RECOVER.*dedicated.*0", "cal.log") + if count < 1 { + t.Fatalf("Error: should see worker recovery event") + } + count = testutil.RegexCountFile("STRANDED.*RECOVERED.*0", "cal.log") + if count < 1 { + t.Fatalf("Error: should see worker recovery event") } logger.GetLogger().Log(logger.Debug, "TestDmlDisconnect done -------------------------------------------------------------") } - diff --git a/tests/unittest/sqlEvict/main_test.go b/tests/unittest/sqlEvict/main_test.go index 5cf84296..5a704796 100644 --- a/tests/unittest/sqlEvict/main_test.go +++ b/tests/unittest/sqlEvict/main_test.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "math/rand" "os" "testing" "time" @@ -33,11 +34,11 @@ func cfg() (map[string]string, map[string]string, testutil.WorkerType) { appcfg["bind_eviction_threshold_pct"] = "50" appcfg["request_backlog_timeout"] = "1000" - appcfg["soft_eviction_probability"] = "100" + appcfg["soft_eviction_probability"] = "10" opscfg := make(map[string]string) - max_conn = 25 - opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", int(max_conn)) + max_conn = 50 + opscfg["opscfg.default.server.max_connections"] = fmt.Sprintf("%d", 10) opscfg["opscfg.default.server.log_level"] = "5" opscfg["opscfg.default.server.saturation_recover_threshold"] = "10" @@ -99,6 +100,56 @@ func sleepyQ(conn *sql.Conn, delayRow int) error { return nil } +func sleepyDmlQ(conn *sql.Conn, delayRow int) error { + inserQuery := "insert into sleep_info (id,seconds) values (:id, sleep_option(:seconds))" + updateQuery := "update sleep_info set seconds = sleep_option(:seconds) where id=:id" + defer func(conn *sql.Conn) { + err := conn.Close() + if err != nil { + fmt.Printf("Error closing conn %s\n", err.Error()) + } + }(conn) + tx, _ := conn.BeginTx(context.Background(), nil) + inst1, err := conn.PrepareContext(context.Background(), inserQuery) + if err != nil { + fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error()) + return err + } + defer func(inst1 *sql.Stmt) { + err := inst1.Close() + if err != nil { + fmt.Printf("Error closing insert statement sleepyDmlQ %s\n", err.Error()) + } + }(inst1) + _, err = inst1.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow)) + if err != nil { + fmt.Printf("Error query sleepyDmlQ %s\n", err.Error()) + return err + } + updateStmt, err := conn.PrepareContext(context.Background(), updateQuery) + if err != nil { + fmt.Printf("Error preparing sleepyDmlQ %s\n", err.Error()) + return err + } + defer func(updateStmt *sql.Stmt) { + err := updateStmt.Close() + if err != nil { + fmt.Printf("Error closing update statement sleepyDmlQ %s\n", err.Error()) + } + }(updateStmt) + _, err = updateStmt.ExecContext(context.Background(), sql.Named("id", rand.Int()), sql.Named("seconds", delayRow)) + if err != nil { + fmt.Printf("Error query sleepyDmlQ %s\n", err.Error()) + return err + } + err = tx.Commit() + if err != nil { + fmt.Printf("Error committing sleepyDmlQ %s\n", err.Error()) + return err + } + return nil +} + func simpleEvict() { db, err := sql.Open("hera", "127.0.0.1:31002") if err != nil { @@ -157,10 +208,7 @@ func TestSqlEvict(t *testing.T) { simpleEvict() if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 { t.Fatal("backlog timeout was not triggered") - } // */ - /* if (testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-102: backlog eviction", "hera.log") == 0) { - t.Fatal("backlog eviction was not triggered") - } // */ + } if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 { t.Fatal("soft eviction was not triggered") } @@ -168,5 +216,81 @@ func TestSqlEvict(t *testing.T) { t.Fatal("eviction was not triggered") } logger.GetLogger().Log(logger.Debug, "TestSqlEvict stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") - time.Sleep(2 * time.Second) + time.Sleep(10 * time.Second) } // */ + +func TestSqlEvictDML(t *testing.T) { + logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML begin +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + dmlEvict() + if testutil.RegexCountFile("HERA-100: backlog timeout", "hera.log") == 0 { + t.Fatal("backlog timeout was not triggered") + } + if testutil.RegexCountFile("coordinator dispatchrequest: no worker HERA-104: saturation soft sql eviction", "hera.log") == 0 { + t.Fatal("soft eviction was not triggered") + } + if testutil.RegexCountFile("coordinator dispatchrequest: stranded conn HERA-101: saturation kill", "hera.log") == 0 { + t.Fatal("eviction was not triggered") + } + logger.GetLogger().Log(logger.Debug, "TestSqlEvictDML stop +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n") + time.Sleep(10 * time.Second) +} + +func dmlEvict() { + db, err := sql.Open("hera", "127.0.0.1:31002") + if err != nil { + fmt.Printf("Error db %s\n", err.Error()) + return + } + db.SetConnMaxLifetime(2 * time.Second) + db.SetMaxIdleConns(0) + db.SetMaxOpenConns(22111) + defer func(db *sql.DB) { + err := db.Close() + if err != nil { + fmt.Printf("Error closing db %s\n", err.Error()) + } + }(db) + + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error conn %s\n", err.Error()) + return + } + err = sleepyDmlQ(conn, 1600) + if err != nil { + fmt.Printf("Error Executing first sleepyDmlQ %s\n", err.Error()) + return + } + + for i := 0; i < int(max_conn)+1; i++ { + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error #%d conn %s\n", i, err.Error()) + continue + } + time.Sleep(time.Millisecond * 100) + fmt.Printf("connection count %d\n", i) + go func(index int) { + err := sleepyDmlQ(conn, 1600) + if err != nil { + fmt.Printf("Long query Request Id: %d Error executing the sleepyDmlQ %s\n", index, err.Error()) + } + }(i) + } + + for i := 0; i < 50; i++ { + conn, err := db.Conn(context.Background()) + if err != nil { + fmt.Printf("Error #%d conn %s\n", i, err.Error()) + continue + } + time.Sleep(time.Millisecond * 100) + fmt.Printf("connection count %d\n", i) + go func(index int) { + err := sleepyDmlQ(conn, 1600) + if err != nil { + fmt.Printf("Request id: %d Error executing the sleepyDmlQ %s\n", index, err.Error()) + } + }(i) + } +} From ca0ae6056eeddc9d855849026edb116c354ed7ef Mon Sep 17 00:00:00 2001 From: vineeth Date: Thu, 27 Feb 2025 16:42:24 +0530 Subject: [PATCH 8/8] bind_hash_mapping sampling logging changes w.r.t. longer corrid --- worker/cppworker/worker/OCCChild.cpp | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/worker/cppworker/worker/OCCChild.cpp b/worker/cppworker/worker/OCCChild.cpp index 91da8a18..79d9f46d 100644 --- a/worker/cppworker/worker/OCCChild.cpp +++ b/worker/cppworker/worker/OCCChild.cpp @@ -1056,11 +1056,27 @@ int OCCChild::handle_command(const int _cmd, std::string &_line) { if (config->is_switch_enabled("enable_bind_hash_logging", false)) { bool skip = false; - if (m_corr_id.length() > 16) { // Max hex value that can be stored in ULLONG_MAX (FFFFFFFFFFFFFFFF) + if (m_corr_id.length() > 16 && m_corr_id.length() != 32) { // Max hex value that can be stored in ULLONG_MAX (FFFFFFFFFFFFFFFF); corrid.length() != 32 checked to handle new corrid with length 32 skip = true; // Reduce the logging noise. } if (!skip) { - unsigned long long int corrid = strtoull(m_corr_id.c_str(), NULL , 16); + unsigned long long int corrid; + unsigned long long int corrIdMsbVal; + unsigned long long int corrIdLsbVal; + bool longCorrId = false; + std::string corrIdMsb; + std::string corrIdLsb; + + if (m_corr_id.length() == 32) { + longCorrId = true; + corrIdMsb = m_corr_id.substr(0, 16); // First 16 digits (Most Significant) + corrIdLsb = m_corr_id.substr(16, 16); // Last 16 digits (Least Significant) + corrIdMsbVal = strtoull(corrIdMsb.c_str(), NULL, 16); + corrIdLsbVal = strtoull(corrIdLsb.c_str(), NULL, 16); + } else { + corrid = strtoull(m_corr_id.c_str(), NULL , 16); + } + if (errno == ERANGE) { std::ostringstream msg; msg << "m_err=error on strtoull(), errno=" << errno; @@ -1072,7 +1088,7 @@ int OCCChild::handle_command(const int _cmd, std::string &_line) e_name.AddData("corr_id_", m_corr_id); e_name.Completed(); } - if (!skip && (bit_mask == (corrid & bit_mask))) { // Allow + if (!skip && ((longCorrId && (bit_mask == (corrIdMsbVal & bit_mask)) && (bit_mask == (corrIdLsbVal & bit_mask))) || (!longCorrId && bit_mask == (corrid & bit_mask)))) { // Allow if (bind_array->size() > 0) { // Skip SQL with large number of binds if (bind_array->size() > 5 || bind_array->at(0).get()->array_row_num > 1) {