Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
300 changes: 298 additions & 2 deletions videostore/fetch_frames_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"image"
"image/jpeg"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -20,9 +21,14 @@ import (
// mockCamera implements camera.Camera for testing fetchFrames behavior.
type mockCamera struct {
camera.Camera
name resource.Name
// mu guards fields that may be modified mid-test while fetchFrames reads them
// like imagesToReturn. Otherwise this test is unsafe and go test's -race flag
// would get really upset.
mu sync.Mutex
imagesToReturn []camera.NamedImage
returnError error

name resource.Name
returnError error
}

func (m *mockCamera) Name() resource.Name {
Expand All @@ -38,6 +44,8 @@ func (m *mockCamera) Images(
resource.ResponseMetadata,
error,
) {
m.mu.Lock()
defer m.mu.Unlock()
if m.returnError != nil {
return nil, resource.ResponseMetadata{}, m.returnError
}
Expand Down Expand Up @@ -199,6 +207,294 @@ func TestFetchFrames(t *testing.T) {
test.That(t, len(frameBytes), test.ShouldEqual, 0)
})

t.Run("Clears stale frame when camera switches to invalid MIME type", func(t *testing.T) {
jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG)
test.That(t, err, test.ShouldBeNil)

mockCam := &mockCamera{
name: resource.NewName(camera.API, "test-camera"),
imagesToReturn: []camera.NamedImage{
jpegImage,
},
}

vs := &videostore{
config: Config{},
logger: logger,
latestFrame: &atomic.Value{},
}
vs.latestFrame.Store([]byte{})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

go vs.fetchFrames(ctx, FramePollerConfig{
Framerate: 30,
Camera: mockCam,
})

// Poll until a valid frame is stored
var storedFrameBytes []byte
timeout := time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && len(frameBytes) > 0 {
storedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for valid frame to be stored")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, storedFrameBytes, test.ShouldNotBeNil)
test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0)

// Switch camera to return depth image
depthImage, err := camera.NamedImageFromBytes([]byte("fake depth data"), "depth", "image/vnd.viam.dep")
test.That(t, err, test.ShouldBeNil)
mockCam.mu.Lock()
mockCam.imagesToReturn = []camera.NamedImage{depthImage}
mockCam.mu.Unlock()

// Poll until the stale frame is cleared
var clearedFrameBytes []byte
timeout = time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && frameBytes == nil {
clearedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for stale frame to be cleared")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, clearedFrameBytes, test.ShouldBeNil)
})

t.Run("Clears stale frame when 0 images returned", func(t *testing.T) {
jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG)
test.That(t, err, test.ShouldBeNil)

mockCam := &mockCamera{
name: resource.NewName(camera.API, "test-camera"),
imagesToReturn: []camera.NamedImage{
jpegImage,
},
}

vs := &videostore{
config: Config{},
logger: logger,
latestFrame: &atomic.Value{},
}
vs.latestFrame.Store([]byte{})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

go vs.fetchFrames(ctx, FramePollerConfig{
Framerate: 30,
Camera: mockCam,
})

// Poll until a valid frame is stored
var storedFrameBytes []byte
timeout := time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && len(frameBytes) > 0 {
storedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for valid frame to be stored")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, storedFrameBytes, test.ShouldNotBeNil)
test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0)

// Switch camera to return 0 images
mockCam.mu.Lock()
mockCam.imagesToReturn = []camera.NamedImage{}
mockCam.mu.Unlock()

// Poll until the stale frame is cleared
var clearedFrameBytes []byte
timeout = time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && frameBytes == nil {
clearedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for stale frame to be cleared")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, clearedFrameBytes, test.ShouldBeNil)
})

t.Run("Clears stale frame when more than 1 image returned", func(t *testing.T) {
jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG)
test.That(t, err, test.ShouldBeNil)

mockCam := &mockCamera{
name: resource.NewName(camera.API, "test-camera"),
imagesToReturn: []camera.NamedImage{
jpegImage,
},
}

vs := &videostore{
config: Config{},
logger: logger,
latestFrame: &atomic.Value{},
}
vs.latestFrame.Store([]byte{})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

go vs.fetchFrames(ctx, FramePollerConfig{
Framerate: 30,
Camera: mockCam,
})

// Poll until a valid frame is stored
var storedFrameBytes []byte
timeout := time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && len(frameBytes) > 0 {
storedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for valid frame to be stored")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, storedFrameBytes, test.ShouldNotBeNil)
test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0)

// Switch camera to return 2 images
secondImage, err := camera.NamedImageFromBytes(jpegData, "depth", rutils.MimeTypeJPEG)
test.That(t, err, test.ShouldBeNil)
mockCam.mu.Lock()
mockCam.imagesToReturn = []camera.NamedImage{jpegImage, secondImage}
mockCam.mu.Unlock()

// Poll until the stale frame is cleared
var clearedFrameBytes []byte
timeout = time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && frameBytes == nil {
clearedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for stale frame to be cleared")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, clearedFrameBytes, test.ShouldBeNil)
})

t.Run("Clears stale frame when Images() returns error", func(t *testing.T) {
jpegImage, err := camera.NamedImageFromBytes(jpegData, "color", rutils.MimeTypeJPEG)
test.That(t, err, test.ShouldBeNil)

mockCam := &mockCamera{
name: resource.NewName(camera.API, "test-camera"),
imagesToReturn: []camera.NamedImage{
jpegImage,
},
}

vs := &videostore{
config: Config{},
logger: logger,
latestFrame: &atomic.Value{},
}
vs.latestFrame.Store([]byte{})

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

go vs.fetchFrames(ctx, FramePollerConfig{
Framerate: 30,
Camera: mockCam,
})

// Poll until a valid frame is stored
var storedFrameBytes []byte
timeout := time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && len(frameBytes) > 0 {
storedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for valid frame to be stored")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, storedFrameBytes, test.ShouldNotBeNil)
test.That(t, len(storedFrameBytes), test.ShouldBeGreaterThan, 0)

// Switch camera to return an error
mockCam.mu.Lock()
mockCam.returnError = errors.New("camera unavailable")
mockCam.mu.Unlock()

// Poll until the stale frame is cleared
var clearedFrameBytes []byte
timeout = time.After(5 * time.Second)
for {
frame := vs.latestFrame.Load()
frameBytes, ok := frame.([]byte)
if ok && frameBytes == nil {
clearedFrameBytes = frameBytes
break
}
select {
case <-timeout:
t.Fatal("timed out waiting for stale frame to be cleared")
default:
time.Sleep(50 * time.Millisecond)
}
}
test.That(t, clearedFrameBytes, test.ShouldBeNil)
})

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test case here for valid jpeg mime_type but invalid image bytes? to make sure we clear in that case

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I will add.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looked into it: there's no explicit path in our code that checks and propagates a sentinel error we own on bad bytes. Want me to add that? Or would you rather assert on err string from underlying lib?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just use a mocked NamedImage the errors out on Bytes()?

t.Run("Continues on Images error", func(t *testing.T) {
mockCam := &mockCamera{
name: resource.NewName(camera.API, "test-camera"),
Expand Down
13 changes: 13 additions & 0 deletions videostore/videostore.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,14 @@ func (vs *videostore) Save(_ context.Context, r *SaveRequest) (*SaveResponse, er
return &SaveResponse{Filename: uploadFileName}, nil
}

// clearLatestFrame sets the latest frame atomic to a nil byte slice.
// Useful to remove stale frames when underlying cam is unhealthy.
func (vs *videostore) clearLatestFrame() {
if frame := vs.latestFrame.Load(); frame != nil {
vs.latestFrame.Store([]byte(nil))
}
}

func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerConfig,
) {
frameInterval := time.Second / time.Duration(framePoller.Framerate)
Expand All @@ -541,6 +549,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo
framePoller.Camera.Name(),
err,
)
vs.clearLatestFrame()
time.Sleep(retryIntervalSeconds * time.Second)
continue
}
Expand All @@ -550,6 +559,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo
"no images received from camera %s",
framePoller.Camera.Name(),
)
vs.clearLatestFrame()
time.Sleep(retryIntervalSeconds * time.Second)
continue
} else if len(namedImages) > 1 {
Expand All @@ -558,6 +568,7 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo
len(namedImages),
framePoller.Camera.Name(),
)
vs.clearLatestFrame()
time.Sleep(retryIntervalSeconds * time.Second)
continue
}
Expand All @@ -566,11 +577,13 @@ func (vs *videostore) fetchFrames(ctx context.Context, framePoller FramePollerCo
data, err := namedImage.Bytes(ctx)
if err != nil {
vs.logger.Warnf("failed to get bytes from image: %v", err)
vs.clearLatestFrame()
time.Sleep(retryIntervalSeconds * time.Second)
continue
}
if actualMimeType, _ := rutils.CheckLazyMIMEType(namedImage.MimeType()); actualMimeType != rutils.MimeTypeJPEG {
vs.logger.Warnf("expected image mime type %s got %s: ", rutils.MimeTypeJPEG, actualMimeType)
vs.clearLatestFrame()
continue
}
vs.latestFrame.Store(data)
Expand Down
Loading