From 5665ea7e7c0ff5ba1606b96af00f0519aefa9607 Mon Sep 17 00:00:00 2001 From: lgtm <1gtm@users.noreply.github.com> Date: Thu, 7 Oct 2021 20:42:22 -0700 Subject: [PATCH] Use nats.go v1.13.0 (#1391) /cherry-pick Signed-off-by: 1gtm <1gtm@appscode.com> --- go.mod | 2 +- go.sum | 8 +- vendor/github.com/nats-io/nats.go/README.md | 2 +- vendor/github.com/nats-io/nats.go/go_test.mod | 2 +- vendor/github.com/nats-io/nats.go/go_test.sum | 19 +- vendor/github.com/nats-io/nats.go/js.go | 314 ++++-- vendor/github.com/nats-io/nats.go/jsm.go | 58 +- vendor/github.com/nats-io/nats.go/kv.go | 644 ++++++++++++ vendor/github.com/nats-io/nats.go/nats.go | 87 +- vendor/github.com/nats-io/nats.go/object.go | 928 ++++++++++++++++++ vendor/modules.txt | 4 +- 11 files changed, 1936 insertions(+), 132 deletions(-) create mode 100644 vendor/github.com/nats-io/nats.go/kv.go create mode 100644 vendor/github.com/nats-io/nats.go/object.go diff --git a/go.mod b/go.mod index 29c561942..c9840fe7d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.1.3 github.com/spf13/pflag v1.0.5 - go.bytebuilders.dev/audit v0.0.9 + go.bytebuilders.dev/audit v0.0.10 go.bytebuilders.dev/license-verifier v0.9.3 go.bytebuilders.dev/license-verifier/kubernetes v0.9.3 gomodules.xyz/blobfs v0.1.7 diff --git a/go.sum b/go.sum index 0bf3a5a48..fe9e8e97a 100644 --- a/go.sum +++ b/go.sum @@ -564,8 +564,8 @@ github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.12.3 h1:te0GLbRsjtejEkZKKiuk46tbfIn6FfCSv3WWSo1+51E= -github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE= +github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= @@ -771,8 +771,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= -go.bytebuilders.dev/audit v0.0.9 h1:mmwwcWinjqlmIeDeieEAOqU9eaJKkYmWNASFDwxuerA= -go.bytebuilders.dev/audit v0.0.9/go.mod h1:IFioMRcGEAf/fzHmUKE98/Asconn9ec51rzPqCYnIJc= +go.bytebuilders.dev/audit v0.0.10 h1:49weCqLGJbTnBLOKB3qdB/gPu9pRris6/e/fpbe/shU= +go.bytebuilders.dev/audit v0.0.10/go.mod h1:jhNyXHoeVHijC4tM7EYpcB9RqkwnI27IoRN3k0ivick= go.bytebuilders.dev/license-verifier v0.9.3 h1:foHyjil3Y2OesjUInQZeRl5kntWWCPkqqQOz1wSAmLo= go.bytebuilders.dev/license-verifier v0.9.3/go.mod h1:GpIW0o8O0wpiBVt7IIz4z7bcPuG8nza8/bCDkaupDn8= go.bytebuilders.dev/license-verifier/kubernetes v0.9.3 h1:rrgo72xtiEEyE0exFSQ8HMpQpxt+hpJW2a9QWOkk4n4= diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 041d44ad4..adaf600af 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -29,7 +29,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.12.3 +go get github.com/nats-io/nats.go/@v1.13.0 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 4d301c8a5..a868bac29 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 + github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index 05586f46f..5f7cda601 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -15,33 +15,24 @@ github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEE github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz/0= github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.0.3 h1:i/O6cmIsjpcQyWDYNcq2JyZ3/VTF8SJ4JWluI5OhpvI= -github.com/nats-io/jwt/v2 v2.0.3/go.mod h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SEcNiGlY= -github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6 h1:TYI6K487xhbbpKjz4gIIVBWL6l2gFI3JHu/N0XySwRY= -github.com/nats-io/nats-server/v2 v2.5.1-0.20210921161523-29037a4f5cd6/go.mod h1:xZLDZ6cRUu9FCh7+mKXGEy16O66CdWVxttxNIiUuNCk= -github.com/nats-io/nats.go v1.12.2-0.20210916222008-92921544b891/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= +github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da h1:0snsE4pD2VKIsFiRMRkHFY+SJZVbT7/eZJ1lOt5XuLA= +github.com/nats-io/nats-server/v2 v2.6.2-0.20211007142333-41a9d082f8da/go.mod h1:ubcDOPViqaQcNvJVzoX9FIDxAxyJDTItw07lqFCzC80= +github.com/nats-io/nats.go v1.12.3/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e h1:gsTQYXdTw2Gq7RBsWvlQ91b+aEQ6bXFUngBGuR8sPpI= golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 h1:NusfzzA6yGQ+ua51ck7E3omNUX/JuqbFSaRGqU8CcLI= golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 4d4adbdac..361c15a6c 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -30,78 +30,6 @@ import ( "github.com/nats-io/nuid" ) -// Request API subjects for JetStream. -const ( - // defaultAPIPrefix is the default prefix for the JetStream API. - defaultAPIPrefix = "$JS.API." - - // jsDomainT is used to create JetStream API prefix by specifying only Domain - jsDomainT = "$JS.%s.API." - - // apiAccountInfo is for obtaining general information about JetStream. - apiAccountInfo = "INFO" - - // apiConsumerCreateT is used to create consumers. - apiConsumerCreateT = "CONSUMER.CREATE.%s" - - // apiDurableCreateT is used to create durable consumers. - apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s" - - // apiConsumerInfoT is used to create consumers. - apiConsumerInfoT = "CONSUMER.INFO.%s.%s" - - // apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. - apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" - - // apiDeleteConsumerT is used to delete consumers. - apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" - - // apiConsumerListT is used to return all detailed consumer information - apiConsumerListT = "CONSUMER.LIST.%s" - - // apiConsumerNamesT is used to return a list with all consumer names for the stream. - apiConsumerNamesT = "CONSUMER.NAMES.%s" - - // apiStreams can lookup a stream by subject. - apiStreams = "STREAM.NAMES" - - // apiStreamCreateT is the endpoint to create new streams. - apiStreamCreateT = "STREAM.CREATE.%s" - - // apiStreamInfoT is the endpoint to get information on a stream. - apiStreamInfoT = "STREAM.INFO.%s" - - // apiStreamUpdate is the endpoint to update existing streams. - apiStreamUpdateT = "STREAM.UPDATE.%s" - - // apiStreamDeleteT is the endpoint to delete streams. - apiStreamDeleteT = "STREAM.DELETE.%s" - - // apiPurgeStreamT is the endpoint to purge streams. - apiStreamPurgeT = "STREAM.PURGE.%s" - - // apiStreamListT is the endpoint that will return all detailed stream information - apiStreamList = "STREAM.LIST" - - // apiMsgGetT is the endpoint to get a message. - apiMsgGetT = "STREAM.MSG.GET.%s" - - // apiMsgDeleteT is the endpoint to remove a message. - apiMsgDeleteT = "STREAM.MSG.DELETE.%s" - - // orderedHeartbeatsInterval is how fast we want HBs from the server during idle. - orderedHeartbeatsInterval = 5 * time.Second - - // Scale for threshold of missed HBs or lack of activity. - hbcThresh = 2 -) - -// Types of control messages, so far heartbeat and flow control -const ( - jsCtrlHB = 1 - jsCtrlFC = 2 -) - // JetStream allows persistent messaging through JetStream. type JetStream interface { // Publish publishes a message to JetStream. @@ -177,8 +105,90 @@ type JetStream interface { type JetStreamContext interface { JetStream JetStreamManager + KeyValueManager + ObjectStoreManager } +// Request API subjects for JetStream. +const ( + // defaultAPIPrefix is the default prefix for the JetStream API. + defaultAPIPrefix = "$JS.API." + + // jsDomainT is used to create JetStream API prefix by specifying only Domain + jsDomainT = "$JS.%s.API." + + // apiAccountInfo is for obtaining general information about JetStream. + apiAccountInfo = "INFO" + + // apiConsumerCreateT is used to create consumers. + apiConsumerCreateT = "CONSUMER.CREATE.%s" + + // apiDurableCreateT is used to create durable consumers. + apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s" + + // apiConsumerInfoT is used to create consumers. + apiConsumerInfoT = "CONSUMER.INFO.%s.%s" + + // apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. + apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" + + // apiDeleteConsumerT is used to delete consumers. + apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" + + // apiConsumerListT is used to return all detailed consumer information + apiConsumerListT = "CONSUMER.LIST.%s" + + // apiConsumerNamesT is used to return a list with all consumer names for the stream. + apiConsumerNamesT = "CONSUMER.NAMES.%s" + + // apiStreams can lookup a stream by subject. + apiStreams = "STREAM.NAMES" + + // apiStreamCreateT is the endpoint to create new streams. + apiStreamCreateT = "STREAM.CREATE.%s" + + // apiStreamInfoT is the endpoint to get information on a stream. + apiStreamInfoT = "STREAM.INFO.%s" + + // apiStreamUpdate is the endpoint to update existing streams. + apiStreamUpdateT = "STREAM.UPDATE.%s" + + // apiStreamDeleteT is the endpoint to delete streams. + apiStreamDeleteT = "STREAM.DELETE.%s" + + // apiPurgeStreamT is the endpoint to purge streams. + apiStreamPurgeT = "STREAM.PURGE.%s" + + // apiStreamListT is the endpoint that will return all detailed stream information + apiStreamList = "STREAM.LIST" + + // apiMsgGetT is the endpoint to get a message. + apiMsgGetT = "STREAM.MSG.GET.%s" + + // apiMsgDeleteT is the endpoint to remove a message. + apiMsgDeleteT = "STREAM.MSG.DELETE.%s" + + // orderedHeartbeatsInterval is how fast we want HBs from the server during idle. + orderedHeartbeatsInterval = 5 * time.Second + + // Scale for threshold of missed HBs or lack of activity. + hbcThresh = 2 + + // For ChanSubscription, we can't update sub.delivered as we do for other + // type of subscriptions, since the channel is user provided. + // With flow control in play, we will check for flow control on incoming + // messages (as opposed to when they are delivered), but also from a go + // routine. Without this, the subscription would possibly stall until + // a new message or heartbeat/fc are received. + chanSubFCCheckInterval = 250 * time.Millisecond +) + +// Types of control messages, so far heartbeat and flow control +const ( + jsCtrlHB = 1 + jsCtrlFC = 2 +) + // js is an internal struct from a JetStreamContext. type js struct { nc *Conn @@ -311,6 +321,16 @@ const ( ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence" ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" + MsgRollup = "Nats-Rollup" +) + +// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested. +const MsgSize = "Nats-Msg-Size" + +// Rollups, can be subject only or all messages. +const ( + MsgRollupSubject = "sub" + MsgRollupAll = "all" ) // PublishMsg publishes a Msg to a stream from JetStream. @@ -658,10 +678,14 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if m.Reply != _EMPTY_ { return nil, errors.New("nats: reply subject should be empty") } + reply := m.Reply m.Reply = js.newAsyncReply() + defer func() { m.Reply = reply }() + if m.Reply == _EMPTY_ { return nil, errors.New("nats: error creating async reply handler") } + id := m.Reply[aReplyPreLen:] paf := &pubAckFuture{msg: m, st: time.Now()} numPending, maxPending := js.registerPAF(id, paf) @@ -674,7 +698,6 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { return nil, errors.New("nats: stalled with too many outstanding async published messages") } } - if err := js.nc.PublishMsg(m); err != nil { js.clearPAF(id) return nil, err @@ -832,6 +855,7 @@ type ConsumerConfig struct { MaxAckPending int `json:"max_ack_pending,omitempty"` FlowControl bool `json:"flow_control,omitempty"` Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` + HeadersOnly bool `json:"headers_only,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -897,6 +921,8 @@ type jsSub struct { cmeta string fcr string fcd uint64 + fciseq uint64 + csfct *time.Timer } // Deletes the JS Consumer. @@ -1158,7 +1184,10 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // If this is a queue subscription and no consumer nor durable name was specified, // then we will use the queue name as a durable name. - if queue != _EMPTY_ && o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { + if o.consumer == _EMPTY_ && o.cfg.Durable == _EMPTY_ { + if err := checkDurName(queue); err != nil { + return nil, err + } o.cfg.Durable = queue } } @@ -1409,15 +1438,27 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. - if ch != nil { + if isSync { ch = make(chan *Msg, cap(ch)) + } else if ch != nil { + // User provided (ChanSubscription), simply try to drain it. + for done := false; !done; { + select { + case <-ch: + default: + done = true + } + } } jsi.deliver = deliver + jsi.hbi = info.Config.Heartbeat // Recreate the subscription here. sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } + hasFC = info.Config.FlowControl + hasHeartbeats = info.Config.Heartbeat > 0 } } else { if cinfo.Error.Code == 404 { @@ -1442,10 +1483,44 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if hasHeartbeats { sub.scheduleHeartbeatCheck() } + // For ChanSubscriptions, if we know that there is flow control, we will + // start a go routine that evaluates the number of delivered messages + // and process flow control. + if sub.Type() == ChanSubscription && hasFC { + sub.chanSubcheckForFlowControlResponse() + } return sub, nil } +// This long-lived routine is used per ChanSubscription to check +// on the number of delivered messages and check for flow control response. +func (sub *Subscription) chanSubcheckForFlowControlResponse() { + sub.mu.Lock() + // We don't use defer since if we need to send an RC reply, we need + // to do it outside the sub's lock. So doing explicit unlock... + if sub.closed { + sub.mu.Unlock() + return + } + var fcReply string + var nc *Conn + + jsi := sub.jsi + if jsi.csfct == nil { + jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse) + } else { + fcReply = sub.checkForFlowControlResponse() + nc = sub.conn + // Do the reset here under the lock, it's ok... + jsi.csfct.Reset(chanSubFCCheckInterval) + } + sub.mu.Unlock() + // This call will return an error (which we don't care here) + // if nc is nil or fcReply is empty. + nc.Publish(fcReply, nil) +} + // ErrConsumerSequenceMismatch represents an error from a consumer // that received a Heartbeat including sequence different to the // one expected from the view of the client. @@ -1488,8 +1563,11 @@ func isJSControlMessage(msg *Msg) (bool, int) { // Keeps track of the incoming message's reply subject so that the consumer's // state (deliver sequence, etc..) can be checked against heartbeats. +// We will also bump the incoming data message sequence that is used in FC cases. // Runs under the subscription lock func (sub *Subscription) trackSequences(reply string) { + // For flow control, keep track of incoming message sequence. + sub.jsi.fciseq++ sub.jsi.cmeta = reply } @@ -1626,13 +1704,25 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { }() } +// For jetstream subscriptions, returns the number of delivered messages. +// For ChanSubscription, this value is computed based on the known number +// of messages added to the channel minus the current size of that channel. +// Lock held on entry +func (sub *Subscription) getJSDelivered() uint64 { + if sub.typ == ChanSubscription { + return sub.jsi.fciseq - uint64(len(sub.mch)) + } + return sub.delivered +} + // checkForFlowControlResponse will check to see if we should send a flow control response // based on the subscription current delivered index and the target. // Runs under subscription lock func (sub *Subscription) checkForFlowControlResponse() string { // Caller has verified that there is a sub.jsi and fc jsi := sub.jsi - if jsi.fcd == sub.delivered { + jsi.active = true + if sub.getJSDelivered() >= jsi.fcd { fcr := jsi.fcr jsi.fcr, jsi.fcd = _EMPTY_, 0 return fcr @@ -1642,9 +1732,8 @@ func (sub *Subscription) checkForFlowControlResponse() string { // Record an inbound flow control message. // Runs under subscription lock -func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi := sub.jsi - jsi.fcr, jsi.fcd = reply, dfuture +func (sub *Subscription) scheduleFlowControlResponse(reply string) { + sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq } // Checks for activity from our consumer. @@ -1820,7 +1909,18 @@ func Description(description string) SubOpt { }) } +// Check that the durable name is valid, that is, that it does not contain +// any ".", and if it does return ErrInvalidDurableName, otherwise nil. +func checkDurName(dur string) error { + if strings.Contains(dur, ".") { + return ErrInvalidDurableName + } + return nil +} + // Durable defines the consumer name for JetStream durable subscribers. +// This function will return ErrInvalidDurableName in the name contains +// any dot ".". func Durable(consumer string) SubOpt { return subOptFn(func(opts *subOpts) error { if opts.cfg.Durable != _EMPTY_ { @@ -1829,8 +1929,8 @@ func Durable(consumer string) SubOpt { if opts.consumer != _EMPTY_ && opts.consumer != consumer { return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer) } - if strings.Contains(consumer, ".") { - return ErrInvalidDurableName + if err := checkDurName(consumer); err != nil { + return err } opts.cfg.Durable = consumer @@ -2035,6 +2135,14 @@ func DeliverSubject(subject string) SubOpt { }) } +// HeadersOnly() will instruct the consumer to only deleiver headers and no payloads. +func HeadersOnly() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.HeadersOnly = true + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -2148,6 +2256,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { js := sub.jsi.js pmc := len(sub.mch) > 0 + // All fetch requests have an expiration, in case of no explicit expiration + // then the default timeout of the JetStream context is used. ttl := o.ttl if ttl == 0 { ttl = js.opts.wait @@ -2161,9 +2271,20 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { err error cancel context.CancelFunc ) - if o.ctx == nil { + if ctx == nil { ctx, cancel = context.WithTimeout(context.Background(), ttl) defer cancel() + } else if _, hasDeadline := ctx.Deadline(); !hasDeadline { + // Prevent from passing the background context which will just block + // and cannot be canceled either. + if octx, ok := ctx.(ContextOpt); ok && octx.Context == context.Background() { + return nil, ErrNoDeadlineContext + } + + // If the context did not have a deadline, then create a new child context + // that will use the default timeout from the JS context. + ctx, cancel = context.WithTimeout(ctx, ttl) + defer cancel() } // Check if context not done already before making the request. @@ -2180,6 +2301,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { return nil, err } + // Use the deadline of the context to base the expire times. + deadline, _ := ctx.Deadline() + ttl = time.Until(deadline) checkCtxErr := func(err error) error { if o.ctx == nil && err == context.DeadlineExceeded { return ErrTimeout @@ -2188,9 +2312,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { } var ( - msgs = make([]*Msg, 0, batch) - msg *Msg - start = time.Now() + msgs = make([]*Msg, 0, batch) + msg *Msg ) for pmc && len(msgs) < batch { // Check next msg with booleans that say that this is an internal call @@ -2218,11 +2341,18 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { var nr nextRequest sendReq := func() error { - ttl -= time.Since(start) - if ttl < 0 { - // At this point consider that we have timed-out - return context.DeadlineExceeded + // The current deadline for the context will be used + // to set the expires TTL for a fetch request. + deadline, _ = ctx.Deadline() + ttl = time.Until(deadline) + + // Check if context has already been canceled or expired. + select { + case <-ctx.Done(): + return ctx.Err() + default: } + // Make our request expiration a bit shorter than the current timeout. expires := ttl if ttl >= 20*time.Millisecond { @@ -2343,6 +2473,12 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error { usesCtx := o.ctx != nil usesWait := o.ttl > 0 + + // Only allow either AckWait or Context option to set the timeout. + if usesWait && usesCtx { + return ErrContextAndTimeout + } + sync = sync || usesCtx || usesWait ctx := o.ctx wait := defaultRequestWait diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index f40086b29..95f38b593 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -93,6 +93,10 @@ type StreamConfig struct { Placement *Placement `json:"placement,omitempty"` Mirror *StreamSource `json:"mirror,omitempty"` Sources []*StreamSource `json:"sources,omitempty"` + Sealed bool `json:"sealed,omitempty"` + DenyDelete bool `json:"deny_delete,omitempty"` + DenyPurge bool `json:"deny_purge,omitempty"` + AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` } // Placement is used to guide placement of streams in clustered JetStream. @@ -239,8 +243,8 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*C var ccSubj string if cfg != nil && cfg.Durable != _EMPTY_ { - if strings.Contains(cfg.Durable, ".") { - return nil, ErrInvalidDurableName + if err := checkDurName(cfg.Durable); err != nil { + return nil, err } ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable) } else { @@ -726,7 +730,8 @@ func (js *js) DeleteStream(name string, opts ...JSOpt) error { } type apiMsgGetRequest struct { - Seq uint64 `json:"seq"` + Seq uint64 `json:"seq,omitempty"` + LastFor string `json:"last_by_subj,omitempty"` } // RawStreamMsg is a raw message stored in JetStream. @@ -751,11 +756,20 @@ type storedMsg struct { type apiMsgGetResponse struct { apiResponse Message *storedMsg `json:"message,omitempty"` - Success bool `json:"success,omitempty"` +} + +// GetLastMsg retrieves the last raw stream message stored in JetStream by subject. +func (js *js) GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) { + return js.getMsg(name, &apiMsgGetRequest{LastFor: subject}, opts...) } // GetMsg retrieves a raw stream message stored in JetStream by sequence number. func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) { + return js.getMsg(name, &apiMsgGetRequest{Seq: seq}, opts...) +} + +// Low level getMsg +func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawStreamMsg, error) { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return nil, err @@ -768,7 +782,7 @@ func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, err return nil, ErrStreamNameRequired } - req, err := json.Marshal(&apiMsgGetRequest{Seq: seq}) + req, err := json.Marshal(mreq) if err != nil { return nil, err } @@ -784,13 +798,16 @@ func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, err return nil, err } if resp.Error != nil { - return nil, errors.New(resp.Error.Description) + if resp.Error.Code == 404 && strings.Contains(resp.Error.Description, "message") { + return nil, ErrMsgNotFound + } + return nil, fmt.Errorf("nats: %s", resp.Error.Description) } msg := resp.Message var hdr Header - if msg.Header != nil { + if len(msg.Header) > 0 { hdr, err = decodeHeadersMsg(msg.Header) if err != nil { return nil, err @@ -850,6 +867,16 @@ func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error { return nil } +// purgeRequest is optional request information to the purge API. +type streamPurgeRequest struct { + // Purge up to but not including sequence. + Sequence uint64 `json:"seq,omitempty"` + // Subject to match against messages for the purge command. + Subject string `json:"filter,omitempty"` + // Number of messages to keep. + Keep uint64 `json:"keep,omitempty"` +} + type streamPurgeResponse struct { apiResponse Success bool `json:"success,omitempty"` @@ -857,7 +884,11 @@ type streamPurgeResponse struct { } // PurgeStream purges messages on a Stream. -func (js *js) PurgeStream(name string, opts ...JSOpt) error { +func (js *js) PurgeStream(stream string, opts ...JSOpt) error { + return js.purgeStream(stream, nil) +} + +func (js *js) purgeStream(stream string, req *streamPurgeRequest, opts ...JSOpt) error { o, cancel, err := getJSContextOpts(js.opts, opts...) if err != nil { return err @@ -866,8 +897,15 @@ func (js *js) PurgeStream(name string, opts ...JSOpt) error { defer cancel() } - psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name)) - r, err := js.nc.RequestWithContext(o.ctx, psSubj, nil) + var b []byte + if req != nil { + if b, err = json.Marshal(req); err != nil { + return err + } + } + + psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, stream)) + r, err := js.nc.RequestWithContext(o.ctx, psSubj, b) if err != nil { return err } diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go new file mode 100644 index 000000000..cf5468bce --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -0,0 +1,644 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "context" + "errors" + "fmt" + "regexp" + "strconv" + "strings" + "time" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type KeyValueManager interface { + // KeyValue will lookup and bind to an existing KeyValue store. + KeyValue(bucket string) (KeyValue, error) + // CreateKeyValue will create a KeyValue store with the following configuration. + CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) + // DeleteKeyValue will delete this KeyValue store (JetStream stream). + DeleteKeyValue(bucket string) error +} + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type KeyValue interface { + // Get returns the latest value for the key. + Get(key string) (entry KeyValueEntry, err error) + // Put will place the new value for the key into the store. + Put(key string, value []byte) (revision uint64, err error) + // PutString will place the string for the key into the store. + PutString(key string, value string) (revision uint64, err error) + // Create will add the key/value pair iff it does not exist. + Create(key string, value []byte) (revision uint64, err error) + // Update will update the value iff the latest revision matches. + Update(key string, value []byte, last uint64) (revision uint64, err error) + // Delete will place a delete marker and leave all revisions. + Delete(key string) error + // Purge will place a delete marker and remove all previous revisions. + Purge(key string) error + // Watch for any updates to keys that match the keys argument which could include wildcards. + // Watch will send a nil entry when it has received all initial values. + Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) + // WatchAll will invoke the callback for all updates. + WatchAll(opts ...WatchOpt) (KeyWatcher, error) + // Keys will return all keys. + Keys(opts ...WatchOpt) ([]string, error) + // History will return all historical values for the key. + History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) + // Bucket returns the current bucket name. + Bucket() string + // PurgeDeletes will remove all current delete markers. + PurgeDeletes(opts ...WatchOpt) error +} + +// KeyWatcher is what is returned when doing a watch. +type KeyWatcher interface { + // Updates returns a channel to read any updates to entries. + Updates() <-chan KeyValueEntry + // Stop() will stop this watcher. + Stop() error +} + +type WatchOpt interface { + configureWatcher(opts *watchOpts) error +} + +// For nats.Context() support. +func (ctx ContextOpt) configureWatcher(opts *watchOpts) error { + opts.ctx = ctx + return nil +} + +type watchOpts struct { + ctx context.Context + // Do not send delete markers to the update channel. + ignoreDeletes bool + // Include all history per subject, not just last one. + includeHistory bool +} + +type watchOptFn func(opts *watchOpts) error + +func (opt watchOptFn) configureWatcher(opts *watchOpts) error { + return opt(opts) +} + +// IncludeHistory instructs the key watcher to include historical values as well. +func IncludeHistory() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.includeHistory = true + return nil + }) +} + +// IgnoreDeletes will have the key watcher not pass any deleted keys. +func IgnoreDeletes() WatchOpt { + return watchOptFn(func(opts *watchOpts) error { + opts.ignoreDeletes = true + return nil + }) +} + +// KeyValueConfig is for configuring a KeyValue store. +type KeyValueConfig struct { + Bucket string + Description string + MaxValueSize int32 + History uint8 + TTL time.Duration + MaxBytes int64 + Storage StorageType + Replicas int +} + +// Used to watch all keys. +const ( + KeyValueMaxHistory = 64 + AllKeys = ">" + kvop = "KV-Operation" + kvdel = "DEL" + kvpurge = "PURGE" +) + +type KeyValueOp uint8 + +const ( + KeyValuePut KeyValueOp = iota + KeyValueDelete + KeyValuePurge +) + +func (op KeyValueOp) String() string { + switch op { + case KeyValuePut: + return "KeyValuePutOp" + case KeyValueDelete: + return "KeyValueDeleteOp" + case KeyValuePurge: + return "KeyValuePurgeOp" + default: + return "Unknown Operation" + } +} + +// KeyValueEntry is a retrieved entry for Get or List or Watch. +type KeyValueEntry interface { + // Bucket is the bucket the data was loaded from. + Bucket() string + // Key is the key that was retrieved. + Key() string + // Value is the retrieved value. + Value() []byte + // Revision is a unique sequence for this value. + Revision() uint64 + // Created is the time the data was put in the bucket. + Created() time.Time + // Delta is distance from the latest value. + Delta() uint64 + // Operation returns Put or Delete or Purge. + Operation() KeyValueOp +} + +// Errors +var ( + ErrKeyValueConfigRequired = errors.New("nats: config required") + ErrInvalidBucketName = errors.New("nats: invalid bucket name") + ErrInvalidKey = errors.New("nats: invalid key") + ErrBucketNotFound = errors.New("nats: bucket not found") + ErrBadBucket = errors.New("nats: bucket not valid key-value store") + ErrKeyNotFound = errors.New("nats: key not found") + ErrKeyDeleted = errors.New("nats: key was deleted") + ErrHistoryToLarge = errors.New("nats: history limited to a max of 64") + ErrNoKeysFound = errors.New("nats: no keys found") +) + +const ( + kvBucketNameTmpl = "KV_%s" + kvSubjectsTmpl = "$KV.%s.>" + kvSubjectsPreTmpl = "$KV.%s." + kvNoPending = "0" +) + +// Regex for valid keys and buckets. +var ( + validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`) + validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`) +) + +// KeyValue will lookup and bind to an existing KeyValue store. +func (js *js) KeyValue(bucket string) (KeyValue, error) { + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: key-value requires at least server version 2.6.2") + } + if !validBucketRe.MatchString(bucket) { + return nil, ErrInvalidBucketName + } + stream := fmt.Sprintf(kvBucketNameTmpl, bucket) + si, err := js.StreamInfo(stream) + if err != nil { + if err == ErrStreamNotFound { + err = ErrBucketNotFound + } + return nil, err + } + // Do some quick sanity checks that this is a correctly formed stream for KV. + // Max msgs per subject should be > 0. + if si.Config.MaxMsgsPerSubject < 1 { + return nil, ErrBadBucket + } + + kv := &kvs{ + name: bucket, + stream: stream, + pre: fmt.Sprintf(kvSubjectsPreTmpl, bucket), + js: js, + } + return kv, nil +} + +// CreateKeyValue will create a KeyValue store with the following configuration. +func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: key-value requires at least server version 2.6.2") + } + if cfg == nil { + return nil, ErrKeyValueConfigRequired + } + if !validBucketRe.MatchString(cfg.Bucket) { + return nil, ErrInvalidBucketName + } + if _, err := js.AccountInfo(); err != nil { + return nil, err + } + + // Default to 1 for history. Max is 64 for now. + history := int64(1) + if cfg.History > 0 { + if cfg.History > KeyValueMaxHistory { + return nil, ErrHistoryToLarge + } + history = int64(cfg.History) + } + + replicas := cfg.Replicas + if replicas == 0 { + replicas = 1 + } + + scfg := &StreamConfig{ + Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), + Description: cfg.Description, + Subjects: []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}, + MaxMsgsPerSubject: history, + MaxBytes: cfg.MaxBytes, + MaxAge: cfg.TTL, + MaxMsgSize: cfg.MaxValueSize, + Storage: cfg.Storage, + Replicas: replicas, + AllowRollup: true, + DenyDelete: true, + } + + if _, err := js.AddStream(scfg); err != nil { + return nil, err + } + + kv := &kvs{ + name: cfg.Bucket, + stream: scfg.Name, + pre: fmt.Sprintf(kvSubjectsPreTmpl, cfg.Bucket), + js: js, + } + return kv, nil +} + +// DeleteKeyValue will delete this KeyValue store (JetStream stream). +func (js *js) DeleteKeyValue(bucket string) error { + if !validBucketRe.MatchString(bucket) { + return ErrInvalidBucketName + } + stream := fmt.Sprintf(kvBucketNameTmpl, bucket) + return js.DeleteStream(stream) +} + +type kvs struct { + name string + stream string + pre string + js *js +} + +// Underlying entry. +type kve struct { + bucket string + key string + value []byte + revision uint64 + delta uint64 + created time.Time + op KeyValueOp +} + +func (e *kve) Bucket() string { return e.bucket } +func (e *kve) Key() string { return e.key } +func (e *kve) Value() []byte { return e.value } +func (e *kve) Revision() uint64 { return e.revision } +func (e *kve) Created() time.Time { return e.created } +func (e *kve) Delta() uint64 { return e.delta } +func (e *kve) Operation() KeyValueOp { return e.op } + +func keyValid(key string) bool { + if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' { + return false + } + return validKeyRe.MatchString(key) +} + +// Get returns the latest value for the key. +func (kv *kvs) Get(key string) (KeyValueEntry, error) { + if !keyValid(key) { + return nil, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + m, err := kv.js.GetLastMsg(kv.stream, b.String()) + if err != nil { + if err == ErrMsgNotFound { + err = ErrKeyNotFound + } + return nil, err + } + + entry := &kve{ + bucket: kv.name, + key: key, + value: m.Data, + revision: m.Sequence, + created: m.Time, + } + + // Double check here that this is not a DEL Operation marker. + if len(m.Header) > 0 { + switch m.Header.Get(kvop) { + case kvdel: + entry.op = KeyValueDelete + return entry, ErrKeyDeleted + case kvpurge: + entry.op = KeyValuePurge + return entry, ErrKeyDeleted + } + } + + return entry, nil +} + +// Put will place the new value for the key into the store. +func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + pa, err := kv.js.Publish(b.String(), value) + if err != nil { + return 0, err + } + return pa.Sequence, err +} + +// PutString will place the string for the key into the store. +func (kv *kvs) PutString(key string, value string) (revision uint64, err error) { + return kv.Put(key, []byte(value)) +} + +// Create will add the key/value pair iff it does not exist. +func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { + v, err := kv.Update(key, value, 0) + if err == nil { + return v, nil + } + // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that + // so we need to double check. + if e, err := kv.Get(key); err == ErrKeyDeleted { + return kv.Update(key, value, e.Revision()) + } + return 0, err +} + +// Update will update the value iff the latest revision matches. +func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) { + if !keyValid(key) { + return 0, ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + m := Msg{Subject: b.String(), Header: Header{}, Data: value} + m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10)) + + pa, err := kv.js.PublishMsg(&m) + if err != nil { + return 0, err + } + return pa.Sequence, err +} + +// Delete will place a delete marker and leave all revisions. +func (kv *kvs) Delete(key string) error { + return kv.delete(key, false) +} + +// Purge will remove the key and all revisions. +func (kv *kvs) Purge(key string) error { + return kv.delete(key, true) +} + +func (kv *kvs) delete(key string, purge bool) error { + if !keyValid(key) { + return ErrInvalidKey + } + + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(key) + + // DEL op marker. For watch functionality. + m := NewMsg(b.String()) + + if purge { + m.Header.Set(kvop, kvpurge) + m.Header.Set(MsgRollup, MsgRollupSubject) + } else { + m.Header.Set(kvop, kvdel) + } + _, err := kv.js.PublishMsg(m) + return err +} + +// PurgeDeletes will remove all current delete markers. +// This is a maintenance option if there is a larger buildup of delete markers. +func (kv *kvs) PurgeDeletes(opts ...WatchOpt) error { + watcher, err := kv.WatchAll(opts...) + if err != nil { + return err + } + defer watcher.Stop() + + for entry := range watcher.Updates() { + if entry == nil { + break + } + if op := entry.Operation(); op == KeyValueDelete || op == KeyValuePurge { + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(entry.Key()) + err := kv.js.purgeStream(kv.stream, &streamPurgeRequest{Subject: b.String()}) + if err != nil { + return err + } + } + } + return nil +} + +// Keys() will return all keys. +func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) { + opts = append(opts, IgnoreDeletes()) + watcher, err := kv.WatchAll(opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var keys []string + for entry := range watcher.Updates() { + if entry == nil { + break + } + keys = append(keys, entry.Key()) + } + if len(keys) == 0 { + return nil, ErrNoKeysFound + } + return keys, nil +} + +// History will return all values for the key. +func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { + opts = append(opts, IncludeHistory()) + watcher, err := kv.Watch(key, opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var entries []KeyValueEntry + for entry := range watcher.Updates() { + if entry == nil { + break + } + entries = append(entries, entry) + } + if len(entries) == 0 { + return nil, ErrKeyNotFound + } + return entries, nil +} + +// Implementation for Watch +type watcher struct { + updates chan KeyValueEntry + sub *Subscription +} + +// Updates returns the interior channel. +func (w *watcher) Updates() <-chan KeyValueEntry { + if w == nil { + return nil + } + return w.updates +} + +// Stop will unsubscribe from the watcher. +func (w *watcher) Stop() error { + if w == nil { + return nil + } + return w.sub.Unsubscribe() +} + +// WatchAll watches all keys. +func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) { + return kv.Watch(AllKeys, opts...) +} + +// Watch will fire the callback when a key that matches the keys pattern is updated. +// keys needs to be a valid NATS subject. +func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) { + var o watchOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureWatcher(&o); err != nil { + return nil, err + } + } + } + + var initDoneMarker bool + + // Could be a pattern so don't check for validity as we normally do. + var b strings.Builder + b.WriteString(kv.pre) + b.WriteString(keys) + keys = b.String() + + w := &watcher{updates: make(chan KeyValueEntry, 32)} + + update := func(m *Msg) { + tokens, err := getMetadataFields(m.Reply) + if err != nil { + return + } + if len(m.Subject) <= len(kv.pre) { + return + } + subj := m.Subject[len(kv.pre):] + + var op KeyValueOp + if len(m.Header) > 0 { + switch m.Header.Get(kvop) { + case kvdel: + op = KeyValueDelete + case kvpurge: + op = KeyValuePurge + } + } + delta := uint64(parseNum(tokens[ackNumPendingTokenPos])) + entry := &kve{ + bucket: kv.name, + key: subj, + value: m.Data, + revision: uint64(parseNum(tokens[ackStreamSeqTokenPos])), + created: time.Unix(0, parseNum(tokens[ackTimestampSeqTokenPos])), + delta: delta, + op: op, + } + if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) { + w.updates <- entry + } + // Check if done initial values. + if !initDoneMarker && delta == 0 { + initDoneMarker = true + w.updates <- nil + } + } + + // Check if we have anything pending. + _, err := kv.js.GetLastMsg(kv.stream, keys) + if err == ErrMsgNotFound { + initDoneMarker = true + w.updates <- nil + } + + // Used ordered consumer to deliver results. + subOpts := []SubOpt{OrderedConsumer()} + if !o.includeHistory { + subOpts = append(subOpts, DeliverLastPerSubject()) + } + sub, err := kv.js.Subscribe(keys, update, subOpts...) + if err != nil { + return nil, err + } + w.sub = sub + return w, nil +} + +// Bucket returns the current bucket name (JetStream stream). +func (kv *kvs) Bucket() string { + return kv.name +} diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index ed8506eb2..129c120cb 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -32,6 +32,7 @@ import ( "net/url" "os" "path/filepath" + "regexp" "runtime" "strconv" "strings" @@ -46,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.12.3" + Version = "1.13.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -155,6 +156,7 @@ var ( ErrPullSubscribeToPushConsumer = errors.New("nats: cannot pull subscribe to push based consumer") ErrPullSubscribeRequired = errors.New("nats: must use pull subscribe to bind to pull based consumer") ErrConsumerNotActive = errors.New("nats: consumer not active") + ErrMsgNotFound = errors.New("nats: message not found") ) func init() { @@ -677,6 +679,7 @@ type serverInfo struct { ID string `json:"server_id"` Name string `json:"server_name"` Proto int `json:"proto"` + Version string `json:"version"` Host string `json:"host"` Port int `json:"port"` Headers bool `json:"headers"` @@ -1834,6 +1837,52 @@ func (nc *Conn) ConnectedServerName() string { return nc.info.Name } +var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) + +func versionComponents(version string) (major, minor, patch int, err error) { + m := semVerRe.FindStringSubmatch(version) + if m == nil { + return 0, 0, 0, errors.New("invalid semver") + } + major, err = strconv.Atoi(m[1]) + if err != nil { + return -1, -1, -1, err + } + minor, err = strconv.Atoi(m[2]) + if err != nil { + return -1, -1, -1, err + } + patch, err = strconv.Atoi(m[3]) + if err != nil { + return -1, -1, -1, err + } + return major, minor, patch, err +} + +// Check for mininum server requirement. +func (nc *Conn) serverMinVersion(major, minor, patch int) bool { + smajor, sminor, spatch, _ := versionComponents(nc.ConnectedServerVersion()) + if smajor < major || (smajor == major && sminor < minor) || (smajor == major && sminor == minor && spatch < patch) { + return false + } + return true +} + +// ConnectedServerVersion reports the connected server's version as a string +func (nc *Conn) ConnectedServerVersion() string { + if nc == nil { + return _EMPTY_ + } + + nc.mu.RLock() + defer nc.mu.RUnlock() + + if nc.status != CONNECTED { + return _EMPTY_ + } + return nc.info.Version +} + // ConnectedClusterName reports the connected server's cluster name if any func (nc *Conn) ConnectedClusterName() string { if nc == nil { @@ -2600,7 +2649,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) { delivered = s.delivered if s.jsi != nil { fcReply = s.checkForFlowControlResponse() - s.jsi.active = true } } s.mu.Unlock() @@ -2768,6 +2816,7 @@ func (nc *Conn) processMsg(data []byte) { // Skip processing if this is a control message. if !ctrlMsg { + var chanSubCheckFC bool // Subscription internal stats (applicable only for non ChanSubscription's) if sub.typ != ChanSubscription { sub.pMsgs++ @@ -2784,6 +2833,8 @@ func (nc *Conn) processMsg(data []byte) { (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) { goto slowConsumer } + } else if jsi != nil { + chanSubCheckFC = true } // We have two modes of delivery. One is the channel, used by channel @@ -2811,15 +2862,26 @@ func (nc *Conn) processMsg(data []byte) { // Store the ACK metadata from the message to // compare later on with the received heartbeat. sub.trackSequences(m.Reply) + if chanSubCheckFC { + // For ChanSubscription, since we can't call this when a message + // is "delivered" (since user is pull from their own channel), + // we have a go routine that does this check, however, we do it + // also here to make it much more responsive. The go routine is + // really to avoid stalling when there is no new messages coming. + fcReply = sub.checkForFlowControlResponse() + } } } else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ { // This is a flow control message. - // If we have no pending, go ahead and send in place. - if sub.pMsgs <= 0 { + // We will schedule the send of the FC reply once we have delivered the + // DATA message that was received before this flow control message, which + // has sequence `jsi.fciseq`. However, it is possible that this message + // has already been delivered, in that case, we need to send the FC reply now. + if sub.getJSDelivered() >= jsi.fciseq { fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. - sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(m.Reply) } } @@ -2968,7 +3030,7 @@ func (nc *Conn) processInfo(info string) error { if info == _EMPTY_ { return nil } - ncInfo := serverInfo{} + var ncInfo serverInfo if err := json.Unmarshal([]byte(info), &ncInfo); err != nil { return err } @@ -3851,9 +3913,15 @@ func (nc *Conn) removeSub(s *Subscription) { s.mch = nil // If JS subscription then stop HB timer. - if jsi := s.jsi; jsi != nil && jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil + if jsi := s.jsi; jsi != nil { + if jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + if jsi.csfct != nil { + jsi.csfct.Stop() + jsi.csfct = nil + } } // Mark as invalid @@ -4192,7 +4260,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { delivered := s.delivered if s.jsi != nil { fcReply = s.checkForFlowControlResponse() - s.jsi.active = true } if s.typ == SyncSubscription { diff --git a/vendor/github.com/nats-io/nats.go/object.go b/vendor/github.com/nats-io/nats.go/object.go new file mode 100644 index 000000000..13dd7b280 --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/object.go @@ -0,0 +1,928 @@ +// Copyright 2021 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/nats-io/nuid" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type ObjectStoreManager interface { + // ObjectStore will lookup and bind to an existing object store instance. + ObjectStore(bucket string) (ObjectStore, error) + // CreateObjectStore will create an object store. + CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) + // DeleteObjectStore will delete the underlying stream for the named object. + DeleteObjectStore(bucket string) error +} + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. +type ObjectStore interface { + // Put will place the contents from the reader into a new object. + Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) + // Get will pull the named object from the object store. + Get(name string, opts ...ObjectOpt) (ObjectResult, error) + + // PutBytes is convenience function to put a byte slice into this object store. + PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) + // GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. + GetBytes(name string, opts ...ObjectOpt) ([]byte, error) + + // PutBytes is convenience function to put a string into this object store. + PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) + // GetString is a convenience function to pull an object from this object store and return it as a string. + GetString(name string, opts ...ObjectOpt) (string, error) + + // PutFile is convenience function to put a file into this object store. + PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) + // GetFile is a convenience function to pull an object from this object store and place it in a file. + GetFile(name, file string, opts ...ObjectOpt) error + + // GetInfo will retrieve the current information for the object. + GetInfo(name string) (*ObjectInfo, error) + // UpdateMeta will update the meta data for the object. + UpdateMeta(name string, meta *ObjectMeta) error + + // Delete will delete the named object. + Delete(name string) error + + // AddLink will add a link to another object into this object store. + AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) + + // AddBucketLink will add a link to another object store. + AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) + + // Seal will seal the object store, no further modifications will be allowed. + Seal() error + + // Watch for changes in the underlying store and receive meta information updates. + Watch(opts ...WatchOpt) (ObjectWatcher, error) + + // List will list all the objects in this store. + List(opts ...WatchOpt) ([]*ObjectInfo, error) +} + +type ObjectOpt interface { + configureObject(opts *objOpts) error +} + +type objOpts struct { + ctx context.Context +} + +// For nats.Context() support. +func (ctx ContextOpt) configureObject(opts *objOpts) error { + opts.ctx = ctx + return nil +} + +// ObjectWatcher is what is returned when doing a watch. +type ObjectWatcher interface { + // Updates returns a channel to read any updates to entries. + Updates() <-chan *ObjectInfo + // Stop() will stop this watcher. + Stop() error +} + +var ( + ErrObjectConfigRequired = errors.New("nats: object-store config required") + ErrBadObjectMeta = errors.New("nats: object-store meta information invalid") + ErrObjectNotFound = errors.New("nats: object not found") + ErrInvalidStoreName = errors.New("nats: invalid object-store name") + ErrInvalidObjectName = errors.New("nats: invalid object name") + ErrDigestMismatch = errors.New("nats: received a corrupt object, digests do not match") + ErrNoObjectsFound = errors.New("nats: no objects found") +) + +// ObjectStoreConfig is the config for the object store. +type ObjectStoreConfig struct { + Bucket string + Description string + TTL time.Duration + Storage StorageType + Replicas int +} + +// ObjectMetaOptions +type ObjectMetaOptions struct { + Link *ObjectLink `json:"link,omitempty"` + ChunkSize uint32 `json:"max_chunk_size,omitempty"` +} + +// ObjectMeta is high level information about an object. +type ObjectMeta struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Headers Header `json:"headers,omitempty"` + + // Optional options. + Opts *ObjectMetaOptions `json:"options,omitempty"` +} + +// ObjectInfo is meta plus instance information. +type ObjectInfo struct { + ObjectMeta + Bucket string `json:"bucket"` + NUID string `json:"nuid"` + Size uint64 `json:"size"` + ModTime time.Time `json:"mtime"` + Chunks uint32 `json:"chunks"` + Digest string `json:"digest,omitempty"` + Deleted bool `json:"deleted,omitempty"` +} + +// ObjectLink is used to embed links to other buckets and objects. +type ObjectLink struct { + // Bucket is the name of the other object store. + Bucket string `json:"bucket"` + // Name can be used to link to a single object. + // If empty means this is a link to the whole store, like a directory. + Name string `json:"name,omitempty"` +} + +// ObjectResult will return the underlying stream info and also be an io.ReadCloser. +type ObjectResult interface { + io.ReadCloser + Info() (*ObjectInfo, error) + Error() error +} + +const ( + objNameTmpl = "OBJ_%s" + objSubjectsPre = "$O." + objAllChunksPreTmpl = "$O.%s.C.>" + objAllMetaPreTmpl = "$O.%s.M.>" + objChunksPreTmpl = "$O.%s.C.%s" + objMetaPreTmpl = "$O.%s.M.%s" + objNoPending = "0" + objDefaultChunkSize = uint32(128 * 1024) // 128k + objDigestType = "sha-256=" + objDigestTmpl = objDigestType + "%s" +) + +type obs struct { + name string + stream string + js *js +} + +// CreateObjectStore will create an object store. +func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) { + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: object-store requires at least server version 2.6.2") + } + if cfg == nil { + return nil, ErrObjectConfigRequired + } + if !validBucketRe.MatchString(cfg.Bucket) { + return nil, ErrInvalidStoreName + } + + name := cfg.Bucket + chunks := fmt.Sprintf(objAllChunksPreTmpl, name) + meta := fmt.Sprintf(objAllMetaPreTmpl, name) + + scfg := &StreamConfig{ + Name: fmt.Sprintf(objNameTmpl, name), + Description: cfg.Description, + Subjects: []string{chunks, meta}, + MaxAge: cfg.TTL, + Storage: cfg.Storage, + Replicas: cfg.Replicas, + Discard: DiscardNew, + AllowRollup: true, + } + + // Create our stream. + _, err := js.AddStream(scfg) + if err != nil { + return nil, err + } + + return &obs{name: name, stream: scfg.Name, js: js}, nil +} + +// ObjectStore will lookup and bind to an existing object store instance. +func (js *js) ObjectStore(bucket string) (ObjectStore, error) { + if !validBucketRe.MatchString(bucket) { + return nil, ErrInvalidStoreName + } + if !js.nc.serverMinVersion(2, 6, 2) { + return nil, errors.New("nats: key-value requires at least server version 2.6.2") + } + + stream := fmt.Sprintf(objNameTmpl, bucket) + si, err := js.StreamInfo(stream) + if err != nil { + return nil, err + } + return &obs{name: bucket, stream: si.Config.Name, js: js}, nil +} + +// DeleteObjectStore will delete the underlying stream for the named object. +func (js *js) DeleteObjectStore(bucket string) error { + stream := fmt.Sprintf(objNameTmpl, bucket) + return js.DeleteStream(stream) +} + +func sanitizeName(name string) string { + stream := strings.ReplaceAll(name, ".", "_") + return strings.ReplaceAll(stream, " ", "_") +} + +// Put will place the contents from the reader into this object-store. +func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) { + if meta == nil { + return nil, ErrBadObjectMeta + } + + obj := sanitizeName(meta.Name) + if !keyValid(obj) { + return nil, ErrInvalidObjectName + } + + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + + // Grab existing meta info. + einfo, err := obs.GetInfo(meta.Name) + if err != nil && err != ErrObjectNotFound { + return nil, err + } + + // Create a random subject prefixed with the object stream name. + id := nuid.Next() + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, id) + metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, obj) + + // For async error handling + var perr error + var mu sync.Mutex + setErr := func(err error) { + mu.Lock() + defer mu.Unlock() + perr = err + } + getErr := func() error { + mu.Lock() + defer mu.Unlock() + return perr + } + + purgePartial := func() { obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) } + + // Create our own JS context to handle errors etc. + js, err := obs.js.nc.JetStream(PublishAsyncErrHandler(func(js JetStream, _ *Msg, err error) { setErr(err) })) + if err != nil { + return nil, err + } + + chunkSize := objDefaultChunkSize + if meta.Opts != nil && meta.Opts.ChunkSize > 0 { + chunkSize = meta.Opts.ChunkSize + } + + m, h := NewMsg(chunkSubj), sha256.New() + chunk, sent, total := make([]byte, chunkSize), 0, uint64(0) + info := &ObjectInfo{Bucket: obs.name, NUID: id, ObjectMeta: *meta} + + for r != nil { + if ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + purgePartial() + return nil, err + } + } + + // Actual read. + // TODO(dlc) - Deadline? + n, err := r.Read(chunk) + + // EOF Processing. + if err == io.EOF { + // Finalize sha. + sha := h.Sum(nil) + // Place meta info. + info.Size, info.Chunks = uint64(total), uint32(sent) + info.Digest = fmt.Sprintf(objDigestTmpl, base64.URLEncoding.EncodeToString(sha[:])) + break + } else if err != nil { + purgePartial() + return nil, err + } + + // Chunk processing. + m.Data = chunk[:n] + h.Write(m.Data) + + // Send msg itself. + if _, err := js.PublishMsgAsync(m); err != nil { + purgePartial() + return nil, err + } + if err := getErr(); err != nil { + purgePartial() + return nil, err + } + // Update totals. + sent++ + total += uint64(n) + } + + // Publish the metadata. + mm := NewMsg(metaSubj) + mm.Header.Set(MsgRollup, MsgRollupSubject) + mm.Data, err = json.Marshal(info) + if err != nil { + if r != nil { + purgePartial() + } + return nil, err + } + // Send meta message. + _, err = js.PublishMsgAsync(mm) + if err != nil { + if r != nil { + purgePartial() + } + return nil, err + } + + // Wait for all to be processed. + select { + case <-js.PublishAsyncComplete(): + if err := getErr(); err != nil { + purgePartial() + return nil, err + } + case <-time.After(obs.js.opts.wait): + return nil, ErrTimeout + } + info.ModTime = time.Now().UTC() + + // Delete any original one. + if einfo != nil && !einfo.Deleted { + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID) + obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) + } + + return info, nil +} + +// ObjectResult impl. +type objResult struct { + sync.Mutex + info *ObjectInfo + r io.ReadCloser + err error + ctx context.Context +} + +func (info *ObjectInfo) isLink() bool { + return info.ObjectMeta.Opts != nil && info.ObjectMeta.Opts.Link != nil +} + +// GetObject will pull the object from the underlying stream. +func (obs *obs) Get(name string, opts ...ObjectOpt) (ObjectResult, error) { + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return nil, err + } + if info.NUID == _EMPTY_ { + return nil, ErrBadObjectMeta + } + + // Check for object links.If single objects we do a pass through. + if info.isLink() { + if info.ObjectMeta.Opts.Link.Name == _EMPTY_ { + return nil, errors.New("nats: link is a bucket") + } + lobs, err := obs.js.ObjectStore(info.ObjectMeta.Opts.Link.Bucket) + if err != nil { + return nil, err + } + return lobs.Get(info.ObjectMeta.Opts.Link.Name) + } + + var o objOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureObject(&o); err != nil { + return nil, err + } + } + } + ctx := o.ctx + + result := &objResult{info: info, ctx: ctx} + if info.Size == 0 { + return result, nil + } + + pr, pw := net.Pipe() + result.r = pr + + gotErr := func(m *Msg, err error) { + pw.Close() + m.Sub.Unsubscribe() + result.setErr(err) + } + + // For calculating sum256 + h := sha256.New() + + processChunk := func(m *Msg) { + if ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + err = ctx.Err() + } else { + err = ErrTimeout + } + default: + } + if err != nil { + gotErr(m, err) + return + } + } + + tokens, err := getMetadataFields(m.Reply) + if err != nil { + gotErr(m, err) + return + } + + // Write to our pipe. + for b := m.Data; len(b) > 0; { + n, err := pw.Write(b) + if err != nil { + gotErr(m, err) + return + } + b = b[n:] + } + // Update sha256 + h.Write(m.Data) + + // Check if we are done. + if tokens[ackNumPendingTokenPos] == objNoPending { + pw.Close() + m.Sub.Unsubscribe() + + // Make sure the digest matches. + sha := h.Sum(nil) + rsha, err := base64.URLEncoding.DecodeString(info.Digest) + if err != nil { + gotErr(m, err) + return + } + if !bytes.Equal(sha[:], rsha) { + gotErr(m, ErrDigestMismatch) + return + } + } + } + + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer()) + if err != nil { + return nil, err + } + + return result, nil +} + +// Delete will delete the object. +func (obs *obs) Delete(name string) error { + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return err + } + if info.NUID == _EMPTY_ { + return ErrBadObjectMeta + } + + // Place a rollup delete marker. + info.Deleted = true + info.Size, info.Chunks, info.Digest = 0, 0, _EMPTY_ + + metaSubj := fmt.Sprintf(objMetaPreTmpl, obs.name, sanitizeName(name)) + mm := NewMsg(metaSubj) + mm.Data, err = json.Marshal(info) + if err != nil { + return err + } + mm.Header.Set(MsgRollup, MsgRollupSubject) + _, err = obs.js.PublishMsg(mm) + if err != nil { + return err + } + + // Purge chunks for the object. + chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) + return obs.js.purgeStream(obs.stream, &streamPurgeRequest{Subject: chunkSubj}) +} + +// AddLink will add a link to another object into this object store. +func (obs *obs) AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) { + if obj == nil { + return nil, errors.New("nats: object required") + } + if obj.Deleted { + return nil, errors.New("nats: object is deleted") + } + name = sanitizeName(name) + if !keyValid(name) { + return nil, ErrInvalidObjectName + } + + // Same object store. + if obj.Bucket == obs.name { + info := *obj + info.Name = name + if err := obs.UpdateMeta(obj.Name, &info.ObjectMeta); err != nil { + return nil, err + } + return obs.GetInfo(name) + } + + link := &ObjectLink{Bucket: obj.Bucket, Name: obj.Name} + meta := &ObjectMeta{ + Name: name, + Opts: &ObjectMetaOptions{Link: link}, + } + return obs.Put(meta, nil) +} + +// AddBucketLink will add a link to another object store. +func (ob *obs) AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) { + if bucket == nil { + return nil, errors.New("nats: bucket required") + } + name = sanitizeName(name) + if !keyValid(name) { + return nil, ErrInvalidObjectName + } + + bos, ok := bucket.(*obs) + if !ok { + return nil, errors.New("nats: bucket malformed") + } + meta := &ObjectMeta{ + Name: name, + Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: bos.name}}, + } + return ob.Put(meta, nil) +} + +// PutBytes is convenience function to put a byte slice into this object store. +func (obs *obs) PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, bytes.NewReader(data), opts...) +} + +// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. +func (obs *obs) GetBytes(name string, opts ...ObjectOpt) ([]byte, error) { + result, err := obs.Get(name, opts...) + if err != nil { + return nil, err + } + defer result.Close() + + var b bytes.Buffer + if _, err := b.ReadFrom(result); err != nil { + return nil, err + } + return b.Bytes(), nil +} + +// PutBytes is convenience function to put a string into this object store. +func (obs *obs) PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) { + return obs.Put(&ObjectMeta{Name: name}, strings.NewReader(data), opts...) +} + +// GetString is a convenience function to pull an object from this object store and return it as a string. +func (obs *obs) GetString(name string, opts ...ObjectOpt) (string, error) { + result, err := obs.Get(name, opts...) + if err != nil { + return _EMPTY_, err + } + defer result.Close() + + var b bytes.Buffer + if _, err := b.ReadFrom(result); err != nil { + return _EMPTY_, err + } + return b.String(), nil +} + +// PutFile is convenience function to put a file into an object store. +func (obs *obs) PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) { + f, err := os.Open(file) + if err != nil { + return nil, err + } + defer f.Close() + return obs.Put(&ObjectMeta{Name: file}, f, opts...) +} + +// GetFile is a convenience function to pull and object and place in a file. +func (obs *obs) GetFile(name, file string, opts ...ObjectOpt) error { + // Expect file to be new. + f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return err + } + defer f.Close() + + result, err := obs.Get(name, opts...) + if err != nil { + os.Remove(f.Name()) + return err + } + defer result.Close() + + // Stream copy to the file. + _, err = io.Copy(f, result) + return err +} + +// GetInfo will retrieve the current information for the object. +func (obs *obs) GetInfo(name string) (*ObjectInfo, error) { + // Lookup the stream to get the bound subject. + obj := sanitizeName(name) + if !keyValid(obj) { + return nil, ErrInvalidObjectName + } + + // Grab last meta value we have. + meta := fmt.Sprintf(objMetaPreTmpl, obs.name, obj) + stream := fmt.Sprintf(objNameTmpl, obs.name) + + m, err := obs.js.GetLastMsg(stream, meta) + if err != nil { + if err == ErrMsgNotFound { + err = ErrObjectNotFound + } + return nil, err + } + var info ObjectInfo + if err := json.Unmarshal(m.Data, &info); err != nil { + return nil, ErrBadObjectMeta + } + info.ModTime = m.Time + return &info, nil +} + +// UpdateMeta will update the meta data for the object. +func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error { + if meta == nil { + return ErrBadObjectMeta + } + // Grab meta info. + info, err := obs.GetInfo(name) + if err != nil { + return err + } + // Copy new meta + info.ObjectMeta = *meta + mm := NewMsg(fmt.Sprintf(objMetaPreTmpl, obs.name, sanitizeName(meta.Name))) + mm.Data, err = json.Marshal(info) + if err != nil { + return err + } + _, err = obs.js.PublishMsg(mm) + return err +} + +// Seal will seal the object store, no further modifications will be allowed. +func (obs *obs) Seal() error { + stream := fmt.Sprintf(objNameTmpl, obs.name) + si, err := obs.js.StreamInfo(stream) + if err != nil { + return err + } + // Seal the stream from being able to take on more messages. + cfg := si.Config + cfg.Sealed = true + _, err = obs.js.UpdateStream(&cfg) + return err +} + +// Implementation for Watch +type objWatcher struct { + updates chan *ObjectInfo + sub *Subscription +} + +// Updates returns the interior channel. +func (w *objWatcher) Updates() <-chan *ObjectInfo { + if w == nil { + return nil + } + return w.updates +} + +// Stop will unsubscribe from the watcher. +func (w *objWatcher) Stop() error { + if w == nil { + return nil + } + return w.sub.Unsubscribe() +} + +// Watch for changes in the underlying store and receive meta information updates. +func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) { + var o watchOpts + for _, opt := range opts { + if opt != nil { + if err := opt.configureWatcher(&o); err != nil { + return nil, err + } + } + } + + var initDoneMarker bool + + w := &objWatcher{updates: make(chan *ObjectInfo, 32)} + + update := func(m *Msg) { + var info ObjectInfo + if err := json.Unmarshal(m.Data, &info); err != nil { + return // TODO(dlc) - Communicate this upwards? + } + meta, err := m.Metadata() + if err != nil { + return + } + + if !o.ignoreDeletes || !info.Deleted { + info.ModTime = meta.Timestamp + w.updates <- &info + } + + if !initDoneMarker && meta.NumPending == 0 { + initDoneMarker = true + w.updates <- nil + } + } + + allMeta := fmt.Sprintf(objAllMetaPreTmpl, obs.name) + _, err := obs.js.GetLastMsg(obs.stream, allMeta) + if err == ErrMsgNotFound { + initDoneMarker = true + w.updates <- nil + } + + // Used ordered consumer to deliver results. + subOpts := []SubOpt{OrderedConsumer()} + if !o.includeHistory { + subOpts = append(subOpts, DeliverLastPerSubject()) + } + sub, err := obs.js.Subscribe(allMeta, update, subOpts...) + if err != nil { + return nil, err + } + w.sub = sub + return w, nil +} + +// List will list all the objects in this store. +func (obs *obs) List(opts ...WatchOpt) ([]*ObjectInfo, error) { + opts = append(opts, IgnoreDeletes()) + watcher, err := obs.Watch(opts...) + if err != nil { + return nil, err + } + defer watcher.Stop() + + var objs []*ObjectInfo + for entry := range watcher.Updates() { + if entry == nil { + break + } + objs = append(objs, entry) + } + if len(objs) == 0 { + return nil, ErrNoObjectsFound + } + return objs, nil +} + +// Read impl. +func (o *objResult) Read(p []byte) (n int, err error) { + o.Lock() + defer o.Unlock() + if ctx := o.ctx; ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + o.err = ctx.Err() + } else { + o.err = ErrTimeout + } + default: + } + } + if o.err != nil { + return 0, err + } + if o.r == nil { + return 0, io.EOF + } + + r := o.r.(net.Conn) + r.SetReadDeadline(time.Now().Add(200 * time.Millisecond)) + n, err = r.Read(p) + if err, ok := err.(net.Error); ok && err.Timeout() { + if ctx := o.ctx; ctx != nil { + select { + case <-ctx.Done(): + if ctx.Err() == context.Canceled { + return 0, ctx.Err() + } else { + return 0, ErrTimeout + } + default: + err = nil + } + } + } + return n, err +} + +// Close impl. +func (o *objResult) Close() error { + o.Lock() + defer o.Unlock() + if o.r == nil { + return nil + } + return o.r.Close() +} + +func (o *objResult) setErr(err error) { + o.Lock() + defer o.Unlock() + o.err = err +} + +func (o *objResult) Info() (*ObjectInfo, error) { + o.Lock() + defer o.Unlock() + return o.info, o.err +} + +func (o *objResult) Error() error { + o.Lock() + defer o.Unlock() + return o.err +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 535837fc3..cc0cf062a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -236,7 +236,7 @@ github.com/modern-go/reflect2 github.com/monochromegane/go-gitignore # github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/munnerz/goautoneg -# github.com/nats-io/nats.go v1.12.3 +# github.com/nats-io/nats.go v1.13.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/util @@ -343,7 +343,7 @@ github.com/yudai/gojsondiff github.com/yudai/gojsondiff/formatter # github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 github.com/yudai/golcs -# go.bytebuilders.dev/audit v0.0.9 +# go.bytebuilders.dev/audit v0.0.10 ## explicit go.bytebuilders.dev/audit/api/v1 go.bytebuilders.dev/audit/lib