Skip to content

Commit

Permalink
Move web server config into its own top-level config structure.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Dec 10, 2024
1 parent 6eb828e commit 831371a
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 106 deletions.
92 changes: 47 additions & 45 deletions cmd/outline-ss-server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,59 +22,72 @@ import (
)

type ServiceConfig struct {
Listeners []ListenerConfig
Keys []KeyConfig
Listeners []ListenerConfig `yaml:"listeners"`
Keys []KeyConfig `yaml:"keys"`
}

type ListenerType string

const (
listenerTypeTCP ListenerType = "tcp"
listenerTypeUDP ListenerType = "udp"
listenerTypeWebSocket ListenerType = "websocket"
listenerTypeTCP ListenerType = "tcp"
listenerTypeUDP ListenerType = "udp"
listenerTypeWebsocketStream ListenerType = "websocket-stream"
listenerTypeWebsocketPacket ListenerType = "websocket-packet"
)

type ConnectionType string

const (
connectionTypeStream ConnectionType = "stream"
connectionTypePacket ConnectionType = "packet"
)

type ConfigOption struct {
Path string `yaml:"path"`
ConnectionType ConnectionType `yaml:"connection_type"`
type WebServerConfig struct {
ID string `yaml:"id"`
Listeners []string `yaml:"listen"`
}

type ListenerConfig struct {
Type ListenerType
Address string

// WebSocket config options
Options []ConfigOption `yaml:"options,omitempty"`
Type ListenerType `yaml:"type"`
Address string `yaml:"address,omitempty"`
WebServer string `yaml:"web_server,omitempty"`
Path string `yaml:"path,omitempty"`
}

type KeyConfig struct {
ID string
Cipher string
Secret string
ID string `yaml:"id"`
Cipher string `yaml:"cipher"`
Secret string `yaml:"secret"`
}

type LegacyKeyServiceConfig struct {
KeyConfig `yaml:",inline"`
Port int
Port int `yaml:"port"`
}

type Config struct {
Services []ServiceConfig
Web struct {
Servers []WebServerConfig `yaml:"servers"`
} `yaml:"web"`
Services []ServiceConfig `yaml:"services"`

// Deprecated: `keys` exists for backward compatibility. Prefer to configure
// using the newer `services` format.
Keys []LegacyKeyServiceConfig
Keys []LegacyKeyServiceConfig `yaml:"keys"`
}

// Validate checks that the config is valid.
func (c *Config) Validate() error {
existingWebServers := make(map[string]bool)
for _, srv := range c.Web.Servers {
if srv.ID == "" {
return fmt.Errorf("web server must have an ID")
}
if _, exists := existingWebServers[srv.ID]; exists {
return fmt.Errorf("web server with ID `%s` already exists", srv.ID)
}
existingWebServers[srv.ID] = true

for _, addr := range srv.Listeners {
if err := validateAddress(addr); err != nil {
return fmt.Errorf("invalid listener for web server `%s`: %w", srv.ID, err)
}
}
}

existingListeners := make(map[string]bool)
for _, serviceConfig := range c.Services {
for _, lnConfig := range serviceConfig.Listeners {
Expand All @@ -88,28 +101,17 @@ func (c *Config) Validate() error {
if _, exists := existingListeners[key]; exists {
return fmt.Errorf("listener of type `%s` with address `%s` already exists.", lnConfig.Type, lnConfig.Address)
}

case listenerTypeWebSocket:
if err := validateAddress(lnConfig.Address); err != nil {
return err
case listenerTypeWebsocketStream, listenerTypeWebsocketPacket:
if lnConfig.WebServer == "" {
return fmt.Errorf("listener type `%s` requires an http server reference", lnConfig.Type)
}
if len(lnConfig.Options) == 0 {
return fmt.Errorf("listener type `%s` requires at least one option", lnConfig.Type)
if _, exists := existingWebServers[lnConfig.WebServer]; !exists {
return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer)
}
for _, option := range lnConfig.Options {
if option.Path == "" {
return fmt.Errorf("listener type `%s` requires a `path` for each option", lnConfig.Type)
}
if option.ConnectionType != connectionTypeStream && option.ConnectionType != connectionTypePacket {
return fmt.Errorf("unsupported connection type: %s", option.ConnectionType)
}
key = fmt.Sprintf("%s/%s/%s", lnConfig.Type, lnConfig.Address, option.Path)
if _, exists := existingListeners[key]; exists {
return fmt.Errorf("listener of type `%s` with address `%s` and path `%s` already exists.", lnConfig.Type, lnConfig.Address, option.Path)
}
existingListeners[key] = true
key = fmt.Sprintf("%s/%s", lnConfig.Type, lnConfig.WebServer)
if _, exists := existingListeners[key]; exists {
return fmt.Errorf("listener of type `%s` with http server `%s` already exists.", lnConfig.Type, lnConfig.WebServer)
}

default:
return fmt.Errorf("unsupported listener type: %s", lnConfig.Type)
}
Expand Down
21 changes: 12 additions & 9 deletions cmd/outline-ss-server/config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

web:
servers:
- id: my_web_server
listen:
- "[::]:8000"

services:
- listeners:
# TODO(sbruens): Allow a string-based listener config, as a convenient short-form
Expand All @@ -20,15 +26,12 @@ services:
address: "[::]:9000"
- type: udp
address: "[::]:9000"
- type: websocket
address: "[::]:8000"
options:
# TCP over WebSocket
- path: "/tcp"
connection_type: "stream"
# UDP over WebSocket
- path: "/udp"
connection_type: "packet"
- type: websocket-stream
web_server: my_web_server
path: "/tcp"
- type: websocket-packet
web_server: my_web_server
path: "/udp"
keys:
- id: user-0
cipher: chacha20-ietf-poly1305
Expand Down
125 changes: 73 additions & 52 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"container/list"
"context"
"errors"
"flag"
"fmt"
"log/slog"
Expand Down Expand Up @@ -60,13 +61,13 @@ func init() {
)
}

type WebSocketStreamListener struct {
type HTTPStreamListener struct {
service.StreamListener
}

var _ net.Listener = (*WebSocketStreamListener)(nil)
var _ net.Listener = (*HTTPStreamListener)(nil)

func (t *WebSocketStreamListener) Accept() (net.Conn, error) {
func (t *HTTPStreamListener) Accept() (net.Conn, error) {
return t.StreamListener.AcceptStream()
}

Expand Down Expand Up @@ -194,21 +195,49 @@ func (ls *listenerSet) Len() int {
return len(ls.listenerCloseFuncs)
}

type connWithDone struct {
net.Conn
doneCh chan struct{}
}

func (s *OutlineServer) runConfig(config Config) (func() error, error) {
startErrCh := make(chan error)
stopErrCh := make(chan error)
stopCh := make(chan struct{})

go func() {
lnSet := &listenerSet{
manager: s.lnManager,
manager: s.lnManager,
listenerCloseFuncs: make(map[string]func() error),
}
defer func() {
stopErrCh <- lnSet.Close()
}()

startErrCh <- func() error {
// Start configured web servers.
webServers := make(map[string]*http.ServeMux)
for _, srvConfig := range config.Web.Servers {
mux := http.NewServeMux()
for _, addr := range srvConfig.Listeners {
server := &http.Server{Addr: addr, Handler: mux}
ln, err := lnSet.ListenStream(addr)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", addr, err)
}
go func() {
defer server.Shutdown(context.Background())
err := server.Serve(&HTTPStreamListener{ln})
if err != nil && err != http.ErrServerClosed && !isErrClosing(err) {
slog.Error("Failed to run web server.", "err", err, "ID", srvConfig.ID)
}
}()
slog.Info("Web server started.", "ID", srvConfig.ID, "address", addr)
}
webServers[srvConfig.ID] = mux
}

// Start legacy services.
totalCipherCount := len(config.Keys)
portCiphers := make(map[int]*list.List) // Values are *List of *CipherEntry.
for _, keyConfig := range config.Keys {
Expand Down Expand Up @@ -254,7 +283,8 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics)
}


// Start services with listeners.
for _, serviceConfig := range config.Services {
ciphers, err := newCipherListFromConfig(serviceConfig)
if err != nil {
Expand Down Expand Up @@ -286,57 +316,48 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics)
case listenerTypeWebSocket:
ln, err := lnSet.ListenStream(lnConfig.Address)
if err != nil {
return err
case listenerTypeWebsocketStream:
if _, exists := webServers[lnConfig.WebServer]; !exists {
return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer)
}
mux := http.NewServeMux()
for _, option := range lnConfig.Options {
switch option.ConnectionType {
case connectionTypeStream:
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler := func(wsConn *websocket.Conn) {
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr)
if err != nil {
slog.Error("failed to upgrade", "err", err)
w.WriteHeader(http.StatusBadGateway)
return
}
conn := &streamConn{&wrappedConn{Conn: wsConn, raddr: raddr}}
ssService.HandleStream(ctx, conn)
}
websocket.Handler(handler).ServeHTTP(w, r)
})
mux.Handle(option.Path, http.StripPrefix(option.Path, handler))
case connectionTypePacket:
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler := func(wsConn *websocket.Conn) {
raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr)
if err != nil {
slog.Error("failed to upgrade", "err", err)
w.WriteHeader(http.StatusBadGateway)
return
}
conn := &wrappedConn{Conn: wsConn, raddr: raddr}
ssService.HandleAssociation(conn)
}
websocket.Handler(handler).ServeHTTP(w, r)
})
mux.Handle(option.Path, http.StripPrefix(option.Path, handler))
mux := webServers[lnConfig.WebServer]
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler := func(wsConn *websocket.Conn) {
ctx, contextCancel := context.WithCancel(context.Background())
defer contextCancel()
raddr, err := net.ResolveTCPAddr("tcp", r.RemoteAddr)
if err != nil {
slog.Error("failed to upgrade", "err", err)
w.WriteHeader(http.StatusBadGateway)
return
}
conn := &streamConn{&wrappedConn{Conn: wsConn, raddr: raddr}}
ssService.HandleStream(ctx, conn)
}
slog.Info("WebSocket service started.", "address", ln.Addr().String(), "path", option.Path)
websocket.Handler(handler).ServeHTTP(w, r)
})
mux.Handle(lnConfig.Path, http.StripPrefix(lnConfig.Path, handler))
case listenerTypeWebsocketPacket:
if _, exists := webServers[lnConfig.WebServer]; !exists {
return fmt.Errorf("listener type `%s` references unknown web server `%s`", lnConfig.Type, lnConfig.WebServer)
}
server := http.Server{Handler: mux}
go func() {
defer server.Shutdown(context.Background())
err := server.Serve(&WebSocketStreamListener{ln})
if err != nil && err != http.ErrServerClosed && !isErrClosing(err) {
slog.Error("Failed to run HTTP server.", "err", err)
mux := webServers[lnConfig.WebServer]
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
handler := func(wsConn *websocket.Conn) {
raddr, err := net.ResolveUDPAddr("udp", r.RemoteAddr)
if err != nil {
slog.Error("failed to upgrade", "err", err)
w.WriteHeader(http.StatusBadGateway)
return
}
conn := &wrappedConn{Conn: wsConn, raddr: raddr}
ssService.HandleAssociation(conn)
}
}()
websocket.Handler(handler).ServeHTTP(w, r)
})
mux.Handle(lnConfig.Path, http.StripPrefix(lnConfig.Path, handler))
default:
return errors.New("unsupported listener configuration")
}
}
totalCipherCount += len(serviceConfig.Keys)
Expand Down

0 comments on commit 831371a

Please sign in to comment.