From 55e64f9684df86841684348518104c4187602894 Mon Sep 17 00:00:00 2001 From: lohuza Date: Tue, 27 Dec 2022 04:50:43 +0400 Subject: [PATCH 1/6] added nats --- go.mod | 14 +++- go.sum | 30 ++++++-- nats/common.go | 25 +++++++ nats/handler.go | 183 ++++++++++++++++++++++++++++++++++++++++++++++ nats/publisher.go | 143 ++++++++++++++++++++++++++++++++++++ 5 files changed, 386 insertions(+), 9 deletions(-) create mode 100644 nats/common.go create mode 100644 nats/handler.go create mode 100644 nats/publisher.go diff --git a/go.mod b/go.mod index d151a0b..33a7ba1 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/gomarkdown/markdown v0.0.0-20220830015526-01a3c37d6f50 github.com/hashicorp/vault/api v1.7.2 github.com/iancoleman/strcase v0.2.0 + github.com/nats-io/nats.go v1.22.1 github.com/openware/pkg/ika v0.1.1 github.com/openware/pkg/mngapi v0.1.1 github.com/stretchr/testify v1.8.0 @@ -66,10 +67,12 @@ require ( github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.15.11 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/mattn/go-colorable v0.1.6 // indirect github.com/mattn/go-isatty v0.0.14 // indirect github.com/mattn/go-sqlite3 v1.14.12 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-testing-interface v1.0.0 // indirect @@ -77,6 +80,10 @@ require ( github.com/mitchellh/reflectwalk v1.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.3.0 // indirect + github.com/nats-io/nats-server/v2 v2.9.10 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/run v1.0.0 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.19.0 // indirect @@ -88,11 +95,12 @@ require ( github.com/ugorji/go/codec v1.2.7 // indirect github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect go.uber.org/atomic v1.9.0 // indirect - golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd // indirect + go.uber.org/automaxprocs v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect golang.org/x/text v0.3.7 // indirect - golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0 // indirect google.golang.org/grpc v1.41.0 // indirect google.golang.org/protobuf v1.28.0 // indirect diff --git a/go.sum b/go.sum index 62f1efd..5ed09f8 100644 --- a/go.sum +++ b/go.sum @@ -242,6 +242,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -277,6 +279,8 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ= github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= @@ -299,6 +303,16 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.10 h1:LMC46Oi9E6BUx/xBsaCVZgofliAqKQzRPU6eKWkN8jE= +github.com/nats-io/nats-server/v2 v2.9.10/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g= +github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= +github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -390,6 +404,8 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -405,11 +421,12 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd h1:XcWmESyNjXJMLahc3mqVQJcgSTDxFxhETVlfk9uGc38= -golang.org/x/crypto v0.0.0-20220315160706-3147a52a75dd/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -450,6 +467,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -474,8 +492,8 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -486,8 +504,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/nats/common.go b/nats/common.go new file mode 100644 index 0000000..d376aa7 --- /dev/null +++ b/nats/common.go @@ -0,0 +1,25 @@ +package nats + +import ( + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" +) + +func InitNats(connectionString string) (*nats.Conn, error) { + nc, err := nats.Connect(connectionString) + + return nc, err +} + +func InitEmbededNats() (*nats.Conn, error) { + opts := &server.Options{} + ns, err := server.NewServer(opts) + if err != nil { + panic("failed to initialize nats mock server") + } + + ns.Start() + nc, err := nats.Connect(ns.ClientURL()) + + return nc, err +} diff --git a/nats/handler.go b/nats/handler.go new file mode 100644 index 0000000..2cff5b2 --- /dev/null +++ b/nats/handler.go @@ -0,0 +1,183 @@ +package nats + +import ( + "os" + "sync" + + "github.com/nats-io/nats.go" +) + +type EventHandler interface { + SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error + SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error + Subscribe(subject string, cb nats.MsgHandler) error +} + +type eventHandlerBase struct { + subs []*nats.Subscription + mutex sync.Mutex + termination <-chan os.Signal +} + +type handlerConfig struct { + autoUnsubscribeOnShutdown bool +} + +type natsEventHandler struct { + eventHandlerBase + nc *nats.Conn + config *handlerConfig +} + +type jsEventHandler struct { + eventHandlerBase + js nats.JetStreamContext + config *handlerConfig +} + +// For Checking compatibility +var ( + _ EventHandler = (*natsEventHandler)(nil) + _ EventHandler = (*jsEventHandler)(nil) +) + +func newHandlerBase(termination <-chan os.Signal) eventHandlerBase { + return eventHandlerBase{ + termination: termination, + subs: make([]*nats.Subscription, 0), + mutex: sync.Mutex{}, + } +} + +func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) *natsEventHandler { + if config == nil { + config = NewHandlerDefaultConfig() + } + + handler := natsEventHandler{ + nc: nc, + eventHandlerBase: newHandlerBase(termination), + config: config, + } + + go handler.handleShutdown(config.autoUnsubscribeOnShutdown) + + return &handler +} + +func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) (*jsEventHandler, error) { + if config == nil { + config = NewHandlerDefaultConfig() + } + + js, err := nc.JetStream() + if err != nil { + return nil, err + } + + handler := &jsEventHandler{ + js: js, + eventHandlerBase: newHandlerBase(termination), + config: config, + } + + go handler.handleShutdown(config.autoUnsubscribeOnShutdown) + + return handler, nil +} + +func NewHandlerDefaultConfig() *handlerConfig { + return &handlerConfig{ + autoUnsubscribeOnShutdown: true, + } +} + +func (h *eventHandlerBase) handleShutdown(unsubOnShutdown bool) []error { + if unsubOnShutdown { + for _ = range h.termination { + var errors []error + for _, sub := range h.subs { + err := sub.Unsubscribe() + if err != nil { + errors = append(errors, err) + } + } + + return errors + } + } + + return nil +} + +func (h *eventHandlerBase) GetSubscriptions() []*nats.Subscription { + return h.subs +} + +func (h *eventHandlerBase) pushSub(sub *nats.Subscription) { + h.mutex.Lock() + defer h.mutex.Unlock() + + h.subs = append(h.subs, sub) +} + +func (j *jsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { + sub, err := j.js.ChanQueueSubscribe(subject, group, msgChannel, nats.AckExplicit()) + if err != nil { + return err + } + + j.pushSub(sub) + return nil +} + +func (j *jsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { + sub, err := j.js.QueueSubscribe(subject, group, cb, nats.AckExplicit()) + if err != nil { + return err + } + + j.pushSub(sub) + return nil +} + +func (j *jsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { + sub, err := j.js.Subscribe(subject, cb, nats.AckExplicit()) + if err != nil { + return err + } + + j.pushSub(sub) + return nil +} + +func (n *natsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { + sub, err := n.nc.ChanQueueSubscribe(subject, group, msgChannel) + if err != nil { + return err + } + + n.pushSub(sub) + return nil +} + +func (n *natsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { + sub, err := n.nc.QueueSubscribe(subject, group, cb) + if err != nil { + return err + } + + n.pushSub(sub) + return nil +} + +func (n *natsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { + sub, err := n.nc.Subscribe(subject, cb) + + if err != nil { + return err + } + + n.pushSub(sub) + return nil +} diff --git a/nats/publisher.go b/nats/publisher.go new file mode 100644 index 0000000..ace0767 --- /dev/null +++ b/nats/publisher.go @@ -0,0 +1,143 @@ +package nats + +import ( + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +type eventPublisherBase interface { + Request(subj string, data []byte) (*nats.Msg, error) + RequestWithTimeout(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) + Publish(topic string, payload []byte) error + publishMultiple(topics []string, payload []byte) []error +} + +type EventPublisher interface { + eventPublisherBase +} + +type JsEventPublisher interface { + eventPublisherBase + CreateNewEventStream(string, []string) error + DeleteEventStream(streamName string) error +} + +type publisherBase struct { + nc *nats.Conn +} + +var _ eventPublisherBase = (*publisherBase)(nil) + +type natsEventPublisher struct { + publisherBase +} + +var _ EventPublisher = (*natsEventPublisher)(nil) + +type jsEventPublisher struct { + publisherBase + js nats.JetStreamContext +} + +var _ JsEventPublisher = (*jsEventPublisher)(nil) + +func NewNatsEventPublisher(nc *nats.Conn) *natsEventPublisher { + dispatcher := natsEventPublisher{ + publisherBase{nc: nc}, + } + + return &dispatcher +} + +func NewJsEventPublisher(nc *nats.Conn) (*jsEventPublisher, error) { + js, err := nc.JetStream() + if err != nil { + return nil, err + } + + dispatcher := jsEventPublisher{ + publisherBase: publisherBase{nc}, + js: js, + } + + return &dispatcher, nil +} + +func (p *publisherBase) Request(subject string, data []byte) (*nats.Msg, error) { + // TODO: maybe modify to something else + return p.RequestWithTimeout(subject, data, time.Second*3) +} + +func (p *publisherBase) RequestWithTimeout(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) { + return p.nc.Request(subject, data, timeout) +} + +func (p *publisherBase) Publish(topic string, payload []byte) error { + return p.nc.Publish(topic, payload) +} + +func (p *publisherBase) publishMultiple(topics []string, payload []byte) []error { + wg := sync.WaitGroup{} + var errors []error + for _, topic := range topics { + wg.Add(1) + go func() { + defer wg.Done() + err := p.Publish(topic, payload) + if err != nil { + errors = append(errors, err) + } + }() + } + wg.Wait() + + return errors +} + +func (j *jsEventPublisher) CreateNewEventStream(streamName string, subjects []string) error { + stream, err := j.js.StreamInfo(streamName) + if err != nil && err != nats.ErrStreamNotFound { + return err + } + + if stream == nil { + _, err := j.js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: subjects, + }) + if err != nil { + return err + } + } + + return nil +} + +func (j *jsEventPublisher) DeleteEventStream(streamName string) error { + return j.js.DeleteStream(streamName) +} + +func (j *jsEventPublisher) Publish(topic string, payload []byte) error { + _, err := j.js.Publish(topic, payload) + return err +} + +func (j *jsEventPublisher) publishMultiple(topics []string, payload []byte) []error { + wg := sync.WaitGroup{} + var errors []error + for _, topic := range topics { + wg.Add(1) + go func() { + defer wg.Done() + _, err := j.js.Publish(topic, payload) + if err != nil { + errors = append(errors, err) + } + }() + } + wg.Wait() + + return errors +} From adf3b5442d5a70b0b3e1638123637d41ad610691 Mon Sep 17 00:00:00 2001 From: lohuza Date: Wed, 28 Dec 2022 04:14:43 +0400 Subject: [PATCH 2/6] nats testing and separate mod --- .idea/.gitignore | 8 +++++ nats/go.mod | 22 ++++++++++++++ nats/go.sum | 47 ++++++++++++++++++++++++++++++ nats/handler.go | 28 +++++++++--------- nats/nats_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 165 insertions(+), 14 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 nats/go.mod create mode 100644 nats/go.sum create mode 100644 nats/nats_test.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/nats/go.mod b/nats/go.mod new file mode 100644 index 0000000..806de52 --- /dev/null +++ b/nats/go.mod @@ -0,0 +1,22 @@ +module github.com/openware/pkg/nats + +go 1.18 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/klauspost/compress v1.15.11 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.3.0 // indirect + github.com/nats-io/nats-server/v2 v2.9.10 // indirect + github.com/nats-io/nats.go v1.22.1 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/testify v1.8.1 // indirect + go.uber.org/automaxprocs v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect + golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect + golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/nats/go.sum b/nats/go.sum new file mode 100644 index 0000000..dcb1557 --- /dev/null +++ b/nats/go.sum @@ -0,0 +1,47 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.10 h1:LMC46Oi9E6BUx/xBsaCVZgofliAqKQzRPU6eKWkN8jE= +github.com/nats-io/nats-server/v2 v2.9.10/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g= +github.com/nats-io/nats.go v1.22.1 h1:XzfqDspY0RNufzdrB8c4hFR+R3dahkxlpWe5+IWJzbE= +github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/nats/handler.go b/nats/handler.go index 2cff5b2..0389b31 100644 --- a/nats/handler.go +++ b/nats/handler.go @@ -23,13 +23,13 @@ type handlerConfig struct { autoUnsubscribeOnShutdown bool } -type natsEventHandler struct { +type NatsEventHandler struct { eventHandlerBase nc *nats.Conn config *handlerConfig } -type jsEventHandler struct { +type JsEventHandler struct { eventHandlerBase js nats.JetStreamContext config *handlerConfig @@ -37,8 +37,8 @@ type jsEventHandler struct { // For Checking compatibility var ( - _ EventHandler = (*natsEventHandler)(nil) - _ EventHandler = (*jsEventHandler)(nil) + _ EventHandler = (*NatsEventHandler)(nil) + _ EventHandler = (*JsEventHandler)(nil) ) func newHandlerBase(termination <-chan os.Signal) eventHandlerBase { @@ -49,12 +49,12 @@ func newHandlerBase(termination <-chan os.Signal) eventHandlerBase { } } -func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) *natsEventHandler { +func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) *NatsEventHandler { if config == nil { config = NewHandlerDefaultConfig() } - handler := natsEventHandler{ + handler := NatsEventHandler{ nc: nc, eventHandlerBase: newHandlerBase(termination), config: config, @@ -65,7 +65,7 @@ func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handler return &handler } -func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) (*jsEventHandler, error) { +func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) (*JsEventHandler, error) { if config == nil { config = NewHandlerDefaultConfig() } @@ -75,7 +75,7 @@ func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerCo return nil, err } - handler := &jsEventHandler{ + handler := &JsEventHandler{ js: js, eventHandlerBase: newHandlerBase(termination), config: config, @@ -121,7 +121,7 @@ func (h *eventHandlerBase) pushSub(sub *nats.Subscription) { h.subs = append(h.subs, sub) } -func (j *jsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { +func (j *JsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { sub, err := j.js.ChanQueueSubscribe(subject, group, msgChannel, nats.AckExplicit()) if err != nil { return err @@ -131,7 +131,7 @@ func (j *jsEventHandler) SubscribeToQueueUsingChannel(subject string, group stri return nil } -func (j *jsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { +func (j *JsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { sub, err := j.js.QueueSubscribe(subject, group, cb, nats.AckExplicit()) if err != nil { return err @@ -141,7 +141,7 @@ func (j *jsEventHandler) SubscribeToQueue(subject string, group string, cb nats. return nil } -func (j *jsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { +func (j *JsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { sub, err := j.js.Subscribe(subject, cb, nats.AckExplicit()) if err != nil { return err @@ -151,7 +151,7 @@ func (j *jsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { return nil } -func (n *natsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { +func (n *NatsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { sub, err := n.nc.ChanQueueSubscribe(subject, group, msgChannel) if err != nil { return err @@ -161,7 +161,7 @@ func (n *natsEventHandler) SubscribeToQueueUsingChannel(subject string, group st return nil } -func (n *natsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { +func (n *NatsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { sub, err := n.nc.QueueSubscribe(subject, group, cb) if err != nil { return err @@ -171,7 +171,7 @@ func (n *natsEventHandler) SubscribeToQueue(subject string, group string, cb nat return nil } -func (n *natsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { +func (n *NatsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { sub, err := n.nc.Subscribe(subject, cb) if err != nil { diff --git a/nats/nats_test.go b/nats/nats_test.go new file mode 100644 index 0000000..36e4432 --- /dev/null +++ b/nats/nats_test.go @@ -0,0 +1,74 @@ +package nats + +import ( + "sync" + "testing" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" +) + +func initNatsHandlers() (*natsEventPublisher, *NatsEventHandler) { + nc, _ := InitEmbededNats() + publisher := NewNatsEventPublisher(nc) + handler := NewNatsHandler(nc, nil, nil) + + return publisher, handler +} + +func TestNatsPubSub(t *testing.T) { + pub, handler := initNatsHandlers() + handler.Subscribe("test.*", func(msg *nats.Msg) { + res := string(msg.Data) + assert.Equal(t, "test", res) + }) + + handler.Subscribe("test.*", func(msg *nats.Msg) { + res := string(msg.Data) + assert.Equal(t, "test", res) + }) + + pub.Publish("test.all", []byte("test")) +} + +func TestNatsQueue(t *testing.T) { + pub, handler := initNatsHandlers() + wg := sync.WaitGroup{} + subCount := 0 + handler.SubscribeToQueue("test.*", "grp", func(msg *nats.Msg) { + subCount++ + wg.Done() + }) + + handler.SubscribeToQueue("test.*", "grp", func(msg *nats.Msg) { + subCount++ + wg.Done() + }) + + handler.SubscribeToQueue("test.*", "grp2", func(msg *nats.Msg) { + subCount++ + wg.Done() + }) + + pub.Publish("test.all", []byte("test")) + wg.Add(2) + wg.Wait() + assert.Equal(t, 2, subCount) +} + +func TestNatsChannelQueue(t *testing.T) { + pub, handler := initNatsHandlers() + msgChan := make(chan *nats.Msg) + wg := sync.WaitGroup{} + go func() { + for msg := range msgChan { + assert.Equal(t, "test", string(msg.Data)) + wg.Done() + } + }() + + handler.SubscribeToQueueUsingChannel("test.*", "grp", msgChan) + pub.Publish("test.topic", []byte("test")) + wg.Add(1) + wg.Wait() +} From df00b9b78564eb4f3d84e180112419796ff77cc5 Mon Sep 17 00:00:00 2001 From: lohuza Date: Thu, 29 Dec 2022 12:59:22 +0400 Subject: [PATCH 3/6] make publish multiple public --- nats/publisher.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nats/publisher.go b/nats/publisher.go index ace0767..06647ea 100644 --- a/nats/publisher.go +++ b/nats/publisher.go @@ -11,7 +11,7 @@ type eventPublisherBase interface { Request(subj string, data []byte) (*nats.Msg, error) RequestWithTimeout(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) Publish(topic string, payload []byte) error - publishMultiple(topics []string, payload []byte) []error + PublishMultiple(topics []string, payload []byte) []error } type EventPublisher interface { @@ -78,7 +78,7 @@ func (p *publisherBase) Publish(topic string, payload []byte) error { return p.nc.Publish(topic, payload) } -func (p *publisherBase) publishMultiple(topics []string, payload []byte) []error { +func (p *publisherBase) PublishMultiple(topics []string, payload []byte) []error { wg := sync.WaitGroup{} var errors []error for _, topic := range topics { @@ -124,7 +124,7 @@ func (j *jsEventPublisher) Publish(topic string, payload []byte) error { return err } -func (j *jsEventPublisher) publishMultiple(topics []string, payload []byte) []error { +func (j *jsEventPublisher) PublishMultiple(topics []string, payload []byte) []error { wg := sync.WaitGroup{} var errors []error for _, topic := range topics { From cd20424c2f8c6c6279f629f5438041a9cac5e77d Mon Sep 17 00:00:00 2001 From: lohuza Date: Mon, 9 Jan 2023 04:46:39 +0400 Subject: [PATCH 4/6] include godoc --- nats/README.md | 4 ++++ nats/common.go | 3 +++ nats/handler.go | 14 ++++++++++++++ nats/publisher.go | 13 +++++++++++++ 4 files changed, 34 insertions(+) create mode 100644 nats/README.md diff --git a/nats/README.md b/nats/README.md new file mode 100644 index 0000000..890c5c5 --- /dev/null +++ b/nats/README.md @@ -0,0 +1,4 @@ +# Openware Nats + +openware common nats package. + diff --git a/nats/common.go b/nats/common.go index d376aa7..6f78c35 100644 --- a/nats/common.go +++ b/nats/common.go @@ -1,3 +1,4 @@ +// Package nats is used for handling nats (or nats jetstream) pub/sub package nats import ( @@ -5,12 +6,14 @@ import ( "github.com/nats-io/nats.go" ) +// InitNats initialize nats using connectionSting func InitNats(connectionString string) (*nats.Conn, error) { nc, err := nats.Connect(connectionString) return nc, err } +// InitEmbededNats initialize nats in memory func InitEmbededNats() (*nats.Conn, error) { opts := &server.Options{} ns, err := server.NewServer(opts) diff --git a/nats/handler.go b/nats/handler.go index 0389b31..67bca08 100644 --- a/nats/handler.go +++ b/nats/handler.go @@ -7,6 +7,7 @@ import ( "github.com/nats-io/nats.go" ) +// EventHandler handles subscribing and queue subscribing on nats and jetstream type EventHandler interface { SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error @@ -19,16 +20,19 @@ type eventHandlerBase struct { termination <-chan os.Signal } +// handlerConfig config for nats and jetstream type handlerConfig struct { autoUnsubscribeOnShutdown bool } +// NatsEventHandler nats event handler structure which implements EventHandler interface type NatsEventHandler struct { eventHandlerBase nc *nats.Conn config *handlerConfig } +// JsEventHandler jetstream event handler structure which implements EventHandler interface type JsEventHandler struct { eventHandlerBase js nats.JetStreamContext @@ -49,6 +53,7 @@ func newHandlerBase(termination <-chan os.Signal) eventHandlerBase { } } +// NewNatsHandler initializes new nats handler. func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) *NatsEventHandler { if config == nil { config = NewHandlerDefaultConfig() @@ -65,6 +70,7 @@ func NewNatsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handler return &handler } +// NewJsHandler initializes new jetstream handler. func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerConfig) (*JsEventHandler, error) { if config == nil { config = NewHandlerDefaultConfig() @@ -86,6 +92,7 @@ func NewJsHandler(nc *nats.Conn, termination <-chan os.Signal, config *handlerCo return handler, nil } +// NewHandlerDefaultConfig initialize default config for event handlers func NewHandlerDefaultConfig() *handlerConfig { return &handlerConfig{ autoUnsubscribeOnShutdown: true, @@ -110,6 +117,7 @@ func (h *eventHandlerBase) handleShutdown(unsubOnShutdown bool) []error { return nil } +// GetSubscriptions get list of subscriptions func (h *eventHandlerBase) GetSubscriptions() []*nats.Subscription { return h.subs } @@ -121,6 +129,7 @@ func (h *eventHandlerBase) pushSub(sub *nats.Subscription) { h.subs = append(h.subs, sub) } +// SubscribeToQueueUsingChannel subscribe to queue with channel. you'll receive all the new events into the channel func (j *JsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { sub, err := j.js.ChanQueueSubscribe(subject, group, msgChannel, nats.AckExplicit()) if err != nil { @@ -131,6 +140,7 @@ func (j *JsEventHandler) SubscribeToQueueUsingChannel(subject string, group stri return nil } +// SubscribeToQueue subscribe to queue using a callback. func (j *JsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { sub, err := j.js.QueueSubscribe(subject, group, cb, nats.AckExplicit()) if err != nil { @@ -141,6 +151,7 @@ func (j *JsEventHandler) SubscribeToQueue(subject string, group string, cb nats. return nil } +// Subscribe subscribe using a callback. func (j *JsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { sub, err := j.js.Subscribe(subject, cb, nats.AckExplicit()) if err != nil { @@ -151,6 +162,7 @@ func (j *JsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { return nil } +// SubscribeToQueueUsingChannel subscribe to queue with channel. you'll receive all the new events into the channel func (n *NatsEventHandler) SubscribeToQueueUsingChannel(subject string, group string, msgChannel chan *nats.Msg) error { sub, err := n.nc.ChanQueueSubscribe(subject, group, msgChannel) if err != nil { @@ -161,6 +173,7 @@ func (n *NatsEventHandler) SubscribeToQueueUsingChannel(subject string, group st return nil } +// SubscribeToQueue subscribe to queue using a callback. func (n *NatsEventHandler) SubscribeToQueue(subject string, group string, cb nats.MsgHandler) error { sub, err := n.nc.QueueSubscribe(subject, group, cb) if err != nil { @@ -171,6 +184,7 @@ func (n *NatsEventHandler) SubscribeToQueue(subject string, group string, cb nat return nil } +// Subscribe subscribe using a callback. func (n *NatsEventHandler) Subscribe(subject string, cb nats.MsgHandler) error { sub, err := n.nc.Subscribe(subject, cb) diff --git a/nats/publisher.go b/nats/publisher.go index 06647ea..5eaac47 100644 --- a/nats/publisher.go +++ b/nats/publisher.go @@ -14,16 +14,19 @@ type eventPublisherBase interface { PublishMultiple(topics []string, payload []byte) []error } +// EventPublisher nats event publisher interface type EventPublisher interface { eventPublisherBase } +// JsEventPublisher jetstream event publisher and event stream manager type JsEventPublisher interface { eventPublisherBase CreateNewEventStream(string, []string) error DeleteEventStream(streamName string) error } +// publisherBase Base publisher stuct. it has base implementation for publishing events type publisherBase struct { nc *nats.Conn } @@ -43,6 +46,7 @@ type jsEventPublisher struct { var _ JsEventPublisher = (*jsEventPublisher)(nil) +// NewNatsEventPublisher initialize new nats event publisher func NewNatsEventPublisher(nc *nats.Conn) *natsEventPublisher { dispatcher := natsEventPublisher{ publisherBase{nc: nc}, @@ -51,6 +55,7 @@ func NewNatsEventPublisher(nc *nats.Conn) *natsEventPublisher { return &dispatcher } +// NewJsEventPublisher initialize new jetstream event publisher func NewJsEventPublisher(nc *nats.Conn) (*jsEventPublisher, error) { js, err := nc.JetStream() if err != nil { @@ -65,19 +70,23 @@ func NewJsEventPublisher(nc *nats.Conn) (*jsEventPublisher, error) { return &dispatcher, nil } +// Request make a request request to specific subject. (default timeout is set on 3 seconds) func (p *publisherBase) Request(subject string, data []byte) (*nats.Msg, error) { // TODO: maybe modify to something else return p.RequestWithTimeout(subject, data, time.Second*3) } +// RequestWithTimeout make a request to specific subject and specify timeout func (p *publisherBase) RequestWithTimeout(subject string, data []byte, timeout time.Duration) (*nats.Msg, error) { return p.nc.Request(subject, data, timeout) } +// Publish publish an event for specific topic func (p *publisherBase) Publish(topic string, payload []byte) error { return p.nc.Publish(topic, payload) } +// PublishMultiple publish an event for multiple payload. because each publish may result with error we return error array func (p *publisherBase) PublishMultiple(topics []string, payload []byte) []error { wg := sync.WaitGroup{} var errors []error @@ -96,6 +105,7 @@ func (p *publisherBase) PublishMultiple(topics []string, payload []byte) []error return errors } +// CreateNewEventStream create stream for specific subject for jetstream. if stream already exists nothing happens. func (j *jsEventPublisher) CreateNewEventStream(streamName string, subjects []string) error { stream, err := j.js.StreamInfo(streamName) if err != nil && err != nats.ErrStreamNotFound { @@ -115,15 +125,18 @@ func (j *jsEventPublisher) CreateNewEventStream(streamName string, subjects []st return nil } +// DeleteEventStream delete a stream in jetstream func (j *jsEventPublisher) DeleteEventStream(streamName string) error { return j.js.DeleteStream(streamName) } +// Publish publish a new event func (j *jsEventPublisher) Publish(topic string, payload []byte) error { _, err := j.js.Publish(topic, payload) return err } +// PublishMultiple publish an event for multiple payload. because each publish may result with error we return error array func (j *jsEventPublisher) PublishMultiple(topics []string, payload []byte) []error { wg := sync.WaitGroup{} var errors []error From 5a9590c5fb3a37b5166f00d4f4c767cfc62de81c Mon Sep 17 00:00:00 2001 From: lohuza Date: Mon, 9 Jan 2023 05:55:10 +0400 Subject: [PATCH 5/6] readme --- nats/README.md | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/nats/README.md b/nats/README.md index 890c5c5..2552c64 100644 --- a/nats/README.md +++ b/nats/README.md @@ -2,3 +2,45 @@ openware common nats package. +first we need to initialize nats connection + +``` +connectionString = "localhost:4222" +nc, err := nats.InitNats(connectionString) +``` +we need nc for initializing handlers and subscribers + + +### Publisher + +we have publisher for publishing event using either nats or Jetstream + +to intialize nats and publish an event: +``` +pub, err := nats.NewNatsEventPublisher(nc) +// handle error +pub.Publish("foo.bar", []byte("baz")) +``` + +to initialize jetstream and publish an event: +``` +js, err := nats.NewJsEventPublisher(nc) +// handle error +js.Publish("foo.bar", []byte("baz")) +``` +keep in mind that before publishing an event you'll also need to have a stream of the topic. to create a stream: +``` +err := js.CreateNewEventStream("foo", []string{"foo.baz", "foo.bar"}) +// handle error +``` + +### Subscriber +With subscribers we can subscribe to different topics. If we subscribe using queue, subscribers with the same group name will receive an event once. + +to intiialize nats and subscribe to a topic. +we can pass ```<-chan os.Signal``` for subscribing to shutdown event. so at the end it can cleanup +``` +handler := nats.NewNatsHandler(nc, terminationChannel, nats.NewHandlerDefaultConfig()) +msgChan := make(chan *nats.Msg) +handler.SubscribeToQueueUsingChannel("foo.baz", "bar", msgChannel) +``` \ No newline at end of file From d6c22a852be85337fd0a586f673ee0aa2f674aa2 Mon Sep 17 00:00:00 2001 From: lohuza Date: Mon, 9 Jan 2023 13:32:47 +0400 Subject: [PATCH 6/6] moved interface compatibility checks into tests. --- nats/handler.go | 6 ------ nats/nats_test.go | 10 ++++++++++ nats/publisher.go | 6 ------ 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/nats/handler.go b/nats/handler.go index 67bca08..e207d97 100644 --- a/nats/handler.go +++ b/nats/handler.go @@ -39,12 +39,6 @@ type JsEventHandler struct { config *handlerConfig } -// For Checking compatibility -var ( - _ EventHandler = (*NatsEventHandler)(nil) - _ EventHandler = (*JsEventHandler)(nil) -) - func newHandlerBase(termination <-chan os.Signal) eventHandlerBase { return eventHandlerBase{ termination: termination, diff --git a/nats/nats_test.go b/nats/nats_test.go index 36e4432..84b3191 100644 --- a/nats/nats_test.go +++ b/nats/nats_test.go @@ -8,6 +8,16 @@ import ( "github.com/stretchr/testify/assert" ) +// For Checking compatibility +var ( + _ EventHandler = (*NatsEventHandler)(nil) + _ EventHandler = (*JsEventHandler)(nil) + + _ eventPublisherBase = (*publisherBase)(nil) + _ EventPublisher = (*natsEventPublisher)(nil) + _ JsEventPublisher = (*jsEventPublisher)(nil) +) + func initNatsHandlers() (*natsEventPublisher, *NatsEventHandler) { nc, _ := InitEmbededNats() publisher := NewNatsEventPublisher(nc) diff --git a/nats/publisher.go b/nats/publisher.go index 5eaac47..a811c7c 100644 --- a/nats/publisher.go +++ b/nats/publisher.go @@ -31,21 +31,15 @@ type publisherBase struct { nc *nats.Conn } -var _ eventPublisherBase = (*publisherBase)(nil) - type natsEventPublisher struct { publisherBase } -var _ EventPublisher = (*natsEventPublisher)(nil) - type jsEventPublisher struct { publisherBase js nats.JetStreamContext } -var _ JsEventPublisher = (*jsEventPublisher)(nil) - // NewNatsEventPublisher initialize new nats event publisher func NewNatsEventPublisher(nc *nats.Conn) *natsEventPublisher { dispatcher := natsEventPublisher{