Skip to content

Commit

Permalink
Merge pull request #41 from laszlocph/prom-metrics-for-each-run
Browse files Browse the repository at this point in the history
More prometheus metrics, refactoring
  • Loading branch information
laszlocph authored Jun 28, 2019
2 parents ba70314 + 1d564e9 commit 2832581
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 106 deletions.
21 changes: 5 additions & 16 deletions cmd/drone-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,23 +563,17 @@ func server(c *cli.Context) error {
auther := &authorizer{
password: c.String("agent-secret"),
}
s := grpc.NewServer(
grpcServer := grpc.NewServer(
grpc.StreamInterceptor(auther.streamInterceptor),
grpc.UnaryInterceptor(auther.unaryIntercaptor),
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: c.Duration("keepalive-min-time"),
}),
)
ss := new(droneserver.DroneServer)
ss.Queue = droneserver.Config.Services.Queue
ss.Logger = droneserver.Config.Services.Logs
ss.Pubsub = droneserver.Config.Services.Pubsub
ss.Remote = remote_
ss.Store = store_
ss.Host = droneserver.Config.Server.Host
proto.RegisterDroneServer(s, ss)

err = s.Serve(lis)
droneServer := droneserver.NewDroneServer(remote_, droneserver.Config.Services.Queue, droneserver.Config.Services.Logs, droneserver.Config.Services.Pubsub, store_, droneserver.Config.Server.Host)
proto.RegisterDroneServer(grpcServer, droneServer)

err = grpcServer.Serve(lis)
if err != nil {
logrus.Error(err)
return err
Expand Down Expand Up @@ -651,11 +645,6 @@ func server(c *cli.Context) error {
return g.Wait()
}

// HACK please excuse the message during this period of heavy refactoring.
// We are currently transitioning from storing services (ie database, queue)
// in the gin.Context to storing them in a struct. We are also moving away
// from gin to gorilla. We will temporarily use global during our refactoring
// which will be removing in the final implementation.
func setupEvilGlobals(c *cli.Context, v store.Store, r remote.Remote) {

// storage
Expand Down
2 changes: 1 addition & 1 deletion cmd/drone-server/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func setupMetrics(g *errgroup.Group, store_ store.Store) {
})
builds := promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "drone",
Name: "build_count",
Name: "build_total_count",
Help: "Total number of builds.",
})
users := promauto.NewGauge(prometheus.GaugeOpts{
Expand Down
140 changes: 51 additions & 89 deletions server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/laszlocph/drone-oss-08/cncd/pipeline/pipeline/rpc/proto"
"github.com/laszlocph/drone-oss-08/cncd/pubsub"
"github.com/laszlocph/drone-oss-08/cncd/queue"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/laszlocph/drone-oss-08/model"
"github.com/laszlocph/drone-oss-08/remote"
Expand All @@ -41,11 +43,6 @@ import (
"github.com/drone/expr"
)

// This file is a complete disaster because I'm trying to wedge in some
// experimental code. Please pardon our appearance during renovations.

// Config is an evil global configuration that will be used as we transition /
// refactor the codebase to move away from storing these values in the Context.
var Config = struct {
Services struct {
Pubsub pubsub.Publisher
Expand Down Expand Up @@ -91,12 +88,14 @@ var Config = struct {
}{}

type RPC struct {
remote remote.Remote
queue queue.Queue
pubsub pubsub.Publisher
logger logging.Log
store store.Store
host string
remote remote.Remote
queue queue.Queue
pubsub pubsub.Publisher
logger logging.Log
store store.Store
host string
buildTime *prometheus.GaugeVec
buildCount *prometheus.CounterVec
}

// Next implements the rpc.Next function
Expand Down Expand Up @@ -416,6 +415,14 @@ func (s *RPC) Done(c context.Context, id string, state rpc.State) error {

s.notify(c, repo, build, procs)

if build.Status == model.StatusSuccess || build.Status == model.StatusFailure {
s.buildCount.WithLabelValues(repo.FullName, build.Branch, build.Status, "total").Inc()
s.buildTime.WithLabelValues(repo.FullName, build.Branch, build.Status, "total").Set(float64(build.Finished - build.Started))
}
if isMultiPipeline(procs) {
s.buildTime.WithLabelValues(repo.FullName, build.Branch, proc.State, proc.Name).Set(float64(proc.Stopped - proc.Started))
}

return nil
}

Expand Down Expand Up @@ -558,30 +565,41 @@ func createFilterFunc(filter rpc.Filter) (queue.Filter, error) {

// DroneServer is a grpc server implementation.
type DroneServer struct {
Remote remote.Remote
Queue queue.Queue
Pubsub pubsub.Publisher
Logger logging.Log
Store store.Store
Host string
peer RPC
}

func (s *DroneServer) Next(c oldcontext.Context, req *proto.NextRequest) (*proto.NextReply, error) {
func NewDroneServer(remote remote.Remote, queue queue.Queue, logger logging.Log, pubsub pubsub.Publisher, store store.Store, host string) *DroneServer {
buildTime := promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "drone",
Name: "build_time",
Help: "Build time.",
}, []string{"repo", "branch", "status", "pipeline"})
buildCount := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "drone",
Name: "build_count",
Help: "Build count.",
}, []string{"repo", "branch", "status", "pipeline"})
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
remote: remote,
store: store,
queue: queue,
pubsub: pubsub,
logger: logger,
host: host,
buildTime: buildTime,
buildCount: buildCount,
}
return &DroneServer{peer: peer}
}

func (s *DroneServer) Next(c oldcontext.Context, req *proto.NextRequest) (*proto.NextReply, error) {
filter := rpc.Filter{
Labels: req.GetFilter().GetLabels(),
Expr: req.GetFilter().GetExpr(),
}

res := new(proto.NextReply)
pipeline, err := peer.Next(c, filter)
pipeline, err := s.peer.Next(c, filter)
if err != nil {
return res, err
}
Expand All @@ -598,14 +616,6 @@ func (s *DroneServer) Next(c oldcontext.Context, req *proto.NextRequest) (*proto
}

func (s *DroneServer) Init(c oldcontext.Context, req *proto.InitRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Expand All @@ -615,19 +625,11 @@ func (s *DroneServer) Init(c oldcontext.Context, req *proto.InitRequest) (*proto
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := peer.Init(c, req.GetId(), state)
err := s.peer.Init(c, req.GetId(), state)
return res, err
}

func (s *DroneServer) Update(c oldcontext.Context, req *proto.UpdateRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Expand All @@ -637,19 +639,11 @@ func (s *DroneServer) Update(c oldcontext.Context, req *proto.UpdateRequest) (*p
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := peer.Update(c, req.GetId(), state)
err := s.peer.Update(c, req.GetId(), state)
return res, err
}

func (s *DroneServer) Upload(c oldcontext.Context, req *proto.UploadRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
file := &rpc.File{
Data: req.GetFile().GetData(),
Mime: req.GetFile().GetMime(),
Expand All @@ -661,19 +655,11 @@ func (s *DroneServer) Upload(c oldcontext.Context, req *proto.UploadRequest) (*p
}

res := new(proto.Empty)
err := peer.Upload(c, req.GetId(), file)
err := s.peer.Upload(c, req.GetId(), file)
return res, err
}

func (s *DroneServer) Done(c oldcontext.Context, req *proto.DoneRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
state := rpc.State{
Error: req.GetState().GetError(),
ExitCode: int(req.GetState().GetExitCode()),
Expand All @@ -683,54 +669,30 @@ func (s *DroneServer) Done(c oldcontext.Context, req *proto.DoneRequest) (*proto
Exited: req.GetState().GetExited(),
}
res := new(proto.Empty)
err := peer.Done(c, req.GetId(), state)
err := s.peer.Done(c, req.GetId(), state)
return res, err
}

func (s *DroneServer) Wait(c oldcontext.Context, req *proto.WaitRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
res := new(proto.Empty)
err := peer.Wait(c, req.GetId())
err := s.peer.Wait(c, req.GetId())
return res, err
}

func (s *DroneServer) Extend(c oldcontext.Context, req *proto.ExtendRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
res := new(proto.Empty)
err := peer.Extend(c, req.GetId())
err := s.peer.Extend(c, req.GetId())
return res, err
}

func (s *DroneServer) Log(c oldcontext.Context, req *proto.LogRequest) (*proto.Empty, error) {
peer := RPC{
remote: s.Remote,
store: s.Store,
queue: s.Queue,
pubsub: s.Pubsub,
logger: s.Logger,
host: s.Host,
}
line := &rpc.Line{
Out: req.GetLine().GetOut(),
Pos: int(req.GetLine().GetPos()),
Time: req.GetLine().GetTime(),
Proc: req.GetLine().GetProc(),
}
res := new(proto.Empty)
err := peer.Log(c, req.GetId(), line)
err := s.peer.Log(c, req.GetId(), line)
return res, err
}

0 comments on commit 2832581

Please sign in to comment.