Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split out association handling in packet handler #221

Open
wants to merge 52 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
b2aa859
refactor: split UDP serving from handling of packets
sbruens Nov 18, 2024
c0c0e9f
Use a read channel.
sbruens Nov 20, 2024
8d95309
Rename `wrappedPacketConn` to just `packetConn`.
sbruens Nov 20, 2024
51d5c88
Update `shadowsocks_handler`.
sbruens Nov 21, 2024
422542b
Change the `HandlePacket` API to not require the `pkt`.
sbruens Nov 22, 2024
c10176f
Remove the NAT map from the `Handle()` function.
sbruens Nov 25, 2024
004c6c3
Rework the metrics now that the NAT is no longer done by the handler.
sbruens Nov 26, 2024
a91d55b
Move the metrics `AddClosed()` call to after the `timedCopy` returns.
sbruens Nov 26, 2024
1d7200b
Remove the explicit `targetConn.Close()` and let it expire on its own.
sbruens Nov 26, 2024
4b8eeae
Add the NAT metrics back in at the server level.
sbruens Nov 27, 2024
1e89e85
Use buffer pool on `packetHandler` instead of global.
sbruens Nov 27, 2024
b4e7dbb
Revert wrapping the `clientConn` with shadowsocks encryption/decryption.
sbruens Nov 27, 2024
63c2cff
Rename `packet` to `association`.
sbruens Nov 27, 2024
1eeb4d6
Fix tests.
sbruens Nov 28, 2024
08e8bba
Update the docstring for `PacketServe`.
sbruens Nov 28, 2024
34a01d7
Fix comment to refer to "associations".
sbruens Nov 28, 2024
033ba9d
Fix metrics test.
sbruens Dec 3, 2024
87a9d23
Use `slicepool`.
sbruens Dec 3, 2024
50feaa2
Let the assocation handler provide the buffer.
sbruens Dec 4, 2024
b7a4a3c
Remove the `packetConnWrapper` and move the logic into `natconn` inst…
sbruens Dec 10, 2024
f80294c
Fix close while reading of `natconn`.
sbruens Dec 10, 2024
36d4b27
Simplify the natmap a little.
sbruens Dec 10, 2024
f5d9ac3
Refactor `PacketServe` to use events (close and read).
sbruens Dec 13, 2024
e0547f2
Use correct logger.
sbruens Dec 13, 2024
3dc12d1
Keep `readCh` and `closeCh` unbuffered.
sbruens Dec 16, 2024
afb9cd1
Catch panics in the `ReadFrom` go routine.
sbruens Dec 16, 2024
5d2acc6
Close the `readCh` instead of sending the error on the `readCh`.
sbruens Dec 16, 2024
7cd0f1f
Wrap a logger with the association's client address so we can simplif…
sbruens Dec 16, 2024
078032c
Reference GitHub issue for supporting multiple IPs.
sbruens Dec 16, 2024
2b3f746
Simplify packet handling with a new `association` struct.
sbruens Dec 18, 2024
2f2268b
Add some comments to the `Association` interface.
sbruens Dec 18, 2024
bfe4b35
Consolidate debug logging.
sbruens Dec 18, 2024
f37f23a
Rename some vars.
sbruens Dec 18, 2024
08de4c3
Update doc.
sbruens Dec 18, 2024
418c0e4
Format.
sbruens Dec 19, 2024
98988b0
Do not unpack first packets twice.
sbruens Dec 19, 2024
b9c0286
Move handling into the association.
sbruens Dec 20, 2024
d14ea20
Merge branch 'master' into sbruens/udp-split-serving
sbruens Dec 20, 2024
eef630a
Add some comments to the timeout value.
sbruens Dec 20, 2024
1c462b1
Don't set the stream dialer in the old config flow.
sbruens Dec 20, 2024
0918050
Rename `AddAuthentication` and `AddClose`.
sbruens Jan 6, 2025
78af4be
Update comment to reflect it handles packets from both directions.
sbruens Jan 6, 2025
2cf398c
Separate the interfaces for `Handle()` and `HandlePacket()`.
sbruens Jan 6, 2025
1055cb1
Fix typo in `UDPAssocationMetrics`.
sbruens Jan 6, 2025
f8ab81a
Don't pass `conn` to `Handle()`.
sbruens Jan 6, 2025
e6ef2f3
Update comment.
sbruens Jan 6, 2025
2939f8a
Add comments.
sbruens Jan 6, 2025
23a03e9
Remove ConnAssociation in favor of a `HandleAssociation(Conn, PacketA…
sbruens Jan 8, 2025
aa130f0
Split `Service` interface into outline-ss-server and Caddy interfaces.
sbruens Jan 8, 2025
19fb5f7
Remove unused const.
sbruens Jan 8, 2025
c656b36
Don't require `conn` in `HandleAssociation()`.
sbruens Jan 8, 2025
31cec7c
Exit the loop if the connection is closed.
sbruens Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions caddy/shadowsocks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@ import (

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
outline "github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/caddyserver/caddy/v2"
"github.com/mholt/caddy-l4/layer4"

outline "github.com/Jigsaw-Code/outline-ss-server/service"
)

const serverUDPBufferSize = 64 * 1024

const ssModuleName = "layer4.handlers.shadowsocks"

func init() {
Expand Down Expand Up @@ -115,8 +118,12 @@ func (h *ShadowsocksHandler) Handle(cx *layer4.Connection, _ layer4.Handler) err
switch conn := cx.Conn.(type) {
case transport.StreamConn:
h.service.HandleStream(cx.Context, conn)
case net.PacketConn:
h.service.HandlePacket(conn)
case net.Conn:
assoc, err := h.service.NewAssociation(conn)
if err != nil {
return fmt.Errorf("Failed to handle association: %v", err)
}
assoc.Handle(conn)
default:
return fmt.Errorf("failed to handle unknown connection type: %t", conn)
}
Expand Down
9 changes: 4 additions & 5 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {

ssService, err := service.NewShadowsocksService(
service.WithCiphers(ciphers),
service.WithNatTimeout(s.natTimeout),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
service.WithPacketListener(service.MakeTargetUDPListener(s.natTimeout, 0)),
service.WithLogger(slog.Default()),
)
ln, err := lnSet.ListenStream(addr)
Expand All @@ -242,7 +242,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go ssService.HandlePacket(pc)
go service.PacketServe(pc, ssService.NewAssociation, s.serverMetrics)
}

for _, serviceConfig := range config.Services {
Expand All @@ -252,11 +252,10 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
}
ssService, err := service.NewShadowsocksService(
service.WithCiphers(ciphers),
service.WithNatTimeout(s.natTimeout),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
service.WithStreamDialer(service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, serviceConfig.Dialer.Fwmark)),
service.WithPacketListener(service.MakeTargetUDPListener(serviceConfig.Dialer.Fwmark)),
service.WithPacketListener(service.MakeTargetUDPListener(s.natTimeout, serviceConfig.Dialer.Fwmark)),
fortuna marked this conversation as resolved.
Show resolved Hide resolved
service.WithLogger(slog.Default()),
)
if err != nil {
Expand Down Expand Up @@ -287,7 +286,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
}
return serviceConfig.Dialer.Fwmark
}())
go ssService.HandlePacket(pc)
go service.PacketServe(pc, ssService.NewAssociation, s.serverMetrics)
}
}
totalCipherCount += len(serviceConfig.Keys)
Expand Down
32 changes: 29 additions & 3 deletions cmd/outline-ss-server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"time"

"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -25,12 +26,15 @@ var now = time.Now

type serverMetrics struct {
// NOTE: New metrics need to be added to `newPrometheusServerMetrics()`, `Describe()` and `Collect()`.
buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
ports prometheus.Gauge
buildInfo *prometheus.GaugeVec
accessKeys prometheus.Gauge
ports prometheus.Gauge
addedNatEntries prometheus.Counter
removedNatEntries prometheus.Counter
}

var _ prometheus.Collector = (*serverMetrics)(nil)
var _ service.NATMetrics = (*serverMetrics)(nil)

// newPrometheusServerMetrics constructs a Prometheus metrics collector for server
// related metrics.
Expand All @@ -48,19 +52,33 @@ func newPrometheusServerMetrics() *serverMetrics {
Name: "ports",
Help: "Count of open ports",
}),
addedNatEntries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "udp",
Name: "nat_entries_added",
Help: "Entries added to the UDP NAT table",
}),
removedNatEntries: prometheus.NewCounter(prometheus.CounterOpts{
Subsystem: "udp",
Name: "nat_entries_removed",
Help: "Entries removed from the UDP NAT table",
}),
}
}

func (m *serverMetrics) Describe(ch chan<- *prometheus.Desc) {
m.buildInfo.Describe(ch)
m.accessKeys.Describe(ch)
m.ports.Describe(ch)
m.addedNatEntries.Describe(ch)
m.removedNatEntries.Describe(ch)
}

func (m *serverMetrics) Collect(ch chan<- prometheus.Metric) {
m.buildInfo.Collect(ch)
m.accessKeys.Collect(ch)
m.ports.Collect(ch)
m.addedNatEntries.Collect(ch)
m.removedNatEntries.Collect(ch)
}

func (m *serverMetrics) SetVersion(version string) {
Expand All @@ -71,3 +89,11 @@ func (m *serverMetrics) SetNumAccessKeys(numKeys int, ports int) {
m.accessKeys.Set(float64(numKeys))
m.ports.Set(float64(ports))
}

func (m *serverMetrics) AddNATEntry() {
m.addedNatEntries.Inc()
}

func (m *serverMetrics) RemoveNATEntry() {
m.removedNatEntries.Inc()
}
127 changes: 66 additions & 61 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integration_test
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -104,6 +105,9 @@ func startUDPEchoServer(t testing.TB) (*net.UDPConn, *sync.WaitGroup) {
for {
n, clientAddr, err := conn.ReadFromUDP(buf)
if err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
t.Logf("Failed to read from UDP conn: %v", err)
return
}
Expand Down Expand Up @@ -257,47 +261,50 @@ func TestRestrictedAddresses(t *testing.T) {
assert.ElementsMatch(t, testMetrics.statuses, expectedStatus)
}

// Stub metrics implementation for testing NAT behaviors.
type natTestMetrics struct {
natEntriesAdded int
}

var _ service.NATMetrics = (*natTestMetrics)(nil)

func (m *natTestMetrics) AddNATEntry() {
m.natEntriesAdded++
}
func (m *natTestMetrics) RemoveNATEntry() {}

// Metrics about one UDP packet.
type udpRecord struct {
clientAddr net.Addr
accessKey, status string
in, out int64
}

type fakeUDPConnMetrics struct {
clientAddr net.Addr
accessKey string
up, down []udpRecord
type fakeUDPAssocationMetrics struct {
accessKey string
up, down []udpRecord
mu sync.Mutex
}

var _ service.UDPConnMetrics = (*fakeUDPConnMetrics)(nil)
var _ service.UDPAssocationMetrics = (*fakeUDPAssocationMetrics)(nil)

func (m *fakeUDPConnMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
m.up = append(m.up, udpRecord{m.clientAddr, m.accessKey, status, clientProxyBytes, proxyTargetBytes})
}
func (m *fakeUDPConnMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
m.down = append(m.down, udpRecord{m.clientAddr, m.accessKey, status, targetProxyBytes, proxyClientBytes})
}
func (m *fakeUDPConnMetrics) RemoveNatEntry() {
// Not tested because it requires waiting for a long timeout.
func (m *fakeUDPAssocationMetrics) AddAuthenticated(key string) {
m.accessKey = key
}

// Fake metrics implementation for UDP
type fakeUDPMetrics struct {
connMetrics []fakeUDPConnMetrics
func (m *fakeUDPAssocationMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.up = append(m.up, udpRecord{m.accessKey, status, clientProxyBytes, proxyTargetBytes})
}

var _ service.UDPMetrics = (*fakeUDPMetrics)(nil)

func (m *fakeUDPMetrics) AddUDPNatEntry(clientAddr net.Addr, accessKey string) service.UDPConnMetrics {
cm := fakeUDPConnMetrics{
clientAddr: clientAddr,
accessKey: accessKey,
}
m.connMetrics = append(m.connMetrics, cm)
return &m.connMetrics[len(m.connMetrics)-1]
func (m *fakeUDPAssocationMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
m.mu.Lock()
defer m.mu.Unlock()
m.down = append(m.down, udpRecord{m.accessKey, status, targetProxyBytes, proxyClientBytes})
}

func (m *fakeUDPAssocationMetrics) AddClosed() {}

func TestUDPEcho(t *testing.T) {
echoConn, echoRunning := startUDPEchoServer(t)

Expand All @@ -310,14 +317,14 @@ func TestUDPEcho(t *testing.T) {
if err != nil {
t.Fatal(err)
}
testMetrics := &fakeUDPMetrics{}
proxy := service.NewPacketHandler(time.Hour, cipherList, testMetrics, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})

proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
proxy.Handle(proxyConn)
done <- struct{}{}
}()
natMetrics := &natTestMetrics{}
associationMetrics := &fakeUDPAssocationMetrics{}
go service.PacketServe(proxyConn, func(conn net.Conn) (service.Association, error) {
return proxy.NewAssociation(conn, associationMetrics)
}, natMetrics)

cryptoKey, err := shadowsocks.NewEncryptionKey(shadowsocks.CHACHA20IETFPOLY1305, secrets[0])
require.NoError(t, err)
Expand Down Expand Up @@ -356,34 +363,28 @@ func TestUDPEcho(t *testing.T) {
echoConn.Close()
echoRunning.Wait()
proxyConn.Close()
<-done
// Verify that the expected metrics were reported.
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
keyID := snapshot[0].Value.(*service.CipherEntry).ID

if len(testMetrics.connMetrics) != 1 {
t.Errorf("Wrong NAT count: %d", len(testMetrics.connMetrics))
}
if len(testMetrics.connMetrics[0].up) != 1 {
t.Errorf("Wrong number of packets sent: %v", testMetrics.connMetrics[0].up)
} else {
record := testMetrics.connMetrics[0].up[0]
require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad upstream metrics")
require.Equal(t, keyID, record.accessKey, "Bad upstream metrics")
require.Equal(t, "OK", record.status, "Bad upstream metrics")
require.Greater(t, record.in, record.out, "Bad upstream metrics")
require.Equal(t, int64(N), record.out, "Bad upstream metrics")
}
if len(testMetrics.connMetrics[0].down) != 1 {
t.Errorf("Wrong number of packets received: %v", testMetrics.connMetrics[0].down)
} else {
record := testMetrics.connMetrics[0].down[0]
require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad downstream metrics")
require.Equal(t, keyID, record.accessKey, "Bad downstream metrics")
require.Equal(t, "OK", record.status, "Bad downstream metrics")
require.Greater(t, record.out, record.in, "Bad downstream metrics")
require.Equal(t, int64(N), record.in, "Bad downstream metrics")
}
require.Equal(t, natMetrics.natEntriesAdded, 1, "Wrong NAT count")

associationMetrics.mu.Lock()
defer associationMetrics.mu.Unlock()

require.Lenf(t, associationMetrics.up, 1, "Wrong number of packets sent")
record := associationMetrics.up[0]
require.Equal(t, keyID, record.accessKey, "Bad upstream metrics")
require.Equal(t, "OK", record.status, "Bad upstream metrics")
require.Greater(t, record.in, record.out, "Bad upstream metrics")
require.Equal(t, int64(N), record.out, "Bad upstream metrics")

require.Lenf(t, associationMetrics.down, 1, "Wrong number of packets received")
record = associationMetrics.down[0]
require.Equal(t, keyID, record.accessKey, "Bad downstream metrics")
require.Equal(t, "OK", record.status, "Bad downstream metrics")
require.Greater(t, record.out, record.in, "Bad downstream metrics")
require.Equal(t, int64(N), record.in, "Bad downstream metrics")
}

func BenchmarkTCPThroughput(b *testing.B) {
Expand Down Expand Up @@ -544,11 +545,13 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
proxy.Handle(server)
service.PacketServe(server, func(conn net.Conn) (service.Association, error) {
return proxy.NewAssociation(conn, nil)
}, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down Expand Up @@ -588,11 +591,13 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
proxy.Handle(proxyConn)
service.PacketServe(proxyConn, func(conn net.Conn) (service.Association, error) {
return proxy.NewAssociation(conn, nil)
}, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down
Loading
Loading