From 2aa7812cb1fb76100eb3ffe5f256983b14b629b8 Mon Sep 17 00:00:00 2001 From: naison <895703375@qq.com> Date: Fri, 15 Nov 2024 20:56:10 +0800 Subject: [PATCH] feat: use gvisor parse network packet in pod (#369) --- cmd/kubevpn/cmds/clone.go | 6 +- cmd/kubevpn/cmds/connect.go | 6 +- cmd/kubevpn/cmds/controlplane.go | 2 +- cmd/kubevpn/cmds/dev.go | 6 +- cmd/kubevpn/cmds/proxy.go | 6 +- cmd/kubevpn/cmds/serve.go | 2 +- cmd/kubevpn/cmds/syncthing.go | 2 +- cmd/kubevpn/cmds/webhook.go | 2 +- pkg/config/config.go | 3 +- pkg/core/gvisoricmpforwarder.go | 27 +++++ pkg/core/gvisorstack.go | 17 +-- pkg/core/gvisortcpforwarder.go | 31 ++--- pkg/core/gvisortcphandler.go | 96 +++++---------- pkg/core/gvisortunendpoint.go | 199 ++++++++++++++++--------------- pkg/core/gvisorudpforwarder.go | 34 ++---- pkg/core/gvisorudphandler.go | 33 +---- pkg/core/tcphandler.go | 8 +- pkg/core/tunhandler.go | 179 ++++++++++----------------- pkg/core/tunhandlerclient.go | 39 +++++- pkg/dev/options.go | 4 +- pkg/handler/clone.go | 2 +- pkg/handler/connect.go | 9 +- pkg/util/net.go | 39 ++++++ pkg/util/util.go | 4 + 24 files changed, 352 insertions(+), 404 deletions(-) create mode 100644 pkg/core/gvisoricmpforwarder.go diff --git a/cmd/kubevpn/cmds/clone.go b/cmd/kubevpn/cmds/clone.go index 0b46ade95..73b3019e5 100644 --- a/cmd/kubevpn/cmds/clone.go +++ b/cmd/kubevpn/cmds/clone.go @@ -74,10 +74,6 @@ func CmdClone(f cmdutil.Factory) *cobra.Command { kubevpn clone service/productpage --ssh-addr --ssh-username --gssapi-password `)), PreRunE: func(cmd *cobra.Command, args []string) (err error) { - // not support temporally - if options.Engine == config.EngineGvisor { - return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) - } util.InitLoggerForClient(false) // startup daemon process and sudo process return daemon.StartupDaemon(cmd.Context()) @@ -164,7 +160,7 @@ func CmdClone(f cmdutil.Factory) *cobra.Command { cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false") cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container") cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image) - cmd.Flags().StringVar((*string)(&options.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw)) + cmd.Flags().StringVar((*string)(&options.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem)) cmd.Flags().StringVar(&options.TargetImage, "target-image", "", "Clone container use this image to startup container, if not special, use origin image") cmd.Flags().StringVar(&options.TargetContainer, "target-container", "", "Clone container use special image to startup this container, if not special, use origin image") diff --git a/cmd/kubevpn/cmds/connect.go b/cmd/kubevpn/cmds/connect.go index f443a1907..663b51c7c 100644 --- a/cmd/kubevpn/cmds/connect.go +++ b/cmd/kubevpn/cmds/connect.go @@ -68,10 +68,6 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { if err != nil { return err } - // not support temporally - if connect.Engine == config.EngineGvisor { - return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) - } return nil }, RunE: func(cmd *cobra.Command, args []string) error { @@ -166,7 +162,7 @@ func CmdConnect(f cmdutil.Factory) *cobra.Command { cmd.Flags().BoolVar(&config.Debug, "debug", false, "enable debug mode or not, true or false") cmd.Flags().StringVar(&config.Image, "image", config.Image, "use this image to startup container") cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image) - cmd.Flags().StringVar((*string)(&connect.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw)) + cmd.Flags().StringVar((*string)(&connect.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem)) cmd.Flags().BoolVar(&foreground, "foreground", false, "Hang up") cmd.Flags().BoolVar(&lite, "lite", false, "connect to multiple cluster in lite mode. mode \"lite\": design for only connecting to multiple cluster network. mode \"full\": not only connect to cluster network, it also supports proxy workloads inbound traffic to local PC.") diff --git a/cmd/kubevpn/cmds/controlplane.go b/cmd/kubevpn/cmds/controlplane.go index f7c9ad753..cc7adfde1 100644 --- a/cmd/kubevpn/cmds/controlplane.go +++ b/cmd/kubevpn/cmds/controlplane.go @@ -32,7 +32,7 @@ func CmdControlPlane(_ cmdutil.Factory) *cobra.Command { `)), RunE: func(cmd *cobra.Command, args []string) error { util.InitLoggerForServer(config.Debug) - go util.StartupPProf(0) + go util.StartupPProfForServer(0) go func(ctx context.Context) { conf, err := miekgdns.ClientConfigFromFile(resolvconf.Path()) if err != nil { diff --git a/cmd/kubevpn/cmds/dev.go b/cmd/kubevpn/cmds/dev.go index 16cb61c6a..a06de1392 100644 --- a/cmd/kubevpn/cmds/dev.go +++ b/cmd/kubevpn/cmds/dev.go @@ -88,10 +88,6 @@ func CmdDev(f cmdutil.Factory) *cobra.Command { return err } util.InitLoggerForClient(config.Debug) - // not support temporally - if options.Engine == config.EngineGvisor { - return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) - } if p := options.RunOptions.Platform; p != "" { if _, err = platforms.Parse(p); err != nil { @@ -144,7 +140,7 @@ func CmdDev(f cmdutil.Factory) *cobra.Command { cmdutil.CheckErr(cmd.RegisterFlagCompletionFunc("container", completion.ContainerCompletionFunc(f))) cmd.Flags().StringVar((*string)(&options.ConnectMode), "connect-mode", string(dev.ConnectModeHost), "Connect to kubernetes network in container or in host, eg: ["+string(dev.ConnectModeContainer)+"|"+string(dev.ConnectModeHost)+"]") cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image) - cmd.Flags().StringVar((*string)(&options.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw)) + cmd.Flags().StringVar((*string)(&options.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem)) // diy docker options cmd.Flags().StringVar(&options.DevImage, "dev-image", "", "Use to startup docker container, Default is pod image") diff --git a/cmd/kubevpn/cmds/proxy.go b/cmd/kubevpn/cmds/proxy.go index 71931edd1..53d7195c5 100644 --- a/cmd/kubevpn/cmds/proxy.go +++ b/cmd/kubevpn/cmds/proxy.go @@ -91,10 +91,6 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command { if err = daemon.StartupDaemon(cmd.Context()); err != nil { return err } - // not support temporally - if connect.Engine == config.EngineGvisor { - return fmt.Errorf(`not support type engine: %s, support ("%s"|"%s")`, config.EngineGvisor, config.EngineMix, config.EngineRaw) - } return err }, RunE: func(cmd *cobra.Command, args []string) error { @@ -186,7 +182,7 @@ func CmdProxy(f cmdutil.Factory) *cobra.Command { cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false") cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container") cmd.Flags().BoolVar(&transferImage, "transfer-image", false, "transfer image to remote registry, it will transfer image "+config.OriginImage+" to flags `--image` special image, default: "+config.Image) - cmd.Flags().StringVar((*string)(&connect.Engine), "engine", string(config.EngineRaw), fmt.Sprintf(`transport engine ("%s"|"%s") %s: use gvisor and raw both (both performance and stable), %s: use raw mode (best stable)`, config.EngineMix, config.EngineRaw, config.EngineMix, config.EngineRaw)) + cmd.Flags().StringVar((*string)(&connect.Engine), "netstack", string(config.EngineSystem), fmt.Sprintf(`network stack ("%s"|"%s") %s: use gvisor (both performance and stable), %s: use raw mode (best stable)`, config.EngineGvisor, config.EngineSystem, config.EngineGvisor, config.EngineSystem)) cmd.Flags().BoolVar(&foreground, "foreground", false, "foreground hang up") handler.AddExtraRoute(cmd.Flags(), extraRoute) diff --git a/cmd/kubevpn/cmds/serve.go b/cmd/kubevpn/cmds/serve.go index df7bb6a41..2939532c2 100644 --- a/cmd/kubevpn/cmds/serve.go +++ b/cmd/kubevpn/cmds/serve.go @@ -34,7 +34,7 @@ func CmdServe(_ cmdutil.Factory) *cobra.Command { PreRun: func(*cobra.Command, []string) { util.InitLoggerForServer(config.Debug) runtime.GOMAXPROCS(0) - go util.StartupPProf(0) + go util.StartupPProfForServer(6060) }, RunE: func(cmd *cobra.Command, args []string) error { rand.Seed(time.Now().UnixNano()) diff --git a/cmd/kubevpn/cmds/syncthing.go b/cmd/kubevpn/cmds/syncthing.go index b1c0ecfe8..8ed6cd6a3 100644 --- a/cmd/kubevpn/cmds/syncthing.go +++ b/cmd/kubevpn/cmds/syncthing.go @@ -18,7 +18,7 @@ func CmdSyncthing(_ cmdutil.Factory) *cobra.Command { Short: i18n.T("Syncthing"), Long: templates.LongDesc(i18n.T(`Syncthing`)), RunE: func(cmd *cobra.Command, args []string) (err error) { - go util.StartupPProf(0) + go util.StartupPProfForServer(0) return syncthing.StartServer(cmd.Context(), detach, dir) }, Hidden: true, diff --git a/cmd/kubevpn/cmds/webhook.go b/cmd/kubevpn/cmds/webhook.go index bc847dc65..713e40d8d 100644 --- a/cmd/kubevpn/cmds/webhook.go +++ b/cmd/kubevpn/cmds/webhook.go @@ -23,7 +23,7 @@ func CmdWebhook(f cmdutil.Factory) *cobra.Command { Args: cobra.MaximumNArgs(0), PreRun: func(cmd *cobra.Command, args []string) { util.InitLoggerForServer(true) - go util.StartupPProf(0) + go util.StartupPProfForServer(0) }, RunE: func(cmd *cobra.Command, args []string) error { return webhook.Main(f) diff --git a/pkg/config/config.go b/pkg/config/config.go index 1851b8a18..62e994b2a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -169,8 +169,7 @@ type Engine string const ( EngineGvisor Engine = "gvisor" - EngineMix Engine = "mix" - EngineRaw Engine = "raw" + EngineSystem Engine = "system" ) const Slogan = "Now you can access resources in the kubernetes cluster !" diff --git a/pkg/core/gvisoricmpforwarder.go b/pkg/core/gvisoricmpforwarder.go new file mode 100644 index 000000000..87257f6cd --- /dev/null +++ b/pkg/core/gvisoricmpforwarder.go @@ -0,0 +1,27 @@ +package core + +import ( + "context" + + log "github.com/sirupsen/logrus" + "gvisor.dev/gvisor/pkg/tcpip/stack" + + "github.com/wencaiwulue/kubevpn/v2/pkg/util" +) + +func ICMPForwarder(s *stack.Stack, ctx context.Context) func(stack.TransportEndpointID, *stack.PacketBuffer) bool { + return func(id stack.TransportEndpointID, buffer *stack.PacketBuffer) bool { + log.Debugf("[TUN-ICMP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", + id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(), + ) + ctx1, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + ok, err := util.PingOnce(ctx1, id.RemoteAddress.String(), id.LocalAddress.String()) + if err != nil { + log.Debugf("[TUN-ICMP] Failed to ping dst %s from src %s", + id.LocalAddress.String(), id.RemoteAddress.String(), + ) + } + return ok + } +} diff --git a/pkg/core/gvisorstack.go b/pkg/core/gvisorstack.go index a73467634..e2d25eddf 100755 --- a/pkg/core/gvisorstack.go +++ b/pkg/core/gvisorstack.go @@ -16,6 +16,7 @@ import ( ) func NewStack(ctx context.Context, tun stack.LinkEndpoint) *stack.Stack { + nicID := tcpip.NICID(1) s := stack.New(stack.Options{ NetworkProtocols: []stack.NetworkProtocolFactory{ ipv4.NewProtocol, @@ -33,26 +34,28 @@ func NewStack(ctx context.Context, tun stack.LinkEndpoint) *stack.Stack { RawFactory: raw.EndpointFactory{}, }) // set handler for TCP UDP ICMP - s.SetTransportProtocolHandler(tcp.ProtocolNumber, TCPForwarder(s)) - s.SetTransportProtocolHandler(udp.ProtocolNumber, UDPForwarder(s)) + s.SetTransportProtocolHandler(tcp.ProtocolNumber, TCPForwarder(s, ctx)) + s.SetTransportProtocolHandler(udp.ProtocolNumber, UDPForwarder(s, ctx)) + s.SetTransportProtocolHandler(header.ICMPv4ProtocolNumber, ICMPForwarder(s, ctx)) + s.SetTransportProtocolHandler(header.ICMPv6ProtocolNumber, ICMPForwarder(s, ctx)) s.SetRouteTable([]tcpip.Route{ { Destination: header.IPv4EmptySubnet, - NIC: 1, + NIC: nicID, }, { Destination: header.IPv6EmptySubnet, - NIC: 1, + NIC: nicID, }, }) - s.CreateNICWithOptions(1, packetsocket.New(tun), stack.NICOptions{ + s.CreateNICWithOptions(nicID, packetsocket.New(tun), stack.NICOptions{ Disabled: false, Context: ctx, }) - s.SetPromiscuousMode(1, true) - s.SetSpoofing(1, true) + s.SetPromiscuousMode(nicID, true) + s.SetSpoofing(nicID, true) // Enable SACK Recovery. { diff --git a/pkg/core/gvisortcpforwarder.go b/pkg/core/gvisortcpforwarder.go index f5a1c1983..805d2bbd7 100644 --- a/pkg/core/gvisortcpforwarder.go +++ b/pkg/core/gvisortcpforwarder.go @@ -5,8 +5,10 @@ import ( "context" "encoding/binary" "errors" + "fmt" "io" "net" + "time" log "github.com/sirupsen/logrus" "gvisor.dev/gvisor/pkg/tcpip" @@ -18,10 +20,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) -var GvisorTCPForwardAddr string - -func TCPForwarder(s *stack.Stack) func(stack.TransportEndpointID, *stack.PacketBuffer) bool { - GvisorTCPForwardAddr := GvisorTCPForwardAddr +func TCPForwarder(s *stack.Stack, ctx context.Context) func(stack.TransportEndpointID, *stack.PacketBuffer) bool { return tcp.NewForwarder(s, 0, 100000, func(request *tcp.ForwarderRequest) { defer request.Complete(false) id := request.ID() @@ -29,24 +28,14 @@ func TCPForwarder(s *stack.Stack) func(stack.TransportEndpointID, *stack.PacketB id.LocalPort, id.LocalAddress.String(), id.RemotePort, id.RemoteAddress.String(), ) - node, err := ParseNode(GvisorTCPForwardAddr) - if err != nil { - log.Errorf("[TUN-TCP] Failed to parse gvisor tcp forward addr %s: %v", GvisorTCPForwardAddr, err) - return - } - node.Client = &Client{ - Connector: GvisorTCPTunnelConnector(), - Transporter: TCPTransporter(), - } - forwardChain := NewChain(5, node) - - remote, err := forwardChain.dial(context.Background()) + // 2, dial proxy + host := id.LocalAddress.String() + port := fmt.Sprintf("%d", id.LocalPort) + var remote net.Conn + var d = net.Dialer{Timeout: time.Second * 5} + remote, err := d.DialContext(ctx, "tcp", net.JoinHostPort(host, port)) if err != nil { - log.Debugf("[TUN-TCP] Failed to dial remote conn: %v", err) - return - } - if err = WriteProxyInfo(remote, id); err != nil { - log.Debugf("[TUN-TCP] Failed to write proxy info: %v", err) + log.Errorf("[TUN-TCP] Failed to connect addr %s: %v", net.JoinHostPort(host, port), err) return } diff --git a/pkg/core/gvisortcphandler.go b/pkg/core/gvisortcphandler.go index 1e8c2b36e..60c94aadb 100644 --- a/pkg/core/gvisortcphandler.go +++ b/pkg/core/gvisortcphandler.go @@ -2,94 +2,62 @@ package core import ( "context" - "errors" - "fmt" - "io" "net" - "time" + "sync" log "github.com/sirupsen/logrus" + "gvisor.dev/gvisor/pkg/tcpip" + "gvisor.dev/gvisor/pkg/tcpip/link/channel" + "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" "github.com/wencaiwulue/kubevpn/v2/pkg/config" + "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -type gvisorTCPTunnelConnector struct { +type gvisorTCPHandler struct { + // map[srcIP]net.Conn + routeMapTCP *sync.Map + packetChan chan *datagramPacket } -func GvisorTCPTunnelConnector() Connector { - return &gvisorTCPTunnelConnector{} -} - -func (c *gvisorTCPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) { - switch con := conn.(type) { - case *net.TCPConn: - err := con.SetNoDelay(true) - if err != nil { - return nil, err - } - err = con.SetKeepAlive(true) - if err != nil { - return nil, err - } - err = con.SetKeepAlivePeriod(15 * time.Second) - if err != nil { - return nil, err - } - } - return conn, nil -} - -type gvisorTCPHandler struct{} - func GvisorTCPHandler() Handler { - return &gvisorTCPHandler{} + return &gvisorTCPHandler{ + routeMapTCP: RouteMapTCP, + packetChan: TCPPacketChan, + } } func (h *gvisorTCPHandler) Handle(ctx context.Context, tcpConn net.Conn) { defer tcpConn.Close() - log.Debugf("[TUN-TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) - // 1, get proxy info - endpointID, err := ParseProxyInfo(tcpConn) - if err != nil { - log.Errorf("[TUN-TCP] Failed to parse proxy info: %v", err) - return - } - log.Debugf("[TUN-TCP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", - endpointID.LocalPort, endpointID.LocalAddress.String(), endpointID.RemotePort, endpointID.RemoteAddress.String(), - ) - // 2, dial proxy - host := endpointID.LocalAddress.String() - port := fmt.Sprintf("%d", endpointID.LocalPort) - var remote net.Conn - remote, err = net.DialTimeout("tcp", net.JoinHostPort(host, port), time.Second*5) - if err != nil { - log.Errorf("[TUN-TCP] Failed to connect addr %s: %v", net.JoinHostPort(host, port), err) - return - } + cancel, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + log.Debugf("[TCP] %s -> %s", tcpConn.RemoteAddr(), tcpConn.LocalAddr()) + h.handle(cancel, tcpConn) +} +func (h *gvisorTCPHandler) handle(ctx context.Context, tcpConn net.Conn) { + endpoint := channel.New(tcp.DefaultReceiveBufferSize, uint32(config.DefaultMTU), tcpip.GetRandMacAddr()) errChan := make(chan error, 2) go func() { - i := config.LPool.Get().([]byte)[:] - defer config.LPool.Put(i[:]) - written, err2 := io.CopyBuffer(remote, tcpConn, i) - log.Debugf("[TUN-TCP] Write length %d data to remote", written) - errChan <- err2 + h.readFromTCPConnWriteToEndpoint(ctx, tcpConn, endpoint) + util.SafeClose(errChan) }() go func() { - i := config.LPool.Get().([]byte)[:] - defer config.LPool.Put(i[:]) - written, err2 := io.CopyBuffer(tcpConn, remote, i) - log.Debugf("[TUN-TCP] Read length %d data from remote", written) - errChan <- err2 + h.readFromEndpointWriteToTCPConn(ctx, tcpConn, endpoint) + util.SafeClose(errChan) }() - err = <-errChan - if err != nil && !errors.Is(err, io.EOF) { - log.Debugf("[TUN-TCP] Disconnect: %s >-<: %s: %v", tcpConn.LocalAddr(), remote.RemoteAddr(), err) + stack := NewStack(ctx, endpoint) + defer stack.Destroy() + select { + case <-errChan: + return + case <-ctx.Done(): + return } } func GvisorTCPListener(addr string) (net.Listener, error) { - log.Debugf("Gvisor tcp listen addr %s", addr) + log.Debugf("Gvisor TCP listening addr: %s", addr) laddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err diff --git a/pkg/core/gvisortunendpoint.go b/pkg/core/gvisortunendpoint.go index b68e2d9be..bc309b79d 100755 --- a/pkg/core/gvisortunendpoint.go +++ b/pkg/core/gvisortunendpoint.go @@ -3,6 +3,7 @@ package core import ( "context" "errors" + "io" "net" "os" @@ -15,123 +16,127 @@ import ( "gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/link/channel" "gvisor.dev/gvisor/pkg/tcpip/stack" - "gvisor.dev/gvisor/pkg/tcpip/transport/tcp" "github.com/wencaiwulue/kubevpn/v2/pkg/config" "github.com/wencaiwulue/kubevpn/v2/pkg/util" ) -func NewTunEndpoint(ctx context.Context, tun net.Conn, mtu uint32, engine config.Engine, in chan<- *DataElem, out chan *DataElem) stack.LinkEndpoint { - addr, _ := tcpip.ParseMACAddress("02:03:03:04:05:06") - endpoint := channel.New(tcp.DefaultReceiveBufferSize, mtu, addr) - - go func() { - for { - select { - case <-ctx.Done(): - return - default: - } - read := endpoint.ReadContext(ctx) - if read != nil { - bb := read.ToView().AsSlice() - i := config.LPool.Get().([]byte)[:] - n := copy(i, bb) - bb = nil - util.SafeWrite(out, NewDataElem(i[:], n, nil, nil)) - } +func (h *gvisorTCPHandler) readFromEndpointWriteToTCPConn(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) { + tcpConn, _ := newGvisorFakeUDPTunnelConnOverTCP(ctx, conn) + for { + select { + case <-ctx.Done(): + return + default: } - }() - // tun --> dispatcher - go func() { - // full(all use gvisor), mix(cluster network use gvisor), raw(not use gvisor) - for { - bytes := config.LPool.Get().([]byte)[:] - read, err := tun.Read(bytes[:]) + + pktBuffer := endpoint.ReadContext(ctx) + if pktBuffer != nil { + buf := pktBuffer.ToView().AsSlice() + _, err := tcpConn.Write(buf) if err != nil { - if errors.Is(err, os.ErrClosed) { + if errors.Is(err, os.ErrClosed) || errors.Is(err, io.EOF) { return } // if context is done if ctx.Err() != nil { - log.Errorf("[TUN] Failed to read from tun: %v, context is done", err) + log.Errorf("[TUN] Failed to write to tun: %v, context is done: %v", err, ctx.Err()) return } - log.Errorf("[TUN] Failed to read from tun: %v", err) - continue + log.Errorf("[TUN] Failed to write data to tun device: %v", err) } - if read == 0 { - log.Warnf("[TUN] Read from tun length is %d", read) - continue + } + } +} + +// tun --> dispatcher +func (h *gvisorTCPHandler) readFromTCPConnWriteToEndpoint(ctx context.Context, conn net.Conn, endpoint *channel.Endpoint) { + tcpConn, _ := newGvisorFakeUDPTunnelConnOverTCP(ctx, conn) + for { + bytes := config.LPool.Get().([]byte)[:] + read, err := tcpConn.Read(bytes[:]) + if err != nil { + if errors.Is(err, os.ErrClosed) || errors.Is(err, io.EOF) { + return } - // Try to determine network protocol number, default zero. - var protocol tcpip.NetworkProtocolNumber - var ipProtocol int - var src, dst net.IP - // TUN interface with IFF_NO_PI enabled, thus - // we need to determine protocol from version field - version := bytes[0] >> 4 - if version == 4 { - protocol = header.IPv4ProtocolNumber - ipHeader, err := ipv4.ParseHeader(bytes[:read]) - if err != nil { - log.Errorf("Failed to parse IPv4 header: %v", err) - continue - } - ipProtocol = ipHeader.Protocol - src = ipHeader.Src - dst = ipHeader.Dst - } else if version == 6 { - protocol = header.IPv6ProtocolNumber - ipHeader, err := ipv6.ParseHeader(bytes[:read]) - if err != nil { - log.Errorf("Failed to parse IPv6 header: %s", err.Error()) - continue - } - ipProtocol = ipHeader.NextHeader - src = ipHeader.Src - dst = ipHeader.Dst - } else { - log.Debugf("[TUN-GVISOR] Unknown packet version %d", version) - continue + // if context is done + if ctx.Err() != nil { + log.Errorf("[TUN] Failed to read from tun: %v, context is done", err) + return } - // only tcp and udp needs to distinguish transport engine - // gvisor: all network use gvisor - // mix: cluster network use gvisor, diy network use raw - // raw: all network use raw - if (ipProtocol == int(layers.IPProtocolUDP) || ipProtocol == int(layers.IPProtocolUDPLite) || ipProtocol == int(layers.IPProtocolTCP)) && - (engine == config.EngineGvisor || (engine == config.EngineMix && (!config.CIDR.Contains(dst) && !config.CIDR6.Contains(dst)))) { - pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ - ReserveHeaderBytes: 0, - Payload: buffer.MakeWithData(bytes[:read]), - }) - //defer pkt.DecRef() + log.Errorf("[TUN] Failed to read from tcp conn: %v", err) + config.LPool.Put(bytes[:]) + continue + } + if read == 0 { + log.Warnf("[TUN] Read from tcp conn length is %d", read) + config.LPool.Put(bytes[:]) + continue + } + // Try to determine network protocol number, default zero. + var protocol tcpip.NetworkProtocolNumber + var ipProtocol int + var src, dst net.IP + // TUN interface with IFF_NO_PI enabled, thus + // we need to determine protocol from version field + if util.IsIPv4(bytes) { + protocol = header.IPv4ProtocolNumber + ipHeader, err := ipv4.ParseHeader(bytes[:read]) + if err != nil { + log.Errorf("Failed to parse IPv4 header: %v", err) config.LPool.Put(bytes[:]) - endpoint.InjectInbound(protocol, pkt) - log.Tracef("[TUN-%s] IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) - } else { - log.Tracef("[TUN-RAW] IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) - util.SafeWrite(in, NewDataElem(bytes[:], read, src, dst)) + continue } - } - }() - go func() { - for elem := range out { - _, err := tun.Write(elem.Data()[:elem.Length()]) - config.LPool.Put(elem.Data()[:]) + ipProtocol = ipHeader.Protocol + src = ipHeader.Src + dst = ipHeader.Dst + } else if util.IsIPv6(bytes) { + protocol = header.IPv6ProtocolNumber + ipHeader, err := ipv6.ParseHeader(bytes[:read]) if err != nil { - if errors.Is(err, os.ErrClosed) { - return - } - // if context is done - if ctx.Err() != nil { - log.Errorf("[TUN] Failed to write to tun: %v, context is done: %v", err, ctx.Err()) - return - } - log.Errorf("[TUN] Failed to write data to tun device: %v", err) + log.Errorf("Failed to parse IPv6 header: %s", err.Error()) + config.LPool.Put(bytes[:]) continue } + ipProtocol = ipHeader.NextHeader + src = ipHeader.Src + dst = ipHeader.Dst + } else { + log.Debugf("[TUN-GVISOR] Unknown packet") + config.LPool.Put(bytes[:]) + continue + } + + h.addRoute(src, conn) + // inner ip like 223.254.0.100/102/103 connect each other + if config.CIDR.Contains(dst) || config.CIDR6.Contains(dst) { + log.Tracef("[TUN-RAW] Forward to TUN device, SRC: %s, DST: %s, Length: %d", src.String(), dst.String(), read) + util.SafeWrite(h.packetChan, &datagramPacket{ + DataLength: uint16(read), + Data: bytes[:], + }) + continue + } + + pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{ + ReserveHeaderBytes: 0, + Payload: buffer.MakeWithData(bytes[:read]), + }) + //defer pkt.DecRef() + config.LPool.Put(bytes[:]) + endpoint.InjectInbound(protocol, pkt) + log.Tracef("[TUN-%s] Write to Gvisor IP-Protocol: %s, SRC: %s, DST: %s, Length: %d", layers.IPProtocol(ipProtocol).String(), layers.IPProtocol(ipProtocol).String(), src.String(), dst, read) + } +} + +func (h *gvisorTCPHandler) addRoute(src net.IP, tcpConn net.Conn) { + value, loaded := h.routeMapTCP.LoadOrStore(src.String(), tcpConn) + if loaded { + if tcpConn != value.(net.Conn) { + h.routeMapTCP.Store(src.String(), tcpConn) + log.Debugf("[TCP] Replace route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) } - }() - return endpoint + } else { + log.Debugf("[TCP] Add new route map TCP: %s -> %s-%s", src, tcpConn.LocalAddr(), tcpConn.RemoteAddr()) + } } diff --git a/pkg/core/gvisorudpforwarder.go b/pkg/core/gvisorudpforwarder.go index 81d6727c2..597525016 100644 --- a/pkg/core/gvisorudpforwarder.go +++ b/pkg/core/gvisorudpforwarder.go @@ -4,6 +4,7 @@ import ( "context" "errors" "io" + "net" log "github.com/sirupsen/logrus" "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" @@ -14,10 +15,7 @@ import ( "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) -var GvisorUDPForwardAddr string - -func UDPForwarder(s *stack.Stack) func(id stack.TransportEndpointID, pkt *stack.PacketBuffer) bool { - GvisorUDPForwardAddr := GvisorUDPForwardAddr +func UDPForwarder(s *stack.Stack, ctx context.Context) func(id stack.TransportEndpointID, pkt *stack.PacketBuffer) bool { return udp.NewForwarder(s, func(request *udp.ForwarderRequest) { endpointID := request.ID() log.Debugf("[TUN-UDP] LocalPort: %d, LocalAddress: %s, RemotePort: %d, RemoteAddress %s", @@ -30,30 +28,14 @@ func UDPForwarder(s *stack.Stack) func(id stack.TransportEndpointID, pkt *stack. return } - node, err := ParseNode(GvisorUDPForwardAddr) - if err != nil { - log.Debugf("[TUN-UDP] Failed to parse gviosr udp forward addr %s: %v", GvisorUDPForwardAddr, err) - return - } - node.Client = &Client{ - Connector: GvisorUDPOverTCPTunnelConnector(endpointID), - Transporter: TCPTransporter(), - } - forwardChain := NewChain(5, node) - - ctx := context.Background() - c, err := forwardChain.getConn(ctx) - if err != nil { - log.Debugf("[TUN-UDP] Failed to get conn: %v", err) - return - } - if err = WriteProxyInfo(c, endpointID); err != nil { - log.Debugf("[TUN-UDP] Failed to write proxy info: %v", err) - return + // 2, dial proxy + addr := &net.UDPAddr{ + IP: endpointID.LocalAddress.AsSlice(), + Port: int(endpointID.LocalPort), } - remote, err := node.Client.ConnectContext(ctx, c) + remote, err := net.DialUDP("udp", nil, addr) if err != nil { - log.Debugf("[TUN-UDP] Failed to connect: %v", err) + log.Errorf("[TUN-UDP] Failed to connect addr %s: %v", addr.String(), err) return } conn := gonet.NewUDPConn(w, endpoint) diff --git a/pkg/core/gvisorudphandler.go b/pkg/core/gvisorudphandler.go index a24251977..9bc81b93c 100644 --- a/pkg/core/gvisorudphandler.go +++ b/pkg/core/gvisorudphandler.go @@ -7,40 +7,9 @@ import ( "time" log "github.com/sirupsen/logrus" - "gvisor.dev/gvisor/pkg/tcpip/stack" - "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) -type gvisorUDPOverTCPTunnelConnector struct { - Id stack.TransportEndpointID -} - -func GvisorUDPOverTCPTunnelConnector(endpointID stack.TransportEndpointID) Connector { - return &gvisorUDPOverTCPTunnelConnector{ - Id: endpointID, - } -} - -func (c *gvisorUDPOverTCPTunnelConnector) ConnectContext(ctx context.Context, conn net.Conn) (net.Conn, error) { - switch con := conn.(type) { - case *net.TCPConn: - err := con.SetNoDelay(true) - if err != nil { - return nil, err - } - err = con.SetKeepAlive(true) - if err != nil { - return nil, err - } - err = con.SetKeepAlivePeriod(15 * time.Second) - if err != nil { - return nil, err - } - } - return newGvisorFakeUDPTunnelConnOverTCP(ctx, conn) -} - type gvisorUDPHandler struct{} func GvisorUDPHandler() Handler { @@ -116,7 +85,7 @@ func (c *gvisorFakeUDPTunnelConn) Close() error { } func GvisorUDPListener(addr string) (net.Listener, error) { - log.Debugf("Gvisor UDP over TCP listen addr %s", addr) + log.Debugf("Gvisor UDP over TCP listening addr: %s", addr) laddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err diff --git a/pkg/core/tcphandler.go b/pkg/core/tcphandler.go index a59ec56a5..1d36e8c18 100644 --- a/pkg/core/tcphandler.go +++ b/pkg/core/tcphandler.go @@ -86,12 +86,8 @@ func (h *fakeUdpHandler) Handle(ctx context.Context, tcpConn net.Conn) { } var src net.IP - bb := dgram.Data[:dgram.DataLength] - if util.IsIPv4(bb) { - src = net.IPv4(bb[12], bb[13], bb[14], bb[15]) - } else if util.IsIPv6(bb) { - src = bb[8:24] - } else { + src, _, err = util.ParseIP(dgram.Data[:dgram.DataLength]) + if err != nil { log.Errorf("[TCP] Unknown packet") continue } diff --git a/pkg/core/tunhandler.go b/pkg/core/tunhandler.go index 4cd7cdea2..a84434778 100644 --- a/pkg/core/tunhandler.go +++ b/pkg/core/tunhandler.go @@ -105,9 +105,8 @@ func (h *tunHandler) printRoute(ctx context.Context) { type Device struct { tun net.Conn - tunInboundRaw chan *DataElem - tunInbound chan *DataElem - tunOutbound chan *DataElem + tunInbound chan *DataElem + tunOutbound chan *DataElem // your main logic tunInboundHandler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) @@ -120,18 +119,28 @@ func (d *Device) readFromTun() { b := config.LPool.Get().([]byte)[:] n, err := d.tun.Read(b[:]) if err != nil { - select { - case d.chExit <- err: - default: - } + log.Errorf("[TUN] Failed to read from tun: %v", err) + util.SafeWrite(d.chExit, err) return } - if n != 0 { - util.SafeWrite(d.tunInboundRaw, &DataElem{ - data: b[:], - length: n, - }) + if n == 0 { + log.Errorf("[TUN] Read packet length 0") + continue + } + + src, dst, err := util.ParseIP(b[:n]) + if err != nil { + log.Errorf("[TUN] Unknown packet") + continue } + + log.Debugf("[TUN] SRC: %s --> DST: %s, length: %d", src, dst, n) + util.SafeWrite(d.tunInbound, &DataElem{ + data: b[:], + length: n, + src: src, + dst: dst, + }) } } @@ -140,39 +149,9 @@ func (d *Device) writeToTun() { _, err := d.tun.Write(e.data[:e.length]) config.LPool.Put(e.data[:]) if err != nil { - select { - case d.chExit <- err: - default: - } - return - } - } -} - -func (d *Device) parseIPHeader(ctx context.Context) { - for e := range d.tunInboundRaw { - select { - case <-ctx.Done(): + util.SafeWrite(d.chExit, err) return - default: - } - - if util.IsIPv4(e.data[:e.length]) { - // ipv4.ParseHeader - b := e.data[:e.length] - e.src = net.IPv4(b[12], b[13], b[14], b[15]) - e.dst = net.IPv4(b[16], b[17], b[18], b[19]) - } else if util.IsIPv6(e.data[:e.length]) { - // ipv6.ParseHeader - e.src = e.data[:e.length][8:24] - e.dst = e.data[:e.length][24:40] - } else { - log.Errorf("[TUN] Unknown packet") - continue } - - log.Debugf("[TUN] %s --> %s, length: %d", e.src, e.dst, e.length) - util.SafeWrite(d.tunInbound, e) } } @@ -180,7 +159,6 @@ func (d *Device) Close() { d.tun.Close() util.SafeClose(d.tunInbound) util.SafeClose(d.tunOutbound) - util.SafeClose(d.tunInboundRaw) util.SafeClose(TCPPacketChan) } @@ -285,7 +263,6 @@ func genICMPPacketIPv6(src net.IP, dst net.IP) ([]byte, error) { func (d *Device) Start(ctx context.Context) { go d.readFromTun() - go d.parseIPHeader(ctx) go d.tunInboundHandler(d.tunInbound, d.tunOutbound) go d.writeToTun() go heartbeats(ctx, d.tun) @@ -307,11 +284,10 @@ func (h *tunHandler) HandleServer(ctx context.Context, tun net.Conn) { go h.printRoute(ctx) device := &Device{ - tun: tun, - tunInboundRaw: make(chan *DataElem, MaxSize), - tunInbound: make(chan *DataElem, MaxSize), - tunOutbound: make(chan *DataElem, MaxSize), - chExit: h.chExit, + tun: tun, + tunInbound: make(chan *DataElem, MaxSize), + tunOutbound: make(chan *DataElem, MaxSize), + chExit: h.chExit, } device.SetTunInboundHandler(func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem) { for ctx.Err() == nil { @@ -366,8 +342,7 @@ type udpElem struct { type Peer struct { conn net.PacketConn - connInbound chan *udpElem - parsedConnInfo chan *udpElem + connInbound chan *udpElem tunInbound <-chan *DataElem tunOutbound chan<- *DataElem @@ -390,80 +365,55 @@ func (p *Peer) sendErr(err error) { func (p *Peer) readFromConn() { for { b := config.LPool.Get().([]byte)[:] - n, srcAddr, err := p.conn.ReadFrom(b[:]) + n, from, err := p.conn.ReadFrom(b[:]) if err != nil { p.sendErr(err) return } + + src, dst, err := util.ParseIP(b[:n]) + if err != nil { + log.Errorf("[TUN] Unknown packet: %v", err) + continue + } + if _, loaded := p.routeMapUDP.LoadOrStore(src, from); loaded { + log.Debugf("[TUN] Find route: %s -> %s", src, from) + } else { + log.Debugf("[TUN] Add new route: %s -> %s", src, from) + } + p.connInbound <- &udpElem{ - from: srcAddr, + from: from, data: b[:], length: n, + src: src, + dst: dst, } } } func (p *Peer) readFromTCPConn() { for packet := range TCPPacketChan { + src, dst, err := util.ParseIP(packet.Data) + if err != nil { + log.Errorf("[TUN] Unknown packet") + continue + } u := &udpElem{ data: packet.Data[:], length: int(packet.DataLength), - } - b := packet.Data - if util.IsIPv4(packet.Data) { - // ipv4.ParseHeader - u.src = net.IPv4(b[12], b[13], b[14], b[15]) - u.dst = net.IPv4(b[16], b[17], b[18], b[19]) - } else if util.IsIPv6(packet.Data) { - // ipv6.ParseHeader - u.src = b[8:24] - u.dst = b[24:40] - } else { - log.Errorf("[TUN] Unknown packet") - continue + src: src, + dst: dst, } log.Debugf("[TCP] udp-tun %s >>> %s length: %d", u.src, u.dst, u.length) - p.parsedConnInfo <- u - } -} - -func (p *Peer) parseHeader() { - var firstIPv4, firstIPv6 = true, true - for e := range p.connInbound { - b := e.data[:e.length] - if util.IsIPv4(e.data[:e.length]) { - // ipv4.ParseHeader - e.src = net.IPv4(b[12], b[13], b[14], b[15]) - e.dst = net.IPv4(b[16], b[17], b[18], b[19]) - } else if util.IsIPv6(e.data[:e.length]) { - // ipv6.ParseHeader - e.src = b[:e.length][8:24] - e.dst = b[:e.length][24:40] - } else { - log.Errorf("[TUN] Unknown packet") - continue - } - - if firstIPv4 || firstIPv6 { - if util.IsIPv4(e.data[:e.length]) { - firstIPv4 = false - } else { - firstIPv6 = false - } - if _, loaded := p.routeMapUDP.LoadOrStore(e.src, e.from); loaded { - log.Debugf("[TUN] Find route: %s -> %s", e.src, e.from) - } else { - log.Debugf("[TUN] Add new route: %s -> %s", e.src, e.from) - } - } - p.parsedConnInfo <- e + p.connInbound <- u } } func (p *Peer) routePeer() { - for e := range p.parsedConnInfo { + for e := range p.connInbound { if routeToAddr := p.routeMapUDP.RouteTo(e.dst); routeToAddr != nil { - log.Debugf("[TUN] Find route: %s -> %s", e.dst, routeToAddr) + log.Debugf("[TCP] Find route: %s -> %s", e.dst, routeToAddr) _, err := p.conn.WriteTo(e.data[:e.length], routeToAddr) config.LPool.Put(e.data[:]) if err != nil { @@ -471,6 +421,7 @@ func (p *Peer) routePeer() { return } } else if conn, ok := p.routeMapTCP.Load(e.dst.String()); ok { + log.Debugf("[TCP] Find TCP route to dst: %s -> %s", e.dst.String(), conn.(net.Conn).RemoteAddr()) dgram := newDatagramPacket(e.data[:e.length]) if err := dgram.Write(conn.(net.Conn)); err != nil { log.Errorf("[TCP] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) @@ -479,6 +430,7 @@ func (p *Peer) routePeer() { } config.LPool.Put(e.data[:]) } else { + log.Debugf("[TCP] Not found route to dst: %s, write to TUN device", e.dst.String()) p.tunOutbound <- &DataElem{ data: e.data, length: e.length, @@ -501,17 +453,18 @@ func (p *Peer) routeTUN() { return } } else if conn, ok := p.routeMapTCP.Load(e.dst.String()); ok { + log.Debugf("[TUN] Find TCP route to dst: %s -> %s", e.dst.String(), conn.(net.Conn).RemoteAddr()) dgram := newDatagramPacket(e.data[:e.length]) err := dgram.Write(conn.(net.Conn)) config.LPool.Put(e.data[:]) if err != nil { - log.Errorf("[TCP] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) + log.Errorf("[TUN] udp-tun %s <- %s : %s", conn.(net.Conn).RemoteAddr(), dgram.Addr(), err) p.sendErr(err) return } } else { + log.Errorf("[TUN] No route for %s -> %s, drop it", e.src, e.dst) config.LPool.Put(e.data[:]) - log.Errorf("[TUN] No route for %s -> %s", e.src, e.dst) } } } @@ -519,7 +472,6 @@ func (p *Peer) routeTUN() { func (p *Peer) Start() { go p.readFromConn() go p.readFromTCPConn() - go p.parseHeader() go p.routePeer() go p.routeTUN() } @@ -530,14 +482,13 @@ func (p *Peer) Close() { func transportTun(ctx context.Context, tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem, packetConn net.PacketConn, routeMapUDP *RouteMap, routeMapTCP *sync.Map) error { p := &Peer{ - conn: packetConn, - connInbound: make(chan *udpElem, MaxSize), - parsedConnInfo: make(chan *udpElem, MaxSize), - tunInbound: tunInbound, - tunOutbound: tunOutbound, - routeMapUDP: routeMapUDP, - routeMapTCP: routeMapTCP, - errChan: make(chan error, 2), + conn: packetConn, + connInbound: make(chan *udpElem, MaxSize), + tunInbound: tunInbound, + tunOutbound: tunOutbound, + routeMapUDP: routeMapUDP, + routeMapTCP: routeMapTCP, + errChan: make(chan error, 2), } defer p.Close() diff --git a/pkg/core/tunhandlerclient.go b/pkg/core/tunhandlerclient.go index 3fa3d412b..2e98b3007 100644 --- a/pkg/core/tunhandlerclient.go +++ b/pkg/core/tunhandlerclient.go @@ -22,10 +22,6 @@ func (h *tunHandler) HandleClient(ctx context.Context, tun net.Conn) { } in := make(chan *DataElem, MaxSize) out := make(chan *DataElem, MaxSize) - engine := h.node.Get(config.ConfigKubeVPNTransportEngine) - endpoint := NewTunEndpoint(ctx, tun, uint32(config.DefaultMTU), config.Engine(engine), in, out) - stack := NewStack(ctx, endpoint) - defer stack.Destroy() defer util.SafeClose(in) defer util.SafeClose(out) @@ -131,6 +127,8 @@ type ClientDevice struct { func (d *ClientDevice) Start(ctx context.Context) { go d.tunInboundHandler(d.tunInbound, d.tunOutbound) go heartbeats(ctx, d.tun) + go d.readFromTun() + go d.writeToTun() select { case err := <-d.chExit: @@ -144,3 +142,36 @@ func (d *ClientDevice) Start(ctx context.Context) { func (d *ClientDevice) SetTunInboundHandler(handler func(tunInbound <-chan *DataElem, tunOutbound chan<- *DataElem)) { d.tunInboundHandler = handler } + +func (d *ClientDevice) readFromTun() { + for { + b := config.LPool.Get().([]byte)[:] + n, err := d.tun.Read(b[:]) + if err != nil { + util.SafeWrite(d.chExit, err) + return + } + if n != 0 { + // Try to determine network protocol number, default zero. + var src, dst net.IP + src, dst, err = util.ParseIP(b[:n]) + if err != nil { + log.Debugf("[TUN-GVISOR] Unknown packet: %v", err) + continue + } + log.Tracef("[TUN-RAW] SRC: %s, DST: %s, Length: %d", src.String(), dst, n) + util.SafeWrite(d.tunInbound, NewDataElem(b[:], n, src, dst)) + } + } +} + +func (d *ClientDevice) writeToTun() { + for e := range d.tunOutbound { + _, err := d.tun.Write(e.data[:e.length]) + config.LPool.Put(e.data[:]) + if err != nil { + util.SafeWrite(d.chExit, err) + return + } + } +} diff --git a/pkg/dev/options.go b/pkg/dev/options.go index 00826af57..0da90c4ef 100644 --- a/pkg/dev/options.go +++ b/pkg/dev/options.go @@ -368,9 +368,9 @@ func (option *Options) CreateConnectContainer(portBindings nat.PortMap) (*RunCon var entrypoint []string if option.NoProxy { - entrypoint = []string{"kubevpn", "connect", "--foreground", "-n", option.Namespace, "--kubeconfig", "/root/.kube/config", "--image", config.Image, "--engine", string(option.Engine)} + entrypoint = []string{"kubevpn", "connect", "--foreground", "-n", option.Namespace, "--kubeconfig", "/root/.kube/config", "--image", config.Image, "--netstack", string(option.Engine)} } else { - entrypoint = []string{"kubevpn", "proxy", option.Workload, "--foreground", "-n", option.Namespace, "--kubeconfig", "/root/.kube/config", "--image", config.Image, "--engine", string(option.Engine)} + entrypoint = []string{"kubevpn", "proxy", option.Workload, "--foreground", "-n", option.Namespace, "--kubeconfig", "/root/.kube/config", "--image", config.Image, "--netstack", string(option.Engine)} for k, v := range option.Headers { entrypoint = append(entrypoint, "--headers", fmt.Sprintf("%s=%s", k, v)) } diff --git a/pkg/handler/clone.go b/pkg/handler/clone.go index 0cd2fada1..6d26980c6 100644 --- a/pkg/handler/clone.go +++ b/pkg/handler/clone.go @@ -293,7 +293,7 @@ func (d *CloneOptions) DoClone(ctx context.Context, kubeconfigJsonBytes []byte) "--kubeconfig", "/tmp/.kube/" + config.KUBECONFIG, "--namespace", d.Namespace, "--image", config.Image, - "--engine", string(d.Engine), + "--netstack", string(d.Engine), "--foreground", }, args...), Env: []v1.EnvVar{}, diff --git a/pkg/handler/connect.go b/pkg/handler/connect.go index e213a37d9..0c3bd7428 100644 --- a/pkg/handler/connect.go +++ b/pkg/handler/connect.go @@ -225,9 +225,10 @@ func (c *ConnectOptions) DoConnect(ctx context.Context, isLite bool) (err error) driver.InstallWireGuardTunDriver() } forward := fmt.Sprintf("tcp://127.0.0.1:%d", rawTCPForwardPort) - core.GvisorTCPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort) - core.GvisorUDPForwardAddr = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorUDPForwardPort) - if err = c.startLocalTunServe(c.ctx, forward, isLite); err != nil { + if c.Engine == config.EngineGvisor { + forward = fmt.Sprintf("tcp://127.0.0.1:%d", gvisorTCPForwardPort) + } + if err = c.startLocalTunServer(c.ctx, forward, isLite); err != nil { log.Errorf("Start local tun service failed: %v", err) return } @@ -330,7 +331,7 @@ func (c *ConnectOptions) portForward(ctx context.Context, portPair []string) err } } -func (c *ConnectOptions) startLocalTunServe(ctx context.Context, forwardAddress string, lite bool) (err error) { +func (c *ConnectOptions) startLocalTunServer(ctx context.Context, forwardAddress string, lite bool) (err error) { log.Debugf("IPv4: %s, IPv6: %s", c.localTunIPv4.IP.String(), c.localTunIPv6.IP.String()) var cidrList []*net.IPNet diff --git a/pkg/util/net.go b/pkg/util/net.go index f623052d2..5039cd3db 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -2,6 +2,7 @@ package util import ( "context" + "errors" "fmt" "net" "strings" @@ -10,6 +11,8 @@ import ( "github.com/cilium/ipam/service/allocator" "github.com/cilium/ipam/service/ipallocator" "github.com/prometheus-community/pro-bing" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" "github.com/wencaiwulue/kubevpn/v2/pkg/config" ) @@ -87,6 +90,24 @@ func GetLocalTunIP(tunName string) (net.IP, net.IP, error) { return srcIPv4, srcIPv6, nil } +func PingOnce(ctx context.Context, srcIP, dstIP string) (bool, error) { + pinger, err := probing.NewPinger(dstIP) + if err != nil { + return false, err + } + pinger.Source = srcIP + pinger.SetLogger(nil) + pinger.SetPrivileged(true) + pinger.Count = 1 + pinger.Timeout = time.Millisecond * 1000 + err = pinger.RunWithContext(ctx) // Blocks until finished. + if err != nil { + return false, err + } + stat := pinger.Statistics() + return stat.PacketsRecv == stat.PacketsSent, err +} + func Ping(ctx context.Context, srcIP, dstIP string) (bool, error) { pinger, err := probing.NewPinger(dstIP) if err != nil { @@ -113,6 +134,24 @@ func IsIPv6(packet []byte) bool { return 6 == (packet[0] >> 4) } +func ParseIP(packet []byte) (src net.IP, dst net.IP, err error) { + if IsIPv4(packet) { + header, err := ipv4.ParseHeader(packet) + if err != nil { + return nil, nil, err + } + return header.Src, header.Dst, nil + } + if IsIPv6(packet) { + header, err := ipv6.ParseHeader(packet) + if err != nil { + return nil, nil, err + } + return header.Src, header.Dst, nil + } + return nil, nil, errors.New("packet is invalid") +} + func GetIPBaseNic() (*net.IPNet, error) { addrs, _ := net.InterfaceAddrs() var sum int diff --git a/pkg/util/util.go b/pkg/util/util.go index 257d18eab..9e872eecc 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -298,6 +298,10 @@ func StartupPProf(port int) { _ = http.ListenAndServe(fmt.Sprintf("localhost:%d", port), nil) } +func StartupPProfForServer(port int) { + _ = http.ListenAndServe(fmt.Sprintf(":%d", port), nil) +} + func Merge[K comparable, V any](fromMap, ToMap map[K]V) map[K]V { if fromMap == nil { return ToMap