Skip to content

feat(snmp-discovery): serialize diode ingest via queued client#419

Merged
jajeffries merged 20 commits into
developfrom
feat/snmp-ingest-queue
Jun 26, 2026
Merged

feat(snmp-discovery): serialize diode ingest via queued client#419
jajeffries merged 20 commits into
developfrom
feat/snmp-ingest-queue

Conversation

@jajeffries

@jajeffries jajeffries commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Add a QueuedClient wrapper around diode.Client that serializes ingest through a buffered channel with a single consumer goroutine, preventing concurrent OAuth re-auth storms when many SNMP crawl jobs complete at once.
  • Wire the queue in snmp-discovery via --ingest-buffer-size (default 512) and expose it from the agent backend as orb.backends.snmp_discovery.ingest_buffer_size.

What changed

  • New orb-discovery/snmp-discovery/ingest/QueuedClient implementing diode.Client with enqueue backpressure, context cancellation, and idempotent shutdown.
  • cmd/main.go wraps the real diode client at startup and closes it on shutdown.
  • Agent snmp_discovery backend reads ingest_buffer_size from YAML (validated at configure time) and passes --ingest-buffer-size to the subprocess.
  • Unit tests for serial execution, ctx cancel while queued, Close/drain behavior, and backend CLI forwarding.
  • Documentation in docs/backends/snmp_discovery/README.md and docs/config_samples.md.

How tested

  • cd orb-discovery/snmp-discovery && go test ./ingest/... ./policy/...
  • go test ./agent/backend/snmpdiscovery/...
  • make fix-lint

Follow-ups (out of scope)

  • diode-sdk-go: singleflight on authenticate(), backoff on 502/429.
  • Staggered crawl scheduling in runner (separate from ingest serialization).

jajeffries and others added 6 commits June 25, 2026 15:27
Wrap diode.Client with a buffered channel and single consumer goroutine
to prevent concurrent OAuth re-auth storms from parallel crawl jobs.

Co-authored-by: Cursor <cursoragent@cursor.com>
Add --ingest-buffer-size (default 256), wrap the diode client at startup,
and close the queued client on shutdown.

Co-authored-by: Cursor <cursoragent@cursor.com>
Read ingest_buffer_size from snmp_discovery backend YAML (default 256)
and forward it as --ingest-buffer-size when starting the backend.

Co-authored-by: Cursor <cursoragent@cursor.com>
Cover serial ingest execution, context cancellation, Close behavior,
and agent YAML forwarding of ingest_buffer_size to the subprocess.

Co-authored-by: Cursor <cursoragent@cursor.com>
Describe the serial ingest queue, default size, and tuning guidance in
the backend README and config samples.

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
@jajeffries

Copy link
Copy Markdown
Contributor Author

@codex review

@github-actions

github-actions Bot commented Jun 25, 2026

Copy link
Copy Markdown

Go test coverage

Status Elapsed Package Cover Pass Fail Skip
🟢 PASS 1.03s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/config 81.1% 61 0 0
🟢 PASS 19.81s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/data 82.9% 11771 0 0
🟢 PASS 1.02s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/env 85.7% 15 0 0
🟢 PASS 1.28s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/ingest 87.7% 11 0 0
🟢 PASS 1.40s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/mapping 89.4% 833 0 0
🟢 PASS 1.03s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/mapping/qbridge 86.2% 55 0 0
🟢 PASS 1.03s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/metrics 85.4% 27 0 0
🟢 PASS 17.33s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/policy 85.8% 147 0 0
🟢 PASS 4.68s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/server 85.7% 20 0 0
🟢 PASS 1.04s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/snmp 82.2% 41 0 0
🟢 PASS 1.03s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/targets 93.0% 30 0 0
🟢 PASS 1.02s github.com/netboxlabs/orb-agent/orb-discovery/snmp-discovery/version 100.0% 1 0 0
Total coverage: 87.9%

Co-authored-by: Cursor <cursoragent@cursor.com>

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1dfb8a64b6

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go
@leoparente leoparente requested a review from Copilot June 25, 2026 14:55
Signal shutdown via shutdownCh only; the consumer drains pending requests
and exits without closing the requests channel, eliminating the race where
Close could close the channel while a producer was still sending.

Co-authored-by: Cursor <cursoragent@cursor.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an ingest-serialization layer for SNMP discovery by wrapping the Diode client with a buffered, single-consumer queue to prevent concurrent OAuth re-auth storms when many crawls finish at once. It also wires the queue capacity through CLI/config and updates tests and docs accordingly.

Changes:

  • Add ingest.QueuedClient to serialize diode.Client ingest calls via a buffered channel + consumer goroutine.
  • Expose queue capacity via --ingest-buffer-size (snmp-discovery) and orb.backends.snmp_discovery.ingest_buffer_size (agent backend).
  • Add unit tests for queue behavior and backend CLI forwarding; update SNMP discovery backend documentation and config samples.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
orb-discovery/snmp-discovery/ingest/queued_client.go Implements QueuedClient to serialize ingest and manage shutdown semantics.
orb-discovery/snmp-discovery/ingest/queued_client_test.go Adds tests covering serialization, cancellation while queued, and Close behavior.
orb-discovery/snmp-discovery/cmd/main.go Adds --ingest-buffer-size flag and wraps/closes the ingest client on shutdown.
agent/backend/snmpdiscovery/snmp_discovery.go Reads ingest_buffer_size and forwards --ingest-buffer-size to the subprocess.
agent/backend/snmpdiscovery/snmp_discovery_test.go Verifies --ingest-buffer-size CLI arg forwarding when configured.
docs/config_samples.md Documents orb.backends.snmp_discovery.ingest_buffer_size in sample config.
docs/backends/snmp_discovery/README.md Documents backend-level ingest queue configuration and behavior.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread orb-discovery/snmp-discovery/ingest/queued_client_test.go
Comment thread orb-discovery/snmp-discovery/ingest/queued_client_test.go
Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go
Comment thread agent/backend/snmpdiscovery/snmp_discovery.go
@github-actions

github-actions Bot commented Jun 25, 2026

Copy link
Copy Markdown

Vulnerability Scan: Passed

Image: orb-agent:scan

No vulnerabilities found.

Commit: 1733852

jajeffries and others added 5 commits June 25, 2026 16:01
Reject zero, negative, non-integer, and unsupported types at configure
time instead of failing opaquely when the subprocess starts.

Co-authored-by: Cursor <cursoragent@cursor.com>
Warn when a completed or drained ingest result cannot be sent because
the caller is no longer waiting on the result channel.

Co-authored-by: Cursor <cursoragent@cursor.com>
Verify queued ingests still in the buffer receive ErrIngestQueueClosed
when the client shuts down while another ingest is in flight.

Co-authored-by: Cursor <cursoragent@cursor.com>
Align agent, subprocess flag default, and docs with the recommended
value for large subnet bursts.

Co-authored-by: Cursor <cursoragent@cursor.com>
After Close signals shutdown, the consumer drains buffered requests
without executing them. In-flight ingest still completes before exit.

Co-authored-by: Cursor <cursoragent@cursor.com>
@jajeffries

Copy link
Copy Markdown
Contributor Author

Addressed all review feedback in the latest push:

  • Codex (send-on-closed-channel): fixed in 6bb7b6a — thread already resolved
  • Copilot (shutdown select race): fixed in 15869d8 — consumer checks shutdownCh before dequeuing; buffered work is drained, not executed
  • Copilot (ingest_buffer_size validation): fixed in 053bd18 via parseIngestBufferSize()
  • Copilot (for range workers): no change — integer ranging is valid on Go 1.26.4; CI ingest tests pass

Also includes default buffer size bump to 512 (a1557cc).

@jajeffries

Copy link
Copy Markdown
Contributor Author

@codex review

@jajeffries jajeffries self-assigned this Jun 25, 2026
@jajeffries

Copy link
Copy Markdown
Contributor Author

@copilot review

@jajeffries jajeffries marked this pull request as ready for review June 25, 2026 15:19

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 15869d845e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go
Serialize enqueue with Close via an RWMutex gate so a closed shutdownCh
cannot race with a buffered send that would orphan the request. Also
watch the consumer done channel while waiting for ingest results.

Addresses Codex review on PR #419.

Co-authored-by: Cursor <cursoragent@cursor.com>
@leoparente

Copy link
Copy Markdown
Contributor

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 6593155644

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go
Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go
Drop the enqueue gate lock so Close can signal shutdown while producers
wait for buffer space. Recheck shutdown before executing dequeued work so
buffered requests are failed instead of run after Close starts.

Addresses Codex review on PR #419.

Co-authored-by: Cursor <cursoragent@cursor.com>
@jajeffries

Copy link
Copy Markdown
Contributor Author

Addressed latest Codex feedback:

  • Gate lock during full-buffer wait: removed the RWMutex gate; enqueue no longer blocks Close while waiting for queue space. Blocked producers return ErrIngestQueueClosed when shutdown starts.
  • Execute after shutdown: consumer rechecks shutdownCh before running dequeued work; buffered requests are failed, not executed.

Pushed on latest commit; ingest tests pass.

@jajeffries

Copy link
Copy Markdown
Contributor Author

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0a74ab2b88

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread orb-discovery/snmp-discovery/cmd/main.go Outdated

@leoparente leoparente left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Just get codex reply with no new findings. And recommend a e2e testing with orb-test-lab.

@jimjkelly

Copy link
Copy Markdown

Nice work — the serialization approach is clean and the shutdown semantics are well thought out. Two suggestions:

  1. Track the diode-sdk-go follow-ups. The PR description mentions singleflight on authenticate() and backoff on 502/429/503 as out-of-scope. Are these tracked somewhere (Linear ticket, SDK issue)? The serialization here is a solid workaround, but the proper fix lives in the SDK — want to make sure it doesn't fall off the radar.

  2. Consider a per-call timeout safety net. Right now, if the diode server hangs (accepts the connection but never responds), the single consumer goroutine blocks indefinitely — and since callers generally use context.Background(), there's no deadline to bail out. The entire ingest pipeline stalls silently. A default timeout wrapping the inner Ingest call (e.g., 30-60s) would be cheap insurance against this. Something like:

    func (c *QueuedClient) execute(req *ingestRequest) {
        ctx := req.ctx
        if _, ok := ctx.Deadline(); !ok {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, c.callTimeout)
            defer cancel()
        }
        resp, err := req.run(ctx)
        // ...
    }

    This is optional for this PR — the head-of-line blocking risk existed before this change too — but it'd be a small addition that prevents a bad failure mode.

@davidlanouette davidlanouette left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll finish looking tomorrow - with fresh eyes.

Comment thread orb-discovery/snmp-discovery/cmd/main.go
defer close(c.done)

for {
if c.shutdownSignaled() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it duplicates the select/case <-c.shutdownCh logic. Is this duplication serving a purpose?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — the non-blocking shutdownSignaled() check and the blocking select on shutdownCh serve different moments in the loop. The helper catches shutdown that happens during execute() or between iterations; the select wakes the consumer when it is blocked waiting for work. Added comments in the latest push to spell this out.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't line 90 catch all shutdown events from execute too?

If you always want to handle shutdown events before you check the request, wouldn't a cleaner solution be to add a "do nothing" default to the select on L80? That would remove the duplication and accomplish the same thing. (unless it's too early for me.)

Although I think you just need the single select that you have (and remove the non-async check, something like this would remove duplication and ensure that shutdown is checked before c.requests each loop:

func (c *QueuedClient) consumer() {
	defer close(c.done)

	for {
		// Non-blocking check: Close may have run during execute() on the prior
		// iteration. The select below handles shutdown while blocked for work.
		if c.shutdownSignaled() {
			c.drainPendingFailures()
			return
		}

		select {
		case req := <-c.requests:
			c.execute(req)
		default:
			// continue the for loop if c.requests isn't ready
		}
	}
}

Although, I think this is significantly simpler and just as correct.

func (c *QueuedClient) consumer() {
	defer close(c.done)

	for {
		select {
		case req := <-c.requests:
			c.execute(req)
		case <-c.done:
			c.drainPendingFailures()
			return
		}
	}
}

Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go
Comment thread orb-discovery/snmp-discovery/ingest/queued_client.go Outdated
Wire policy manager to a cancellable root context and call cancel before
QueuedClient.Close so in-flight Diode ingests can abort on SIGTERM.
Document shutdown channel roles and why shutdownSignaled coexists with
blocking shutdownCh selects.

Co-authored-by: Cursor <cursoragent@cursor.com>
@jajeffries

Copy link
Copy Markdown
Contributor Author

@codex review

jajeffries and others added 2 commits June 26, 2026 10:14
Apply a 30s deadline when callers have no context deadline, derived from
Grafana production diode-ingester Ingest latency (p99 ~938ms, max bucket
10s). Prevents a hung Diode server from blocking the ingest consumer.

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ad91b4f168

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread orb-discovery/snmp-discovery/cmd/main.go Outdated
Comment thread orb-discovery/snmp-discovery/cmd/main.go Outdated
Cancel rootCtx before server.Stop() so in-flight Diode ingests abort
promptly during SIGTERM instead of waiting on scheduler stop timeouts.
Use a fresh bounded context for metrics.Shutdown so OTLP can flush after
the application context is canceled.

Co-authored-by: Cursor <cursoragent@cursor.com>
@jajeffries

jajeffries commented Jun 26, 2026

Copy link
Copy Markdown
Contributor Author

Addressed the two remaining Codex threads in the latest push:

  • Cancel policy context before stopping jobs: shutdown() now calls cancelFunc() before server.Stop() on both the signal and server-error paths, so runner r.ctx is canceled while jobs are still stopping and queued/in-flight Diode ingests can abort promptly.
  • Metrics shutdown context: metrics.Shutdown now uses a fresh 5s timeout context instead of the canceled rootCtx, so OTLP can flush final metrics on SIGTERM.

Also consolidated both shutdown paths into a shared shutdown() helper.

Re: @jimjkelly's suggestions — the per-call timeout safety net is already in QueuedClient.execute (30s default when callers pass no deadline). The sdk follow ups are: netboxlabs/diode-sdk-go#72, netboxlabs/diode-sdk-python#95 and netboxlabs/diode-sdk-go#73

Add //go:build integration tests that run snmp-discovery in dry-run
against four lab SNMP simulators, assert multi-target ingest output, and
verify SIGTERM shutdown. Expose make test-integration (Docker network)
and test-integration-local targets.

Co-authored-by: Cursor <cursoragent@cursor.com>
@jajeffries jajeffries merged commit 2d4c434 into develop Jun 26, 2026
13 checks passed
@jajeffries jajeffries deleted the feat/snmp-ingest-queue branch June 26, 2026 11:16

@davidlanouette davidlanouette left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's all I have.

Comment on lines +266 to +270
closeDone := make(chan struct{})
go func() {
require.NoError(t, client.Close())
close(closeDone)
}()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's going on here? The "infrastructure around client.Close() isn't obvious.

It looks like you are trying to async close the client. But, then line 272 is waiting for the go routine to finish.

Is there a reason you don't just close the client and continue - without the closeDone chan and the go routine?

Comment on lines +154 to +160
case <-c.done:
select {
case res := <-req.result:
return res.resp, res.err
default:
return nil, ErrIngestQueueClosed
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this almost always hit the default condition. If there was a value in req.result, it would be caught on line 150 - before this case was selected. Therefore, line 156 won't be run.

Note: if both req.result and c.done are ready at the exact same time, then the runtime will select one at random. So, technically, line 156 could pass. But, the chances of that are pretty low (like thousands to one). Is it worth the complexity?

defer close(c.done)

for {
if c.shutdownSignaled() {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't line 90 catch all shutdown events from execute too?

If you always want to handle shutdown events before you check the request, wouldn't a cleaner solution be to add a "do nothing" default to the select on L80? That would remove the duplication and accomplish the same thing. (unless it's too early for me.)

Although I think you just need the single select that you have (and remove the non-async check, something like this would remove duplication and ensure that shutdown is checked before c.requests each loop:

func (c *QueuedClient) consumer() {
	defer close(c.done)

	for {
		// Non-blocking check: Close may have run during execute() on the prior
		// iteration. The select below handles shutdown while blocked for work.
		if c.shutdownSignaled() {
			c.drainPendingFailures()
			return
		}

		select {
		case req := <-c.requests:
			c.execute(req)
		default:
			// continue the for loop if c.requests isn't ready
		}
	}
}

Although, I think this is significantly simpler and just as correct.

func (c *QueuedClient) consumer() {
	defer close(c.done)

	for {
		select {
		case req := <-c.requests:
			c.execute(req)
		case <-c.done:
			c.drainPendingFailures()
			return
		}
	}
}

case err := <-done:
if err != nil {
// SIGTERM is expected; only fail on unexpected wait errors.
if _, ok := err.(*exec.ExitError); !ok {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: use if errors.As(err, &exec.ExitError) { instead of casting.

@github-actions

Copy link
Copy Markdown

🎉 This PR is included in version snmp-discovery-v1.34.0 🎉

The release is available on GitHub release

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants