diff --git a/client/duneapi/client.go b/client/duneapi/client.go index da1e611..5d88a89 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -69,7 +69,6 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line } // SendBlock sends a block to DuneAPI -// TODO: support batching multiple blocks in a single request func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error { buffer := c.bufPool.Get().(*bytes.Buffer) defer c.bufPool.Put(buffer) @@ -87,12 +86,14 @@ func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (Bl if c.cfg.DisableCompression { request.Payload = payload.Payload } else { + // not thread safe, multiple calls to the compressor here buffer.Reset() c.compressor.Reset(buffer) _, err := c.compressor.Write(payload.Payload) if err != nil { return request, err } + c.compressor.Close() request.ContentEncoding = "application/zstd" request.Payload = buffer.Bytes() } @@ -146,7 +147,9 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques return err } defer resp.Body.Close() - responseStatus = resp.Status + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status) + } err = json.NewDecoder(resp.Body).Decode(&response) if err != nil { return err