Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable fwmark (SO_MARK) for outgoing sockets #202

Merged
merged 10 commits into from
Dec 16, 2024
6 changes: 6 additions & 0 deletions cmd/outline-ss-server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@ import (
type ServiceConfig struct {
Listeners []ListenerConfig
Keys []KeyConfig
Dialer DialerConfig
}

type ListenerType string

const listenerTypeTCP ListenerType = "tcp"

const listenerTypeUDP ListenerType = "udp"

type ListenerConfig struct {
Type ListenerType
Address string
}

type DialerConfig struct {
Fwmark uint
}

type KeyConfig struct {
ID string
Cipher string
Expand Down
23 changes: 13 additions & 10 deletions cmd/outline-ss-server/config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ services:
- type: udp
address: "[::]:9000"
keys:
- id: user-0
cipher: chacha20-ietf-poly1305
secret: Secret0
- id: user-1
cipher: chacha20-ietf-poly1305
secret: Secret1

- id: user-0
cipher: chacha20-ietf-poly1305
secret: Secret0
- id: user-1
cipher: chacha20-ietf-poly1305
secret: Secret1
dialer:
# fwmark can be used in conjunction with other Linux networking features like cgroups, network namespaces, and TC (Traffic Control) for sophisticated network management.
# Value of 0 disables fwmark (SO_MARK)
fwmark: 0
- listeners:
- type: tcp
address: "[::]:9001"
- type: udp
address: "[::]:9001"
keys:
- id: user-2
cipher: chacha20-ietf-poly1305
secret: Secret2
- id: user-2
cipher: chacha20-ietf-poly1305
secret: Secret2
46 changes: 32 additions & 14 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ import (

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/ipinfo"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/lmittmann/tint"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/term"

"github.com/Jigsaw-Code/outline-ss-server/ipinfo"
onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service"
)

var logLevel = new(slog.LevelVar) // Info by default
var logHandler slog.Handler
var (
logLevel = new(slog.LevelVar) // Info by default
logHandler slog.Handler
)

// Set by goreleaser default ldflags. See https://goreleaser.com/customization/build/
var version = "dev"
Expand Down Expand Up @@ -120,30 +124,32 @@ func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) {
return ciphers, nil
}

func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler {
func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList, dialer transport.StreamDialer) service.StreamHandler {
authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m.tcpServiceMetrics)
// TODO: Register initial data metrics at zero.
return service.NewStreamHandler(authFunc, tcpReadTimeout)
return service.NewStreamHandler(authFunc, tcpReadTimeout, dialer)
}

func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler {
return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics)
func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList, dialer service.UDPDialer) service.PacketHandler {
return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics, dialer)
}

func (s *SSServer) NewShadowsocksStreamHandlerFromConfig(config ServiceConfig) (service.StreamHandler, error) {
ciphers, err := newCipherListFromConfig(config)
if err != nil {
return nil, err
}
return s.NewShadowsocksStreamHandler(ciphers), nil
dialer := service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, config.Dialer.Fwmark)
return s.NewShadowsocksStreamHandler(ciphers, dialer), nil
}

func (s *SSServer) NewShadowsocksPacketHandlerFromConfig(config ServiceConfig) (service.PacketHandler, error) {
ciphers, err := newCipherListFromConfig(config)
if err != nil {
return nil, err
}
return s.NewShadowsocksPacketHandler(ciphers), nil
dialer := service.MakeTargetPacketListener(config.Dialer.Fwmark)
return s.NewShadowsocksPacketHandler(ciphers, dialer), nil
sbruens marked this conversation as resolved.
Show resolved Hide resolved
}

type listenerSet struct {
Expand Down Expand Up @@ -243,7 +249,8 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
ciphers := service.NewCipherList()
ciphers.Update(cipherList)

sh := s.NewShadowsocksStreamHandler(ciphers)
tcpDialer := service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, 0)
sbruens marked this conversation as resolved.
Show resolved Hide resolved
sh := s.NewShadowsocksStreamHandler(ciphers, tcpDialer)
ln, err := lnSet.ListenStream(addr)
if err != nil {
return err
Expand All @@ -259,7 +266,8 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
ph := s.NewShadowsocksPacketHandler(ciphers)
udpDialer := service.MakeTargetPacketListener(0)
sbruens marked this conversation as resolved.
Show resolved Hide resolved
ph := s.NewShadowsocksPacketHandler(ciphers, udpDialer)
go ph.Handle(pc)
}

Expand All @@ -275,7 +283,12 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
if err != nil {
return err
}
slog.Info("TCP service started.", "address", ln.Addr().String())
slog.Info("TCP service started.", "address", ln.Addr().String(), "fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}())
if sh == nil {
sh, err = s.NewShadowsocksStreamHandlerFromConfig(serviceConfig)
if err != nil {
Expand All @@ -291,7 +304,12 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
if err != nil {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
slog.Info("UDP service started.", "address", pc.LocalAddr().String(), "fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}())
if ph == nil {
ph, err = s.NewShadowsocksPacketHandlerFromConfig(serviceConfig)
if err != nil {
Expand Down
32 changes: 18 additions & 14 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ import (

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
)

const maxUDPPacketSize = 64 * 1024

var udpDefaultDialer = service.MakeTargetPacketListener(0)

var tcpDefaultDialer = service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, 0)

func init() {
logging.SetLevel(logging.INFO, "")
}
Expand Down Expand Up @@ -132,8 +138,7 @@ func TestTCPEcho(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
handler := service.NewStreamHandler(authFunc, testTimeout, &transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -183,8 +188,7 @@ func TestTCPEcho(t *testing.T) {
echoRunning.Wait()
}

type fakeShadowsocksMetrics struct {
}
type fakeShadowsocksMetrics struct{}

var _ service.ShadowsocksConnMetrics = (*fakeShadowsocksMetrics)(nil)

Expand Down Expand Up @@ -212,7 +216,7 @@ func TestRestrictedAddresses(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler := service.NewStreamHandler(authFunc, testTimeout, tcpDefaultDialer)
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -275,9 +279,11 @@ var _ service.UDPConnMetrics = (*fakeUDPConnMetrics)(nil)
func (m *fakeUDPConnMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
m.up = append(m.up, udpRecord{m.clientAddr, m.accessKey, status, clientProxyBytes, proxyTargetBytes})
}

func (m *fakeUDPConnMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
m.down = append(m.down, udpRecord{m.clientAddr, m.accessKey, status, targetProxyBytes, proxyClientBytes})
}

func (m *fakeUDPConnMetrics) RemoveNatEntry() {
// Not tested because it requires waiting for a long timeout.
}
Expand Down Expand Up @@ -311,7 +317,7 @@ func TestUDPEcho(t *testing.T) {
t.Fatal(err)
}
testMetrics := &fakeUDPMetrics{}
proxy := service.NewPacketHandler(time.Hour, cipherList, testMetrics, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, testMetrics, &fakeShadowsocksMetrics{}, udpDefaultDialer)
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -401,8 +407,7 @@ func BenchmarkTCPThroughput(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPConnMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
handler := service.NewStreamHandler(authFunc, testTimeout, &transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -468,8 +473,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPConnMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
handler := service.NewStreamHandler(authFunc, testTimeout, &transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -544,7 +548,7 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{}, udpDefaultDialer)
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -588,7 +592,7 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{}, udpDefaultDialer)
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
Expand Down
25 changes: 19 additions & 6 deletions service/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ import (
"log/slog"
"net"
"net/netip"
"os"
"sync"
"syscall"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/shadowsocks/go-shadowsocks2/socks"

onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
"github.com/shadowsocks/go-shadowsocks2/socks"
)

// TCPConnMetrics is used to report metrics on TCP connections.
Expand Down Expand Up @@ -158,18 +160,27 @@ type streamHandler struct {
}

// NewStreamHandler creates a StreamHandler
func NewStreamHandler(authenticate StreamAuthenticateFunc, timeout time.Duration) StreamHandler {
func NewStreamHandler(authenticate StreamAuthenticateFunc, timeout time.Duration, dialer transport.StreamDialer) StreamHandler {
return &streamHandler{
readTimeout: timeout,
authenticate: authenticate,
dialer: defaultDialer,
dialer: dialer,
}
}

var defaultDialer = makeValidatingTCPStreamDialer(onet.RequirePublicIP)

func makeValidatingTCPStreamDialer(targetIPValidator onet.TargetIPValidator) transport.StreamDialer {
func MakeValidatingTCPStreamDialer(targetIPValidator onet.TargetIPValidator, fwmark uint) transport.StreamDialer {
sabify marked this conversation as resolved.
Show resolved Hide resolved
return &transport.TCPDialer{Dialer: net.Dialer{Control: func(network, address string, c syscall.RawConn) error {
if fwmark > 0 {
err := c.Control(func(fd uintptr) {
err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, int(fwmark))
if err != nil {
slog.Error("Set fwmark failed.", "err", os.NewSyscallError("failed to set mark for TCP socket", err))
}
})
if err != nil {
slog.Error("Set TCPDialer Control func failed.", "err", err)
}
}
ip, _, _ := net.SplitHostPort(address)
return targetIPValidator(net.ParseIP(ip))
}}}
Expand Down Expand Up @@ -377,6 +388,8 @@ type NoOpTCPConnMetrics struct{}
var _ TCPConnMetrics = (*NoOpTCPConnMetrics)(nil)

func (m *NoOpTCPConnMetrics) AddAuthenticated(accessKey string) {}

func (m *NoOpTCPConnMetrics) AddClosed(status string, data metrics.ProxyMetrics, duration time.Duration) {
}

func (m *NoOpTCPConnMetrics) AddProbe(status, drainResult string, clientProxyBytes int64) {}
Loading
Loading