From b0ddcca3ec0e7bf4bbe6034481b328332b79b70b Mon Sep 17 00:00:00 2001 From: hexbabe Date: Thu, 12 Feb 2026 11:37:30 -0500 Subject: [PATCH 1/3] Add impl and test --- videostore/fetch_frames_test.go | 78 ++++++++++++++++++++++++++++++++- videostore/videostore.go | 11 +++++ 2 files changed, 87 insertions(+), 2 deletions(-) diff --git a/videostore/fetch_frames_test.go b/videostore/fetch_frames_test.go index 571b2d05..9fcc9f8d 100644 --- a/videostore/fetch_frames_test.go +++ b/videostore/fetch_frames_test.go @@ -6,6 +6,7 @@ import ( "errors" "image" "image/jpeg" + "sync" "sync/atomic" "testing" "time" @@ -20,9 +21,14 @@ import ( // mockCamera implements camera.Camera for testing fetchFrames behavior. type mockCamera struct { camera.Camera - name resource.Name + // mu guards fields that may be modified mid-test while fetchFrames reads them + // like imagesToReturn. Otherwise this test is unsafe and go test's -race flag + // would get really upset. + mu sync.Mutex imagesToReturn []camera.NamedImage - returnError error + + name resource.Name + returnError error } func (m *mockCamera) Name() resource.Name { @@ -38,6 +44,8 @@ func (m *mockCamera) Images( resource.ResponseMetadata, error, ) { + m.mu.Lock() + defer m.mu.Unlock() if m.returnError != nil { return nil, resource.ResponseMetadata{}, m.returnError } @@ -199,6 +207,72 @@ func TestFetchFrames(t *testing.T) { test.That(t, len(frameBytes), test.ShouldEqual, 0) }) + t.Run("Clears stale frame when camera switches to invalid MIME type", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + + // Switch camera to return depth image + depthImage, err := camera.NamedImageFromBytes([]byte("fake depth data"), "depth", "image/vnd.viam.dep") + test.That(t, err, test.ShouldBeNil) + mockCam.mu.Lock() + mockCam.imagesToReturn = []camera.NamedImage{depthImage} + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + }) + t.Run("Continues on Images error", func(t *testing.T) { mockCam := &mockCamera{ name: resource.NewName(camera.API, "test-camera"), diff --git a/videostore/videostore.go b/videostore/videostore.go index d86fc64c..3716902a 100644 --- a/videostore/videostore.go +++ b/videostore/videostore.go @@ -524,6 +524,12 @@ func (vs *videostore) Save(_ context.Context, r *SaveRequest) (*SaveResponse, er return &SaveResponse{Filename: uploadFileName}, nil } +// clearLatestFrame sets the latest frame atomic to a nil byte slice. +// Useful to remove stale frames when underlying cam is unhealthy. +func (vs *videostore) clearLatestFrame() { + vs.latestFrame.Store([]byte(nil)) +} + func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerConfig, ) { frameInterval := time.Second / time.Duration(framePoller.Framerate) @@ -541,6 +547,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo framePoller.Camera.Name(), err, ) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } @@ -550,6 +557,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo "no images received from camera %s", framePoller.Camera.Name(), ) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } else if len(namedImages) > 1 { @@ -558,6 +566,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo len(namedImages), framePoller.Camera.Name(), ) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } @@ -566,11 +575,13 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo data, err := namedImage.Bytes(ctx) if err != nil { vs.logger.Warnf("failed to get bytes from image: %v", err) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } if actualMimeType, _ := rutils.CheckLazyMIMEType(namedImage.MimeType()); actualMimeType != rutils.MimeTypeJPEG { vs.logger.Warnf("expected image mime type %s got %s: ", rutils.MimeTypeJPEG, actualMimeType) + vs.clearLatestFrame() continue } vs.latestFrame.Store(data) From f27fdba53e8d0b2c327cd36946596cfc4b7b40cb Mon Sep 17 00:00:00 2001 From: sean yu Date: Fri, 13 Mar 2026 13:12:10 -0400 Subject: [PATCH 2/3] Avoid redundant atomic store in clearLatestFrame --- videostore/videostore.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/videostore/videostore.go b/videostore/videostore.go index 3716902a..4c31a584 100644 --- a/videostore/videostore.go +++ b/videostore/videostore.go @@ -527,7 +527,9 @@ func (vs *videostore) Save(_ context.Context, r *SaveRequest) (*SaveResponse, er // clearLatestFrame sets the latest frame atomic to a nil byte slice. // Useful to remove stale frames when underlying cam is unhealthy. func (vs *videostore) clearLatestFrame() { - vs.latestFrame.Store([]byte(nil)) + if frame := vs.latestFrame.Load(); frame != nil { + vs.latestFrame.Store([]byte(nil)) + } } func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerConfig, From 693942d854238b38121c10a69a5a028534ec83dd Mon Sep 17 00:00:00 2001 From: sean yu Date: Mon, 16 Mar 2026 17:11:50 -0400 Subject: [PATCH 3/3] test: add clearLatestFrame coverage for 0-images, multi-image, and Images() error paths --- videostore/fetch_frames_test.go | 222 ++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) diff --git a/videostore/fetch_frames_test.go b/videostore/fetch_frames_test.go index 9fcc9f8d..6f117134 100644 --- a/videostore/fetch_frames_test.go +++ b/videostore/fetch_frames_test.go @@ -234,11 +234,13 @@ func TestFetchFrames(t *testing.T) { }) // Poll until a valid frame is stored + var storedFrameBytes []byte timeout := time.After(5 * time.Second) for { frame := vs.latestFrame.Load() frameBytes, ok := frame.([]byte) if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes break } select { @@ -248,6 +250,8 @@ func TestFetchFrames(t *testing.T) { time.Sleep(50 * time.Millisecond) } } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) // Switch camera to return depth image depthImage, err := camera.NamedImageFromBytes([]byte("fake depth data"), "depth", "image/vnd.viam.dep") @@ -257,11 +261,13 @@ func TestFetchFrames(t *testing.T) { mockCam.mu.Unlock() // Poll until the stale frame is cleared + var clearedFrameBytes []byte timeout = time.After(5 * time.Second) for { frame := vs.latestFrame.Load() frameBytes, ok := frame.([]byte) if ok && frameBytes == nil { + clearedFrameBytes = frameBytes break } select { @@ -271,6 +277,222 @@ func TestFetchFrames(t *testing.T) { time.Sleep(50 * time.Millisecond) } } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + + t.Run("Clears stale frame when 0 images returned", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return 0 images + mockCam.mu.Lock() + mockCam.imagesToReturn = []camera.NamedImage{} + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + + t.Run("Clears stale frame when more than 1 image returned", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return 2 images + secondImage, err := camera.NamedImageFromBytes(jpegData, "depth", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + mockCam.mu.Lock() + mockCam.imagesToReturn = []camera.NamedImage{jpegImage, secondImage} + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + + t.Run("Clears stale frame when Images() returns error", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return an error + mockCam.mu.Lock() + mockCam.returnError = errors.New("camera unavailable") + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) }) t.Run("Continues on Images error", func(t *testing.T) {