Skip to content

Commit

Permalink
feat(SPV-848) notifications (#246)
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-4chain authored Jul 12, 2024
1 parent 5305b41 commit 44c5921
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 2 deletions.
5 changes: 5 additions & 0 deletions examples/Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ tasks:
cmds:
- echo "running generate_totp..."
- go run ./generate_totp/generate_totp.go
webhooks:
desc: "running webhooks..."
cmds:
- echo "running webhooks..."
- go run ./webhooks/webhooks.go
2 changes: 1 addition & 1 deletion examples/go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions examples/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 77 additions & 0 deletions examples/webhooks/webhooks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Package main - send_op_return example
*/
package main

import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

walletclient "github.com/bitcoin-sv/spv-wallet-go-client"
"github.com/bitcoin-sv/spv-wallet-go-client/examples"
"github.com/bitcoin-sv/spv-wallet-go-client/notifications"
"github.com/bitcoin-sv/spv-wallet/models"
)

func main() {
defer examples.HandlePanic()

examples.CheckIfAdminKeyExists()

client := walletclient.NewWithAdminKey("http://localhost:3003/v1", examples.ExampleAdminKey)
wh := notifications.NewWebhook(
client,
"http://localhost:5005/notification",
notifications.WithToken("Authorization", "this-is-the-token"),
notifications.WithProcessors(3),
)
err := wh.Subscribe(context.Background())
if err != nil {
panic(err)
}

http.Handle("/notification", wh.HTTPHandler())

if err = notifications.RegisterHandler(wh, func(gpe *models.StringEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-string: %s\n", gpe.Value)
}); err != nil {
panic(err)
}

if err = notifications.RegisterHandler(wh, func(gpe *models.TransactionEvent) {
time.Sleep(50 * time.Millisecond) // simulate processing time
fmt.Printf("Processing event-transaction: XPubID: %s, TxID: %s, Status: %s\n", gpe.XPubID, gpe.TransactionID, gpe.Status)
}); err != nil {
panic(err)
}

server := http.Server{
Addr: ":5005",
Handler: nil,
ReadHeaderTimeout: time.Second * 10,
}
go func() {
_ = server.ListenAndServe()
}()

// wait for signal to shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan

fmt.Printf("Unsubscribing...\n")
if err = wh.Unsubscribe(context.Background()); err != nil {
panic(err)
}

fmt.Printf("Shutting down...\n")
if err = server.Shutdown(context.Background()); err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion go.mod

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,3 +1132,31 @@ func (wc *WalletClient) SendToRecipients(ctx context.Context, recipients []*Reci

return wc.RecordTransaction(ctx, hex, draft.ID, metadata)
}

// AdminSubscribeWebhook subscribes to a webhook to receive notifications from spv-wallet
func (wc *WalletClient) AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error {
requestModel := models.SubscribeRequestBody{
URL: webhookURL,
TokenHeader: tokenHeader,
TokenValue: tokenValue,
}
rawJSON, err := json.Marshal(requestModel)
if err != nil {
return WrapError(err)
}
err = wc.doHTTPRequest(ctx, http.MethodPost, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil)
return WrapError(err)
}

// AdminUnsubscribeWebhook unsubscribes from a webhook
func (wc *WalletClient) AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error {
requestModel := models.UnsubscribeRequestBody{
URL: webhookURL,
}
rawJSON, err := json.Marshal(requestModel)
if err != nil {
return WrapError(err)
}
err = wc.doHTTPRequest(ctx, http.MethodDelete, "/admin/webhooks/subscriptions", rawJSON, wc.adminXPriv, true, nil)
return err
}
25 changes: 25 additions & 0 deletions notifications/eventsMap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package notifications

import "sync"

type eventsMap struct {
registered *sync.Map
}

func newEventsMap() *eventsMap {
return &eventsMap{
registered: &sync.Map{},
}
}

func (em *eventsMap) store(name string, handler *eventHandler) {
em.registered.Store(name, handler)
}

func (em *eventsMap) load(name string) (*eventHandler, bool) {
h, ok := em.registered.Load(name)
if !ok {
return nil, false
}
return h.(*eventHandler), true
}
9 changes: 9 additions & 0 deletions notifications/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package notifications

import "context"

// WebhookSubscriber - interface for subscribing and unsubscribing to webhooks
type WebhookSubscriber interface {
AdminSubscribeWebhook(ctx context.Context, webhookURL, tokenHeader, tokenValue string) error
AdminUnsubscribeWebhook(ctx context.Context, webhookURL string) error
}
58 changes: 58 additions & 0 deletions notifications/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package notifications

import (
"context"
"runtime"
)

// WebhookOptions - options for the webhook
type WebhookOptions struct {
TokenHeader string
TokenValue string
BufferSize int
RootContext context.Context
Processors int
}

// NewWebhookOptions - creates a new webhook options
func NewWebhookOptions() *WebhookOptions {
return &WebhookOptions{
TokenHeader: "",
TokenValue: "",
BufferSize: 100,
Processors: runtime.NumCPU(),
RootContext: context.Background(),
}
}

// WebhookOpts - functional options for the webhook
type WebhookOpts = func(*WebhookOptions)

// WithToken - sets the token header and value
func WithToken(tokenHeader, tokenValue string) WebhookOpts {
return func(w *WebhookOptions) {
w.TokenHeader = tokenHeader
w.TokenValue = tokenValue
}
}

// WithBufferSize - sets the buffer size
func WithBufferSize(size int) WebhookOpts {
return func(w *WebhookOptions) {
w.BufferSize = size
}
}

// WithRootContext - sets the root context
func WithRootContext(ctx context.Context) WebhookOpts {
return func(w *WebhookOptions) {
w.RootContext = ctx
}
}

// WithProcessors - sets the number of concurrent loops which will process the events
func WithProcessors(count int) WebhookOpts {
return func(w *WebhookOptions) {
w.Processors = count
}
}
27 changes: 27 additions & 0 deletions notifications/registerer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package notifications

import (
"reflect"

"github.com/bitcoin-sv/spv-wallet/models"
)

type eventHandler struct {
Caller reflect.Value
ModelType reflect.Type
}

// RegisterHandler - registers a handler for a specific event type
func RegisterHandler[EventType models.Events](nd *Webhook, handlerFunction func(event *EventType)) error {
handlerValue := reflect.ValueOf(handlerFunction)

modelType := handlerValue.Type().In(0).Elem()
name := modelType.Name()

nd.handlers.store(name, &eventHandler{
Caller: handlerValue,
ModelType: modelType,
})

return nil
}
100 changes: 100 additions & 0 deletions notifications/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package notifications

import (
"context"
"encoding/json"
"net/http"
"reflect"
"time"

"github.com/bitcoin-sv/spv-wallet/models"
)

// Webhook - the webhook event receiver
type Webhook struct {
URL string
options *WebhookOptions
buffer chan *models.RawEvent
subscriber WebhookSubscriber
handlers *eventsMap
}

// NewWebhook - creates a new webhook
func NewWebhook(subscriber WebhookSubscriber, url string, opts ...WebhookOpts) *Webhook {
options := NewWebhookOptions()
for _, opt := range opts {
opt(options)
}

wh := &Webhook{
URL: url,
options: options,
buffer: make(chan *models.RawEvent, options.BufferSize),
subscriber: subscriber,
handlers: newEventsMap(),
}
for i := 0; i < options.Processors; i++ {
go wh.process()
}
return wh
}

// Subscribe - sends a subscription request to the spv-wallet
func (w *Webhook) Subscribe(ctx context.Context) error {
return w.subscriber.AdminSubscribeWebhook(ctx, w.URL, w.options.TokenHeader, w.options.TokenValue)
}

// Unsubscribe - sends an unsubscription request to the spv-wallet
func (w *Webhook) Unsubscribe(ctx context.Context) error {
return w.subscriber.AdminUnsubscribeWebhook(ctx, w.URL)
}

// HTTPHandler - returns an http handler for the webhook; it should be registered with the http server
func (w *Webhook) HTTPHandler() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
if w.options.TokenHeader != "" && r.Header.Get(w.options.TokenHeader) != w.options.TokenValue {
http.Error(rw, "Unauthorized", http.StatusUnauthorized)
return
}
var events []*models.RawEvent
if err := json.NewDecoder(r.Body).Decode(&events); err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
return
}

for _, event := range events {
select {
case w.buffer <- event:
// event sent
case <-r.Context().Done():
// request context canceled
return
case <-w.options.RootContext.Done():
// root context canceled - the whole event processing has been stopped
return
case <-time.After(1 * time.Second):
// timeout, most probably the channel is full
}
}
rw.WriteHeader(http.StatusOK)
})
}

func (w *Webhook) process() {
for {
select {
case event := <-w.buffer:
handler, ok := w.handlers.load(event.Type)
if !ok {
continue
}
model := reflect.New(handler.ModelType).Interface()
if err := json.Unmarshal(event.Content, model); err != nil {
continue
}
handler.Caller.Call([]reflect.Value{reflect.ValueOf(model)})
case <-w.options.RootContext.Done():
return
}
}
}

0 comments on commit 44c5921

Please sign in to comment.