Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 135 additions & 12 deletions src/application/services/PortfolioAggregationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { IIntegrationRepository } from '../../contracts/repositories/IInteg
import type { IPortfolioRepository } from '../../contracts/repositories/IPortfolioRepository';
import type { IAssetValuatorRepository } from '../../contracts/repositories/IAssetValuatorRepository';
import { IntegrationSource } from '../../shared/types';
import type { DeFiPosition } from '../../shared/types';

export interface AggregationOptions {
sources?: IntegrationSource[];
Expand All @@ -12,6 +13,12 @@ export interface AggregationOptions {
forceRefresh?: boolean;
}

export interface DeFiAggregationResult {
positions: DeFiPosition[];
failedSources: { source: IntegrationSource; error: string }[];
receiptTokenAddresses: Set<string>;
}

export class PortfolioAggregationService {
private integrations: Map<IntegrationSource, IIntegrationRepository>;
private portfolioRepository: IPortfolioRepository;
Expand All @@ -29,7 +36,7 @@ export class PortfolioAggregationService {

async aggregatePortfolio(options: AggregationOptions): Promise<PortfolioAggregate> {
const portfolioId = this.generatePortfolioId(options.userId);

// Check cache if not forcing refresh
if (!options.forceRefresh) {
const cached = await this.portfolioRepository.findById(portfolioId);
Expand All @@ -47,36 +54,53 @@ export class PortfolioAggregationService {
// Determine which sources to use
const sourcesToFetch = options.sources || Array.from(this.integrations.keys());

// Fetch from all sources in parallel
const fetchPromises = sourcesToFetch.map(source =>
// Scatter: Fetch assets and DeFi positions from all sources in parallel
const assetFetchPromises = sourcesToFetch.map(source =>
this.fetchFromSource(source, options.addresses)
);
const defiFetchPromises = sourcesToFetch.map(source =>
this.fetchDeFiFromSource(source, options.addresses)
);

try {
const results = await Promise.allSettled(fetchPromises);

// Process successful fetches
for (let i = 0; i < results.length; i++) {
const result = results[i];
const [assetResults, defiResults] = await Promise.all([
Promise.allSettled(assetFetchPromises),
Promise.allSettled(defiFetchPromises)
]);

// Gather: Process asset results
for (let i = 0; i < assetResults.length; i++) {
const result = assetResults[i];
const source = sourcesToFetch[i];

if (result.status === 'fulfilled' && result.value) {
const assets = result.value;
portfolio.addSource(source);

for (const asset of assets) {
portfolio.addAsset(asset);
}
} else if (result.status === 'rejected') {
console.error(`Failed to fetch from ${source}:`, result.reason);
console.error(`Failed to fetch assets from ${source}:`, result.reason);
}
}

// Gather: Process DeFi results with deduplication
const defiAggregation = this.processDeFiResults(defiResults, sourcesToFetch);

for (const position of defiAggregation.positions) {
portfolio.addDeFiPosition(position);
}

// Coordinate: filter receipt tokens to prevent double-counting
portfolio.filterReceiptTokens(defiAggregation.receiptTokenAddresses);

// Reconcile duplicates
portfolio.reconcile();

// Enrich with prices
// Enrich with prices (assets + DeFi underlying assets)
await this.enrichWithPrices(portfolio);
await this.enrichDeFiWithPrices(portfolio);

// Save to repository
await this.portfolioRepository.save(portfolio);
Expand Down Expand Up @@ -152,6 +176,105 @@ export class PortfolioAggregationService {
return [...new Set(result)];
}

private async fetchDeFiFromSource(
source: IntegrationSource,
addresses: Map<string, string[]>
): Promise<DeFiPosition[]> {
const integration = this.integrations.get(source);
if (!integration || !integration.getDeFiPositions) {
return [];
}

if (!integration.isConnected()) {
await integration.connect();
}

const relevantAddresses = this.getRelevantAddresses(source, addresses);
if (relevantAddresses.length === 0) {
return [];
}

return integration.getDeFiPositions(relevantAddresses);
}

private processDeFiResults(
results: PromiseSettledResult<DeFiPosition[]>[],
sources: IntegrationSource[]
): DeFiAggregationResult {
const seen = new Map<string, DeFiPosition>();
const failedSources: { source: IntegrationSource; error: string }[] = [];
const receiptTokenAddresses = new Set<string>();

for (let i = 0; i < results.length; i++) {
const result = results[i];
const source = sources[i];

if (result.status === 'fulfilled' && result.value) {
for (const position of result.value) {
// Deduplicate by deduplicationKey — first-seen wins
if (!seen.has(position.deduplicationKey)) {
seen.set(position.deduplicationKey, position);
}
// Collect receipt token addresses
if (position.receiptTokenAddress) {
receiptTokenAddresses.add(position.receiptTokenAddress.toLowerCase());
}
}
} else if (result.status === 'rejected') {
console.error(`Failed to fetch DeFi from ${source}:`, result.reason);
failedSources.push({
source,
error: result.reason instanceof Error ? result.reason.message : String(result.reason)
});
}
}

return {
positions: Array.from(seen.values()),
failedSources,
receiptTokenAddresses
};
}

private async enrichDeFiWithPrices(portfolio: PortfolioAggregate): Promise<void> {
const positions = portfolio.defiPositions;
const symbols = new Set<string>();

for (const position of positions) {
for (const underlying of position.underlyingAssets) {
symbols.add(underlying.symbol);
}
}

if (symbols.size === 0) return;

try {
const prices = await this.assetValuator.getBatchPrices([...symbols]);

for (const position of positions) {
if (position.value) continue; // Already priced

let totalValue = 0;
for (const underlying of position.underlyingAssets) {
const price = prices.get(underlying.symbol);
if (price) {
totalValue += underlying.amount * price.value;
}
}

if (totalValue > 0) {
position.value = {
value: totalValue,
currency: 'USD',
timestamp: new Date()
};
}
}
} catch (error) {
console.error('Failed to enrich DeFi with prices:', error);
}
}

private async enrichWithPrices(portfolio: PortfolioAggregate): Promise<void> {
const assets = portfolio.assets;
const symbols = [...new Set(assets.map(a => a.symbol))];
Expand Down
9 changes: 5 additions & 4 deletions src/contracts/repositories/IIntegrationRepository.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import type { Asset, Portfolio, Transaction } from '../../shared/types';
import type { Asset, Portfolio, Transaction, DeFiPosition } from '../../shared/types';
import { IntegrationSource } from '../../shared/types';

export interface IIntegrationRepository {
source: IntegrationSource;

connect(): Promise<void>;
disconnect(): Promise<void>;
isConnected(): boolean;

fetchPortfolio(addresses: string[]): Promise<Portfolio>;
fetchAssets(addresses: string[]): Promise<Asset[]>;
fetchTransactions(addresses: string[], limit?: number): Promise<Transaction[]>;

getDeFiPositions?(addresses: string[]): Promise<DeFiPosition[]>;

subscribeToUpdates?(addresses: string[], callback: (update: Portfolio) => void): () => void;
}
80 changes: 69 additions & 11 deletions src/domain/aggregates/Portfolio.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import { IntegrationSource } from '../../shared/types';
import type { Price } from '../../shared/types';
import { IntegrationSource, DeFiPositionType } from '../../shared/types';
import type { Price, DeFiPosition } from '../../shared/types';
import { AssetEntity } from '../entities/Asset';
import { Money } from '../value-objects/Money';

export class PortfolioAggregate {
private _id: string;
private _userId?: string;
private _assets: Map<string, AssetEntity>;
private _defiPositions: Map<string, DeFiPosition>;
private _sources: Set<IntegrationSource>;
private _lastUpdated: Date;

constructor(params: {
id: string;
userId?: string;
assets?: AssetEntity[];
defiPositions?: DeFiPosition[];
sources?: IntegrationSource[];
lastUpdated?: Date;
}) {
this._id = params.id;
this._userId = params.userId;
this._assets = new Map();
this._defiPositions = new Map();
this._sources = new Set(params.sources || []);
this._lastUpdated = params.lastUpdated || new Date();

Expand All @@ -28,6 +31,12 @@ export class PortfolioAggregate {
this.addAsset(asset);
});
}

if (params.defiPositions) {
params.defiPositions.forEach(position => {
this.addDeFiPosition(position);
});
}
}

get id(): string {
Expand Down Expand Up @@ -69,6 +78,33 @@ export class PortfolioAggregate {
}
}

get defiPositions(): DeFiPosition[] {
return Array.from(this._defiPositions.values());
}

addDeFiPosition(position: DeFiPosition): void {
const existing = this._defiPositions.get(position.deduplicationKey);
if (!existing) {
this._defiPositions.set(position.deduplicationKey, position);
}
// Duplicate by deduplicationKey — first-seen wins
this._lastUpdated = new Date();
}

removeDeFiPosition(deduplicationKey: string): void {
if (this._defiPositions.delete(deduplicationKey)) {
this._lastUpdated = new Date();
}
}

filterReceiptTokens(receiptTokenAddresses: Set<string>): void {
for (const [id, asset] of this._assets) {
if (asset.contractAddress && receiptTokenAddresses.has(asset.contractAddress.toLowerCase())) {
this._assets.delete(id);
}
}
}

addSource(source: IntegrationSource): void {
this._sources.add(source);
this._lastUpdated = new Date();
Expand All @@ -84,16 +120,31 @@ export class PortfolioAggregate {
}

getTotalValue(currency: string = 'USD'): Money {
let total = new Money(0, currency);
let totalAmount = 0;

for (const asset of this._assets.values()) {
const value = asset.getValue();
if (value && value.currency === currency) {
total = total.add(value);
totalAmount += value.amount;
}
}

return total;

totalAmount += this.getDeFiNetValue(currency);

return new Money(Math.max(0, totalAmount), currency);
}

getDeFiNetValue(currency: string = 'USD'): number {
let net = 0;
for (const position of this._defiPositions.values()) {
if (!position.value || position.value.currency !== currency) continue;
if (position.type === DeFiPositionType.LENDING_BORROW) {
net -= position.value.value;
} else {
net += position.value.value;
}
}
return net;
}

getAssetsByChain(chain: string): AssetEntity[] {
Expand All @@ -108,11 +159,15 @@ export class PortfolioAggregate {
for (const asset of other.assets) {
this.addAsset(asset);
}


for (const position of other.defiPositions) {
this.addDeFiPosition(position);
}

for (const source of other.sources) {
this.addSource(source);
}

this._lastUpdated = new Date();
}

Expand Down Expand Up @@ -142,22 +197,25 @@ export class PortfolioAggregate {
}

isEmpty(): boolean {
return this._assets.size === 0;
return this._assets.size === 0 && this._defiPositions.size === 0;
}

clear(): void {
this._assets.clear();
this._defiPositions.clear();
this._sources.clear();
this._lastUpdated = new Date();
}

toJSON() {
const totalValue = this.getTotalValue();

return {
id: this._id,
userId: this._userId,
assets: this.assets.map(a => a.toJSON()),
defiPositions: this.defiPositions,
defiNetValue: this.getDeFiNetValue(),
totalValue: {
value: totalValue.amount,
currency: totalValue.currency,
Expand Down
Loading