Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,10 @@ class Packable(BaseModel):
# Extract/Encode (instance methods)
def extract(self) -> ExtractedPackable # Cached for efficiency
def encode(self) -> bytes # Calls extract() internally
def get_checksum(self) -> str # SHA256 checksum of encoded bytes

# Checksum (final property, cannot be overridden)
@cached_property
def checksum(self) -> str # SHA256 checksum of encoded bytes

# Decode/Reconstruct
@classmethod
Expand Down
17 changes: 10 additions & 7 deletions python/meshly/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ class PackableCache(Generic[T]):
Disk I/O uses ForkPool for parallelism on batch operations.

Args:
store: PackableStore for disk persistence.
store: PackableStore for disk persistence. None for memory-only mode.
decoder: Packable subclass used to decode bytes from disk.
prefix: Key prefix for namespacing within the store's assets dir.
max_memory: Maximum entries in the in-memory LRU cache.
"""

def __init__(
self,
store: PackableStore,
decoder: type[T],
store: PackableStore | None = None,
decoder: type[T] | None = None,
prefix: str = "",
max_memory: int = 10_000,
):
Expand Down Expand Up @@ -111,7 +111,7 @@ def get_many(self, keys: set[str]) -> dict[str, T]:

# Tier 2: disk (parallel via ForkPool)
missing = keys - found.keys()
if not missing:
if not missing or self._store is None:
return found

disk_hits = self._load_many_disk(missing)
Expand Down Expand Up @@ -139,7 +139,8 @@ def put_many(self, items: dict[str, T]) -> None:
self._evict()

# Disk (parallel via ForkPool)
self._save_many_disk(items)
if self._store is not None:
self._save_many_disk(items)

def clear(self) -> None:
"""Clear in-memory cache (disk is not affected)."""
Expand Down Expand Up @@ -175,9 +176,11 @@ def _save_many_disk(self, items: dict[str, T]) -> None:
for k, v in items.items():
store_key = self._store_key(k)
path = str(self._store.asset_file(store_key))
work.append((path, v.encode()))
if not Path(path).exists():
work.append((path, v.encode()))

ForkPool.map(_save_one, work, min_items_for_parallel=4)
if work:
ForkPool.map(_save_one, work, min_items_for_parallel=4)

# -- internal -------------------------------------------------------------

Expand Down
96 changes: 37 additions & 59 deletions python/meshly/packable.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,18 @@
- save() / load(): File-based asset store with deduplication

Checksum Scheme:
Packable checksums are computed from the JSON representation of extracted data.
This makes checksum recreation straightforward outside this library.
Packable checksums are computed from the SHA256 of the encoded zip bytes.
This ensures the checksum captures the exact binary representation.

Format: SHA256 of compact JSON: {"data":<data>,"json_schema":<schema>}
Keys are sorted, no whitespace (single line).

The `data` dict contains $ref entries (e.g. {"$ref":"abc123..."}) pointing
to asset checksums, so the packable checksum transitively covers all binary
content without embedding the actual bytes.
The checksum property is final and cannot be overridden by subclasses.
Attempting to override will raise a TypeError.

To recreate a checksum externally:
import hashlib, json
payload = {"data": packable_data, "json_schema": schema}
compact_json = json.dumps(payload, sort_keys=True, separators=(',', ':'))
checksum = hashlib.sha256(compact_json.encode()).hexdigest()
import hashlib
checksum = hashlib.sha256(packable.encode()).hexdigest()
"""

import os
import time
import zipfile
from functools import cached_property, lru_cache
Expand Down Expand Up @@ -66,6 +61,7 @@ def _reconstruct_packable(cls, data: dict):
return cls.model_construct(**data)



class PackableRefInfo(RefInfo):
"""Ref model for self-contained packable $ref (encoded as zip)."""
ref: str = Field(..., alias="$ref")
Expand All @@ -87,31 +83,6 @@ class ExtractedPackable(BaseModel):
json_schema: Optional[dict[str, Any]] = Field(default=None, description="JSON Schema with encoding info")
assets: dict[str, bytes] = Field(default_factory=dict, exclude=True, description="Map of checksum -> encoded bytes for all arrays")

@cached_property
def checksum(self) -> str:
"""SHA256 checksum computed from data and json_schema.

Checksum Format:
SHA256 of compact JSON: {"data":<data>,"json_schema":<schema>}
Keys are sorted, no whitespace (single line).

Why JSON-based:
The data dict contains $ref entries pointing to asset checksums,
so this checksum transitively covers all array/binary content.
This format makes checksum recreation straightforward outside meshly:

import hashlib, json
payload = {"data": extracted_data, "json_schema": schema}
compact_json = json.dumps(payload, sort_keys=True, separators=(',', ':'))
checksum = hashlib.sha256(compact_json.encode()).hexdigest()

Returns:
SHA256 hex digest string
"""
payload = {"data": self.data, "json_schema": self.json_schema}
json_bytes = orjson.dumps(payload, option=orjson.OPT_SORT_KEYS)
return ChecksumUtils.compute_bytes_checksum(json_bytes)

@staticmethod
def extract_checksums(data: dict[str, Any]) -> list[str]:
"""Extract all $ref checksums from a serialized data dict.
Expand Down Expand Up @@ -310,6 +281,16 @@ class Mesh(Packable):
class Config:
arbitrary_types_allowed = True

def __init_subclass__(cls, **kwargs):
    """Reject any subclass that shadows the final ``checksum`` property.

    Runs once at class-definition time for every subclass. Only a
    ``checksum`` defined directly on the new class triggers the error;
    simply inheriting the property is, of course, allowed.

    Raises:
        TypeError: if the subclass body defines its own ``checksum``.
    """
    super().__init_subclass__(**kwargs)
    # vars(cls) is the class's own namespace, so inherited members are ignored.
    if "checksum" in vars(cls):
        raise TypeError(
            f"Cannot override 'checksum' property in {cls.__name__}. "
            f"The checksum is computed from encoded bytes and is final."
        )

@classmethod
def __get_pydantic_json_schema__(
cls, core_schema_obj: pydantic_core_schema.CoreSchema, handler: GetJsonSchemaHandler
Expand Down Expand Up @@ -394,26 +375,16 @@ def encode(self) -> bytes:

@cached_property
def checksum(self) -> str:
"""SHA256 checksum of this Packable's extracted JSON representation (cached).

Checksum Format:
SHA256 of compact JSON: {"data":<extracted_data>,"json_schema":<schema>}
Keys are sorted, no whitespace (single line).
"""SHA256 checksum of this Packable's encoded zip bytes (cached, final).

The data dict contains $ref entries pointing to asset checksums (e.g.,
{"$ref":"abc123..."}), so this checksum transitively covers all binary content.
This property cannot be overridden by subclasses. Attempting to do so
will raise a TypeError at class definition time.

To recreate this checksum outside meshly:
import hashlib, json
payload = {"data": packable_data, "json_schema": schema}
compact_json = json.dumps(payload, sort_keys=True, separators=(',', ':'))
checksum = hashlib.sha256(compact_json.encode()).hexdigest()
import hashlib
checksum = hashlib.sha256(packable.encode()).hexdigest()
"""
return self.extract().checksum

def set_checksum(self, checksum: str) -> None:
"""Pre-populate the cached checksum to avoid re-encoding."""
self.__dict__["checksum"] = checksum
return ChecksumUtils.compute_bytes_checksum(self._encoded)

@classmethod
def decode(
Expand Down Expand Up @@ -501,8 +472,6 @@ def reconstruct(
resolved_data = SchemaUtils.resolve_from_class(cls, extracted.data, asset_provider, array_type)
result = cls(**resolved_data)

if isinstance(result, Packable):
result.set_checksum(extracted.checksum)
return result

@staticmethod
Expand Down Expand Up @@ -610,10 +579,19 @@ def save(
# print(f"Extracted packable in {elapsed_ms:.1f} ms with {len(extracted.assets)} assets")
result_key = key or self.checksum

# Save all binary assets (deduplicated by checksum)
for asset_checksum, asset_bytes in extracted.assets.items():
if not store.asset_exists(asset_checksum):
store.save_asset(asset_bytes, asset_checksum)
# Save new binary assets (skip existing)
assets_dir = store.assets_path
assets_dir_exists = assets_dir.exists()
new_assets = {
cs: data for cs, data in extracted.assets.items()
if not assets_dir_exists or not store.asset_exists(cs)
}
if new_assets:
assets_dir.mkdir(parents=True, exist_ok=True)
for cs, data in new_assets.items():
fd = os.open(str(store.asset_file(cs)), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o666)
os.write(fd, data)
os.close(fd)

# Save extracted data (data + schema + checksum) as JSON
store.save_extracted(result_key, extracted)
Expand Down
49 changes: 29 additions & 20 deletions python/meshly/utils/checksum_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
import hashlib
import json
from pathlib import Path
from typing import Any, Optional
from typing import Any, Optional, Union

import orjson
from pydantic import BaseModel

class ChecksumUtils:
"""Utility class for computing checksums."""
Expand All @@ -12,6 +15,31 @@ class ChecksumUtils:
LARGE_FILE_THRESHOLD = 10 * 1024 * 1024 # 10MB
LARGE_DIR_FILE_COUNT_THRESHOLD = 100

@staticmethod
def compute_dict_checksum(data: Union[dict, BaseModel]) -> str:
    """Compute a checksum of a dict (or Pydantic model) via its JSON form.

    A ``BaseModel`` input is first converted with ``model_dump()``; the
    resulting dict is serialized as sort-keyed compact JSON (orjson) and
    hashed with ``compute_bytes_checksum``, which truncates the SHA256
    digest to 16 hex characters.

    NOTE(review): an earlier docstring described hashing a
    ``{"data": ..., "json_schema": ...}`` envelope — this function hashes
    the given dict directly; callers wanting that envelope must build it
    themselves before calling.

    To recreate externally (for plain JSON types; orjson's compact,
    sort-keyed output should match ``json.dumps`` below — verify for
    non-standard types):

        import hashlib, json
        compact = json.dumps(data, sort_keys=True, separators=(',', ':'))
        checksum = hashlib.sha256(compact.encode()).hexdigest()[:16]

    Args:
        data: JSON-serializable dict, or a Pydantic ``BaseModel``.

    Returns:
        16-character hex digest string.
    """
    data_dict = data.model_dump() if isinstance(data, BaseModel) else data
    json_bytes = orjson.dumps(data_dict, option=orjson.OPT_SORT_KEYS)
    return ChecksumUtils.compute_bytes_checksum(json_bytes)

@staticmethod
def compute_bytes_checksum(data: bytes) -> str:
"""Compute SHA256 checksum for bytes.
Expand All @@ -24,26 +52,7 @@ def compute_bytes_checksum(data: bytes) -> str:
"""
return hashlib.sha256(data).hexdigest()[:16]

@staticmethod
def compute_dict_checksum(data: dict[str, Any], assets: dict[str, bytes] = {}) -> str:
"""Compute checksum for a data dict with assets.

Combines data JSON + all asset bytes for deterministic hashing.

Args:
data: JSON-serializable dict
assets: Map of checksum -> bytes

Returns:
16-character hex string
"""
data_json = json.dumps(data, sort_keys=True).encode("utf-8")
hasher = hashlib.sha256()
hasher.update(data_json)
hasher.update(b"\x00")
for checksum in sorted(assets.keys()):
hasher.update(assets[checksum])
return hasher.hexdigest()[:16]

@staticmethod
def compute_file_checksum(file_path: Path, fast: bool = False) -> str:
Expand Down
57 changes: 40 additions & 17 deletions python/meshly/utils/schema_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,23 @@ def _resolve_packable_item(idx: int) -> object:
return Packable.decode(asset_bytes, ctx.array_type)


def _resolve_basemodel_item(idx: int) -> object:
    """Worker: rebuild a single BaseModel instance from its serialized dict."""
    context = _PACKABLE_CTX
    if context is None:
        raise RuntimeError("_packable_context not set")

    model_cls = context.expected_type
    field_values = SchemaUtils.resolve_from_class(
        model_cls,
        context.values[idx],
        context.assets,
        context.array_type,
    )
    return model_cls(**field_values)


class SchemaUtils:
"""Utilities for resolving $ref values during deserialization."""

_type_hints_cache: dict[type, dict] = {}

# -------------------------------------------------------------------------
# Type helpers
# -------------------------------------------------------------------------
Expand Down Expand Up @@ -126,7 +140,10 @@ def resolve_from_class(
array_type: ArrayType = "numpy",
) -> dict[str, object]:
"""Resolve $ref values using Pydantic model type hints."""
hints = typing.get_type_hints(model_class, include_extras=True)
hints = SchemaUtils._type_hints_cache.get(model_class)
if hints is None:
hints = typing.get_type_hints(model_class, include_extras=True)
SchemaUtils._type_hints_cache[model_class] = hints
result: dict[str, object] = {}

for field_name, field_info in model_class.model_fields.items():
Expand Down Expand Up @@ -249,32 +266,38 @@ def _resolve_list_items(
assets: AssetProvider,
array_type: ArrayType,
) -> list:
"""Resolve list items, parallelizing when items are Packable $refs.
"""Resolve list items, parallelizing when items are Packable $refs or BaseModel dicts.

Uses fork-based parallelism for lists of Packable references.
Falls back to sequential for mixed types.
Uses fork-based parallelism for large homogeneous lists.
Falls back to sequential for mixed types or small lists.
"""
from meshly.packable import Packable

if not items:
return []

# Check if all items are Packable $refs and type is Packable
MIN_ITEMS_FOR_PARALLEL = 50
is_packable_type = isinstance(elem_type, type) and issubclass(elem_type, Packable)
all_packable_refs = (
len(items) >= MIN_ITEMS_FOR_PARALLEL
and is_packable_type
and all(isinstance(v, dict) and "$ref" in v for v in items)
)
is_basemodel_type = isinstance(elem_type, type) and issubclass(elem_type, BaseModel)

if all_packable_refs:
with _packable_context(items, elem_type, assets, array_type):
return ForkPool.map(
_resolve_packable_item,
range(len(items)),
min_items_for_parallel=MIN_ITEMS_FOR_PARALLEL,
)
if len(items) >= MIN_ITEMS_FOR_PARALLEL:
all_dicts = all(isinstance(v, dict) for v in items)

if is_packable_type and all_dicts and all("$ref" in v for v in items):
with _packable_context(items, elem_type, assets, array_type):
return ForkPool.map(
_resolve_packable_item,
range(len(items)),
min_items_for_parallel=MIN_ITEMS_FOR_PARALLEL,
)

if is_basemodel_type and all_dicts:
with _packable_context(items, elem_type, assets, array_type):
return ForkPool.map(
_resolve_basemodel_item,
range(len(items)),
min_items_for_parallel=MIN_ITEMS_FOR_PARALLEL,
)

# Sequential fallback
return [SchemaUtils._resolve_with_type(v, elem_type, assets, array_type) for v in items]
Expand Down
Loading
Loading