Turbo Tunnel worked example

David Fifield <david@bamsoftware.com>

Updated

This is a worked example of adapting a sample client–server system (an echo server) to a Turbo Tunnel design. The Turbo Tunnel version is resistant to TCP connection termination attacks. This example uses KCP and kcp-go to implement the inner session/reliability layer.

Download source code:

To run:

server$ ./server 127.0.0.1:8000
client$ ./client 127.0.0.1:8000

To test the turbotunnel version's resistance to TCP termination, you can run through a TCP proxy that terminates connections after a timeout. One such proxy is lilbastard:

$ git clone https://www.bamsoftware.com/git/lilbastard.git

To run the proxy,

lilbastard$ cargo run -- -w 20 127.0.0.1:7000 127.0.0.1:8000

Then run the example programs as before, having the client connect to the proxy instead of directly to the server.

server$ ./server 127.0.0.1:8000
client$ ./client 127.0.0.1:7000

This code is in the public domain.

Related links:

Client changes

Rather than open a bare TCP connection, the turbotunnel client first generates a session identifier, then creates a RedialPacketConn to serve as the packet-sending and -receiving interface. This component will be different in designs that do not use TCP as the underlying transport.

On top of the RedialPacketConn the turbotunnel client then opens a kcp.UDPSession. (Despite the name, the kcp.UDPSession does not use UDP. It sends and receives packets using the RedialPacketConn.) It then opens a smux.Session and then finally a single smux.Stream over that. The smux.Stream plays the role that the bare TCP connection did in the original code.

plain/client/client.go turbotunnel/client/client.go
package main

import (
	"io"
	"log"
	"net"	"os"
	"sync"






)

// main requires exactly one command-line argument, the IP:PORT of the echo
// server, and passes it to run. A usage error or an error from run terminates
// the program through the log package.
func main() {
	args := os.Args
	if len(args) != 2 {
		log.Fatalf("usage: %s IP:PORT", args[0])
	}
	err := run(args[1])
	if err != nil {
		log.Fatal(err)
	}
}

func run(addr string) error {












































	conn, err := net.Dial("tcp", addr)	if err != nil {
		return err
	}
	defer conn.Close()
	log.Printf("%v: begin", conn.RemoteAddr())
	defer log.Printf("%v: end", conn.RemoteAddr())

	handleConnection(conn)	return nil
}

// handleConnection reads from the connection and writes to stdout, and reads
// from stdin and writes to the connection. It returns once both directions
// have finished.
//
// When stdin is exhausted, only the write side of the connection is shut down,
// so that data still in flight from the peer can be read. If conn does not
// offer a CloseWrite method (it is not a *net.TCPConn or similar), the whole
// connection is closed instead of panicking on a failed type assertion.
func handleConnection(conn net.Conn) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		_, err := io.Copy(os.Stdout, conn)
		if err != nil {
			log.Printf("%v: error copying to stdout: %v", conn.RemoteAddr(), err)
		}
		os.Stdout.Close()
	}()
	go func() {
		defer wg.Done()
		_, err := io.Copy(conn, os.Stdin)
		if err != nil {
			log.Printf("%v: error copying from stdin: %v", conn.RemoteAddr(), err)
		}
		// Prefer a half-close so the read direction stays usable.
		if hc, ok := conn.(interface{ CloseWrite() error }); ok {
			err = hc.CloseWrite()
		} else {
			err = conn.Close()
		}
		if err != nil {
			log.Printf("%v: error closing write side: %v", conn.RemoteAddr(), err)
		}
	}()
	wg.Wait()
}
package main

import (
	"io"
	"log"

	"os"
	"sync"
	"time"

	"www.bamsoftware.com/git/turbotunnel-paper.git/example/turbotunnel/turbotunnel"

	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux")

// main checks that a single IP:PORT argument was supplied and starts the
// turbotunnel client against that address. Any failure is reported with the
// log package and ends the program.
func main() {
	argv := os.Args
	if len(argv) != 2 {
		log.Fatalf("usage: %s IP:PORT", argv[0])
	}
	err := run(argv[1])
	if err != nil {
		log.Fatal(err)
	}
}

func run(addr string) error {
	// Generate a new SessionID to identify this session.
	sessionID := turbotunnel.NewSessionID()

	// As the packet-sending and -receiving interface, use a sequence of
	// redialed TCP connections.
	pconn := NewRedialPacketConn(sessionID, addr)

	// The KCP net.Conn is a reliable channel over pconn.
	conn, err := kcp.NewConn2(pconn.RemoteAddr(), nil, 0, 0, pconn)
	if err != nil {
		return err
	}
	defer conn.Close()
	log.Printf("session %v: begin", sessionID)
	defer log.Printf("session %v: end", sessionID)

	// Some parameter tweaks to improve performance.
	// Permit coalescing the payloads of consecutive sends.
	conn.SetStreamMode(true)
	// Disable the dynamic congestion window (limit only by the maximum of
	// local and remote static windows).
	conn.SetNoDelay(
		0, // default nodelay
		0, // default interval
		0, // default resend
		1, // nc=1 => congestion window off
	)
	// Increase the (send, receive) window sizes from the default of
	// (32, 32).
	conn.SetWindowSize(1024, 1024)

	// Add smux stream multiplexing and timeout over the KCP net.Conn.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 1 * time.Minute
	smuxConfig.MaxReceiveBuffer = 4 * 1024 * 1024 // default is 4 * 1024 * 1024
	smuxConfig.MaxStreamBuffer = 1 * 1024 * 1024  // default is 65536
	sess, err := smux.Client(conn, smuxConfig)
	if err != nil {
		return err
	}
	defer sess.Close()

	// Open a smux stream.
	stream, err := sess.OpenStream()	if err != nil {
		return err
	}

	log.Printf("session %v stream %v: begin", sessionID, stream.ID())
	defer log.Printf("session %v stream %v: end", sessionID, stream.ID())

	handleStream(stream)	return nil
}

// Read from the stream and write to stdout; read from stdin and write to the
// stream.
func handleStream(stream *smux.Stream) {	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		_, err := io.Copy(os.Stdout, stream)		if err != nil {
			log.Printf("%v: error copying to stdout: %v", stream.RemoteAddr(), err)		}
		os.Stdout.Close()
		wg.Done()
	}()
	go func() {
		_, err := io.Copy(stream, os.Stdin)		if err != nil {
			log.Printf("%v: error copying from stdin: %v", stream.RemoteAddr(), err)		}
		stream.Close()		wg.Done()
	}()
	wg.Wait()
}

Server changes

Where the original code created a bare TCP listener, the turbotunnel code creates a kcp.Listener, defined over a ListenerPacketConn. The ListenerPacketConn is the basic packet-sending and -receiving interface used by the KCP engine. ListenerPacketConn, in this program, happens to make use of an underlying TCP listener, but don't be fooled into thinking that this TCP listener is analogous to the TCP listener in the original program. In the turbotunnel program, it is the kcp.Listener that is analogous to the TCP listener in the original program.

In the original program, the chain of calls goes acceptConnections → handleConnection. In the turbotunnel program, there is an extra level of indirection: acceptSessions → acceptStreams → handleStream. Each smux.Session can contain multiple streams, even though this program uses only one stream per session.

plain/server/server.go turbotunnel/server/server.go
package main

import (
	"io"
	"log"
	"net"
	"os"




)

// main takes the listening address as its only command-line argument and runs
// the echo server on it. A wrong argument count or an error from run is
// reported through the log package and ends the program.
func main() {
	args := os.Args
	if len(args) != 2 {
		log.Fatalf("usage: %s IP:PORT", args[0])
	}
	err := run(args[1])
	if err != nil {
		log.Fatal(err)
	}
}

func run(addr string) error {














	ln, err := net.Listen("tcp", addr)	if err != nil {
		return err
	}
	defer ln.Close()
	return acceptConnections(ln)}

func acceptConnections(ln net.Listener) error {	for {
		conn, err := ln.Accept()		if err != nil {
			if err, ok := err.(net.Error); ok && err.Temporary() {
				continue
			}
			return err
		}















































		go func() {
			defer conn.Close()
			log.Printf("%v: begin", conn.RemoteAddr())
			defer log.Printf("%v: end", conn.RemoteAddr())
			err := handleConnection(conn)			if err != nil {
				log.Printf("%v: handleConnection: %v", conn.RemoteAddr(), err)			}
		}()
	}
}

// handleConnection echoes everything read from conn back to conn until the
// copy ends, logs how many bytes were echoed, and returns the error from the
// copy (nil when the peer closed the connection cleanly).
func handleConnection(conn net.Conn) error {
	echoed, copyErr := io.Copy(conn, conn)
	log.Printf("%v: echoed %v bytes", conn.RemoteAddr(), echoed)
	return copyErr
}
package main

import (
	"io"
	"log"
	"net"
	"os"
	"time"

	"github.com/xtaci/kcp-go/v5"
	"github.com/xtaci/smux")

// main reads the listening address from its single command-line argument and
// runs the turbotunnel echo server on it. Usage errors and errors from run are
// reported with the log package and end the program.
func main() {
	argv := os.Args
	if len(argv) != 2 {
		log.Fatalf("usage: %s IP:PORT", argv[0])
	}
	err := run(argv[1])
	if err != nil {
		log.Fatal(err)
	}
}

func run(addr string) error {
	// This listener will accept the TCP connections that carry encapsulated
	// packets.
	tcpLn, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	defer tcpLn.Close()

	// Build a net.PacketConn interface around the connections accepted by
	// the TCP listener.
	pconn := NewListenerPacketConn(tcpLn)

	// Create a listener for the virtual KCP sessions that exist on top of
	// the physical TCP sessions.
	ln, err := kcp.ServeConn(nil, 0, 0, pconn)	if err != nil {
		return err
	}

	return acceptSessions(ln)}

func acceptSessions(ln *kcp.Listener) error {	for {
		conn, err := ln.AcceptKCP()		if err != nil {
			if err, ok := err.(net.Error); ok && err.Temporary() {
				continue
			}
			return err
		}

		// Some parameter tweaks to improve performance.
		// Permit coalescing the payloads of consecutive sends.
		conn.SetStreamMode(true)
		// Disable the dynamic congestion window (limit only by the
		// maximum of local and remote static windows).
		conn.SetNoDelay(
			0, // default nodelay
			0, // default interval
			0, // default resend
			1, // nc=1 => congestion window off
		)
		// Increase the (send, receive) window sizes from the default of
		// (32, 32).
		conn.SetWindowSize(1024, 1024)

		go func() {
			defer conn.Close()
			log.Printf("session %v: begin", conn.RemoteAddr())
			defer log.Printf("session %v: end", conn.RemoteAddr())
			err := acceptStreams(conn)
			if err != nil {
				log.Printf("session %v: acceptStreams: %v", conn.RemoteAddr(), err)
			}
		}()
	}
}

func acceptStreams(conn *kcp.UDPSession) error {
	// Add smux stream multiplexing and timeout over the KCP net.Conn.
	smuxConfig := smux.DefaultConfig()
	smuxConfig.Version = 2
	smuxConfig.KeepAliveTimeout = 1 * time.Minute
	sess, err := smux.Server(conn, smuxConfig)
	if err != nil {
		return err
	}
	defer sess.Close()

	for {
		stream, err := sess.AcceptStream()
		if err != nil {
			if err, ok := err.(net.Error); ok && err.Temporary() {
				continue
			}
			return err
		}		go func() {
			defer stream.Close()
			log.Printf("session %v stream %v: begin", conn.RemoteAddr(), stream.ID())
			defer log.Printf("session %v stream %v: end", conn.RemoteAddr(), stream.ID())
			err := handleStream(stream)			if err != nil {
				log.Printf("session %v stream %v: handleStream: %v", conn.RemoteAddr(), stream.ID(), err)			}
		}()
	}
}

func handleStream(stream *smux.Stream) error {
	n, err := io.Copy(stream, stream)
	log.Printf("session %v stream %v: echoed %v bytes", stream.RemoteAddr(), stream.ID(), n)
	if err == io.EOF {
		err = nil
	}	return err
}

RedialPacketConn

RedialPacketConn is the basic packet-sending and -receiving interface used in the client. It implements the net.PacketConn interface.

The loop method of RedialPacketConn repeatedly dials the same address, redialing whenever a connection is interrupted. The first thing sent on the connection is the session identifier, and that session identifier applies to all the packets contained therein. It implements the ReadFrom and WriteTo methods by encapsulating and decapsulating packets on whatever TCP connection happens to be live at the time.

turbotunnel/client/redialpacketconn.go
package main

import (
	"bufio"
	"errors"
	"log"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"www.bamsoftware.com/git/turbotunnel-paper.git/example/turbotunnel/turbotunnel"
)

// Sentinel errors used by RedialPacketConn.
var (
	// errClosed is the default cause reported for operations on a closed
	// RedialPacketConn.
	errClosed = errors.New("operation on closed connection")
	// errNotImplemented is returned by the unsupported deadline setters.
	errNotImplemented = errors.New("not implemented")
)

// stringAddr is a net.Addr whose Network and String results are the two fixed
// strings it was constructed with.
type stringAddr struct {
	network string
	address string
}

// Network returns the fixed network name.
func (a stringAddr) Network() string {
	return a.network
}

// String returns the fixed address string.
func (a stringAddr) String() string {
	return a.address
}

// RedialPacketConn implements the net.PacketConn interface by continually
// dialing a static TCP address, and encapsulating packets on each successive
// TCP connection using turbotunnel.ReadPacket and turbotunnel.WritePacket.
//
// Every Turbo Tunnel design will need some sort of PacketConn adapter that
// adapts the session layer's sequence of packets to the obfuscation layer. But
// not every such adapter will look like RedialPacketConn. It depends on what
// the obfuscation layer looks like. Some obfuscation layers will not need a
// persistent connection. One could, for example, handle every ReadFrom or
// WriteTo as an independent network operation.
//
// Fields:
//   - sessionID is written at the start of every TCP connection and is also
//     what LocalAddr reports.
//   - remoteAddr is the address that is dialed; it is reported as the source
//     of every packet returned by ReadFrom.
//   - recvQueue carries packets decapsulated from the current TCP connection
//     to ReadFrom.
//   - sendQueue carries packets from WriteTo to whichever TCP connection is
//     currently live.
//   - closeOnce and closed implement one-time shutdown: closed is closed
//     exactly once, by closeWithError.
type RedialPacketConn struct {
	sessionID  turbotunnel.SessionID
	remoteAddr net.Addr
	recvQueue  chan []byte
	sendQueue  chan []byte
	closeOnce  sync.Once
	closed     chan struct{}
	// What error to return when the RedialPacketConn is closed.
	err atomic.Value
}

// NewRedialPacketConn creates a RedialPacketConn that identifies itself with
// sessionID and exchanges packets over TCP connections to remoteAddr. It
// starts a background goroutine that runs the redial loop; when that loop
// ends, the RedialPacketConn is closed with the loop's error.
func NewRedialPacketConn(sessionID turbotunnel.SessionID, remoteAddr string) *RedialPacketConn {
	c := new(RedialPacketConn)
	c.sessionID = sessionID
	c.remoteAddr = &stringAddr{network: "tcp", address: remoteAddr}
	c.recvQueue = make(chan []byte, 32)
	c.sendQueue = make(chan []byte, 32)
	c.closed = make(chan struct{})

	go func() {
		loopErr := c.loop()
		c.closeWithError(loopErr)
	}()
	return c
}

// loop keeps calling dialAndExchange, one TCP connection after another, until
// c is closed (returning nil) or dialAndExchange reports an error (returning
// that error). dialAndExchange reports only dial failures, so an interrupted
// connection simply leads to a redial.
func (c *RedialPacketConn) loop() error {
	for {
		// Stop redialing once the RedialPacketConn has been closed.
		select {
		case <-c.closed:
			return nil
		default:
		}

		log.Printf("session %v: redialing %v", c.sessionID, c.remoteAddr)
		if err := c.dialAndExchange(); err != nil {
			return err
		}
	}
}

// dialAndExchange opens one TCP connection to c.remoteAddr, sends the session
// identifier, and then moves packets between the connection and
// c.recvQueue/c.sendQueue until the connection fails or c is closed.
//
// It returns a non-nil error only when the dial itself fails. Every later
// failure returns nil, which makes loop dial again.
//
// Two goroutines do the work and shut each other down: when the reader stops
// it closes done, which the writer selects on; when the writer stops it closes
// conn, which makes the reader's pending read fail.
func (c *RedialPacketConn) dialAndExchange() error {
	conn, err := net.Dial("tcp", c.remoteAddr.String())
	// Failure to establish a new TCP connection is a fatal error.
	if err != nil {
		return err
	}
	defer conn.Close()

	// Begin by sending the session identifier; everything after that is
	// encapsulated packets.
	_, err = conn.Write(c.sessionID[:])
	if err != nil {
		// Errors after the dial are not fatal but cause a redial.
		return nil
	}

	var wg sync.WaitGroup
	wg.Add(2)
	done := make(chan struct{})
	// Read encapsulated packets from the connection and write them to
	// c.recvQueue.
	go func() {
		defer wg.Done()
		defer close(done) // Signal the write loop to finish.
		br := bufio.NewReader(conn)
		for {
			p, err := turbotunnel.ReadPacket(br)
			if err != nil {
				return
			}
			// Block until ReadFrom takes the packet, unless c is
			// closed in the meantime.
			select {
			case <-c.closed:
				return
			case c.recvQueue <- p:
			}
		}
	}()
	// Read packets from c.sendQueue and encapsulate them into the
	// connection.
	go func() {
		defer wg.Done()
		defer conn.Close() // Signal the read loop to finish.
		bw := bufio.NewWriter(conn)
		for {
			select {
			case <-c.closed:
				return
			case <-done:
				return
			case p := <-c.sendQueue:
				err := turbotunnel.WritePacket(bw, p)
				if err != nil {
					return
				}
				// Flush after every packet so it is not held
				// back in the buffered writer.
				err = bw.Flush()
				if err != nil {
					return
				}
			}
		}
	}()

	// Exchange packets until the connection is terminated.
	wg.Wait()
	return nil
}

// ReadFrom blocks until a packet is available on the receive queue, copies it
// into p, and returns the number of bytes copied along with the remote
// address. If c is closed, either beforehand or while waiting, it returns a
// *net.OpError wrapping the close cause instead.
func (c *RedialPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
	// Built lazily: c.err holds a value only once c.closed has been closed.
	closedErr := func() error {
		return &net.OpError{Op: "read", Net: c.remoteAddr.Network(), Source: c.sessionID, Addr: c.remoteAddr, Err: c.err.Load().(error)}
	}

	// Check for closure first, so that a closed conn never hands out
	// packets still sitting in the queue.
	select {
	case <-c.closed:
		return 0, nil, closedErr()
	default:
	}

	select {
	case <-c.closed:
		return 0, nil, closedErr()
	case packet := <-c.recvQueue:
		n := copy(p, packet)
		return n, c.remoteAddr, nil
	}
}

// WriteTo queues a copy of p for transmission on whichever TCP connection is
// live. The addr argument is not used. If the send queue is full the packet is
// dropped, but WriteTo still reports len(p) bytes written and a nil error. If
// c is closed, it returns a *net.OpError wrapping the close cause.
func (c *RedialPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
	select {
	case <-c.closed:
		closedErr := &net.OpError{Op: "write", Net: c.remoteAddr.Network(), Source: c.sessionID, Addr: c.remoteAddr, Err: c.err.Load().(error)}
		return 0, closedErr
	default:
	}

	// The caller may reuse p after we return, so queue a private copy.
	packet := make([]byte, len(p))
	copy(packet, p)

	select {
	case c.sendQueue <- packet:
	default:
		// Silently drop outgoing packets if the send queue is full.
	}
	return len(packet), nil
}

// closeWithError closes c, recording err (or errClosed when err is nil) as the
// cause that pending and future operations report. Only the first call has any
// effect, and it returns nil; later calls return a *net.OpError wrapping the
// recorded cause.
func (c *RedialPacketConn) closeWithError(err error) error {
	alreadyClosed := true
	c.closeOnce.Do(func() {
		alreadyClosed = false
		cause := err
		if cause == nil {
			cause = errClosed
		}
		// Record the cause before closing the channel, so that anyone who
		// observes c.closed can load it.
		c.err.Store(cause)
		close(c.closed)
	})
	if alreadyClosed {
		return &net.OpError{Op: "close", Net: c.remoteAddr.Network(), Source: c.sessionID, Addr: c.remoteAddr, Err: c.err.Load().(error)}
	}
	return nil
}

// Close closes c with the default errClosed cause. See closeWithError for the
// return value.
func (c *RedialPacketConn) Close() error {
	return c.closeWithError(nil)
}

// LocalAddr returns the session identifier, which serves as the local address.
func (c *RedialPacketConn) LocalAddr() net.Addr {
	return c.sessionID
}

// RemoteAddr returns the address that c dials.
func (c *RedialPacketConn) RemoteAddr() net.Addr {
	return c.remoteAddr
}

// SetDeadline is not supported; it always returns errNotImplemented.
func (c *RedialPacketConn) SetDeadline(t time.Time) error {
	return errNotImplemented
}

// SetReadDeadline is not supported; it always returns errNotImplemented.
func (c *RedialPacketConn) SetReadDeadline(t time.Time) error {
	return errNotImplemented
}

// SetWriteDeadline is not supported; it always returns errNotImplemented.
func (c *RedialPacketConn) SetWriteDeadline(t time.Time) error {
	return errNotImplemented
}

ListenerPacketConn

ListenerPacketConn is the basic packet-sending and -receiving interface used in the server. It implements the net.PacketConn interface.

ListenerPacketConn contains a QueuePacketConn that keeps track of packet send and receive queues for each live session identifier. After a TCP connection is accepted, ListenerPacketConn reads its session identifier and then begins decapsulating packets and feeding them to the QueuePacketConn, which will make them available through its ReadFrom method. At the same time ListenerPacketConn reads from the send queue for the session identifier and encapsulates outgoing packets that were placed in the queue by the WriteTo method of QueuePacketConn.

turbotunnel/server/listenerpacketconn.go
package main

import (
	"io"
	"log"
	"net"
	"sync"
	"time"

	"www.bamsoftware.com/git/turbotunnel-paper.git/example/turbotunnel/turbotunnel"
)

// ListenerPacketConn is the server's packet-sending and -receiving interface.
// It accepts TCP connections from ln and moves the packets encapsulated in
// them to and from the embedded QueuePacketConn, whose methods supply the rest
// of the net.PacketConn behavior.
type ListenerPacketConn struct {
	ln net.Listener
	*turbotunnel.QueuePacketConn
}

// NewListenerPacketConn wraps ln in a ListenerPacketConn whose queues are
// tracked for one minute per session, and starts a goroutine that accepts
// connections from ln. If the accept loop ends with an error, the error is
// logged.
func NewListenerPacketConn(ln net.Listener) *ListenerPacketConn {
	c := &ListenerPacketConn{
		ln:              ln,
		QueuePacketConn: turbotunnel.NewQueuePacketConn(ln.Addr(), 1*time.Minute),
	}
	go func() {
		if err := c.acceptConnections(); err != nil {
			log.Printf("acceptConnections: %v", err)
		}
	}()
	return c
}

// acceptConnections accepts TCP connections from c.ln forever and serves each
// one in its own goroutine with handleConnection, closing the connection when
// the handler returns. Temporary accept errors are retried; any other accept
// error ends the loop and is returned.
func (c *ListenerPacketConn) acceptConnections() error {
	for {
		conn, acceptErr := c.ln.Accept()
		if acceptErr != nil {
			netErr, isNetErr := acceptErr.(net.Error)
			if isNetErr && netErr.Temporary() {
				continue
			}
			return acceptErr
		}
		go func(conn net.Conn) {
			defer conn.Close()
			if err := c.handleConnection(conn); err != nil {
				log.Printf("handleConnection: %v", err)
			}
		}(conn)
	}
}

// handleConnection serves one accepted TCP connection. It reads the client's
// session identifier, then runs two goroutines until the connection ends: one
// decapsulates packets from conn and queues them as incoming packets from that
// session, the other encapsulates onto conn the packets queued for that
// session.
//
// It returns an error only if the session identifier cannot be read; errors
// while exchanging packets end the exchange and return nil.
//
// The goroutines shut each other down: when the reader stops it closes done,
// which the writer selects on; when the writer stops it closes conn, which
// makes the reader's pending read fail.
func (c *ListenerPacketConn) handleConnection(conn net.Conn) error {
	// First read the client's session identifier.
	var sessionID turbotunnel.SessionID
	_, err := io.ReadFull(conn, sessionID[:])
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	wg.Add(2)
	done := make(chan struct{})
	go func() {
		defer wg.Done()
		defer close(done) // Signal the write loop to finish.
		for {
			p, err := turbotunnel.ReadPacket(conn)
			if err != nil {
				return
			}
			c.QueuePacketConn.QueueIncoming(p, sessionID)
		}
	}()
	go func() {
		defer wg.Done()
		defer conn.Close() // Signal the read loop to finish.
		for {
			select {
			case <-done:
				return
			// The queue is looked up again on every iteration. A
			// receive with ok == false writes nothing and just
			// loops around to look the queue up again.
			case p, ok := <-c.QueuePacketConn.OutgoingQueue(sessionID):
				if ok {
					err := turbotunnel.WritePacket(conn, p)
					if err != nil {
						return
					}
				}
			}
		}
	}()

	wg.Wait()
	return nil
}

// Close closes both the underlying listener and the embedded QueuePacketConn.
// Both are always closed; the listener's error takes precedence in the return
// value, and the QueuePacketConn's error is returned only when the listener
// closed cleanly.
func (c *ListenerPacketConn) Close() error {
	lnErr := c.ln.Close()
	queueErr := c.QueuePacketConn.Close()
	if lnErr != nil {
		return lnErr
	}
	return queueErr
}

QueuePacketConn

QueuePacketConn transforms the "push" interface of net.PacketConn into a "pull" interface. The QueueIncoming method places an incoming packet in a queue, from where it may later be returned from a call to ReadFrom. The WriteTo method places an outgoing packet in a per–session identifier queue, from which it may be later retrieved using the OutgoingQueue method.

QueuePacketConn contains a RemoteMap that keeps track of the mapping of session identifiers to outgoing queues.

turbotunnel/turbotunnel/queuepacketconn.go
package turbotunnel

import (
	"errors"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// Sentinel errors used by QueuePacketConn.
var (
	// errClosed is the default cause reported for operations on a closed
	// QueuePacketConn.
	errClosed = errors.New("operation on closed connection")
	// errNotImplemented is returned by the unsupported deadline setters.
	errNotImplemented = errors.New("not implemented")
)

// taggedPacket is a combination of a []byte and a net.Addr, encapsulating the
// return type of PacketConn.ReadFrom. P is the packet contents and Addr is the
// address the packet is attributed to.
type taggedPacket struct {
	P    []byte
	Addr net.Addr
}

// QueuePacketConn implements net.PacketConn by storing queues of packets. There
// is one incoming queue (where packets are additionally tagged by the source
// address of the peer that sent them). There are many outgoing queues, one for
// each remote peer address that has been recently seen. The QueueIncoming
// method inserts a packet into the incoming queue, to eventually be returned by
// ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
// which can later be accessed through the OutgoingQueue method.
//
// Fields:
//   - remotes holds the per-address outgoing queues.
//   - localAddr is the value reported by LocalAddr.
//   - recvQueue is the single incoming queue.
//   - closeOnce and closed implement one-time shutdown: closed is closed
//     exactly once, by closeWithError.
type QueuePacketConn struct {
	remotes   *RemoteMap
	localAddr net.Addr
	recvQueue chan taggedPacket
	closeOnce sync.Once
	closed    chan struct{}
	// What error to return when the QueuePacketConn is closed.
	err atomic.Value
}

// NewQueuePacketConn returns a QueuePacketConn that reports localAddr as its
// local address and remembers recently seen peers, with their outgoing
// queues, for at least timeout.
func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
	c := new(QueuePacketConn)
	c.remotes = NewRemoteMap(timeout)
	c.localAddr = localAddr
	c.recvQueue = make(chan taggedPacket, 32)
	c.closed = make(chan struct{})
	return c
}

// QueueIncoming stores a copy of p, tagged with the source address addr, so
// that a later ReadFrom can return it. The packet is silently discarded if c
// is closed or if the receive queue is full.
func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
	select {
	case <-c.closed:
		// If we're closed, silently drop it.
		return
	default:
	}

	// The caller may reuse p after we return, so queue a private copy.
	packet := make([]byte, len(p))
	copy(packet, p)

	select {
	case c.recvQueue <- taggedPacket{P: packet, Addr: addr}:
	default:
		// Silently drop incoming packets if the receive queue is full.
	}
}

// OutgoingQueue returns the receive side of the outgoing packet queue for
// addr, creating the queue if it does not exist yet. Packets written to addr
// with WriteTo appear on this queue.
func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
	queue := c.remotes.SendQueue(addr)
	return queue
}

// ReadFrom blocks until a packet stored by QueueIncoming is available, copies
// it into p, and returns the number of bytes copied along with the address the
// packet was tagged with. If c is closed, either beforehand or while waiting,
// it returns a *net.OpError wrapping the close cause instead.
func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
	// Built lazily: c.err holds a value only once c.closed has been closed.
	closedErr := func() error {
		return &net.OpError{Op: "read", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
	}

	// Check for closure first, so that a closed conn never hands out
	// packets still sitting in the queue.
	select {
	case <-c.closed:
		return 0, nil, closedErr()
	default:
	}

	select {
	case <-c.closed:
		return 0, nil, closedErr()
	case tagged := <-c.recvQueue:
		n := copy(p, tagged.P)
		return n, tagged.Addr, nil
	}
}

// WriteTo places a copy of p on the outgoing queue for addr, from where it can
// be retrieved through OutgoingQueue. If that queue is full the packet is
// dropped, but WriteTo still reports len(p) bytes written and a nil error. If
// c is closed, it returns a *net.OpError wrapping the close cause.
func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
	select {
	case <-c.closed:
		closedErr := &net.OpError{Op: "write", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
		return 0, closedErr
	default:
	}

	// The caller may reuse p after we return, so queue a private copy.
	packet := make([]byte, len(p))
	copy(packet, p)

	queue := c.remotes.SendQueue(addr)
	select {
	case queue <- packet:
	default:
		// Silently drop outgoing packets if the send queue is full.
	}
	return len(packet), nil
}

// closeWithError closes c, recording err (or errClosed when err is nil) as the
// cause that pending and future operations report. Only the first call has any
// effect, and it returns nil; later calls return a *net.OpError wrapping the
// recorded cause.
func (c *QueuePacketConn) closeWithError(err error) error {
	alreadyClosed := true
	c.closeOnce.Do(func() {
		alreadyClosed = false
		cause := err
		if cause == nil {
			cause = errClosed
		}
		// Record the cause before closing the channel, so that anyone who
		// observes c.closed can load it.
		c.err.Store(cause)
		close(c.closed)
	})
	if alreadyClosed {
		return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
	}
	return nil
}

// Close unblocks pending operations and makes future operations fail with a
// "closed connection" error. The first call returns nil; later calls return an
// error.
func (c *QueuePacketConn) Close() error {
	return c.closeWithError(nil)
}

// LocalAddr returns the localAddr value that was passed to NewQueuePacketConn.
func (c *QueuePacketConn) LocalAddr() net.Addr {
	return c.localAddr
}

// SetDeadline is not supported; it always returns errNotImplemented.
func (c *QueuePacketConn) SetDeadline(t time.Time) error {
	return errNotImplemented
}

// SetReadDeadline is not supported; it always returns errNotImplemented.
func (c *QueuePacketConn) SetReadDeadline(t time.Time) error {
	return errNotImplemented
}

// SetWriteDeadline is not supported; it always returns errNotImplemented.
func (c *QueuePacketConn) SetWriteDeadline(t time.Time) error {
	return errNotImplemented
}

RemoteMap

RemoteMap manages a mapping of session identifiers to send queues. It automatically discards send queues that have not been used in a while, to avoid keeping state forever for disconnected peers. (If a send queue is discarded and the peer later reappears, it doesn't cause any loss of data—the send queue will be reinstantiated and the KCP layer will retransmit any unacknowledged packets.)

turbotunnel/turbotunnel/remotemap.go
package turbotunnel

import (
	"container/heap"
	"net"
	"sync"
	"time"
)

// remoteRecord is a record of a recently seen remote peer, with the time it was
// last seen and a send queue. Addr identifies the peer (records are indexed by
// Addr.String()), LastSeen is refreshed on every lookup, and SendQueue is
// closed when the record expires.
type remoteRecord struct {
	Addr      net.Addr
	LastSeen  time.Time
	SendQueue chan []byte
}

// RemoteMap manages a mapping of live remote peers, keyed by address, to their
// respective send queues.
//
// RemoteMap's functions are safe to call from multiple goroutines.
//
// Use NewRemoteMap to create one: the map inside inner must be initialized
// before the first lookup.
type RemoteMap struct {
	// We use an inner structure to avoid exposing public heap.Interface
	// functions to users of remoteMap.
	inner remoteMapInner
	// Synchronizes access to inner.
	lock sync.Mutex
}

// NewRemoteMap creates a RemoteMap that expires peers after a timeout.
//
// If the timeout is 0, peers never expire. Otherwise a background goroutine
// wakes up every timeout/2 and removes the peers that have not been looked up
// for at least timeout. That goroutine runs for the life of the program.
//
// The timeout does not have to be kept in sync with smux's idle timeout. If a
// peer is removed from the map while the smux session is still live, the worst
// that can happen is a loss of whatever packets were in the send queue at the
// time. If smux later decides to send more packets to the same peer, we'll
// instantiate a new send queue, and if the peer is ever seen again with a
// matching address, we'll deliver them.
func NewRemoteMap(timeout time.Duration) *RemoteMap {
	m := &RemoteMap{
		inner: remoteMapInner{
			byAge:  make([]*remoteRecord, 0),
			byAddr: make(map[string]int),
		},
	}
	if timeout <= 0 {
		return m
	}

	go func() {
		interval := timeout / 2
		for {
			time.Sleep(interval)
			// Take the timestamp before waiting for the lock.
			cutoffBase := time.Now()
			m.lock.Lock()
			m.inner.removeExpired(cutoffBase, timeout)
			m.lock.Unlock()
		}
	}()
	return m
}

// SendQueue returns the send queue for addr, creating a record for addr if
// none exists. The lookup also marks addr as seen just now.
func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
	m.lock.Lock()
	defer m.lock.Unlock()

	record := m.inner.Lookup(addr, time.Now())
	return record.SendQueue
}

// remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
// expiring old records. byAddr is a map from addresses to heap indices, to
// allow looking up by address. Unlike RemoteMap, remoteMapInner requires
// external synchronization.
//
// The two fields must always describe the same set of records; Len panics if
// their sizes differ.
type remoteMapInner struct {
	byAge  []*remoteRecord
	byAddr map[string]int
}

// removeExpired pops every record whose LastSeen is at least timeout before
// now, and closes its send queue. Because byAge is a heap ordered by LastSeen,
// it can stop at the first record that is still fresh.
func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) {
	for {
		if len(inner.byAge) == 0 {
			return
		}
		oldest := inner.byAge[0]
		if now.Sub(oldest.LastSeen) < timeout {
			return
		}
		expired := heap.Pop(inner).(*remoteRecord)
		close(expired.SendQueue)
	}
}

// Lookup returns the record for addr, with its LastSeen time set to now. An
// existing record is updated and repositioned in the heap; otherwise a new
// record with a fresh send queue is created and inserted.
func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
	if idx, found := inner.byAddr[addr.String()]; found {
		// Known peer: refresh the timestamp and restore heap order.
		existing := inner.byAge[idx]
		existing.LastSeen = now
		heap.Fix(inner, idx)
		return existing
	}

	// Unknown peer: start a new record.
	fresh := &remoteRecord{
		Addr:      addr,
		LastSeen:  now,
		SendQueue: make(chan []byte, 32),
	}
	heap.Push(inner, fresh)
	return fresh
}

// heap.Interface for remoteMapInner.

// Len returns the number of records. It panics if the slice and the map have
// got out of step with each other.
func (inner *remoteMapInner) Len() int {
	n := len(inner.byAge)
	if n != len(inner.byAddr) {
		panic("inconsistent remoteMap")
	}
	return n
}

// Less orders records by LastSeen, oldest first, so that the record at the
// root of the heap is the next one to expire.
func (inner *remoteMapInner) Less(i, j int) bool {
	a, b := inner.byAge[i], inner.byAge[j]
	return a.LastSeen.Before(b.LastSeen)
}

// Swap exchanges the records at heap positions i and j and updates the
// address index to match their new positions.
func (inner *remoteMapInner) Swap(i, j int) {
	atI, atJ := inner.byAge[i], inner.byAge[j]
	inner.byAge[i], inner.byAge[j] = atJ, atI
	inner.byAddr[atJ.Addr.String()] = i
	inner.byAddr[atI.Addr.String()] = j
}

// Push appends the *remoteRecord x to the backing slice and indexes it by
// address. It panics if a record with the same address is already present.
// It is meant to be called through heap.Push.
func (inner *remoteMapInner) Push(x interface{}) {
	record := x.(*remoteRecord)
	key := record.Addr.String()
	if _, exists := inner.byAddr[key]; exists {
		panic("duplicate address in remoteMap")
	}
	// The new record goes at the end of the slice; index it there.
	inner.byAddr[key] = len(inner.byAge)
	inner.byAge = append(inner.byAge, record)
}

// Pop removes the last record from the backing slice, drops its entry from
// the address index, and returns it. It is meant to be called through
// heap.Pop, which first moves the record to be removed to the end.
//
// The position is taken from the length of byAge, the slice being indexed,
// not from the size of byAddr.
func (inner *remoteMapInner) Pop() interface{} {
	last := len(inner.byAge) - 1
	// Remove from byAge slice, clearing the slot so the record can be
	// garbage collected.
	record := inner.byAge[last]
	inner.byAge[last] = nil
	inner.byAge = inner.byAge[:last]
	// Remove from byAddr map.
	delete(inner.byAddr, record.Addr.String())
	return record
}

Packet encapsulation

The ReadPacket and WritePacket functions define how packets are represented in a temporary TCP connection. This example uses a simple length-prefixed scheme. In your own design you may want to use something more sophisticated that allows for padding.

Systems that are based on different substrates will, of course, have to use different encapsulation schemes. For example, see the DNS message encoding used by dnstt.

turbotunnel/turbotunnel/encapsulation.go
package turbotunnel

import (
	"encoding/binary"
	"io"
)

// ReadPacket decapsulates a packet from r. A packet is a big-endian uint16
// length followed by that many bytes of payload.
//
// It returns io.EOF if and only if there were zero bytes to be read from r. A
// stream that ends partway through the length prefix or the payload yields
// io.ErrUnexpectedEOF. Whenever the returned error is non-nil, the returned
// slice is nil, so a truncated packet is never handed back.
func ReadPacket(r io.Reader) ([]byte, error) {
	var length uint16
	if err := binary.Read(r, binary.BigEndian, &length); err != nil {
		return nil, err
	}
	p := make([]byte, length)
	if _, err := io.ReadFull(r, p); err != nil {
		// The length prefix was already read, so running out of data
		// here is a truncation, not a clean end of stream.
		if err == io.EOF {
			err = io.ErrUnexpectedEOF
		}
		return nil, err
	}
	return p, nil
}

// WritePacket encapsulates a packet into w: a big-endian uint16 length,
// written first, followed by the payload in a second write. It panics if the
// length of p cannot be represented by a uint16. It returns the first write
// error encountered.
func WritePacket(w io.Writer, p []byte) error {
	if len(p) > 0xffff {
		panic("packet too long")
	}
	var prefix [2]byte
	binary.BigEndian.PutUint16(prefix[:], uint16(len(p)))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	_, err := w.Write(p)
	return err
}

SessionID

SessionID defines the format of a session identifier. Here, it is just a 64-bit random string. The session identifier should be long enough to prevent guessing and random collisions. SessionID implements the net.Addr interface.

turbotunnel/turbotunnel/sessionid.go
package turbotunnel

import (
	"crypto/rand"
	"encoding/hex"
)

// SessionID identifies an ongoing session independently of any network
// address. It is 8 bytes long and satisfies the net.Addr interface.
type SessionID [8]byte

// NewSessionID returns a SessionID filled with bytes from crypto/rand. It
// panics if the random source fails.
func NewSessionID() SessionID {
	var id SessionID
	if _, err := rand.Read(id[:]); err != nil {
		panic(err)
	}
	return id
}

// Network returns the fixed network name "session".
func (id SessionID) Network() string {
	return "session"
}

// String returns the identifier as 16 lowercase hexadecimal digits.
func (id SessionID) String() string {
	raw := id[:]
	return hex.EncodeToString(raw)
}