Make golang channel be able to be subscribed with multiple subscribers.
It uses generic so Go1.18+ needed.
Go channel applys only one subscriber at once. chdist
implements pub-sub model of go channel, so it can be used like a small message queue system. Also, chdist
provides sync and async method both. This system can be used just like handling a normal channel.
Distributor
is a publisher. The constructor of Distributor
receives generic type and a channel (whose type is given generic type) to generate an instance, and it can be subscribed as sync or async.
channel := make(chan int)
distributor := chdist.NewDistributor[int](channel)
You can publish using the channel and the distributor both.
channel <- 5
// or
distributor.In() <- 5
Multiple subscribing is possible.
// sync
distributor.Subscribe(func(value int) {
fmt.Println(value)
})
// async
for value := range distributor.AsyncSubscribe.Out() {
fmt.Println(value)
}
And it can be closed by using Close()
.
distributor.Close()
When a distributor is subscribed, it returns a Subscription
instance. This can be closed, health-checked, or something else with its methods.
subscription := distributor.Subscribe(func(value int) { /* logics */ })
if subscription.IsAlive() {
subscription.Close()
}
AsyncSubscription
can be used like a common channel.
asyncSubscription := distributor.AsyncSubscribe()
fmt.Println(<-asyncSubscription.Out())
JsonDistributor
can handle JSON data. This receives a type of the JSON data will be after unmarshaled.
It applys string
and []byte
type of JSON data, each of them can be generated by NewJsonStringDistributor
and NewJsonBytesDistributor
.
It uses encoding/json
package to unmarshal which can occur an error, so JsonDistributor
uses a struct Item
representing both value and error.
type Item[T any] struct {
Value T
Error error
}
JsonStringDistributor
's constructor receives string
type Distributor
and JsonBytesDistributor
's receives []byte
type Distributor
.
type Sample struct {
Num int `json:"num"`
}
channel := make(chan string)
jsonStringDistributor := chdist.NewJsonStringDistributor[Sample](
chdist.NewDistributor[string](channel),
)
go func() {
for {
channel <- `{"num":123}`
time.Sleep(time.Second * 5)
}
}()
jsonStringDistributor.Subscribe(func(item chdist.Item[Sample]) {
fmt.Println(item.Value) // type of `Sample`
fmt.Println(item.Error) // if error occured while unmarshaling, it goes to
})
go get github.com/p9595jh/chdist
import "github.com/p9595jh/chdist"