diff --git a/sources/components/ChatList.tsx b/sources/components/ChatList.tsx index 72ca553cc..5f0777f33 100644 --- a/sources/components/ChatList.tsx +++ b/sources/components/ChatList.tsx @@ -1,5 +1,6 @@ import * as React from 'react'; import { useSession, useSessionMessages } from "@/sync/storage"; +import { sync } from '@/sync/sync'; import { ActivityIndicator, FlatList, Platform, View } from 'react-native'; import { useCallback } from 'react'; import { useHeaderHeight } from '@/utils/responsive'; @@ -10,20 +11,30 @@ import { ChatFooter } from './ChatFooter'; import { Message } from '@/sync/typesMessage'; export const ChatList = React.memo((props: { session: Session }) => { - const { messages } = useSessionMessages(props.session.id); + const { messages, isLoaded } = useSessionMessages(props.session.id); return ( ) }); -const ListHeader = React.memo(() => { +const ListHeader = React.memo((props: { isLoadingOlder: boolean }) => { const headerHeight = useHeaderHeight(); const safeArea = useSafeAreaInsets(); - return ; + return ( + + {props.isLoadingOlder && ( + + + + )} + + + ); }); const ListFooter = React.memo((props: { sessionId: string }) => { @@ -37,11 +48,50 @@ const ChatListInternal = React.memo((props: { metadata: Metadata | null, sessionId: string, messages: Message[], + isLoaded: boolean, }) => { + const [isLoadingOlder, setIsLoadingOlder] = React.useState(false); + const [hasMoreOlder, setHasMoreOlder] = React.useState(null); + const keyExtractor = useCallback((item: any) => item.id, []); const renderItem = useCallback(({ item }: { item: any }) => ( ), [props.metadata, props.sessionId]); + + const loadOlder = useCallback(async () => { + if (!props.isLoaded || props.messages.length === 0) { + return; + } + if (isLoadingOlder || hasMoreOlder === false) { + return; + } + + setIsLoadingOlder(true); + try { + const result = await sync.loadOlderMessages(props.sessionId); + if (result.status === 'no_more') { + setHasMoreOlder(false); + } else if (result.status === 'loaded') { + setHasMoreOlder(result.hasMore); + } + } catch (error) { + console.error('Failed to load older messages:', error); + } finally { + setIsLoadingOlder(false); + } + }, [props.isLoaded, props.messages.length, props.sessionId, isLoadingOlder, hasMoreOlder]); + + const handleScroll = useCallback((e: any) => { + const n = e?.nativeEvent; + if (!n?.contentOffset || !n?.layoutMeasurement || !n?.contentSize) { + return; + } + const distanceFromEnd = n.contentSize.height - (n.contentOffset.y + n.layoutMeasurement.height); + if (distanceFromEnd < 200) { + void loadOlder(); + } + }, [loadOlder]); + return ( { + void loadOlder(); + }} + onScroll={handleScroll} + scrollEventThrottle={16} ListHeaderComponent={} - ListFooterComponent={} + ListFooterComponent={} /> ) -}); \ No newline at end of file +}); diff --git a/sources/sync/sync.ts b/sources/sync/sync.ts index 49d9c07fd..54d77e196 100644 --- a/sources/sync/sync.ts +++ b/sources/sync/sync.ts @@ -40,6 +40,8 @@ import { FeedItem } from './feedTypes'; import { UserProfile } from './friendTypes'; import { initializeTodoSync } from '../-zen/model/ops'; +const SESSION_MESSAGES_PAGE_SIZE = 150; + class Sync { // Spawned agents (especially in spawn mode) can take noticeable time to connect. private static readonly SESSION_READY_TIMEOUT_MS = 10000; @@ -52,6 +54,9 @@ class Sync { private sessionsSync: InvalidateSync; private messagesSync = new Map(); private sessionReceivedMessages = new Map>(); + private sessionOldestLoadedSeq = new Map(); + private sessionHasMoreOlderMessages = new Map(); + private sessionLoadingOlderMessages = new Set(); private sessionDataKeys = new Map(); // Store session data encryption keys internally private machineDataKeys = new Map(); // Store machine data encryption keys internally private artifactDataKeys = new Map(); // Store artifact data encryption keys internally @@ -209,7 +214,120 @@ class Sync { } - async sendMessage(sessionId: string, text: string, displayText?: string) { + public async loadOlderMessages(sessionId: string): Promise<{ loaded: number; hasMore: boolean; status: 'loaded' | 'no_more' | 'not_ready' }> { + if (this.sessionLoadingOlderMessages.has(sessionId)) { + return { + loaded: 0, + hasMore: this.sessionHasMoreOlderMessages.get(sessionId) ?? true, + status: 'loaded' + }; + } + + const beforeSeq = this.sessionOldestLoadedSeq.get(sessionId); + const knownHasMore = this.sessionHasMoreOlderMessages.get(sessionId); + if (knownHasMore === false) { + return { + loaded: 0, + hasMore: false, + status: 'no_more' + }; + } + + // Pagination state is initialized during the initial `/messages` fetch. If we haven't + // seen it yet, don't permanently disable pagination on the UI side. + if (!beforeSeq) { + return { + loaded: 0, + hasMore: knownHasMore ?? true, + status: 'not_ready' + }; + } + + this.sessionLoadingOlderMessages.add(sessionId); + try { + // Get encryption - may not be ready yet if session was just created + const encryption = this.encryption.getSessionEncryption(sessionId); + if (!encryption) { + throw new Error(`Session encryption not ready for ${sessionId}`); + } + + const response = await apiSocket.request( + `/v1/sessions/${sessionId}/messages?beforeSeq=${beforeSeq}&limit=${SESSION_MESSAGES_PAGE_SIZE}` + ); + const data = await response.json() as { + messages: ApiMessage[]; + hasMore?: boolean; + nextBeforeSeq?: number | null; + }; + + // Update pagination state + const nextBeforeSeq = typeof data.nextBeforeSeq === 'number' ? data.nextBeforeSeq : null; + if (nextBeforeSeq !== null) { + this.sessionOldestLoadedSeq.set(sessionId, Math.min(beforeSeq, nextBeforeSeq)); + } + this.sessionHasMoreOlderMessages.set(sessionId, data.hasMore ?? false); + + // Collect existing messages + let eixstingMessages = this.sessionReceivedMessages.get(sessionId); + if (!eixstingMessages) { + eixstingMessages = new Set(); + this.sessionReceivedMessages.set(sessionId, eixstingMessages); + } + + // Filter out existing messages and prepare for batch decryption + const messagesToDecrypt: ApiMessage[] = []; + for (const msg of [...data.messages].reverse()) { + if (!eixstingMessages.has(msg.id)) { + messagesToDecrypt.push(msg); + } + } + + // Batch decrypt all messages at once + const decryptedMessages = await encryption.decryptMessages(messagesToDecrypt); + + // Process decrypted messages + const normalizedMessages: NormalizedMessage[] = []; + for (const decrypted of decryptedMessages) { + if (decrypted) { + eixstingMessages.add(decrypted.id); + + // Normalize the decrypted message + const normalized = normalizeRawMessage(decrypted.id, decrypted.localId, decrypted.createdAt, decrypted.content); + if (normalized) { + normalizedMessages.push(normalized); + } + + const prev = this.sessionOldestLoadedSeq.get(sessionId); + if (prev === undefined) { + this.sessionOldestLoadedSeq.set(sessionId, decrypted.seq); + } else { + this.sessionOldestLoadedSeq.set(sessionId, Math.min(prev, decrypted.seq)); + } + } + } + + // Apply to storage + this.applyMessages(sessionId, normalizedMessages); + + return { + loaded: normalizedMessages.length, + hasMore: this.sessionHasMoreOlderMessages.get(sessionId) ?? false, + status: (this.sessionHasMoreOlderMessages.get(sessionId) ?? false) ? 'loaded' : 'no_more' + }; + } finally { + this.sessionLoadingOlderMessages.delete(sessionId); + } + } + + + async sendMessage( + sessionId: string, + text: string, + displayText?: string, + options?: { + permissionMode?: PermissionMode; + } + ) { // Get encryption const encryption = this.encryption.getSessionEncryption(sessionId); @@ -476,7 +594,11 @@ class Sync { throw new Error(`Failed to fetch sessions: ${response.status}`); } - const data = await response.json(); + const data = await response.json() as { + messages: ApiMessage[]; + hasMore?: boolean; + nextBeforeSeq?: number | null; + }; const sessions = data.sessions as Array<{ id: string; tag: string; @@ -1394,8 +1516,31 @@ class Sync { // Request const response = await apiSocket.request(`/v1/sessions/${sessionId}/messages`); - const data = await response.json(); + const data = await response.json() as { + messages: ApiMessage[]; + hasMore?: boolean; + nextBeforeSeq?: number | null; + }; + + + // Initialize pagination state from the server response (if present) + if (!this.sessionHasMoreOlderMessages.has(sessionId)) { + const hasMore = typeof data.hasMore === 'boolean' + ? data.hasMore + : (Array.isArray(data.messages) ? data.messages.length >= SESSION_MESSAGES_PAGE_SIZE : false); + this.sessionHasMoreOlderMessages.set(sessionId, hasMore); + } + + const nextBeforeSeq = typeof data.nextBeforeSeq === 'number' ? data.nextBeforeSeq : null; + if (nextBeforeSeq !== null) { + const prevOldest = this.sessionOldestLoadedSeq.get(sessionId); + this.sessionOldestLoadedSeq.set(sessionId, prevOldest === undefined ? nextBeforeSeq : Math.min(prevOldest, nextBeforeSeq)); + } else if (Array.isArray(data.messages) && data.messages.length > 0) { + const minSeqInPage = Math.min(...data.messages.map((m) => m.seq)); + const prevOldest = this.sessionOldestLoadedSeq.get(sessionId); + this.sessionOldestLoadedSeq.set(sessionId, prevOldest === undefined ? minSeqInPage : Math.min(prevOldest, minSeqInPage)); + } // Collect existing messages let eixstingMessages = this.sessionReceivedMessages.get(sessionId); if (!eixstingMessages) { @@ -1428,6 +1573,13 @@ class Sync { if (normalized) { normalizedMessages.push(normalized); } + + const prev = this.sessionOldestLoadedSeq.get(sessionId); + if (prev === undefined) { + this.sessionOldestLoadedSeq.set(sessionId, decrypted.seq); + } else { + this.sessionOldestLoadedSeq.set(sessionId, Math.min(prev, decrypted.seq)); + } } } console.log('Batch decrypted and normalized messages in', Date.now() - start, 'ms');