diff --git a/pkg/transfer/template/etl/formatter/handler.go b/pkg/transfer/template/etl/formatter/handler.go index f63cf3d71..242b19794 100644 --- a/pkg/transfer/template/etl/formatter/handler.go +++ b/pkg/transfer/template/etl/formatter/handler.go @@ -10,6 +10,11 @@ package formatter import ( + "bytes" + "compress/gzip" + "encoding/base64" + "io" + "strings" "time" "github.com/cstockton/go-conv" @@ -384,6 +389,10 @@ func tryDecodeExtraMeta(s string) ([]map[string]string, error) { Custom []map[string]string `json:"custom"` } + type V2Meta struct { + Content string `json:"content"` + } + parseV0Meta := func(b []byte) ([]map[string]string, error) { ret := make([]map[string]string, 0) err := json.Unmarshal(b, &ret) @@ -412,20 +421,47 @@ func tryDecodeExtraMeta(s string) ([]map[string]string, error) { return ret, nil } - type tryV1 struct { + parseV2Meta := func(b []byte) ([]map[string]string, error) { + var v2Meta V2Meta + if err := json.Unmarshal(b, &v2Meta); err != nil { + return nil, err + } + + b, err := base64.RawStdEncoding.DecodeString(strings.TrimRight(v2Meta.Content, "=")) + if err != nil { + return nil, err + } + + r, err := gzip.NewReader(bytes.NewBuffer(b)) + if err != nil { + return nil, err + } + defer r.Close() + + content, err := io.ReadAll(r) + if err != nil { + return nil, err + } + return parseV1Meta(content) + } + + type tryVer struct { Version string `json:"version"` } - var tryv1 tryV1 + var tryV tryVer var ret []map[string]string - // 尝试用最小代价解析 version 字段,判断其是否为 v1 格式 + // 尝试用最小代价解析 version 字段,判断数据版本号 // 不同版本格式 // v0: []map[string]string // v1: V1Meta - err := json.Unmarshal([]byte(s), &tryv1) - if err == nil && tryv1.Version == "v1" { + // v2: V2Meta -> content: V1Meta + err := json.Unmarshal([]byte(s), &tryV) + if err == nil && tryV.Version == "v1" { ret, err = parseV1Meta([]byte(s)) + } else if err == nil && tryV.Version == "v2" { + ret, err = parseV2Meta([]byte(s)) } else { ret, err = parseV0Meta([]byte(s)) } diff --git a/pkg/transfer/template/etl/formatter/handler_test.go b/pkg/transfer/template/etl/formatter/handler_test.go index 0d746870e..6c6d589eb 100644 --- a/pkg/transfer/template/etl/formatter/handler_test.go +++ b/pkg/transfer/template/etl/formatter/handler_test.go @@ -292,6 +292,42 @@ func (s *HandlerSuite) TestCutterByDbmMetaV1() { }) } +func (s *HandlerSuite) TestCutterByDbmMetaV2() { + hostInfo := models.CCHostInfo{ + IP: "127.0.0.1", + CloudID: 1, + CCTopoBaseModelInfo: &models.CCTopoBaseModelInfo{ + BizID: []int{2}, + Topo: []map[string]string{}, + }, + DbmMeta: `{"version": "v2", "content": "H4sIAB6N7mUC/8WXT2vDMAzFv0rxeQz5z9pu5112bm9jhCwxIyyOQ5LRltDvPgtGx047/g4hIAs9ydZ7tlbT5JTyYJ42q6nHsfzN535e4ryYu41aulZtVmQnWzU1/VdZnqrlMkZdOZ5iGqd8vhzj0Hbz4fD8MsxLPTRRvdv3m+MUy7K5aogSIadie11v4dqc6k7TMH0ePv777kvWGr77gaqm3P+CVKnWmH8cxjwt6uBF3E6zYKC3GLR4rmrBoEPgoLmqHVe158hlOejA8dpZ7qwfuLPmOtxyvLbg9fHIdbjjeA1Cc+QKnJAKeGlyHR44DXfcs9BzahbAqjlJ8VzVds9JCiekAp41t+EefCBxHS7chlvu5hLwRQoOPtz04cCZi1Mzy1XtQXKBbQaqGchrDlrKzPV2/QaY+kKyjxcAAA"}`, + } + s.StoreHost(&hostInfo).AnyTimes() + s.Store.EXPECT().Get(gomock.Any()).Return(nil, define.ErrItemNotFound).AnyTimes() + + s.runHandler(TransferRecordCutterByDbmMetaCreator(s.Store, true), func(record *define.ETLRecord) { + dims := record.Dimensions + s.NotNil(dims["app"]) + s.NotNil(dims["appid"]) + s.NotNil(dims["db_type"]) + s.NotNil(dims["cluster_type"]) + s.NotNil(dims["cluster_domain"]) + s.NotNil(dims["instance_port"]) + s.NotNil(dims["instance_role"]) + s.T().Logf("dbm-meta/v2 record: %+v", record) + }, []handlerCase{ + { + 48, nil, define.ETLRecord{ + Dimensions: map[string]interface{}{ + define.RecordBizIDFieldName: 3, + define.RecordIPFieldName: "127.0.0.1", + define.RecordCloudIDFieldName: "1", + }, + }, + }, + }) +} + // TestFillBizIDHandlerCreator : func (s *HandlerSuite) TestFillBizIDHandlerCreatorWithInstanceId() { instanceInfo := &models.CCInstanceInfo{ diff --git a/pkg/transfer/testsuite/mock_define.go b/pkg/transfer/testsuite/mock_define.go index 50f0df92c..a929766df 100644 --- a/pkg/transfer/testsuite/mock_define.go +++ b/pkg/transfer/testsuite/mock_define.go @@ -183,6 +183,14 @@ type MockPayload struct { recorder *MockPayloadMockRecorder } +func (m *MockPayload) SetFlag(flag define.PayloadFlag) {} + +func (m *MockPayload) AddFlag(flag define.PayloadFlag) {} + +func (m *MockPayload) Flag() define.PayloadFlag { + return 0 +} + // MockPayloadMockRecorder is the mock recorder for MockPayload. type MockPayloadMockRecorder struct { mock *MockPayload