Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 113 additions & 43 deletions cmd/wl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"strings"
"syscall"
Expand Down Expand Up @@ -64,17 +65,22 @@ func main() {
category := createCmd.String("category", "", "Category (e.g. models, datasets)")
tags := createCmd.String("tags", "", "Comma-separated tags")
comment := createCmd.String("comment", "", "Optional comment in the torrent file")
stream := createCmd.String("stream", "", "Torrentify a remote http(s) URL without downloading it (carried as a web seed)")
var webseeds stringSlice
createCmd.Var(&webseeds, "webseed", "BEP 19 web seed URL (HTTP origin fallback); repeatable")
createCmd.Parse(os.Args[2:])

path := createCmd.Arg(0)
if path == "" {
log.Fatal("Path is required: wl create [--name NAME] <path>")
srcPath := createCmd.Arg(0)
if srcPath == "" && *stream == "" {
log.Fatal("Provide a path or --stream URL: wl create [--name NAME] <path> | wl create --stream <url>")
}
if srcPath != "" && *stream != "" {
log.Fatal("Provide either a path or --stream, not both")
}

opts := createOpts{
path: path,
path: srcPath,
stream: *stream,
name: *name,
trackerURL: *tracker,
pieceLen: *pieceLen,
Expand Down Expand Up @@ -125,6 +131,7 @@ func main() {

type createOpts struct {
path string
stream string
name string
trackerURL string
pieceLen int
Expand All @@ -151,48 +158,18 @@ func (s *stringSlice) Set(v string) error {
}

func runCreate(opts createOpts) error {
absPath, err := filepath.Abs(opts.path)
if err != nil {
return fmt.Errorf("resolve path: %w", err)
}

if opts.name == "" {
opts.name = filepath.Base(absPath)
}

announceURL := buildAnnounceURL(opts.trackerURL, opts.userID, opts.secret)

fmt.Printf("Creating torrent for %s...\n", absPath)

result, err := torrent.Create(torrent.CreateOptions{
Path: absPath,
Name: opts.name,
PieceLength: opts.pieceLen,
AnnounceURL: announceURL,
Private: opts.private,
Comment: opts.comment,
Source: envOr("WL_SOURCE", ""),
CreatedBy: envOr("WL_CREATED_BY", ""),
WebSeeds: opts.webseeds,
})
if err != nil {
return fmt.Errorf("create torrent: %w", err)
}

// Get file/dir size for registry
var result *torrent.CreateResult
var totalSize int64
info, err := os.Stat(absPath)
if err == nil {
if info.IsDir() {
filepath.Walk(absPath, func(_ string, fi os.FileInfo, _ error) error {
if fi != nil && !fi.IsDir() {
totalSize += fi.Size()
}
return nil
})
} else {
totalSize = info.Size()
}
var err error
if opts.stream != "" {
result, totalSize, opts.name, err = createFromStream(opts, announceURL)
} else {
result, totalSize, opts.name, err = createFromPath(opts, announceURL)
}
if err != nil {
return err
}

// Write .torrent file
Expand Down Expand Up @@ -234,6 +211,99 @@ func runCreate(opts createOpts) error {
return nil
}

// createFromPath turns a local file or directory into a torrent.
//
// opts.path is resolved to an absolute path; when opts.name is empty the last
// element of that absolute path is used as the torrent name. It returns the
// create result, the total payload size in bytes, and the name that was used.
//
// The size is computed on a best-effort basis after the torrent is built: if
// the path cannot be stat'ed it is reported as 0, and entries that cannot be
// inspected while walking a directory are skipped.
func createFromPath(opts createOpts, announceURL string) (*torrent.CreateResult, int64, string, error) {
	absPath, absErr := filepath.Abs(opts.path)
	if absErr != nil {
		return nil, 0, "", fmt.Errorf("resolve path: %w", absErr)
	}

	resolvedName := opts.name
	if resolvedName == "" {
		resolvedName = filepath.Base(absPath)
	}

	fmt.Printf("Creating torrent for %s...\n", absPath)

	createOptions := torrent.CreateOptions{
		Path:        absPath,
		Name:        resolvedName,
		PieceLength: opts.pieceLen,
		AnnounceURL: announceURL,
		Private:     opts.private,
		Comment:     opts.comment,
		Source:      envOr("WL_SOURCE", ""),
		CreatedBy:   envOr("WL_CREATED_BY", ""),
		WebSeeds:    opts.webseeds,
	}
	result, createErr := torrent.Create(createOptions)
	if createErr != nil {
		return nil, 0, "", fmt.Errorf("create torrent: %w", createErr)
	}

	// Sum the payload size: a single file's own size, or every non-directory
	// entry under a directory.
	var size int64
	info, statErr := os.Stat(absPath)
	switch {
	case statErr != nil:
		// Best-effort: leave the size at 0 when the path cannot be stat'ed.
	case !info.IsDir():
		size = info.Size()
	default:
		// Walk errors are intentionally ignored; unreadable entries arrive
		// with a nil FileInfo and are skipped.
		_ = filepath.Walk(absPath, func(_ string, entry os.FileInfo, _ error) error {
			if entry == nil || entry.IsDir() {
				return nil
			}
			size += entry.Size()
			return nil
		})
	}

	return result, size, resolvedName, nil
}

// createFromStream torrentifies a remote HTTP origin without downloading it to
// disk: it streams the body once to hash it, and carries the origin URL as a
// web seed so clients can fetch from it. Requires a known Content-Length.
//
// opts.stream must be an absolute http(s) URL. When opts.name is empty the
// name is derived from the last element of the URL path; if nothing usable can
// be derived the caller is asked to pass --name.
//
// It returns the create result, the payload size reported by the origin's
// Content-Length, and the name that was used.
func createFromStream(opts createOpts, announceURL string) (*torrent.CreateResult, int64, string, error) {
	streamURL := opts.stream
	u, err := url.Parse(streamURL)
	// Reject host-less URLs up front (same rule the web seed validation
	// applies) instead of letting the request fail later with a less helpful
	// transport error.
	if err != nil || (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" {
		return nil, 0, "", fmt.Errorf("--stream must be an http(s) URL, got %q", streamURL)
	}
	name := opts.name
	if name == "" {
		name = path.Base(u.Path)
	}
	// path.Base yields "." for an empty path and "/" for a root-only path.
	if name == "" || name == "." || name == "/" {
		return nil, 0, "", fmt.Errorf("could not derive a name from %q; pass --name", streamURL)
	}

	fmt.Printf("Streaming and hashing %s...\n", streamURL)
	// NOTE(review): http.Get uses http.DefaultClient, which has no timeout. An
	// overall deadline would be wrong for arbitrarily large bodies, but a
	// stalled origin will block here indefinitely.
	resp, err := http.Get(streamURL)
	if err != nil {
		return nil, 0, "", fmt.Errorf("fetch stream: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, 0, "", fmt.Errorf("stream origin returned status %d", resp.StatusCode)
	}
	if resp.ContentLength <= 0 {
		return nil, 0, "", fmt.Errorf("stream origin did not report a Content-Length; cannot hash without a known size")
	}

	// The origin is itself a web seed; put it first and append the explicit
	// ones, dropping exact duplicates (e.g. --webseed set to the stream URL)
	// while preserving order.
	webseeds := make([]string, 0, 1+len(opts.webseeds))
	seen := make(map[string]struct{}, 1+len(opts.webseeds))
	for _, ws := range append([]string{streamURL}, opts.webseeds...) {
		if _, dup := seen[ws]; dup {
			continue
		}
		seen[ws] = struct{}{}
		webseeds = append(webseeds, ws)
	}

	result, err := torrent.CreateStream(torrent.CreateOptions{
		Name:        name,
		PieceLength: opts.pieceLen,
		AnnounceURL: announceURL,
		Private:     opts.private,
		Comment:     opts.comment,
		Source:      envOr("WL_SOURCE", ""),
		CreatedBy:   envOr("WL_CREATED_BY", ""),
		WebSeeds:    webseeds,
	}, resp.Body, resp.ContentLength, name)
	if err != nil {
		return nil, 0, "", fmt.Errorf("create torrent from stream: %w", err)
	}
	return result, resp.ContentLength, name, nil
}

type registryBody struct {
InfoHash string `json:"info_hash"`
V1InfoHash string `json:"v1_info_hash,omitempty"`
Expand Down
77 changes: 68 additions & 9 deletions internal/torrent/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,8 @@ func Create(opts CreateOptions) (*CreateResult, error) {
if opts.PieceLength < MinPieceLength || !isPowerOfTwo(opts.PieceLength) {
return nil, fmt.Errorf("piece length must be a power of 2 and >= %d, got %d", MinPieceLength, opts.PieceLength)
}
for _, ws := range opts.WebSeeds {
u, err := url.Parse(ws)
if err != nil || (u.Scheme != "http" && u.Scheme != "https") || u.Host == "" {
return nil, fmt.Errorf("web seed must be an absolute http(s) URL, got %q", ws)
}
if err := validateWebSeeds(opts.WebSeeds); err != nil {
return nil, err
}

info, err := os.Stat(opts.Path)
Expand All @@ -95,6 +92,59 @@ func Create(opts CreateOptions) (*CreateResult, error) {
return nil, err
}

return finalize(opts, infoDict, pieceLayers)
}

// validateWebSeeds reports an error for the first entry in seeds that is not
// an absolute http(s) URL: it must parse, use the http or https scheme, and
// carry a non-empty host. A nil or empty slice is valid.
func validateWebSeeds(seeds []string) error {
	for i := range seeds {
		seed := seeds[i]
		parsed, parseErr := url.Parse(seed)
		// parsed is nil when parseErr is set; the short-circuit keeps the
		// field accesses safe.
		valid := parseErr == nil &&
			(parsed.Scheme == "http" || parsed.Scheme == "https") &&
			parsed.Host != ""
		if !valid {
			return fmt.Errorf("web seed must be an absolute http(s) URL, got %q", seed)
		}
	}
	return nil
}

// CreateStream builds a single-file hybrid torrent by hashing an io.Reader of
// known size — without persisting the data locally. Used by `wl create --stream`
// to torrentify a remote origin (e.g. a Hugging Face / Kaggle URL): the bytes
// are read once to compute hashes, and the origin URL is carried as a web seed.
//
// opts.PieceLength defaults to DefaultPieceLength when zero. name is used as
// the file name inside the torrent. An error is returned when the announce URL
// is missing, the piece length is invalid, size is not positive, name is
// empty, a web seed is not an absolute http(s) URL, hashing fails, or r does
// not deliver exactly size bytes.
func CreateStream(opts CreateOptions, r io.Reader, size int64, name string) (*CreateResult, error) {
	if opts.PieceLength == 0 {
		opts.PieceLength = DefaultPieceLength
	}
	if opts.AnnounceURL == "" {
		return nil, fmt.Errorf("announce URL is required")
	}
	if opts.PieceLength < MinPieceLength || !isPowerOfTwo(opts.PieceLength) {
		return nil, fmt.Errorf("piece length must be a power of 2 and >= %d, got %d", MinPieceLength, opts.PieceLength)
	}
	if size <= 0 {
		return nil, fmt.Errorf("stream size must be known and positive, got %d", size)
	}
	if name == "" {
		return nil, fmt.Errorf("stream name is required")
	}
	if err := validateWebSeeds(opts.WebSeeds); err != nil {
		return nil, err
	}

	// Allow the hasher to read at most one byte past the declared size. The
	// remaining budget afterwards tells us how many bytes were consumed, so a
	// truncated or over-long stream is rejected instead of silently yielding a
	// torrent whose hashes do not match its declared length.
	limited := &io.LimitedReader{R: r, N: size + 1}
	fr, err := hashReaderHybrid(limited, size, opts.PieceLength)
	if err != nil {
		return nil, fmt.Errorf("hash stream: %w", err)
	}
	switch consumed := size + 1 - limited.N; {
	case consumed < size:
		return nil, fmt.Errorf("stream ended after %d bytes, expected %d", consumed, size)
	case consumed > size:
		return nil, fmt.Errorf("stream is longer than the declared size of %d bytes", size)
	}

	infoDict := newOrderedDict()
	pieceLayers := make(map[string]string)
	if err := populateSingleFileInfo(infoDict, pieceLayers, opts, fr, name); err != nil {
		return nil, err
	}
	return finalize(opts, infoDict, pieceLayers)
}

// finalize assembles the outer metainfo dict and computes the result.
func finalize(opts CreateOptions, infoDict *orderedDict, pieceLayers map[string]string) (*CreateResult, error) {
infoBytes, err := bencode.EncodeBytes(infoDict)
if err != nil {
return nil, fmt.Errorf("bencode info dict: %w", err)
Expand Down Expand Up @@ -149,10 +199,14 @@ func buildSingleFileHybrid(infoDict *orderedDict, pieceLayers map[string]string,
if err != nil {
return err
}
return populateSingleFileInfo(infoDict, pieceLayers, opts, fr, filepath.Base(opts.Path))
}

// populateSingleFileInfo fills the info dict from an already-computed fileResult.
// Shared by the on-disk (buildSingleFileHybrid) and streaming (CreateStream) paths.
func populateSingleFileInfo(infoDict *orderedDict, pieceLayers map[string]string, opts CreateOptions, fr *fileResult, filename string) error {
// For single-file hybrid torrents, the file tree key and info.name must
// both match the actual filename so clients can locate the file on disk.
filename := filepath.Base(opts.Path)
fileTree := map[string]interface{}{
filename: map[string]interface{}{
"": fr.v2Entry,
Expand Down Expand Up @@ -348,16 +402,21 @@ func hashFileHybrid(path string, pieceLen int) (*fileResult, error) {
return nil, err
}

size := info.Size()
return hashReaderHybrid(f, info.Size(), pieceLen)
}

// hashReaderHybrid computes the hybrid v1+v2 hashes for a single file of known
// size streamed from r. r is read exactly once (no seeking, nothing persisted),
// so it works for an HTTP body just as well as a local file.
func hashReaderHybrid(r io.Reader, size int64, pieceLen int) (*fileResult, error) {
if size == 0 {
return &fileResult{
size: 0,
v2Entry: map[string]interface{}{"length": 0},
}, nil
}

// Read file in 16KiB blocks for v2 Merkle tree
// Read in 16KiB blocks for the v2 Merkle tree
// Simultaneously compute v1 SHA-1 piece hashes
buf := make([]byte, BlockSize)
var blockHashes [][32]byte
Expand All @@ -366,7 +425,7 @@ func hashFileHybrid(path string, pieceLen int) (*fileResult, error) {
bytesInPiece := 0

for {
n, err := io.ReadFull(f, buf)
n, err := io.ReadFull(r, buf)
if n > 0 {
// v2: SHA-256 of each 16KiB block
blockHashes = append(blockHashes, sha256.Sum256(buf[:n]))
Expand Down
51 changes: 51 additions & 0 deletions internal/torrent/torrent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package torrent

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"os"
Expand Down Expand Up @@ -147,6 +148,56 @@ func TestHybridGoldenVectors(t *testing.T) {
}
}

func TestCreateStream(t *testing.T) {
t.Parallel()

// Same fixed input as the uniform golden vector — proves the streaming-hash
// path is byte-identical to the on-disk path AND matches Transmission.
data := make([]byte, 1_000_000)
for i := range data {
data[i] = 'A'
}
const (
wantV1 = "cc1614c7d81dc40154072a8af1394e66b4487eef"
wantV2 = "db4ca38f5db211c00c5f547b8846129934fb863bbf311373debfab0b31fc615d"
)
origin := "https://huggingface.co/org/repo/resolve/main/data.bin"

result, err := CreateStream(CreateOptions{
PieceLength: 256 * 1024,
AnnounceURL: "http://localhost:8080/announce",
WebSeeds: []string{origin},
}, bytes.NewReader(data), int64(len(data)), "data.bin")
if err != nil {
t.Fatalf("CreateStream failed: %v", err)
}
if result.InfoHashV1Hex != wantV1 {
t.Errorf("v1 hash = %s, want %s", result.InfoHashV1Hex, wantV1)
}
if result.InfoHashHex != wantV2 {
t.Errorf("v2 hash = %s, want %s", result.InfoHashHex, wantV2)
}

// The origin must be carried as a web seed.
var meta map[string]interface{}
if err := bencode.DecodeBytes(result.TorrentBytes, &meta); err != nil {
t.Fatal(err)
}
raw, ok := meta["url-list"].([]interface{})
if !ok || len(raw) != 1 || raw[0].(string) != origin {
t.Errorf("url-list = %v, want [%s]", meta["url-list"], origin)
}

// Bad inputs are rejected.
base := CreateOptions{PieceLength: 256 * 1024, AnnounceURL: "http://localhost:8080/announce"}
if _, err := CreateStream(base, bytes.NewReader(data), 0, "x"); err == nil {
t.Error("expected error for non-positive size")
}
if _, err := CreateStream(base, bytes.NewReader(data), 10, ""); err == nil {
t.Error("expected error for empty name")
}
}

func TestCreateWebSeeds(t *testing.T) {
t.Parallel()
dir := t.TempDir()
Expand Down
Loading