Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package co.nilin.opex.api.core.inout

import java.math.BigDecimal
import java.time.LocalDateTime

data class UserDetailAssetsSnapshot(
val uuid: String,
val currencySnapshots: List<CurrencyAssetsSnapshot>,
val totalAmount: BigDecimal,
val quoteCurrency: String,
val snapshotDate: LocalDateTime,
)

data class CurrencyAssetsSnapshot(
val currency: String,
val volume: BigDecimal,
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.api.core.spi

import co.nilin.opex.api.core.inout.*
import co.nilin.opex.api.core.inout.analytics.DailyAmount
import org.springframework.security.core.userdetails.UserDetails
import java.math.BigDecimal

interface WalletProxy {
Expand Down Expand Up @@ -204,4 +205,5 @@ interface WalletProxy {
suspend fun reserveSwap(token: String, request: TransferReserveRequest) :ReservedTransferResponse
suspend fun finalizeSwap(token: String,reserveUuid: String,description: String?,transferRef: String?) : TransferResult
suspend fun getGatewayTerminal(gatewayUuid: String):List<TerminalCommand>
suspend fun getUsersDetailAssets(limit: Int, offset: Int): List<UserDetailAssetsSnapshot>
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SecurityConfig(
.pathMatchers(HttpMethod.PUT, "/opex/v1/withdraw").hasAuthority("PERM_withdraw:write")
.pathMatchers("/opex/v1/voucher").hasAuthority("PERM_voucher:submit")
.pathMatchers("/opex/v1/market/**").permitAll()
.pathMatchers("/opex/v1/analytics/users-detail-assets").permitAll()
.pathMatchers(HttpMethod.GET, "/opex/v1/market/chain").permitAll()
.pathMatchers(HttpMethod.POST, "/v1/api-key").authenticated()
.pathMatchers("/v1/api-key").hasAuthority("ROLE_admin")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,36 @@
package co.nilin.opex.api.ports.opex.controller

import co.nilin.opex.api.core.inout.UserDetailAssetsSnapshot
import co.nilin.opex.api.core.inout.analytics.ActivityTotals
import co.nilin.opex.api.core.spi.WalletProxy
import co.nilin.opex.api.ports.opex.service.UserActivityAggregationService
import co.nilin.opex.api.ports.opex.util.jwtAuthentication
import co.nilin.opex.api.ports.opex.util.tokenValue
import org.springframework.security.core.annotation.CurrentSecurityContext
import org.springframework.security.core.context.SecurityContext
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import java.math.BigDecimal
import java.math.RoundingMode
import java.time.*
import kotlin.random.Random

@RestController
@RequestMapping("/opex/v1/analytics")
class UserAnalyticsController(private val userActivityAggregationService: UserActivityAggregationService) {
class UserAnalyticsController(
private val userActivityAggregationService: UserActivityAggregationService,
val walletProxy: WalletProxy,
) {

@GetMapping("/user-activity")
suspend fun userActivity(@CurrentSecurityContext securityContext: SecurityContext): Map<Long, ActivityTotals> {
val auth=securityContext.jwtAuthentication()
return userActivityAggregationService.getLast31DaysUserStats(auth.tokenValue(),auth.name)
val auth = securityContext.jwtAuthentication()
return userActivityAggregationService.getLast31DaysUserStats(auth.tokenValue(), auth.name)
}

@GetMapping("/users-detail-assets")
suspend fun getUserDetailsAssets(
@RequestParam limit: Int?,
@RequestParam offset: Int?,
): List<UserDetailAssetsSnapshot> {
return walletProxy.getUsersDetailAssets(limit ?: 10, offset ?: 0)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -785,5 +785,24 @@ class WalletProxyImpl(@Qualifier("generalWebClient") private val webClient: WebC
.awaitFirstOrElse { throw OpexError.WithdrawNotFound.exception() }

}

override suspend fun getUsersDetailAssets(
limit: Int,
offset: Int
): List<UserDetailAssetsSnapshot> {
return withContext(ProxyDispatchers.wallet) {
webClient.get()
.uri("$baseUrl/stats/detail-assets") {
it.queryParam("limit", limit)
it.queryParam("offset", offset)
it.build()
}.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus({ t -> t.isError }, { it.createException() })
.bodyToFlux<UserDetailAssetsSnapshot>()
.collectList()
.awaitFirstOrElse { emptyList() }
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class SecurityConfig(private val webClient: WebClient) {
.pathMatchers("/admin/v1/swap/history").hasAnyAuthority("ROLE_monitoring", "ROLE_admin")
.pathMatchers("/admin/**").hasAuthority("ROLE_admin")
.pathMatchers("/stats/total-assets/**").permitAll()
.pathMatchers("/stats/detail-assets/**").permitAll()
.pathMatchers(HttpMethod.GET, "/currency/**").permitAll()
.pathMatchers("/actuator/**").permitAll()
.pathMatchers("/storage/**").hasAuthority("ROLE_admin")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import co.nilin.opex.wallet.core.inout.WalletData
import co.nilin.opex.wallet.core.inout.WalletDataResponse
import co.nilin.opex.wallet.core.inout.WalletTotal
import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot
import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshot
import co.nilin.opex.wallet.core.model.WalletType
import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager
import co.nilin.opex.wallet.core.spi.UserAssetsSnapshotManager
import co.nilin.opex.wallet.core.spi.WalletDataManager
import org.springframework.security.core.annotation.CurrentSecurityContext
import org.springframework.security.core.context.SecurityContext
Expand All @@ -17,7 +18,7 @@ import org.springframework.web.bind.annotation.*
@RequestMapping("/stats")
class WalletStatController(
private val walletDataManager: WalletDataManager,
private val totalAssetsSnapshotManager: TotalAssetsSnapshotManager
private val totalAssetsSnapshotManager: UserAssetsSnapshotManager
) {

@GetMapping("/wallets")
Expand Down Expand Up @@ -57,7 +58,15 @@ class WalletStatController(
suspend fun getWalletTotalAssetsSnapshot(
@PathVariable uuid: String,
): TotalAssetsSnapshot? {
return totalAssetsSnapshotManager.getUserLastSnapshot(uuid)
return totalAssetsSnapshotManager.getUserLastTotalAssetsSnapshot(uuid)
}

@GetMapping("/detail-assets")
suspend fun getWalletsDetailAssetsSnapshot(
@RequestParam limit: Int?,
@RequestParam offset: Int?
): List<UserDetailAssetsSnapshot> {
return totalAssetsSnapshotManager.getUsersLastDetailAssetsSnapshot(getValidLimit(limit), offset ?: 0)
}

@GetMapping("/balance/{userId}")
Expand All @@ -72,5 +81,12 @@ class WalletStatController(
)
}

private fun getValidLimit(limit: Int?): Int = when {
limit == null -> 10
limit > 100 -> 100
limit < 1 -> 1
else -> limit
}

}

Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package co.nilin.opex.wallet.app.service

import co.nilin.opex.wallet.app.service.otc.GraphService
import co.nilin.opex.wallet.core.spi.TotalAssetsSnapshotManager
import co.nilin.opex.wallet.core.spi.UserAssetsSnapshotManager
import co.nilin.opex.wallet.ports.postgres.dao.PriceRepository
import co.nilin.opex.wallet.ports.postgres.util.RedisCacheHelper
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.runBlocking
import org.springframework.beans.factory.annotation.Value
Expand All @@ -12,18 +13,28 @@ import java.math.BigDecimal

@Service
class WalletSnapshotService(
private val totalAssetsSnapshotManager: TotalAssetsSnapshotManager,
private val userAssetsSnapshotManager: UserAssetsSnapshotManager,
private val graphService: GraphService,
private val priceRepository: PriceRepository,
private val redisCacheHelper: RedisCacheHelper,
@Value("\${app.snapshot-currency}")
private val snapshotCurrency: String
) {

@Scheduled(cron = "0 0 0 * * ?", zone = "GMT" + "\${app.zone-offset}")
fun createSnapshots() {
fun createTotalAssetsSnapshot() {
runBlocking {
updatePrices()
totalAssetsSnapshotManager.createSnapshot()
userAssetsSnapshotManager.createTotalAssetsSnapshot()
}
}

@Scheduled(cron = "0 40 16 * * ?", zone = "GMT" + "\${app.zone-offset}")
fun createDetailAssetsSnapshot() {
runBlocking {
updatePrices()
userAssetsSnapshotManager.createDetailAssetsSnapshot()
redisCacheHelper.evictWithPrefix("users-detail-assets:")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package co.nilin.opex.wallet.core.model

import java.math.BigDecimal
import java.time.LocalDateTime

data class UserDetailAssetsSnapshotRaw(
val uuid: String,
val currencySnapshots: String,
val totalAmount: BigDecimal,
val quoteCurrency: String,
val snapshotDate: LocalDateTime
)

data class UserDetailAssetsSnapshot(
val uuid: String,
val currencySnapshots: List<CurrencyAssetsSnapshot>,
val totalAmount: BigDecimal,
val quoteCurrency: String,
val snapshotDate: LocalDateTime,
)

data class CurrencyAssetsSnapshot(
val currency: String,
val volume: BigDecimal,
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package co.nilin.opex.wallet.core.spi

import co.nilin.opex.wallet.core.inout.DailyAmount
import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshot
import co.nilin.opex.wallet.core.model.TotalAssetsSnapshot
import java.time.LocalDate

interface TotalAssetsSnapshotManager {
suspend fun createSnapshot()
suspend fun getUserLastSnapshot(
interface UserAssetsSnapshotManager {
suspend fun createTotalAssetsSnapshot()
suspend fun createDetailAssetsSnapshot()
suspend fun getUserLastTotalAssetsSnapshot(
uuid: String
): TotalAssetsSnapshot?

suspend fun getUsersLastDetailAssetsSnapshot(
limit: Int,
offset: Int,
): List<UserDetailAssetsSnapshot>

suspend fun getLastDaysBalance(
userId: String,
startDate: LocalDate?,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package co.nilin.opex.wallet.ports.postgres.dao

import co.nilin.opex.wallet.core.model.UserDetailAssetsSnapshotRaw
import co.nilin.opex.wallet.ports.postgres.model.DetailAssetsSnapshotModel
import org.springframework.data.r2dbc.repository.Modifying
import org.springframework.data.r2dbc.repository.Query
import org.springframework.data.repository.reactive.ReactiveCrudRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Repository
interface DetailAssetsSnapshotRepository : ReactiveCrudRepository<DetailAssetsSnapshotModel, Long> {

@Modifying
@Query(
"""
INSERT INTO detail_assets_snapshot(uuid, currency, volume, total_amount, quote_currency, snapshot_date,batch_number)
SELECT wo.uuid,
w.currency,
SUM(w.balance) AS volume,
trunc(SUM(
CASE
WHEN w.currency = :quoteCurrency THEN w.balance
ELSE w.balance * COALESCE(p.price, 0)
END
), :precision) AS total_amount,
:quoteCurrency AS quote_currency,
NOW() AS snapshot_date,
COALESCE((SELECT MAX(batch_number) FROM detail_assets_snapshot), 0) + 1 AS batch_number
FROM wallet w
INNER JOIN public.wallet_owner wo ON wo.id = w.owner
LEFT JOIN price p
ON w.currency = p.base_currency
AND p.quote_currency = :quoteCurrency
WHERE w.balance > 0
GROUP BY wo.uuid, w.currency
"""
)
fun createDetailSnapshotsDirectly(
quoteCurrency: String,
precision: Int
): Mono<Void>


@Query(
"""
SELECT t.uuid AS uuid,
JSON_AGG(
JSON_BUILD_OBJECT(
'currency', t.currency,
'volume', trim_scale(trunc(t.volume, c.precision::int))
)
) AS currency_snapshots,
SUM(t.total_amount) AS total_amount,
t.quote_currency AS quote_currency,
t.snapshot_date AS snapshot_date
FROM detail_assets_snapshot t
INNER JOIN public.wallet_owner wo ON wo.uuid = t.uuid
INNER JOIN public.currency c ON c.symbol = t.currency
WHERE t.batch_number = (SELECT MAX(batch_number)FROM detail_assets_snapshot) AND wo.level = '1'
GROUP BY t.uuid, t.quote_currency, t.snapshot_date
ORDER BY total_amount DESC
limit :limit
offset :offset
"""
)
fun findAllLatestSnapshots(
limit: Int,
offset: Int,
): Flux<UserDetailAssetsSnapshotRaw>
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ interface TotalAssetsSnapshotRepository : ReactiveCrudRepository<TotalAssetsSnap
FROM wallet w
INNER JOIN public.wallet_owner wo on wo.id = w.owner
LEFT JOIN price p ON w.currency = p.base_currency and p.quote_currency = :quoteCurrency
WHERE w.wallet_type != 'CASHOUT'
AND w.balance > 0
WHERE w.balance > 0
GROUP BY wo.uuid
"""
)
Expand Down
Loading
Loading