Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"fmt"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"gorm.io/gorm/clause"
Expand Down Expand Up @@ -29,6 +30,23 @@ func (p *PostgresDatabase) GetHealthyNodeTwinIds(ctx context.Context) ([]uint32,
return nodeTwinIDs, err
}

func (p *PostgresDatabase) GetUnindexedNodeTwinIDs(ctx context.Context, indexedTable string) ([]uint32, error) {
nodeTwinIDs := make([]uint32, 0)

query := fmt.Sprintf(`
SELECT twin_id
FROM node
WHERE twin_id NOT IN (
SELECT DISTINCT node_twin_id
FROM %s
)
ORDER BY twin_id ASC
`, indexedTable)

err := p.gormDB.WithContext(ctx).Raw(query).Scan(&nodeTwinIDs).Error
return nodeTwinIDs, err
}

func (p *PostgresDatabase) UpsertNodesGPU(ctx context.Context, gpus []types.NodeGPU) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "id"}, {Name: "node_twin_id"}},
Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Database interface {
GetLastNodeTwinID(ctx context.Context) (uint32, error)
GetNodeTwinIDsAfter(ctx context.Context, twinID uint32) ([]uint32, error)
GetHealthyNodeTwinIds(ctx context.Context) ([]uint32, error)
GetUnindexedNodeTwinIDs(ctx context.Context, indexedTable string) ([]uint32, error)

// indexer upserters
UpsertNodesGPU(ctx context.Context, gpus []types.NodeGPU) error
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/cpu_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (w *CpuBenchmarkWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *CpuBenchmarkWork) IndexedTable() string {
return "cpu_benchmark"
}

func (w *CpuBenchmarkWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.CpuBenchmark, error) {
payload := struct {
Name string
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/dmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (w *DMIWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *DMIWork) IndexedTable() string {
return "dmi"
}

func (w *DMIWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.Dmi, error) {
var dmi zosDmiTypes.DMI
err := callNode(ctx, rmb, DmiCallCmd, nil, twinId, &dmi)
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (w *FeatureWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *FeatureWork) IndexedTable() string {
return "node_features"
}

func (w *FeatureWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodeFeatures, error) {
var features []string
err := callNode(ctx, rmb, featuresCallCmd, nil, twinId, &features)
Expand Down
41 changes: 23 additions & 18 deletions grid-proxy/internal/indexer/finders.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ var (
finders = map[string]Finder{
"up": upNodesFinder,
"healthy": healthyNodesFinder,
"new": newNodesFinder,
}
)

Expand Down Expand Up @@ -46,29 +45,35 @@ func healthyNodesFinder(ctx context.Context, interval time.Duration, db db.Datab
}
}

func newNodesFinder(ctx context.Context, interval time.Duration, db db.Database, idsChan chan uint32) {
// newUnindexedNodesFinder finds nodes that exist in node table but not in the indexed table
func newUnindexedNodesFinder(ctx context.Context, interval time.Duration, db db.Database, idsChan chan uint32, indexedTable string) {
ticker := time.NewTicker(interval)
latestCheckedID, err := db.GetLastNodeTwinID(ctx)
if err != nil {
log.Error().Err(err).Msg("failed to get last node twin id")
defer ticker.Stop()

queryUnindexedNodes := func() {
unindexedIDs, err := db.GetUnindexedNodeTwinIDs(ctx, indexedTable)
if err != nil {
log.Error().Err(err).Str("table", indexedTable).Msg("failed to get unindexed nodes")
return
}

if len(unindexedIDs) == 0 {
return
}

log.Info().Int("count", len(unindexedIDs)).Str("table", indexedTable).Msg("found unindexed nodes")

for _, id := range unindexedIDs {
idsChan <- id
}
}

queryUnindexedNodes()

for {
select {
case <-ticker.C:
newIDs, err := db.GetNodeTwinIDsAfter(ctx, latestCheckedID)
if err != nil {
log.Error().Err(err).Msgf("failed to get node twin ids after %d", latestCheckedID)
continue
}
if len(newIDs) == 0 {
continue
}

latestCheckedID = newIDs[0]
for _, id := range newIDs {
idsChan <- id
}
queryUnindexedNodes()
case <-ctx.Done():
return
}
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/gpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (w *GPUWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *GPUWork) IndexedTable() string {
return "node_gpu"
}

func (w *GPUWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodeGPU, error) {
// in case an error returned? return directly we can leave the previously indexed cards
// in case null returned? we need to clean all previously added cards till now
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (w *HealthWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *HealthWork) IndexedTable() string {
return "health_report"
}

func (w *HealthWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.HealthReport, error) {
var diagnostics diagnostics.Diagnostics
_ = callNode(ctx, rmb, healthCallCmd, nil, twinId, &diagnostics)
Expand Down
7 changes: 6 additions & 1 deletion grid-proxy/internal/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Work[T any] interface {
Finders() map[string]time.Duration
Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]T, error)
Upsert(ctx context.Context, db db.Database, batch []T) error
IndexedTable() string
}

type Indexer[T any] struct {
Expand Down Expand Up @@ -55,7 +56,11 @@ func NewIndexer[T any](

func (i *Indexer[T]) Start(ctx context.Context) {
for name, interval := range i.work.Finders() {
go finders[name](ctx, interval, i.dbClient, i.idChan)
if name == "new" {
go newUnindexedNodesFinder(ctx, interval, i.dbClient, i.idChan, i.work.IndexedTable())
} else {
go finders[name](ctx, interval, i.dbClient, i.idChan)
}
}

for j := uint(0); j < i.workerNum; j++ {
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/ipv6.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (w *Ipv6Work) Finders() map[string]time.Duration {
return w.finders
}

func (w *Ipv6Work) IndexedTable() string {
return "node_ipv6"
}

func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.HasIpv6, error) {
var has_ipv6 bool
if err := callNode(ctx, rmb, cmd, nil, id, &has_ipv6); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/location.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (w *LocationWork) Finders() map[string]time.Duration {
return w.finders
}

func (w *LocationWork) IndexedTable() string {
return "node_location"
}

func (w *LocationWork) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.NodeLocation, error) {
var loc geoip.Location
if err := callNode(ctx, rmb, locationCmd, nil, id, &loc); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/speed.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ func (w *SpeedWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *SpeedWork) IndexedTable() string {
return "speed"
}

func (w *SpeedWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.Speed, error) {
payload := struct {
Name string
Expand Down
4 changes: 4 additions & 0 deletions grid-proxy/internal/indexer/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (w *WorkloadWork) Finders() map[string]time.Duration {
return w.findersInterval
}

func (w *WorkloadWork) IndexedTable() string {
return "node_workloads"
}

func (w *WorkloadWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodesWorkloads, error) {
var response struct {
Users struct {
Expand Down
Loading