Skip to content

Commit

Permalink
1. 优化日志功能初始化
Browse files Browse the repository at this point in the history
2. 支持只返回单个collect下的metrics
  • Loading branch information
sunlinyao committed Jan 9, 2022
1 parent f409fb9 commit ffe4435
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 17 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.0
0.2.1
15 changes: 15 additions & 0 deletions collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,18 @@ func (c *CollectContext) Collect(proMetrics chan<- prometheus.Metric) {
}

type Collects []CollectConfig

func (c Collects) Get(name string) *CollectConfig {
for idx := range c {
if c[idx].Name == name {
return &c[idx]
}
}
return nil
}

func (c *Collects) SetLogger(logger log.Logger) {
for idx := range *c {
(*c)[idx].SetLogger(logger)
}
}
10 changes: 7 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,27 @@ func NewConfig() *SafeConfig {
}
}

func (sc *SafeConfig) ReloadConfigFromReader(reader io.Reader, _ log.Logger) (err error) {
func (sc *SafeConfig) ReloadConfigFromReader(reader io.Reader, logger log.Logger) (err error) {
var c = &Config{}
decoder := yaml.NewDecoder(reader)
decoder.KnownFields(true)

if err = decoder.Decode(c); err != nil {
return fmt.Errorf("error parsing config file: %s", err)
}

c.Collects.SetLogger(logger)
sc.Lock()
sc.C = c
sc.Unlock()
return nil
}

func (sc *SafeConfig) GetConfig() *Config {
sc.Lock()
defer sc.Unlock()
return sc.C
}
func (sc *SafeConfig) ReloadConfig(confFile string, logger log.Logger) (err error) {

defer func() {
if err != nil {
configReloadSuccess.Set(0)
Expand Down
56 changes: 43 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func run() int {
for {
select {
case <-hup:
level.Info(logger).Log("msg", "Reload config from signal")
if err := sc.ReloadConfig(*configFile, logger); err != nil {
level.Error(logger).Log("msg", "Error reloading config", "err", err)
continue
Expand All @@ -123,33 +124,39 @@ func run() int {
}()

serve := http.NewServeMux()
// Match Prometheus behaviour and redirect over externalURL for root path only
// if routePrefix is different than "/"

serve.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, path.Join(*routePrefix, "/")) && strings.HasSuffix(r.URL.Path, "/metrics") {
rPath := strings.TrimPrefix(r.URL.Path, path.Join(*routePrefix, "/"))
if rPath == "metrics" {
collectMetrics(logger, w, r)
return
} else {
collectMetricsByName(logger, strings.TrimSuffix(rPath, "/metrics"), w, r)
return
}
}
http.NotFound(w, r)
})

serve.HandleFunc(path.Join(*routePrefix, "/-/reload"),
func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
fmt.Fprintf(w, "This endpoint requires a POST request.\n")
_, _ = fmt.Fprintf(w, "This endpoint requires a POST request.\n")
return
}

level.Info(logger).Log("msg", "Reload config from http api")
rc := make(chan error)
reloadCh <- rc
if err := <-rc; err != nil {
http.Error(w, fmt.Sprintf("failed to reload config: %s", err), http.StatusInternalServerError)
}
})

serve.HandleFunc(path.Join(*routePrefix, "/metrics"), func(writer http.ResponseWriter, request *http.Request) {
collectMetrics(logger, writer, request)
})

serve.HandleFunc(path.Join(*routePrefix, "/-/healthy"), func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("Healthy"))
_, _ = w.Write([]byte("Healthy"))
})
if *enablePprof {
pprofPrefix := path.Join(*routePrefix, *pprofUrl) + "/"
Expand Down Expand Up @@ -228,13 +235,36 @@ func computeExternalURL(u, listenAddr string) (*url.URL, error) {
return eu, nil
}

func collectMetricsByName(logger log.Logger, name string, w http.ResponseWriter, r *http.Request) {
level.Debug(logger).Log("msg", "collect metrics by collect_name", "name", name)
conf := sc.GetConfig()
reg := prometheus.NewRegistry()

if collect := conf.Collects.Get(name); collect != nil {
reg.MustRegister(&collector.CollectContext{
CollectConfig: collect,
Context: r.Context(),
})
} else {
http.NotFound(w, r)
return
}

handler := promhttp.HandlerFor(
prometheus.Gatherers{reg},
promhttp.HandlerOpts{
ErrorLog: stdlog.New(log.NewStdlibAdapter(level.Error(logger)), "", 0),
ErrorHandling: promhttp.ContinueOnError,
MaxRequestsInFlight: 10,
Registry: reg,
},
)
handler.ServeHTTP(w, r)
}
func collectMetrics(logger log.Logger, w http.ResponseWriter, r *http.Request) {
sc.Lock()
conf := sc.C
sc.Unlock()
conf := sc.GetConfig()
reg := prometheus.NewRegistry()
for idx := range conf.Collects {
conf.Collects[idx].SetLogger(logger)
reg.MustRegister(&collector.CollectContext{
CollectConfig: &conf.Collects[idx],
Context: r.Context(),
Expand Down
34 changes: 34 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package main

import (
"bytes"
"fmt"
"github.com/MicroOps-cn/data_exporter/collector"
"github.com/MicroOps-cn/data_exporter/testings"
"github.com/go-kit/log"
Expand Down Expand Up @@ -131,3 +132,36 @@ func TestCollectMetrics(t *testing.T) {
assert.Contains(t, body, `weather_temperature_hour{name="吉林",zone="china"} 16`)
assert.Contains(t, body, `server_memory{name="server1"} 6.8719476736e+10`)
}

func TestCollectMetricsByName(t *testing.T) {
tt := testings.NewTesting(t)
logger := log.NewLogfmtLogger(os.Stdout)
reader := bytes.NewReader([]byte(yamlConfigContent))
tt.AssertNoError(sc.ReloadConfigFromReader(reader, logger))
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
f, err := os.Open("examples/weather.xml")
tt.AssertNoError(err)
defer f.Close()
tt.AssertNoError(io.Copy(w, f))
}))
time.Sleep(time.Second)
defer ts.Close()
sc.C.Collects[1].Datasource[0].Url = ts.URL
sc.C.Collects[0].Datasource[0].Url = "examples/my_data.json"

req, err := http.NewRequest("GET", "", nil)
if err != nil {
t.Fatal(err)
}
rr := httptest.NewRecorder()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
collectMetricsByName(logger, "test-http", w, r)
})
handler.ServeHTTP(rr, req)
tt.AssertEqual(rr.Code, 200)
body := rr.Body.String()
fmt.Println(body)
assert.NotContains(t, body, `weather_temperature_week{name="黑龙江",zone="china"} 18`)
assert.NotContains(t, body, `weather_temperature_hour{name="吉林",zone="china"} 16`)
assert.Contains(t, body, `server_memory{name="server1"} 6.8719476736e+10`)
}

0 comments on commit ffe4435

Please sign in to comment.