diff --git a/modules/api/test/api_test.go b/modules/api/test/api_test.go index 13bc75a29..4cb7f9bc1 100755 --- a/modules/api/test/api_test.go +++ b/modules/api/test/api_test.go @@ -20,6 +20,7 @@ import ( "os" "strings" "testing" + "time" "github.com/masato25/resty" log "github.com/sirupsen/logrus" @@ -555,31 +556,60 @@ func TestGraph(t *testing.T) { rc := resty.New() rc.SetHeader("Apitoken", api_token) + var rr *[]map[string]interface{} = &[]map[string]interface{}{} Convey("Get endpoint list: GET /graph/endpoint", t, func() { - r := []map[string]interface{}{} resp, _ := rc.R().SetQueryParam("q", ".+"). - SetResult(&r). + SetResult(rr). Get(fmt.Sprintf("%s/graph/endpoint", api_v1)) So(resp.StatusCode(), ShouldEqual, 200) - So(len(r), ShouldBeGreaterThanOrEqualTo, 0) + So(len(*rr), ShouldBeGreaterThanOrEqualTo, 0) - if len(r) == 0 { - return - } + }) - eid := r[0]["id"] - r = []map[string]interface{}{} - Convey("Get counter list: GET /graph/endpoint_counter", func() { - resp, _ := rc.R(). - SetQueryParam("eid", fmt.Sprintf("%v", eid)). - SetQueryParam("metricQuery", ".+"). - SetQueryParam("limit", "1"). - SetResult(&r). - Get(fmt.Sprintf("%s/graph/endpoint_counter", api_v1)) - So(resp.StatusCode(), ShouldEqual, 200) - So(r, ShouldNotBeEmpty) - }) + if len(*rr) == 0 { + return + } + + eid := (*rr)[0]["id"] + endpoint := (*rr)[0]["endpoint"] + Convey("Get counter list: GET /graph/endpoint_counter", t, func() { + resp, _ := rc.R(). + SetQueryParam("eid", fmt.Sprintf("%v", eid)). + SetQueryParam("metricQuery", ".+"). + SetQueryParam("limit", "1"). + SetResult(rr). + Get(fmt.Sprintf("%s/graph/endpoint_counter", api_v1)) + So(resp.StatusCode(), ShouldEqual, 200) + So(*rr, ShouldNotBeEmpty) + }) + + if len(*rr) == 0 { + return + } + + counter := (*rr)[0]["counter"] + step := (*rr)[0]["step"] + + now := time.Now() + start_ts := now.Add(time.Duration(-1) * time.Hour).Unix() + end_ts := now.Unix() + + Convey("Query counter history: POST /graph/history", t, func() { + resp, _ := rc.R(). + SetBody(map[string]interface{}{ + "step": step, + "consol_fun": "AVERAGE", + "start_time": start_ts, + "end_time": end_ts, + "hostnames": []string{endpoint.(string)}, + "counters": []string{counter.(string)}, + }). + SetResult(rr). + Post(fmt.Sprintf("%s/graph/history", api_v1)) + log.Info(resp) + So(resp.StatusCode(), ShouldEqual, 200) + So(*rr, ShouldNotBeEmpty) }) } diff --git a/modules/graph/api/graph.go b/modules/graph/api/graph.go index 55b628f31..a91e931a8 100644 --- a/modules/graph/api/graph.go +++ b/modules/graph/api/graph.go @@ -48,7 +48,7 @@ func (this *Graph) GetRrd(key string, rrdfile *g.File) (err error) { items := store.GraphItems.PopAll(key) if len(items) > 0 { - rrdtool.FlushFile(rrdfile.Filename, md5, items) + rrdtool.CommitFile(rrdfile.Filename, md5, items) } rrdfile.Body, err = rrdtool.ReadFile(rrdfile.Filename, md5) @@ -178,7 +178,11 @@ func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryRe // read data from rrd file // 从RRD中获取数据不包含起始时间点 // 例: start_ts=1484651400,step=60,则第一个数据时间为1484651460) - datas, _ = rrdtool.Fetch(filename, md5, param.ConsolFun, start_ts-int64(step), end_ts, step) + var err error + datas, err = rrdtool.Fetch(filename, md5, param.ConsolFun, start_ts-int64(step), end_ts, step) + if err != nil { + log.Debugf("rrdtool fetch %s error: %v", filename, err) + } datas_size = len(datas) } diff --git a/modules/graph/api/rpc.go b/modules/graph/api/rpc.go index fbb3aef4f..3cf1b289f 100644 --- a/modules/graph/api/rpc.go +++ b/modules/graph/api/rpc.go @@ -16,7 +16,7 @@ package api import ( "container/list" - "log" + log "github.com/sirupsen/logrus" "net" "net/rpc" "sync" @@ -52,7 +52,7 @@ func init() { func Start() { if !g.Config().Rpc.Enabled { - log.Println("rpc.Start warning, not enabled") + log.Info("rpc.Start warning, not enabled") return } addr := g.Config().Rpc.Listen @@ -65,7 +65,7 @@ func Start() { if err != nil { log.Fatalf("rpc.Start error, listen %s failed, %s", addr, err) } else { - log.Println("rpc.Start ok, listening on", addr) + log.Info("rpc.Start ok, listening on", addr) } rpc.Register(new(Graph)) @@ -99,7 +99,7 @@ func Start() { select { case <-Close_chan: - log.Println("rpc, recv sigout and exiting...") + log.Info("rpc, recv sigout and exiting...") listener.Close() Close_done_chan <- 1 diff --git a/modules/graph/cron/clean.go b/modules/graph/cron/clean.go index 515dc7538..dbcd0ec9d 100644 --- a/modules/graph/cron/clean.go +++ b/modules/graph/cron/clean.go @@ -29,7 +29,7 @@ package cron import ( - "log" + log "github.com/sirupsen/logrus" "strings" "time" @@ -85,7 +85,7 @@ func DeleteInvalidItems() int { pfc.Gauge("GraphItemsCacheCnt", int64(currentCnt)) pfc.Gauge("GraphItemsCacheInvalidCnt", int64(deleteCnt)) - log.Printf("GraphItemsCache: Count=>%d, DeleteInvalid=>%d", currentCnt, deleteCnt) + log.Infof("GraphItemsCache: Count=>%d, DeleteInvalid=>%d", currentCnt, deleteCnt) return deleteCnt } @@ -107,7 +107,7 @@ func DeleteInvalidHistory() int { pfc.Gauge("HistoryCacheCnt", int64(currentCnt)) pfc.Gauge("HistoryCacheInvalidCnt", int64(deleteCnt)) - log.Printf("HistoryCache: Count=>%d, DeleteInvalid=>%d", currentCnt, deleteCnt) + log.Infof("HistoryCache: Count=>%d, DeleteInvalid=>%d", currentCnt, deleteCnt) return deleteCnt } diff --git a/modules/graph/g/cfg.go b/modules/graph/g/cfg.go index e8b1021b3..85efa0153 100644 --- a/modules/graph/g/cfg.go +++ b/modules/graph/g/cfg.go @@ -16,7 +16,7 @@ package g import ( "encoding/json" - "log" + log "github.com/sirupsen/logrus" "strconv" "sync/atomic" "unsafe" @@ -77,24 +77,24 @@ func Config() *GlobalConfig { func ParseConfig(cfg string) { if cfg == "" { - log.Fatalln("config file not specified: use -c $filename") + log.Fatal("config file not specified: use -c $filename") } if !file.IsExist(cfg) { - log.Fatalln("config file specified not found:", cfg) + log.Fatal("config file specified not found:", cfg) } ConfigFile = cfg configContent, err := file.ToTrimString(cfg) if err != nil { - log.Fatalln("read config file", cfg, "error:", err.Error()) + log.Fatal("read config file", cfg, "error:", err.Error()) } var c GlobalConfig err = json.Unmarshal([]byte(configContent), &c) if err != nil { - log.Fatalln("parse config file", cfg, "error:", err.Error()) + log.Fatal("parse config file", cfg, "error:", err.Error()) } if c.Migrate.Enabled && len(c.Migrate.Cluster) == 0 { @@ -112,5 +112,5 @@ func ParseConfig(cfg string) { // set config atomic.StorePointer(&ptr, unsafe.Pointer(&c)) - log.Println("g.ParseConfig ok, file", cfg) + log.Info("g.ParseConfig ok, file", cfg) } diff --git a/modules/graph/g/db.go b/modules/graph/g/db.go index 8e42bd0f5..139cd0bc0 100644 --- a/modules/graph/g/db.go +++ b/modules/graph/g/db.go @@ -17,7 +17,7 @@ package g import ( "database/sql" _ "github.com/go-sql-driver/mysql" - "log" + log "github.com/sirupsen/logrus" "sync" ) @@ -33,11 +33,11 @@ func InitDB() { var err error DB, err = makeDbConn() if DB == nil || err != nil { - log.Fatalln("g.InitDB, get db conn fail", err) + log.Fatal("g.InitDB, get db conn fail", err) } dbConnMap = make(map[string]*sql.DB) - log.Println("g.InitDB ok") + log.Info("g.InitDB ok") } func GetDbConn(connName string) (c *sql.DB, e error) { diff --git a/modules/graph/g/g.go b/modules/graph/g/g.go index 4a5aa1ae6..f4f5a3df2 100644 --- a/modules/graph/g/g.go +++ b/modules/graph/g/g.go @@ -15,7 +15,6 @@ package g import ( - "log" "runtime" ) @@ -55,7 +54,6 @@ const ( DEFAULT_STEP = 60 //s MIN_STEP = 30 //s CLEAN_CACHE = 86400 //s the step that clean GraphItems/historyCache Cache - CACHE_DELAY = 1800 //s CACHE_TIME = 1800000 //ms FLUSH_DISK_STEP = 1000 //ms FLUSH_MIN_COUNT = 6 // flush counter to disk when its number of monitoring data greater than FLUSH_MIN_COUNT @@ -71,5 +69,4 @@ const ( func init() { runtime.GOMAXPROCS(runtime.NumCPU()) - log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) } diff --git a/modules/graph/http/common.go b/modules/graph/http/common.go index 132fd3e09..78d6195e7 100644 --- a/modules/graph/http/common.go +++ b/modules/graph/http/common.go @@ -16,10 +16,13 @@ package http import ( "github.com/open-falcon/falcon-plus/modules/graph/g" + "github.com/open-falcon/falcon-plus/modules/graph/rrdtool" "github.com/open-falcon/falcon-plus/modules/graph/store" "github.com/toolkits/file" + "fmt" "github.com/gin-gonic/gin" + log "github.com/sirupsen/logrus" "strconv" "time" ) @@ -73,4 +76,18 @@ func configCommonRoutes() { } JSONR(c, 200, rt) }) + + router.GET("/api/v2/counter/migrate", func(c *gin.Context) { + counter := rrdtool.GetCounterV2() + log.Debug("migrating counter v2:", fmt.Sprintf("%+v", counter)) + c.JSON(200, counter) + }) + + //compatible with open-falcon v0.1 + router.GET("/counter/migrate", func(c *gin.Context) { + cnt := rrdtool.GetCounter() + log.Debug("migrating counter:", cnt) + c.JSON(200, gin.H{"msg": "ok", "counter": cnt}) + }) + } diff --git a/modules/graph/http/http.go b/modules/graph/http/http.go index c987836f8..1e1cf8384 100644 --- a/modules/graph/http/http.go +++ b/modules/graph/http/http.go @@ -16,7 +16,6 @@ package http import ( "encoding/json" - "fmt" log "github.com/sirupsen/logrus" "net" "net/http" @@ -25,7 +24,6 @@ import ( "github.com/gin-gonic/gin" "github.com/open-falcon/falcon-plus/modules/graph/g" - "github.com/open-falcon/falcon-plus/modules/graph/rrdtool" ) type Dto struct { @@ -102,19 +100,6 @@ func Start() { configProcRoutes() configIndexRoutes() - router.GET("/api/v2/counter/migrate", func(c *gin.Context) { - counter := rrdtool.GetCounterV2() - log.Debug("migrating counter v2:", fmt.Sprintf("%+v", counter)) - c.JSON(200, counter) - }) - - //compatible with open-falcon v0.1 - router.GET("/counter/migrate", func(c *gin.Context) { - cnt := rrdtool.GetCounter() - log.Debug("migrating counter:", cnt) - c.JSON(200, gin.H{"msg": "ok", "counter": cnt}) - }) - addr := g.Config().Http.Listen if addr == "" { return diff --git a/modules/graph/index/cache.go b/modules/graph/index/cache.go index ac1da25f0..d32dbfbdd 100644 --- a/modules/graph/index/cache.go +++ b/modules/graph/index/cache.go @@ -17,7 +17,7 @@ package index import ( "database/sql" "fmt" - "log" + log "github.com/sirupsen/logrus" "strconv" "strings" "sync" @@ -97,7 +97,7 @@ func GetEndpointFromCache(endpoint string) (int64, bool) { var id int64 = -1 err := g.DB.QueryRow("SELECT id FROM endpoint WHERE endpoint = ?", endpoint).Scan(&id) if err != nil && err != sql.ErrNoRows { - log.Println("query endpoint id fail,", err) + log.Error("query endpoint id fail,", err) return -1, false } @@ -132,7 +132,7 @@ func GetCounterFromCache(endpointId int64, counter string) (dsType string, step err = g.DB.QueryRow("SELECT type, step FROM endpoint_counter WHERE endpoint_id = ? and counter = ?", endpointId, counter).Scan(&dsType, &step) if err != nil && err != sql.ErrNoRows { - log.Println("query type and step fail", err) + log.Error("query type and step fail", err) return } diff --git a/modules/graph/main.go b/modules/graph/main.go index 4467aa76a..5a9ebadd7 100644 --- a/modules/graph/main.go +++ b/modules/graph/main.go @@ -17,7 +17,7 @@ package main import ( "flag" "fmt" - "log" + log "github.com/sirupsen/logrus" "os" "os/signal" "syscall" @@ -32,33 +32,34 @@ import ( func start_signal(pid int, cfg *g.GlobalConfig) { sigs := make(chan os.Signal, 1) - log.Println(pid, "register signal notify") + log.Info(pid, " register signal notify") signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) for { s := <-sigs - log.Println("recv", s) + log.Info("recv", s) switch s { case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: - log.Println("graceful shut down") + log.Info("graceful shut down") if cfg.Http.Enabled { http.Close_chan <- 1 <-http.Close_done_chan } - log.Println("http stop ok") + log.Info("http stop ok") if cfg.Rpc.Enabled { api.Close_chan <- 1 <-api.Close_done_chan } - log.Println("rpc stop ok") + log.Info("rpc stop ok") - rrdtool.Out_done_chan <- 1 - rrdtool.FlushAll(true) - log.Println("rrdtool stop ok") + rrdtool.Main_done_chan <- 1 + //flush cache to local file or transmit cache to remote graph + rrdtool.CommitBeforeQuit() + log.Info("rrdtool stop ok") - log.Println(pid, "exit") + log.Info("pid:", pid, " exit") os.Exit(0) } } diff --git a/modules/graph/rrdtool/migrate.go b/modules/graph/rrdtool/migrate.go index 56441f748..f97c84b70 100644 --- a/modules/graph/rrdtool/migrate.go +++ b/modules/graph/rrdtool/migrate.go @@ -17,10 +17,10 @@ package rrdtool import ( "errors" "fmt" - "log" + log "github.com/sirupsen/logrus" "net" "net/rpc" - "os" + "strings" "sync/atomic" "time" @@ -144,7 +144,7 @@ func migrate_start(cfg *g.GlobalConfig) { for i = 0; i < cfg.Migrate.Concurrency; i++ { if clients[node][i], err = dial(addr, time.Second); err != nil { - log.Fatalf("node:%s addr:%s err:%s\n", node, addr, err) + log.Fatalf("node:%s addr:%s err:%s", node, addr, err) } go net_task_worker(i, Net_task_ch[node], &clients[node][i], addr) } @@ -174,32 +174,25 @@ func net_task_worker(idx int, ch chan *Net_task_t, client **rpc.Client, addr str atomic.AddUint64(&stat_cnt[QUERY_S_SUCCESS], 1) } } else if task.Method == NET_TASK_M_PULL { - if atomic.LoadInt32(&flushrrd_timeout) != 0 { - // hope this more faster than fetch_rrd - if err = send_data(client, task.Key, addr); err != nil { - pfc.Meter("migrate.sendbusy.err", 1) - atomic.AddUint64(&stat_cnt[SEND_S_ERR], 1) + if err = fetch_rrd(client, task.Key, addr); err != nil { + if strings.HasSuffix(err.Error(), "no such file or directory") { + // which expect that err msg like open xxx.rrd: no such file or directory + // TODO:check error in cross different platforms + pfc.Meter("migrate.scprrd.null", 1) + atomic.AddUint64(&stat_cnt[FETCH_S_ISNOTEXIST], 1) + store.GraphItems.SetFlag(task.Key, 0) + //when the remote rrd file not exist, flush cache to local rrdfile + CommitByKey(task.Key) } else { - pfc.Meter("migrate.sendbusy.ok", 1) - atomic.AddUint64(&stat_cnt[SEND_S_SUCCESS], 1) + pfc.Meter("migrate.scprrd.err", 1) + //warning:other errors, cache backlogged + atomic.AddUint64(&stat_cnt[FETCH_S_ERR], 1) } } else { - if err = fetch_rrd(client, task.Key, addr); err != nil { - if os.IsNotExist(err) { - pfc.Meter("migrate.scprrd.null", 1) - //文件不存在时,直接将缓存数据刷入本地 - atomic.AddUint64(&stat_cnt[FETCH_S_ISNOTEXIST], 1) - store.GraphItems.SetFlag(task.Key, 0) - CommitByKey(task.Key) - } else { - pfc.Meter("migrate.scprrd.err", 1) - //warning:其他异常情况,缓存数据会堆积 - atomic.AddUint64(&stat_cnt[FETCH_S_ERR], 1) - } - } else { - pfc.Meter("migrate.scprrd.ok", 1) - atomic.AddUint64(&stat_cnt[FETCH_S_SUCCESS], 1) - } + pfc.Meter("migrate.scprrd.ok", 1) + atomic.AddUint64(&stat_cnt[FETCH_S_SUCCESS], 1) + //flush cache to local rrd file after scp rrd success + CommitByKey(task.Key) } } else { err = errors.New("error net task method") @@ -287,9 +280,11 @@ func send_data(client **rpc.Client, key string, addr string) error { if err == rpc.ErrShutdown { reconnection(client, addr) } + log.Errorf("transmit rrd %s %s to remote succ, retry[%d]", key, items[0].UUID(), i) } - // err - store.GraphItems.PushAll(key, items) + //send_data only will be called when graph quit, + //so then there is a error of transmit, drop the cache instead of restore. + //store.GraphItems.PushAll(key, items) //flag |= g.GRAPH_F_ERR out: flag &= ^g.GRAPH_F_SENDING @@ -341,7 +336,7 @@ func fetch_rrd(client **rpc.Client, key string, addr string) error { goto out } } else { - log.Println(err) + log.Errorf("scp rrd %s from remote err[%s], retry[%d]", key, err, i) } if err == rpc.ErrShutdown { reconnection(client, addr) diff --git a/modules/graph/rrdtool/rrdtool.go b/modules/graph/rrdtool/rrdtool.go index 721bd7f1a..536b02994 100644 --- a/modules/graph/rrdtool/rrdtool.go +++ b/modules/graph/rrdtool/rrdtool.go @@ -16,7 +16,7 @@ package rrdtool import ( "errors" - "log" + log "github.com/sirupsen/logrus" "math" "sync/atomic" "time" @@ -58,15 +58,17 @@ func Start() { var err error // check data dir if err = file.EnsureDirRW(cfg.RRD.Storage); err != nil { - log.Fatalln("rrdtool.Start error, bad data dir "+cfg.RRD.Storage+",", err) + log.Fatal("rrdtool.Start error, bad data dir "+cfg.RRD.Storage+",", err) } migrate_start(cfg) + log.Info("rrdtool migrateWorker started") - // sync disk go syncDisk() + log.Info("rrdtool syncDiskWorker started") + go ioWorker() - log.Println("rrdtool.Start ok") + log.Info("rrdtool ioWorker started") } // RRA.Point.Size @@ -169,7 +171,7 @@ func ReadFile(filename, md5 string) ([]byte, error) { return task.args.(*readfile_t).data, err } -func FlushFile(filename, md5 string, items []*cmodel.GraphItem) error { +func CommitFile(filename, md5 string, items []*cmodel.GraphItem) error { done := make(chan error, 1) io_task_chans[getIndex(md5)] <- &io_task_t{ method: IO_TASK_M_FLUSH, @@ -232,30 +234,18 @@ func fetch(filename string, cf string, start, end int64, step int) ([]*cmodel.RR return ret, nil } -func FlushAll(force bool) { - n := store.GraphItems.Size / 10 - for i := 0; i < store.GraphItems.Size; i++ { - FlushRRD(i, force) - if i%n == 0 { - log.Printf("flush hash idx:%03d size:%03d disk:%08d net:%08d\n", - i, store.GraphItems.Size, disk_counter, net_counter) - } - } - log.Printf("flush hash done (disk:%08d net:%08d)\n", disk_counter, net_counter) -} - -func CommitByKey(key string) { +func CommitByKey(key string) error { md5, dsType, step, err := g.SplitRrdCacheKey(key) if err != nil { - return + return err } filename := g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step) items := store.GraphItems.PopAll(key) if len(items) == 0 { - return + return nil } - FlushFile(filename, md5, items) + return CommitFile(filename, md5, items) } func PullByKey(key string) { @@ -274,45 +264,123 @@ func PullByKey(key string) { Key: key, Done: done, } - // net_task slow, shouldn't block syncDisk() or FlushAll() + // net_task slow, shouldn't block syncDisk() or CommitBeforeQuit() // warning: recev sigout when migrating, maybe lost memory data go func() { err := <-done if err != nil { - log.Printf("get %s from remote err[%s]\n", key, err) + log.Errorf("get %s %s from remote err[%s]", key, item.UUID(), err) return } atomic.AddUint64(&net_counter, 1) - //todo: flushfile after getfile? not yet }() } -func FlushRRD(idx int, force bool) { +func SendByKey(key string) { + done := make(chan error) + + item := store.GraphItems.First(key) + if item == nil { + return + } + node, err := Consistent.Get(item.PrimaryKey()) + if err != nil { + return + } + Net_task_ch[node] <- &Net_task_t{ + Method: NET_TASK_M_SEND, + Key: key, + Done: done, + } + + go func() { + err := <-done + if err != nil { + log.Errorf("transmit %s %s to remote err[%s]", key, item.UUID(), err) + } else { + log.Debugf("transmit %s %s to remote succ", key, item.UUID()) + } + }() +} + +func CommitBeforeQuit() { + n := store.GraphItems.Size / 10 + for i := 0; i < store.GraphItems.Size; i++ { + commitByIdxBeforeQuit(i) + if i%n == 0 { + log.Infof("flush rrd before quit, hash idx:%03d size:%03d disk:%08d net:%08d", + i, store.GraphItems.Size, disk_counter, net_counter) + } + } + log.Infof("flush done (disk:%08d net:%08d)", disk_counter, net_counter) +} + +func commitByIdxBeforeQuit(idx int) { begin := time.Now() - atomic.StoreInt32(&flushrrd_timeout, 0) keys := store.GraphItems.KeysByIndex(idx) if len(keys) == 0 { return } + is_migrate := g.Config().Migrate.Enabled for _, key := range keys { flag, _ := store.GraphItems.GetFlag(key) - //write err data to local filename - if force == false && g.Config().Migrate.Enabled && flag&g.GRAPH_F_MISS != 0 { - if time.Since(begin) > time.Millisecond*g.FLUSH_DISK_STEP { - atomic.StoreInt32(&flushrrd_timeout, 1) + if is_migrate && flag&g.GRAPH_F_MISS != 0 { + filename, _ := getFilenameByKey(key) + if !g.IsRrdFileExist(filename) { + //transmit cache data to remote graph + SendByKey(key) + } else { + CommitByKey(key) } - PullByKey(key) - } else if force || shouldFlush(key) { + } else { CommitByKey(key) } + + //check if there is backlog + if time.Since(begin) > time.Millisecond*g.FLUSH_DISK_STEP { + log.Warnf("commit rrd too slow, check the backlog of idx %d", idx) + } } } -func shouldFlush(key string) bool { +func commitByIdx(idx int) { + begin := time.Now() + keys := store.GraphItems.KeysByIndex(idx) + if len(keys) == 0 { + return + } + + is_migrate := g.Config().Migrate.Enabled + for _, key := range keys { + flag, _ := store.GraphItems.GetFlag(key) + if is_migrate { + if flag&g.GRAPH_F_MISS == 0 && shouldFlush(key) { + CommitByKey(key) + } + if flag&g.GRAPH_F_MISS != 0 { + filename, _ := getFilenameByKey(key) + if !g.IsRrdFileExist(filename) { + PullByKey(key) + } else { + CommitByKey(key) + } + } + } else if shouldFlush(key) { + CommitByKey(key) + } + + //check if there is backlog + if time.Since(begin) > time.Millisecond*g.FLUSH_DISK_STEP { + log.Warnf("commit rrd too slow, check the backlog of idx %d", idx) + } + } +} + +func shouldFlush(key string) bool { if store.GraphItems.ItemCnt(key) >= g.FLUSH_MIN_COUNT { return true } @@ -325,3 +393,11 @@ func shouldFlush(key string) bool { return false } + +func getFilenameByKey(key string) (string, error) { + md5, dsType, step, err := g.SplitRrdCacheKey(key) + if err != nil { + return "", err + } + return g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step), nil +} diff --git a/modules/graph/rrdtool/sync_disk.go b/modules/graph/rrdtool/sync_disk.go index fc294e2ee..322a7ae67 100644 --- a/modules/graph/rrdtool/sync_disk.go +++ b/modules/graph/rrdtool/sync_disk.go @@ -17,12 +17,12 @@ package rrdtool import ( "io" "io/ioutil" - "log" "os" "time" "github.com/open-falcon/falcon-plus/modules/graph/g" "github.com/open-falcon/falcon-plus/modules/graph/store" + log "github.com/sirupsen/logrus" "github.com/toolkits/file" ) @@ -41,33 +41,38 @@ type io_task_t struct { } var ( - Out_done_chan chan int - io_task_chans []chan *io_task_t + Main_done_chan chan int + io_task_chans []chan *io_task_t ) func InitChannel() { - Out_done_chan = make(chan int, 1) + Main_done_chan = make(chan int, 1) ioWorkerNum := g.Config().IOWorkerNum io_task_chans = make([]chan *io_task_t, ioWorkerNum) for i := 0; i < ioWorkerNum; i++ { + //the io task queue length is 16 io_task_chans[i] = make(chan *io_task_t, 16) } } func syncDisk() { - time.Sleep(time.Second * g.CACHE_DELAY) ticker := time.NewTicker(time.Millisecond * g.FLUSH_DISK_STEP) defer ticker.Stop() var idx int = 0 + n := store.GraphItems.Size / 10 for { select { case <-ticker.C: idx = idx % store.GraphItems.Size - FlushRRD(idx, false) + commitByIdx(idx) + if idx%n == 0 { + log.Debugf("flush rrd hash idx:%03d size:%03d disk:%08d net:%08d", + idx, store.GraphItems.Size, disk_counter, net_counter) + } idx += 1 - case <-Out_done_chan: - log.Println("cron recv sigout and exit...") + case <-Main_done_chan: + log.Info("syncDisk cron recv sigout and exit...") return } } diff --git a/modules/graph/store/store.go b/modules/graph/store/store.go index 6ecf6d66c..12325ce31 100644 --- a/modules/graph/store/store.go +++ b/modules/graph/store/store.go @@ -17,8 +17,8 @@ package store import ( "container/list" "errors" + log "github.com/sirupsen/logrus" "hash/crc32" - "log" "sync" cmodel "github.com/open-falcon/falcon-plus/common/model" @@ -37,7 +37,7 @@ type GraphItemMap struct { func (this *GraphItemMap) Get(key string) (*SafeLinkedList, bool) { this.RLock() defer this.RUnlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) val, ok := this.A[idx][key] return val, ok } @@ -46,7 +46,7 @@ func (this *GraphItemMap) Get(key string) (*SafeLinkedList, bool) { func (this *GraphItemMap) Remove(key string) bool { this.Lock() defer this.Unlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) _, exists := this.A[idx][key] if !exists { return false @@ -67,7 +67,7 @@ func (this *GraphItemMap) Getitems(idx int) map[string]*SafeLinkedList { func (this *GraphItemMap) Set(key string, val *SafeLinkedList) { this.Lock() defer this.Unlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) this.A[idx][key] = val } @@ -84,7 +84,7 @@ func (this *GraphItemMap) Len() int { func (this *GraphItemMap) First(key string) *cmodel.GraphItem { this.RLock() defer this.RUnlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) sl, ok := this.A[idx][key] if !ok { return nil @@ -101,7 +101,7 @@ func (this *GraphItemMap) First(key string) *cmodel.GraphItem { func (this *GraphItemMap) PushAll(key string, items []*cmodel.GraphItem) error { this.Lock() defer this.Unlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) sl, ok := this.A[idx][key] if !ok { return errors.New("not exist") @@ -113,7 +113,7 @@ func (this *GraphItemMap) PushAll(key string, items []*cmodel.GraphItem) error { func (this *GraphItemMap) GetFlag(key string) (uint32, error) { this.Lock() defer this.Unlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) sl, ok := this.A[idx][key] if !ok { return 0, errors.New("not exist") @@ -121,10 +121,14 @@ func (this *GraphItemMap) GetFlag(key string) (uint32, error) { return sl.Flag, nil } +func (this *GraphItemMap) idx(key string) uint32 { + return hashKey(key) % uint32(this.Size) +} + func (this *GraphItemMap) SetFlag(key string, flag uint32) error { this.Lock() defer this.Unlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) sl, ok := this.A[idx][key] if !ok { return errors.New("not exist") @@ -136,7 +140,7 @@ func (this *GraphItemMap) SetFlag(key string, flag uint32) error { func (this *GraphItemMap) PopAll(key string) []*cmodel.GraphItem { this.Lock() defer this.Unlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) sl, ok := this.A[idx][key] if !ok { return []*cmodel.GraphItem{} @@ -147,7 +151,7 @@ func (this *GraphItemMap) PopAll(key string) []*cmodel.GraphItem { func (this *GraphItemMap) FetchAll(key string) ([]*cmodel.GraphItem, uint32) { this.RLock() defer this.RUnlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) sl, ok := this.A[idx][key] if !ok { return []*cmodel.GraphItem{}, 0 @@ -175,7 +179,6 @@ func (this *GraphItemMap) PushFront(key string, if linkedList, exists := this.Get(key); exists { linkedList.PushFront(item) } else { - //log.Println("new key:", key) safeList := &SafeLinkedList{L: list.New()} safeList.L.PushFront(item) @@ -207,7 +210,7 @@ func (this *GraphItemMap) KeysByIndex(idx int) []string { func (this *GraphItemMap) Back(key string) *cmodel.GraphItem { this.RLock() defer this.RUnlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) L, ok := this.A[idx][key] if !ok { return nil @@ -225,7 +228,7 @@ func (this *GraphItemMap) Back(key string) *cmodel.GraphItem { func (this *GraphItemMap) ItemCnt(key string) int { this.RLock() defer this.RUnlock() - idx := hashKey(key) % uint32(this.Size) + idx := this.idx(key) L, ok := this.A[idx][key] if !ok { return 0 diff --git a/vendor/github.com/open-falcon/rrdlite/rrd_c.go b/vendor/github.com/open-falcon/rrdlite/rrd_c.go index bea526528..d79641fb0 100644 --- a/vendor/github.com/open-falcon/rrdlite/rrd_c.go +++ b/vendor/github.com/open-falcon/rrdlite/rrd_c.go @@ -14,14 +14,11 @@ import ( "reflect" "strconv" "strings" - "sync" "time" "unsafe" ) -var mutex sync.Mutex - -func makeArgs(args []string) []*C.char { +func makeCArgs(args []string) []*C.char { ret := make([]*C.char, len(args)) for i, s := range args { ret[i] = C.CString(s) @@ -39,7 +36,7 @@ func freeArgs(cArgs []*C.char) { } } -func makeError(e *C.char) error { +func makeGoError(e *C.char) error { var null *C.char if e == null { return nil @@ -50,7 +47,7 @@ func makeError(e *C.char) error { func (c *Creator) create() error { filename := C.CString(c.filename) defer freeCString(filename) - args := makeArgs(c.args) + args := makeCArgs(c.args) defer freeArgs(args) e := C.rrdCreate( @@ -60,12 +57,12 @@ func (c *Creator) create() error { C.int(len(args)), &args[0], ) - return makeError(e) + return makeGoError(e) } func (u *Updater) update(_args []string) error { - args := makeArgs(_args) + args := makeCArgs(_args) defer freeArgs(args) e := C.rrdUpdate( @@ -74,7 +71,7 @@ func (u *Updater) update(_args []string) error { C.int(len(args)), &args[0], ) - return makeError(e) + return makeGoError(e) } var ( @@ -245,7 +242,7 @@ func Info(filename string) (map[string]interface{}, error) { fn := C.CString(filename) defer freeCString(fn) var i *C.rrd_info_t - err := makeError(C.rrdInfo(&i, fn)) + err := makeGoError(C.rrdInfo(&i, fn)) if err != nil { return nil, err } @@ -254,6 +251,7 @@ func Info(filename string) (map[string]interface{}, error) { // Fetch retrieves data from RRD file. func Fetch(filename, cf string, start, end time.Time, step time.Duration) (FetchResult, error) { + fn := C.CString(filename) defer freeCString(fn) cCf := C.CString(cf) @@ -262,12 +260,13 @@ func Fetch(filename, cf string, start, end time.Time, step time.Duration) (Fetch cEnd := C.time_t(end.Unix()) cStep := C.ulong(step.Seconds()) var ( - ret C.int + cRet C.int cDsCnt C.ulong cDsNames **C.char cData *C.double ) - err := makeError(C.rrdFetch(&ret, fn, cCf, &cStart, &cEnd, &cStep, &cDsCnt, &cDsNames, &cData)) + + err := makeGoError(C.rrdFetch(&cRet, fn, cCf, &cStart, &cEnd, &cStep, &cDsCnt, &cDsNames, &cData)) if err != nil { return FetchResult{filename, cf, start, end, step, nil, 0, nil}, err } diff --git a/vendor/github.com/open-falcon/rrdlite/rrd_error.h b/vendor/github.com/open-falcon/rrdlite/rrd_error.h index f55078cf0..aecb29f1b 100644 --- a/vendor/github.com/open-falcon/rrdlite/rrd_error.h +++ b/vendor/github.com/open-falcon/rrdlite/rrd_error.h @@ -123,4 +123,4 @@ #define RRD_ERR_UNKNOWN_DATA1 0x0279 /* if add new system event flag, please upadte the RRD_ERR_END */ #define RRD_ERR_END 0x0279 -#define RRD_ERR_NUM (RRD_ERR_END - RRD_ERR_START + 1) \ No newline at end of file +#define RRD_ERR_NUM (RRD_ERR_END - RRD_ERR_START + 1) diff --git a/vendor/github.com/open-falcon/rrdlite/rrdfunc.c b/vendor/github.com/open-falcon/rrdlite/rrdfunc.c index 62a21cbc5..13d460cd8 100644 --- a/vendor/github.com/open-falcon/rrdlite/rrdfunc.c +++ b/vendor/github.com/open-falcon/rrdlite/rrdfunc.c @@ -1,10 +1,15 @@ #include +#include + #include "rrd.h" const char *rrdCreate(const char *filename, unsigned long step, time_t start, int argc, const char **argv) { int ret; ret = rrd_create_r(filename, step, start, argc, argv); + if (ret != 0){ + printf("rrd_create %s, errno: %d\n", filename, ret); + } return rrd_strerror(ret); } @@ -12,6 +17,9 @@ const char *rrdUpdate(const char *filename, const char *template, int argc, const char **argv) { int ret; ret = rrd_update_r(filename, template, argc, argv); + if (ret != 0){ + printf("rrd_update %s, errno: %d\n", filename, ret); + } return rrd_strerror(ret); } @@ -19,13 +27,21 @@ const char *rrdUpdate(const char *filename, const char *template, const char *rrdInfo(rrd_info_t **info, char *filename) { int ret = 0; *info = rrd_info_r(filename, &ret); + if (ret != 0){ + printf("rrd_info %s, errno: %d\n", filename, ret); + } return rrd_strerror(ret); } const char *rrdFetch(int *ret, char *filename, const char *cf, time_t *start, time_t *end, unsigned long *step, unsigned long *ds_cnt, char ***ds_namv, double **data) { + //setbuf(stdout, NULL); + //setbuf(stderr, NULL); *ret = rrd_fetch_r(filename, cf, start, end, step, ds_cnt, ds_namv, data); + if (*ret != 0){ + printf("rrdfetch %s, errno: %d\n", filename, *ret); + } return rrd_strerror(*ret); } diff --git a/vendor/vendor.json b/vendor/vendor.json index 1122e8355..a0309884b 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -307,10 +307,10 @@ "revisionTime": "2015-12-21T08:53:10Z" }, { - "checksumSHA1": "I0kIkGOtYBit3XflqEqbEt+kuhQ=", + "checksumSHA1": "8tcXzsjEx8Qo/4wv0+T0/CZJqqk=", "path": "github.com/open-falcon/rrdlite", - "revision": "7d8646c85cc56cad1f65d97abbe24fdfc1b88917", - "revisionTime": "2017-04-12T12:20:36Z" + "revision": "bf5829f786ad3765b09ae54f416106d5f1565c07", + "revisionTime": "2020-02-14T14:08:04Z" }, { "checksumSHA1": "F1IYMLBLAZaTOWnmXsgaxTGvrWI=",