Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 1 addition & 179 deletions ocp/data/fulfillment/memory/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync"
"time"

"github.com/code-payments/ocp-server/database/query"
"github.com/code-payments/ocp-server/ocp/data/fulfillment"
"github.com/code-payments/ocp-server/ocp/data/intent"
"github.com/code-payments/ocp-server/database/query"
"github.com/code-payments/ocp-server/pointer"
)

Expand Down Expand Up @@ -130,88 +130,6 @@ func (s *store) findByStateAndAddress(state fulfillment.State, address string) [
return res
}

func (s *store) findByStateAndAddressAsSource(state fulfillment.State, address string) []*fulfillment.Record {
res := make([]*fulfillment.Record, 0)
for _, item := range s.records {
if item.State != state {
continue
}

if item.Source == address {
res = append(res, item)
continue
}
}
return res
}

func (s *store) findByTypeStateAndAddress(fulfillmentType fulfillment.Type, state fulfillment.State, address string) []*fulfillment.Record {
res := make([]*fulfillment.Record, 0)
for _, item := range s.records {
if item.FulfillmentType != fulfillmentType {
continue
}

if item.State != state {
continue
}

if item.Source == address {
res = append(res, item)
continue
}

if item.Destination != nil && *item.Destination == address {
res = append(res, item)
continue
}
}
return res
}

func (s *store) findByTypeStateAndAddressAsSource(fulfillmentType fulfillment.Type, state fulfillment.State, address string) []*fulfillment.Record {
res := make([]*fulfillment.Record, 0)
for _, item := range s.records {
if item.FulfillmentType != fulfillmentType {
continue
}

if item.State != state {
continue
}

if item.Source == address {
res = append(res, item)
continue
}
}
return res
}

func (s *store) findByTypeActionAndState(intentId string, actionId uint32, fulfillmentType fulfillment.Type, state fulfillment.State) []*fulfillment.Record {
res := make([]*fulfillment.Record, 0)
for _, item := range s.records {
if item.Intent != intentId {
continue
}

if item.ActionId != actionId {
continue
}

if item.FulfillmentType != fulfillmentType {
continue
}

if item.State != state {
continue
}

res = append(res, item)
}
return res
}

func (s *store) findScheduableByAddress(address string) []*fulfillment.Record {
res := make([]*fulfillment.Record, 0)
for _, item := range s.records {
Expand Down Expand Up @@ -257,21 +175,6 @@ func (s *store) findScheduableByDestination(destinaion string) []*fulfillment.Re
return res
}

func (s *store) findScheduableByType(fulfillmentType fulfillment.Type) []*fulfillment.Record {
res := make([]*fulfillment.Record, 0)
for _, item := range s.records {
if item.State != fulfillment.StateUnknown && item.State != fulfillment.StatePending {
continue
}

if item.FulfillmentType == fulfillmentType {
res = append(res, item)
continue
}
}
return res
}

func (s *store) filter(items []*fulfillment.Record, cursor query.Cursor, limit uint64, direction query.Ordering) []*fulfillment.Record {
var start uint64

Expand Down Expand Up @@ -304,16 +207,6 @@ func (s *store) filter(items []*fulfillment.Record, cursor query.Cursor, limit u
return res
}

func (s *store) filterByType(items []*fulfillment.Record, fulfillmentType fulfillment.Type) []*fulfillment.Record {
var res []*fulfillment.Record
for _, item := range items {
if item.FulfillmentType == fulfillmentType {
res = append(res, item)
}
}
return res
}

func (s *store) filterDisabledActiveScheduling(items []*fulfillment.Record) []*fulfillment.Record {
var res []*fulfillment.Record
for _, item := range items {
Expand Down Expand Up @@ -345,13 +238,6 @@ func (s *store) filterScheduledAfter(items []*fulfillment.Record, intentOrdering
return res
}

func (s *store) Count(ctx context.Context) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

return uint64(len(s.records)), nil
}

func (s *store) CountByState(ctx context.Context, state fulfillment.State) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -360,19 +246,6 @@ func (s *store) CountByState(ctx context.Context, state fulfillment.State) (uint
return uint64(len(res)), nil
}

func (s *store) CountByStateGroupedByType(ctx context.Context, state fulfillment.State) (map[fulfillment.Type]uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

items := s.findByState(state)

res := make(map[fulfillment.Type]uint64)
for _, item := range items {
res[item.FulfillmentType] += 1
}
return res, nil
}

func (s *store) CountForMetrics(ctx context.Context, state fulfillment.State) (map[fulfillment.Type]uint64, error) {
return nil, errors.New("not implemented")
}
Expand All @@ -385,22 +258,6 @@ func (s *store) CountByStateAndAddress(ctx context.Context, state fulfillment.St
return uint64(len(res)), nil
}

func (s *store) CountByTypeStateAndAddressAsSource(ctx context.Context, fulfillmentType fulfillment.Type, state fulfillment.State, address string) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

res := s.findByTypeStateAndAddressAsSource(fulfillmentType, state, address)
return uint64(len(res)), nil
}

func (s *store) CountByTypeStateAndAddress(ctx context.Context, fulfillmentType fulfillment.Type, state fulfillment.State, address string) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

res := s.findByTypeStateAndAddress(fulfillmentType, state, address)
return uint64(len(res)), nil
}

func (s *store) CountByIntentAndState(ctx context.Context, intent string, state fulfillment.State) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -409,14 +266,6 @@ func (s *store) CountByIntentAndState(ctx context.Context, intent string, state
return uint64(len(res)), nil
}

func (s *store) CountByTypeActionAndState(ctx context.Context, intentId string, actionId uint32, fulfillmentType fulfillment.Type, state fulfillment.State) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()

res := s.findByTypeActionAndState(intentId, actionId, fulfillmentType, state)
return uint64(len(res)), nil
}

func (s *store) CountByIntent(ctx context.Context, intent string) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -628,19 +477,6 @@ func (s *store) GetAllByAction(ctx context.Context, intentId string, actionId ui
return cloneAll(res), nil
}

func (s *store) GetAllByTypeAndAction(ctx context.Context, fulfillmentType fulfillment.Type, intentId string, actionId uint32) ([]*fulfillment.Record, error) {
s.mu.Lock()
defer s.mu.Unlock()

res := s.findByAction(intentId, actionId)
res = s.filterByType(res, fulfillmentType)
if len(res) == 0 {
return nil, fulfillment.ErrFulfillmentNotFound
}

return cloneAll(res), nil
}

func (s *store) GetFirstSchedulableByAddressAsSource(ctx context.Context, address string) (*fulfillment.Record, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -668,20 +504,6 @@ func (s *store) GetFirstSchedulableByAddressAsDestination(ctx context.Context, a
return nil, fulfillment.ErrFulfillmentNotFound
}

func (s *store) GetFirstSchedulableByType(ctx context.Context, fulfillmentType fulfillment.Type) (*fulfillment.Record, error) {
s.mu.Lock()
defer s.mu.Unlock()

if items := s.findScheduableByType(fulfillmentType); len(items) > 0 {
sorted := fulfillment.BySchedulingOrder(items)
sort.Sort(sorted)

cloned := sorted[0].Clone()
return &cloned, nil
}
return nil, fulfillment.ErrFulfillmentNotFound
}

func (s *store) GetNextSchedulableByAddress(ctx context.Context, address string, intentOrderingIndex uint64, actionOrderingIndex, fulfillmentOrderingIndex uint32) (*fulfillment.Record, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
Loading