storrent/utp/socket.go
2026-01-19 21:13:01 +09:00

212 lines
3.8 KiB
Go

package utp
import (
"context"
"crypto/rand"
"encoding/binary"
"net"
"sync"
)
const (
Data = 0
Fin = 1
State = 2
Reset = 3
Syn = 4
)
const headerSize = 20
type header struct {
typ uint8
ver uint8
ext uint8
connID uint16
timestamp uint32
timeDiff uint32
wnd uint32
seq uint16
ack uint16
}
func encode(h *header, buf []byte) {
buf[0] = (h.typ << 4) | (h.ver & 0x0F)
buf[1] = h.ext
binary.BigEndian.PutUint16(buf[2:], h.connID)
binary.BigEndian.PutUint32(buf[4:], h.timestamp)
binary.BigEndian.PutUint32(buf[8:], h.timeDiff)
binary.BigEndian.PutUint32(buf[12:], h.wnd)
binary.BigEndian.PutUint16(buf[16:], h.seq)
binary.BigEndian.PutUint16(buf[18:], h.ack)
}
func decode(buf []byte) header {
return header{
typ: (buf[0] >> 4) & 0x0F,
ver: buf[0] & 0x0F,
ext: buf[1],
connID: binary.BigEndian.Uint16(buf[2:]),
timestamp: binary.BigEndian.Uint32(buf[4:]),
timeDiff: binary.BigEndian.Uint32(buf[8:]),
wnd: binary.BigEndian.Uint32(buf[12:]),
seq: binary.BigEndian.Uint16(buf[16:]),
ack: binary.BigEndian.Uint16(buf[18:]),
}
}
type Socket struct {
conn *net.UDPConn
mu sync.RWMutex
conns map[uint16]*Conn
accepts chan *Conn
ctx context.Context
cancel context.CancelFunc
}
type packet struct {
hdr header
payload []byte
addr *net.UDPAddr
}
func New(addr string) (*Socket, error) {
a, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", a)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
s := &Socket{
conn: conn,
conns: make(map[uint16]*Conn),
accepts: make(chan *Conn, 16),
ctx: ctx,
cancel: cancel,
}
return s, nil
}
func (s *Socket) Start() {
go s.reader()
}
func (s *Socket) reader() {
buf := make([]byte, 65535)
for {
n, addr, err := s.conn.ReadFromUDP(buf)
if err != nil {
return
}
if n < headerSize {
continue
}
hdr := decode(buf)
payload := make([]byte, n-headerSize)
copy(payload, buf[headerSize:n])
if hdr.typ == Syn {
s.handleSyn(hdr, payload, addr)
} else {
s.dispatch(hdr, payload)
}
}
}
func (s *Socket) handleSyn(hdr header, payload []byte, addr *net.UDPAddr) {
c := s.newConn(hdr.connID, addr, false)
s.mu.Lock()
s.conns[c.recvID] = c
s.mu.Unlock()
go c.run()
c.in <- packet{hdr, payload, addr}
select {
case s.accepts <- c:
default:
c.Close()
}
}
func (s *Socket) dispatch(hdr header, payload []byte) {
s.mu.RLock()
c := s.conns[hdr.connID]
s.mu.RUnlock()
if c != nil {
select {
case c.in <- packet{hdr, payload, nil}:
default:
}
}
}
func (s *Socket) removeConn(id uint16) {
s.mu.Lock()
delete(s.conns, id)
s.mu.Unlock()
}
func (s *Socket) newConn(peerID uint16, addr *net.UDPAddr, initiator bool) *Conn {
ctx, cancel := context.WithCancel(s.ctx)
c := &Conn{
sock: s,
addr: addr,
in: make(chan packet, 16),
reads: make(chan readReq),
writes: make(chan writeReq),
closeReq: make(chan struct{}),
ready: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}
if initiator {
c.recvID = randUint16()
c.sendID = c.recvID + 1
} else {
c.recvID = peerID + 1
c.sendID = peerID
}
return c
}
func (s *Socket) DialContext(ctx context.Context, addr *net.UDPAddr) (*Conn, error) {
c := s.newConn(0, addr, true)
s.mu.Lock()
s.conns[c.recvID] = c
s.mu.Unlock()
go c.run()
select {
case <-c.ready:
return c, nil
case <-ctx.Done():
c.Close()
return nil, ctx.Err()
}
}
func (s *Socket) Accept() *Conn {
return <-s.accepts
}
func (s *Socket) Close() {
s.conn.Close()
s.cancel()
}
func randUint16() uint16 {
var b [2]byte
if _, err := rand.Read(b[:]); err != nil {
panic(err)
}
return binary.BigEndian.Uint16(b[:])
}