Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,35 @@
"name": "y-libp2p",
"version": "0.0.2",
"description": "js-libp2p provider for yjs",
"main": "dist/index.js",
"exports": "./dist/index.js",
"type": "module",
"author": "marcopolo",
"license": "MIT",
"devDependencies": {
"@babel/core": "^7.16.0",
"@babel/preset-env": "^7.16.4",
"@babel/preset-typescript": "^7.16.0",
"@chainsafe/libp2p-noise": "^4.1.1",
"@chainsafe/libp2p-noise": "^8.0.1",
"@libp2p/mplex": "^5.0.0",
"@libp2p/websockets": "^3.0.2",
"@types/jest": "^27.0.3",
"babel-jest": "^27.4.2",
"jest": "^27.4.3",
"libp2p-gossipsub": "^0.11.4",
"libp2p-mplex": "^0.10.4",
"libp2p-websockets": "^0.16.2",
"libp2p-gossipsub": "^0.13.0",
"multiaddr": "^10.0.1",
"typescript": "^4.5.2"
},
"engines": {
"node": ">=16.0.0"
},
"scripts": {
"test": "jest",
"test": "jest dist/",
"build": "tsc",
"build-watch": "tsc -w"
},
"dependencies": {
"y-protocols": "^1.0.x",
"peer-id": ">=0.16.0"
"@libp2p/peer-id": "^1.1.15",
"y-protocols": "^1.0.x"
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to update the peer dependency below

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will do. thanks

},
"peerDependencies": {
"yjs": "13.5.x",
Expand Down
24 changes: 12 additions & 12 deletions src/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import Provider from "./index"
import { Provider } from "./index.js"
import { Doc, Doc as YDoc, } from 'yjs'
import { Uint8ArrayEquals } from "./util"
import { Uint8ArrayEquals } from "./util.js"
import * as Y from 'yjs'
// @ts-ignore
import * as awarenessProtocol from 'y-protocols/dist/awareness.cjs'
import Libp2p from 'libp2p'
import {Libp2p as ILibp2p} from 'libp2p'
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was using libp2p class type as Libp2p (so prefixed an I). It was also giving a type error (related to namespace)

// @ts-ignore
import { createPeer } from './test-utils/create-peer.js'

Expand All @@ -25,8 +25,8 @@ it.only('Provider syncs doc across 2 peers', async () => {
const ydoc1 = new YDoc()
const ydoc2 = new YDoc()

const node1: Libp2p = await createPeer()
const node2: Libp2p = await createPeer()
const node1: ILibp2p = await createPeer()
const node2: ILibp2p = await createPeer()

const provider1 = new Provider(ydoc1, node1, topic)
const provider2 = new Provider(ydoc2, node2, topic)
Expand Down Expand Up @@ -60,8 +60,8 @@ it('Provider syncs doc across 2 unsynced peers', async () => {
const ydoc2 = new YDoc()
ydoc2.getText("testDoc").insert(0, "Good bye")

const node1: Libp2p = await createPeer()
const node2: Libp2p = await createPeer()
const node1: ILibp2p = await createPeer()
const node2: ILibp2p = await createPeer()

const provider1 = new Provider(ydoc1, node1, topic)
const provider2 = new Provider(ydoc2, node2, topic)
Expand Down Expand Up @@ -96,11 +96,11 @@ function printStates(docs: { [key: string]: YDoc }) {
console.log("--- Doc States ---" + str)
}

async function connectNodes(nodes: Libp2p[]) {
async function connectNodes(nodes: ILibp2p[]) {
const firstNode = nodes[0]
for (let i = 1; i < nodes.length; i++) {
const node = nodes[i]
await firstNode.dial(node.multiaddrs[0] + '/p2p/' + node.peerId.toB58String())
await firstNode.dial(node.peerId)
await firstNode.ping(node.peerId)
await node.ping(firstNode.peerId)
}
Expand All @@ -110,7 +110,7 @@ async function connectNodes(nodes: Libp2p[]) {
if (i === j) continue
const node = nodes[i]
const otherNode = nodes[j]
node.peerStore.addressBook.set(otherNode.peerId, otherNode.multiaddrs)
node.peerStore.addressBook.set(otherNode.peerId, otherNode.getMultiaddrs())
}
}
}
Expand All @@ -120,8 +120,8 @@ it('Provider syncs awareness across 2 peers', async () => {
const ydoc1 = new YDoc()
const ydoc2 = new YDoc()

const node1: Libp2p = await createPeer()
const node2: Libp2p = await createPeer()
const node1: ILibp2p = await createPeer()
const node2: ILibp2p = await createPeer()

const provider1 = new Provider(ydoc1, node1, topic)
const provider2 = new Provider(ydoc2, node2, topic)
Expand Down
77 changes: 38 additions & 39 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import * as Y from 'yjs'
import type { Doc as YDoc } from 'yjs'
import type Libp2p from 'libp2p'
import { Uint8ArrayEquals } from './util'
import PeerId from 'peer-id'
import type { BufferList } from 'libp2p-interfaces/src/pubsub'
import { Connection } from 'libp2p-interfaces/src/topology'
import { Libp2p as ILibp2p } from "libp2p";
import { Uint8ArrayEquals } from './util.js'
import { Connection } from '@libp2p/interface-connection';
// @ts-ignore
import * as awarenessProtocol from 'y-protocols/dist/awareness.cjs'
import type { Awareness } from 'y-protocols/awareness'

import { peerIdFromString } from "@libp2p/peer-id";

// the Muxedstream type is wrong for the protocol streams
type ProtocolStream = {
sink: (data: Iterable<any> | AsyncIterable<any>) => Promise<void>
source: AsyncIterable<BufferList>
source: AsyncIterable<Uint8Array>
close: () => void
}


function changesTopic(topic: string): string {
return `/marcopolo/gossipPad/${topic}/changes/0.0.01`
return `/marcopolo/gossipPad/${topic}/changes/0.0.01`;
}

function stateVectorTopic(topic: string): string {
Expand All @@ -33,9 +33,9 @@ function awarenessProtocolTopic(topic: string): string {
return `/marcopolo/gossipPad/${topic}/awareness/0.0.1`
}

class Provider {
export class Provider {
ydoc: YDoc;
node: Libp2p;
node: ILibp2p;
peerID: string;
topic: string
stateVectors: { [key: string]: Uint8Array } = {};
Expand All @@ -46,11 +46,11 @@ class Provider {

aggressivelyKeepPeersUpdated: boolean = true;

constructor(ydoc: YDoc, node: Libp2p, topic: string) {
constructor(ydoc: YDoc, node: ILibp2p, topic: string) {
this.ydoc = ydoc;
this.node = node;
this.topic = topic;
this.peerID = this.node.peerId.toB58String()
this.peerID = this.node.peerId.toString()
this.stateVectors[this.peerID] = Y.encodeStateVector(this.ydoc)
this.awareness = new awarenessProtocol.Awareness(ydoc)

Expand All @@ -61,14 +61,23 @@ class Provider {
ydoc.on('update', this.onUpdate.bind(this));

node.pubsub.subscribe(changesTopic(topic))
node.pubsub.on(changesTopic(topic), this.onPubSubChanges.bind(this));

node.pubsub.subscribe(stateVectorTopic(topic))
node.pubsub.on(stateVectorTopic(topic), this.onPubSubStateVector.bind(this));

node.pubsub.subscribe(awarenessProtocolTopic(topic))
node.pubsub.on(awarenessProtocolTopic(topic), this.onPubSubAwareness.bind(this));

this.node.pubsub.addEventListener('message', async (event) => {
if (event.detail.topic === changesTopic(topic)) {
this.onPubSubChanges.bind(this)
}

if (event.detail.topic === stateVectorTopic(topic)) {
this.onPubSubStateVector.bind(this)
}

if (event.detail.topic === awarenessProtocolTopic(topic)) {
this.onPubSubAwareness.bind(this)
}
});

// @ts-ignore
node.handle(syncProtocol(topic), this.onSyncMsg.bind(this));

Expand All @@ -77,13 +86,10 @@ class Provider {

destroy() {
this.node.pubsub.unsubscribe(changesTopic(this.topic))
this.node.pubsub.removeAllListeners(changesTopic(this.topic))

this.node.pubsub.unsubscribe(stateVectorTopic(this.topic))
this.node.pubsub.removeAllListeners(stateVectorTopic(this.topic))

this.node.pubsub.unsubscribe(awarenessProtocolTopic(this.topic))
this.node.pubsub.removeAllListeners(awarenessProtocolTopic(this.topic))

this.node.pubsub.removeEventListener('message');
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this change in the new version of js libp2p?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// @ts-ignore
node.unhandle(syncProtocol(topic))
Expand All @@ -92,15 +98,17 @@ class Provider {
}

// Not required, but nice if we can get synced against a peer sooner rather than latter
private async tryInitialSync(updateData: Uint8Array, origin: this | any) {
private async tryInitialSync(updateData: Uint8Array, origin: this | any): Promise<Boolean | void> {
const tries = 10;
const maxWaitTime = 1000;
let waitTime = 100;
for (let i = 0; i < tries; i++) {
if (this.initialSync) {
return
}
const peers = [...this.node.pubsub.topics.get(stateVectorTopic(this.topic)) || []]

// needed to type hack
const peers = [...(this.node as any).pubsub.topics.get(stateVectorTopic(this.topic)) || []]

if (peers.length !== 0) {
const peer = peers[i % peers.length]
Expand Down Expand Up @@ -199,7 +207,7 @@ class Provider {
}

private async onSyncMsg({ stream, connection, ...rest }: { stream: ProtocolStream, connection: Connection }) {
await this.runSyncProtocol(stream, connection.remotePeer.toB58String(), false)
await this.runSyncProtocol(stream, connection.remotePeer.toString(), false)
}

private queuePeerSync(peerID: string) {
Expand All @@ -215,22 +223,14 @@ class Provider {

private async syncPeer(peerID: string) {
const thiz = this;
const multiaddrs = await this.node.peerStore.addressBook.getMultiaddrsForPeer(PeerId.createFromB58String(peerID));
let success = false;
if (!multiaddrs) {
try {
const stream = await this.node.dialProtocol(peerIdFromString(peerID), syncProtocol(this.topic)) as any as { stream: ProtocolStream }
await this.runSyncProtocol(stream as any, peerID, true)
success = true;
return
}
for (const ma of multiaddrs) {
const maStr = ma.toString()
try {
const { stream } = await this.node.dialProtocol(maStr, syncProtocol(this.topic)) as any as { stream: ProtocolStream }
await this.runSyncProtocol(stream, peerID, true)
success = true;
return
} catch (e) {
console.warn(`Failed to sync with ${maStr}`, e)
continue;
}
} catch (e) {
console.warn(`Failed to sync with ${peerIdFromString(peerID)}`, e)
}

throw new Error("Failed to sync with peer")
Expand All @@ -239,4 +239,3 @@ class Provider {

// TODO awareness

export default Provider
22 changes: 11 additions & 11 deletions src/test-utils/create-peer.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
'use strict'


/**
* These utilities rely on the fixtures defined in test/fixtures
Expand All @@ -10,13 +10,13 @@

const Libp2p = require('libp2p')
const { Multiaddr } = require('multiaddr')
const PeerId = require('peer-id')
const PeerId = require('@libp2p/peer-id')
const Gossipsub = require('libp2p-gossipsub')

const WS = require('libp2p-websockets')
const filters = require('libp2p-websockets/src/filters')
const MPLEX = require('libp2p-mplex')
const { NOISE } = require('@chainsafe/libp2p-noise')
const WS = require('@libp2p/websockets')
const filters = require('@libp2p/websockets/src/filters')
const Mplex = require('@libp2p/mplex')
const { Noise } = require('@chainsafe/libp2p-noise')

const RelayPeer = {
id: 'QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN',
Expand Down Expand Up @@ -50,9 +50,9 @@ const transportKey = WS.prototype[Symbol.toStringTag]

const defaultConfig = {
modules: {
transport: [WS],
streamMuxer: [MPLEX],
connEncryption: [NOISE],
transports: [new WS()],
streamMuxers: [new Mplex()],
connEncryption: [new Noise()],
pubsub: Gossipsub
},
config: {
Expand Down Expand Up @@ -97,7 +97,7 @@ let currentPeerIdx = 0
*/
async function createPeer({ peerId, peerIdx = currentPeerIdx, started = true, config = {} } = {}) {
if (!peerId) {
peerId = await PeerId.createFromJSON(Peers[currentPeerIdx++ % Peers.length])
peerId = await PeerId.createPeerId(Peers[currentPeerIdx++ % Peers.length])
}
const libp2p = await Libp2p.create({
peerId: peerId,
Expand Down Expand Up @@ -136,7 +136,7 @@ function addPeersToAddressBook(peers) {
*/
async function createPeers({ number = 1, started = true, seedAddressBook = true, config = {} } = {}) {
const peerIds = await Promise.all(
Array.from({ length: number }, (_, i) => Peers[i] ? PeerId.createFromJSON(Peers[i]) : PeerId.create())
Array.from({ length: number }, (_, i) => Peers[i] ? PeerId.createPeerId(Peers[i]) : PeerId.createPeerId({ type: "Ed25519"}))
)
const peers = await Promise.all(
Array.from({ length: number }, (_, i) => createPeer({ peerId: peerIds[i], started: false, config: config }))
Expand Down
4 changes: 2 additions & 2 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
// "noLib": true, /* Disable including any library files, including the default lib.d.ts. */
// "useDefineForClassFields": true, /* Emit ECMAScript-standard-compliant class fields. */
/* Modules */
"module": "commonjs", /* Specify what module code is generated. */
"module": "Node16", /* Specify what module code is generated. */
// "rootDir": "./", /* Specify the root folder within your source files. */
// "moduleResolution": "node", /* Specify how TypeScript looks up a file from a given module specifier. */
"moduleResolution": "Node16", /* Specify how TypeScript looks up a file from a given module specifier. */
// "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */
// "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */
// "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */
Expand Down