Skip to content

Commit

Permalink
feat: logging with level implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
ramazan committed Sep 22, 2023
1 parent 6199ae6 commit 9f792aa
Show file tree
Hide file tree
Showing 25 changed files with 193 additions and 128 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ import (
func listener(ctx *models.ListenerContext) {
switch event := ctx.Event.(type) {
case models.DcpMutation:
logger.Log.Printf(
logger.Log.Info(
"mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
)
case models.DcpDeletion:
logger.Log.Printf(
logger.Log.Info(
"deleted(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
case models.DcpExpiration:
logger.Log.Printf(
logger.Log.Info(
"expired(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
Expand Down Expand Up @@ -114,6 +114,7 @@ $ go get github.com/Trendyol/go-dcp
| `api.port` | int | no | 8080 | Set API port |
| `metric.path` | string | no | /metrics | Set metric endpoint path. |
| `metric.averageWindowSec` | float64 | no | 10.0 | Set metric window range. |
| `logging.level` | string | no | info | Set logging level. |

### Environment Variables

Expand Down
12 changes: 6 additions & 6 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ type api struct {
}

func (s *api) Listen() {
logger.Log.Printf("api starting on port %d", s.config.API.Port)
logger.Log.Info("api starting on port %d", s.config.API.Port)

err := s.app.Listen(fmt.Sprintf(":%d", s.config.API.Port))

if err != nil {
logger.ErrorLog.Printf("api cannot start on port %d, err: %v", s.config.API.Port, err)
logger.Log.Error("api cannot start on port %d, err: %v", s.config.API.Port, err)
} else {
logger.Log.Printf("api stopped")
logger.Log.Info("api stopped")
}
}

func (s *api) Shutdown() {
err := s.app.Shutdown()
if err != nil {
logger.ErrorLog.Printf("api cannot be shutdown, err: %v", err)
logger.Log.Error("api cannot be shutdown, err: %v", err)
panic(err)
}
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func NewAPI(config *dcp.Dcp,
if err == nil {
app.Use(newMetricMiddleware(app, config))
} else {
logger.ErrorLog.Printf("metric middleware cannot be initialized: %v", err)
logger.Log.Error("metric middleware cannot be initialized: %v", err)
}

if config.Debug {
Expand All @@ -128,7 +128,7 @@ func newMetricMiddleware(app *fiber.App, config *dcp.Dcp) func(ctx *fiber.Ctx) e
fiberPrometheus := fiberprometheus.New(config.Dcp.Group.Name)
fiberPrometheus.RegisterAt(app, config.Metric.Path)

logger.Log.Printf("metric middleware registered on path %s", config.Metric.Path)
logger.Log.Info("metric middleware registered on path %s", config.Metric.Path)

return fiberPrometheus.Middleware
}
27 changes: 23 additions & 4 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ type Metadata struct {
ReadOnly bool `json:"readOnly"`
}

type Logging struct {
Level string `yaml:"level"`
}

type Dcp struct {
Username string `yaml:"username"`
BucketName string `yaml:"bucketName"`
Expand All @@ -114,6 +118,7 @@ type Dcp struct {
ConnectionBufferSize uint `yaml:"connectionBufferSize"`
SecureConnection bool `yaml:"secureConnection"`
Debug bool `yaml:"debug"`
Logging Logging `yaml:"logging"`
}

func (c *Dcp) IsCollectionModeEnabled() bool {
Expand All @@ -135,13 +140,13 @@ func (c *Dcp) GetFileMetadata() string {
fileName = c.Metadata.Config[FileMetadataFileNameConfig]
} else {
err := errors.New("file metadata file name is not set")
logger.ErrorLog.Printf("failed to get metadata file name: %v", err)
logger.Log.Error("failed to get metadata file name: %v", err)
panic(err)
}

if fileName == "" {
err := errors.New("file metadata file name is empty")
logger.ErrorLog.Printf("failed to get metadata file name: %v", err)
logger.Log.Error("failed to get metadata file name: %v", err)
panic(err)
}

Expand Down Expand Up @@ -184,7 +189,7 @@ func (c *Dcp) getMetadataConnectionBufferSize() uint {
if connectionBufferSize, ok := c.Metadata.Config[CouchbaseMetadataConnectionBufferSizeConfig]; ok {
parsedConnectionBufferSize, err := strconv.ParseUint(connectionBufferSize, 10, 32)
if err != nil {
logger.ErrorLog.Printf("failed to parse metadata connection buffer size: %v", err)
logger.Log.Error("failed to parse metadata connection buffer size: %v", err)
panic(err)
}

Expand All @@ -198,7 +203,7 @@ func (c *Dcp) getMetadataConnectionTimeout() time.Duration {
if connectionTimeout, ok := c.Metadata.Config[CouchbaseMetadataConnectionTimeoutConfig]; ok {
parsedConnectionTimeout, err := time.ParseDuration(connectionTimeout)
if err != nil {
logger.ErrorLog.Printf("failed to parse metadata connection timeout: %v", err)
logger.Log.Error("failed to parse metadata connection timeout: %v", err)
panic(err)
}

Expand All @@ -222,6 +227,7 @@ func (c *Dcp) ApplyDefaults() {
c.applyDefaultLeaderElection()
c.applyDefaultDcp()
c.applyDefaultMetadata()
c.applyLogging()
}

func (c *Dcp) applyDefaultRollbackMitigation() {
Expand Down Expand Up @@ -369,3 +375,16 @@ func (c *Dcp) applyDefaultMetadata() {
c.Metadata.Type = MetadataTypeCouchbase
}
}

func (c *Dcp) applyLogging() {
if logger.Log != nil {
return
}

loggingLevel := c.Logging.Level
if loggingLevel == "" {
c.Logging.Level = logger.INFO
}

logger.InitDefaultLogger(c.Logging.Level)
}
22 changes: 11 additions & 11 deletions couchbase/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *client) GetMetaAgent() *gocbcore.Agent {
func CreateTLSRootCaProvider(rootCAPath string) func() *x509.CertPool {
cert, err := os.ReadFile(os.ExpandEnv(rootCAPath))
if err != nil {
logger.ErrorLog.Printf("error while reading cert file: %v", err)
logger.Log.Error("error while reading cert file: %v", err)
panic(err)
}

Expand Down Expand Up @@ -251,11 +251,11 @@ func (s *client) Connect() error {
s.metaAgent = metaAgent
}

logger.Log.Printf("connected to %s, bucket: %s, meta bucket: %s", s.config.Hosts, s.config.BucketName, metadataBucketName)
logger.Log.Info("connected to %s, bucket: %s, meta bucket: %s", s.config.Hosts, s.config.BucketName, metadataBucketName)
return nil
}

logger.Log.Printf("connected to %s, bucket: %s", s.config.Hosts, s.config.BucketName)
logger.Log.Info("connected to %s, bucket: %s", s.config.Hosts, s.config.BucketName)

return nil
}
Expand All @@ -269,7 +269,7 @@ func (s *client) Close() {
_ = s.agent.Close()
}

logger.Log.Printf("connections closed %s", s.config.Hosts)
logger.Log.Info("connections closed %s", s.config.Hosts)
}

func (s *client) DcpConnect() error {
Expand Down Expand Up @@ -327,14 +327,14 @@ func (s *client) DcpConnect() error {
}

s.dcpAgent = client
logger.Log.Printf("connected to %s as dcp, bucket: %s", s.config.Hosts, s.config.BucketName)
logger.Log.Info("connected to %s as dcp, bucket: %s", s.config.Hosts, s.config.BucketName)

return nil
}

func (s *client) DcpClose() {
_ = s.dcpAgent.Close()
logger.Log.Printf("dcp connection closed %s", s.config.Hosts)
logger.Log.Info("dcp connection closed %s", s.config.Hosts)
}

func (s *client) GetVBucketSeqNos() (map[uint16]uint64, error) {
Expand Down Expand Up @@ -381,13 +381,13 @@ func (s *client) GetVBucketSeqNos() (map[uint16]uint64, error) {
func (s *client) GetNumVBuckets() int {
snapshot, err := s.GetConfigSnapshot()
if err != nil {
logger.ErrorLog.Printf("failed to get config snapshot: %v", err)
logger.Log.Error("failed to get config snapshot: %v", err)
panic(err)
}

vBuckets, err := snapshot.NumVbuckets()
if err != nil {
logger.ErrorLog.Printf("failed to get number of vbucket: %v", err)
logger.Log.Error("failed to get number of vbucket: %v", err)
panic(err)
}

Expand Down Expand Up @@ -428,7 +428,7 @@ func (s *client) openStreamWithRollback(vbID uint16,
observer Observer,
openStreamOptions gocbcore.OpenStreamOptions,
) error {
logger.Log.Printf(
logger.Log.Info(
"open stream with rollback, vbID: %d, failedSeqNo: %d, rollbackSeqNo: %d",
vbID, failedSeqNo, rollbackSeqNo,
)
Expand Down Expand Up @@ -521,7 +521,7 @@ func (s *client) OpenStream(

if err != nil {
if rollbackErr, ok := err.(gocbcore.DCPRollbackError); ok {
logger.Log.Printf("need to rollback for vbID: %d, vbUUID: %d", vbID, offset.VbUUID)
logger.Log.Info("need to rollback for vbID: %d, vbUUID: %d", vbID, offset.VbUUID)
return s.openStreamWithRollback(vbID, gocbcore.SeqNo(offset.SeqNo), rollbackErr.SeqNo, observer, openStreamOptions)
}
}
Expand Down Expand Up @@ -587,7 +587,7 @@ func (s *client) GetCollectionIDs(scopeName string, collectionNames []string) ma
for _, collectionName := range collectionNames {
collectionID, err := s.getCollectionID(scopeName, collectionName)
if err != nil {
logger.ErrorLog.Printf("cannot get collection ids: %v", err)
logger.Log.Error("cannot get collection ids: %v", err)
panic(err)
}

Expand Down
24 changes: 12 additions & 12 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (h *cbMembership) register() {

err := h.createIndex(ctx, now)
if err != nil {
logger.ErrorLog.Printf("error while create index: %v", err)
logger.Log.Error("error while create index: %v", err)
panic(err)
}

Expand All @@ -95,7 +95,7 @@ func (h *cbMembership) register() {
}

if err != nil {
logger.ErrorLog.Printf("error while register: %v", err)
logger.Log.Error("error while register: %v", err)
panic(err)
}
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func (h *cbMembership) heartbeat() {

err := UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.id, payload, _expirySec)
if err != nil {
logger.ErrorLog.Printf("error while heartbeat: %v", err)
logger.Log.Error("error while heartbeat: %v", err)
return
}
}
Expand All @@ -150,15 +150,15 @@ func (h *cbMembership) monitor() {

data, err := Get(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll)
if err != nil {
logger.ErrorLog.Printf("error while monitor try to get index: %v", err)
logger.Log.Error("error while monitor try to get index: %v", err)
return
}

all := map[string]int64{}

err = jsoniter.Unmarshal(data, &all)
if err != nil {
logger.ErrorLog.Printf("error while monitor try to unmarshal index: %v", err)
logger.Log.Error("error while monitor try to unmarshal index: %v", err)
return
}

Expand All @@ -184,7 +184,7 @@ func (h *cbMembership) monitor() {
if errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
return
} else {
logger.ErrorLog.Printf("error while monitor try to get instance: %v", err)
logger.Log.Error("error while monitor try to get instance: %v", err)
panic(err)
}
}
Expand All @@ -194,14 +194,14 @@ func (h *cbMembership) monitor() {
err = jsoniter.Unmarshal(doc, instance)

if err != nil {
logger.ErrorLog.Printf("error while monitor try to unmarshal instance %v, err: %v", string(doc), err)
logger.Log.Error("error while monitor try to unmarshal instance %v, err: %v", string(doc), err)
panic(err)
}

if h.isAlive(instance.HeartbeatTime) {
instances[i] = instance
} else {
logger.Log.Printf("instance %v is not alive", instance.ID)
logger.Log.Info("instance %v is not alive", instance.ID)
}
}(i, id)
}
Expand Down Expand Up @@ -231,7 +231,7 @@ func (h *cbMembership) updateIndex(ctx context.Context) {

err := UpdateDocument(ctx, h.client.GetMetaAgent(), h.scopeName, h.collectionName, h.instanceAll, payload, 0)
if err != nil {
logger.ErrorLog.Printf("error while update instances: %v", err)
logger.Log.Error("error while update instances: %v", err)
return
}
}
Expand All @@ -248,7 +248,7 @@ func (h *cbMembership) rebalance(instances []Instance) {

if selfOrder == 0 {
err := errors.New("cant find self in cluster")
logger.ErrorLog.Printf("error while rebalance, self = %v, err: %v", string(h.id), err)
logger.Log.Error("error while rebalance, self = %v, err: %v", string(h.id), err)
panic(err)
} else {
h.bus.Emit(helpers.MembershipChangedBusEventName, &membership.Model{
Expand All @@ -274,7 +274,7 @@ func (h *cbMembership) startMonitor() {
h.monitorTicker = time.NewTicker(_monitorIntervalMs * time.Millisecond)

go func() {
logger.Log.Printf("couchbase membership will start after %v", h.config.Dcp.Group.Membership.RebalanceDelay)
logger.Log.Info("couchbase membership will start after %v", h.config.Dcp.Group.Membership.RebalanceDelay)
time.Sleep(h.config.Dcp.Group.Membership.RebalanceDelay)

for range h.monitorTicker.C {
Expand All @@ -300,7 +300,7 @@ func (h *cbMembership) membershipChangedListener(event interface{}) {
func NewCBMembership(config *config.Dcp, client Client, bus helpers.Bus) membership.Membership {
if !config.IsCouchbaseMetadata() {
err := errors.New("unsupported metadata type")
logger.ErrorLog.Printf("cannot initialize couchbase membership, err: %v", err)
logger.Log.Error("cannot initialize couchbase membership, err: %v", err)
panic(err)
}

Expand Down
4 changes: 2 additions & 2 deletions couchbase/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *cbMetadata) Load(
if err == nil || errors.As(err, &kvErr) && kvErr.StatusCode == memd.StatusKeyNotFound {
state.Store(vbID, doc)
} else {
logger.ErrorLog.Printf("cannot load checkpoint, vbID: %d, err: %v", vbID, err)
logger.Log.Error("cannot load checkpoint, vbID: %d, err: %v", vbID, err)
panic(err)
}

Expand Down Expand Up @@ -131,7 +131,7 @@ func (s *cbMetadata) Clear(vbIds []uint16) error {
func NewCBMetadata(client Client, config *config.Dcp) metadata.Metadata {
if !config.IsCouchbaseMetadata() {
err := errors.New("unsupported metadata type")
logger.ErrorLog.Printf("cannot initialize couchbase metadata: %v", err)
logger.Log.Error("cannot initialize couchbase metadata: %v", err)
panic(err)
}

Expand Down
2 changes: 1 addition & 1 deletion couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (so *observer) needCatchup(vbID uint16, seqNo uint64) bool {
so.catchup.Delete(vbID)
so.catchupNeededVbIDCount--

logger.Log.Printf("catchup completed for vbID: %d", vbID)
logger.Log.Info("catchup completed for vbID: %d", vbID)

return seqNo == catchupSeqNo
}
Expand Down
Loading

0 comments on commit 9f792aa

Please sign in to comment.