diff --git a/videostore/fetch_frames_test.go b/videostore/fetch_frames_test.go index 571b2d05..6f117134 100644 --- a/videostore/fetch_frames_test.go +++ b/videostore/fetch_frames_test.go @@ -6,6 +6,7 @@ import ( "errors" "image" "image/jpeg" + "sync" "sync/atomic" "testing" "time" @@ -20,9 +21,14 @@ import ( // mockCamera implements camera.Camera for testing fetchFrames behavior. type mockCamera struct { camera.Camera - name resource.Name + // mu guards fields that may be modified mid-test while fetchFrames reads them + // like imagesToReturn. Otherwise this test is unsafe and go test's -race flag + // would get really upset. + mu sync.Mutex imagesToReturn []camera.NamedImage - returnError error + + name resource.Name + returnError error } func (m *mockCamera) Name() resource.Name { @@ -38,6 +44,8 @@ func (m *mockCamera) Images( resource.ResponseMetadata, error, ) { + m.mu.Lock() + defer m.mu.Unlock() if m.returnError != nil { return nil, resource.ResponseMetadata{}, m.returnError } @@ -199,6 +207,294 @@ func TestFetchFrames(t *testing.T) { test.That(t, len(frameBytes), test.ShouldEqual, 0) }) + t.Run("Clears stale frame when camera switches to invalid MIME type", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return depth image + depthImage, err := camera.NamedImageFromBytes([]byte("fake depth data"), "depth", "image/vnd.viam.dep") + test.That(t, err, test.ShouldBeNil) + mockCam.mu.Lock() + mockCam.imagesToReturn = []camera.NamedImage{depthImage} + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + + t.Run("Clears stale frame when 0 images returned", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return 0 images + mockCam.mu.Lock() + mockCam.imagesToReturn = []camera.NamedImage{} + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + + t.Run("Clears stale frame when more than 1 image returned", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return 2 images + secondImage, err := camera.NamedImageFromBytes(jpegData, "depth", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + mockCam.mu.Lock() + mockCam.imagesToReturn = []camera.NamedImage{jpegImage, secondImage} + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + + t.Run("Clears stale frame when Images() returns error", func(t *testing.T) { + jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG) + test.That(t, err, test.ShouldBeNil) + + mockCam := &mockCamera{ + name: resource.NewName(camera.API, "test-camera"), + imagesToReturn: []camera.NamedImage{ + jpegImage, + }, + } + + vs := &videostore{ + config: Config{}, + logger: logger, + latestFrame: &atomic.Value{}, + } + vs.latestFrame.Store([]byte{}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + go vs.fetchFrames(ctx, FramePollerConfig{ + Framerate: 30, + Camera: mockCam, + }) + + // Poll until a valid frame is stored + var storedFrameBytes []byte + timeout := time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && len(frameBytes) > 0 { + storedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for valid frame to be stored") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, storedFrameBytes, test.ShouldNotBeNil) + test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0) + + // Switch camera to return an error + mockCam.mu.Lock() + mockCam.returnError = errors.New("camera unavailable") + mockCam.mu.Unlock() + + // Poll until the stale frame is cleared + var clearedFrameBytes []byte + timeout = time.After(5 * time.Second) + for { + frame := vs.latestFrame.Load() + frameBytes, ok := frame.([]byte) + if ok && frameBytes == nil { + clearedFrameBytes = frameBytes + break + } + select { + case <-timeout: + t.Fatal("timed out waiting for stale frame to be cleared") + default: + time.Sleep(50 * time.Millisecond) + } + } + test.That(t, clearedFrameBytes, test.ShouldBeNil) + }) + t.Run("Continues on Images error", func(t *testing.T) { mockCam := &mockCamera{ name: resource.NewName(camera.API, "test-camera"), diff --git a/videostore/videostore.go b/videostore/videostore.go index d86fc64c..4c31a584 100644 --- a/videostore/videostore.go +++ b/videostore/videostore.go @@ -524,6 +524,14 @@ func (vs *videostore) Save(_ context.Context, r *SaveRequest) (*SaveResponse, er return &SaveResponse{Filename: uploadFileName}, nil } +// clearLatestFrame sets the latest frame atomic to a nil byte slice. +// Useful to remove stale frames when underlying cam is unhealthy. +func (vs *videostore) clearLatestFrame() { + if frame := vs.latestFrame.Load(); frame != nil { + vs.latestFrame.Store([]byte(nil)) + } +} + func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerConfig, ) { frameInterval := time.Second / time.Duration(framePoller.Framerate) @@ -541,6 +549,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo framePoller.Camera.Name(), err, ) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } @@ -550,6 +559,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo "no images received from camera %s", framePoller.Camera.Name(), ) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } else if len(namedImages) > 1 { @@ -558,6 +568,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo len(namedImages), framePoller.Camera.Name(), ) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } @@ -566,11 +577,13 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo data, err := namedImage.Bytes(ctx) if err != nil { vs.logger.Warnf("failed to get bytes from image: %v", err) + vs.clearLatestFrame() time.Sleep(retryIntervalSeconds * time.Second) continue } if actualMimeType, _ := rutils.CheckLazyMIMEType(namedImage.MimeType()); actualMimeType != rutils.MimeTypeJPEG { vs.logger.Warnf("expected image mime type %s got %s: ", rutils.MimeTypeJPEG, actualMimeType) + vs.clearLatestFrame() continue } vs.latestFrame.Store(data)