Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/megatron/energon/cache/file_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,18 @@ def get_path(self) -> str:
def get_media_metadata(self, key: str) -> MediaMetadataBase:
if self._media_metadata_available is None:
try:
has_metadata = self.sqlite_reader.db_has_media_metadata()
self._media_metadata_available = self.sqlite_reader.db_has_media_metadata()
except sqlite3.Error as exc: # pragma: no cover - defensive
self._media_metadata_available = False
raise RuntimeError(
"Failed to inspect media metadata table. Re-run `energon prepare --media-metadata-by-...`."
) from exc

if not has_metadata:
raise RuntimeError(
"Media metadata is not available for this dataset. "
"Run `energon prepare --media-metadata-by-...` to generate it."
)

self._media_metadata_available = True
if not self._media_metadata_available:
raise RuntimeError(
"Media metadata is not available for this dataset. "
"Run `energon prepare --media-metadata-by-...` to generate it."
)

try:
row = self.sqlite_reader.get_media_metadata(key)
Expand Down
57 changes: 47 additions & 10 deletions src/megatron/energon/epathlib/epath.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,17 +266,15 @@ def url(self) -> str:
return f"msc://{self.profile}{int_path_str}"

def is_local(self) -> bool:
if self.profile == "dss":
# For now, a DSS path is always considered local.
# Note that this does not mean it exists on the local filesystem.
return True
else:
return self.profile == DEFAULT_PROFILE_NAME
# It will return a posix path if the fs is local, otherwise None
return self.fs.get_posix_path(self._internal_str_path) is not None

def local_path(self) -> PathlibPath:
if not self.is_local():
# This resolves the path if it exists, probably ok.
posix_path = self.fs.get_posix_path(self._internal_str_path)
if posix_path is None:
raise ValueError(f"Path {self} is not local")
return PathlibPath(self._internal_str_path)
return PathlibPath(posix_path)

def is_dir(self) -> bool:
try:
Expand All @@ -290,14 +288,50 @@ def is_file(self) -> bool:
def mkdir(self, exist_ok: bool = True, parents: bool = False):
pass

def glob(self, pattern) -> Generator["EPath", None, None]:
def walk(self) -> Generator["EPath", None, None]:
"""Returns all files within this path (no folders)."""
# Prefix to be removed from found paths to remap to relative paths
root_prefix = self._internal_str_path.lstrip("/")

for obj in self.fs.list_recursive(self._internal_str_path):
Copy link
Copy Markdown
Collaborator

@philipp-fischer philipp-fischer May 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either us or MSC team needs to fix this for local paths. This is way slower than os.walk. Maybe for now we should have os.walk here explicitly for local paths. In MSC they do os.listdir + sorting + isdir/isdile plus object construction for each file, i.e. lots of overhead.
But let's make sure we still preserve DSS URLs when using os.walk.

rel = obj.key
if root_prefix:
if rel.startswith(root_prefix + "/"):
rel = rel[len(root_prefix) + 1 :]
elif rel.startswith("/" + root_prefix + "/"):
rel = rel[len(root_prefix) + 2 :]
elif rel == root_prefix or rel == "/" + root_prefix:
rel = "."

path = EPath(self)
path.internal_path = self._resolve(self.internal_path / PurePosixPath(rel))
yield path

def glob(self, pattern: str) -> Generator["EPath", None, None]:
"""Returns all files matching the pattern within this path (no folders)."""
search_path_pattern = (self / pattern)._internal_str_path
# MSC glob matches keys like ``bucket/key``; a leading ``/`` breaks wcmatch (pattern
# ``/b/**`` never matches ``b/parts/x``). Returned keys may repeat the bucket prefix; strip
# it before joining with ``internal_path`` so we do not get ``/b/b/parts/...``.
search_path_pattern = search_path_pattern.lstrip("/")

# Prefix to be removed from found paths to remap to relative paths
root_prefix = self._internal_str_path.lstrip("/")

for path in self.fs.glob(search_path_pattern):
assert isinstance(path, str)

rel = path
if root_prefix:
if rel.startswith(root_prefix + "/"):
rel = rel[len(root_prefix) + 1 :]
elif rel.startswith("/" + root_prefix + "/"):
rel = rel[len(root_prefix) + 2 :]
elif rel == root_prefix or rel == "/" + root_prefix:
rel = "."

new_path = EPath(self)
new_path.internal_path = self._resolve(self.internal_path / PurePosixPath(path))
new_path.internal_path = self._resolve(self.internal_path / PurePosixPath(rel))
Comment thread
philipp-fischer marked this conversation as resolved.

yield new_path

Expand All @@ -321,6 +355,9 @@ def relative_to(self, other: "EPath") -> str:

return str(self.internal_path.relative_to(other.internal_path))

def stat(self) -> msc.types.ObjectMetadata:
return self.fs.info(self._internal_str_path)

@property
def display_name(self) -> str:
if self.profile == "dss":
Expand Down
14 changes: 13 additions & 1 deletion src/megatron/energon/flavors/webdataset/indexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ def __init__(
part_name TEXT,
content_byte_offset INTEGER,
content_byte_size INTEGER)
if enable_media_metadata is True, it also creates the media_metadata table:
- media_metadata(entry_key TEXT PRIMARY KEY,
metadata_type TEXT NOT NULL,
metadata_json TEXT NOT NULL)
if enable_media_metadata is True, it also creates the media_filters table:
- media_filters(filter_id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy TEXT NOT NULL,
patterns TEXT,
created_at_utc TEXT DEFAULT CURRENT_TIMESTAMP,
UNIQUE(strategy, patterns))
Also creates indexes:
- samples(sample_key)
- samples(tar_file_id, sample_index)
Expand All @@ -70,7 +80,6 @@ def __init__(

# Initialize SQLite connection
# Only supporting local file system, because sqlite does not support remote file systems.
# TODO: Implement remote file systems. Maybe create locally in tmp then upload?
path = self.sqlite_path.local_path()
path.parent.mkdir(parents=True, exist_ok=True)
self.db = sqlite3.connect(path)
Expand Down Expand Up @@ -355,6 +364,9 @@ class SqliteIndexReader:
part_name TEXT,
content_byte_offset INTEGER,
content_byte_size INTEGER)
- media_metadata(entry_key TEXT PRIMARY KEY,
metadata_type TEXT NOT NULL,
metadata_json TEXT NOT NULL)
"""

sqlite_path: EPath
Expand Down
Loading
Loading