Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions sources/components/ChatList.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as React from 'react';
import { useSession, useSessionMessages } from "@/sync/storage";
import { sync } from '@/sync/sync';
import { ActivityIndicator, FlatList, Platform, View } from 'react-native';
import { useCallback } from 'react';
import { useHeaderHeight } from '@/utils/responsive';
Expand All @@ -10,20 +11,30 @@ import { ChatFooter } from './ChatFooter';
import { Message } from '@/sync/typesMessage';

export const ChatList = React.memo((props: { session: Session }) => {
const { messages } = useSessionMessages(props.session.id);
const { messages, isLoaded } = useSessionMessages(props.session.id);
return (
<ChatListInternal
metadata={props.session.metadata}
sessionId={props.session.id}
messages={messages}
isLoaded={isLoaded}
/>
)
});

const ListHeader = React.memo(() => {
const ListHeader = React.memo((props: { isLoadingOlder: boolean }) => {
const headerHeight = useHeaderHeight();
const safeArea = useSafeAreaInsets();
return <View style={{ flexDirection: 'row', alignItems: 'center', height: headerHeight + safeArea.top + 32 }} />;
return (
<View>
{props.isLoadingOlder && (
<View style={{ paddingVertical: 12 }}>
<ActivityIndicator size="small" />
</View>
)}
<View style={{ flexDirection: 'row', alignItems: 'center', height: headerHeight + safeArea.top + 32 }} />
</View>
);
});

const ListFooter = React.memo((props: { sessionId: string }) => {
Expand All @@ -37,11 +48,50 @@ const ChatListInternal = React.memo((props: {
metadata: Metadata | null,
sessionId: string,
messages: Message[],
isLoaded: boolean,
}) => {
const [isLoadingOlder, setIsLoadingOlder] = React.useState(false);
const [hasMoreOlder, setHasMoreOlder] = React.useState<boolean | null>(null);

const keyExtractor = useCallback((item: any) => item.id, []);
const renderItem = useCallback(({ item }: { item: any }) => (
<MessageView message={item} metadata={props.metadata} sessionId={props.sessionId} />
), [props.metadata, props.sessionId]);

const loadOlder = useCallback(async () => {
if (!props.isLoaded || props.messages.length === 0) {
return;
}
if (isLoadingOlder || hasMoreOlder === false) {
return;
}

setIsLoadingOlder(true);
try {
const result = await sync.loadOlderMessages(props.sessionId);
if (result.status === 'no_more') {
setHasMoreOlder(false);
} else if (result.status === 'loaded') {
setHasMoreOlder(result.hasMore);
}
} catch (error) {
console.error('Failed to load older messages:', error);
} finally {
setIsLoadingOlder(false);
}
}, [props.isLoaded, props.messages.length, props.sessionId, isLoadingOlder, hasMoreOlder]);

const handleScroll = useCallback((e: any) => {
const n = e?.nativeEvent;
if (!n?.contentOffset || !n?.layoutMeasurement || !n?.contentSize) {
return;
}
const distanceFromEnd = n.contentSize.height - (n.contentOffset.y + n.layoutMeasurement.height);
if (distanceFromEnd < 200) {
void loadOlder();
}
}, [loadOlder]);

return (
<FlatList
data={props.messages}
Expand All @@ -54,8 +104,14 @@ const ChatListInternal = React.memo((props: {
keyboardShouldPersistTaps="handled"
keyboardDismissMode={Platform.OS === 'ios' ? 'interactive' : 'none'}
renderItem={renderItem}
onEndReachedThreshold={0.2}
onEndReached={() => {
void loadOlder();
}}
onScroll={handleScroll}
scrollEventThrottle={16}
ListHeaderComponent={<ListFooter sessionId={props.sessionId} />}
ListFooterComponent={<ListHeader />}
ListFooterComponent={<ListHeader isLoadingOlder={isLoadingOlder} />}
/>
)
});
});
158 changes: 155 additions & 3 deletions sources/sync/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import { FeedItem } from './feedTypes';
import { UserProfile } from './friendTypes';
import { initializeTodoSync } from '../-zen/model/ops';

const SESSION_MESSAGES_PAGE_SIZE = 150;

class Sync {
// Spawned agents (especially in spawn mode) can take noticeable time to connect.
private static readonly SESSION_READY_TIMEOUT_MS = 10000;
Expand All @@ -52,6 +54,9 @@ class Sync {
private sessionsSync: InvalidateSync;
private messagesSync = new Map<string, InvalidateSync>();
private sessionReceivedMessages = new Map<string, Set<string>>();
private sessionOldestLoadedSeq = new Map<string, number>();
private sessionHasMoreOlderMessages = new Map<string, boolean>();
private sessionLoadingOlderMessages = new Set<string>();
private sessionDataKeys = new Map<string, Uint8Array>(); // Store session data encryption keys internally
private machineDataKeys = new Map<string, Uint8Array>(); // Store machine data encryption keys internally
private artifactDataKeys = new Map<string, Uint8Array>(); // Store artifact data encryption keys internally
Expand Down Expand Up @@ -209,7 +214,120 @@ class Sync {
}


async sendMessage(sessionId: string, text: string, displayText?: string) {
public async loadOlderMessages(sessionId: string): Promise<{ loaded: number; hasMore: boolean; status: 'loaded' | 'no_more' | 'not_ready' }> {
if (this.sessionLoadingOlderMessages.has(sessionId)) {
return {
loaded: 0,
hasMore: this.sessionHasMoreOlderMessages.get(sessionId) ?? true,
status: 'loaded'
};
}

const beforeSeq = this.sessionOldestLoadedSeq.get(sessionId);
const knownHasMore = this.sessionHasMoreOlderMessages.get(sessionId);
if (knownHasMore === false) {
return {
loaded: 0,
hasMore: false,
status: 'no_more'
};
}

// Pagination state is initialized during the initial `/messages` fetch. If we haven't
// seen it yet, don't permanently disable pagination on the UI side.
if (!beforeSeq) {
return {
loaded: 0,
hasMore: knownHasMore ?? true,
status: 'not_ready'
};
}

this.sessionLoadingOlderMessages.add(sessionId);
try {
// Get encryption - may not be ready yet if session was just created
const encryption = this.encryption.getSessionEncryption(sessionId);
if (!encryption) {
throw new Error(`Session encryption not ready for ${sessionId}`);
}

const response = await apiSocket.request(
`/v1/sessions/${sessionId}/messages?beforeSeq=${beforeSeq}&limit=${SESSION_MESSAGES_PAGE_SIZE}`
);
const data = await response.json() as {
messages: ApiMessage[];
hasMore?: boolean;
nextBeforeSeq?: number | null;
};

// Update pagination state
const nextBeforeSeq = typeof data.nextBeforeSeq === 'number' ? data.nextBeforeSeq : null;
if (nextBeforeSeq !== null) {
this.sessionOldestLoadedSeq.set(sessionId, Math.min(beforeSeq, nextBeforeSeq));
}
this.sessionHasMoreOlderMessages.set(sessionId, data.hasMore ?? false);

// Collect existing messages
let eixstingMessages = this.sessionReceivedMessages.get(sessionId);
if (!eixstingMessages) {
eixstingMessages = new Set<string>();
this.sessionReceivedMessages.set(sessionId, eixstingMessages);
}

// Filter out existing messages and prepare for batch decryption
const messagesToDecrypt: ApiMessage[] = [];
for (const msg of [...data.messages].reverse()) {
if (!eixstingMessages.has(msg.id)) {
messagesToDecrypt.push(msg);
}
}

// Batch decrypt all messages at once
const decryptedMessages = await encryption.decryptMessages(messagesToDecrypt);

// Process decrypted messages
const normalizedMessages: NormalizedMessage[] = [];
for (const decrypted of decryptedMessages) {
if (decrypted) {
eixstingMessages.add(decrypted.id);

// Normalize the decrypted message
const normalized = normalizeRawMessage(decrypted.id, decrypted.localId, decrypted.createdAt, decrypted.content);
if (normalized) {
normalizedMessages.push(normalized);
}

const prev = this.sessionOldestLoadedSeq.get(sessionId);
if (prev === undefined) {
this.sessionOldestLoadedSeq.set(sessionId, decrypted.seq);
} else {
this.sessionOldestLoadedSeq.set(sessionId, Math.min(prev, decrypted.seq));
}
}
}

// Apply to storage
this.applyMessages(sessionId, normalizedMessages);

return {
loaded: normalizedMessages.length,
hasMore: this.sessionHasMoreOlderMessages.get(sessionId) ?? false,
status: (this.sessionHasMoreOlderMessages.get(sessionId) ?? false) ? 'loaded' : 'no_more'
};
} finally {
this.sessionLoadingOlderMessages.delete(sessionId);
}
}


async sendMessage(
sessionId: string,
text: string,
displayText?: string,
options?: {
permissionMode?: PermissionMode;
}
) {

// Get encryption
const encryption = this.encryption.getSessionEncryption(sessionId);
Expand Down Expand Up @@ -476,7 +594,11 @@ class Sync {
throw new Error(`Failed to fetch sessions: ${response.status}`);
}

const data = await response.json();
const data = await response.json() as {
messages: ApiMessage[];
hasMore?: boolean;
nextBeforeSeq?: number | null;
};
const sessions = data.sessions as Array<{
id: string;
tag: string;
Expand Down Expand Up @@ -1394,8 +1516,31 @@ class Sync {

// Request
const response = await apiSocket.request(`/v1/sessions/${sessionId}/messages`);
const data = await response.json();
const data = await response.json() as {
messages: ApiMessage[];
hasMore?: boolean;
nextBeforeSeq?: number | null;
};



// Initialize pagination state from the server response (if present)
if (!this.sessionHasMoreOlderMessages.has(sessionId)) {
const hasMore = typeof data.hasMore === 'boolean'
? data.hasMore
: (Array.isArray(data.messages) ? data.messages.length >= SESSION_MESSAGES_PAGE_SIZE : false);
this.sessionHasMoreOlderMessages.set(sessionId, hasMore);
}

const nextBeforeSeq = typeof data.nextBeforeSeq === 'number' ? data.nextBeforeSeq : null;
if (nextBeforeSeq !== null) {
const prevOldest = this.sessionOldestLoadedSeq.get(sessionId);
this.sessionOldestLoadedSeq.set(sessionId, prevOldest === undefined ? nextBeforeSeq : Math.min(prevOldest, nextBeforeSeq));
} else if (Array.isArray(data.messages) && data.messages.length > 0) {
const minSeqInPage = Math.min(...data.messages.map((m) => m.seq));
const prevOldest = this.sessionOldestLoadedSeq.get(sessionId);
this.sessionOldestLoadedSeq.set(sessionId, prevOldest === undefined ? minSeqInPage : Math.min(prevOldest, minSeqInPage));
}
// Collect existing messages
let eixstingMessages = this.sessionReceivedMessages.get(sessionId);
if (!eixstingMessages) {
Expand Down Expand Up @@ -1428,6 +1573,13 @@ class Sync {
if (normalized) {
normalizedMessages.push(normalized);
}

const prev = this.sessionOldestLoadedSeq.get(sessionId);
if (prev === undefined) {
this.sessionOldestLoadedSeq.set(sessionId, decrypted.seq);
} else {
this.sessionOldestLoadedSeq.set(sessionId, Math.min(prev, decrypted.seq));
}
}
}
console.log('Batch decrypted and normalized messages in', Date.now() - start, 'ms');
Expand Down