Skip to content

Commit

Permalink
Merge pull request #246 from TRON-US/BTFS-928-to-release
Browse files Browse the repository at this point in the history
[BTFS-928]for v0.2.3 release: switch to grpc client
  • Loading branch information
laipogo authored Nov 12, 2019
2 parents 691ac90 + 0811eec commit 5f801bc
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 67 deletions.
177 changes: 113 additions & 64 deletions analytics/analytics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package analytics

import (
"bytes"
"encoding/json"
"context"
"fmt"
"net/http"
"github.com/cenkalti/backoff"
"github.com/dustin/go-humanize"
"github.com/gogo/protobuf/proto"
"github.com/tron-us/go-btfs-common/protos/node"
pb "github.com/tron-us/go-btfs-common/protos/status"
"google.golang.org/grpc"
"runtime"
"time"

Expand Down Expand Up @@ -42,32 +46,21 @@ type dataCollection struct {
NumPeers uint64 `json:"peers_connected"` //Number of peers
}

type dataBag struct {
PublicKey []byte `json:"public_key"`
Signature []byte `json:"signature"`
Payload []byte `json:"payload"`
}

type healthData struct {
NodeId string `json:"node_id"`
BTFSVersion string `json:"btfs_version"`
FailurePoint string `json:"failure_point"`
}

//Server URL for data collection
var statusServerDomain string

const (
routeMetrics = "/metrics"
routeHealth = "/health"
)

// other constants
const (
kilobyte = 1024

//HeartBeat is how often we send data to server, at the moment set to 15 Minutes
heartBeat = 15 * time.Minute

maxRetryTimes = 3

dialTimeout = time.Minute

callTimeout = 5 * time.Second
)

//Go doesn't have a built in Max function? simple function to not have negatives values
Expand Down Expand Up @@ -158,51 +151,103 @@ func (dc *dataCollection) update() {
dc.NumPeers = uint64(len(st.Peers))
}

func (dc *dataCollection) getGrpcConn() (*grpc.ClientConn, context.CancelFunc, error) {
config, err := dc.node.Repo.Config()
if err != nil {
return nil, nil, fmt.Errorf("failed to load config: %s", err.Error())
}

ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
conn, err := grpc.DialContext(ctx, config.StatusServerDomain, grpc.WithInsecure(), grpc.WithDisableRetry())
if err != nil {
return nil, nil, fmt.Errorf("failed to connect to status server: %s", err.Error())
}
return conn, cancel, nil
}

func (dc *dataCollection) sendData() {
retry(func() error {
return dc.doSendData()
})
}

func (dc *dataCollection) doSendData() error {
dc.update()
dcMarshal, err := json.Marshal(dc)
payload, err := dc.getPayload()
if err != nil {
dc.reportHealthAlert(fmt.Sprintf("failed to marshal dataCollection object to a byte array: %s", err.Error()))
return
return err
}
if dc.node.PrivateKey == nil {
dc.reportHealthAlert("node's private key is null")
return
return err
}
signature, err := dc.node.PrivateKey.Sign(dcMarshal)

signature, err := dc.node.PrivateKey.Sign(payload)
if err != nil {
dc.reportHealthAlert(fmt.Sprintf("failed to sign raw data with node private key: %s", err.Error()))
return
return err
}

publicKey, err := ic.MarshalPublicKey(dc.node.PrivateKey.GetPublic())
if err != nil {
dc.reportHealthAlert(fmt.Sprintf("failed to marshal node public key: %s", err.Error()))
return
return err
}
dataBagInstance := new(dataBag)
dataBagInstance.PublicKey = publicKey
dataBagInstance.Signature = signature
dataBagInstance.Payload = dcMarshal
dataBagMarshaled, err := json.Marshal(dataBagInstance)

sm := new(pb.SignedMetrics)
sm.Payload = payload
sm.Signature = signature
sm.PublicKey = publicKey

conn, cancel, err := dc.getGrpcConn()
if err != nil {
dc.reportHealthAlert(fmt.Sprintf("failed to marshal databag: %s", err.Error()))
return
return err
}
defer cancel()
defer conn.Close()

// btfs node reports to status server by making HTTP request
req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", statusServerDomain, routeMetrics), bytes.NewReader(dataBagMarshaled))
ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
defer cancel()
client := pb.NewStatusClient(conn)
_, err = client.UpdateMetrics(ctx, sm)
if err != nil {
dc.reportHealthAlert(fmt.Sprintf("failed to make new http request: %s", err.Error()))
return
return err
}
req.Header.Add("Content-Type", "application/json")
return nil
}

res, err := http.DefaultClient.Do(req)
func (dc *dataCollection) getPayload() ([]byte, error) {
nd := new(node.Node)
now := time.Now().UTC()
nd.TimeCreated = &now
nd.NodeId = dc.NodeID
nd.BtfsVersion = dc.BTFSVersion
nd.ArchType = dc.ArchType
nd.BlocksDown = dc.BlocksDown
nd.BlocksUp = dc.BlocksUp
nd.CpuInfo = dc.CPUInfo
nd.CpuUsed = dc.CPUUsed
nd.Download = dc.Download
nd.MemoryUsed = dc.MemUsed
nd.OsType = dc.OSType
nd.PeersConnected = dc.NumPeers
nd.StorageUsed = dc.StorageUsed
nd.UpTime = dc.UpTime
nd.Upload = dc.Upload
nd.TotalUpload = dc.TotalUp
nd.TotalDownload = dc.TotalDown
if config, err := dc.node.Repo.Config(); err == nil {
if storageMax, err := humanize.ParseBytes(config.Datastore.StorageMax); err == nil {
nd.StorageVolumeCap = storageMax
}
}
nd.Settings = &node.Node_Settings{}
bytes, err := proto.Marshal(nd)
if err != nil {
dc.reportHealthAlert(fmt.Sprintf("failed to perform http.DefaultClient.Do(): %s", err.Error()))
return
return nil, err
}
defer res.Body.Close()
return bytes, nil
}

func (dc *dataCollection) collectionAgent() {
Expand All @@ -225,30 +270,34 @@ func (dc *dataCollection) collectionAgent() {
}
}

func (dc *dataCollection) reportHealthAlert(failurePoint string) {
// log is the command logger
var log = logging.Logger("cmd/btfs")
func retry(f func() error) {
backoff.Retry(f, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), maxRetryTimes))
}

hd := new(healthData)
hd.NodeId = dc.NodeID
hd.BTFSVersion = dc.BTFSVersion
hd.FailurePoint = failurePoint
hdMarshaled, err := json.Marshal(hd)
if err != nil {
log.Warning(err.Error())
return
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s%s", statusServerDomain, routeHealth), bytes.NewReader(hdMarshaled))
if err != nil {
log.Warning(err.Error())
return
}
req.Header.Add("Content-Type", "application/json")
func (dc *dataCollection) reportHealthAlert(failurePoint string) {
retry(func() error {
return dc.doReportHealthAlert(failurePoint)
})
}

res, err := http.DefaultClient.Do(req)
func (dc *dataCollection) doReportHealthAlert(failurePoint string) error {
conn, cancel, err := dc.getGrpcConn()
if err != nil {
log.Warning(err.Error())
return
return err
}
defer res.Body.Close()
defer cancel()
defer conn.Close()

n := new(pb.NodeHealth)
n.BtfsVersion = dc.BTFSVersion
n.FailurePoint = failurePoint
n.NodeId = dc.NodeID
now := time.Now().UTC()
n.TimeCreated = &now

ctx, cancel := context.WithTimeout(context.Background(), callTimeout)
defer cancel()
client := pb.NewStatusClient(conn)
_, err = client.CollectHealth(ctx, n)
return err
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/TRON-US/go-btfs-config v0.1.6
github.com/blang/semver v3.5.1+incompatible
github.com/bren2010/proquint v0.0.0-20160323162903-38337c27106d
github.com/cenkalti/backoff v2.1.1+incompatible
github.com/cmars/basen v0.0.0-20150613233007-fe3947df716e // indirect
github.com/dsnet/compress v0.0.1 // indirect
github.com/dustin/go-humanize v1.0.0
Expand All @@ -18,7 +19,7 @@ require (
github.com/frankban/quicktest v1.4.2 // indirect
github.com/fsnotify/fsnotify v1.4.7
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/gogo/protobuf v1.2.1
github.com/gogo/protobuf v1.3.1
github.com/golangci/golangci-lint v1.17.1
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.1
Expand Down Expand Up @@ -113,22 +114,22 @@ require (
github.com/prometheus/procfs v0.0.0-20190519111021-9935e8e0588d // indirect
github.com/shirou/gopsutil v0.0.0-20180427012116-c95755e4bcd7
github.com/syndtr/goleveldb v1.0.0
github.com/tron-us/go-btfs-common v0.0.10
github.com/tyler-smith/go-bip32 v0.0.0-20170922074101-2c9cfd177564
github.com/tyler-smith/go-bip39 v1.0.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
github.com/whyrusleeping/go-sysinfo v0.0.0-20190219211824-4a357d4b90b1
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/dig v1.7.0 // indirect
go.uber.org/fx v1.9.0
go.uber.org/goleak v0.10.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
go4.org v0.0.0-20190313082347-94abd6928b1d // indirect
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 // indirect
google.golang.org/grpc v1.24.0
gopkg.in/cheggaaa/pb.v1 v1.0.28
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.2.2
Expand Down
Loading

0 comments on commit 5f801bc

Please sign in to comment.