Skip to content

Conversation

@takaokouji
Copy link

Summary

Client-side implementation of polling-based event notification fallback for Mesh v2. Adds WebSocket connection testing and automatic protocol switching to support environments where WebSocket connections are blocked.

Implementation Details

WebSocket Connection Test

Added testWebSocket() method to detect WebSocket availability:

testWebSocket() {
  return new Promise(resolve => {
    try {
      const wsUrl = GRAPHQL_ENDPOINT
        .replace('https://', 'wss://')
        .replace('appsync-api', 'appsync-realtime-api');

      const socket = new WebSocket(wsUrl, 'graphql-ws');
      const timeout = setTimeout(() => {
        socket.close();
        resolve(false);
      }, 3000);

      socket.onopen = () => {
        clearTimeout(timeout);
        socket.close();
        resolve(true);
      };

      socket.onerror = () => {
        clearTimeout(timeout);
        resolve(false);
      };
    } catch (error) {
      resolve(false);
    }
  });
}

Protocol Selection

Host (Group Creation)

  1. Tests WebSocket connectivity using testWebSocket()
  2. Sets useWebSocket flag based on test result
  3. Sends flag to server via createGroup mutation
  4. Starts subscription or polling based on flag

Member (Group Join)

  1. Receives useWebSocket flag from joinGroup response
  2. Starts subscription or polling based on flag

Polling Implementation

Start Polling

startPolling() {
  this.stopPolling();
  if (!this.groupId) return;

  log.info(`Mesh V2: Starting event polling (Interval: ${this.pollingIntervalSeconds}s)`);

  if (!this.lastFetchTime) {
    this.lastFetchTime = new Date().toISOString();
  }

  this.pollingTimer = setInterval(() => {
    this.pollEvents();
  }, this.pollingIntervalSeconds * 1000);
}

Poll Events

async pollEvents() {
  if (!this.groupId || !this.client || this.useWebSocket) return;

  try {
    const result = await this.client.query({
      query: GET_EVENTS_SINCE,
      variables: {
        groupId: this.groupId,
        domain: this.domain,
        since: this.lastFetchTime
      },
      fetchPolicy: 'network-only'
    });

    const events = result.data.getEventsSince;
    if (events && events.length > 0) {
      log.info(`Mesh V2: Polled ${events.length} events`);

      const batchEvent = {
        firedByNodeId: 'polling-server',
        events: events.map(e => ({ /* ... */ }))
      };
      this.handleBatchEvent(batchEvent);

      // Update cursor
      this.lastFetchTime = events[events.length - 1].cursor;
    }
  } catch (error) {
    log.error(`Mesh V2: Event polling failed: ${error}`);
    const reason = this.shouldDisconnectOnError(error);
    if (reason) {
      this.cleanupAndDisconnect(reason);
    }
  }
}

Event Transmission Protocol Switching

if (this.useWebSocket) {
  // Protocol A: WebSocket
  await this.client.mutate({
    mutation: FIRE_EVENTS,
    variables: { /* ... */ }
  });
} else {
  // Protocol B: Polling
  const result = await this.client.mutate({
    mutation: RECORD_EVENTS,
    variables: { /* ... */ }
  });

  // Update lastFetchTime if null
  if (!this.lastFetchTime) {
    this.lastFetchTime = result.data.recordEventsByNode.nextSince;
  }
}

Cursor Management

  • lastFetchTime: Stores last cursor (or initial ISO timestamp)
  • Updated to Event.cursor after each polling
  • Used as since parameter in next getEventsSince call

GraphQL Operations

Added new operations to gql-operations.js:

  • RECORD_EVENTS: Mutation for recording events (polling)
  • GET_EVENTS_SINCE: Query for retrieving events (polling)

Testing

Unit Tests

New test file: test/unit/mesh_service_v2_polling.js

  • pollEvents fetches and handles events correctly
  • fireEventsBatch uses RECORD_EVENTS when useWebSocket is false
  • Cursor-based pagination works correctly
  • lastFetchTime is updated with cursor value

Test Coverage

  • WebSocket test scenarios
  • Protocol switching (host and member)
  • Polling start/stop
  • Event polling and processing
  • Cursor management

Configuration

Default Values

  • Polling interval: 2 seconds (from server pollingIntervalSeconds)
  • WebSocket test timeout: 3 seconds

Cost Optimization

  • WebSocket Priority: Uses WebSocket when available (lower cost)
  • Polling Fallback: Only uses polling when WebSocket is unavailable
  • Cursor-based: Efficient pagination with cursor

Breaking Changes

None. This is a backward-compatible addition.

Related

🤖 Generated with Claude Code

takaokouji and others added 14 commits January 11, 2026 13:07
- Added testWebSocket to detect protocol availability
- Implemented polling fallback in MeshV2Service
- Updated gql-operations.js with new polling mutations and queries
- Exported GRAPHQL_ENDPOINT from mesh-client.js
- Added unit tests for polling functionality

Co-Authored-By: Gemini <noreply@google.com>
- Change GetEventsSince  type from AWSDateTime! to String!
- Improve testWebSocket URL construction to support custom domains
- Added 'cursor' field to GET_EVENTS_SINCE query to correctly update lastFetchTime
- Added guard in pollEvents to prevent execution if lastFetchTime is null
- Set lastFetchTime to createdAt in createGroup
- Set lastFetchTime to current time in joinGroup
- Remove redundant initializations in startPolling and fireEventsBatch
- Added getForcePollingFromUrl to utils.js
- Use forcePolling setting in MeshV2Service to bypass WebSocket test
- Override useWebSocket to false if forcePolling is enabled during group join
- Updated JOIN_GROUP, RENEW_HEARTBEAT, and SEND_MEMBER_HEARTBEAT queries to include createdAt
- Initialize lastFetchTime to empty string in constructor
- Use node.createdAt from server when joining group
- Use event.cursor to update lastFetchTime during polling
- Added fallback for lastFetchTime in pollEvents to avoid GraphQL error
- Updated tests with createdAt mock data
- Set lastFetchTime to current client time just before createGroup/joinGroup mutations
- Reverted server-side createdAt dependency in GraphQL queries
- Ensure stopPolling is called during cleanup to minimize requests
- Updated tests to match reverted schema
- Removed hallucinated handleEvent call
- Filter out self-fired events in pollEvents
- Sort and queue other nodes' events with correct offsets for playback
- Update lastFetchTime consistently from the latest polled event
- Added _queueEventsForPlayback private method
- Simplified pollEvents and handleBatchEvent by using the new method
- Ensured consistent sorting, offset calculation, and playback start trigger
- Update lastFetchTime in fireEventsBatch
- Check pendingBroadcasts in pollEvents test
- Add queryCount tracking to pollEvents
- Add tests for self-event filtering and lastFetchTime fallback
- Matched client operations with updated server schema
- Ensured consistency across all group-related GraphQL operations
@takaokouji takaokouji merged commit dcf613e into develop Jan 11, 2026
1 check passed
@takaokouji takaokouji deleted the feature/polling-fallback branch January 11, 2026 13:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants