Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
v13_1 "github.com/classic-terra/core/v4/app/upgrades/v13_1"
v14_1 "github.com/classic-terra/core/v4/app/upgrades/v14_1"
v14_2 "github.com/classic-terra/core/v4/app/upgrades/v14_2"
"github.com/classic-terra/core/v4/app/upgrades/v14_3"
v2 "github.com/classic-terra/core/v4/app/upgrades/v2"
v3 "github.com/classic-terra/core/v4/app/upgrades/v3"
v4 "github.com/classic-terra/core/v4/app/upgrades/v4"
Expand Down Expand Up @@ -107,6 +108,7 @@ var (
v13_1.Upgrade,
v14_1.Upgrade,
v14_2.Upgrade,
v14_3.Upgrade,
}

// Forks defines forks to be applied to the network
Expand Down Expand Up @@ -175,9 +177,9 @@ func NewTerraApp(
baseAppOptions = append(baseAppOptions, func(app *baseapp.BaseApp) {
var mempool *appmempool.FifoMempool
if maxTxs := cast.ToInt(appOpts.Get(server.FlagMempoolMaxTxs)); maxTxs > 0 {
mempool = appmempool.NewFifoMempool(appmempool.FifoMaxTxOpt(maxTxs))
mempool = appmempool.NewFifoMempool(appmempool.FifoMaxTxOpt(maxTxs), appmempool.FifoTxEncoderOpt(txConfig.TxEncoder()))
} else {
mempool = appmempool.NewFifoMempool()
mempool = appmempool.NewFifoMempool(appmempool.FifoTxEncoderOpt(txConfig.TxEncoder()))
}
handler := baseapp.NewDefaultProposalHandler(mempool, app)
app.SetMempool(mempool)
Expand Down Expand Up @@ -278,6 +280,8 @@ func NewTerraApp(
panic("error while reading wasm config: " + err.Error())
}

replacementTracker := customante.NewReplacementTracker()

anteHandler, err := customante.NewAnteHandler(
customante.HandlerOptions{
AccountKeeper: app.AccountKeeper,
Expand All @@ -298,6 +302,8 @@ func NewTerraApp(
StakingKeeper: app.StakingKeeper,
TaxKeeper: &app.TaxKeeper,
Cdc: app.appCodec,
CommitMultiStore: app.CommitMultiStore(),
ReplacementTracker: replacementTracker,
},
)
if err != nil {
Expand Down
103 changes: 92 additions & 11 deletions app/mempool/mempool_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ type FifoMempool struct {
txsOracle *clist.CList // Oracle transactions FIFO queue
txsMap sync.Map // For quick lookup of existing transactions
txsMapOracle sync.Map // For quick lookup of existing transactions
txsBytes sync.Map // For distinguishing replacements with the same sender and nonce
maxTx int
txEncoder sdk.TxEncoder
}

type FifoMempoolOptions func(mp *FifoMempool)
Expand All @@ -65,13 +67,15 @@ func FifoMaxTxOpt(maxTx int) FifoMempoolOptions {
}
}

func (mp *FifoMempool) Insert(_ context.Context, tx sdk.Tx) error {
mp.mtx.RLock()
defer mp.mtx.RUnlock()
totalTxs := mp.txs.Len() + mp.txsOracle.Len()
if mp.maxTx >= 0 && totalTxs >= mp.maxTx {
return mempool.ErrMempoolTxMaxCapacity
func FifoTxEncoderOpt(txEncoder sdk.TxEncoder) FifoMempoolOptions {
return func(mp *FifoMempool) {
mp.txEncoder = txEncoder
}
}

func (mp *FifoMempool) Insert(_ context.Context, tx sdk.Tx) error {
mp.mtx.Lock()
defer mp.mtx.Unlock()
if mp.maxTx < 0 {
return nil
}
Expand All @@ -80,14 +84,57 @@ func (mp *FifoMempool) Insert(_ context.Context, tx sdk.Tx) error {
if err != nil {
return err
}
txBytes, err := mp.getTxBytes(tx)
if err != nil {
return err
}

isOracle := helper.IsOracleTx(tx.GetMsgs())
if elem, ok := mp.txsMap.Load(txKey); ok {
if !isOracle {
// In-place replacement requires an encoder to distinguish the new tx
// from the old one at Remove time. Without one, skip the update so
// a later Remove of the superseded tx cannot accidentally evict the
// current one.
if mp.txEncoder != nil {
elem.(*clist.CElement).Value = tx
mp.txsBytes.Store(txKey, txBytes)
}
return nil
}
mp.txsMap.Delete(txKey)
mp.txsBytes.Delete(txKey)
mp.txs.Remove(elem.(*clist.CElement))
}
if elem, ok := mp.txsMapOracle.Load(txKey); ok {
if isOracle {
if mp.txEncoder != nil {
elem.(*clist.CElement).Value = tx
mp.txsBytes.Store(txKey, txBytes)
}
return nil
}
mp.txsMapOracle.Delete(txKey)
mp.txsBytes.Delete(txKey)
mp.txsOracle.Remove(elem.(*clist.CElement))
}

totalTxs := mp.txs.Len() + mp.txsOracle.Len()
if mp.maxTx >= 0 && totalTxs >= mp.maxTx {
return mempool.ErrMempoolTxMaxCapacity
}

// Add to appropriate queue based on transaction type
if helper.IsOracleTx(tx.GetMsgs()) {
if isOracle {
e := mp.txsOracle.PushBack(tx)
mp.txsMapOracle.Store(txKey, e)
} else {
e := mp.txs.PushBack(tx)
mp.txsMap.Store(txKey, e)
}
if txBytes != "" {
mp.txsBytes.Store(txKey, txBytes)
}

return nil
}
Expand Down Expand Up @@ -159,21 +206,35 @@ func (it *fifoIterator) Tx() sdk.Tx {
}

func (mp *FifoMempool) Remove(tx sdk.Tx) error {
mp.mtx.RLock()
defer mp.mtx.RUnlock()
mp.mtx.Lock()
defer mp.mtx.Unlock()
txKey, err := getTxKey(tx)
if err != nil {
return err
}
txBytes, err := mp.getTxBytes(tx)
if err != nil {
return err
}

isOracle := helper.IsOracleTx(tx.GetMsgs())
if isOracle {
if elem, ok := mp.txsMapOracle.LoadAndDelete(txKey); ok {
if elem, ok := mp.txsMapOracle.Load(txKey); ok {
if !mp.matchesStoredTx(txKey, txBytes) {
return nil
}
mp.txsMapOracle.Delete(txKey)
mp.txsBytes.Delete(txKey)
mp.txsOracle.Remove(elem.(*clist.CElement))
return nil
}
} else {
if elem, ok := mp.txsMap.LoadAndDelete(txKey); ok {
if elem, ok := mp.txsMap.Load(txKey); ok {
if !mp.matchesStoredTx(txKey, txBytes) {
return nil
}
mp.txsMap.Delete(txKey)
mp.txsBytes.Delete(txKey)
mp.txs.Remove(elem.(*clist.CElement))
return nil
}
Expand All @@ -182,6 +243,13 @@ func (mp *FifoMempool) Remove(tx sdk.Tx) error {
return mempool.ErrTxNotFound
}

func (mp *FifoMempool) matchesStoredTx(txKey customTxKey, txBytes string) bool {
storedTxBytes, ok := mp.txsBytes.Load(txKey)
// No entry means no encoder was configured (bytes were never stored), so we
// have no discriminator and must allow the removal.
return !ok || storedTxBytes == txBytes
}

func (mp *FifoMempool) CountTx() int {
mp.mtx.RLock()
defer mp.mtx.RUnlock()
Expand All @@ -204,6 +272,19 @@ func getTxKey(tx sdk.Tx) (customTxKey, error) {
return key, nil
}

func (mp *FifoMempool) getTxBytes(tx sdk.Tx) (string, error) {
if mp.txEncoder == nil {
return "", nil
}

txBytes, err := mp.txEncoder(tx)
if err != nil {
return "", err
}

return string(txBytes), nil
}

type customTxKey struct {
address string
nonce uint64
Expand Down
42 changes: 42 additions & 0 deletions app/mempool/mempool_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,48 @@ func (s *MempoolTestSuite) TestTxNotFoundOnSender() {
require.Equal(t, mempool.ErrTxNotFound, err)
}

func (s *MempoolTestSuite) TestDuplicateSenderNonceReplacesTx() {
t := s.T()
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
accounts := simtypes.RandomAccounts(rand.New(rand.NewSource(0)), 1)
addr := accounts[0].Address
// Use a deterministic encoder keyed by testTx.id so replacement detection
// works correctly even when different object instances represent the same tx.
mockEnc := func(tx sdk.Tx) ([]byte, error) {
return []byte(fmt.Sprintf("%d", tx.(testTx).id)), nil
}
mp := appmempool.NewFifoMempool(appmempool.FifoTxEncoderOpt(mockEnc))

originalTx := testTx{
id: 1,
nonce: 432,
address: addr,
priority: rand.Int63(),
msgs: []sdk.Msg{&banktypes.MsgSend{}},
}
replacementTx := testTx{
id: 2,
nonce: 432,
address: addr,
priority: rand.Int63(),
msgs: []sdk.Msg{&banktypes.MsgSend{}},
}

require.NoError(t, mp.Insert(ctx, originalTx))
require.NoError(t, mp.Insert(ctx, replacementTx))
require.Equal(t, 1, mp.CountTx())

itr := mp.Select(ctx, nil)
selectedTxs := fetchTxs(itr, 1000)
require.Len(t, selectedTxs, 1)
require.Equal(t, 2, selectedTxs[0].(testTx).id)

require.NoError(t, mp.Remove(originalTx))
require.Equal(t, 1, mp.CountTx())
require.NoError(t, mp.Remove(replacementTx))
require.Equal(t, 0, mp.CountTx())
}

func (s *MempoolTestSuite) TestBatchTx_WhenEnoughMemPool() {
t := s.T()
ctx := sdk.NewContext(nil, tmproto.Header{}, false, log.NewNopLogger())
Expand Down
21 changes: 21 additions & 0 deletions app/upgrades/v14_3/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//nolint:revive
package v14_3

import (
store "cosmossdk.io/store/types"
"github.com/classic-terra/core/v4/app/upgrades"
)

const UpgradeName = "v14_3"

var Upgrade = upgrades.Upgrade{
UpgradeName: UpgradeName,
CreateUpgradeHandler: CreateV143UpgradeHandler,
// Add new stores introduced since the last upgrade here. If there are
// no new stores for this upgrade, leave this empty.
StoreUpgrades: store.StoreUpgrades{
Added: []string{},
Deleted: []string{},
Renamed: []store.StoreRename{},
},
}
24 changes: 24 additions & 0 deletions app/upgrades/v14_3/upgrades.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//nolint:revive
package v14_3

import (
"context"

upgradetypes "cosmossdk.io/x/upgrade/types"
"github.com/classic-terra/core/v4/app/keepers"
"github.com/classic-terra/core/v4/app/upgrades"
"github.com/cosmos/cosmos-sdk/types/module"
)

// CreateV143UpgradeHandler wires module migrations for v14_3.
// Add any one-off migration logic here before/after RunMigrations if needed.
func CreateV143UpgradeHandler(
mm *module.Manager,
cfg module.Configurator,
_ upgrades.BaseAppParamManager,
keepers *keepers.AppKeepers,
) upgradetypes.UpgradeHandler {
return func(ctx context.Context, _ upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) {
return mm.RunMigrations(ctx, cfg, fromVM)
}
}
12 changes: 12 additions & 0 deletions custom/auth/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ante
import (
corestoretypes "cosmossdk.io/core/store"
errorsmod "cosmossdk.io/errors"
storetypes "cosmossdk.io/store/types"
txsigning "cosmossdk.io/x/tx/signing"
wasmkeeper "github.com/CosmWasm/wasmd/x/wasm/keeper"
wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types"
Expand Down Expand Up @@ -43,6 +44,8 @@ type HandlerOptions struct {
StakingKeeper *stakingkeeper.Keeper
TaxKeeper *taxkeeper.Keeper
Cdc codec.BinaryCodec
CommitMultiStore storetypes.CommitMultiStore
ReplacementTracker *ReplacementTracker
}

// NewAnteHandler returns an AnteHandler that checks and increments sequence
Expand Down Expand Up @@ -81,6 +84,14 @@ func NewAnteHandler(options HandlerOptions) (sdk.AnteHandler, error) {
return nil, errorsmod.Wrap(sdkerrors.ErrLogic, "tax handler is required for ante builder")
}

if options.ReplacementTracker == nil {
return nil, errorsmod.Wrap(sdkerrors.ErrLogic, "replacement tracker is required for ante builder")
}

if options.CommitMultiStore == nil {
return nil, errorsmod.Wrap(sdkerrors.ErrLogic, "commit multi store is required for ante builder")
}

return sdk.ChainAnteDecorators(
ante.NewSetUpContextDecorator(), // outermost AnteDecorator. SetUpContext must be called first
wasmkeeper.NewLimitSimulationGasDecorator(options.WasmConfig.SimulationGasLimit),
Expand All @@ -96,6 +107,7 @@ func NewAnteHandler(options HandlerOptions) (sdk.AnteHandler, error) {
// MinInitialDepositDecorator prevents submitting governance proposal low initial deposit
NewMinInitialDepositDecorator(options.GovKeeper, options.TreasuryKeeper),
ante.NewConsumeGasForTxSizeDecorator(options.AccountKeeper),
NewTxReplacementDecorator(options.AccountKeeper, options.CommitMultiStore, options.ReplacementTracker),
NewFeeDecorator(options.AccountKeeper, options.BankKeeper, options.FeegrantKeeper, options.TaxExemptionKeeper, options.TreasuryKeeper, options.DistributionKeeper, *options.TaxKeeper),
dyncommante.NewDyncommDecorator(options.Cdc, options.DyncommKeeper, options.StakingKeeper),

Expand Down
Loading
Loading