Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/server/chunked_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,13 +245,19 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
if s.db.hasObsRawHex {
obsRawHexCol = ", o.raw_hex"
}
// #1751: scope_name is on the transmission row, so appending it as the
// last selected column is safe regardless of the observation fan-out.
scopeNameCol := ""
if s.db.hasScopeName {
scopeNameCol = ", t.scope_name"
}

var chunkSQL string
if s.db.isV3 {
chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
o.id, obs.id, obs.name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRawHexCol + rpCol + `
o.snr, o.rssi, o.score, o.path_json, strftime('%Y-%m-%dT%H:%M:%fZ', o.timestamp, 'unixepoch')` + obsRawHexCol + rpCol + scopeNameCol + `
FROM (SELECT * FROM transmissions t2 ` + whereClause + ` ORDER BY t2.id ASC LIMIT ` + fmt.Sprintf("%d", chunkSize) + `) AS t
LEFT JOIN observations o ON o.transmission_id = t.id
LEFT JOIN observers obs ON obs.rowid = o.observer_idx
Expand All @@ -260,7 +266,7 @@ func (s *PacketStore) LoadChunked(chunkSize int) error {
chunkSQL = `SELECT t.id, t.raw_hex, t.hash, t.first_seen, t.route_type,
t.payload_type, t.payload_version, t.decoded_json,
o.id, o.observer_id, o.observer_name, COALESCE(obs.iata, ''), o.direction,
o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRawHexCol + rpCol + `
o.snr, o.rssi, o.score, o.path_json, o.timestamp` + obsRawHexCol + rpCol + scopeNameCol + `
FROM (SELECT * FROM transmissions t2 ` + whereClause + ` ORDER BY t2.id ASC LIMIT ` + fmt.Sprintf("%d", chunkSize) + `) AS t
LEFT JOIN observations o ON o.transmission_id = t.id
LEFT JOIN observers obs ON obs.id = o.observer_id
Expand Down Expand Up @@ -373,6 +379,7 @@ func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows, relayPM *prefixMap, cold
var score sql.NullInt64
var obsRawHex sql.NullString
var resolvedPathStr sql.NullString
var scopeName sql.NullString

scanArgs := []interface{}{&txID, &rawHex, &hash, &firstSeen, &routeType, &payloadType,
&payloadVersion, &decodedJSON,
Expand All @@ -384,6 +391,9 @@ func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows, relayPM *prefixMap, cold
if s.db.hasResolvedPath {
scanArgs = append(scanArgs, &resolvedPathStr)
}
if s.db.hasScopeName {
scanArgs = append(scanArgs, &scopeName)
}
if err := rows.Scan(scanArgs...); err != nil {
log.Printf("[store] LoadChunked scan error: %v", err)
continue
Expand All @@ -406,6 +416,7 @@ func (s *PacketStore) scanAndMergeChunk(rows *sql.Rows, relayPM *prefixMap, cold
RouteType: nullIntPtr(routeType),
PayloadType: nullIntPtr(payloadType),
DecodedJSON: nullStrVal(decodedJSON),
ScopeName: nullStrVal(scopeName),
obsKeys: make(map[string]bool),
observerSet: make(map[string]bool),
}
Expand Down
18 changes: 18 additions & 0 deletions cmd/server/repeater_enrich_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ func (s *PacketStore) computeRepeaterRelayInfoMap(windowHours float64) map[strin
out := make(map[string]RepeaterRelayInfo, len(snap))
for key, list := range snap {
info := RepeaterRelayInfo{WindowHours: windowHours}
// #1751: accumulate the set of region scope names carried by this
// hop key across every non-advert path-hop tx (NOT time-windowed).
// Captured by the visit closure below — lazily allocated on the first
// scope hit so hosts without scope_name pay nothing per key; converted
// to a sorted, capped slice before this key's info is stored.
var scopeSet map[string]struct{}
// When key looks like a full pubkey (>= 2 hex chars), also fold
// in the matching 1-byte raw-prefix bucket to mirror
// GetRepeaterRelayInfo's behavior. We dedup by tx ID.
Expand Down Expand Up @@ -141,6 +147,17 @@ func (s *PacketStore) computeRepeaterRelayInfoMap(windowHours float64) map[strin
if p.pt == payloadTypeAdvert {
continue
}
// #1751: scope accumulation is intentionally NOT gated on
// p.ok (timestamp parseability) — a packet with an
// unparseable first_seen still proves the repeater
// transported that scope. RelayCount/LastRelayed below
// remain timestamp-gated.
if tx.ScopeName != "" {
if scopeSet == nil {
scopeSet = map[string]struct{}{}
}
scopeSet[tx.ScopeName] = struct{}{}
}
if !p.ok {
continue
}
Expand All @@ -167,6 +184,7 @@ func (s *PacketStore) computeRepeaterRelayInfoMap(windowHours float64) map[strin
visit(snap[prefix])
}
}
info.TransportedScopes = sortedCappedScopes(scopeSet)
out[key] = info
}
return out
Expand Down
53 changes: 52 additions & 1 deletion cmd/server/repeater_liveness.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"sort"
"strings"
"time"
)
Expand All @@ -26,6 +27,40 @@ type RepeaterRelayInfo struct {
// RelayCount24h is the count of distinct non-advert packets where this
// pubkey appeared as a relay hop in the last 24 hours.
RelayCount24h int `json:"relayCount24h"`
// TransportedScopes is the deduplicated, sorted set of region scope
// names (transmissions.scope_name) across ALL non-advert packets in
// which this pubkey appears as a path hop. Unlike RelayCount1h/24h this
// is NOT time-windowed — it answers "which region scopes has this
// repeater carried traffic for, ever (within the in-memory window)".
// Empty/absent on schemas without scope_name (#1751).
TransportedScopes []string `json:"transportedScopes,omitempty"`
}

// maxTransportedScopes bounds the per-node TransportedScopes list so a
// misbehaving sender flooding distinct scope_name values through a single
// repeater cannot inflate the node JSON unboundedly (#1751 review follow-up).
// Real region-scope counts are small; this is a defensive ceiling. When the
// set exceeds the cap the lexicographically-first names are kept, so the
// result stays deterministic.
const maxTransportedScopes = 32

// sortedCappedScopes converts a scope set into a sorted, length-capped slice,
// or nil when the set is empty/nil — so routes.go omits the JSON field via
// `omitempty`. Shared by the bulk (computeRepeaterRelayInfoMap) and per-node
// (computeRelayInfoFromEntries) paths to keep them in exact parity.
func sortedCappedScopes(set map[string]struct{}) []string {
if len(set) == 0 {
return nil
}
scopes := make([]string, 0, len(set))
for s := range set {
scopes = append(scopes, s)
}
sort.Strings(scopes)
if len(scopes) > maxTransportedScopes {
scopes = scopes[:maxTransportedScopes]
}
return scopes
}

// payloadTypeAdvert is the MeshCore payload type for ADVERT packets.
Expand Down Expand Up @@ -62,6 +97,9 @@ func parseRelayTS(ts string) (time.Time, bool) {
type relayEntry struct {
ts string
pt int
// scope is the tx's region scope name (transmissions.scope_name).
// Empty when absent / on older schemas. Used for TransportedScopes (#1751).
scope string
}

// collectRelayEntriesLocked returns deduplicated relayEntry snapshots for
Expand Down Expand Up @@ -105,7 +143,7 @@ func (s *PacketStore) collectRelayEntriesLocked(key string) []relayEntry {
if tx.PayloadType != nil {
pt = *tx.PayloadType
}
entries = append(entries, relayEntry{ts: tx.FirstSeen, pt: pt})
entries = append(entries, relayEntry{ts: tx.FirstSeen, pt: pt, scope: tx.ScopeName})
}
}
collect(txList)
Expand All @@ -124,11 +162,21 @@ func computeRelayInfoFromEntries(entries []relayEntry, windowHours float64) Repe

var latest time.Time
var latestRaw string
var scopeSet map[string]struct{}
for _, e := range entries {
// Self-originated adverts are not relay activity.
if e.pt == payloadTypeAdvert {
continue
}
// #1751: accumulate transported scopes BEFORE the timestamp gate —
// a non-advert path-hop tx proves scope transport even if its
// first_seen is unparseable. Mirrors the bulk path.
if e.scope != "" {
if scopeSet == nil {
scopeSet = map[string]struct{}{}
}
scopeSet[e.scope] = struct{}{}
}
t, ok := parseRelayTS(e.ts)
if !ok {
continue
Expand All @@ -144,6 +192,9 @@ func computeRelayInfoFromEntries(entries []relayEntry, windowHours float64) Repe
}
}
}
// #1751: emit transported scopes regardless of whether any timestamp
// parsed, and before the latestRaw early-return below.
info.TransportedScopes = sortedCappedScopes(scopeSet)
if latestRaw == "" {
return info
}
Expand Down
11 changes: 11 additions & 0 deletions cmd/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,12 @@ func (s *Server) handleNodes(w http.ResponseWriter, r *http.Request) {
node["relay_active"] = info.RelayActive
node["relay_count_1h"] = info.RelayCount1h
node["relay_count_24h"] = info.RelayCount24h
// #1751: region scopes this repeater has transported.
// Set only when non-empty so the field is absent for
// nodes without scopes / on older schemas.
if len(info.TransportedScopes) > 0 {
node["transported_scopes"] = info.TransportedScopes
}
// usefulness_score retained for API compat; new
// consumers should read traffic_share_score
// (issue #1456). When the #672 composite ships
Expand Down Expand Up @@ -1539,6 +1545,11 @@ func (s *Server) handleNodeDetail(w http.ResponseWriter, r *http.Request) {
node["relay_window_hours"] = info.WindowHours
node["relay_count_1h"] = info.RelayCount1h
node["relay_count_24h"] = info.RelayCount24h
// #1751: region scopes this repeater has transported. Set only
// when non-empty (absent for no-scope nodes / older schemas).
if len(info.TransportedScopes) > 0 {
node["transported_scopes"] = info.TransportedScopes
}
// usefulness_score retained for API compat; new
// consumers should read traffic_share_score (#1456).
us := s.store.GetRepeaterUsefulnessScore(pubkey)
Expand Down
Loading