Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[Slot Ticker fires] --> B[Compute currentSlot / currentEpoch / nextEpoch]
    B --> C[tickCtx = deadline @ SlotStartTime currentSlot+1]
    C --> D[prepareCurrentEpoch\ncheck dutyFetchIntents currentEpoch]
    D --> E{intent exists\nand unfulfilled?}
    E -- Yes --> F[fetchAndProcessDuties\ncurrentEpoch]
    F --> G[dutyFetchIntents currentEpoch = true]
    E -- No --> H[executeAggregatorDuties]
    G --> H
    H --> I[prepareNextEpoch\ncheck dutyFetchIntents nextEpoch]
    I --> J{intent exists\nunfulfilled\nand good time?}
    J -- Yes --> K[fetchAndProcessDuties\nnextEpoch]
    J -- No --> L[Cleanup old epoch data\nif last slot of epoch]
    K --> L
    L --> M{Inner select:\nindicesChange or deadline?}
    M -- indicesChange received --> N[Set intents for currentEpoch + nextEpoch]
    N --> O{atLastSlotOfCurrentEpoch?}
    O -- Yes --> P[prune currentEpoch intent\nprepareNextEpoch immediately]
    O -- No --> Q[prepareCurrentEpoch immediately]
    M -- Too late / timeout --> R[Defer to next slot]
    P --> S[Schedule nextEpoch intent if not already set]
    Q --> S
    R --> S
    AA[Reorg Event] --> AB[Compute currentSlot via EstimatedCurrentSlot]
    AB --> AC[reorgCtx = deadline @ SlotStartTime currentSlot+2]
    AC --> AD{reorgEvent.Current?}
    AD -- false prev epoch --> AE[Set intent for reorgEpoch]
    AD -- true curr epoch --> AF[Skip current epoch intent]
    AE --> AG[Set intent for reorgEpoch+1]
    AF --> AG
    AG --> AH{atLastSlotOfCurrentEpoch?}
    AH -- Yes --> AI[prune currentEpoch\nprepareNextEpoch]
    AH -- No --> AJ[prepareCurrentEpoch]
```
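The epoch→intent bookkeeping the flowchart describes can be exercised in isolation. The value type differs between revisions of the diff below (`struct{}{}` in one version, `true` in another); this standalone sketch (all names hypothetical) assumes the boolean form, where a key present with `false` is an unfulfilled intent and `true` marks it fulfilled.

```go
package main

import "fmt"

// Epoch stands in for phase0.Epoch in this sketch.
type Epoch uint64

// intents sketches the dutyFetchIntents map from the flowchart: a key present
// with value false is an unfulfilled fetch intent, a key present with value
// true has been fulfilled, and an absent key means no fetch is intended.
type intents map[Epoch]bool

// needsFetch reports whether an unfulfilled intent exists for epoch,
// mirroring the "intent exists and unfulfilled?" decision node.
func (m intents) needsFetch(epoch Epoch) bool {
	fulfilled, ok := m[epoch]
	return ok && !fulfilled
}

func main() {
	m := intents{}
	m[5] = false                 // indices change: record intent for epoch 5
	fmt.Println(m.needsFetch(5)) // true: intent exists and is unfulfilled
	m[5] = true                  // fetch succeeded: intent fulfilled
	fmt.Println(m.needsFetch(5)) // false
	fmt.Println(m.needsFetch(6)) // false: no intent recorded for epoch 6
}
```

Because absence and fulfilment are distinct states, retries that cross an epoch boundary can keep consulting the same per-epoch key instead of reinterpreting current/next flags.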
operator/duties/attester.go (outdated diff)
```go
select {
case <-h.indicesChange:
	logger.Info("🔁 indices change received")

	// Some validator-related state has updated, means we need to re-fetch the duties for the current
	// and next epoch to ensure we have the up-to-date duties for all validators for both epochs.
	h.dutyFetchIntents[currentEpoch] = struct{}{}
	h.dutyFetchIntents[currentEpoch+1] = struct{}{}

	// When at epoch boundary, we only care about pre-fetching & preparing the duties for the next epoch
	// (the current epoch will have been passed upon the next slot-tick). Otherwise, pre-fetch & prepare
	// the duties for the current epoch.
	if h.lastTickedSlotAtEpochBoundary() {
		h.prepareNextEpoch(ctx, currentEpoch, currentSlot)
	} else {
		h.prepareCurrentEpoch(ctx, currentEpoch, currentSlot)
	}
case <-time.After(time.Until(indicesChangeDeadline)):
	// It's too late (risky) to handle indices change on the current slot, we'll do it on the next slot.
}
```
The inner select uses time.After(time.Until(indicesChangeDeadline)) without a ctx.Done() case. If the context is cancelled while waiting at this select (e.g., during shutdown), the goroutine will block up to ~one IntervalDuration (~4 seconds for mainnet) before noticing cancellation, since the outer for { select {...} } only checks ctx.Done() after this inner select returns.
Suggested change:

```go
indicesChangeDeadline := h.beaconConfig.SlotStartTime(currentSlot).Add(h.beaconConfig.IntervalDuration())

select {
case <-h.indicesChange:
	logger.Info("🔁 indices change received")
	// Some validator-related state has updated, means we need to re-fetch the duties for the current
	// and next epoch to ensure we have the up-to-date duties for all validators for both epochs.
	h.dutyFetchIntents[currentEpoch] = struct{}{}
	h.dutyFetchIntents[currentEpoch+1] = struct{}{}
	// When at epoch boundary, we only care about pre-fetching & preparing the duties for the next epoch
	// (the current epoch will have been passed upon the next slot-tick). Otherwise, pre-fetch & prepare
	// the duties for the current epoch.
	if h.lastTickedSlotAtEpochBoundary() {
		h.prepareNextEpoch(ctx, currentEpoch, currentSlot)
	} else {
		h.prepareCurrentEpoch(ctx, currentEpoch, currentSlot)
	}
case <-ctx.Done():
	return
case <-time.After(time.Until(indicesChangeDeadline)):
	// It's too late (risky) to handle indices change on the current slot, we'll do it on the next slot.
}
```
Context Used: Rule from dashboard - When implementing delays or sleep operations in Go, ensure they respect context cancellation by usin... (source)
Makes sense, added the `case <-ctx.Done()` clause now.
```go
func (h *AttesterHandler) HandleInitialDuties(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, h.beaconConfig.SlotDuration)
	defer cancel()

	h.fetchCurrentEpoch = true
	h.fetchNextEpoch = true

	slot := h.beaconConfig.EstimatedCurrentSlot()
	epoch := h.beaconConfig.EstimatedEpochAtSlot(slot)
	h.processFetching(ctx, epoch, slot)
}

func (h *AttesterHandler) processFetching(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) {
	ctx, span := tracer.Start(ctx,
		observability.InstrumentName(observabilityNamespace, "attester.fetch"),
		trace.WithAttributes(
			observability.BeaconEpochAttribute(epoch),
			observability.BeaconSlotAttribute(slot),
			observability.BeaconRoleAttribute(spectypes.BNRoleAttester),
		))
	defer span.End()

	if h.fetchCurrentEpoch {
		span.AddEvent("fetching current epoch duties")
		if err := h.fetchAndProcessDuties(ctx, epoch, slot); err != nil {
			h.logger.Error("failed to fetch duties for current epoch", zap.Error(err))
			span.SetStatus(codes.Error, err.Error())
			return
		}
		h.fetchCurrentEpoch = false
	}

	currentSlot := h.beaconConfig.EstimatedCurrentSlot()
	currentEpoch := h.beaconConfig.EstimatedEpochAtSlot(currentSlot)

	// This additional shouldFetchNexEpoch check here is an optimization that prevents
	// unnecessary (duplicate) fetches in some cases + also delays the fetch until we
	// cannot delay it much further.
	if h.fetchNextEpoch && h.shouldFetchNexEpoch(slot) {
		span.AddEvent("fetching next epoch duties")
		if err := h.fetchAndProcessDuties(ctx, epoch+1, slot); err != nil {
			h.logger.Error("failed to fetch duties for next epoch", zap.Error(err))
			span.SetStatus(codes.Error, err.Error())
			return
		}
		h.fetchNextEpoch = false
	}

	h.dutyFetchIntents[currentEpoch] = struct{}{}
	h.dutyFetchIntents[currentEpoch+1] = struct{}{}

	span.SetStatus(codes.Ok, "")
	h.prepareCurrentEpoch(ctx, currentEpoch, currentSlot)
	h.prepareNextEpoch(ctx, currentEpoch, currentSlot)
}
```
HandleInitialDuties is called synchronously during scheduler startup (see // This call is blocking. comment in scheduler.go) before HandleDuties is started. Without a timeout, if the beacon node is slow or unavailable, the blocking network calls inside prepareCurrentEpoch and prepareNextEpoch will hang indefinitely, stalling the entire scheduler startup.
The timeout should be restored:
```go
func (h *AttesterHandler) HandleInitialDuties(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, h.beaconConfig.SlotDuration)
	defer cancel()

	currentSlot := h.beaconConfig.EstimatedCurrentSlot()
	currentEpoch := h.beaconConfig.EstimatedEpochAtSlot(currentSlot)
	...
}
```
operator/duties/attester.go (outdated diff)
```diff
 func() {
-	tickCtx, cancel := h.ctxWithDeadlineInOneEpoch(ctx, slot)
+	tickCtx, cancel := h.ctxWithDeadlineInOneEpoch(ctx, currentSlot)
 	defer cancel()

-	h.executeAggregatorDuties(tickCtx, currentEpoch, slot)
-	h.processFetching(tickCtx, currentEpoch, slot)
+	h.executeAggregatorDuties(tickCtx, currentEpoch, currentSlot)

+	h.prepareNextEpoch(ctx, currentEpoch, currentSlot)
 }()
```
Inside the func() closure, tickCtx is created with a one-epoch deadline via ctxWithDeadlineInOneEpoch() to bound duty-fetching calls. However, prepareNextEpoch on line 104 is called with the parent ctx instead of tickCtx, bypassing the deadline entirely.
If the beacon node is slow, prepareNextEpoch can now run unconstrained during the tick-processing window. Either pass tickCtx to prepareNextEpoch, or — if the intent is to allow next-epoch fetching to run longer — move it outside the closure and document why:
Suggested change:

```diff
 func() {
 	tickCtx, cancel := h.ctxWithDeadlineInOneEpoch(ctx, currentSlot)
 	defer cancel()

 	h.executeAggregatorDuties(tickCtx, currentEpoch, currentSlot)
-	h.prepareNextEpoch(ctx, currentEpoch, currentSlot)
+	h.prepareNextEpoch(tickCtx, currentEpoch, currentSlot)
 }()
```
This has been addressed now.
@greptile could you re-review this PR?
```diff
 func (h *AttesterHandler) atLastSlotOfCurrentEpoch() bool {
 	currentSlot := h.beaconConfig.EstimatedCurrentSlot()
 	slotsPerEpoch := h.beaconConfig.SlotsPerEpoch
-	return uint64(slot)%slotsPerEpoch > slotsPerEpoch/2-2
+	return uint64(currentSlot+1)%slotsPerEpoch == 0
 }
```
atLastSlotOfCurrentEpoch re-estimates the current slot independently

`atLastSlotOfCurrentEpoch()` calls `EstimatedCurrentSlot()` again internally, rather than reusing the `currentSlot` value already computed by the caller. This creates a potential inconsistency near epoch boundaries:

- In the tick handler, `currentSlot` comes from `h.ticker.Slot()`. If tick processing takes non-trivial time (e.g., 1+ seconds), `EstimatedCurrentSlot()` (wall-clock based) can return a slot one ahead of the ticker's slot.
- Example: we're at slot 31 (last slot of epoch 0), `h.ticker.Slot()` = 31, but by the time the inner select resolves and `atLastSlotOfCurrentEpoch()` is called, the clock has advanced to slot 32 → `(32+1)%32 = 1 ≠ 0` → returns `false`, causing `prepareCurrentEpoch` to be called instead of `prepareNextEpoch` at the epoch boundary.

The same issue affects calls from `HandleInitialDuties` and the reorg handler, where `currentSlot` is also independently computed moments earlier.
The fix is to accept `currentSlot phase0.Slot` as a parameter:

```go
func (h *AttesterHandler) atLastSlotOfCurrentEpoch(currentSlot phase0.Slot) bool {
	slotsPerEpoch := h.beaconConfig.SlotsPerEpoch
	return uint64(currentSlot+1)%slotsPerEpoch == 0
}
```

and update all call sites to pass their already-computed `currentSlot`.
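The boundary condition itself is easy to check in isolation. This sketch extracts the `(currentSlot+1) % slotsPerEpoch == 0` condition from the diff into a free function (names invented here) and evaluates it at the slots from the example above.

```go
package main

import "fmt"

// atLastSlotOfEpoch applies the condition from the diff to a caller-supplied
// slot, as the review suggests, instead of re-estimating the slot internally.
func atLastSlotOfEpoch(currentSlot, slotsPerEpoch uint64) bool {
	return (currentSlot+1)%slotsPerEpoch == 0
}

func main() {
	const slotsPerEpoch = 32
	fmt.Println(atLastSlotOfEpoch(31, slotsPerEpoch)) // true: last slot of epoch 0
	fmt.Println(atLastSlotOfEpoch(32, slotsPerEpoch)) // false: first slot of epoch 1
	fmt.Println(atLastSlotOfEpoch(63, slotsPerEpoch)) // true: last slot of epoch 1
}
```

Taking the slot as a parameter makes the function pure, so the one-slot drift between `h.ticker.Slot()` and `EstimatedCurrentSlot()` can no longer flip its answer mid-tick.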
```go
func() {
	tickCtx, cancel := h.ctxWithDeadlineInOneEpoch(ctx, reorgEvent.Slot)
	defer cancel()

	// Clean up the irrelevant data to prevent infinite memory growth at the 1 slot of the epoch.
```
Misleading comment: "at the 1 slot of the epoch"

The comment says "at the 1 slot of the epoch", but the condition `uint64(currentSlot+1)%slotsPerEpoch == 0` actually fires at the last slot of the epoch (e.g., slot 31 in a 32-slot epoch, since `32 % 32 == 0`).

Suggested change:

```diff
-	// Clean up the irrelevant data to prevent infinite memory growth at the 1 slot of the epoch.
+	// Clean up the irrelevant data to prevent infinite memory growth at the last slot of the epoch.
```
```go
	err := h.fetchAndProcessDuties(ctx, logger, currentEpoch, currentSlot)
	if err != nil {
		h.logger.Error("failed to prepare duties for current epoch", zap.Error(err))
		return
	}
	h.dutyFetchIntents[currentEpoch] = true // the intent has been fulfilled

	logger.Info("fetching duties for the current epoch succeeded")
}
```
Inconsistent logger usage: error path drops contextual fields

The info-level logs in `prepareCurrentEpoch` (and `prepareNextEpoch`) use the `logger` parameter (enriched with `epoch_slot_pos`, `current_epoch`, `current_slot`), but the error path falls back to `h.logger` (base logger, no context fields). This makes the error harder to correlate in Kibana.
Suggested change:

```diff
 	err := h.fetchAndProcessDuties(ctx, logger, currentEpoch, currentSlot)
 	if err != nil {
-		h.logger.Error("failed to prepare duties for current epoch", zap.Error(err))
+		logger.Error("failed to prepare duties for current epoch", zap.Error(err))
 		return
 	}
```
This PR aims to clarify/document/simplify the behavior of `AttesterHandler` (also making the handling of reorgs and indices-changes more precise, especially at epoch boundaries).

Notable changes:

- Instead of `fetchCurrentEpoch`/`fetchNextEpoch` flags dictating when to fetch duties, we now have an exact epoch→intent mapping (`dutyFetchIntents`) that works reliably when crossing an epoch boundary (the meaning of `fetchCurrentEpoch`/`fetchNextEpoch` depends on what the current epoch is, hence sometimes the intents could be interpreted incorrectly at epoch boundaries). This is especially critical for our fetch-retries, which issue retry requests on the next slot (and, if necessary, the slot after that, etc.), potentially crossing that epoch boundary with outdated/incorrect fetch intents.
- … `max` of these 2 deadlines … which is suboptimal and confusing)

If this approach is looking good, we should apply it also to `SyncCommitteeHandler` (and to `ProposerHandler`; see also a relevant PR with some prior work towards that: #2699).

Resolves #2705 for `AttesterHandler`.