From 620ca8f614098c8293b65c604e512ce072dbbc17 Mon Sep 17 00:00:00 2001 From: haike Date: Wed, 5 Jun 2024 11:35:18 +0800 Subject: [PATCH] feat: add p2p chat --- src/client/chat/BackendChat.ts | 116 + src/client/chat/ChatApi.ts | 234 ++- src/client/chat/WsProvider.ts | 137 +- src/components/Header/index.tsx | 25 + src/main.tsx | 1 + src/pages/Chat/components/Message.tsx | 74 +- src/pages/Chat/components/Network.tsx | 19 +- src/proto/ZMsg.proto | 187 ++ src/proto/ZMsg.ts | 2560 +++++++++++++++++++++++ src/redux/store/account/accountSlice.ts | 1 + src/utils/index.ts | 9 +- 11 files changed, 3224 insertions(+), 139 deletions(-) create mode 100644 src/client/chat/BackendChat.ts create mode 100644 src/proto/ZMsg.proto create mode 100644 src/proto/ZMsg.ts diff --git a/src/client/chat/BackendChat.ts b/src/client/chat/BackendChat.ts new file mode 100644 index 0000000..fb9cc13 --- /dev/null +++ b/src/client/chat/BackendChat.ts @@ -0,0 +1,116 @@ +import { ChatMessage } from '../../proto/ChatMessage'; +// import { messageStorageSortKey } from '@root/src/shared/account'; +import WsProvider from './WsProvider'; +// import keystoreStorage from '@root/src/shared/storages/keystoreStorage'; +// import messagesSessionStorage from '@root/src/shared/storages/messageSessionStorage'; +// import messagesStorage from '@root/src/shared/storages/messageStorage'; +import { hexToU8a, u8aToHex } from '@/utils'; +import ChatApi from './ChatApi'; +import EventEmitter from 'eventemitter3'; + +export class BackendChat { + isDisconnect: boolean = false; + chatApi: ChatApi; + endpoint?: string; + seedRpc?: string; + eventemitter: EventEmitter; + constructor() { + this.eventemitter = new EventEmitter(); + this.chatApi = new ChatApi(); + } + + changeEndPoint(endpoint: string, seedRpc?: string) { + if (this.chatApi?.provider) { + this.chatApi.provider.disconnect(); + } + this.endpoint = endpoint; + const wsProvider = new WsProvider(endpoint); + this.chatApi = new ChatApi({ + provider: wsProvider, + seedRpc: seedRpc || this.seedRpc || '', + }); + this.chatApi.accountSubscribeMessage(this.onMessage as never); + this.chatApi.onError(this.onError); + return this.chatApi.isReady; + } + + async sendMessage( + from: string, + to: string, + message: string, + fromNode: string, + toNode: string, + signature?: Uint8Array, + ) { + const data = { + from, + to, + message, + fromNode, + toNode, + signature, + }; + console.log('BackendChat', this.chatApi, data); + const outMsg = await this.chatApi.accountSendMessage(from, to, message, fromNode, toNode, signature); + // const storageKey = messageStorageSortKey(from, to); + const mf = { + id: u8aToHex(outMsg.id), + from, + to, + message, + sign: '', + }; + return mf; + // messagesStorage.addMessage(storageKey, mf); + // messagesSessionStorage.updateSession(mf.from, { to: mf.to }); + } + + switchSeedRpc = async (rpc: string, address: string) => { + this.seedRpc = rpc; + this.chatApi.seedRpcServer = rpc; + // const address = await keystoreStorage.get(); + if (!address) return; + // await keystoreStorage.set(address); + const data = await this.chatApi.getEndpoint(address); + if (!data) return; + const url = new URL(data.wsAddr); + const wsUrl = `${data.wsAddr}/ws${url.port}`; + await this.changeEndPoint(wsUrl); + this.chatApi.provider!.websocket!.send(hexToU8a(address)); + }; + + changeAccount = async (address: string) => { + if (!address) return; + // await keystoreStorage.set(address); + const data = await this.chatApi.getEndpoint(address); + if (!data) return; + const url = new URL(data.wsAddr); + const wsUrl = `${data.wsAddr}/ws${url.port}`; + await this.changeEndPoint(wsUrl); + this.chatApi?.provider?.websocket?.send(hexToU8a(address)); + }; + + onMessage = async (chatMessage: ChatMessage) => { + + try { + this.eventemitter.emit('onMessage', chatMessage); + console.log('onMessage', this) + } catch (error) { + console.error('message decode error', error); + } + } + + async disconnect() { + this.chatApi?.provider?.disconnect(); + this.eventemitter.removeAllListeners(); + } + subscribeMessage(cb: (message: ChatMessage) => void){ + this.eventemitter.addListener('onMessage', cb) + } + onError = async () => { + console.log('backend error'); + }; + onClose = async () => { + console.log('backend close'); + }; +} diff --git a/src/client/chat/ChatApi.ts b/src/client/chat/ChatApi.ts index 20c7fbe..7207c64 100644 --- a/src/client/chat/ChatApi.ts +++ b/src/client/chat/ChatApi.ts @@ -1,118 +1,180 @@ import { ChatMessage, ChatType } from '@/proto/ChatMessage'; -import { ZMessage, ZType } from '@/proto/zmessage'; -import { stringToU8a, stringToU8a as u8aToU8a } from '@/utils'; -import { blake2s } from '@noble/hashes/blake2s'; import WsProvider from './WsProvider'; +import { Clock, ClockInfo, OutboundMsg, ZChat, ZMessage, ZType } from '@/proto/ZMsg'; -export interface JsonRpcObject { - id: number; - jsonrpc: '2.0'; -} - -export interface JsonRpcRequest extends JsonRpcObject { - method: string; - params: unknown; -} - -export interface JsonRpcResponseBaseError { - code: number; - data?: number | string; - message: string; -} - -export interface RpcErrorInterface { - code: number; - data?: T; - message: string; - stack: string; -} - -interface JsonRpcResponseSingle { - error?: JsonRpcResponseBaseError; - result: T; -} - -interface JsonRpcResponseSubscription { - method?: string; - params: { - error?: JsonRpcResponseBaseError; - result: T; - subscription: number | string; - }; -} - -export type JsonRpcResponseBase = JsonRpcResponseSingle & JsonRpcResponseSubscription; - -export type JsonRpcResponse = JsonRpcObject & JsonRpcResponseBase; - +import { hexToU8a, stringToU8a, u8aToHex } from '@/utils'; +import { blake2s } from '@noble/hashes/blake2s'; +import { hexToBytes } from '@noble/hashes/utils'; +// import axios from 'axios'; export interface ChatApiOptions { provider?: WsProvider; - url?: string[]; + seedRpc: string; } export default class ChatApi { - private isReadyPromise: Promise; - provider: WsProvider; + provider: WsProvider | undefined; + seedRpcServer = ''; + account: string | undefined; constructor(options?: ChatApiOptions) { - if(!options?.provider) { - this.provider = new WsProvider([]); - }else{ - this.provider = options.provider; + if (options?.provider) { + this.provider = options?.provider; + } + if (options?.seedRpc) { + this.seedRpcServer = options?.seedRpc; } - this.isReadyPromise = new Promise(resolve => { - this.provider.connect().then(() => { - resolve(this); - }); - }); } - public static create(options?: ChatApiOptions): ChatApi { - const newOptions: ChatApiOptions = { - ...options - } - if(!newOptions?.provider) { - newOptions.provider = new WsProvider(newOptions.url || []); - } - const instance = new ChatApi(newOptions); - return instance; + public static create(options?: ChatApiOptions): Promise { + const instance = new ChatApi(options); + return instance.isReady; } public get isReady(): Promise { - return this.isReadyPromise; + return new Promise((resolve, reject) => { + this.provider?.isReadyPromise + .then(() => { + resolve(this); + }) + .catch(e => { + reject(e); + }); + }); } - public async sendMessage(from: string, to: string, message: string, node: string, signature?: Uint8Array) { + public async connect(address: string) { + this.account = address; + this.provider!.websocket!.send(address); + } + + public getEndpoint = async (address: string) => { + const body = { + method: 'getWsAddr', + address, + }; + console.log('this.seedRpcServer', this.seedRpcServer); + console.log('this.seedRpcServer', body); + if (!this.seedRpcServer) { + return; + } + + // const res = await axios.post(this.seedRpcServer, body); + const res = await fetch(this.seedRpcServer, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }); + const data = await res.json(); + const wsAddr = data.wsAddr; + const wsUrl = new URL(wsAddr); + const seedRpcUrl = new URL(this.seedRpcServer); + return { + wsAddr: `${wsUrl.protocol}//${seedRpcUrl.hostname}:${wsUrl.port}`, + }; + // return data; + }; + + public async accountSendMessage( + from: string, + to: string, + message: string, + fromNode: string, + node: string, + signature?: Uint8Array, + ) { + return await this.sendMessage(from, to, message, signature); + } + + public async sendMessage(from: string, to: string, message: string, signature?: Uint8Array) { const chatMessage = ChatMessage.create({ - id: blake2s(stringToU8a(message + new Date().getMilliseconds())), + // id: blake2s(stringToU8a(message + new Date().getMilliseconds())), version: 0, type: ChatType.CHAT_TYPE_MESSAGE, - publicKey: u8aToU8a(from), + // publicKey: u8aToU8a(from), data: stringToU8a(message), - signature: u8aToU8a(message), - from: u8aToU8a(from), - to: u8aToU8a(to), + signature: signature, + from: hexToBytes(from), + to: hexToBytes(to), }); const chatBuffer = ChatMessage.encode(chatMessage).finish(); const hashId = blake2s(chatBuffer); + const clockId = blake2s(hashId); + const clockValues = { + [u8aToHex(clockId)]: '1', + }; + const clock = Clock.create({ + values: clockValues, + }); + const clockInfo = ClockInfo.create({ + nodeId: clockId, + messageId: hashId, + clock, + count: '1', + createAt: new Date().getMilliseconds().toString(), + }); + const chat = ZChat.create({ + messageData: chatBuffer, // (message), + clock: clockInfo, + }); + console.log('self zchat', chat); + const data = ZChat.encode(chat).finish(); const messageCreated = ZMessage.create({ + // id: hashId, + // version: 0, + // action: ZAction.Z_TYPE_WRITE, + type: ZType.Z_TYPE_ZCHAT, + // identity: ZIdentity.U_TYPE_CLI, + // publicKey: u8aToU8a(from), + data: data, + // signature: signature, + from: hexToU8a(from), + to: hexToU8a(to), + }); + + console.log('self messageCreated', messageCreated); + + // const innerMessage = Innermsg.create({ + // message: messageCreated, + // identity: Identity.IDENTITY_CLIENT, + // action: Action.ACTION_WRITE, + // }); + // console.log('innerMessage', innerMessage); + const outboundMsg = OutboundMsg.create({ id: hashId, - version: 0, - type: ZType.Z_TYPE_RNG, - publicKey: u8aToU8a(from), + from: hexToU8a(from), + type: ZType.Z_TYPE_ZCHAT, + to: hexToU8a(to), data: chatBuffer, - signature: signature, - from: u8aToU8a(from), - to: u8aToU8a(node), }); - const buffer = ZMessage.encode(messageCreated).finish(); - console.log('messageCreated', messageCreated); + // const buffer = Innermsg.encode(innerMessage).finish(); + // const buffer = ZMessage.encode(messageCreated).finish(); + const buffer = OutboundMsg.encode(outboundMsg).finish(); - this.provider.sendMessage(buffer); - } + console.log('OutboundMsg', buffer.toString()); + this.provider!.sendMessage(buffer); - public async subscribeReceiveMessage(cb: (message: ZMessage) => void) { - this.provider.addEventListener('receive:message', cb); + // const originBuffer = new Uint8Array( + // '42 13 72 101 108 108 111 44 32 119 111 114 108 100 33 58 32 163 32 16 47 89 196 210 55 102 242 29 38 162 166 117 34 155 52 97 67 13 142 197 117 241 100 196 203 148 207 12 242 66 32 239 197 79 14 219 189 35 37 13 172 203 41 29 157 111 27 166 69 75 119 66 48 155 119 27 111 59 207 33 157 114 108' + // .split(' ') + // .map(item => Number(item)), + // ); + // console.log('messageCreated', messageCreated); + // const originData = ZMessage.decode(originBuffer); + // // const originDataBuffer = ZMessage.encode(originData).finish(); + // console.log('messageCreated originData', originData); + // this.provider.sendMessage(originDataBuffer); + return outboundMsg; + } + public async accountSubscribeMessage(cb: (message: unknown) => void) { + this.provider!.addEventListener('account_receiveMessage', cb); + } + public async subscribeMessage(cb: (message: unknown) => void) { + this.accountSubscribeMessage(cb); + } + public async onError(cb: () => void) { + this.provider!.eventemitter.addListener('error', cb); } - public async unsubscribeReceiveMessage(cb: (message: ZMessage) => void) { - this.provider.removeEventListener('receive:message', cb); + public async onClose(cb: () => void) { + this.provider!.eventemitter.addListener('close', cb); } } diff --git a/src/client/chat/WsProvider.ts b/src/client/chat/WsProvider.ts index 116ccb1..9d5c34b 100644 --- a/src/client/chat/WsProvider.ts +++ b/src/client/chat/WsProvider.ts @@ -1,38 +1,145 @@ -import { ZMessage } from '@/proto/zmessage'; import { EventEmitter } from 'eventemitter3'; +import { ChatMessage } from '@/proto/ChatMessage'; +import { InboundMsg } from '@/proto/ZMsg'; +export const noop = () => {}; + +const RETRY_DELAY = 5_000; + export default class WsProvider { - private websocket: WebSocket | null; + websocket: WebSocket | null = null; private endpoints: string; - private eventemitter: EventEmitter; + private autoConnectMs = RETRY_DELAY; + public readonly isReadyPromise: Promise; + eventemitter: EventEmitter; constructor(endpoint: string | string[]) { - this.websocket = null; this.eventemitter = new EventEmitter(); const endpoints = Array.isArray(endpoint) ? endpoint : [endpoint]; const defaultEndpoint = endpoints[0]; this.endpoints = defaultEndpoint; + if (this.autoConnectMs && this.autoConnectMs > 0) { + this.connectWithRetry().catch(noop); + } + this.isReadyPromise = new Promise((resolve): void => { + this.eventemitter.once('connected', (): void => { + resolve(this); + }); + }); + } + + public async connectWithRetry(): Promise { + if (this.autoConnectMs > 0) { + try { + await this.connect(); + } catch { + setTimeout((): void => { + this.connectWithRetry().catch(noop); + }, this.autoConnectMs); + } + } } public async connect(): Promise { - this.websocket = new WebSocket(this.endpoints); if (this.websocket) { - this.websocket.onmessage = this.onSocketMessage; + throw new Error('WebSocket is already connected'); + } + try { + this.websocket = new WebSocket(this.endpoints); + if (this.websocket) { + this.websocket.onmessage = this.onSocketMessage; + this.websocket.onerror = this.handleError; + this.websocket.onclose = this.handleClose; + this.websocket.onopen = this.handleSocketOpen; + } + } catch (error) { + // l.error(error); + + this.eventemitter.emit('error', error); + + throw error; } } - disconnect(): Promise { - return Promise.resolve(); + public async disconnect(): Promise { + this.autoConnectMs = 0; + try { + if (this.websocket) { + // 1000 - Normal closure; the connection successfully completed + this.websocket.close(1000); + this.eventemitter.removeAllListeners(); + } + } catch (error) { + console.error(error); + + this.eventemitter.emit('error', error); + + throw error; + } } + private onSocketMessage = (message: MessageEvent): void => { message.data.arrayBuffer().then(buffer => { - const decoded = ZMessage.decode(new Uint8Array(buffer)); - this.eventemitter.emit('receive:message', decoded); + // const decodedOrigin = ZChat.decode(new Uint8Array(buffer)); + // console.log('onSocketMessage', decodedOrigin); + // const decoded = ZMessage.create({ + // data: new Uint8Array(buffer), + // }); + + const inboundMsg = InboundMsg.decode(new Uint8Array(buffer)); + + const chatMessage = ChatMessage.decode(inboundMsg.data); + chatMessage.id = inboundMsg.id; + console.log('onSocketMessage buffer', chatMessage); + console.log('onSocketMessage chat Message', chatMessage); + this.eventemitter.emit('account_receiveMessage', chatMessage); }); + + // const response = JSON.parse(message.data) as JsonRpcResponse; + // console.log('onSocketMessage', message); + + // return response.method === undefined ? this.onSocketMessageResult(response) : this.onSocketMessageResult(response); }; - addEventListener( event: string, cb: (message: ZMessage) => void) { - this.eventemitter.addListener(event, cb); + on(type: string, sub: () => void): () => void { + this.eventemitter.on(type, sub); + return (): void => { + this.eventemitter.removeListener(type, sub); + }; } - removeEventListener( event: string, cb: (message: ZMessage) => void) { - this.eventemitter.removeListener(event, cb); + addEventListener(method: string, cb: (result: unknown) => void) { + this.eventemitter.addListener(method, cb); } sendMessage(message: Uint8Array) { - this.websocket?.send(message); + this.websocket!.send(message); } + handleError = () => { + if (this.websocket) { + this.websocket.onclose = null; + this.websocket.onerror = null; + this.websocket.onmessage = null; + this.websocket.onopen = null; + this.websocket = null; + } + if (this.autoConnectMs > 0) { + setTimeout((): void => { + this.connectWithRetry().catch(noop); + }, this.autoConnectMs); + } + this.eventemitter.emit('error'); + }; + handleSocketOpen = () => { + this.eventemitter.emit('connected'); + }; + handleClose = () => { + if (this.websocket) { + this.websocket.onclose = null; + this.websocket.onerror = null; + this.websocket.onmessage = null; + this.websocket.onopen = null; + this.websocket = null; + } + if (this.autoConnectMs > 0) { + setTimeout((): void => { + this.connectWithRetry().catch(noop); + }, this.autoConnectMs); + } + this.eventemitter.emit('close'); + this.eventemitter.emit('disconnected'); + }; } diff --git a/src/components/Header/index.tsx b/src/components/Header/index.tsx index 299c541..15b150e 100644 --- a/src/components/Header/index.tsx +++ b/src/components/Header/index.tsx @@ -1,6 +1,8 @@ import { useAppDispatch } from "@/redux/hooks"; import { useAccount } from "@/redux/hooks/accounts"; import { accountActions } from "@/redux/store/account/accountSlice"; +import { useClipboard, useToast } from "@chakra-ui/react"; +import { ClipboardDocumentIcon } from "@heroicons/react/24/solid"; import { useCallback } from "react"; import { Link, useNavigate } from "react-router-dom"; @@ -11,6 +13,8 @@ function Header() { dispatch(accountActions.removeAccount(account?.address)) }, [account?.address, dispatch]); const navigate = useNavigate(); + const { onCopy } = useClipboard(account?.address); + const toast = useToast(); return (
@@ -67,6 +71,27 @@ function Header() {
{account ?