Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ jobs:

tests:
runs-on: parity-ubuntu
env:
DOCKER_API_VERSION: '1.44'
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down Expand Up @@ -78,7 +80,7 @@ jobs:
environment: ${{ github.event_name == 'workflow_dispatch' && 'main' || null }}
env:
REGISTRY_PATH: docker.io/${{ github.event_name == 'workflow_dispatch' && 'paritytech' || 'paritypr' }}/cattery
VERSION: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.tag_name || github.sha }}
VERSION: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.tag_name || github.event.pull_request.head.sha }}
EXTRA_TAG: ${{ github.event_name == 'workflow_dispatch' && 'latest' || 'main' }}
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.24-alpine AS builder
FROM golang:1.25.5-alpine AS builder

ARG CATTERY_VERSION="0.0.0"

Expand Down
66 changes: 66 additions & 0 deletions src/agent/Watchers/fileWatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package Watchers

import (
"cattery/agent/shutdownEvents"
"cattery/lib/messages"
"context"
"os"

"github.com/fsnotify/fsnotify"
log "github.com/sirupsen/logrus"
)

var filename = "./shutdown_file"

func WatchFile(ctx context.Context) {
go func() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()

createFile(filename)

err = watcher.Add(filename)
if err != nil {
log.Fatal(err)
}

var message string

if ctx == nil {
ctx = context.Background()
}

select {
case <-ctx.Done():
return
case event := <-watcher.Events:
if event.Op.Has(fsnotify.Write) {
message = "Modified file: " + event.Name
}
if event.Op.Has(fsnotify.Remove) {
message = "Removed file: " + event.Name
}
if event.Op.Has(fsnotify.Rename) {
message = "Renamed file: " + event.Name
}
case err := <-watcher.Errors:
message = "File error: " + err.Error()
log.Error(message)
}

log.Info(message)

shutdownEvents.Emit(messages.UnregisterReasonPreempted, message)
}()
}

func createFile(filename string) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Fatal(err)
}
f.Close()
}
48 changes: 48 additions & 0 deletions src/agent/Watchers/pingWatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package Watchers

import (
"cattery/agent/catteryClient"
"cattery/agent/shutdownEvents"
"cattery/lib/messages"
"context"
"time"

log "github.com/sirupsen/logrus"
)

func WatchPing(ctx context.Context, client *catteryClient.CatteryClient) {
go func() {
var msg string
var finished = false

if ctx == nil {
ctx = context.Background()
}

for !finished {
select {
case <-ctx.Done(): // selected when context is canceled or times out
msg = "cattery client shutdown"
finished = true
break
default:
pingResponse, err := client.Ping()
if err != nil {
msg = "error pinging controller: " + err.Error()
log.Error(msg)
continue
}

if pingResponse.Terminate {
msg = "controller ping receive 'terminate': " + pingResponse.Message
finished = true
break
}

time.Sleep(60 * time.Second)
}
}

shutdownEvents.Emit(messages.UnregisterReasonControllerKill, msg)
}()
}
25 changes: 25 additions & 0 deletions src/agent/Watchers/signalWather.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package Watchers

import (
"cattery/agent/shutdownEvents"
"cattery/lib/messages"
"os"
"os/signal"
"syscall"

log "github.com/sirupsen/logrus"
)

func WatchSignal() {
go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT)
signal.Notify(sigs, syscall.SIGTERM)
signal.Notify(sigs, syscall.SIGKILL)

sig := <-sigs
log.Info("Got signal ", sig)

shutdownEvents.Emit(messages.UnregisterReasonSigTerm, "Got signal "+sig.String())
}()
}
98 changes: 25 additions & 73 deletions src/agent/agent.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package agent

import (
"cattery/agent/Watchers"
"cattery/agent/catteryClient"
"cattery/agent/githubListener"
"cattery/agent/shutdownEvents"
"cattery/agent/tools"
"cattery/lib/agents"
"cattery/lib/messages"
"os"
"os/exec"
"os/signal"
"context"
"path"
"sync"
"syscall"

"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)

var RunnerFolder string
Expand All @@ -26,8 +26,8 @@ func Start() {

type CatteryAgent struct {
mutex sync.Mutex
logger *logrus.Entry
catteryClient *CatteryClient
logger *log.Entry
catteryClient *catteryClient.CatteryClient
agent *agents.Agent
agentId string

Expand All @@ -38,8 +38,8 @@ type CatteryAgent struct {
func NewCatteryAgent(runnerFolder string, catteryServerUrl string, agentId string) *CatteryAgent {
return &CatteryAgent{
mutex: sync.Mutex{},
logger: logrus.WithFields(logrus.Fields{"name": "agent", "agentId": agentId}),
catteryClient: createClient(catteryServerUrl),
logger: log.WithFields(log.Fields{"name": "agent", "agentId": agentId}),
catteryClient: createClient(catteryServerUrl, agentId),
listenerExecPath: path.Join(runnerFolder, "bin", "Runner.Listener"),
agentId: agentId,
interrupted: false,
Expand All @@ -60,76 +60,28 @@ func (a *CatteryAgent) Start() {

a.logger.Info("Agent registered, starting Listener")

var commandRun = exec.Command(a.listenerExecPath, "run", "--jitconfig", *jitConfig)
commandRun.Stdout = os.Stdout
commandRun.Stderr = os.Stderr
Watchers.WatchSignal()
Watchers.WatchFile(context.Background())
Watchers.WatchPing(context.Background(), a.catteryClient)

go func() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT)
signal.Notify(sigs, syscall.SIGTERM)
signal.Notify(sigs, syscall.SIGKILL)
var ghListener = githubListener.NewGithubListener(a.listenerExecPath)
ghListener.Start(jitConfig)

sig := <-sigs
a.logger.Info("Got signal ", sig)
// blocking call
var event = shutdownEvents.WaitEvent()

a.interrupt(commandRun.Process)
}()
a.logger.Infof("Received shutdown event: %s, reason: %d", event.Message, event.Reason)

err = commandRun.Run()
if err != nil {
var errMsg = "Runner failed: " + err.Error()
a.logger.Error(errMsg)
}

a.stop()
}

func (a *CatteryAgent) interrupt(runnerProcess *os.Process) {
a.mutex.Lock()
defer a.mutex.Unlock()

if a.interrupted {
return
}

a.logger.Info("Interrupting runner")
err := a.catteryClient.InterruptAgent(a.agent)
if err != nil {
var errMsg = "Failed to interrupt agent: " + err.Error()
a.logger.Error(errMsg)
}
// SIGINT from go kills listener immediately, so we use pkill
var commandInterruptRun = exec.Command("pkill", "--signal", "SIGINT", "Runner.Listener")
err = commandInterruptRun.Run()
if err != nil {
var errMsg = "Failed to interrupt runner: " + err.Error()
a.logger.Error(errMsg)
}
// TODO: debug why SIGINT does not work correctly
// err := runnerProcess.Signal(syscall.SIGINT)
// if err != nil {
// var errMsg = "Failed to stop runner: " + err.Error()
// a.logger.Error(errMsg)
// }

a.interrupted = true
ghListener.Stop()
a.stop(event)
}

// stop stops the runner process
func (a *CatteryAgent) stop() {
func (a *CatteryAgent) stop(event shutdownEvents.ShutdownEvent) {

var reason messages.UnregisterReason

if a.interrupted {
reason = messages.UnregisterReasonPreempted
a.logger.Infof("Runner has been interrupted")
} else {
reason = messages.UnregisterReasonDone
a.logger.Infof("Runner has finished its job")
}
log.Infof("Stopping Cattery Agent with reason: %d, message: `%s`", event.Reason, event.Message)

err := a.catteryClient.UnregisterAgent(a.agent, reason)
err := a.catteryClient.UnregisterAgent(a.agent, event.Reason, event.Message)
if err != nil {
var errMsg = "Failed to unregister agent: " + err.Error()
a.logger.Error(errMsg)
Expand All @@ -142,6 +94,6 @@ func (a *CatteryAgent) stop() {
}

// createClient creates a new http client
func createClient(baseUrl string) *CatteryClient {
return NewCatteryClient(baseUrl)
func createClient(baseUrl string, agentId string) *catteryClient.CatteryClient {
return catteryClient.NewCatteryClient(baseUrl, agentId)
}
Loading