Skip to content

Commit 3efba20

Browse files
committed
implemented queueing
1 parent 156e580 commit 3efba20

3 files changed

Lines changed: 307 additions & 16 deletions

File tree

.github/workflows/queue-tests.yml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
name: Queue Integration Tests
2+
3+
on:
4+
push:
5+
branches:
6+
- "**"
7+
pull_request:
8+
9+
jobs:
10+
test-dispatch-queue:
11+
name: "Dispatcher Queue: ENABLE_QUEUE=true"
12+
runs-on: ubuntu-latest
13+
14+
steps:
15+
- name: Checkout
16+
uses: actions/checkout@v4
17+
18+
- name: Setup Go
19+
uses: actions/setup-go@v5
20+
with:
21+
go-version-file: go.mod
22+
23+
- name: Run dispatcher queue test
24+
env:
25+
ENABLE_QUEUE: "true"
26+
run: go test -v -run '^TestDispatchQueueingFIFO$' ./dispatcher

dispatcher/main.go

Lines changed: 116 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,31 @@ import (
66
"CodeSandboxAPI/models"
77
"CodeSandboxAPI/resourcemanager"
88
"fmt"
9+
"sync"
910
)
1011

11-
func Dispatch(req models.Request) (models.Response, error) {
12+
type dispatchResult struct {
13+
response models.Response
14+
err error
15+
}
16+
17+
type queuedRequest struct {
18+
req models.Request
19+
result chan dispatchResult
20+
}
21+
22+
var (
23+
reserveRAM = resourcemanager.ReserveRAM
24+
releaseRAM = resourcemanager.ReleaseRAM
25+
execute = executor.Execute
1226

27+
queueLock sync.Mutex
28+
queueCond = sync.NewCond(&queueLock)
29+
requestQueue []queuedRequest
30+
workerActive bool
31+
)
32+
33+
func normalizeRequest(req models.Request) models.Request {
1334
if req.Timeout == 0 {
1435
req.Timeout = config.Config.Limits.DefaultTimeout
1536
} else if req.Timeout > config.Config.Limits.MaxTimeout {
@@ -21,22 +42,101 @@ func Dispatch(req models.Request) (models.Response, error) {
2142
} else if req.MemoryLimit > config.Config.Limits.MaxMemoryLimit {
2243
req.MemoryLimit = config.Config.Limits.MaxMemoryLimit
2344
}
45+
46+
return req
47+
}
48+
49+
func executeWithReservedRAM(req models.Request) (models.Response, error) {
50+
defer func() {
51+
releaseRAM(req.MemoryLimit)
52+
notifyQueueWaiters()
53+
}()
54+
return execute(req)
55+
}
56+
57+
func notifyQueueWaiters() {
58+
queueLock.Lock()
59+
queueCond.Broadcast()
60+
queueLock.Unlock()
61+
}
62+
63+
func startQueueWorkerLocked() {
64+
if workerActive {
65+
return
66+
}
67+
workerActive = true
68+
go processQueue()
69+
}
70+
71+
func processQueue() {
72+
for {
73+
queueLock.Lock()
74+
for len(requestQueue) == 0 {
75+
queueCond.Wait()
76+
}
77+
78+
head := requestQueue[0]
79+
for !reserveRAM(head.req.MemoryLimit) {
80+
queueCond.Wait()
81+
head = requestQueue[0]
82+
}
83+
84+
requestQueue = requestQueue[1:]
85+
queueLock.Unlock()
86+
87+
resp, err := executeWithReservedRAM(head.req)
88+
head.result <- dispatchResult{response: resp, err: err}
89+
}
90+
}
91+
92+
func queueOrExecute(req models.Request) (models.Response, error) {
93+
queueLock.Lock()
94+
if len(requestQueue) == 0 && reserveRAM(req.MemoryLimit) {
95+
queueLock.Unlock()
96+
return executeWithReservedRAM(req)
97+
}
98+
99+
entry := queuedRequest{
100+
req: req,
101+
result: make(chan dispatchResult, 1),
102+
}
103+
requestQueue = append(requestQueue, entry)
104+
startQueueWorkerLocked()
105+
queueCond.Broadcast()
106+
queueLock.Unlock()
107+
108+
result := <-entry.result
109+
return result.response, result.err
110+
}
111+
112+
func cannotReserveResponse() (models.Response, error) {
113+
return models.Response{
114+
Stdout: "",
115+
Stderr: "Resource limit reached, please try again later",
116+
ExecutionTime: 0,
117+
}, fmt.Errorf("failed to reserve RAM")
118+
}
119+
120+
func Dispatch(req models.Request) (models.Response, error) {
121+
req = normalizeRequest(req)
122+
123+
if req.MemoryLimit > config.Config.Globals.RAM_LIMIT {
124+
return cannotReserveResponse()
125+
}
126+
24127
if !config.Config.Globals.ENABLE_QUEUE {
25-
if !resourcemanager.ReserveRAM(req.MemoryLimit) { // TODO: handle condition for queueing requests when RAM is not available
26-
return models.Response{
27-
Stdout: "",
28-
Stderr: "Resource limit reached, please try again later",
29-
ExecutionTime: 0,
30-
}, fmt.Errorf("Failed to reserve RAM")
128+
if !reserveRAM(req.MemoryLimit) {
129+
return cannotReserveResponse()
31130
}
32-
defer resourcemanager.ReleaseRAM(req.MemoryLimit)
33-
return executor.Execute(req)
34-
} else {
35-
// TODO: Implement queueing logic
36-
return models.Response{
37-
Stdout: "",
38-
Stderr: "Queueing is not implemented yet",
39-
ExecutionTime: 0,
40-
}, nil
131+
return executeWithReservedRAM(req)
41132
}
133+
134+
return queueOrExecute(req)
135+
}
136+
137+
func resetQueueStateForTests() {
138+
queueLock.Lock()
139+
requestQueue = nil
140+
queueCond.Broadcast()
141+
queueLock.Unlock()
42142
}

dispatcher/queue_test.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package dispatcher
2+
3+
import (
4+
"CodeSandboxAPI/config"
5+
"CodeSandboxAPI/models"
6+
"sync"
7+
"testing"
8+
"time"
9+
)
10+
11+
func TestDispatchQueueingFIFO(t *testing.T) {
12+
originalQueue := config.Config.Globals.ENABLE_QUEUE
13+
originalGlobalRAM := config.Config.Globals.RAM_LIMIT
14+
originalDefaultMem := config.Config.Limits.DefaultMemoryLimit
15+
originalMaxMem := config.Config.Limits.MaxMemoryLimit
16+
originalReserve := reserveRAM
17+
originalRelease := releaseRAM
18+
originalExecute := execute
19+
t.Cleanup(func() {
20+
config.Config.Globals.ENABLE_QUEUE = originalQueue
21+
config.Config.Globals.RAM_LIMIT = originalGlobalRAM
22+
config.Config.Limits.DefaultMemoryLimit = originalDefaultMem
23+
config.Config.Limits.MaxMemoryLimit = originalMaxMem
24+
reserveRAM = originalReserve
25+
releaseRAM = originalRelease
26+
execute = originalExecute
27+
})
28+
29+
config.Config.Globals.ENABLE_QUEUE = true
30+
config.Config.Globals.RAM_LIMIT = 1
31+
config.Config.Limits.DefaultMemoryLimit = 1
32+
config.Config.Limits.MaxMemoryLimit = 1
33+
resetQueueStateForTests()
34+
35+
var memLock sync.Mutex
36+
available := uint(1)
37+
reserveRAM = func(amount uint) bool {
38+
memLock.Lock()
39+
defer memLock.Unlock()
40+
if amount > available {
41+
return false
42+
}
43+
available -= amount
44+
return true
45+
}
46+
releaseRAM = func(amount uint) {
47+
memLock.Lock()
48+
available += amount
49+
memLock.Unlock()
50+
}
51+
52+
var orderLock sync.Mutex
53+
startOrder := make([]string, 0, 3)
54+
allowFirstToFinish := make(chan struct{})
55+
execute = func(req models.Request) (models.Response, error) {
56+
orderLock.Lock()
57+
startOrder = append(startOrder, req.Code)
58+
orderLock.Unlock()
59+
60+
if req.Code == "A" {
61+
<-allowFirstToFinish
62+
}
63+
64+
return models.Response{Stdout: req.Code}, nil
65+
}
66+
67+
resultA := make(chan models.Response, 1)
68+
errA := make(chan error, 1)
69+
go func() {
70+
resp, err := Dispatch(models.Request{Code: "A", MemoryLimit: 1})
71+
resultA <- resp
72+
errA <- err
73+
}()
74+
75+
// Wait until the first request starts and holds RAM.
76+
deadline := time.Now().Add(500 * time.Millisecond)
77+
for {
78+
orderLock.Lock()
79+
started := len(startOrder) == 1 && startOrder[0] == "A"
80+
orderLock.Unlock()
81+
if started {
82+
break
83+
}
84+
if time.Now().After(deadline) {
85+
t.Fatal("first request did not start in time")
86+
}
87+
time.Sleep(5 * time.Millisecond)
88+
}
89+
90+
resultB := make(chan models.Response, 1)
91+
errB := make(chan error, 1)
92+
go func() {
93+
resp, err := Dispatch(models.Request{Code: "B", MemoryLimit: 1})
94+
resultB <- resp
95+
errB <- err
96+
}()
97+
98+
// Ensure B is at the front of the queue before C arrives.
99+
queueDeadline := time.Now().Add(500 * time.Millisecond)
100+
for {
101+
queueLock.Lock()
102+
ready := len(requestQueue) == 1 && requestQueue[0].req.Code == "B"
103+
queueLock.Unlock()
104+
if ready {
105+
break
106+
}
107+
if time.Now().After(queueDeadline) {
108+
t.Fatal("B was not queued in time")
109+
}
110+
time.Sleep(5 * time.Millisecond)
111+
}
112+
113+
resultC := make(chan models.Response, 1)
114+
errC := make(chan error, 1)
115+
go func() {
116+
resp, err := Dispatch(models.Request{Code: "C", MemoryLimit: 1})
117+
resultC <- resp
118+
errC <- err
119+
}()
120+
121+
// B and C should be queued while A is still running.
122+
select {
123+
case <-resultB:
124+
t.Fatal("B should have stayed queued while A holds RAM")
125+
case <-time.After(40 * time.Millisecond):
126+
}
127+
128+
select {
129+
case <-resultC:
130+
t.Fatal("C should have stayed queued while A holds RAM")
131+
case <-time.After(40 * time.Millisecond):
132+
}
133+
134+
close(allowFirstToFinish)
135+
136+
if err := <-errA; err != nil {
137+
t.Fatalf("A returned error: %v", err)
138+
}
139+
if resp := <-resultA; resp.Stdout != "A" {
140+
t.Fatalf("unexpected A stdout: %q", resp.Stdout)
141+
}
142+
143+
if err := <-errB; err != nil {
144+
t.Fatalf("B returned error: %v", err)
145+
}
146+
if resp := <-resultB; resp.Stdout != "B" {
147+
t.Fatalf("unexpected B stdout: %q", resp.Stdout)
148+
}
149+
150+
if err := <-errC; err != nil {
151+
t.Fatalf("C returned error: %v", err)
152+
}
153+
if resp := <-resultC; resp.Stdout != "C" {
154+
t.Fatalf("unexpected C stdout: %q", resp.Stdout)
155+
}
156+
157+
orderLock.Lock()
158+
defer orderLock.Unlock()
159+
if len(startOrder) != 3 {
160+
t.Fatalf("unexpected execution count: got=%d order=%v", len(startOrder), startOrder)
161+
}
162+
if startOrder[0] != "A" || startOrder[1] != "B" || startOrder[2] != "C" {
163+
t.Fatalf("queue did not execute in FIFO order: %v", startOrder)
164+
}
165+
}

0 commit comments

Comments
 (0)