Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion demos/supabase-todolist/lib/powersync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ Future<String> getDatabasePath() async {
return join(dir.path, dbFilename);
}

const options = SyncOptions(syncImplementation: SyncClientImplementation.rust);
const options = SyncOptions(
syncImplementation: SyncClientImplementation.rust,
appMetadata: {'app_version': '1.0.1'});

Future<void> openDatabase() async {
// Open the local database
Expand Down
16 changes: 15 additions & 1 deletion packages/powersync_core/lib/src/sync/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import 'package:meta/meta.dart';

/// Options that affect how the sync client connects to the sync service.
final class SyncOptions {
/// A map of application metadata that is passed to the PowerSync service.
///
/// Application metadata that will be displayed in PowerSync service logs.
final Map<String, String>? appMetadata;

/// A JSON object that is passed to the sync service and forwarded to sync
/// rules.
///
Expand Down Expand Up @@ -39,19 +44,22 @@ final class SyncOptions {
this.params,
this.syncImplementation = SyncClientImplementation.defaultClient,
this.includeDefaultStreams,
this.appMetadata,
});

SyncOptions _copyWith({
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
Map<String, String>? appMetadata,
}) {
return SyncOptions(
crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime,
retryDelay: retryDelay,
params: params ?? this.params,
syncImplementation: syncImplementation,
includeDefaultStreams: includeDefaultStreams,
appMetadata: appMetadata ?? this.appMetadata,
);
}
}
Expand Down Expand Up @@ -88,14 +96,18 @@ extension type ResolvedSyncOptions(SyncOptions source) {
Duration? crudThrottleTime,
Duration? retryDelay,
Map<String, dynamic>? params,
Map<String, String>? appMetadata,
}) {
return ResolvedSyncOptions((source ?? SyncOptions())._copyWith(
crudThrottleTime: crudThrottleTime,
retryDelay: retryDelay,
params: params,
appMetadata: appMetadata,
));
}

Map<String, String> get appMetadata => source.appMetadata ?? const {};

Duration get crudThrottleTime =>
source.crudThrottleTime ?? const Duration(milliseconds: 10);

Expand All @@ -113,13 +125,15 @@ extension type ResolvedSyncOptions(SyncOptions source) {
syncImplementation: other.syncImplementation,
includeDefaultStreams:
other.includeDefaultStreams ?? includeDefaultStreams,
appMetadata: other.appMetadata ?? appMetadata,
);

final didChange = !_mapEquality.equals(newOptions.params, params) ||
newOptions.crudThrottleTime != crudThrottleTime ||
newOptions.retryDelay != retryDelay ||
newOptions.syncImplementation != source.syncImplementation ||
newOptions.includeDefaultStreams != includeDefaultStreams;
newOptions.includeDefaultStreams != includeDefaultStreams ||
!_mapEquality.equals(newOptions.appMetadata, appMetadata);
return (ResolvedSyncOptions(newOptions), didChange);
}

Expand Down
7 changes: 5 additions & 2 deletions packages/powersync_core/lib/src/sync/protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,18 @@ class StreamingSyncRequest {
bool includeChecksum = true;
String clientId;
Map<String, dynamic>? parameters;
Map<String, String>? appMetadata;

StreamingSyncRequest(this.buckets, this.parameters, this.clientId);
StreamingSyncRequest(
this.buckets, this.parameters, this.clientId, this.appMetadata);

Map<String, dynamic> toJson() {
final Map<String, dynamic> json = {
'buckets': buckets,
'include_checksum': includeChecksum,
'raw_data': true,
'client_id': clientId
'client_id': clientId,
'app_metadata': appMetadata,
};

if (parameters != null) {
Expand Down
9 changes: 5 additions & 4 deletions packages/powersync_core/lib/src/sync/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import 'package:powersync_core/src/sync/options.dart';
import 'package:powersync_core/src/user_agent/user_agent.dart';
import 'package:sqlite_async/mutex.dart';

import 'bucket_storage.dart';
import '../crud.dart';
import 'bucket_storage.dart';
import 'instruction.dart';
import 'internal_connector.dart';
import 'mutable_sync_status.dart';
import 'protocol.dart';
import 'stream_utils.dart';
import 'sync_status.dart';
import 'protocol.dart';

typedef SubscribedStream = ({String name, String parameters});

Expand Down Expand Up @@ -339,8 +339,8 @@ class StreamingSyncImplementation implements StreamingSync {

Checkpoint? targetCheckpoint;

var requestStream = _streamingSyncRequest(
StreamingSyncRequest(bucketRequests, options.params, clientId!))
var requestStream = _streamingSyncRequest(StreamingSyncRequest(
bucketRequests, options.params, clientId!, options.appMetadata))
.map(ReceivedLine.new);

var merged = addBroadcast(requestStream, _nonLineSyncEvents.stream);
Expand Down Expand Up @@ -633,6 +633,7 @@ final class _ActiveRustStreamingIteration {
await _control(
'start',
convert.json.encode({
'app_metadata': sync.options.appMetadata,
'parameters': sync.options.params,
'schema': convert.json.decode(sync.schemaJson),
'include_defaults': sync.options.includeDefaultStreams,
Expand Down
5 changes: 5 additions & 0 deletions packages/powersync_core/lib/src/web/sync_worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ class _ConnectedClient {
null => SyncClientImplementation.defaultClient,
final name => SyncClientImplementation.values.byName(name),
},
appMetadata: switch (request.appMetadataEncoded) {
null => null,
final encodedAppMetadata => Map<String, String>.from(
jsonDecode(encodedAppMetadata) as Map<String, dynamic>),
},
);

_runner = _worker.referenceSyncTask(
Expand Down
6 changes: 6 additions & 0 deletions packages/powersync_core/lib/src/web/sync_worker_protocol.dart
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
required String schemaJson,
String? syncParamsEncoded,
UpdateSubscriptions? subscriptions,
String? appMetadataEncoded,
});

external String get databaseName;
Expand All @@ -90,6 +91,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject {
external String get schemaJson;
external String? get syncParamsEncoded;
external UpdateSubscriptions? get subscriptions;
external String? get appMetadataEncoded;
}

@anonymous
Expand Down Expand Up @@ -483,6 +485,10 @@ final class WorkerCommunicationChannel {
final params => jsonEncode(params),
},
subscriptions: UpdateSubscriptions(-1, streams),
appMetadataEncoded: switch (options.source.appMetadata) {
null => null,
final appMetadata => jsonEncode(appMetadata),
},
),
));
await completion;
Expand Down
38 changes: 37 additions & 1 deletion packages/powersync_core/test/sync/stream_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import 'dart:convert';
import 'package:async/async.dart';
import 'package:logging/logging.dart';
import 'package:powersync_core/powersync_core.dart';

import 'package:test/test.dart';

import '../server/sync_server/in_memory_sync_server.dart';
Expand Down Expand Up @@ -260,4 +259,41 @@ void main() {
);
a.unsubscribe();
});

test('passes app metadata to the server (rust)', () async {
options = SyncOptions(
syncImplementation: SyncClientImplementation.rust,
appMetadata: {'foo': 'bar'},
);

await waitForConnection();

final request = await syncService.waitForListener;
expect(
json.decode(await request.readAsString()),
containsPair(
'app_metadata',
containsPair('foo', 'bar'),
),
);
});

test('passes app metadata to the server (legacy sync client)', () async {
options = SyncOptions(
// ignore: deprecated_member_use_from_same_package
syncImplementation: SyncClientImplementation.dart,
appMetadata: {'foo': 'bar'},
);

await waitForConnection();

final request = await syncService.waitForListener;
expect(
json.decode(await request.readAsString()),
containsPair(
'app_metadata',
containsPair('foo', 'bar'),
),
);
});
}