diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/UserDetailAssetsSnapshot.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/UserDetailAssetsSnapshot.kt new file mode 100644 index 000000000..1b66fac00 --- /dev/null +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/inout/UserDetailAssetsSnapshot.kt @@ -0,0 +1,17 @@ +package co.nilin.opex.api.core.inout + +import java.math.BigDecimal +import java.time.LocalDateTime + +data class UserDetailAssetsSnapshot( + val uuid: String, + val currencySnapshots: List, + val totalAmount: BigDecimal, + val quoteCurrency: String, + val snapshotDate: LocalDateTime, +) + +data class CurrencyAssetsSnapshot( + val currency: String, + val volume: BigDecimal, +) \ No newline at end of file diff --git a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt index d65ac072e..e07054d7f 100644 --- a/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt +++ b/api/api-core/src/main/kotlin/co/nilin/opex/api/core/spi/WalletProxy.kt @@ -2,6 +2,7 @@ package co.nilin.opex.api.core.spi import co.nilin.opex.api.core.inout.* import co.nilin.opex.api.core.inout.analytics.DailyAmount +import org.springframework.security.core.userdetails.UserDetails import java.math.BigDecimal interface WalletProxy { @@ -204,4 +205,5 @@ interface WalletProxy { suspend fun reserveSwap(token: String, request: TransferReserveRequest) :ReservedTransferResponse suspend fun finalizeSwap(token: String,reserveUuid: String,description: String?,transferRef: String?) : TransferResult suspend fun getGatewayTerminal(gatewayUuid: String):List + suspend fun getUsersDetailAssets(limit: Int, offset: Int): List } \ No newline at end of file diff --git a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt index ab2ce407d..326ac702a 100644 --- a/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt +++ b/api/api-ports/api-binance-rest/src/main/kotlin/co/nilin/opex/api/ports/binance/config/SecurityConfig.kt @@ -58,6 +58,7 @@ class SecurityConfig( .pathMatchers(HttpMethod.PUT, "/opex/v1/withdraw").hasAuthority("PERM_withdraw:write") .pathMatchers("/opex/v1/voucher").hasAuthority("PERM_voucher:submit") .pathMatchers("/opex/v1/market/**").permitAll() + .pathMatchers("/opex/v1/analytics/users-detail-assets").permitAll() .pathMatchers(HttpMethod.GET, "/opex/v1/market/chain").permitAll() .pathMatchers(HttpMethod.POST, "/v1/api-key").authenticated() .pathMatchers("/v1/api-key").hasAuthority("ROLE_admin") diff --git a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt index 1451528d7..af11c880f 100644 --- a/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt +++ b/api/api-ports/api-opex-rest/src/main/kotlin/co/nilin/opex/api/ports/opex/controller/UserAnalyticsController.kt @@ -1,6 +1,8 @@ package co.nilin.opex.api.ports.opex.controller +import co.nilin.opex.api.core.inout.UserDetailAssetsSnapshot import co.nilin.opex.api.core.inout.analytics.ActivityTotals +import co.nilin.opex.api.core.spi.WalletProxy import co.nilin.opex.api.ports.opex.service.UserActivityAggregationService import co.nilin.opex.api.ports.opex.util.jwtAuthentication import co.nilin.opex.api.ports.opex.util.tokenValue @@ -8,19 +10,27 @@ import org.springframework.security.core.annotation.CurrentSecurityContext import org.springframework.security.core.context.SecurityContext import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.RestController -import java.math.BigDecimal -import java.math.RoundingMode -import java.time.* -import kotlin.random.Random @RestController @RequestMapping("/opex/v1/analytics") -class UserAnalyticsController(private val userActivityAggregationService: UserActivityAggregationService) { +class UserAnalyticsController( + private val userActivityAggregationService: UserActivityAggregationService, + val walletProxy: WalletProxy, +) { @GetMapping("/user-activity") suspend fun userActivity(@CurrentSecurityContext securityContext: SecurityContext): Map { - val auth=securityContext.jwtAuthentication() - return userActivityAggregationService.getLast31DaysUserStats(auth.tokenValue(),auth.name) + val auth = securityContext.jwtAuthentication() + return userActivityAggregationService.getLast31DaysUserStats(auth.tokenValue(), auth.name) + } + + @GetMapping("/users-detail-assets") + suspend fun getUserDetailsAssets( + @RequestParam limit: Int?, + @RequestParam offset: Int?, + ): List { + return walletProxy.getUsersDetailAssets(limit ?: 10, offset ?: 0) } } diff --git a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt index 592210a88..1d06776ea 100644 --- a/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt +++ b/api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/WalletProxyImpl.kt @@ -785,5 +785,24 @@ class WalletProxyImpl(@Qualifier("generalWebClient") private val webClient: WebC .awaitFirstOrElse { throw OpexError.WithdrawNotFound.exception() } } + + override suspend fun getUsersDetailAssets( + limit: Int, + offset: Int + ): List { + return withContext(ProxyDispatchers.wallet) { + webClient.get() + .uri("$baseUrl/stats/detail-assets") { + it.queryParam("limit", limit) + it.queryParam("offset", offset) + it.build() + }.accept(MediaType.APPLICATION_JSON) + .retrieve() + .onStatus({ t -> t.isError }, { it.createException() }) + .bodyToFlux() + .collectList() + .awaitFirstOrElse { emptyList() } + } + } } diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt index ce6d27821..bcdeebcc1 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/config/SecurityConfig.kt @@ -47,6 +47,7 @@ class SecurityConfig(private val webClient: WebClient) { .pathMatchers("/admin/v1/swap/history").hasAnyAuthority("ROLE_monitoring", "ROLE_admin") .pathMatchers("/admin/**").hasAuthority("ROLE_admin") .pathMatchers("/stats/total-assets/**").permitAll() + .pathMatchers("/stats/detail-assets/**").permitAll() .pathMatchers(HttpMethod.GET, "/currency/**").permitAll() .pathMatchers("/actuator/**").permitAll() .pathMatchers("/storage/**").hasAuthority("ROLE_admin") diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt index 6af632f02..ffe67e8b5 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/controller/WalletStatController.kt @@ -6,8 +6,9 @@ import co.nilin.opex.wallet.core.inout.WalletData import co.nilin.opex.wallet.core.inout.WalletDataResponse import co.nilin.opex.wallet.core.inout.WalletTotal import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot +import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshot import co.nilin.opex.wallet.core.model.WalletType -import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager +import co.nilin.opex.wallet.core.spi.UserAssetsSnapshotManager import co.nilin.opex.wallet.core.spi.WalletDataManager import org.springframework.security.core.annotation.CurrentSecurityContext import org.springframework.security.core.context.SecurityContext @@ -17,7 +18,7 @@ import org.springframework.web.bind.annotation.* @RequestMapping("/stats") class WalletStatController( private val walletDataManager: WalletDataManager, - private val totalAssetsSnapshotManager: TotalAssetsSnapshotManager + private val totalAssetsSnapshotManager: UserAssetsSnapshotManager ) { @GetMapping("/wallets") @@ -57,7 +58,15 @@ class WalletStatController( suspend fun getWalletTotalAssetsSnapshot( @PathVariable uuid: String, ): TotalAssetsSnapshot? { - return totalAssetsSnapshotManager.getUserLastSnapshot(uuid) + return totalAssetsSnapshotManager.getUserLastTotalAssetsSnapshot(uuid) + } + + @GetMapping("/detail-assets") + suspend fun getWalletsDetailAssetsSnapshot( + @RequestParam limit: Int?, + @RequestParam offset: Int? + ): List { + return totalAssetsSnapshotManager.getUsersLastDetailAssetsSnapshot(getValidLimit(limit), offset ?: 0) } @GetMapping("/balance/{userId}") @@ -72,5 +81,12 @@ class WalletStatController( ) } + private fun getValidLimit(limit: Int?): Int = when { + limit == null -> 10 + limit > 100 -> 100 + limit < 1 -> 1 + else -> limit + } + } diff --git a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/WalletSnapshotService.kt b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/WalletSnapshotService.kt index 6fc7bae67..3864c22e3 100644 --- a/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/WalletSnapshotService.kt +++ b/wallet/wallet-app/src/main/kotlin/co/nilin/opex/wallet/app/service/WalletSnapshotService.kt @@ -1,8 +1,9 @@ package co.nilin.opex.wallet.app.service import co.nilin.opex.wallet.app.service.otc.GraphService -import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager +import co.nilin.opex.wallet.core.spi.UserAssetsSnapshotManager import co.nilin.opex.wallet.ports.postgres.dao.PriceRepository +import co.nilin.opex.wallet.ports.postgres.util.RedisCacheHelper import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.runBlocking import org.springframework.beans.factory.annotation.Value @@ -12,18 +13,28 @@ import java.math.BigDecimal @Service class WalletSnapshotService( - private val totalAssetsSnapshotManager: TotalAssetsSnapshotManager, + private val userAssetsSnapshotManager: UserAssetsSnapshotManager, private val graphService: GraphService, private val priceRepository: PriceRepository, + private val redisCacheHelper: RedisCacheHelper, @Value("\${app.snapshot-currency}") private val snapshotCurrency: String ) { @Scheduled(cron = "0 0 0 * * ?", zone = "GMT" + "\${app.zone-offset}") - fun createSnapshots() { + fun createTotalAssetsSnapshot() { runBlocking { updatePrices() - totalAssetsSnapshotManager.createSnapshot() + userAssetsSnapshotManager.createTotalAssetsSnapshot() + } + } + + @Scheduled(cron = "0 40 16 * * ?", zone = "GMT" + "\${app.zone-offset}") + fun createDetailAssetsSnapshot() { + runBlocking { + updatePrices() + userAssetsSnapshotManager.createDetailAssetsSnapshot() + redisCacheHelper.evictWithPrefix("users-detail-assets:") } } diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/UserDetailAssetsSnapshot.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/UserDetailAssetsSnapshot.kt new file mode 100644 index 000000000..39fb2fd09 --- /dev/null +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/model/UserDetailAssetsSnapshot.kt @@ -0,0 +1,25 @@ +package co.nilin.opex.wallet.core.model + +import java.math.BigDecimal +import java.time.LocalDateTime + +data class UserDetailAssetsSnapshotRaw( + val uuid: String, + val currencySnapshots: String, + val totalAmount: BigDecimal, + val quoteCurrency: String, + val snapshotDate: LocalDateTime +) + +data class UserDetailAssetsSnapshot( + val uuid: String, + val currencySnapshots: List, + val totalAmount: BigDecimal, + val quoteCurrency: String, + val snapshotDate: LocalDateTime, +) + +data class CurrencyAssetsSnapshot( + val currency: String, + val volume: BigDecimal, +) \ No newline at end of file diff --git a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/UserAssetsSnapshotManager.kt similarity index 52% rename from wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt rename to wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/UserAssetsSnapshotManager.kt index afcc58233..fb58ff83f 100644 --- a/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/TotalAssetsSnapshotManager.kt +++ b/wallet/wallet-core/src/main/kotlin/co/nilin/opex/wallet/core/spi/UserAssetsSnapshotManager.kt @@ -1,14 +1,22 @@ package co.nilin.opex.wallet.core.spi import co.nilin.opex.wallet.core.inout.DailyAmount +import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshot import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot import java.time.LocalDate -interface TotalAssetsSnapshotManager { - suspend fun createSnapshot() - suspend fun getUserLastSnapshot( +interface UserAssetsSnapshotManager { + suspend fun createTotalAssetsSnapshot() + suspend fun createDetailAssetsSnapshot() + suspend fun getUserLastTotalAssetsSnapshot( uuid: String ): TotalAssetsSnapshot? + + suspend fun getUsersLastDetailAssetsSnapshot( + limit: Int, + offset: Int, + ): List + suspend fun getLastDaysBalance( userId: String, startDate: LocalDate?, diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/DetailAssetsSnapshotRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/DetailAssetsSnapshotRepository.kt new file mode 100644 index 000000000..d090ad51b --- /dev/null +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/DetailAssetsSnapshotRepository.kt @@ -0,0 +1,72 @@ +package co.nilin.opex.wallet.ports.postgres.dao + +import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshotRaw +import co.nilin.opex.wallet.ports.postgres.model.DetailAssetsSnapshotModel +import org.springframework.data.r2dbc.repository.Modifying +import org.springframework.data.r2dbc.repository.Query +import org.springframework.data.repository.reactive.ReactiveCrudRepository +import org.springframework.stereotype.Repository +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono + +@Repository +interface DetailAssetsSnapshotRepository : ReactiveCrudRepository { + + @Modifying + @Query( + """ + INSERT INTO detail_assets_snapshot(uuid, currency, volume, total_amount, quote_currency, snapshot_date,batch_number) + SELECT wo.uuid, + w.currency, + SUM(w.balance) AS volume, + trunc(SUM( + CASE + WHEN w.currency = :quoteCurrency THEN w.balance + ELSE w.balance * COALESCE(p.price, 0) + END + ), :precision) AS total_amount, + :quoteCurrency AS quote_currency, + NOW() AS snapshot_date, + COALESCE((SELECT MAX(batch_number) FROM detail_assets_snapshot), 0) + 1 AS batch_number + FROM wallet w + INNER JOIN public.wallet_owner wo ON wo.id = w.owner + LEFT JOIN price p + ON w.currency = p.base_currency + AND p.quote_currency = :quoteCurrency + WHERE w.balance > 0 + GROUP BY wo.uuid, w.currency + """ + ) + fun createDetailSnapshotsDirectly( + quoteCurrency: String, + precision: Int + ): Mono + + + @Query( + """ + SELECT t.uuid AS uuid, + JSON_AGG( + JSON_BUILD_OBJECT( + 'currency', t.currency, + 'volume', trim_scale(trunc(t.volume, c.precision::int)) + ) + ) AS currency_snapshots, + SUM(t.total_amount) AS total_amount, + t.quote_currency AS quote_currency, + t.snapshot_date AS snapshot_date + FROM detail_assets_snapshot t + INNER JOIN public.wallet_owner wo ON wo.uuid = t.uuid + INNER JOIN public.currency c ON c.symbol = t.currency + WHERE t.batch_number = (SELECT MAX(batch_number)FROM detail_assets_snapshot) AND wo.level = '1' + GROUP BY t.uuid, t.quote_currency, t.snapshot_date + ORDER BY total_amount DESC + limit :limit + offset :offset + """ + ) + fun findAllLatestSnapshots( + limit: Int, + offset: Int, + ): Flux +} \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt index 1b7259ff3..e0c2ed081 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/dao/TotalAssetsSnapshotRepository.kt @@ -30,8 +30,7 @@ interface TotalAssetsSnapshotRepository : ReactiveCrudRepository 0 + WHERE w.balance > 0 GROUP BY wo.uuid """ ) diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt deleted file mode 100644 index 9358ea5a3..000000000 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/TotalAssetsSnapshotImpl.kt +++ /dev/null @@ -1,69 +0,0 @@ -package co.nilin.opex.wallet.ports.postgres.impl - -import co.nilin.opex.common.OpexError -import co.nilin.opex.wallet.core.inout.DailyAmount -import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot -import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager -import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepositoryV2 -import co.nilin.opex.wallet.ports.postgres.dao.TotalAssetsSnapshotRepository -import co.nilin.opex.wallet.ports.postgres.util.toTotalAssetsSnapshot -import kotlinx.coroutines.reactive.awaitFirstOrNull -import kotlinx.coroutines.reactor.awaitSingle -import org.slf4j.LoggerFactory -import org.springframework.beans.factory.annotation.Value -import org.springframework.stereotype.Service -import java.time.LocalDate -import java.time.ZoneOffset - -@Service -class TotalAssetsSnapshotImpl( - private val currencyRepository: CurrencyRepositoryV2, - private val totalAssetsSnapshotRepository: TotalAssetsSnapshotRepository, - @Value("\${app.snapshot-currency}") - private val snapshotCurrency: String, - @Value("\${app.zone-offset}") private val zoneOffsetString: String -) : TotalAssetsSnapshotManager { - - private val logger = LoggerFactory.getLogger(TotalAssetsSnapshotImpl::class.java) - - override suspend fun createSnapshot() { - val start = System.currentTimeMillis() - logger.info("Starting snapshot creation...") - val currency = currencyRepository.fetchCurrency(symbol = snapshotCurrency)?.awaitFirstOrNull() - ?: throw OpexError.CurrencyNotFound.exception() - totalAssetsSnapshotRepository.createSnapshotsDirectly(currency.symbol, currency.precision.toInt()) - .awaitFirstOrNull() - - val end = System.currentTimeMillis() - logger.info("Snapshot creation finished in {} ms", (end - start)) - } - - override suspend fun getUserLastSnapshot( - uuid: String - ): TotalAssetsSnapshot? { - return totalAssetsSnapshotRepository.findLastSnapshotByUuid(uuid).awaitFirstOrNull()?.toTotalAssetsSnapshot() - } - - override suspend fun getLastDaysBalance( - userId: String, - startDate: LocalDate?, - quatCurrency: String?, - lastDays: Long - ): List { - - val startDate = startDate ?: LocalDate - .now(ZoneOffset.of(zoneOffsetString)) - .minusDays(lastDays) - - return totalAssetsSnapshotRepository.findDailyBalance(userId, startDate, quatCurrency ?: snapshotCurrency) - .map { - DailyAmount( - date = it.date, - totalAmount = it.totalAmount - ) - } - .collectList() - .awaitSingle() - } - -} diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/UserAssetsSnapshotImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/UserAssetsSnapshotImpl.kt new file mode 100644 index 000000000..47247988a --- /dev/null +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/UserAssetsSnapshotImpl.kt @@ -0,0 +1,117 @@ +package co.nilin.opex.wallet.ports.postgres.impl + +import co.nilin.opex.common.OpexError +import co.nilin.opex.wallet.core.inout.DailyAmount +import co.nilin.opex.wallet.core.model.CurrencyAssetsSnapshot +import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot +import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshot +import co.nilin.opex.wallet.core.spi.UserAssetsSnapshotManager +import co.nilin.opex.wallet.ports.postgres.dao.CurrencyRepositoryV2 +import co.nilin.opex.wallet.ports.postgres.dao.DetailAssetsSnapshotRepository +import co.nilin.opex.wallet.ports.postgres.dao.TotalAssetsSnapshotRepository +import co.nilin.opex.wallet.ports.postgres.util.RedisCacheHelper +import co.nilin.opex.wallet.ports.postgres.util.toTotalAssetsSnapshot +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper +import kotlinx.coroutines.reactive.awaitFirstOrElse +import kotlinx.coroutines.reactive.awaitFirstOrNull +import kotlinx.coroutines.reactor.awaitSingle +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Value +import org.springframework.stereotype.Service +import java.time.LocalDate +import java.time.ZoneOffset + +@Service +class UserAssetsSnapshotImpl( + private val currencyRepository: CurrencyRepositoryV2, + private val totalAssetsSnapshotRepository: TotalAssetsSnapshotRepository, + private val detailAssetsSnapshotRepository: DetailAssetsSnapshotRepository, + private val objectMapper: ObjectMapper, + private val redisCacheHelper: RedisCacheHelper, + @Value("\${app.snapshot-currency}") + private val snapshotCurrency: String, + @Value("\${app.zone-offset}") private val zoneOffsetString: String +) : UserAssetsSnapshotManager { + + private val logger = LoggerFactory.getLogger(UserAssetsSnapshotImpl::class.java) + + override suspend fun createTotalAssetsSnapshot() { + val start = System.currentTimeMillis() + logger.info("Starting total assets snapshot creation...") + val currency = currencyRepository.fetchCurrency(symbol = snapshotCurrency)?.awaitFirstOrNull() + ?: throw OpexError.CurrencyNotFound.exception() + totalAssetsSnapshotRepository.createSnapshotsDirectly(currency.symbol, currency.precision.toInt()) + .awaitFirstOrNull() + val end = System.currentTimeMillis() + logger.info("Total assets snapshot creation finished in {} ms", (end - start)) + } + + override suspend fun createDetailAssetsSnapshot() { + val start = System.currentTimeMillis() + logger.info("Starting detail assets snapshot creation...") + val currency = currencyRepository.fetchCurrency(symbol = snapshotCurrency)?.awaitFirstOrNull() + ?: throw OpexError.CurrencyNotFound.exception() + detailAssetsSnapshotRepository + .createDetailSnapshotsDirectly(currency.symbol, currency.precision.toInt()) + .awaitFirstOrNull() + val end = System.currentTimeMillis() + logger.info("Detail assets snapshot creation finished in {} ms", (end - start)) + } + + override suspend fun getUserLastTotalAssetsSnapshot( + uuid: String + ): TotalAssetsSnapshot? { + return totalAssetsSnapshotRepository.findLastSnapshotByUuid(uuid).awaitFirstOrNull()?.toTotalAssetsSnapshot() + } + + override suspend fun getUsersLastDetailAssetsSnapshot( + limit: Int, + offset: Int + ): List { + val key = "users-detail-assets:$limit-$offset" + redisCacheHelper.get>(key)?.let { + return it + } + val result = detailAssetsSnapshotRepository + .findAllLatestSnapshots(limit, offset) + .map { raw -> + UserDetailAssetsSnapshot( + uuid = raw.uuid.substringAfterLast('-'), + currencySnapshots = objectMapper.readValue( + raw.currencySnapshots, + object : TypeReference>() {} + ), + totalAmount = raw.totalAmount, + quoteCurrency = raw.quoteCurrency, + snapshotDate = raw.snapshotDate + ) + } + .collectList() + .awaitFirstOrElse { emptyList() } + redisCacheHelper.put(key, result) + return result + } + + override suspend fun getLastDaysBalance( + userId: String, + startDate: LocalDate?, + quatCurrency: String?, + lastDays: Long + ): List { + + val startDate = startDate ?: LocalDate + .now(ZoneOffset.of(zoneOffsetString)) + .minusDays(lastDays) + + return totalAssetsSnapshotRepository.findDailyBalance(userId, startDate, quatCurrency ?: snapshotCurrency) + .map { + DailyAmount( + date = it.date, + totalAmount = it.totalAmount + ) + } + .collectList() + .awaitSingle() + } +} diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt index 96fc114ce..6151ad6b0 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/impl/WalletDataManagerImpl.kt @@ -22,7 +22,7 @@ import java.util.stream.Collectors @Component class WalletDataManagerImpl( private val walletRepository: WalletRepository, - private val totalAssetsSnapshotImpl: TotalAssetsSnapshotImpl, + private val totalAssetsSnapshotImpl: UserAssetsSnapshotImpl, private val currencyRepositoryV2: CurrencyRepositoryV2, private val objectMapper: ObjectMapper, private val cacheManager: CacheManager, diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/DetailAssetsSnapshotModel.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/DetailAssetsSnapshotModel.kt new file mode 100644 index 000000000..a49c62402 --- /dev/null +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/model/DetailAssetsSnapshotModel.kt @@ -0,0 +1,18 @@ +package co.nilin.opex.wallet.ports.postgres.model + +import org.springframework.data.annotation.Id +import org.springframework.data.relational.core.mapping.Table +import java.math.BigDecimal +import java.time.LocalDateTime + +@Table("detail_assets_snapshot") +data class DetailAssetsSnapshotModel( + @Id + val id: Long? = null, + val uuid: String, + val currency: String, + val volume: BigDecimal, + val totalAmount: BigDecimal, + val quoteCurrency: String, + val snapshotDate: LocalDateTime, +) \ No newline at end of file diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/util/RedisCacheHelper.kt b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/util/RedisCacheHelper.kt index 96eb58230..ca19cb435 100644 --- a/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/util/RedisCacheHelper.kt +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/kotlin/co/nilin/opex/wallet/ports/postgres/util/RedisCacheHelper.kt @@ -85,6 +85,20 @@ class RedisCacheHelper(private val redisTemplate: RedisTemplate) { } } + fun evictWithPrefix(prefix: String) { + try { + val keys = redisTemplate.keys("$prefix*") + if (!keys.isNullOrEmpty()) { + redisTemplate.delete(keys) + logger.info("Evicted ${keys.size} cache keys with prefix '$prefix'") + } else { + logger.info("No cache keys found with prefix '$prefix'") + } + } catch (e: Exception) { + logger.warn("Unable to evict cache with prefix '$prefix'", e) + } + } + fun setExpiration(key: String, interval: DynamicInterval) { try { redisTemplate.expireAt(key, interval.dateInFuture()) diff --git a/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/db/migration/V6__add_detail_assets_snapshot.sql b/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/db/migration/V6__add_detail_assets_snapshot.sql new file mode 100644 index 000000000..e449f1723 --- /dev/null +++ b/wallet/wallet-ports/wallet-persister-postgres/src/main/resources/db/migration/V6__add_detail_assets_snapshot.sql @@ -0,0 +1,12 @@ +CREATE TABLE IF NOT EXISTS detail_assets_snapshot +( + id SERIAL PRIMARY KEY, + uuid VARCHAR(36) NOT NULL, + currency VARCHAR(50) NOT NULL REFERENCES currency (symbol), + volume DECIMAL NOT NULL, + total_amount DECIMAL NOT NULL, + quote_currency VARCHAR(50) NOT NULL REFERENCES currency (symbol), + snapshot_date TIMESTAMP NOT NULL, + batch_number INTEGER NOT NULL , + unique (uuid, currency, snapshot_date , quote_currency) +); \ No newline at end of file