Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions internal/client/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,24 @@ func DownloadMVP(ctx context.Context, opts DownloadOptions) error {
return nil
}

// connectToAny tries each address until one succeeds handshake.
func connectToAny(ctx context.Context, addrs []string, infoHash []byte, peerID string) (*PeerConn, string, error) {
var lastErr error
for _, addr := range addrs {
p, err := Connect(ctx, addr)
if err != nil {
lastErr = err
continue
}
if err := p.Handshake(ctx, infoHash, peerID); err != nil {
p.Close()
lastErr = err
continue
}
// Send interested immediately after handshake
if err := p.WriteMessage(&Message{ID: MsgInterested}); err != nil {
p.Close()
lastErr = err
continue
}
return p, addr, nil
// dialAndHandshake connects to a single peer, performs the BEP 3/10 handshake,
// and sends our initial "interested". On any failure the connection is closed
// and the error returned so the caller can move on to the next address.
func dialAndHandshake(ctx context.Context, addr string, infoHash []byte, peerID string) (*PeerConn, error) {
p, err := Connect(ctx, addr)
if err != nil {
return nil, err
}
if err := p.Handshake(ctx, infoHash, peerID); err != nil {
p.Close()
return nil, err
}
// Send interested immediately after handshake.
if err := p.WriteMessage(&Message{ID: MsgInterested}); err != nil {
p.Close()
return nil, err
}
return nil, "", fmt.Errorf("all peers failed: %w", lastErr)
return p, nil
}

// downloadPiece downloads and verifies a single piece over an existing connection.
Expand Down Expand Up @@ -155,7 +150,16 @@ func downloadPiece(ctx context.Context, p *PeerConn, index int, size int, expect
}
}

case MsgExtended, MsgBitfield, MsgHave:
case MsgExtended:
// BEP 11: a peer addresses ut_pex messages to us using the local
// ID we advertised (localPexID). Harvest discovered peers; the
// owning worker drains them after this piece finishes.
if len(msg.Payload) > 0 && msg.Payload[0] == localPexID {
p.handlePexMessage(msg.Payload[1:])
}
continue

case MsgBitfield, MsgHave:
continue
}
}
Expand Down
105 changes: 102 additions & 3 deletions internal/client/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@ import (
wbencode "weightless/internal/bencode"
)

// Extension ID mapping for BEP 10
// Extension names for BEP 10. Peers send extended messages using the local
// IDs WE advertise in our handshake `m` dict, so the constants below are both
// the advertised names and (via localMetadataID/localPexID) the IDs we expect
// inbound messages to carry.
const (
ExtMetadata = "ut_metadata"
ExtMetadata = "ut_metadata" // BEP 9 metadata exchange
ExtPex = "ut_pex" // BEP 11 peer exchange
)

// Local extension IDs we advertise in sendExtendedHandshake. A peer that
// supports an extension addresses its messages to us using these IDs.
const (
localMetadataID = 1
localPexID = 2
)

// BEP 3 wire constants. The handshake is *exactly* 68 bytes: 1 length byte +
Expand Down Expand Up @@ -51,6 +62,11 @@ type PeerConn struct {
// BEP 10 Extension data
PeerExtensions map[string]int
MetadataSize int

// pexPeers buffers addresses harvested from inbound ut_pex (BEP 11)
// messages. Owned by the single worker goroutine driving this conn — the
// read loop appends, the same worker drains via DrainPexPeers. No lock.
pexPeers []string
}

// Connect establishes a TCP connection to the given address.
Expand Down Expand Up @@ -223,7 +239,8 @@ func (p *PeerConn) RequestMetadata(piece int) error {
func (p *PeerConn) sendExtendedHandshake() error {
m := map[string]interface{}{
"m": map[string]int{
ExtMetadata: 1, // We map ut_metadata to local ID 1
ExtMetadata: localMetadataID, // ut_metadata → local ID 1
ExtPex: localPexID, // ut_pex → local ID 2
},
}
payloadBytes, err := bencode.EncodeBytes(m)
Expand Down Expand Up @@ -266,3 +283,85 @@ func (p *PeerConn) WriteMessage(m *Message) error {
func (p *PeerConn) Close() {
p.conn.Close()
}

// handlePexMessage parses the body of an inbound ut_pex (BEP 11) extended
// message and buffers any discovered peers for later draining. body is the
// extended-message payload AFTER the leading ext-id byte.
//
// Malformed PEX is silently ignored: peer exchange is best-effort and must
// never break an in-flight piece download. Strict recognition lives in
// parsePexPeers (unit-tested); the runtime path just discards what it can't read.
func (p *PeerConn) handlePexMessage(body []byte) {
peers, err := parsePexPeers(body)
if err != nil {
return
}
p.pexPeers = append(p.pexPeers, peers...)
}

// DrainPexPeers returns the peers harvested from ut_pex messages so far and
// clears the buffer. Called by the owning worker between piece downloads.
func (p *PeerConn) DrainPexPeers() []string {
out := p.pexPeers
p.pexPeers = nil
return out
}

// parsePexPeers recognizes a ut_pex (BEP 11) message dict and returns the
// peers in its `added` (compact IPv4, 6 bytes each) and `added6` (compact
// IPv6, 18 bytes each) fields. `dropped`, flags, and any other keys are
// ignored — v0 only grows the pool, never shrinks it.
//
// LangSec: validate the bencode structure against PeerMessageLimits before
// decoding, and reject compact lists whose length isn't a clean multiple of
// the peer-entry size.
func parsePexPeers(dict []byte) ([]string, error) {
if err := wbencode.Validate(dict, wbencode.PeerMessageLimits); err != nil {
return nil, fmt.Errorf("validate pex message: %w", err)
}

var msg struct {
Added []byte `bencode:"added"`
Added6 []byte `bencode:"added6"`
}
if err := bencode.DecodeBytes(dict, &msg); err != nil {
return nil, fmt.Errorf("decode pex message: %w", err)
}

var addrs []string
if len(msg.Added) > 0 {
v4, err := unpackPeers(msg.Added, net.IPv4len)
if err != nil {
return nil, fmt.Errorf("pex added: %w", err)
}
addrs = append(addrs, v4...)
}
if len(msg.Added6) > 0 {
v6, err := unpackPeers(msg.Added6, net.IPv6len)
if err != nil {
return nil, fmt.Errorf("pex added6: %w", err)
}
addrs = append(addrs, v6...)
}

// Drop unconnectable entries. Real clients (e.g. Transmission) advertise
// peers with port 0 or an unspecified IP; dialing them only wastes a worker.
out := addrs[:0]
for _, a := range addrs {
if connectableAddr(a) {
out = append(out, a)
}
}
return out, nil
}

// connectableAddr reports whether addr is worth dialing: a parseable host:port
// with a non-zero port and a specified (non-0.0.0.0/::) IP.
func connectableAddr(addr string) bool {
host, port, err := net.SplitHostPort(addr)
if err != nil || port == "0" {
return false
}
ip := net.ParseIP(host)
return ip != nil && !ip.IsUnspecified()
}
151 changes: 151 additions & 0 deletions internal/client/pex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package client

import (
"net"
"testing"

"github.com/zeebo/bencode"
)

// encodePexDict bencodes a ut_pex message dict from raw compact peer bytes.
func encodePexDict(t *testing.T, added, added6 []byte) []byte {
t.Helper()
dict := struct {
Added []byte `bencode:"added"`
Added6 []byte `bencode:"added6"`
}{added, added6}
data, err := bencode.EncodeBytes(dict)
if err != nil {
t.Fatalf("encode pex dict: %v", err)
}
return data
}

func TestParsePexPeers(t *testing.T) {
// Two IPv4 peers: 1.2.3.4:6881, 5.6.7.8:6882 (6 bytes each).
added := []byte{1, 2, 3, 4, 0x1a, 0xe1, 5, 6, 7, 8, 0x1a, 0xe2}
// One IPv6 peer: [::1]:6883 (18 bytes).
v6 := net.ParseIP("::1").To16()
added6 := append(append([]byte{}, v6...), 0x1a, 0xe3)

peers, err := parsePexPeers(encodePexDict(t, added, added6))
if err != nil {
t.Fatalf("parsePexPeers: %v", err)
}

want := []string{"1.2.3.4:6881", "5.6.7.8:6882", "[::1]:6883"}
if len(peers) != len(want) {
t.Fatalf("got %d peers %v, want %d %v", len(peers), peers, len(want), want)
}
for i := range want {
if peers[i] != want[i] {
t.Errorf("peer %d: got %q, want %q", i, peers[i], want[i])
}
}
}

func TestParsePexPeersDropsUnconnectable(t *testing.T) {
// A real client (Transmission, observed in e2e) can advertise a peer with
// port 0; an unspecified IP is likewise unconnectable. Both must be dropped
// while the valid peer survives.
added := []byte{
1, 2, 3, 4, 0x1a, 0xe1, // 1.2.3.4:6881 (valid)
127, 0, 0, 1, 0, 0, // 127.0.0.1:0 (port 0 — drop)
0, 0, 0, 0, 0x1a, 0xe1, // 0.0.0.0:6881 (unspecified — drop)
}
peers, err := parsePexPeers(encodePexDict(t, added, nil))
if err != nil {
t.Fatalf("parsePexPeers: %v", err)
}
if len(peers) != 1 || peers[0] != "1.2.3.4:6881" {
t.Errorf("expected only [1.2.3.4:6881], got %v", peers)
}
}

func TestParsePexPeersEmpty(t *testing.T) {
// A dict with no added/added6 keys is valid and yields no peers.
peers, err := parsePexPeers(encodePexDict(t, nil, nil))
if err != nil {
t.Fatalf("parsePexPeers: %v", err)
}
if len(peers) != 0 {
t.Errorf("expected 0 peers, got %v", peers)
}
}

func TestParsePexPeersRejectsMalformed(t *testing.T) {
tests := []struct {
name string
dict []byte
}{
{"not bencode", []byte("definitely not bencode")},
// added length 5 is not a clean multiple of the 6-byte v4 entry size.
{"ragged added", encodePexDict(t, []byte{1, 2, 3, 4, 0x1a}, nil)},
// added6 length 17 is not a clean multiple of the 18-byte v6 entry size.
{"ragged added6", encodePexDict(t, nil, make([]byte, 17))},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if _, err := parsePexPeers(tt.dict); err == nil {
t.Errorf("expected error for %s, got nil", tt.name)
}
})
}
}

func TestDrainPexPeers(t *testing.T) {
added := []byte{1, 2, 3, 4, 0x1a, 0xe1}
p := &PeerConn{}

// Malformed PEX must be silently dropped, never appended.
p.handlePexMessage([]byte("garbage"))
if got := p.DrainPexPeers(); got != nil {
t.Fatalf("malformed pex should yield no peers, got %v", got)
}

// Two messages accumulate; drain returns all and clears.
p.handlePexMessage(encodePexDict(t, added, nil))
p.handlePexMessage(encodePexDict(t, added, nil))
got := p.DrainPexPeers()
if len(got) != 2 {
t.Fatalf("expected 2 buffered peers, got %v", got)
}
if got := p.DrainPexPeers(); got != nil {
t.Errorf("drain should have cleared the buffer, got %v", got)
}
}

func TestExtendedHandshakeAdvertisesPex(t *testing.T) {
server, client := net.Pipe()
defer server.Close()
defer client.Close()

p := &PeerConn{conn: client, state: stateHandshook}
go func() {
_ = p.sendExtendedHandshake()
}()

m, err := ReadMessage(server)
if err != nil {
t.Fatalf("read extended handshake: %v", err)
}
if m.ID != MsgExtended {
t.Fatalf("expected MsgExtended, got %d", m.ID)
}
if m.Payload[0] != 0 {
t.Fatalf("expected handshake ext-id 0, got %d", m.Payload[0])
}

var hs struct {
M map[string]int `bencode:"m"`
}
if err := bencode.DecodeBytes(m.Payload[1:], &hs); err != nil {
t.Fatalf("decode handshake: %v", err)
}
if hs.M[ExtMetadata] != localMetadataID {
t.Errorf("ut_metadata: got %d, want %d", hs.M[ExtMetadata], localMetadataID)
}
if hs.M[ExtPex] != localPexID {
t.Errorf("ut_pex: got %d, want %d", hs.M[ExtPex], localPexID)
}
}
Loading
Loading