From f3783b2da9328035b1a49f6dc5565555ebca9121 Mon Sep 17 00:00:00 2001 From: Gaukas Wang Date: Mon, 15 Jan 2024 21:49:40 -0700 Subject: [PATCH] fix: use poll in pollOneOff Use poll on nonblocking TCP Conn in pollOneOff. Signed-off-by: Gaukas Wang --- imports/wasi_snapshot_preview1/poll.go | 37 ++++++++++++++++++++++++ internal/sysfs/poll_linux.go | 4 +++ internal/sysfs/w_poll.go | 40 ++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 internal/sysfs/w_poll.go diff --git a/imports/wasi_snapshot_preview1/poll.go b/imports/wasi_snapshot_preview1/poll.go index 84cd8660bc..e59bf8b9e9 100644 --- a/imports/wasi_snapshot_preview1/poll.go +++ b/imports/wasi_snapshot_preview1/poll.go @@ -2,12 +2,15 @@ package wasi_snapshot_preview1 import ( "context" + "errors" "time" "github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/experimental/sys" "github.com/tetratelabs/wazero/internal/fsapi" + socketapi "github.com/tetratelabs/wazero/internal/sock" internalsys "github.com/tetratelabs/wazero/internal/sys" + "github.com/tetratelabs/wazero/internal/sysfs" "github.com/tetratelabs/wazero/internal/wasip1" "github.com/tetratelabs/wazero/internal/wasm" ) @@ -49,6 +52,12 @@ type event struct { errno wasip1.Errno } +type nonblockingSocketSub struct { + fd int32 + tc socketapi.TCPConn + evt *event +} + func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno { in := uint32(params[0]) out := uint32(params[1]) @@ -88,6 +97,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno fsc := mod.(*wasm.ModuleInstance).Sys.FS() // Slice of events that are processed out of the loop (blocking stdin subscribers). var blockingStdinSubs []*event + // Slice of events that are processed out of the loop (nonblocking socket subscribers). + var nonblockingSocketSubs []*nonblockingSocketSub // The timeout is initialized at max Duration, the loop will find the minimum. var timeout time.Duration = 1<<63 - 1 // Count of all the subscriptions that have been already written back to outBuf. @@ -135,6 +146,13 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno writeEvent(outBuf[outOffset:], evt) nevents++ } else if fd != internalsys.FdStdin && file.File.IsNonblock() { + if tcpConn, ok := file.File.(socketapi.TCPConn); ok { + nonblockingSocketSubs = append(nonblockingSocketSubs, &nonblockingSocketSub{ + fd: fd, + tc: tcpConn, + evt: evt, + }) + } writeEvent(outBuf[outOffset:], evt) nevents++ } else { @@ -161,6 +179,25 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno } } + if len(nonblockingSocketSubs) > 0 { + var conns []socketapi.TCPConn + var events []int16 + + for _, sub := range nonblockingSocketSubs { + conns = append(conns, sub.tc) + events = append(events, sysfs.POLLIN) // TODO: support other events + } + + ready, err := sysfs.PollTCPConns(conns, events) + if !errors.Is(err, sys.Errno(0)) { + return err.(sys.Errno) + } + + if ready <= 0 { + return sys.ENOTSUP + } + } + sysCtx := mod.(*wasm.ModuleInstance).Sys if nevents == nsubscriptions { // We already wrote back all the results. We already wrote this number diff --git a/internal/sysfs/poll_linux.go b/internal/sysfs/poll_linux.go index dab7bb2cab..ac8e751394 100644 --- a/internal/sysfs/poll_linux.go +++ b/internal/sysfs/poll_linux.go @@ -31,6 +31,10 @@ func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) { var ts syscall.Timespec if timeoutMillis >= 0 { ts = syscall.NsecToTimespec(int64(time.Duration(timeoutMillis) * time.Millisecond)) + } else { + // just max out the timeout, simply giving a negative timeout will not work + // as it fails with EINVAL + ts = syscall.NsecToTimespec(1<<63 - 1) } return ppoll(fds, &ts) } diff --git a/internal/sysfs/w_poll.go b/internal/sysfs/w_poll.go new file mode 100644 index 0000000000..b9ab89b027 --- /dev/null +++ b/internal/sysfs/w_poll.go @@ -0,0 +1,40 @@ +// Copyright 2024 The WATER Authors. All rights reserved. +// Use of this source code is governed by Apache 2 license +// that can be found in the LICENSE file. + +package sysfs + +import ( + "errors" + + "github.com/tetratelabs/wazero/experimental/sys" + socketapi "github.com/tetratelabs/wazero/internal/sock" +) + +const ( + POLLIN = _POLLIN // export the value +) + +func PollTCPConns(conns []socketapi.TCPConn, events []int16) (int, error) { + var pollFds []pollFd + for i, conn := range conns { + syscallConnControl(conn.(*tcpConnFile).tc, func(fd uintptr) (int, sys.Errno) { + if events[i]&_POLLIN == 0 { + return 0, sys.EINVAL + } + pollFds = append(pollFds, newPollFd(fd, _POLLIN, 0)) + return 0, 0 + }) + } + + for { + ready, err := _poll(pollFds, -1) + if ready == 0 { + if errors.Is(err, sys.EINTR) || errors.Is(err, sys.Errno(0)) { + continue + } + } + + return ready, err + } +}