Skip to content

Commit

Permalink
Merge pull request #9 from nlnwa/log-service
Browse files Browse the repository at this point in the history
Stop collecting crawllog and pagelog
  • Loading branch information
johnerikhalse authored Apr 21, 2021
2 parents 0fb7197 + a458a4c commit 9d26ba9
Show file tree
Hide file tree
Showing 20 changed files with 877 additions and 842 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@
*.out

.idea

veidemann-metrics
70 changes: 0 additions & 70 deletions config.go

This file was deleted.

28 changes: 18 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
module github.com/nlnwa/veidemann-metrics

go 1.14
go 1.15

require (
github.com/golang/protobuf v1.4.3
github.com/nlnwa/veidemann-api-go v1.0.0-beta18
github.com/golang/protobuf v1.5.0
github.com/magefile/mage v1.11.0 // indirect
github.com/nlnwa/veidemann-api/go v0.0.0-20210413093311-7ff38e848604
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/prometheus/client_golang v1.8.0
github.com/prometheus/common v0.15.0
github.com/sirupsen/logrus v1.7.0 // indirect
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 // indirect
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
google.golang.org/grpc v1.33.1
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/common v0.17.0
github.com/prometheus/procfs v0.6.0 // indirect
github.com/rs/zerolog v1.21.0
github.com/sirupsen/logrus v1.8.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
golang.org/x/crypto v0.0.0-20210218145215-b8e89b74b9df // indirect
golang.org/x/net v0.0.0-20210119194325-5f4716e94777 // indirect
golang.org/x/sys v0.0.0-20210219172841-57ea560cfca1 // indirect
golang.org/x/text v0.3.5 // indirect
google.golang.org/genproto v0.0.0-20210219173056-d891e3cb3b5b // indirect
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.26.0
gopkg.in/rethinkdb/rethinkdb-go.v6 v6.2.1
)
190 changes: 177 additions & 13 deletions go.sum

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions internal/frontier/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2018 National Library of Norway
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package aggregator contains an aggregator service client
package frontier

import (
"context"
"fmt"
"github.com/golang/protobuf/ptypes/empty"
"github.com/nlnwa/veidemann-api/go/frontier/v1"
"google.golang.org/grpc"
)

type grpcClient struct {
address string
*grpc.ClientConn
}

func (ac *grpcClient) Connect(ctx context.Context) error {
conn, err := grpc.DialContext(ctx, ac.address,
grpc.WithInsecure(),
grpc.WithBlock())
if err != nil {
return fmt.Errorf("failed to dial %s: %w", ac.address, err)
}
ac.ClientConn = conn
return nil
}

func (ac *grpcClient) Close() {
if ac.ClientConn != nil {
_ = ac.ClientConn.Close()
}
return
}

type Client struct {
*grpcClient
frontier.FrontierClient
}

func New(host string, port int) *Client {
return &Client{
grpcClient: &grpcClient{
address: fmt.Sprintf("%s:%d", host, port),
},
}
}

func (f *Client) Connect(ctx context.Context) error {
err := f.grpcClient.Connect(ctx)
if err != nil {
return err
}
f.FrontierClient = frontier.NewFrontierClient(f.ClientConn)
return nil
}

func (f *Client) QueueCountTotal(ctx context.Context) (int64, error) {
res, err := f.FrontierClient.QueueCountTotal(ctx, &empty.Empty{})
if err != nil {
return 0, err
}
return res.GetCount(), nil
}
42 changes: 42 additions & 0 deletions internal/logger/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package logger

import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
stdlog "log"
"os"
"strings"
"time"
)

func InitLog(level string, format string, logCaller bool) {
zerolog.TimeFieldFormat = zerolog.TimeFormatUnix

switch strings.ToLower(level) {
case "panic":
log.Logger = log.Level(zerolog.PanicLevel)
case "fatal":
log.Logger = log.Level(zerolog.FatalLevel)
case "error":
log.Logger = log.Level(zerolog.ErrorLevel)
case "warn":
log.Logger = log.Level(zerolog.WarnLevel)
case "info":
log.Logger = log.Level(zerolog.InfoLevel)
case "debug":
log.Logger = log.Level(zerolog.DebugLevel)
case "trace":
log.Logger = log.Level(zerolog.TraceLevel)
}

if format == "logfmt" {
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339})
}

if logCaller {
log.Logger = log.With().Caller().Logger()
}

stdlog.SetFlags(0)
stdlog.SetOutput(log.Logger)
}
55 changes: 55 additions & 0 deletions internal/metrics/collectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2018 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/version"
)

const (
Namespace = "veidemann"
)

var (
JobStatus = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "job",
Name: "status_total",
Help: "Status for running jobs",
}, []string{"job_name", "status"})

JobSize = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "job",
Name: "size_total",
Help: "Sizes for running jobs",
}, []string{"job_name", "type"})
)

func registerCollectors(collectUriQueueLength func() float64) {
prometheus.MustRegister(version.NewCollector("veidemann_exporter"))

prometheus.MustRegister(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: "uri",
Name: "queue_count",
Help: "Number of uris in queue.",
}, func() float64 { return collectUriQueueLength() }))
}
99 changes: 99 additions & 0 deletions internal/metrics/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2018 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metrics

import (
"context"
frontierV1 "github.com/nlnwa/veidemann-api/go/frontier/v1"
"github.com/nlnwa/veidemann-metrics/internal/frontier"
"github.com/nlnwa/veidemann-metrics/internal/rethinkdb"
"log"
"time"
)

type Exporter struct {
rethinkdb *rethinkdb.Query
frontier *frontier.Client
}

// New creates a new Exporter
func New(rethinkdb *rethinkdb.Query, frontier *frontier.Client) *Exporter {
return &Exporter{
rethinkdb,
frontier,
}
}

func (e *Exporter) Run(interval time.Duration) {
registerCollectors(e.collectUriQueueLength)
go e.collectJobStatusJob()
go func() {
for range time.Tick(interval) {
e.collectJobStatusJob()
}
}()
}

func (e *Exporter) collectJobStatusJob() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := e.rethinkdb.WalkLatestJobExecutionForCrawlJobs(ctx, collectJobStatus)
if err != nil {
log.Fatal(err)
}
}

func (e *Exporter) collectUriQueueLength() float64 {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
count, err := e.frontier.QueueCountTotal(ctx)
if err != nil {
log.Fatal(err)
}
return float64(count)
}

func collectJobStatus(jobState *frontierV1.JobExecutionStatus) {
name := jobState.GetJobId()
stateOrDefault := getOrDefault(jobState.GetExecutionsState())
JobStatus.WithLabelValues(name, "ABORTED_MANUAL").Set(stateOrDefault("ABORTED_MANUAL", 0))
JobStatus.WithLabelValues(name, "ABORTED_SIZE").Set(stateOrDefault("ABORTED_SIZE", 0))
JobStatus.WithLabelValues(name, "ABORTED_TIMEOUT").Set(stateOrDefault("ABORTED_TIMEOUT", 0))
JobStatus.WithLabelValues(name, "CREATED").Set(stateOrDefault("CREATED", 0))
JobStatus.WithLabelValues(name, "FAILED").Set(stateOrDefault("FAILED", 0))
JobStatus.WithLabelValues(name, "FETCHING").Set(stateOrDefault("FETCHING", 0))
JobStatus.WithLabelValues(name, "FINISHED").Set(stateOrDefault("FINISHED", 0))
JobStatus.WithLabelValues(name, "SLEEPING").Set(stateOrDefault("SLEEPING", 0))

JobSize.WithLabelValues(name, "documentsCrawled").Set(float64(jobState.GetDocumentsCrawled()))
JobSize.WithLabelValues(name, "documentsDenied").Set(float64(jobState.GetDocumentsDenied()))
JobSize.WithLabelValues(name, "documentsFailed").Set(float64(jobState.GetDocumentsFailed()))
JobSize.WithLabelValues(name, "documentsOutOfScope").Set(float64(jobState.GetDocumentsOutOfScope()))
JobSize.WithLabelValues(name, "documentsRetried").Set(float64(jobState.GetDocumentsRetried()))
JobSize.WithLabelValues(name, "urisCrawled").Set(float64(jobState.GetUrisCrawled()))
JobSize.WithLabelValues(name, "bytesCrawled").Set(float64(jobState.GetBytesCrawled()))
}

func getOrDefault(m map[string]int32) func(k string, v int32) float64 {
return func(k string, v int32) float64 {
if value, ok := m[k]; !ok {
return float64(v)
} else {
return float64(value)
}
}
}
Loading

0 comments on commit 9d26ba9

Please sign in to comment.