Skip to content

Conversation

@mustansir14
Copy link
Contributor

@mustansir14 mustansir14 commented Nov 19, 2025

Description:

This PR supersedes #4270. The original contributor is no longer available to sign the CLA, so I have rebased + carried forward their work.

The PR aims to make the S3 source support Unit Scans by implementing the SourceUnitEnumChunker interface. It creates a unit for each bucket.

I've tried to reuse most of the logic, mainly by wrapping the single bucket scanning logic into a scanBucket method, and reusing that in ChunkUnit.

Tests are added for both Enumerate and ChunkUnit methods.

Update: This now also implements sub-unit resumption!

Checklist:

  • Tests passing (make test-community)?
  • Lint passing (make lint this requires golangci-lint)?

@mustansir14 mustansir14 requested a review from a team November 19, 2025 11:17
@mustansir14 mustansir14 requested review from a team as code owners November 19, 2025 11:17
Copy link
Contributor

@shahzadhaider1 shahzadhaider1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.
Please make sure to test all edge cases with this change.

Copy link
Contributor

@rosecodym rosecodym left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually implement sub-unit resumption? It doesn't look like it does. @mcastorina I realize I don't actually know how to do that using the existing interfaces.

return ctx.Err()
}

ctx.Logger().V(5).Info("Enumerating bucket", "bucket", bucket)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, "enumerating bucket" implies that the bucket itself is being ranged over, not that the bucket is being emitted from an enumeration of all buckets. I'd write this like "Found bucket" or something.

I'm not sure we even need a log message here - the bucket will show up in a list elsewhere, so logging it just duplicates that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. I'll just remove the logs.


defaultClient, err := s.newClient(ctx, defaultAWSRegion, s3unit.Role)
if err != nil {
return fmt.Errorf("could not create s3 client: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend adding the bucket and role to this error message so that if any end users see it they have a better chance of self-diagnosing the issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

@mustansir14
Copy link
Contributor Author

Does this actually implement sub-unit resumption? It doesn't look like it does.

It doesn't.

@mcastorina I realize I don't actually know how to do that using the existing interfaces.

That makes the two of us. However, I see that it is currently implemented for filesystem, and I kind of get how it works. I'll try and implement it here

@mustansir14 mustansir14 requested a review from a team as a code owner November 21, 2025 09:09
@mcastorina
Copy link
Contributor

@mcastorina I realize I don't actually know how to do that using the existing interfaces.

I added some methods to the Progress struct to handle it in a thread safe way here:

// -sub-unit-resumption------------------------------------------------------------
//
// The following collection of methods are intended to provide a thread-safe
// way to access the EncodedResumeInfo for Sources to enable saving and
// resuming progress mid SourceUnit scan.
//
// This level of synchronization is only necessary when multiple concurrent
// invocations of ChunkUnit consume/mutate the same Progress object. The source
// manager executes scans this way under certain circumstances.
//
// Usage:
// - id should be the SourceUnit ID
// - value is opaque data each Source uses
//
// GetEncodedResumeInfoFor gets the encoded resume information for the provided
// ID, usually a unit ID.
func (p *Progress) GetEncodedResumeInfoFor(id string) string {
p.mut.Lock()
defer p.mut.Unlock()
p.ensureEncodedResumeInfoByID()
return p.encodedResumeInfoByID[id]
}
// SetEncodedResumeInfoFor sets the encoded resume information for the provided
// ID, usually a unit ID.
func (p *Progress) SetEncodedResumeInfoFor(id, value string) {
p.mut.Lock()
defer p.mut.Unlock()
p.ensureEncodedResumeInfoByID()
p.encodedResumeInfoByID[id] = value
p.EncodedResumeInfo = marshalEncodedResumeInfo(p.encodedResumeInfoByID)
}
// ClearEncodedResumeInfoFor removes the encoded resume information from being
// tracked.
func (p *Progress) ClearEncodedResumeInfoFor(id string) {
p.mut.Lock()
defer p.mut.Unlock()
p.ensureEncodedResumeInfoByID()
delete(p.encodedResumeInfoByID, id)
p.EncodedResumeInfo = marshalEncodedResumeInfo(p.encodedResumeInfoByID)
}
// ensureEncodedResumeInfoByID ensures the encodedResumeInfoByID attribute is a
// non-nil map. The mutex must be held when calling this function.
func (p *Progress) ensureEncodedResumeInfoByID() {
if p.encodedResumeInfoByID != nil {
return
}
p.encodedResumeInfoByID = unmarshalEncodedResumeInfo(p.EncodedResumeInfo)
}
// marshalEncodedResumeInfo converts a map of values into a serialized string.
func marshalEncodedResumeInfo(values map[string]string) string {
marshalled, _ := json.Marshal(values)
return string(marshalled)
}
// unmarshalEncodedResumeInfo converts a serialized string into a map of values.
func unmarshalEncodedResumeInfo(data string) map[string]string {
resumeInfo := make(map[string]string)
_ = json.Unmarshal([]byte(data), &resumeInfo)
return resumeInfo
}
// -/sub-unit-resumption-----------------------------------------------------------

The idea is you need to save some data for the specific source unit ID, and you can retrieve it by that ID. When the source unit is finished, it needs to clear it from being tracked as well.

  • SetEncodedResumeInfoFor(id, value string) - sets the resumption data for the source unit ID
  • GetEncodedResumeInfoFor(id string) string - gets the resumption value for the source unit ID
  • ClearEncodedResumeInfoFor(id string) - removes the resumption value from being tracked

Copy link
Contributor

@rosecodym rosecodym left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Source unit unmarshalling needs some tweaks (mentioned inline).

Comment on lines 707 to 710
func (s S3SourceUnit) SourceUnitID() (string, sources.SourceUnitKind) {
// The ID is the bucket name, and the kind is "s3_bucket".
return s.Bucket, "s3_bucket"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcastorina I forget - is it a problem if SourceUnitID can't be used to round-trip a unit? (In this case, we lose the Role field.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll wait for @mcastorina's answer before making changes here, but here's what the description comment says for SourceUnitID():

// SourceUnitID uniquely identifies a source unit. It does not need to
// be human readable or two-way, however, it should be canonical and
// stable across runs.

The bucket name is a globally unique value, so with that aspect we should be good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good catch. I guess the round-trip-abillity happens in the source manager somewhere? (@mcastorina?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We take the full unit object and JSON marshal it, so the fields need to be public. Idk if I documented that anywhere though, but that's why a source needs to implement unmarshalling but not marshalling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the fields are public, so we're good there. But based on our discussion in the thread below regarding having the role in resumption info, it seems like a good idea to have the role in the SourceUnitID as well. I'll add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it and realized it might not be best to add this yet as this also affects sub-unit resumption because the resumption info is supposed to be saved against the SourceUnitID, and our current checkpointer only works with buckets, not roles. I have reverted the changes and will wait for your responses to decide if we want to go with roles being part of resumption info or not.

return fmt.Sprintf("https://%s.s3.%s.amazonaws.com/%s", bucket, region, key)
}

type S3SourceUnit struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've defined this unit type, but you haven't modified the source to actually use it. The source type still embeds CommonSourceUnitUnmarshaller, so it will still unmarshal source units to CommonSourceUnit instead of your new type. You'll need to define custom unmarshalling logic. (The git source has an example of custom unmarshalling logic you can look at.)

Also, I recommend putting the unit struct and related code in a separate file, because we do that for several other sources, and I think it makes things more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks for pointing this out. I wasn't aware of this. I'll make the changes.

@mustansir14 mustansir14 requested a review from rosecodym December 1, 2025 05:53
s.checkpointer.SetIsUnitScan(true)

var startAfterPtr *string
startAfter := s.Progress.GetEncodedResumeInfoFor(bucket)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about the way this gets resume info using only a bucket name, but then scans that bucket also using the role stored in the unit. It seems like the information used to retrieve resumption information should be the same information that's used to scan using the retrieved information, but that's not how you've implemented this.

I can't think of any concrete, immediate problems this would cause, but that doesn't mean there aren't any - and my bigger concern is that this will impede maintainability. What do you think?

Copy link
Contributor Author

@mustansir14 mustansir14 Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising this. I had an internal discussion with @amanfcp on this, and he raised a great point that there could be a case where two roles have access to the same bucket, but have different object-level access. I researched on this and turns out this is true. So it seems like a good idea to store resume info for a particular bucket AND a particular role.

My only concern here is that Legacy scans resumption does not have this. Resume Info is only stored using bucket there, and this particular case also seems to be applicable there.

I might be totally off here and there might be some place where we are already handling this particular case, so please correct me if I am wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this looks like an oversight in the existing implementation. It's been around for a while, so I don't think we need to urgently fix it (and I wouldn't fix it in this PR), but please add a TODO somewhere flagging the problem. We can clean it up as a later step.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've included the role in the SourceUnitID now, and resumption info is being tracked using the SourceUnitID, so this means resumption info is being stored against both role and bucket

@mustansir14 mustansir14 requested a review from rosecodym December 3, 2025 09:38
Copy link
Contributor

@rosecodym rosecodym left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your attention to detail on this! Hopefully we got everything, but it looks like you were pretty diligent with your test cases, so I'm not too worried.

if err := json.Unmarshal(data, &unit); err != nil {
return nil, err
}
bucket, kind := unit.SourceUnitID()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable called bucket actually holds the unit ID, right? (I don't think this will cause program incorrectness but it will likely confuse readers.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good catch. I had it previously when we were using just the bucket for UnitID. Made the change

@mustansir14 mustansir14 merged commit b6389e2 into trufflesecurity:main Dec 4, 2025
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants