Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@
// SPDX-License-Identifier: Apache-2.0

import { LRUCache } from 'typescript-lru-cache'
import { ACSCache, ACSCacheOptions } from './cache'
import { ACSKey } from '../types'
import { ResolvedAcsOptions } from '../service'
import { LedgerCommonSchemas } from '@canton-network/core-ledger-client-types'
import { ACSKey } from '../../types'
import { PaginatedResolvedAcsOptions, ResolvedAcsOptions } from '../../service'
import { AbstractLedgerProvider } from '@canton-network/core-provider-ledger'
import { LedgerCommonSchemas } from '@canton-network/core-ledger-client-types'
import { ACSCache, PaginatedACSCache, ACSCacheOptions } from '../item'

export class ACSCacheCollection {
private readonly collection: LRUCache<string, ACSCache>
export abstract class BaseCacheCollection<
Cache extends ACSCache | PaginatedACSCache,
Options extends ResolvedAcsOptions | PaginatedResolvedAcsOptions =
Cache extends ACSCache
? ResolvedAcsOptions
: PaginatedResolvedAcsOptions,
> {
protected readonly collection: LRUCache<string, Cache>

constructor(
private readonly ledger: AbstractLedgerProvider,
protected readonly ledger: AbstractLedgerProvider,
private readonly options: ACSCacheOptions = {
maxSize: 100,
entryExpirationTimeInMS: 10 * 60 * 1000,
Expand All @@ -26,12 +32,11 @@ export class ACSCacheCollection {
* Resolves party references and constructs cache keys from the provided template and interface IDs.
* Queries are deduplicated and cached per party-template-interface combination.
*
* @override
* @see {@link ACSReader.readRaw}
*/
public async readFromCache(
options: ResolvedAcsOptions
): Promise<Array<LedgerCommonSchemas['JsGetActiveContractsResponse']>> {
options: Options
): Promise<LedgerCommonSchemas['JsGetActiveContractsResponse'][]> {
const { parties, interfaceIds, templateIds } = options
const keys: ACSKey[] =
parties?.flatMap((party) => {
Expand All @@ -49,12 +54,14 @@ export class ACSCacheCollection {
return await this.query({ options, keys })
}

private getCache(key: ACSKey) {
protected abstract createCache(): Cache

protected getCache(key: ACSKey) {
const serializedKey = this.serializeKey(key)
const existingCache = this.collection.get(serializedKey)
if (existingCache) return existingCache

const newCache = new ACSCache(this.ledger)
const newCache = this.createCache()
this.collection.set(serializedKey, newCache)

return newCache
Expand All @@ -64,23 +71,19 @@ export class ACSCacheCollection {
* Updates the cached active contract set for a specific key and returns contracts at the requested offset.
* If the cache is outdated, fetches updates from the ledger and applies them incrementally.
*/
private async updateCache(args: {
options: ResolvedAcsOptions
protected abstract updateCache(args: {
options: Options
key: ACSKey
}) {
const cache = this.getCache(args.key)
await cache.update(args.options)
return await cache.calculateAt(args.options.offset)
}
}): Promise<ReturnType<Cache['calculateAt']>>

/**
* Queries multiple cache keys in parallel and combines the results.
* Each key represents a unique party-template-interface combination to be queried independently.
*/
private async query(args: {
options: ResolvedAcsOptions
protected async query(args: {
options: Options
keys: ACSKey[]
}): Promise<Array<LedgerCommonSchemas['JsGetActiveContractsResponse']>> {
}): Promise<LedgerCommonSchemas['JsGetActiveContractsResponse'][]> {
const { options, keys } = args
return (
await Promise.all(
Expand All @@ -91,7 +94,7 @@ export class ACSCacheCollection {
).flat()
}

private serializeKey(key: ACSKey): string {
protected serializeKey(key: ACSKey): string {
return `${key.party ?? 'ANY'}_T${key.templateId ?? '()'}_I${key.interfaceId ?? '()'}`
}
}
38 changes: 38 additions & 0 deletions core/acs-reader/src/cache/collection/collection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2025-2026 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

import { ACSKey } from '../../types'
import { PaginatedResolvedAcsOptions, ResolvedAcsOptions } from '../../service'
import { BaseCacheCollection } from './base'
import { ACSCache, PaginatedACSCache } from '../item'

export class ACSCacheCollection extends BaseCacheCollection<ACSCache> {
protected async updateCache(args: {
options: ResolvedAcsOptions
key: ACSKey
}) {
const cache = this.getCache(args.key)
await cache.update(args.options)
return await cache.calculateAt(args.options.offset)
}

protected createCache(): ACSCache {
return new ACSCache(this.ledger)
}
}

export class PaginatedACSCacheCollection extends BaseCacheCollection<PaginatedACSCache> {
// TODO: avoid logic duplication
protected async updateCache(args: {
options: PaginatedResolvedAcsOptions
key: ACSKey
}) {
const cache = this.getCache(args.key)
await cache.update(args.options)
return await cache.calculateAt(args.options.offset)
}

protected createCache(): PaginatedACSCache {
return new PaginatedACSCache(this.ledger)
}
}
5 changes: 5 additions & 0 deletions core/acs-reader/src/cache/collection/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Copyright (c) 2025-2026 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

export * from './base'
export * from './collection'
71 changes: 71 additions & 0 deletions core/acs-reader/src/cache/item/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2025-2026 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

import { ACEvent, ACSState, PaginatedACSState } from '../../types'
import { AbstractLedgerProvider } from '@canton-network/core-provider-ledger'
import { LRUCacheOptions } from 'typescript-lru-cache'
import { LedgerCommonSchemas } from '@canton-network/core-ledger-client-types'
import pino from 'pino'
import {
ResolvedAcsOptions,
AcsService,
PaginatedResolvedAcsOptions,
} from '../../service'

export const logger = pino({ name: 'acs-reader/cache' })

/**
* Checks if an event represents a contract creation.
* Used to distinguish between created and archived events when processing cache updates.
*/
export function isCreatedEvent(event: ACEvent): event is ACEvent & {
archived: true
event: LedgerCommonSchemas['CreatedEvent']
} {
return !event.archived
}

export type ACSCacheOptions = Pick<
LRUCacheOptions<string, BaseACSCache>,
'maxSize' | 'entryExpirationTimeInMS'
>

export type BaseCache<Paginated extends boolean> = {
State: Paginated extends true ? PaginatedACSState : ACSState
Options: Paginated extends true
? PaginatedResolvedAcsOptions
: ResolvedAcsOptions
ReturnValue: ReturnType<
AcsService[Paginated extends true
? 'getPaginatedActiveContracts'
: 'getActiveContracts']
>
}

export abstract class BaseACSCache<Paginated extends boolean = false> {
protected abstract readonly state: BaseCache<Paginated>['State']

protected readonly service: AcsService

constructor(protected readonly ledger: AbstractLedgerProvider) {
this.service = new AcsService(ledger)
}

/**
* Updates the cache to include ledger changes up to the specified offset.
* Fetches and applies incremental updates from the ledger, initializing the cache if needed.
* Automatically prunes old events when the update buffer exceeds configured thresholds.
*/
public abstract update(
options: BaseCache<Paginated>['Options']
): Promise<void>

/**
* Calculates the active contract set at a specific ledger offset.
* Applies cached updates to the initial snapshot and filters out archived contracts.
* Throws an error if the cache is not initialized or the requested offset is too old.
*/
public abstract calculateAt(
offset: number
): LedgerCommonSchemas['JsGetActiveContractsResponse'][]
}
6 changes: 6 additions & 0 deletions core/acs-reader/src/cache/item/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) 2025-2026 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

export * from './base'
export * from './item'
export * from './paginatedItem'
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
// Copyright (c) 2025-2026 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

import { ACEvent, ACS_UPDATE_CONFIG, ACSState } from '../types'
import {
AbstractLedgerProvider,
Ops,
} from '@canton-network/core-provider-ledger'
import { LRUCacheOptions } from 'typescript-lru-cache'
import { ACEvent, ACS_UPDATE_CONFIG, ACSState } from '../../types'
import { Ops } from '@canton-network/core-provider-ledger'
import { LedgerCommonSchemas } from '@canton-network/core-ledger-client-types'
import pino from 'pino'
import {
ResolvedAcsOptions,
AcsService,
buildActiveContractFilter,
} from '../service'
import { ResolvedAcsOptions, buildActiveContractFilter } from '../../service'
import { ContractId } from '@canton-network/core-types'
import { BaseACSCache, isCreatedEvent, logger } from './base'

export type ACSCacheOptions = Pick<
LRUCacheOptions<string, ACSCache>,
'maxSize' | 'entryExpirationTimeInMS'
>

const logger = pino({ name: 'acs-reader/cache' })

export class ACSCache {
private readonly state: ACSState = {
export class ACSCache extends BaseACSCache {
protected readonly state: ACSState = {
initial: {
offset: 0,
acs: [],
Expand All @@ -35,11 +20,6 @@ export class ACSCache {
},
archivedACs: new Set(),
}
private readonly service: AcsService

constructor(private readonly ledger: AbstractLedgerProvider) {
this.service = new AcsService(ledger)
}

/**
* Returns the initial snapshot of the active contract set.
Expand All @@ -57,11 +37,6 @@ export class ACSCache {
return this.state.updates
}

/**
* Updates the cache to include ledger changes up to the specified offset.
* Fetches and applies incremental updates from the ledger, initializing the cache if needed.
* Automatically prunes old events when the update buffer exceeds configured thresholds.
*/
public async update(options: ResolvedAcsOptions) {
if (!this.initial.acs.length || this.initial.offset > options.offset) {
await this.initState(options)
Expand All @@ -78,7 +53,9 @@ export class ACSCache {
},
})

// in practise length should never be > maxUpdatesToFetch only equal (server should never return more than limit in query). This is just a safeguard.
/**
* in practice length should never be > maxUpdatesToFetch only equal (server should never return more than limit in query). This is just a safeguard.
*/
if (updates.length >= ACS_UPDATE_CONFIG.maxUpdatesToFetch)
void this.update(options)

Expand All @@ -97,11 +74,6 @@ export class ACSCache {
}
}

/**
* Calculates the active contract set at a specific ledger offset.
* Applies cached updates to the initial snapshot and filters out archived contracts.
* Throws an error if the cache is not initialized or the requested offset is too old.
*/
public calculateAt(offset: number) {
if (!this.initial.acs)
throw Error('No ACS initialized. Call `.update()` first')
Expand Down Expand Up @@ -299,14 +271,3 @@ export class ACSCache {
return { newEvents, newOffset }
}
}

/**
* Checks if an event represents a contract creation.
* Used to distinguish between created and archived events when processing cache updates.
*/
function isCreatedEvent(event: ACEvent): event is ACEvent & {
archived: true
event: LedgerCommonSchemas['CreatedEvent']
} {
return !event.archived
}
Loading