Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions src/client/chat/BackendChat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { ChatMessage } from '../../proto/ChatMessage';
// import { messageStorageSortKey } from '@root/src/shared/account';
import WsProvider from './WsProvider';
// import keystoreStorage from '@root/src/shared/storages/keystoreStorage';
// import messagesSessionStorage from '@root/src/shared/storages/messageSessionStorage';
// import messagesStorage from '@root/src/shared/storages/messageStorage';
import { hexToU8a, u8aToHex } from '@/utils';
import ChatApi from './ChatApi';
import EventEmitter from 'eventemitter3';

export class BackendChat {
isDisconnect: boolean = false;
chatApi: ChatApi;
endpoint?: string;
seedRpc?: string;
eventemitter: EventEmitter;
constructor() {
this.eventemitter = new EventEmitter();
this.chatApi = new ChatApi();
}

changeEndPoint(endpoint: string, seedRpc?: string) {
if (this.chatApi?.provider) {
this.chatApi.provider.disconnect();
}
this.endpoint = endpoint;
const wsProvider = new WsProvider(endpoint);
this.chatApi = new ChatApi({
provider: wsProvider,
seedRpc: seedRpc || this.seedRpc || '',
});
this.chatApi.accountSubscribeMessage(this.onMessage as never);
this.chatApi.onError(this.onError);
return this.chatApi.isReady;
}

async sendMessage(
from: string,
to: string,
message: string,
fromNode: string,
toNode: string,
signature?: Uint8Array,
) {
const data = {
from,
to,
message,
fromNode,
toNode,
signature,
};
console.log('BackendChat', this.chatApi, data);
const outMsg = await this.chatApi.accountSendMessage(from, to, message, fromNode, toNode, signature);
// const storageKey = messageStorageSortKey(from, to);
const mf = {
id: u8aToHex(outMsg.id),
from,
to,
message,
sign: '',
};
return mf;
// messagesStorage.addMessage(storageKey, mf);
// messagesSessionStorage.updateSession(mf.from, { to: mf.to });
}

switchSeedRpc = async (rpc: string, address: string) => {
this.seedRpc = rpc;
this.chatApi.seedRpcServer = rpc;
// const address = await keystoreStorage.get();
if (!address) return;
// await keystoreStorage.set(address);
const data = await this.chatApi.getEndpoint(address);
if (!data) return;
const url = new URL(data.wsAddr);
const wsUrl = `${data.wsAddr}/ws${url.port}`;
await this.changeEndPoint(wsUrl);
this.chatApi.provider!.websocket!.send(hexToU8a(address));
};

changeAccount = async (address: string) => {
if (!address) return;
// await keystoreStorage.set(address);
const data = await this.chatApi.getEndpoint(address);
if (!data) return;
const url = new URL(data.wsAddr);
const wsUrl = `${data.wsAddr}/ws${url.port}`;
await this.changeEndPoint(wsUrl);
this.chatApi?.provider?.websocket?.send(hexToU8a(address));
};

onMessage = async (chatMessage: ChatMessage) => {

try {
this.eventemitter.emit('onMessage', chatMessage);
console.log('onMessage', this)
} catch (error) {
console.error('message decode error', error);
}
}

async disconnect() {
this.chatApi?.provider?.disconnect();
this.eventemitter.removeAllListeners();
}
subscribeMessage(cb: (message: ChatMessage) => void){
this.eventemitter.addListener('onMessage', cb)
}
onError = async () => {
console.log('backend error');
};
onClose = async () => {
console.log('backend close');
};
}
234 changes: 148 additions & 86 deletions src/client/chat/ChatApi.ts
Original file line number Diff line number Diff line change
@@ -1,118 +1,180 @@
import { ChatMessage, ChatType } from '@/proto/ChatMessage';
import { ZMessage, ZType } from '@/proto/zmessage';
import { stringToU8a, stringToU8a as u8aToU8a } from '@/utils';
import { blake2s } from '@noble/hashes/blake2s';
import WsProvider from './WsProvider';
import { Clock, ClockInfo, OutboundMsg, ZChat, ZMessage, ZType } from '@/proto/ZMsg';

export interface JsonRpcObject {
id: number;
jsonrpc: '2.0';
}

export interface JsonRpcRequest extends JsonRpcObject {
method: string;
params: unknown;
}

export interface JsonRpcResponseBaseError {
code: number;
data?: number | string;
message: string;
}

export interface RpcErrorInterface<T> {
code: number;
data?: T;
message: string;
stack: string;
}

interface JsonRpcResponseSingle<T> {
error?: JsonRpcResponseBaseError;
result: T;
}

interface JsonRpcResponseSubscription<T> {
method?: string;
params: {
error?: JsonRpcResponseBaseError;
result: T;
subscription: number | string;
};
}

export type JsonRpcResponseBase<T> = JsonRpcResponseSingle<T> & JsonRpcResponseSubscription<T>;

export type JsonRpcResponse<T> = JsonRpcObject & JsonRpcResponseBase<T>;

import { hexToU8a, stringToU8a, u8aToHex } from '@/utils';
import { blake2s } from '@noble/hashes/blake2s';
import { hexToBytes } from '@noble/hashes/utils';
// import axios from 'axios';
export interface ChatApiOptions {
provider?: WsProvider;
url?: string[];
seedRpc: string;
}
export default class ChatApi {
private isReadyPromise: Promise<ChatApi>;
provider: WsProvider;
provider: WsProvider | undefined;
seedRpcServer = '';
account: string | undefined;
constructor(options?: ChatApiOptions) {
if(!options?.provider) {
this.provider = new WsProvider([]);
}else{
this.provider = options.provider;
if (options?.provider) {
this.provider = options?.provider;
}
if (options?.seedRpc) {
this.seedRpcServer = options?.seedRpc;
}
this.isReadyPromise = new Promise(resolve => {
this.provider.connect().then(() => {
resolve(this);
});
});
}

public static create(options?: ChatApiOptions): ChatApi {
const newOptions: ChatApiOptions = {
...options
}
if(!newOptions?.provider) {
newOptions.provider = new WsProvider(newOptions.url || []);
}
const instance = new ChatApi(newOptions);
return instance;
public static create(options?: ChatApiOptions): Promise<ChatApi> {
const instance = new ChatApi(options);
return instance.isReady;
}
public get isReady(): Promise<ChatApi> {
return this.isReadyPromise;
return new Promise((resolve, reject) => {
this.provider?.isReadyPromise
.then(() => {
resolve(this);
})
.catch(e => {
reject(e);
});
});
}

public async sendMessage(from: string, to: string, message: string, node: string, signature?: Uint8Array) {
public async connect(address: string) {
this.account = address;
this.provider!.websocket!.send(address);
}

public getEndpoint = async (address: string) => {
const body = {
method: 'getWsAddr',
address,
};
console.log('this.seedRpcServer', this.seedRpcServer);
console.log('this.seedRpcServer', body);
if (!this.seedRpcServer) {
return;
}

// const res = await axios.post(this.seedRpcServer, body);
const res = await fetch(this.seedRpcServer, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
const data = await res.json();
const wsAddr = data.wsAddr;
const wsUrl = new URL(wsAddr);
const seedRpcUrl = new URL(this.seedRpcServer);
return {
wsAddr: `${wsUrl.protocol}//${seedRpcUrl.hostname}:${wsUrl.port}`,
};
// return data;
};

public async accountSendMessage(
from: string,
to: string,
message: string,
fromNode: string,
node: string,
signature?: Uint8Array,
) {
return await this.sendMessage(from, to, message, signature);
}

public async sendMessage(from: string, to: string, message: string, signature?: Uint8Array) {
const chatMessage = ChatMessage.create({
id: blake2s(stringToU8a(message + new Date().getMilliseconds())),
// id: blake2s(stringToU8a(message + new Date().getMilliseconds())),
version: 0,
type: ChatType.CHAT_TYPE_MESSAGE,
publicKey: u8aToU8a(from),
// publicKey: u8aToU8a(from),
data: stringToU8a(message),
signature: u8aToU8a(message),
from: u8aToU8a(from),
to: u8aToU8a(to),
signature: signature,
from: hexToBytes(from),
to: hexToBytes(to),
});
const chatBuffer = ChatMessage.encode(chatMessage).finish();
const hashId = blake2s(chatBuffer);
const clockId = blake2s(hashId);
const clockValues = {
[u8aToHex(clockId)]: '1',
};
const clock = Clock.create({
values: clockValues,
});
const clockInfo = ClockInfo.create({
nodeId: clockId,
messageId: hashId,
clock,
count: '1',
createAt: new Date().getMilliseconds().toString(),
});
const chat = ZChat.create({
messageData: chatBuffer, // (message),
clock: clockInfo,
});
console.log('self zchat', chat);
const data = ZChat.encode(chat).finish();
const messageCreated = ZMessage.create({
// id: hashId,
// version: 0,
// action: ZAction.Z_TYPE_WRITE,
type: ZType.Z_TYPE_ZCHAT,
// identity: ZIdentity.U_TYPE_CLI,
// publicKey: u8aToU8a(from),
data: data,
// signature: signature,
from: hexToU8a(from),
to: hexToU8a(to),
});

console.log('self messageCreated', messageCreated);

// const innerMessage = Innermsg.create({
// message: messageCreated,
// identity: Identity.IDENTITY_CLIENT,
// action: Action.ACTION_WRITE,
// });
// console.log('innerMessage', innerMessage);
const outboundMsg = OutboundMsg.create({
id: hashId,
version: 0,
type: ZType.Z_TYPE_RNG,
publicKey: u8aToU8a(from),
from: hexToU8a(from),
type: ZType.Z_TYPE_ZCHAT,
to: hexToU8a(to),
data: chatBuffer,
signature: signature,
from: u8aToU8a(from),
to: u8aToU8a(node),
});
const buffer = ZMessage.encode(messageCreated).finish();

console.log('messageCreated', messageCreated);
// const buffer = Innermsg.encode(innerMessage).finish();
// const buffer = ZMessage.encode(messageCreated).finish();
const buffer = OutboundMsg.encode(outboundMsg).finish();

this.provider.sendMessage(buffer);
}
console.log('OutboundMsg', buffer.toString());
this.provider!.sendMessage(buffer);

public async subscribeReceiveMessage(cb: (message: ZMessage) => void) {
this.provider.addEventListener('receive:message', cb);
// const originBuffer = new Uint8Array(
// '42 13 72 101 108 108 111 44 32 119 111 114 108 100 33 58 32 163 32 16 47 89 196 210 55 102 242 29 38 162 166 117 34 155 52 97 67 13 142 197 117 241 100 196 203 148 207 12 242 66 32 239 197 79 14 219 189 35 37 13 172 203 41 29 157 111 27 166 69 75 119 66 48 155 119 27 111 59 207 33 157 114 108'
// .split(' ')
// .map(item => Number(item)),
// );
// console.log('messageCreated', messageCreated);
// const originData = ZMessage.decode(originBuffer);
// // const originDataBuffer = ZMessage.encode(originData).finish();
// console.log('messageCreated originData', originData);
// this.provider.sendMessage(originDataBuffer);
return outboundMsg;
}
public async accountSubscribeMessage(cb: (message: unknown) => void) {
this.provider!.addEventListener('account_receiveMessage', cb);
}
public async subscribeMessage(cb: (message: unknown) => void) {
this.accountSubscribeMessage(cb);
}
public async onError(cb: () => void) {
this.provider!.eventemitter.addListener('error', cb);
}
public async unsubscribeReceiveMessage(cb: (message: ZMessage) => void) {
this.provider.removeEventListener('receive:message', cb);
public async onClose(cb: () => void) {
this.provider!.eventemitter.addListener('close', cb);
}
}
Loading