diff --git a/examples/keyvalue-inmemory/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go b/examples/keyvalue-inmemory/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go index 458a117..5d46021 100644 --- a/examples/keyvalue-inmemory/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go +++ b/examples/keyvalue-inmemory/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go @@ -1,4 +1,4 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.8.0. DO NOT EDIT! package store import ( @@ -7,13 +7,13 @@ import ( binary "encoding/binary" errors "errors" fmt "fmt" - wrpc "github.com/bytecodealliance/wrpc/go" io "io" slog "log/slog" math "math" sync "sync" atomic "sync/atomic" utf8 "unicode/utf8" + wrpc "wrpc.io/go" ) // The set of errors which may be raised by functions in this package @@ -406,7 +406,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return nil } - stop0, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "get", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop0, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "get", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -444,8 +450,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + } + return } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r interface { @@ -484,17 +495,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 1: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 1, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.get` handler") r0, err := h.Get(ctx, p0, p1) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc:keyvalue/store@0.2.0-draft.get` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := func(v *wrpc.Result[[]uint8, Error], w interface { io.ByteWriter io.Writer @@ -615,7 +636,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -623,39 +645,37 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.get` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "get", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.get`: %w", err) } stops = append(stops, stop0) - stop1, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "set", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop1, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "set", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -693,8 +713,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + } + return } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r interface { @@ -733,8 +758,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 1: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 1, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + } + return } slog.DebugContext(ctx, "reading parameter", "i", 2) p2, err := func(r interface { @@ -770,17 +800,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return nil, errors.New("byte length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 2: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 2, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.set` handler") r0, err := h.Set(ctx, p0, p1, p2) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc:keyvalue/store@0.2.0-draft.set` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := func(v *wrpc.Result[struct{}, Error], w interface { io.ByteWriter io.Writer @@ -814,7 +854,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -822,39 +863,37 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.set` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "set", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.set`: %w", err) } stops = append(stops, stop1) - stop2, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "delete", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop2, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "delete", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -892,8 +931,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + } + return } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r interface { @@ -932,17 +976,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 1: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 1, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.delete` handler") r0, err := h.Delete(ctx, p0, p1) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc:keyvalue/store@0.2.0-draft.delete` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := func(v *wrpc.Result[struct{}, Error], w interface { io.ByteWriter io.Writer @@ -976,7 +1030,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -984,39 +1039,37 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.delete` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "delete", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.delete`: %w", err) } stops = append(stops, stop2) - stop3, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "exists", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop3, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "exists", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -1054,8 +1107,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + } + return } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r interface { @@ -1094,17 +1152,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 1: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 1, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.exists` handler") r0, err := h.Exists(ctx, p0, p1) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc:keyvalue/store@0.2.0-draft.exists` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := func(v *wrpc.Result[bool, Error], w interface { io.ByteWriter io.Writer @@ -1153,7 +1221,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -1161,39 +1230,37 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.exists` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "exists", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.exists`: %w", err) } stops = append(stops, stop3) - stop4, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "list-keys", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReadCloser) error { + + stop4, err := s.Serve("wrpc:keyvalue/store@0.2.0-draft", "list-keys", func(ctx context.Context, w wrpc.IndexWriteCloser, r wrpc.IndexReadCloser) { + defer func() { + if err := w.Close(); err != nil { + slog.DebugContext(ctx, "failed to close writer", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + } + }() slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r interface { io.ByteReader @@ -1231,8 +1298,13 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } return "", errors.New("string length overflows a 32-bit integer") }(r) + if err != nil { - return fmt.Errorf("failed to read parameter 0: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 0, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + } + return } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r wrpc.IndexReader, path ...uint32) (*uint64, error) { @@ -1277,17 +1349,27 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("invalid option status byte %d", status) } }(r, []uint32{1}...) + if err != nil { - return fmt.Errorf("failed to read parameter 1: %w", err) + slog.WarnContext(ctx, "failed to read parameter", "i", 1, "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + if err := r.Close(); err != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + } + return } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.list-keys` handler") r0, err := h.ListKeys(ctx, p0, p1) + if cErr := r.Close(); cErr != nil { + slog.ErrorContext(ctx, "failed to close reader", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + } if err != nil { - return fmt.Errorf("failed to handle `wrpc:keyvalue/store@0.2.0-draft.list-keys` invocation: %w", err) + slog.WarnContext(ctx, "failed to handle invocation", "instance", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + return } var buf bytes.Buffer writes := make(map[uint32]func(wrpc.IndexWriter) error, 1) + write0, err := func(v *wrpc.Result[KeyResponse, Error], w interface { io.ByteWriter io.Writer @@ -1329,7 +1411,8 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { } }(r0, &buf) if err != nil { - return fmt.Errorf("failed to write result value 0: %w", err) + slog.WarnContext(ctx, "failed to write result value", "i", 0, "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + return } if write0 != nil { writes[0] = write0 @@ -1337,33 +1420,25 @@ func ServeInterface(s wrpc.Server, h Handler) (stop func() error, err error) { slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.list-keys` result") _, err = w.Write(buf.Bytes()) if err != nil { - return fmt.Errorf("failed to write result: %w", err) + slog.WarnContext(ctx, "failed to write result", "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + return } if len(writes) > 0 { - var wg sync.WaitGroup - var wgErr atomic.Value for index, write := range writes { - wg.Add(1) w, err := w.Index(index) if err != nil { - return fmt.Errorf("failed to index writer: %w", err) + slog.ErrorContext(ctx, "failed to index writer", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) + return } + index := index write := write go func() { - defer wg.Done() if err := write(w); err != nil { - wgErr.Store(err) + slog.WarnContext(ctx, "failed to write nested result value", "index", index, "wrpc:keyvalue/store@0.2.0-draft", "name", "list-keys", "err", err) } }() } - wg.Wait() - err := wgErr.Load() - if err == nil { - return nil - } - return err.(error) } - return nil }) if err != nil { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.list-keys`: %w", err) diff --git a/examples/keyvalue-inmemory/bindings/server.wrpc.go b/examples/keyvalue-inmemory/bindings/server.wrpc.go index 22643e6..605d264 100644 --- a/examples/keyvalue-inmemory/bindings/server.wrpc.go +++ b/examples/keyvalue-inmemory/bindings/server.wrpc.go @@ -1,10 +1,10 @@ -// Generated by `wit-bindgen-wrpc-go` 0.5.0. DO NOT EDIT! +// Generated by `wit-bindgen-wrpc-go` 0.8.0. DO NOT EDIT! // server package contains wRPC bindings for `server` world package server import ( - wrpc "github.com/bytecodealliance/wrpc/go" exports__wrpc__keyvalue__store "github.com/wasmCloud/provider-sdk-go/examples/keyvalue-inmemory/bindings/exports/wrpc/keyvalue/store" + wrpc "wrpc.io/go" ) func Serve(s wrpc.Server, h0 exports__wrpc__keyvalue__store.Handler) (stop func() error, err error) { diff --git a/examples/keyvalue-inmemory/go.mod b/examples/keyvalue-inmemory/go.mod index d9b4ba0..c97cb50 100644 --- a/examples/keyvalue-inmemory/go.mod +++ b/examples/keyvalue-inmemory/go.mod @@ -40,6 +40,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240730163845-b1a4ccb954bf // indirect google.golang.org/grpc v1.65.0 // indirect google.golang.org/protobuf v1.34.2 // indirect + wrpc.io/go v0.0.3 // indirect ) replace go.wasmcloud.dev/provider => ../.. diff --git a/examples/keyvalue-inmemory/go.sum b/examples/keyvalue-inmemory/go.sum index 9f2ad95..e798876 100644 --- a/examples/keyvalue-inmemory/go.sum +++ b/examples/keyvalue-inmemory/go.sum @@ -81,3 +81,5 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +wrpc.io/go v0.0.3 h1:clf0KiO8WTjYwkyorrZQ4sSf7k/7xP5gdL4934A4L7s= +wrpc.io/go v0.0.3/go.mod h1:3EZdmAh0pp6uNJ8RG4aciP3LLIDWlT2fJP6qd9Z/O6U= diff --git a/examples/keyvalue-inmemory/keyvalue.go b/examples/keyvalue-inmemory/keyvalue.go index 16235fa..56402f6 100644 --- a/examples/keyvalue-inmemory/keyvalue.go +++ b/examples/keyvalue-inmemory/keyvalue.go @@ -5,10 +5,10 @@ import ( "context" "sync" - wrpc "github.com/bytecodealliance/wrpc/go" "github.com/wasmCloud/provider-sdk-go/examples/keyvalue-inmemory/bindings/exports/wrpc/keyvalue/store" "go.opentelemetry.io/otel/trace" "go.wasmcloud.dev/provider" + wrpc "wrpc.io/go" ) var ( diff --git a/go.mod b/go.mod index d8abc85..0d83120 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.22.2 toolchain go1.22.3 require ( - github.com/bytecodealliance/wrpc/go v0.0.0-20240821200644-5f4408308a27 github.com/nats-io/nats.go v1.37.0 github.com/nats-io/nkeys v0.4.7 go.opentelemetry.io/otel v1.28.0 @@ -19,6 +18,7 @@ require ( go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/sdk/log v0.4.0 go.opentelemetry.io/otel/sdk/metric v1.28.0 + wrpc.io/go v0.0.3 ) require ( diff --git a/go.sum b/go.sum index 0785faa..f07611f 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,3 @@ -github.com/bytecodealliance/wrpc/go v0.0.0-20240821140339-1a19079b99a3 h1:boG8swY0eGdU/COFpztzvp7zNz1B7tWb9R3ZkUiqOI0= -github.com/bytecodealliance/wrpc/go v0.0.0-20240821140339-1a19079b99a3/go.mod h1:Zny2i1MyPOpQvP50fhFa+p55yTe+3n5C7J4kUptFYHg= -github.com/bytecodealliance/wrpc/go v0.0.0-20240821145323-723a8c2b1b7f h1:gUw8o1OIQsSjYXuBV0yN1XPXqPyp/Jhkwu67lraKBSI= -github.com/bytecodealliance/wrpc/go v0.0.0-20240821145323-723a8c2b1b7f/go.mod h1:Zny2i1MyPOpQvP50fhFa+p55yTe+3n5C7J4kUptFYHg= -github.com/bytecodealliance/wrpc/go v0.0.0-20240821200644-5f4408308a27 h1:ejfi/wUXgl5KJpn3Hx3801WuLwScBqjckfz/j8O8vio= -github.com/bytecodealliance/wrpc/go v0.0.0-20240821200644-5f4408308a27/go.mod h1:Zny2i1MyPOpQvP50fhFa+p55yTe+3n5C7J4kUptFYHg= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -81,3 +75,5 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +wrpc.io/go v0.0.3 h1:clf0KiO8WTjYwkyorrZQ4sSf7k/7xP5gdL4934A4L7s= +wrpc.io/go v0.0.3/go.mod h1:3EZdmAh0pp6uNJ8RG4aciP3LLIDWlT2fJP6qd9Z/O6U= diff --git a/provider.go b/provider.go index f5f5f80..95b677e 100644 --- a/provider.go +++ b/provider.go @@ -14,11 +14,11 @@ import ( "syscall" "time" - wrpcnats "github.com/bytecodealliance/wrpc/go/nats" nats "github.com/nats-io/nats.go" "github.com/nats-io/nkeys" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/log/global" + wrpcnats "wrpc.io/go/nats" ) type WasmcloudProvider struct { @@ -199,7 +199,7 @@ func New(options ...ProviderHandler) (*WasmcloudProvider, error) { } prefix := fmt.Sprintf("%s.%s", hostData.LatticeRPCPrefix, hostData.ProviderKey) - wrpc := wrpcnats.NewClientWithQueueGroup(nc, prefix, prefix) + wrpc := wrpcnats.NewClient(nc, wrpcnats.WithPrefix(prefix), wrpcnats.WithGroup(prefix)) signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, syscall.SIGINT) @@ -276,7 +276,7 @@ func (wp *WasmcloudProvider) NatsConnection() *nats.Conn { } func (wp *WasmcloudProvider) OutgoingRpcClient(target string) *wrpcnats.Client { - return wrpcnats.NewClient(wp.natsConnection, fmt.Sprintf("%s.%s", wp.hostData.LatticeRPCPrefix, target)) + return wrpcnats.NewClient(wp.natsConnection, wrpcnats.WithPrefix(fmt.Sprintf("%s.%s", wp.hostData.LatticeRPCPrefix, target))) } func (wp *WasmcloudProvider) Start() error {