Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable fwmark (SO_MARK) for outgoing sockets #202

Merged
merged 10 commits into from
Dec 16, 2024
6 changes: 6 additions & 0 deletions cmd/outline-ss-server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@ import (
type ServiceConfig struct {
Listeners []ListenerConfig
Keys []KeyConfig
Dialer DialerConfig
}

type ListenerType string

const listenerTypeTCP ListenerType = "tcp"

const listenerTypeUDP ListenerType = "udp"

type ListenerConfig struct {
Type ListenerType
Address string
}

type DialerConfig struct {
Fwmark uint
}

type KeyConfig struct {
ID string
Cipher string
Expand Down
23 changes: 13 additions & 10 deletions cmd/outline-ss-server/config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@ services:
- type: udp
address: "[::]:9000"
keys:
- id: user-0
cipher: chacha20-ietf-poly1305
secret: Secret0
- id: user-1
cipher: chacha20-ietf-poly1305
secret: Secret1

- id: user-0
cipher: chacha20-ietf-poly1305
secret: Secret0
- id: user-1
cipher: chacha20-ietf-poly1305
secret: Secret1
dialer:
# fwmark can be used in conjunction with other Linux networking features like cgroups, network namespaces, and TC (Traffic Control) for sophisticated network management.
# Value of 0 disables fwmark (SO_MARK) (Linux Only)
fwmark: 0
- listeners:
- type: tcp
address: "[::]:9001"
- type: udp
address: "[::]:9001"
keys:
- id: user-2
cipher: chacha20-ietf-poly1305
secret: Secret2
- id: user-2
cipher: chacha20-ietf-poly1305
secret: Secret2
46 changes: 32 additions & 14 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ import (

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/ipinfo"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/lmittmann/tint"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/term"

"github.com/Jigsaw-Code/outline-ss-server/ipinfo"
onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service"
)

var logLevel = new(slog.LevelVar) // Info by default
var logHandler slog.Handler
var (
logLevel = new(slog.LevelVar) // Info by default
logHandler slog.Handler
)

// Set by goreleaser default ldflags. See https://goreleaser.com/customization/build/
var version = "dev"
Expand Down Expand Up @@ -120,30 +124,32 @@ func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) {
return ciphers, nil
}

func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler {
func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList, dialer transport.StreamDialer) service.StreamHandler {
authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m.tcpServiceMetrics)
// TODO: Register initial data metrics at zero.
return service.NewStreamHandler(authFunc, tcpReadTimeout)
return service.NewStreamHandler(authFunc, tcpReadTimeout, dialer)
}

func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler {
return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics)
func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList, dialer service.UDPDialer) service.PacketHandler {
return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics, dialer)
}

func (s *SSServer) NewShadowsocksStreamHandlerFromConfig(config ServiceConfig) (service.StreamHandler, error) {
ciphers, err := newCipherListFromConfig(config)
if err != nil {
return nil, err
}
return s.NewShadowsocksStreamHandler(ciphers), nil
dialer := service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, config.Dialer.Fwmark)
return s.NewShadowsocksStreamHandler(ciphers, dialer), nil
}

func (s *SSServer) NewShadowsocksPacketHandlerFromConfig(config ServiceConfig) (service.PacketHandler, error) {
ciphers, err := newCipherListFromConfig(config)
if err != nil {
return nil, err
}
return s.NewShadowsocksPacketHandler(ciphers), nil
dialer := service.MakeTargetPacketListener(config.Dialer.Fwmark)
return s.NewShadowsocksPacketHandler(ciphers, dialer), nil
sbruens marked this conversation as resolved.
Show resolved Hide resolved
}

type listenerSet struct {
Expand Down Expand Up @@ -243,7 +249,8 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
ciphers := service.NewCipherList()
ciphers.Update(cipherList)

sh := s.NewShadowsocksStreamHandler(ciphers)
tcpDialer := service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, 0)
sbruens marked this conversation as resolved.
Show resolved Hide resolved
sh := s.NewShadowsocksStreamHandler(ciphers, tcpDialer)
ln, err := lnSet.ListenStream(addr)
if err != nil {
return err
Expand All @@ -259,7 +266,8 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
ph := s.NewShadowsocksPacketHandler(ciphers)
udpDialer := service.MakeTargetPacketListener(0)
sbruens marked this conversation as resolved.
Show resolved Hide resolved
ph := s.NewShadowsocksPacketHandler(ciphers, udpDialer)
go ph.Handle(pc)
}

Expand All @@ -275,7 +283,12 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
if err != nil {
return err
}
slog.Info("TCP service started.", "address", ln.Addr().String())
slog.Info("TCP service started.", "address", ln.Addr().String(), "fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}())
if sh == nil {
sh, err = s.NewShadowsocksStreamHandlerFromConfig(serviceConfig)
if err != nil {
Expand All @@ -291,7 +304,12 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
if err != nil {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
slog.Info("UDP service started.", "address", pc.LocalAddr().String(), "fwmark", func() any {
if serviceConfig.Dialer.Fwmark == 0 {
return "disabled"
}
return serviceConfig.Dialer.Fwmark
}())
if ph == nil {
ph, err = s.NewShadowsocksPacketHandlerFromConfig(serviceConfig)
if err != nil {
Expand Down
32 changes: 18 additions & 14 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,21 @@ import (

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
)

const maxUDPPacketSize = 64 * 1024

var udpDefaultDialer = service.MakeTargetPacketListener(0)

var tcpDefaultDialer = service.MakeValidatingTCPStreamDialer(onet.RequirePublicIP, 0)

func init() {
logging.SetLevel(logging.INFO, "")
}
Expand Down Expand Up @@ -132,8 +138,7 @@ func TestTCPEcho(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
handler := service.NewStreamHandler(authFunc, testTimeout, &transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -183,8 +188,7 @@ func TestTCPEcho(t *testing.T) {
echoRunning.Wait()
}

type fakeShadowsocksMetrics struct {
}
type fakeShadowsocksMetrics struct{}

var _ service.ShadowsocksConnMetrics = (*fakeShadowsocksMetrics)(nil)

Expand Down Expand Up @@ -212,7 +216,7 @@ func TestRestrictedAddresses(t *testing.T) {
const testTimeout = 200 * time.Millisecond
testMetrics := &statusMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler := service.NewStreamHandler(authFunc, testTimeout, tcpDefaultDialer)
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -275,9 +279,11 @@ var _ service.UDPConnMetrics = (*fakeUDPConnMetrics)(nil)
func (m *fakeUDPConnMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
m.up = append(m.up, udpRecord{m.clientAddr, m.accessKey, status, clientProxyBytes, proxyTargetBytes})
}

func (m *fakeUDPConnMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
m.down = append(m.down, udpRecord{m.clientAddr, m.accessKey, status, targetProxyBytes, proxyClientBytes})
}

func (m *fakeUDPConnMetrics) RemoveNatEntry() {
// Not tested because it requires waiting for a long timeout.
}
Expand Down Expand Up @@ -311,7 +317,7 @@ func TestUDPEcho(t *testing.T) {
t.Fatal(err)
}
testMetrics := &fakeUDPMetrics{}
proxy := service.NewPacketHandler(time.Hour, cipherList, testMetrics, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, testMetrics, &fakeShadowsocksMetrics{}, udpDefaultDialer)
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -401,8 +407,7 @@ func BenchmarkTCPThroughput(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPConnMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
handler := service.NewStreamHandler(authFunc, testTimeout, &transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -468,8 +473,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) {
const testTimeout = 200 * time.Millisecond
testMetrics := &service.NoOpTCPConnMetrics{}
authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{})
handler := service.NewStreamHandler(authFunc, testTimeout)
handler.SetTargetDialer(&transport.TCPDialer{})
handler := service.NewStreamHandler(authFunc, testTimeout, &transport.TCPDialer{})
done := make(chan struct{})
go func() {
service.StreamServe(
Expand Down Expand Up @@ -544,7 +548,7 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{}, udpDefaultDialer)
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
Expand Down Expand Up @@ -588,7 +592,7 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &service.NoOpUDPMetrics{}, &fakeShadowsocksMetrics{}, udpDefaultDialer)
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
Expand Down
30 changes: 30 additions & 0 deletions service/socketopts_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2018 Jigsaw Operations LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux

package service

import (
"os"
"syscall"
)

func SetFwmark(fd uintptr, fwmark uint) error {
sbruens marked this conversation as resolved.
Show resolved Hide resolved
err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, int(fwmark))
if err != nil {
os.NewSyscallError("failed to set fwmark for socket", err)
}
return nil
}
19 changes: 6 additions & 13 deletions service/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"net"
"net/netip"
"sync"
"syscall"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/shadowsocks/go-shadowsocks2/socks"

onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/Jigsaw-Code/outline-ss-server/service/metrics"
"github.com/shadowsocks/go-shadowsocks2/socks"
)

// TCPConnMetrics is used to report metrics on TCP connections.
Expand Down Expand Up @@ -158,23 +158,14 @@ type streamHandler struct {
}

// NewStreamHandler creates a StreamHandler
func NewStreamHandler(authenticate StreamAuthenticateFunc, timeout time.Duration) StreamHandler {
func NewStreamHandler(authenticate StreamAuthenticateFunc, timeout time.Duration, dialer transport.StreamDialer) StreamHandler {
return &streamHandler{
readTimeout: timeout,
authenticate: authenticate,
dialer: defaultDialer,
dialer: dialer,
}
}

var defaultDialer = makeValidatingTCPStreamDialer(onet.RequirePublicIP)

func makeValidatingTCPStreamDialer(targetIPValidator onet.TargetIPValidator) transport.StreamDialer {
return &transport.TCPDialer{Dialer: net.Dialer{Control: func(network, address string, c syscall.RawConn) error {
ip, _, _ := net.SplitHostPort(address)
return targetIPValidator(net.ParseIP(ip))
}}}
}

// StreamHandler is a handler that handles stream connections.
type StreamHandler interface {
Handle(ctx context.Context, conn transport.StreamConn, connMetrics TCPConnMetrics)
Expand Down Expand Up @@ -377,6 +368,8 @@ type NoOpTCPConnMetrics struct{}
var _ TCPConnMetrics = (*NoOpTCPConnMetrics)(nil)

func (m *NoOpTCPConnMetrics) AddAuthenticated(accessKey string) {}

func (m *NoOpTCPConnMetrics) AddClosed(status string, data metrics.ProxyMetrics, duration time.Duration) {
}

func (m *NoOpTCPConnMetrics) AddProbe(status, drainResult string, clientProxyBytes int64) {}
Loading