package client import ( "context" "fmt" "net" "strconv" "strings" "time" "storrent/bt" "storrent/dht" "storrent/metainfo" "storrent/utp" ) type state int const ( stopped state = iota downloading seeding done errored ) const ( maxPeers = 8 maxPending = 5 dialTimeout = 30 * time.Second retryInterval = 30 * time.Second dhtTimeout = 5 * time.Minute recvBufSize = 128 ) type recv struct { msg *bt.Msg err error p *bt.Peer } type statusReq struct { field string resp chan string } type torrent struct { id int meta *metainfo.File dir string dht *dht.DHT utp *utp.Socket start chan struct{} stop chan struct{} seed chan struct{} incoming chan *bt.Peer addPeer chan *net.TCPAddr status chan statusReq peers *peerManager scheduler *pieceScheduler dialPool *dialPool st state down int64 up int64 ctx context.Context cancel context.CancelFunc recvs chan recv retry <-chan time.Time } func newTorrent(id int, m *metainfo.File, dir string, d *dht.DHT, utpSock *utp.Socket) *torrent { t := &torrent{ id: id, meta: m, dir: dir, dht: d, utp: utpSock, start: make(chan struct{}, 1), stop: make(chan struct{}, 1), seed: make(chan struct{}, 1), incoming: make(chan *bt.Peer), addPeer: make(chan *net.TCPAddr, 8), status: make(chan statusReq), peers: newPeerManager(maxPeers), scheduler: newPieceScheduler(m, dir), dialPool: newDialPool(d, utpSock, m.InfoHash), st: stopped, recvs: make(chan recv, recvBufSize), } return t } func (t *torrent) run() { for { select { case <-t.start: t.handleStart() case <-t.seed: t.handleSeed() case <-t.stop: t.handleStop() case <-t.retry: t.handleRetry() case d := <-t.dialPool.results: t.handleDialResult(d) case p := <-t.incoming: t.handleIncoming(p) case addr := <-t.addPeer: t.handleAddPeer(addr) case r := <-t.recvs: t.handleRecv(r) case req := <-t.status: req.resp <- t.handleStatus(req.field) } } } func (t *torrent) handleStart() { if t.st != stopped { return } t.st = downloading t.ctx, t.cancel = context.WithCancel(context.Background()) t.dialPool.start(t.ctx) } func (t *torrent) handleSeed() { if t.st != stopped && t.st != done { return } if !t.scheduler.isComplete() { return } t.st = seeding t.ctx, t.cancel = context.WithCancel(context.Background()) t.dialPool.start(t.ctx) } func (t *torrent) handleStop() { if !t.isActive() { return } t.cancel() t.dialPool.stop() t.peers.closeAll() t.retry = nil t.st = stopped } func (t *torrent) handleRetry() { t.retry = nil if t.isActive() { t.dialPool.start(t.ctx) } } func (t *torrent) handleDialResult(d dialResult) { if d.done { if t.peers.count() < maxPeers && t.isActive() { t.retry = time.After(retryInterval) } return } if d.peer == nil { return } p := d.peer if !t.peers.add(p) { p.Close() return } go t.recvLoop(p) if err := t.initPeer(p); err != nil { p.Close() t.peers.remove(p) } } func (t *torrent) handleIncoming(p *bt.Peer) { t.peers.addUnlimited(p) go t.recvLoop(p) if err := p.Send(&bt.Msg{Kind: bt.Bitfield, Bitfield: t.scheduler.bitfield(false)}); err != nil { p.Close() t.peers.remove(p) return } if err := p.Send(&bt.Msg{Kind: bt.Unchoke}); err != nil { p.Close() t.peers.remove(p) } } func (t *torrent) handleAddPeer(addr *net.TCPAddr) { if !t.isActive() { return } t.dialPool.dialSingle(t.ctx, addr) } func (t *torrent) handleRecv(r recv) { if r.err != nil { r.p.Close() t.peers.remove(r.p) t.scheduler.peerDisconnected(r.p) if t.peers.count() < maxPeers && t.isActive() { t.dialPool.start(t.ctx) } return } if r.msg == nil { return } p := r.p m := r.msg switch m.Kind { case bt.Choke: p.Choked = true p.Pending = 0 case bt.Unchoke: p.Choked = false case bt.Have: p.SetPiece(int(m.Index)) case bt.Bitfield: p.SetPieces(m.Bitfield, t.scheduler.count()) case bt.Piece: t.scheduler.put(int(m.Index), int(m.Begin), m.Block) p.Pending-- t.down += int64(len(m.Block)) case bt.Request: t.handleRequest(p, m) } t.sendRequests(p) t.checkCompletion() } func (t *torrent) handleRequest(p *bt.Peer, m *bt.Msg) { data := t.scheduler.pieceData(int(m.Index)) if data == nil { return } end := int(m.Begin + m.Length) if end > len(data) { return } if err := p.Send(&bt.Msg{ Kind: bt.Piece, Index: m.Index, Begin: m.Begin, Block: data[m.Begin:end], }); err != nil { p.Close() return } t.up += int64(m.Length) } func (t *torrent) sendRequests(p *bt.Peer) { if p.Choked { return } for p.Pending < maxPending { req := t.scheduler.nextRequest(p) if req == nil { break } if err := p.Send(req); err != nil { p.Close() break } p.Pending++ } } func (t *torrent) checkCompletion() { if t.st == downloading && t.scheduler.isComplete() { t.st = done t.peers.closeAll() } } func (t *torrent) initPeer(p *bt.Peer) error { if t.st == seeding { return p.Send(&bt.Msg{Kind: bt.Bitfield, Bitfield: t.scheduler.bitfield(true)}) } return p.Send(&bt.Msg{Kind: bt.Interested}) } func (t *torrent) recvLoop(p *bt.Peer) { for { msg, err := p.Recv() t.recvs <- recv{msg, err, p} if err != nil { return } } } func (t *torrent) isActive() bool { return t.st == downloading || t.st == seeding } func (t *torrent) startDownload() { select { case t.start <- struct{}{}: default: } } func (t *torrent) startSeed() { select { case t.seed <- struct{}{}: default: } } func (t *torrent) stopTorrent() { select { case t.stop <- struct{}{}: default: } } func (t *torrent) enqueuePeer(addr *net.TCPAddr) { t.addPeer <- addr } func (t *torrent) getStatus(field string) string { ch := make(chan string) t.status <- statusReq{field, ch} return <-ch } func (t *torrent) handleStatus(field string) string { switch field { case "name": return t.meta.Info.Name case "state": return t.st.String() case "progress": verified, total := t.scheduler.progress() if total == 0 { return "0" } return strconv.Itoa(verified * 100 / total) case "size": return strconv.FormatInt(t.meta.Size, 10) case "down": return strconv.FormatInt(t.down, 10) case "up": return strconv.FormatInt(t.up, 10) case "pieces": verified, total := t.scheduler.progress() return fmt.Sprintf("%d/%d", verified, total) case "peers": return t.formatPeers() } return "" } func (t *torrent) formatPeers() string { var buf strings.Builder npieces := t.scheduler.count() for _, p := range t.peers.all() { st := "unchoked" if p.Choked { st = "choked" } have := 0 for _, h := range p.Have { if h { have++ } } fmt.Fprintf(&buf, "%s %s %s have:%d/%d pending:%d\n", p.Addr, p.PeerID[:8], st, have, npieces, p.Pending) } return buf.String() } func (s state) String() string { switch s { case stopped: return "stopped" case downloading: return "downloading" case seeding: return "seeding" case done: return "done" case errored: return "error" } return "unknown" }