Skip to content

Commit

Permalink
SQS-412 | Active Orders Query: SSE
Browse files Browse the repository at this point in the history
Implements SSE for Active Orders Query endpoint.
  • Loading branch information
deividaspetraitis committed Sep 20, 2024
1 parent 65036e2 commit c3cd2c4
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 5 deletions.
2 changes: 1 addition & 1 deletion app/sidecar_query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func NewSideCarQueryServer(appCodec codec.Codec, config domain.Config, logger lo

// HTTP handlers
poolsHttpDelivery.NewPoolsHandler(e, poolsUseCase)
passthroughHttpDelivery.NewPassthroughHandler(e, passthroughUseCase, orderBookUseCase)
passthroughHttpDelivery.NewPassthroughHandler(e, passthroughUseCase, orderBookUseCase, logger)
systemhttpdelivery.NewSystemHandler(e, config, logger, chainInfoUseCase)
if err := tokenshttpdelivery.NewTokensHandler(e, *config.Pricing, tokensUseCase, pricingSimpleRouterUsecase, logger); err != nil {
return nil, err
Expand Down
97 changes: 97 additions & 0 deletions delivery/http/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package http

import (
"encoding/json"

Check failure on line 4 in delivery/http/event.go

View workflow job for this annotation

GitHub Actions / Run linter

File is not `gofmt`-ed with `-s` (gofmt)
"bytes"
"fmt"
"io"

"github.com/labstack/echo/v4"
)

// Event represents Server-Sent Event.
// SSE explanation: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
type Event struct {
// ID is used to set the EventSource object's last event ID value.
ID []byte
// Data field is for the message. When the EventSource receives multiple consecutive lines
// that begin with data:, it concatenates them, inserting a newline character between each one.
// Trailing newlines are removed.
Data []byte
// Event is a string identifying the type of event described. If this is specified, an event
// will be dispatched on the browser to the listener for the specified event name; the website
// source code should use addEventListener() to listen for named events. The onmessage handler
// is called if no event name is specified for a message.
Event []byte
// Retry is the reconnection time. If the connection to the server is lost, the browser will
// wait for the specified time before attempting to reconnect. This must be an integer, specifying
// the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored.
Retry []byte
// Comment line can be used to prevent connections from timing out; a server can send a comment
// periodically to keep the connection alive.
Comment []byte
}

// MarshalTo marshals Event to given Writer
func (ev *Event) MarshalTo(w io.Writer) error {
// Marshalling part is taken from: https://github.com/r3labs/sse/blob/c6d5381ee3ca63828b321c16baa008fd6c0b4564/http.go#L16
if len(ev.Data) == 0 && len(ev.Comment) == 0 {
return nil
}

if len(ev.Data) > 0 {
if _, err := fmt.Fprintf(w, "id: %s\n", ev.ID); err != nil {
return err
}

sd := bytes.Split(ev.Data, []byte("\n"))
for i := range sd {
if _, err := fmt.Fprintf(w, "data: %s\n", sd[i]); err != nil {
return err
}
}

if len(ev.Event) > 0 {
if _, err := fmt.Fprintf(w, "event: %s\n", ev.Event); err != nil {
return err
}
}

if len(ev.Retry) > 0 {
if _, err := fmt.Fprintf(w, "retry: %s\n", ev.Retry); err != nil {
return err
}
}
}

if len(ev.Comment) > 0 {
if _, err := fmt.Fprintf(w, ": %s\n", ev.Comment); err != nil {
return err
}
}

if _, err := fmt.Fprint(w, "\n"); err != nil {
return err
}

return nil
}

func WriteEvent(w *echo.Response, data any) error {
b, err := json.Marshal(data)
if err != nil {
return err
}

event := Event{
Data: b,
}

if err := event.MarshalTo(w); err != nil {
return err
}

w.Flush()

return nil
}
14 changes: 11 additions & 3 deletions domain/mocks/orderbook_usecase_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ var _ mvc.OrderBookUsecase = &OrderbookUsecaseMock{}

// OrderbookUsecaseMock is a mock implementation of the RouterUsecase interface
type OrderbookUsecaseMock struct {
ProcessPoolFunc func(ctx context.Context, pool sqsdomain.PoolI) error
GetAllTicksFunc func(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool)
GetActiveOrdersFunc func(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)
ProcessPoolFunc func(ctx context.Context, pool sqsdomain.PoolI) error
GetAllTicksFunc func(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool)
GetActiveOrdersFunc func(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)
GetActiveOrdersStreamFunc func(ctx context.Context, address string) <-chan orderbookdomain.ActiveOrder
}

func (m *OrderbookUsecaseMock) ProcessPool(ctx context.Context, pool sqsdomain.PoolI) error {
Expand All @@ -37,3 +38,10 @@ func (m *OrderbookUsecaseMock) GetActiveOrders(ctx context.Context, address stri
}
panic("unimplemented")
}

func (m *OrderbookUsecaseMock) GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.ActiveOrder {
if m.GetActiveOrdersStreamFunc != nil {
return m.GetActiveOrdersStreamFunc(ctx, address)
}
panic("unimplemented")
}
3 changes: 3 additions & 0 deletions domain/mvc/orderbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ type OrderBookUsecase interface {

// GetOrder returns all active orderbook orders for a given address.
GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error)

// GetActiveOrdersStream returns a channel for feching active orderbook orders for a given address.
GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.ActiveOrder

Check failure on line 21 in domain/mvc/orderbook.go

View workflow job for this annotation

GitHub Actions / Run linter

File is not `gofmt`-ed with `-s` (gofmt)
}
7 changes: 7 additions & 0 deletions domain/orderbook/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,10 @@ type LimitOrder struct {
BaseAsset Asset `json:"base_asset"`
PlacedTx *string `json:"placed_tx,omitempty"`
}


Check failure on line 97 in domain/orderbook/order.go

View workflow job for this annotation

GitHub Actions / Run linter

File is not `gofmt`-ed with `-s` (gofmt)
type ActiveOrder struct {
LimitOrders []LimitOrder // The channel on which the orders are delivered.
IsBestEffort bool
Error error
}
57 changes: 57 additions & 0 deletions orderbook/usecase/orderbook_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,63 @@ func (o *OrderbookUseCaseImpl) ProcessPool(ctx context.Context, pool sqsdomain.P
return nil
}

// GetActiveOrdersStream implements mvc.OrderBookUsecase.
func (o *OrderbookUseCaseImpl) GetActiveOrdersStream(ctx context.Context, address string) <-chan orderbookdomain.ActiveOrder {

Check failure on line 141 in orderbook/usecase/orderbook_usecase.go

View workflow job for this annotation

GitHub Actions / Run linter

File is not `gofmt`-ed with `-s` (gofmt)
// Fetch orders duration
d := 5 * time.Second

// Result channel
c := make(chan orderbookdomain.ActiveOrder, 50) // buffered channel to avoid blocking

// Function to fetch orders
fetchOrders := func(ctx context.Context) {
orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs()
if err != nil {
c <- orderbookdomain.ActiveOrder{
Error: types.FailedGetAllCanonicalOrderbookPoolIDsError{Err: err},
}
}

for _, orderbook := range orderbooks {
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address)
if len(limitOrders) == 0 && err == nil {
return // skip empty orders
}

select {
case c <- orderbookdomain.ActiveOrder{
IsBestEffort: isBestEffort,
LimitOrders: limitOrders,
Error: err,
}:
case <-ctx.Done():
return
}
}(orderbook)
}
}

// Fetch orders immediately on start
go fetchOrders(ctx)

// Pull orders periodically based on duration
go func() {
ticker := time.NewTicker(d)
defer ticker.Stop()
for {
select {
case <-ticker.C:
fetchOrders(ctx)
case <-ctx.Done():
return
}
}
}()

return c
}

// GetActiveOrders implements mvc.OrderBookUsecase.
func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) {
orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs()
Expand Down
65 changes: 64 additions & 1 deletion passthrough/delivery/http/passthrough_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ import (
"github.com/osmosis-labs/sqs/domain"
"github.com/osmosis-labs/sqs/domain/mvc"
_ "github.com/osmosis-labs/sqs/domain/passthrough"
"github.com/osmosis-labs/sqs/log"
"github.com/osmosis-labs/sqs/orderbook/types"

"github.com/labstack/echo/v4"
"go.opentelemetry.io/otel/trace"

"go.uber.org/zap"
)

// PassthroughHandler is the http handler for passthrough use case
type PassthroughHandler struct {
PUsecase mvc.PassthroughUsecase
OUsecase mvc.OrderBookUsecase
Logger log.Logger
}

const resourcePrefix = "/passthrough"
Expand All @@ -26,14 +30,21 @@ func formatPassthroughResource(resource string) string {
}

// NewPassthroughHandler will initialize the pools/ resources endpoint
func NewPassthroughHandler(e *echo.Echo, ptu mvc.PassthroughUsecase, ou mvc.OrderBookUsecase) {
func NewPassthroughHandler(e *echo.Echo, ptu mvc.PassthroughUsecase, ou mvc.OrderBookUsecase, logger log.Logger) {
handler := &PassthroughHandler{
PUsecase: ptu,
OUsecase: ou,
Logger: logger,
}

e.GET(formatPassthroughResource("/portfolio-assets/:address"), handler.GetPortfolioAssetsByAddress)
e.GET(formatPassthroughResource("/active-orders"), handler.GetActiveOrders)
e.GET(formatPassthroughResource("/active-orders"), func(c echo.Context) error {
if c.QueryParam("sse") != "" {
return handler.GetActiveOrdersStream(c) // Server-Sent Events (SSE)
}
return handler.GetActiveOrders(c)
})
}

// @Summary Returns portfolio assets associated with the given address by category.
Expand Down Expand Up @@ -61,6 +72,58 @@ func (a *PassthroughHandler) GetPortfolioAssetsByAddress(c echo.Context) error {
return c.JSON(http.StatusOK, portfolioAssetsResult)
}

func (a *PassthroughHandler) GetActiveOrdersStream(c echo.Context) (err error) {
ctx := c.Request().Context()

span := trace.SpanFromContext(ctx)
defer func() {
if err != nil {
span.RecordError(err)
// nolint:errcheck // ignore error
c.JSON(domain.GetStatusCode(err), domain.ResponseError{Message: err.Error()})
}

// Note: we do not end the span here as it is ended in the middleware.
}()

var req types.GetActiveOrdersRequest
if err := deliveryhttp.UnmarshalRequest(c, &req); err != nil {
return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()})
}

// Validate the request
if err := req.Validate(); err != nil {
return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()})
}

w := c.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

ch := a.OUsecase.GetActiveOrdersStream(ctx, req.UserOsmoAddress)

for {
select {
case <-c.Request().Context().Done():
return c.NoContent(http.StatusOK)
case orders, ok := <-ch:
if !ok {
return c.NoContent(http.StatusOK)
}

if orders.Error != nil {
a.Logger.Error("GET "+c.Request().URL.String(), zap.Error(orders.Error))
}

deliveryhttp.WriteEvent(

Check failure on line 119 in passthrough/delivery/http/passthrough_handler.go

View workflow job for this annotation

GitHub Actions / Run linter

Error return value of `deliveryhttp.WriteEvent` is not checked (errcheck)
w,
types.NewGetAllOrderResponse(orders.LimitOrders, orders.IsBestEffort),
)
}
}
}

// @Summary Returns all active orderbook orders associated with the given address.
// @Description The returned data represents all active orders for all orderbooks available for the specified address.
//
Expand Down
Loading

0 comments on commit c3cd2c4

Please sign in to comment.