diff --git a/experimental/sys/file.go b/experimental/sys/file.go index 22671d0a01..657f5fa6bf 100644 --- a/experimental/sys/file.go +++ b/experimental/sys/file.go @@ -314,4 +314,16 @@ type File interface { // - This is like syscall.Close and `close` in POSIX. See // https://pubs.opengroup.org/onlinepubs/9699919799/functions/close.html Close() Errno + + // [WATER SECTION BEGIN] + + // Fd returns the underlying file descriptor owned by the host. If a file + // descriptor does not exist, it returns 0. + // + // This is a useful workaround since we want to collect the file descriptor + // from every file (including sockets and pipes) and pass them into the poll + // syscall in poll_oneoff. + Fd() uintptr + + // [WATER SECTION END] } diff --git a/experimental/sys/unimplemented.go b/experimental/sys/unimplemented.go index d853d9e8f4..6eac485ccf 100644 --- a/experimental/sys/unimplemented.go +++ b/experimental/sys/unimplemented.go @@ -158,3 +158,12 @@ func (UnimplementedFile) Utimens(int64, int64) Errno { // Close implements File.Close func (UnimplementedFile) Close() (errno Errno) { return } + +// [WATER SECTION BEGIN] + +// Fd implements File.Fd +func (UnimplementedFile) Fd() uintptr { + return 0 // not ENOSYS +} + +// [WATER SECTION END] diff --git a/imports/wasi_snapshot_preview1/poll_test.go b/imports/wasi_snapshot_preview1/poll_test.go index a5357e7966..fe60352e82 100644 --- a/imports/wasi_snapshot_preview1/poll_test.go +++ b/imports/wasi_snapshot_preview1/poll_test.go @@ -376,18 +376,18 @@ func Test_pollOneoff_Stdin(t *testing.T) { out: 128, // past in resultNevents: 512, // past out expectedMem: []byte{ - // Clock is acknowledged first. - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata - byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit - wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum + // First an illegal file with custom user data should be acknowledged. + 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata + byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit + wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, - // Then an illegal file with custom user data. - 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata - byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit - wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum + // Clock is acknowledged then. + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata + byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit + wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, diff --git a/imports/wasi_snapshot_preview1/w_poll_oneoff.go b/imports/wasi_snapshot_preview1/w_poll_oneoff.go new file mode 100644 index 0000000000..195be25149 --- /dev/null +++ b/imports/wasi_snapshot_preview1/w_poll_oneoff.go @@ -0,0 +1,250 @@ +// Copyright 2024 The WATER Authors. All rights reserved. +// Use of this source code is governed by Apache 2 license +// that can be found in the LICENSE file. + +package wasi_snapshot_preview1 + +import ( + "context" + "time" + + "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/internal/fsapi" + internalsys "github.com/tetratelabs/wazero/internal/sys" + internalsysfs "github.com/tetratelabs/wazero/internal/sysfs" + "github.com/tetratelabs/wazero/internal/wasip1" + "github.com/tetratelabs/wazero/internal/wasm" +) + +// use the init function to override the default pollOneoffFn +func init() { + // override the default pollOneoff + pollOneoff = newHostFunc( + wasip1.PollOneoffName, alternativePollOneoffFn, + []api.ValueType{i32, i32, i32, i32}, + "in", "out", "nsubscriptions", "result.nevents", + ) +} + +// alternativePollOneoffFn is a modified version of pollOneoffFn that +// tries to be more syscall-aligned. It should block and return only when +// there is at least one event triggered. +func alternativePollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno { + in := uint32(params[0]) + out := uint32(params[1]) + nsubscriptions := uint32(params[2]) + resultNevents := uint32(params[3]) + + if nsubscriptions == 0 { + return sys.EINVAL // early returning on empty subscriptions list + } + + mem := mod.Memory() + + // Ensure capacity prior to the read loop to reduce error handling. + inBuf, ok := mem.Read(in, nsubscriptions*48) + if !ok { + return sys.EFAULT + } + outBuf, ok := mem.Read(out, nsubscriptions*32) + // zero-out all buffer before writing + for i := range outBuf { + outBuf[i] = 0 + } + + if !ok { + return sys.EFAULT + } + + // start by writing 0 to resultNevents + if !mod.Memory().WriteUint32Le(resultNevents, 0) { + return sys.EFAULT + } + + // Extract FS context, used in the body of the for loop for FS access. + fsc := mod.(*wasm.ModuleInstance).Sys.FS() + // Slice of events that are processed out of the loop (blocking stdin subscribers). + var blockingStdinSubs []*event + // The timeout is initialized at max Duration, the loop will find the minimum. + var timeout time.Duration = 1<<63 - 1 + // Count of all the subscriptions that have been already written back to outBuf. + // nevents*32 returns at all times the offset where the next event should be written: + // this way we ensure that there are no gaps between records. + var nevents uint32 + + // Slice of all I/O events that will be written if triggered + var ioEvents []*event + + // Slice of hostPollSub that will be used for polling + var hostPollSubs []internalsysfs.PollFd + + // The clock event with the minimum timeout, if any. + var clkevent *event + + // Layout is subscription_u: Union + // https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u + for i := uint32(0); i < nsubscriptions; i++ { + inOffset := i * 48 + outOffset := nevents * 32 + + eventType := inBuf[inOffset+8] // +8 past userdata + // +8 past userdata +8 contents_offset + argBuf := inBuf[inOffset+8+8:] + userData := inBuf[inOffset : inOffset+8] + + evt := &event{ + eventType: eventType, + userData: userData, + errno: wasip1.ErrnoSuccess, + } + + switch eventType { + case wasip1.EventTypeClock: // handle later + newTimeout, err := processClockEvent(argBuf) + if err != 0 { + return err + } + // Min timeout. + if newTimeout < timeout { + timeout = newTimeout + // overwrite the clock event + clkevent = evt + } + case wasip1.EventTypeFdRead: + guestFd := int32(le.Uint32(argBuf)) + if guestFd < 0 { + return sys.EBADF + } + + if file, ok := fsc.LookupFile(guestFd); !ok { + evt.errno = wasip1.ErrnoBadf + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else if guestFd == internalsys.FdStdin { // stdin is always checked with Poll function later. + if file.File.IsNonblock() { // non-blocking stdin is always ready to read + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else { + // if the fd is Stdin, and it is in blocking mode, + // do not ack yet, append to a slice for delayed evaluation. + blockingStdinSubs = append(blockingStdinSubs, evt) + } + } else if hostFd := file.File.Fd(); hostFd == 0 { + evt.errno = wasip1.ErrnoNotsup + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else { + ioEvents = append(ioEvents, evt) + hostPollSubs = append(hostPollSubs, internalsysfs.PollFd{ + Fd: hostFd, + Events: fsapi.POLLIN, + }) + } + case wasip1.EventTypeFdWrite: + guestFd := int32(le.Uint32(argBuf)) + if guestFd < 0 { + return sys.EBADF + } + + if file, ok := fsc.LookupFile(guestFd); !ok { + evt.errno = wasip1.ErrnoBadf + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else if guestFd == internalsys.FdStdout || guestFd == internalsys.FdStderr { // stdout and stderr are always ready to write + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else if hostFd := file.File.Fd(); hostFd == 0 { + evt.errno = wasip1.ErrnoNotsup + writeEvent(outBuf[outOffset:], evt) + nevents++ + } else { + ioEvents = append(ioEvents, evt) + hostPollSubs = append(hostPollSubs, internalsysfs.PollFd{ + Fd: hostFd, + Events: fsapi.POLLOUT, + }) + } + default: + return sys.EINVAL + } + } + + // We have scanned all the subscriptions, and there are several cases: + // - Clock subscriptions-only: we block until the timeout expires. + // - At least one I/O subscription: we call poll on the I/O fds. Then we check the poll results + // and write back the corresponding events ONLY if the revent in pollFd is properly set. + // - If no clock subscription, we block with max timeout. + // - If there are clock subscriptions, we block with the minimum timeout. + + // If there are no I/O subscriptions, we can block until the timeout expires. + sysCtx := mod.(*wasm.ModuleInstance).Sys + if len(ioEvents) == 0 { + if timeout > 0 && clkevent != nil { // there is a clock subscription with a timeout + sysCtx.Nanosleep(int64(timeout)) + } + // Ack the clock event if there is one + if clkevent != nil { + writeEvent(outBuf[nevents*32:], clkevent) + nevents++ + } + } + + // If there are I/O subscriptions, we call poll on the I/O fds with the updated timeout. + if len(hostPollSubs) > 0 { + pollNevents, err := internalsysfs.Poll(hostPollSubs, int32(timeout.Milliseconds())) + if err != 0 { + return err + } + + if pollNevents > 0 { // if there are events triggered + // iterate over hostPollSubs and if the revent is set, write back + // the event + for i, pollFd := range hostPollSubs { + if pollFd.Revents&pollFd.Events != 0 { + // write back the event + writeEvent(outBuf[nevents*32:], ioEvents[i]) + nevents++ + } else if pollFd.Revents != 0 { + // write back the event + writeEvent(outBuf[nevents*32:], ioEvents[i]) + nevents++ + } + } + } else { // otherwise it means that the timeout expired + // Ack the clock event if there is one (it can also be a default max timeout) + if clkevent != nil { + writeEvent(outBuf[nevents*32:], clkevent) + nevents++ + } + } + } + + // If there are blocking stdin subscribers, check for data with given timeout. + if len(blockingStdinSubs) > 0 { + stdin, ok := fsc.LookupFile(internalsys.FdStdin) + if !ok { + return sys.EBADF + } + + // Wait for the timeout to expire, or for some data to become available on Stdin. + if stdinReady, errno := stdin.File.Poll(fsapi.POLLIN, int32(timeout.Milliseconds())); errno != 0 { + return errno + } else if stdinReady { + // stdin has data ready to for reading, write back all the events + for i := range blockingStdinSubs { + evt := blockingStdinSubs[i] + evt.errno = 0 + writeEvent(outBuf[nevents*32:], evt) + nevents++ + } + } + } + + // write nevents to resultNevents + if !mem.WriteUint32Le(resultNevents, nevents) { + return sys.EFAULT + } + + return 0 +} diff --git a/internal/fsapi/poll.go b/internal/fsapi/poll.go index 25f7c5711b..93281f9488 100644 --- a/internal/fsapi/poll.go +++ b/internal/fsapi/poll.go @@ -17,4 +17,12 @@ const ( // POLLOUT is a write event. POLLOUT + + // [WATER SECTION BEGIN] + + // POLLUNKNOWN is an unknown event. It is used to indicate that the + // event is not currently supported but there was an event set. + POLLUNKNOWN + + // [WATER SECTION END] ) diff --git a/internal/sys/lazy.go b/internal/sys/lazy.go index fe233d29ea..31583d86c1 100644 --- a/internal/sys/lazy.go +++ b/internal/sys/lazy.go @@ -149,3 +149,14 @@ func (d *lazyDir) SetNonblock(bool) experimentalsys.Errno { func (d *lazyDir) Poll(fsapi.Pflag, int32) (ready bool, errno experimentalsys.Errno) { return false, experimentalsys.ENOSYS } + +// [WATER SECTION BEGIN] + +// Fd implements the same method as documented on fsapi.File +// +// We do not know the file descriptor of a lazyDir, so we return 0. +func (d *lazyDir) Fd() uintptr { + return 0 +} + +// [WATER SECTION END] diff --git a/internal/sysfs/osfile.go b/internal/sysfs/osfile.go index 490f0fa681..bcbf614908 100644 --- a/internal/sysfs/osfile.go +++ b/internal/sysfs/osfile.go @@ -213,6 +213,15 @@ func (f *osFile) Poll(flag fsapi.Pflag, timeoutMillis int32) (ready bool, errno return poll(f.fd, flag, timeoutMillis) } +// [WATER SECTION BEGIN] + +// Fd implements the same method as documented on sys.File +func (f *osFile) Fd() uintptr { + return f.fd +} + +// [WATER SECTION END] + // Readdir implements File.Readdir. Notably, this uses "Readdir", not // "ReadDir", from os.File. func (f *osFile) Readdir(n int) (dirents []experimentalsys.Dirent, errno experimentalsys.Errno) { diff --git a/internal/sysfs/poll_darwin.go b/internal/sysfs/poll_darwin.go index 1f7f890937..749fa86370 100644 --- a/internal/sysfs/poll_darwin.go +++ b/internal/sysfs/poll_darwin.go @@ -23,6 +23,7 @@ func newPollFd(fd uintptr, events, revents int16) pollFd { // _POLLIN subscribes a notification when any readable data is available. const _POLLIN = 0x0001 +const _POLLOUT = 0x0004 // [WATER] added _POLLOUT to support subscription to FdWrite events // _poll implements poll on Darwin via the corresponding libc function. func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) { diff --git a/internal/sysfs/poll_linux.go b/internal/sysfs/poll_linux.go index 49bf4fd06d..61409ceb49 100644 --- a/internal/sysfs/poll_linux.go +++ b/internal/sysfs/poll_linux.go @@ -27,12 +27,17 @@ func newPollFd(fd uintptr, events, revents int16) pollFd { // _POLLIN subscribes a notification when any readable data is available. const _POLLIN = 0x0001 +const _POLLOUT = 0x0004 // [WATER] added _POLLOUT to support subscription to FdWrite events // _poll implements poll on Linux via ppoll. func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) { var ts syscall.Timespec if timeoutMillis >= 0 { ts = syscall.NsecToTimespec(int64(time.Duration(timeoutMillis) * time.Millisecond)) + } else { // [WATER] Patched this branch to support negative timeouts for max duration + // just max out the timeout, simply giving a negative timeout will not work + // as it fails with EINVAL + ts = syscall.NsecToTimespec(1<<63 - 1) } return ppoll(fds, &ts) } diff --git a/internal/sysfs/poll_windows.go b/internal/sysfs/poll_windows.go index 82c8b2bafd..c8b2ad264c 100644 --- a/internal/sysfs/poll_windows.go +++ b/internal/sysfs/poll_windows.go @@ -18,8 +18,12 @@ const ( _POLLRDNORM = 0x0100 // _POLLRDBAND subscribes to priority band (out-of-band) data for read. _POLLRDBAND = 0x0200 + // _POLLWRNORM subscribes to normal data for write. + _POLLWRNORM = 0x0010 // [WATER] added _POLLWRNORM to support subscription to FdWrite events // _POLLIN subscribes a notification when any readable data is available. _POLLIN = (_POLLRDNORM | _POLLRDBAND) + // _POLLOUT subscribes a notification when any writeable data can be written. + _POLLOUT = _POLLWRNORM // [WATER] added _POLLOUT to support subscription to FdWrite events ) // pollFd is the struct to query for file descriptor events using poll. @@ -66,6 +70,44 @@ func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) { return -1, errno } + // [WATER SECTION BEGIN] + + // automatically merge the partitions back to the original slice to update revents + defer mergePartitions(fds, regular, pipes, sockets) + + // phony poll regular files: always writable and readable + for _, fd := range regular { + if fd.events == _POLLIN || fd.events == _POLLOUT { + fd.revents = fd.events + } + } + + // First do a one-shot check for any ready-to-go pipes or sockets. + npipes, nsockets, errno := peekAll(pipes, sockets) + if errno != 0 { + return -1, errno + } + count := nregular + npipes + nsockets + if count > 0 { + return count, 0 + } + + // Now we learned: + // - no regular files in the list (otherwise already returned) + // - none of the pipes or sockets are ready + + // We can invoke wsaPoll with the given timeout instead of busy-looping if + // only sockets are present. + if len(pipes) == 0 { + return wsaPoll(sockets, int(timeoutMillis)) + } + + // Otherwise, we need to check both pipes and sockets, and cannot use wsaPoll. + // We use a ticker to trigger a check periodically, and a timer to expire after + // the given timeout. + + // [WATER SECTION END] + // Ticker that emits at every pollInterval. tick := time.NewTicker(pollInterval) tickCh := tick.C @@ -81,15 +123,6 @@ func _poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) { afterCh = after.C } - npipes, nsockets, errno := peekAll(pipes, sockets) - if errno != 0 { - return -1, errno - } - count := nregular + npipes + nsockets - if count > 0 { - return count, 0 - } - for { select { case <-afterCh: @@ -189,6 +222,30 @@ func partionByFtype(fds []pollFd) (regular, pipe, socket []pollFd, errno sys.Err return } +// mergePartitions merges the given partitions back to the original slice and +// updates the revents field of each pollFd. +func mergePartitions(dst []pollFd, partitions ...[]pollFd) { + for _, p := range partitions { + LOOP_EACH_FD_IN_PARTITION: + for _, pfd := range p { + for i, fd := range dst { + if fd.fd == pfd.fd && fd.events == pfd.events { + dst[i].revents = pfd.revents + + // special case: POLLIN combines POLLRDNORM and POLLRDBAND and + // when one of them is set, we need to set the other to maintain + // consistency with the Linux implementation. + if pfd.revents&_POLLIN != 0 { + dst[i].revents |= _POLLIN + } + + continue LOOP_EACH_FD_IN_PARTITION // go to next fd in partition, we assume dst is unique + } + } + } + } +} + // ftypeOf checks the type of fd and return the corresponding ftype. func ftypeOf(fd uintptr) (ftype, sys.Errno) { h := syscall.Handle(fd) diff --git a/internal/sysfs/w_poll.go b/internal/sysfs/w_poll.go new file mode 100644 index 0000000000..c433e91967 --- /dev/null +++ b/internal/sysfs/w_poll.go @@ -0,0 +1,48 @@ +// Copyright 2024 The WATER Authors. All rights reserved. +// Use of this source code is governed by Apache 2 license +// that can be found in the LICENSE file. + +package sysfs + +import ( + "errors" + + "github.com/tetratelabs/wazero/experimental/sys" + "github.com/tetratelabs/wazero/internal/fsapi" +) + +type PollFd struct { + Fd uintptr + Events fsapi.Pflag + Revents fsapi.Pflag // set only if events triggered +} + +func Poll(fds []PollFd, timeoutMillis int32) (int, sys.Errno) { + var pollFds []pollFd + for _, fd := range fds { + if fsapi.Pflag(fd.Events) == fsapi.POLLIN { + pollFds = append(pollFds, newPollFd(fd.Fd, _POLLIN, 0)) + } else if fsapi.Pflag(fd.Events) == fsapi.POLLOUT { + pollFds = append(pollFds, newPollFd(fd.Fd, _POLLOUT, 0)) + } else { + return 0, sys.ENOTSUP + } + } + + n, err := _poll(pollFds, timeoutMillis) + if !errors.Is(err, sys.Errno(0)) { + return n, err + } + + // check the pollFds for errors + if n > 0 { + for i, pfd := range pollFds { + if pfd.revents&pfd.events != 0 { + fds[i].Revents = fds[i].Events + } else if pfd.revents != 0 { + fds[i].Revents = fsapi.POLLUNKNOWN // TODO: Need more in-depth checking of the returned event. + } + } + } + return n, 0 +} diff --git a/internal/sysfs/w_sock_fd.go b/internal/sysfs/w_sock_fd.go new file mode 100644 index 0000000000..c442d4742e --- /dev/null +++ b/internal/sysfs/w_sock_fd.go @@ -0,0 +1,33 @@ +// Copyright 2024 The WATER Authors. All rights reserved. +// Use of this source code is governed by Apache 2 license +// that can be found in the LICENSE file. + +package sysfs + +import ( + experimentalsys "github.com/tetratelabs/wazero/experimental/sys" +) + +// Fd implements the same method as documented on fsapi.File +func (f *tcpListenerFile) Fd() uintptr { + var fd uintptr + + syscallConnControl(f.tl, func(_fd uintptr) (int, experimentalsys.Errno) { + fd = _fd + return 0, 0 + }) + + return fd +} + +// Fd implements the same method as documented on fsapi.File +func (f *tcpConnFile) Fd() uintptr { + var fd uintptr + + syscallConnControl(f.tc, func(_fd uintptr) (int, experimentalsys.Errno) { + fd = _fd + return 0, 0 + }) + + return fd +}