Skip to content

Commit

Permalink
fix: use poll in pollOneOff
Browse files Browse the repository at this point in the history
Use poll on nonblocking TCP Conn in pollOneOff.

Signed-off-by: Gaukas Wang <[email protected]>
  • Loading branch information
gaukas committed Jan 16, 2024
1 parent 9414800 commit f3783b2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
37 changes: 37 additions & 0 deletions imports/wasi_snapshot_preview1/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package wasi_snapshot_preview1

import (
"context"
"errors"
"time"

"github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/fsapi"
socketapi "github.com/tetratelabs/wazero/internal/sock"
internalsys "github.com/tetratelabs/wazero/internal/sys"
"github.com/tetratelabs/wazero/internal/sysfs"
"github.com/tetratelabs/wazero/internal/wasip1"
"github.com/tetratelabs/wazero/internal/wasm"
)
Expand Down Expand Up @@ -49,6 +52,12 @@ type event struct {
errno wasip1.Errno
}

type nonblockingSocketSub struct {
fd int32
tc socketapi.TCPConn
evt *event
}

func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno {
in := uint32(params[0])
out := uint32(params[1])
Expand Down Expand Up @@ -88,6 +97,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
fsc := mod.(*wasm.ModuleInstance).Sys.FS()
// Slice of events that are processed out of the loop (blocking stdin subscribers).
var blockingStdinSubs []*event
// Slice of events that are processed out of the loop (nonblocking socket subscribers).
var nonblockingSocketSubs []*nonblockingSocketSub
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the subscriptions that have been already written back to outBuf.
Expand Down Expand Up @@ -135,6 +146,13 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
writeEvent(outBuf[outOffset:], evt)
nevents++
} else if fd != internalsys.FdStdin && file.File.IsNonblock() {
if tcpConn, ok := file.File.(socketapi.TCPConn); ok {
nonblockingSocketSubs = append(nonblockingSocketSubs, &nonblockingSocketSub{
fd: fd,
tc: tcpConn,
evt: evt,
})
}
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
Expand All @@ -161,6 +179,25 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
}
}

if len(nonblockingSocketSubs) > 0 {
var conns []socketapi.TCPConn
var events []int16

for _, sub := range nonblockingSocketSubs {
conns = append(conns, sub.tc)
events = append(events, sysfs.POLLIN) // TODO: support other events
}

ready, err := sysfs.PollTCPConns(conns, events)
if !errors.Is(err, sys.Errno(0)) {
return err.(sys.Errno)
}

if ready <= 0 {
return sys.ENOTSUP
}
}

sysCtx := mod.(*wasm.ModuleInstance).Sys
if nevents == nsubscriptions {
// We already wrote back all the results. We already wrote this number
Expand Down
4 changes: 4 additions & 0 deletions internal/sysfs/poll_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) {
var ts syscall.Timespec
if timeoutMillis >= 0 {
ts = syscall.NsecToTimespec(int64(time.Duration(timeoutMillis) * time.Millisecond))
} else {
// just max out the timeout, simply giving a negative timeout will not work
// as it fails with EINVAL
ts = syscall.NsecToTimespec(1<<63 - 1)
}
return ppoll(fds, &ts)
}
Expand Down
40 changes: 40 additions & 0 deletions internal/sysfs/w_poll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 The WATER Authors. All rights reserved.
// Use of this source code is governed by Apache 2 license
// that can be found in the LICENSE file.

package sysfs

import (
"errors"

"github.com/tetratelabs/wazero/experimental/sys"
socketapi "github.com/tetratelabs/wazero/internal/sock"
)

const (
POLLIN = _POLLIN // export the value
)

func PollTCPConns(conns []socketapi.TCPConn, events []int16) (int, error) {
var pollFds []pollFd
for i, conn := range conns {
syscallConnControl(conn.(*tcpConnFile).tc, func(fd uintptr) (int, sys.Errno) {
if events[i]&_POLLIN == 0 {
return 0, sys.EINVAL
}
pollFds = append(pollFds, newPollFd(fd, _POLLIN, 0))
return 0, 0
})
}

for {
ready, err := _poll(pollFds, -1)
if ready == 0 {
if errors.Is(err, sys.EINTR) || errors.Is(err, sys.Errno(0)) {
continue
}
}

return ready, err
}
}

0 comments on commit f3783b2

Please sign in to comment.