From d67472417c7aa8949b62aa456b0d64932337844f Mon Sep 17 00:00:00 2001 From: iksnerd Date: Sat, 13 Jun 2026 18:07:18 +0300 Subject: [PATCH] Add BEP 11 PEX and a growable shared peer pool to the swarm MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The swarm statically partitioned the initial peer list across workers at Start, so a worker that lost its peers couldn't get others' and PEX peers had nowhere to go. Replace that with a shared, growable pool and wire in BEP 11 peer exchange. Protocol (peer.go): - Advertise ut_pex (local id 2) alongside ut_metadata in the BEP 10 extended handshake. - parsePexPeers validates against PeerMessageLimits, then unpacks added (compact v4) and added6 (compact v6) via the existing unpackPeers, and drops unconnectable entries (port 0 / unspecified IP) at the boundary. - PeerConn buffers harvested peers (handlePexMessage / DrainPexPeers). Swarm (swarm.go): - peerChan (buffered) + mutex-guarded knownAddrs replace the static split; AddPeers dedups and pushes non-blocking. - Workers acquire a connection from the pool BEFORE pulling work, so a worker waiting on a peer can't strand a piece marked in-progress. - inFlight (atomic) is incremented at the pull, before dialing, so the new exhaustion supervisor can't mistake an in-flight connect for a dead swarm; it cancels only after the pool is empty, inFlight==0, and pieces remain, sustained across a short grace window. - The unbuffered-resultChan / drain / close-after-WaitGroup shutdown is preserved verbatim. Wiring (downloader.go): downloadPiece harvests ut_pex messages; the worker drains them into the pool after each piece. connectToAny is replaced by a single-addr dialAndHandshake. Tested unit (dead-peer skip, PEX discovery e2e, exhaustion, no-leak) under -race, and end-to-end against a real Transmission seeder (5 MiB / 20 pieces, checksum verified) — during which Transmission's ut_pex was harvested and used, validating the path against a real client. --- internal/client/downloader.go | 50 ++++---- internal/client/peer.go | 105 +++++++++++++++- internal/client/pex_test.go | 151 ++++++++++++++++++++++ internal/client/swarm.go | 186 +++++++++++++++++++++++----- internal/client/swarm_test.go | 133 ++++++++++++++++++++ internal/client/testhelpers_test.go | 84 +++++++++++++ 6 files changed, 654 insertions(+), 55 deletions(-) create mode 100644 internal/client/pex_test.go diff --git a/internal/client/downloader.go b/internal/client/downloader.go index 8e3b9ec..344f5d7 100644 --- a/internal/client/downloader.go +++ b/internal/client/downloader.go @@ -63,29 +63,24 @@ func DownloadMVP(ctx context.Context, opts DownloadOptions) error { return nil } -// connectToAny tries each address until one succeeds handshake. -func connectToAny(ctx context.Context, addrs []string, infoHash []byte, peerID string) (*PeerConn, string, error) { - var lastErr error - for _, addr := range addrs { - p, err := Connect(ctx, addr) - if err != nil { - lastErr = err - continue - } - if err := p.Handshake(ctx, infoHash, peerID); err != nil { - p.Close() - lastErr = err - continue - } - // Send interested immediately after handshake - if err := p.WriteMessage(&Message{ID: MsgInterested}); err != nil { - p.Close() - lastErr = err - continue - } - return p, addr, nil +// dialAndHandshake connects to a single peer, performs the BEP 3/10 handshake, +// and sends our initial "interested". On any failure the connection is closed +// and the error returned so the caller can move on to the next address. +func dialAndHandshake(ctx context.Context, addr string, infoHash []byte, peerID string) (*PeerConn, error) { + p, err := Connect(ctx, addr) + if err != nil { + return nil, err + } + if err := p.Handshake(ctx, infoHash, peerID); err != nil { + p.Close() + return nil, err + } + // Send interested immediately after handshake. + if err := p.WriteMessage(&Message{ID: MsgInterested}); err != nil { + p.Close() + return nil, err } - return nil, "", fmt.Errorf("all peers failed: %w", lastErr) + return p, nil } // downloadPiece downloads and verifies a single piece over an existing connection. @@ -155,7 +150,16 @@ func downloadPiece(ctx context.Context, p *PeerConn, index int, size int, expect } } - case MsgExtended, MsgBitfield, MsgHave: + case MsgExtended: + // BEP 11: a peer addresses ut_pex messages to us using the local + // ID we advertised (localPexID). Harvest discovered peers; the + // owning worker drains them after this piece finishes. + if len(msg.Payload) > 0 && msg.Payload[0] == localPexID { + p.handlePexMessage(msg.Payload[1:]) + } + continue + + case MsgBitfield, MsgHave: continue } } diff --git a/internal/client/peer.go b/internal/client/peer.go index a7b5bd2..6cfd92f 100644 --- a/internal/client/peer.go +++ b/internal/client/peer.go @@ -13,9 +13,20 @@ import ( wbencode "weightless/internal/bencode" ) -// Extension ID mapping for BEP 10 +// Extension names for BEP 10. Peers send extended messages using the local +// IDs WE advertise in our handshake `m` dict, so the constants below are both +// the advertised names and (via localMetadataID/localPexID) the IDs we expect +// inbound messages to carry. const ( - ExtMetadata = "ut_metadata" + ExtMetadata = "ut_metadata" // BEP 9 metadata exchange + ExtPex = "ut_pex" // BEP 11 peer exchange +) + +// Local extension IDs we advertise in sendExtendedHandshake. A peer that +// supports an extension addresses its messages to us using these IDs. +const ( + localMetadataID = 1 + localPexID = 2 ) // BEP 3 wire constants. The handshake is *exactly* 68 bytes: 1 length byte + @@ -51,6 +62,11 @@ type PeerConn struct { // BEP 10 Extension data PeerExtensions map[string]int MetadataSize int + + // pexPeers buffers addresses harvested from inbound ut_pex (BEP 11) + // messages. Owned by the single worker goroutine driving this conn — the + // read loop appends, the same worker drains via DrainPexPeers. No lock. + pexPeers []string } // Connect establishes a TCP connection to the given address. @@ -223,7 +239,8 @@ func (p *PeerConn) RequestMetadata(piece int) error { func (p *PeerConn) sendExtendedHandshake() error { m := map[string]interface{}{ "m": map[string]int{ - ExtMetadata: 1, // We map ut_metadata to local ID 1 + ExtMetadata: localMetadataID, // ut_metadata → local ID 1 + ExtPex: localPexID, // ut_pex → local ID 2 }, } payloadBytes, err := bencode.EncodeBytes(m) @@ -266,3 +283,85 @@ func (p *PeerConn) WriteMessage(m *Message) error { func (p *PeerConn) Close() { p.conn.Close() } + +// handlePexMessage parses the body of an inbound ut_pex (BEP 11) extended +// message and buffers any discovered peers for later draining. body is the +// extended-message payload AFTER the leading ext-id byte. +// +// Malformed PEX is silently ignored: peer exchange is best-effort and must +// never break an in-flight piece download. Strict recognition lives in +// parsePexPeers (unit-tested); the runtime path just discards what it can't read. +func (p *PeerConn) handlePexMessage(body []byte) { + peers, err := parsePexPeers(body) + if err != nil { + return + } + p.pexPeers = append(p.pexPeers, peers...) +} + +// DrainPexPeers returns the peers harvested from ut_pex messages so far and +// clears the buffer. Called by the owning worker between piece downloads. +func (p *PeerConn) DrainPexPeers() []string { + out := p.pexPeers + p.pexPeers = nil + return out +} + +// parsePexPeers recognizes a ut_pex (BEP 11) message dict and returns the +// peers in its `added` (compact IPv4, 6 bytes each) and `added6` (compact +// IPv6, 18 bytes each) fields. `dropped`, flags, and any other keys are +// ignored — v0 only grows the pool, never shrinks it. +// +// LangSec: validate the bencode structure against PeerMessageLimits before +// decoding, and reject compact lists whose length isn't a clean multiple of +// the peer-entry size. +func parsePexPeers(dict []byte) ([]string, error) { + if err := wbencode.Validate(dict, wbencode.PeerMessageLimits); err != nil { + return nil, fmt.Errorf("validate pex message: %w", err) + } + + var msg struct { + Added []byte `bencode:"added"` + Added6 []byte `bencode:"added6"` + } + if err := bencode.DecodeBytes(dict, &msg); err != nil { + return nil, fmt.Errorf("decode pex message: %w", err) + } + + var addrs []string + if len(msg.Added) > 0 { + v4, err := unpackPeers(msg.Added, net.IPv4len) + if err != nil { + return nil, fmt.Errorf("pex added: %w", err) + } + addrs = append(addrs, v4...) + } + if len(msg.Added6) > 0 { + v6, err := unpackPeers(msg.Added6, net.IPv6len) + if err != nil { + return nil, fmt.Errorf("pex added6: %w", err) + } + addrs = append(addrs, v6...) + } + + // Drop unconnectable entries. Real clients (e.g. Transmission) advertise + // peers with port 0 or an unspecified IP; dialing them only wastes a worker. + out := addrs[:0] + for _, a := range addrs { + if connectableAddr(a) { + out = append(out, a) + } + } + return out, nil +} + +// connectableAddr reports whether addr is worth dialing: a parseable host:port +// with a non-zero port and a specified (non-0.0.0.0/::) IP. +func connectableAddr(addr string) bool { + host, port, err := net.SplitHostPort(addr) + if err != nil || port == "0" { + return false + } + ip := net.ParseIP(host) + return ip != nil && !ip.IsUnspecified() +} diff --git a/internal/client/pex_test.go b/internal/client/pex_test.go new file mode 100644 index 0000000..091013f --- /dev/null +++ b/internal/client/pex_test.go @@ -0,0 +1,151 @@ +package client + +import ( + "net" + "testing" + + "github.com/zeebo/bencode" +) + +// encodePexDict bencodes a ut_pex message dict from raw compact peer bytes. +func encodePexDict(t *testing.T, added, added6 []byte) []byte { + t.Helper() + dict := struct { + Added []byte `bencode:"added"` + Added6 []byte `bencode:"added6"` + }{added, added6} + data, err := bencode.EncodeBytes(dict) + if err != nil { + t.Fatalf("encode pex dict: %v", err) + } + return data +} + +func TestParsePexPeers(t *testing.T) { + // Two IPv4 peers: 1.2.3.4:6881, 5.6.7.8:6882 (6 bytes each). + added := []byte{1, 2, 3, 4, 0x1a, 0xe1, 5, 6, 7, 8, 0x1a, 0xe2} + // One IPv6 peer: [::1]:6883 (18 bytes). + v6 := net.ParseIP("::1").To16() + added6 := append(append([]byte{}, v6...), 0x1a, 0xe3) + + peers, err := parsePexPeers(encodePexDict(t, added, added6)) + if err != nil { + t.Fatalf("parsePexPeers: %v", err) + } + + want := []string{"1.2.3.4:6881", "5.6.7.8:6882", "[::1]:6883"} + if len(peers) != len(want) { + t.Fatalf("got %d peers %v, want %d %v", len(peers), peers, len(want), want) + } + for i := range want { + if peers[i] != want[i] { + t.Errorf("peer %d: got %q, want %q", i, peers[i], want[i]) + } + } +} + +func TestParsePexPeersDropsUnconnectable(t *testing.T) { + // A real client (Transmission, observed in e2e) can advertise a peer with + // port 0; an unspecified IP is likewise unconnectable. Both must be dropped + // while the valid peer survives. + added := []byte{ + 1, 2, 3, 4, 0x1a, 0xe1, // 1.2.3.4:6881 (valid) + 127, 0, 0, 1, 0, 0, // 127.0.0.1:0 (port 0 — drop) + 0, 0, 0, 0, 0x1a, 0xe1, // 0.0.0.0:6881 (unspecified — drop) + } + peers, err := parsePexPeers(encodePexDict(t, added, nil)) + if err != nil { + t.Fatalf("parsePexPeers: %v", err) + } + if len(peers) != 1 || peers[0] != "1.2.3.4:6881" { + t.Errorf("expected only [1.2.3.4:6881], got %v", peers) + } +} + +func TestParsePexPeersEmpty(t *testing.T) { + // A dict with no added/added6 keys is valid and yields no peers. + peers, err := parsePexPeers(encodePexDict(t, nil, nil)) + if err != nil { + t.Fatalf("parsePexPeers: %v", err) + } + if len(peers) != 0 { + t.Errorf("expected 0 peers, got %v", peers) + } +} + +func TestParsePexPeersRejectsMalformed(t *testing.T) { + tests := []struct { + name string + dict []byte + }{ + {"not bencode", []byte("definitely not bencode")}, + // added length 5 is not a clean multiple of the 6-byte v4 entry size. + {"ragged added", encodePexDict(t, []byte{1, 2, 3, 4, 0x1a}, nil)}, + // added6 length 17 is not a clean multiple of the 18-byte v6 entry size. + {"ragged added6", encodePexDict(t, nil, make([]byte, 17))}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if _, err := parsePexPeers(tt.dict); err == nil { + t.Errorf("expected error for %s, got nil", tt.name) + } + }) + } +} + +func TestDrainPexPeers(t *testing.T) { + added := []byte{1, 2, 3, 4, 0x1a, 0xe1} + p := &PeerConn{} + + // Malformed PEX must be silently dropped, never appended. + p.handlePexMessage([]byte("garbage")) + if got := p.DrainPexPeers(); got != nil { + t.Fatalf("malformed pex should yield no peers, got %v", got) + } + + // Two messages accumulate; drain returns all and clears. + p.handlePexMessage(encodePexDict(t, added, nil)) + p.handlePexMessage(encodePexDict(t, added, nil)) + got := p.DrainPexPeers() + if len(got) != 2 { + t.Fatalf("expected 2 buffered peers, got %v", got) + } + if got := p.DrainPexPeers(); got != nil { + t.Errorf("drain should have cleared the buffer, got %v", got) + } +} + +func TestExtendedHandshakeAdvertisesPex(t *testing.T) { + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + p := &PeerConn{conn: client, state: stateHandshook} + go func() { + _ = p.sendExtendedHandshake() + }() + + m, err := ReadMessage(server) + if err != nil { + t.Fatalf("read extended handshake: %v", err) + } + if m.ID != MsgExtended { + t.Fatalf("expected MsgExtended, got %d", m.ID) + } + if m.Payload[0] != 0 { + t.Fatalf("expected handshake ext-id 0, got %d", m.Payload[0]) + } + + var hs struct { + M map[string]int `bencode:"m"` + } + if err := bencode.DecodeBytes(m.Payload[1:], &hs); err != nil { + t.Fatalf("decode handshake: %v", err) + } + if hs.M[ExtMetadata] != localMetadataID { + t.Errorf("ut_metadata: got %d, want %d", hs.M[ExtMetadata], localMetadataID) + } + if hs.M[ExtPex] != localPexID { + t.Errorf("ut_pex: got %d, want %d", hs.M[ExtPex], localPexID) + } +} diff --git a/internal/client/swarm.go b/internal/client/swarm.go index 9e14711..6241de5 100644 --- a/internal/client/swarm.go +++ b/internal/client/swarm.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "sync" + "sync/atomic" + "time" ) // PieceStatus represents the state of a piece in the swarm. @@ -16,6 +18,22 @@ const ( PieceCompleted ) +const ( + // peerPoolHeadroom is the buffered capacity of the shared peer pool. It + // holds the initial seed plus PEX (BEP 11) discoveries; trackers cap a + // swarm's returned peers well below this, and a full pool simply drops + // further additions (they can be re-offered later). + peerPoolHeadroom = 256 + // exhaustionPoll is how often the supervisor checks for a stalled swarm. + exhaustionPoll = 250 * time.Millisecond + // exhaustionConfirms is the number of consecutive positive polls required + // before declaring the peer supply exhausted. Requiring two (≈500ms of + // sustained idle) absorbs the microsecond gap between a worker pulling the + // last address and marking itself in-flight, so a connect about to start + // can't be mistaken for a dead swarm. + exhaustionConfirms = 2 +) + // Swarm manages multiple peer connections and coordinates the download. type Swarm struct { meta TorrentMeta @@ -25,6 +43,18 @@ type Swarm struct { workChan chan int // Channel to dispatch piece indices to workers resultChan chan pieceResult // Channel to receive downloaded pieces + + // Shared, growable peer pool. peerChan feeds addresses to any idle worker; + // knownAddrs (guarded by poolMu) dedupes both the initial seed and PEX + // discoveries. inFlight counts workers currently holding or acquiring a + // connection; completed counts verified pieces. Both are read by the + // exhaustion supervisor. + peerChan chan string + knownAddrs map[string]bool + poolMu sync.Mutex + inFlight atomic.Int64 + completed atomic.Int64 + exhausted atomic.Bool } type pieceResult struct { @@ -33,7 +63,8 @@ type pieceResult struct { err error } -// NewSwarm creates a new swarm manager. +// NewSwarm creates a new swarm manager. The peer pool is initialized here so +// AddPeers is safe to call before or during Start (e.g. PEX discoveries). func NewSwarm(meta TorrentMeta, maxWorkers int) *Swarm { return &Swarm{ meta: meta, @@ -41,6 +72,27 @@ func NewSwarm(meta TorrentMeta, maxWorkers int) *Swarm { pieces: make([]PieceStatus, meta.PieceCount), workChan: make(chan int, meta.PieceCount), resultChan: make(chan pieceResult), + peerChan: make(chan string, peerPoolHeadroom), + knownAddrs: make(map[string]bool), + } +} + +// AddPeers dedupes the given addresses against the known set and pushes the +// fresh ones onto the shared pool. The push is non-blocking: if the pool is +// full the address is dropped and left un-known, so it can be re-offered later. +func (s *Swarm) AddPeers(addrs []string) { + for _, addr := range addrs { + s.poolMu.Lock() + if s.knownAddrs[addr] { + s.poolMu.Unlock() + continue + } + select { + case s.peerChan <- addr: + s.knownAddrs[addr] = true // mark known only once it's actually queued + default: + } + s.poolMu.Unlock() } } @@ -54,33 +106,42 @@ func (s *Swarm) Start(ctx context.Context, addrs []string, infoHash []byte, peer s.workChan <- i } - // 2. Spawn workers + // 2. Seed the shared peer pool (the pool itself was created in NewSwarm). + s.AddPeers(addrs) + + // 3. Spawn a fixed pool of workers. Unlike the old static partition, every + // worker pulls from the shared peerChan, so idle workers block until the + // pool (or PEX) supplies a peer rather than dying. maxWorkers := s.maxWorkers - if len(addrs) < maxWorkers { - maxWorkers = len(addrs) + if maxWorkers < 1 { + maxWorkers = 1 } var wg sync.WaitGroup for i := 0; i < maxWorkers; i++ { wg.Add(1) - // We distribute addresses among workers - workerAddrs := []string{} - for j := i; j < len(addrs); j += maxWorkers { - workerAddrs = append(workerAddrs, addrs[j]) - } - go func(addrs []string) { + go func() { defer wg.Done() - s.worker(ctx, addrs, infoHash, peerID) - }(workerAddrs) + s.worker(ctx, infoHash, peerID) + }() } - // 3. Close resultChan after all workers exit + // 4. Supervisor: cancel the download if the peer supply is exhausted while + // pieces remain (workers no longer die on connect failure, so the swarm + // can't end itself the way it used to). + supDone := make(chan struct{}) + go func() { + defer close(supDone) + s.superviseExhaustion(ctx, cancel) + }() + + // 5. Close resultChan after all workers exit go func() { wg.Wait() close(s.resultChan) }() - // 4. Collection loop — exits when resultChan is closed (all workers done) + // 6. Collection loop — exits when resultChan is closed (all workers done) var dlErr error completed := 0 for res := range s.resultChan { @@ -107,6 +168,7 @@ func (s *Swarm) Start(ctx context.Context, addrs []string, infoHash []byte, peer s.pieces[res.index] = PieceCompleted s.mu.Unlock() completed++ + s.completed.Store(int64(completed)) log.Printf("Progress: [%d/%d] pieces verified", completed, s.meta.PieceCount) if completed == s.meta.PieceCount { @@ -115,29 +177,80 @@ func (s *Swarm) Start(ctx context.Context, addrs []string, infoHash []byte, peer } } - // 5. Drain remaining results so wg goroutine can close resultChan + // 7. Drain remaining results so wg goroutine can close resultChan for range s.resultChan { } + // 8. Stop the supervisor and wait for it before returning (no leak). + cancel() + <-supDone + if dlErr != nil { return dlErr } if completed < s.meta.PieceCount { + if s.exhausted.Load() { + return fmt.Errorf("peer supply exhausted (%d/%d pieces)", completed, s.meta.PieceCount) + } return fmt.Errorf("all workers died before download completed (%d/%d)", completed, s.meta.PieceCount) } return nil } -func (s *Swarm) worker(ctx context.Context, addrs []string, infoHash []byte, peerID string) { +// superviseExhaustion polls for a stalled swarm: the pool is empty, no worker +// holds or is acquiring a connection, and pieces remain. When that holds for +// exhaustionConfirms consecutive polls it cancels the download. +func (s *Swarm) superviseExhaustion(ctx context.Context, cancel context.CancelFunc) { + t := time.NewTicker(exhaustionPoll) + defer t.Stop() + + confirms := 0 + for { + select { + case <-ctx.Done(): + return + case <-t.C: + stalled := len(s.peerChan) == 0 && + s.inFlight.Load() == 0 && + s.completed.Load() < int64(s.meta.PieceCount) + if !stalled { + confirms = 0 + continue + } + confirms++ + if confirms >= exhaustionConfirms { + s.exhausted.Store(true) + cancel() + return + } + } + } +} + +// worker acquires a peer connection from the shared pool, then downloads pieces +// over it until the connection dies (re-acquire) or the context is cancelled. +func (s *Swarm) worker(ctx context.Context, infoHash []byte, peerID string) { var conn *PeerConn var currentAddr string defer func() { if conn != nil { conn.Close() + s.inFlight.Add(-1) // release the in-flight count this conn held } }() for { + // Acquire a connection BEFORE pulling work, so a worker waiting on the + // pool can't strand a piece marked in-progress behind it. + if conn == nil { + var ok bool + conn, currentAddr, ok = s.acquireConn(ctx, infoHash, peerID) + if !ok { + return // ctx cancelled while waiting for a peer + } + log.Printf("Worker connected to %s", currentAddr) + } + select { case index := <-s.workChan: s.mu.Lock() @@ -148,19 +261,6 @@ func (s *Swarm) worker(ctx context.Context, addrs []string, infoHash []byte, pee s.pieces[index] = PieceInProgress s.mu.Unlock() - // Ensure we have a connection - if conn == nil { - var err error - conn, currentAddr, err = connectToAny(ctx, addrs, infoHash, peerID) - if err != nil { - log.Printf("Worker failed to connect to any peer. Exiting.") - s.resultChan <- pieceResult{index: index, err: err} - return - } - log.Printf("Worker connected to %s", currentAddr) - } - - // Download piece pieceSize := s.meta.PieceLength remaining := s.meta.TotalSize - int64(index)*int64(s.meta.PieceLength) if remaining < int64(pieceSize) { @@ -173,10 +273,13 @@ func (s *Swarm) worker(ctx context.Context, addrs []string, infoHash []byte, pee expectedHash := s.meta.Pieces[index*20 : (index+1)*20] data, err := downloadPiece(ctx, conn, index, pieceSize, expectedHash) + // Whatever the outcome, harvest any peers this peer exchanged. + s.AddPeers(conn.DrainPexPeers()) if err != nil { log.Printf("Worker piece %d failed from %s: %v", index, currentAddr, err) conn.Close() conn = nil + s.inFlight.Add(-1) // this connection is gone; we'll re-acquire s.resultChan <- pieceResult{index: index, err: err} continue } @@ -188,3 +291,28 @@ func (s *Swarm) worker(ctx context.Context, addrs []string, infoHash []byte, pee } } } + +// acquireConn pulls addresses from the shared pool and dials them until one +// completes the handshake. Returns ok=false if the context is cancelled while +// waiting. A successful return leaves inFlight incremented for the returned +// connection; the caller (worker) is responsible for the matching decrement. +func (s *Swarm) acquireConn(ctx context.Context, infoHash []byte, peerID string) (*PeerConn, string, bool) { + for { + select { + case <-ctx.Done(): + return nil, "", false + case addr := <-s.peerChan: + // Count this attempt as in-flight from the moment we take the + // address, before dialing — the supervisor must never see an empty + // pool with a connect in progress and call the swarm dead. + s.inFlight.Add(1) + conn, err := dialAndHandshake(ctx, addr, infoHash, peerID) + if err != nil { + log.Printf("Worker failed to connect to %s: %v", addr, err) + s.inFlight.Add(-1) + continue + } + return conn, addr, true + } + } +} diff --git a/internal/client/swarm_test.go b/internal/client/swarm_test.go index f14a4a6..e6a22e6 100644 --- a/internal/client/swarm_test.go +++ b/internal/client/swarm_test.go @@ -1,9 +1,13 @@ package client import ( + "bytes" "context" "crypto/sha1" + "net" "runtime" + "strings" + "sync/atomic" "testing" "time" ) @@ -89,3 +93,132 @@ func startTestPeer(t *testing.T, pieces ...[]byte) string { return ln.Addr().String() } + +// twoPieceFixture builds a 2-piece torrent (16-byte 'A' piece, 8-byte 'B' +// piece) plus a preallocated store, returning the meta, concatenated data, and +// info hash. +func twoPieceFixture(t *testing.T) (TorrentMeta, []byte, []byte, *Storage) { + t.Helper() + piece0 := bytes.Repeat([]byte{'A'}, 16) + piece1 := bytes.Repeat([]byte{'B'}, 8) + h0 := sha1.Sum(piece0) + h1 := sha1.Sum(piece1) + pieces := append(append([]byte{}, h0[:]...), h1[:]...) + + infoHash := make([]byte, 20) + copy(infoHash, "testhash1234567890ab") + + meta := TorrentMeta{ + Name: "test", + InfoHashV1: infoHash, + PieceLength: 16, + PieceCount: 2, + TotalSize: 24, + Pieces: pieces, + Files: []FileEntry{{Path: "test.dat", Length: 24}}, + } + store := NewStorage(t.TempDir(), meta.Files) + store.Preallocate() + return meta, append(piece0, piece1...), infoHash, store +} + +// startPeer runs an accept loop serving each connection with handle. +func startPeer(t *testing.T, handle func(net.Conn)) string { + t.Helper() + ln, err := listenTCP(t) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { ln.Close() }) + go func() { + for { + conn, err := ln.Accept() + if err != nil { + return + } + go handle(conn) + } + }() + return ln.Addr().String() +} + +// deadAddr returns the address of a listener that has been closed, so dialing +// it fails fast with "connection refused". +func deadAddr(t *testing.T) string { + t.Helper() + ln, err := listenTCP(t) + if err != nil { + t.Fatal(err) + } + addr := ln.Addr().String() + ln.Close() + return addr +} + +// A worker that pulls a dead address from the shared pool must move on to the +// next address rather than dying — the core of the growable-pool refactor. +func TestSwarmSkipsDeadPeer(t *testing.T) { + meta, allData, infoHash, store := twoPieceFixture(t) + live := startTestPeer(t, allData[:16], allData[16:]) + + swarm := NewSwarm(meta, 1) + // Dead address first in the pool; the live one must still be reached. + err := swarm.Start(context.Background(), []string{deadAddr(t), live}, infoHash, "-WL0020-test12345678", store) + if err != nil { + t.Fatalf("swarm should recover from a dead seed peer: %v", err) + } +} + +// PEX end-to-end: the only seeded peer serves piece 0, advertises a second +// peer via ut_pex, then chokes piece 1 — so the swarm can only finish by +// discovering the PEX peer and using it. +func TestSwarmPexDiscovery(t *testing.T) { + meta, allData, infoHash, store := twoPieceFixture(t) + + var pexPeerConns atomic.Int64 + pexPeer := startPeer(t, func(c net.Conn) { + pexPeerConns.Add(1) + handleTestPeerConn(c, allData) + }) + + seed := startPeer(t, func(c net.Conn) { + servePexThenChokePeer(t, c, allData, pexPeer, 1) // chokes piece index 1 + }) + + swarm := NewSwarm(meta, 1) + err := swarm.Start(context.Background(), []string{seed}, infoHash, "-WL0020-test12345678", store) + if err != nil { + t.Fatalf("swarm failed to finish via PEX peer: %v", err) + } + if pexPeerConns.Load() == 0 { + t.Error("expected the swarm to connect to the PEX-advertised peer") + } +} + +// With no reachable peers and no PEX, the supervisor must end the download +// promptly with a "peer supply exhausted" error rather than hanging. +func TestSwarmExhaustsPeerSupply(t *testing.T) { + meta, _, infoHash, store := twoPieceFixture(t) + + goroutinesBefore := runtime.NumGoroutine() + + swarm := NewSwarm(meta, 2) + start := time.Now() + err := swarm.Start(context.Background(), []string{deadAddr(t), deadAddr(t)}, infoHash, "-WL0020-test12345678", store) + elapsed := time.Since(start) + + if err == nil { + t.Fatal("expected exhaustion error, got nil") + } + if !strings.Contains(err.Error(), "peer supply exhausted") { + t.Errorf("expected 'peer supply exhausted', got: %v", err) + } + if elapsed > 5*time.Second { + t.Errorf("exhaustion took too long: %v (should terminate via grace window)", elapsed) + } + + time.Sleep(50 * time.Millisecond) + if after := runtime.NumGoroutine(); after > goroutinesBefore+1 { + t.Errorf("goroutine leak: before=%d, after=%d", goroutinesBefore, after) + } +} diff --git a/internal/client/testhelpers_test.go b/internal/client/testhelpers_test.go index bcc5bb7..54a8749 100644 --- a/internal/client/testhelpers_test.go +++ b/internal/client/testhelpers_test.go @@ -70,6 +70,90 @@ func serveMetadataPeer(conn net.Conn, metaBytes []byte) { } } +// pexMessagePayload builds the body of a ut_pex (BEP 11) extended message +// advertising a single IPv4 peer. The payload is addressed with localPexID — +// the extension ID our client advertises — exactly as a real peer would send it. +func pexMessagePayload(t *testing.T, addr string) []byte { + t.Helper() + host, portStr, err := net.SplitHostPort(addr) + if err != nil { + t.Fatalf("split %q: %v", addr, err) + } + ip := net.ParseIP(host).To4() + if ip == nil { + t.Fatalf("not an IPv4 addr: %q", addr) + } + port, err := net.LookupPort("tcp", portStr) + if err != nil { + t.Fatalf("port %q: %v", portStr, err) + } + added := append([]byte{}, ip...) + added = append(added, byte(port>>8), byte(port)) + + dict, _ := bencode.EncodeBytes(struct { + Added []byte `bencode:"added"` + }{added}) + return append([]byte{localPexID}, dict...) +} + +// servePexThenChokePeer serves piece data, but: (1) right after unchoking it +// sends one ut_pex message advertising pexAddr, and (2) it refuses to serve the +// piece at chokeIndex by replying with a Choke. This forces the swarm to +// discover the PEX-advertised peer and use it to finish the choked piece. +func servePexThenChokePeer(t *testing.T, conn net.Conn, allData []byte, pexAddr string, chokeIndex uint32) { + defer conn.Close() + + buf := make([]byte, 68) + if _, err := io.ReadFull(conn, buf); err != nil { + return + } + conn.Write(buf) // echo handshake + if buf[25]&0x10 != 0 { + ReadMessage(conn) // client's extended handshake + extPayload, _ := bencode.EncodeBytes(map[string]interface{}{ + "m": map[string]int{"ut_metadata": 1, "ut_pex": 2}, + }) + WriteMessage(conn, &Message{ID: MsgExtended, Payload: append([]byte{0}, extPayload...)}) + } + + pexSent := false + for { + m, err := ReadMessage(conn) + if err != nil || m == nil { + return + } + switch m.ID { + case MsgInterested: + WriteMessage(conn, &Message{ID: MsgUnchoke}) + if !pexSent { + WriteMessage(conn, &Message{ID: MsgExtended, Payload: pexMessagePayload(t, pexAddr)}) + pexSent = true + } + case MsgRequest: + if len(m.Payload) < 12 { + return + } + index := binary.BigEndian.Uint32(m.Payload[0:4]) + begin := binary.BigEndian.Uint32(m.Payload[4:8]) + length := binary.BigEndian.Uint32(m.Payload[8:12]) + if index == chokeIndex { + WriteMessage(conn, &Message{ID: MsgChoke}) + continue + } + globalOffset := int(index)*16 + int(begin) + end := globalOffset + int(length) + if end > len(allData) { + end = len(allData) + } + payload := make([]byte, 8+end-globalOffset) + binary.BigEndian.PutUint32(payload[0:4], index) + binary.BigEndian.PutUint32(payload[4:8], begin) + copy(payload[8:], allData[globalOffset:end]) + WriteMessage(conn, &Message{ID: MsgPiece, Payload: payload}) + } + } +} + // handleTestPeerConn simulates a minimal BitTorrent peer. // It handles BEP 3 handshake (echoed), BEP 10 extended handshake, and piece requests. func handleTestPeerConn(conn net.Conn, allData []byte) {