diff --git a/pkg/config/config.go b/pkg/config/config.go index 00668c8..ead4f2a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -16,6 +16,7 @@ const ( DefaultDelaySeconds = 300 DefaultRelodIntervalMinutes = 60 DefaultRateLimit = 15 + DefaultQueryMetricBatchSize = 50 EnvAccessKey = "TENCENTCLOUD_SECRET_ID" EnvSecretKey = "TENCENTCLOUD_SECRET_KEY" @@ -103,11 +104,12 @@ func (p *TencentProduct) IsReloadEnable() bool { } type TencentConfig struct { - Credential TencentCredential `yaml:"credential"` - Metrics []TencentMetric `yaml:"metrics"` - Products []TencentProduct `yaml:"products"` - RateLimit float64 `yaml:"rate_limit"` - Filename string `yaml:"filename"` + Credential TencentCredential `yaml:"credential"` + Metrics []TencentMetric `yaml:"metrics"` + Products []TencentProduct `yaml:"products"` + RateLimit float64 `yaml:"rate_limit"` + MetricQueryBatchSize int `yaml:"metric_query_batch_size"` + Filename string `yaml:"filename"` } func NewConfig() *TencentConfig { @@ -193,6 +195,10 @@ func (c *TencentConfig) fillDefault() { c.RateLimit = DefaultRateLimit } + if c.MetricQueryBatchSize <= 0 || c.MetricQueryBatchSize > 100 { + c.MetricQueryBatchSize = DefaultQueryMetricBatchSize + } + for index, metric := range c.Metrics { if metric.PeriodSeconds == 0 { c.Metrics[index].PeriodSeconds = DefaultPeriodSeconds diff --git a/pkg/constant/cache.go b/pkg/constant/cache.go index 3f2495e..c66e85c 100644 --- a/pkg/constant/cache.go +++ b/pkg/constant/cache.go @@ -1 +1,7 @@ package constant + +import "time" + +const ( + DefaultReloadInterval = 60 * time.Minute +) diff --git a/pkg/constant/metric.go b/pkg/constant/metric.go new file mode 100644 index 0000000..89d6b81 --- /dev/null +++ b/pkg/constant/metric.go @@ -0,0 +1,5 @@ +package constant + +const ( + DefaultQueryMetricBatchSize = 50 +) diff --git a/pkg/instance/cache.go b/pkg/instance/cache.go index 9672984..013281f 100644 --- a/pkg/instance/cache.go +++ b/pkg/instance/cache.go @@ -10,10 +10,6 @@ import ( "github.com/go-kit/kit/log/level" ) -const ( - DefaultReloadInterval = 60 * time.Minute -) - // 可用于产品的实例的缓存, TcInstanceRepository type TcInstanceCache struct { Raw TcInstanceRepository diff --git a/pkg/metric/repository.go b/pkg/metric/repository.go index 74c5363..150e337 100644 --- a/pkg/metric/repository.go +++ b/pkg/metric/repository.go @@ -7,10 +7,11 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "golang.org/x/time/rate" + monitor "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/monitor/v20180724" "github.com/tencentyun/tencentcloud-exporter/pkg/client" "github.com/tencentyun/tencentcloud-exporter/pkg/config" - "golang.org/x/time/rate" ) var ( @@ -33,7 +34,10 @@ type TcmMetricRepositoryImpl struct { monitorClient *monitor.Client limiter *rate.Limiter // 限速 ctx context.Context - logger log.Logger + + queryMetricBatchSize int + + logger log.Logger } func (repo *TcmMetricRepositoryImpl) GetMeta(namespace string, name string) (meta *TcmMeta, err error) { @@ -136,83 +140,111 @@ func (repo *TcmMetricRepositoryImpl) GetSamples(s *TcmSeries, st int64, et int64 return } -func (repo *TcmMetricRepositoryImpl) ListSamples(m *TcmMetric, st int64, et int64) (samplesList []*TcmSamples, err error) { - for _, seriesList := range m.GetSeriesSplitByBatch(10) { - ctx, cancel := context.WithCancel(repo.ctx) - err = repo.limiter.Wait(ctx) +func (repo *TcmMetricRepositoryImpl) ListSamples(m *TcmMetric, st int64, et int64) ([]*TcmSamples, error) { + var samplesList []*TcmSamples + for _, seriesList := range m.GetSeriesSplitByBatch(repo.queryMetricBatchSize) { + sl, err := repo.listSampleByBatch(m, seriesList, st, et) if err != nil { - return + level.Error(repo.logger).Log("msg", err.Error()) + continue } + samplesList = append(samplesList, sl...) + } + return samplesList, nil +} - request := monitor.NewGetMonitorDataRequest() - request.Namespace = &m.Meta.Namespace - request.MetricName = &m.Meta.MetricName - - period := uint64(m.Conf.StatPeriodSeconds) - request.Period = &period - - for _, series := range seriesList { - ifilters := &monitor.Instance{ - Dimensions: []*monitor.Dimension{}, - } - for k, v := range series.QueryLabels { - tk := k - tv := v - ifilters.Dimensions = append(ifilters.Dimensions, &monitor.Dimension{Name: &tk, Value: &tv}) - } - request.Instances = append(request.Instances, ifilters) - } +func (repo *TcmMetricRepositoryImpl) listSampleByBatch( + m *TcmMetric, + seriesList []*TcmSeries, + st int64, + et int64, +) ([]*TcmSamples, error) { + var samplesList []*TcmSamples - stStr := time.Unix(st, 0).Format(timeStampFormat) - request.StartTime = &stStr - if et != 0 { - etStr := time.Unix(et, 0).Format(timeStampFormat) - request.StartTime = &etStr - } + ctx, cancel := context.WithCancel(repo.ctx) + defer cancel() - response, err := repo.monitorClient.GetMonitorData(request) - if err != nil { - return nil, err + err := repo.limiter.Wait(ctx) + if err != nil { + return nil, err + } + + request := repo.buildGetMonitorDataRequest(m, seriesList, st, et) + response, err := repo.monitorClient.GetMonitorData(request) + if err != nil { + return nil, err + } + + for _, points := range response.Response.DataPoints { + samples, ql, e := repo.buildSamples(m, points) + if e != nil { + level.Debug(repo.logger).Log( + "msg", e.Error(), + "metric", m.Meta.MetricName, + "dimension", fmt.Sprintf("%v", ql)) + continue } + samplesList = append(samplesList, samples) + } + return samplesList, nil +} - for _, points := range response.Response.DataPoints { - ql := map[string]string{} - for _, dimension := range points.Dimensions { - if *dimension.Value != "" { - ql[*dimension.Name] = *dimension.Value - } - } - sid, e := GetTcmSeriesId(m, ql) - if e != nil { - level.Warn(repo.logger).Log( - "msg", "Get series id fail", - "metric", m.Meta.MetricName, - "dimension", fmt.Sprintf("%v", ql)) - continue - } - s, ok := m.Series[sid] - if !ok { - level.Warn(repo.logger).Log( - "msg", "Response data point not match series", - "metric", m.Meta.MetricName, - "dimension", fmt.Sprintf("%v", ql)) - continue - } - samples, e := NewTcmSamples(s, points) - if e != nil { - level.Debug(repo.logger).Log( - "msg", "The instance has no metric data and may not have traffic", - "metric", m.Meta.MetricName, - "dimension", fmt.Sprintf("%v", ql)) - } else { - samplesList = append(samplesList, samples) - } +func (repo *TcmMetricRepositoryImpl) buildGetMonitorDataRequest( + m *TcmMetric, + seriesList []*TcmSeries, + st int64, et int64, +) *monitor.GetMonitorDataRequest { + request := monitor.NewGetMonitorDataRequest() + request.Namespace = &m.Meta.Namespace + request.MetricName = &m.Meta.MetricName + + period := uint64(m.Conf.StatPeriodSeconds) + request.Period = &period + for _, series := range seriesList { + ifilters := &monitor.Instance{ + Dimensions: []*monitor.Dimension{}, } + for k, v := range series.QueryLabels { + tk := k + tv := v + ifilters.Dimensions = append(ifilters.Dimensions, &monitor.Dimension{Name: &tk, Value: &tv}) + } + request.Instances = append(request.Instances, ifilters) + } + + stStr := time.Unix(st, 0).Format(timeStampFormat) + request.StartTime = &stStr + if et != 0 { + etStr := time.Unix(et, 0).Format(timeStampFormat) + request.StartTime = &etStr + } + return request +} - cancel() +func (repo *TcmMetricRepositoryImpl) buildSamples( + m *TcmMetric, + points *monitor.DataPoint, +) (*TcmSamples, map[string]string, error) { + ql := map[string]string{} + for _, dimension := range points.Dimensions { + if *dimension.Value != "" { + ql[*dimension.Name] = *dimension.Value + } } - return + sid, e := GetTcmSeriesId(m, ql) + if e != nil { + return nil, ql, fmt.Errorf("get series id fail") + } + s, ok := m.Series[sid] + if !ok { + return nil, ql, fmt.Errorf("response data point not match series") + } + samples, e := NewTcmSamples(s, points) + if e != nil { + return nil, ql, fmt.Errorf("this instance may not have metric data") + } + return samples, ql, nil } func NewTcmMetricRepository(conf *config.TencentConfig, logger log.Logger) (repo TcmMetricRepository, err error) { @@ -222,10 +254,11 @@ func NewTcmMetricRepository(conf *config.TencentConfig, logger log.Logger) (repo } repo = &TcmMetricRepositoryImpl{ - monitorClient: monitorClient, - limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1), - ctx: context.Background(), - logger: logger, + monitorClient: monitorClient, + limiter: rate.NewLimiter(rate.Limit(conf.RateLimit), 1), + ctx: context.Background(), + queryMetricBatchSize: conf.MetricQueryBatchSize, + logger: logger, } return