diff --git a/api/openapi/readers.yml b/api/openapi/readers.yml index 47cea33ebd..2391060e27 100644 --- a/api/openapi/readers.yml +++ b/api/openapi/readers.yml @@ -26,7 +26,7 @@ servers: - url: https://localhost:9009 - url: http://localhost:9011 - url: https://localhost:9011 - + tags: - name: readers description: Everything about your Readers @@ -57,14 +57,16 @@ paths: - $ref: "#/components/parameters/DataValue" - $ref: "#/components/parameters/From" - $ref: "#/components/parameters/To" + - $ref: "#/components/parameters/Aggregation" + - $ref: "#/components/parameters/Interval" responses: - '200': + "200": $ref: "#/components/responses/MessagesPageRes" - '400': + "400": description: Failed due to malformed query parameters. - '401': + "401": description: Missing or invalid access token provided. - '500': + "500": $ref: "#/components/responses/ServiceError" /health: get: @@ -72,9 +74,9 @@ paths: tags: - health responses: - '200': + "200": $ref: "#/components/responses/HealthRes" - '500': + "500": $ref: "#/components/responses/ServiceError" components: @@ -226,6 +228,7 @@ components: in: query schema: type: number + example: 1709218556069 required: false To: name: to @@ -233,6 +236,34 @@ components: in: query schema: type: number + example: 1709218757503 + required: false + Aggregation: + name: aggregation + description: Aggregation function. + in: query + schema: + type: string + enum: + - MAX + - AVG + - MIN + - SUM + - COUNT + - max + - min + - sum + - avg + - count + example: MAX + required: false + Interval: + name: interval + description: Aggregation interval. + in: query + schema: + type: string + example: 10s required: false responses: diff --git a/cli/message.go b/cli/message.go index 79f86571fb..538b51bc80 100644 --- a/cli/message.go +++ b/cli/message.go @@ -38,9 +38,11 @@ var cmdMessages = []cobra.Command{ logUsage(cmd.Use) return } - pageMetadata := mgxsdk.PageMetadata{ - Offset: Offset, - Limit: Limit, + pageMetadata := mgxsdk.MessagePageMetadata{ + PageMetadata: mgxsdk.PageMetadata{ + Offset: Offset, + Limit: Limit, + }, } m, err := sdk.ReadMessages(pageMetadata, args[0], args[1]) diff --git a/internal/apiutil/errors.go b/internal/apiutil/errors.go index bca1c710ed..11b880106d 100644 --- a/internal/apiutil/errors.go +++ b/internal/apiutil/errors.go @@ -164,4 +164,16 @@ var ( // ErrRollbackTx indicates failed to rollback transaction. ErrRollbackTx = errors.New("failed to rollback transaction") + + // ErrInvalidAggregation indicates invalid aggregation value. + ErrInvalidAggregation = errors.New("invalid aggregation value") + + // ErrInvalidInterval indicates invalid interval value. + ErrInvalidInterval = errors.New("invalid interval value") + + // ErrMissingFrom indicates missing from value. + ErrMissingFrom = errors.New("missing from time value") + + // ErrMissingTo indicates missing to value. + ErrMissingTo = errors.New("missing to time value") ) diff --git a/pkg/sdk/go/message.go b/pkg/sdk/go/message.go index 591436ef51..e5a044885e 100644 --- a/pkg/sdk/go/message.go +++ b/pkg/sdk/go/message.go @@ -7,6 +7,8 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" + "strconv" "strings" "github.com/absmach/magistrala/internal/apiutil" @@ -23,14 +25,14 @@ func (sdk mgSDK) SendMessage(chanName, msg, key string) errors.SDKError { subtopicPart = fmt.Sprintf("/%s", strings.ReplaceAll(chanNameParts[1], ".", "/")) } - url := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart) + reqURL := fmt.Sprintf("%s/channels/%s/messages%s", sdk.httpAdapterURL, chanID, subtopicPart) - _, _, err := sdk.processRequest(http.MethodPost, url, ThingPrefix+key, []byte(msg), nil, http.StatusAccepted) + _, _, err := sdk.processRequest(http.MethodPost, reqURL, ThingPrefix+key, []byte(msg), nil, http.StatusAccepted) return err } -func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (MessagesPage, errors.SDKError) { +func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, token string) (MessagesPage, errors.SDKError) { chanNameParts := strings.SplitN(chanName, ".", channelParts) chanID := chanNameParts[0] subtopicPart := "" @@ -39,7 +41,7 @@ func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (Messages } readMessagesEndpoint := fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart) - url, err := sdk.withQueryParams(sdk.readerURL, readMessagesEndpoint, pm) + msgURL, err := sdk.withMessageQueryParams(sdk.readerURL, readMessagesEndpoint, pm) if err != nil { return MessagesPage{}, errors.NewSDKError(err) } @@ -47,7 +49,7 @@ func (sdk mgSDK) ReadMessages(pm PageMetadata, chanName, token string) (Messages header := make(map[string]string) header["Content-Type"] = string(sdk.msgContentType) - _, body, sdkerr := sdk.processRequest(http.MethodGet, url, token, nil, header, http.StatusOK) + _, body, sdkerr := sdk.processRequest(http.MethodGet, msgURL, token, nil, header, http.StatusOK) if sdkerr != nil { return MessagesPage{}, sdkerr } @@ -69,3 +71,34 @@ func (sdk *mgSDK) SetContentType(ct ContentType) errors.SDKError { return nil } + +func (sdk mgSDK) withMessageQueryParams(baseURL, endpoint string, mpm MessagePageMetadata) (string, error) { + b, err := json.Marshal(mpm) + if err != nil { + return "", err + } + q := map[string]interface{}{} + if err := json.Unmarshal(b, &q); err != nil { + return "", err + } + ret := url.Values{} + for k, v := range q { + switch t := v.(type) { + case string: + ret.Add(k, t) + case float64: + ret.Add(k, strconv.FormatFloat(t, 'f', -1, 64)) + case uint64: + ret.Add(k, strconv.FormatUint(t, 10)) + case int64: + ret.Add(k, strconv.FormatInt(t, 10)) + case json.Number: + ret.Add(k, t.String()) + case bool: + ret.Add(k, strconv.FormatBool(t)) + } + } + qs := ret.Encode() + + return fmt.Sprintf("%s/%s?%s", baseURL, endpoint, qs), nil +} diff --git a/pkg/sdk/go/sdk.go b/pkg/sdk/go/sdk.go index 1bb8ff3033..8a4d980c73 100644 --- a/pkg/sdk/go/sdk.go +++ b/pkg/sdk/go/sdk.go @@ -69,6 +69,22 @@ var ( ErrInvalidJWT = errors.New("invalid JWT") ) +type MessagePageMetadata struct { + PageMetadata + Subtopic string `json:"subtopic,omitempty"` + Publisher string `json:"publisher,omitempty"` + Comparator string `json:"comparator,omitempty"` + BoolValue *bool `json:"vb,omitempty"` + StringValue string `json:"vs,omitempty"` + DataValue string `json:"vd,omitempty"` + From float64 `json:"from,omitempty"` + To float64 `json:"to,omitempty"` + Aggregation string `json:"aggregation,omitempty"` + Interval string `json:"interval,omitempty"` + Value float64 `json:"value,omitempty"` + Protocol string `json:"protocol,omitempty"` +} + type PageMetadata struct { Total uint64 `json:"total"` Offset uint64 `json:"offset"` @@ -828,13 +844,13 @@ type SDK interface { // ReadMessages read messages of specified channel. // // example: - // pm := sdk.PageMetadata{ + // pm := sdk.MessagePageMetadata{ // Offset: 0, // Limit: 10, // } // msgs, _ := sdk.ReadMessages(pm,"channelID", "token") // fmt.Println(msgs) - ReadMessages(pm PageMetadata, chanID, token string) (MessagesPage, errors.SDKError) + ReadMessages(pm MessagePageMetadata, chanID, token string) (MessagesPage, errors.SDKError) // SetContentType sets message content type. // diff --git a/pkg/sdk/mocks/sdk.go b/pkg/sdk/mocks/sdk.go index eadcb80409..56123f54f9 100644 --- a/pkg/sdk/mocks/sdk.go +++ b/pkg/sdk/mocks/sdk.go @@ -1805,7 +1805,7 @@ func (_m *SDK) Parents(id string, pm sdk.PageMetadata, token string) (sdk.Groups } // ReadMessages provides a mock function with given fields: pm, chanID, token -func (_m *SDK) ReadMessages(pm sdk.PageMetadata, chanID string, token string) (sdk.MessagesPage, errors.SDKError) { +func (_m *SDK) ReadMessages(pm sdk.MessagePageMetadata, chanID string, token string) (sdk.MessagesPage, errors.SDKError) { ret := _m.Called(pm, chanID, token) if len(ret) == 0 { @@ -1814,16 +1814,16 @@ func (_m *SDK) ReadMessages(pm sdk.PageMetadata, chanID string, token string) (s var r0 sdk.MessagesPage var r1 errors.SDKError - if rf, ok := ret.Get(0).(func(sdk.PageMetadata, string, string) (sdk.MessagesPage, errors.SDKError)); ok { + if rf, ok := ret.Get(0).(func(sdk.MessagePageMetadata, string, string) (sdk.MessagesPage, errors.SDKError)); ok { return rf(pm, chanID, token) } - if rf, ok := ret.Get(0).(func(sdk.PageMetadata, string, string) sdk.MessagesPage); ok { + if rf, ok := ret.Get(0).(func(sdk.MessagePageMetadata, string, string) sdk.MessagesPage); ok { r0 = rf(pm, chanID, token) } else { r0 = ret.Get(0).(sdk.MessagesPage) } - if rf, ok := ret.Get(1).(func(sdk.PageMetadata, string, string) errors.SDKError); ok { + if rf, ok := ret.Get(1).(func(sdk.MessagePageMetadata, string, string) errors.SDKError); ok { r1 = rf(pm, chanID, token) } else { if ret.Get(1) != nil { diff --git a/readers/api/endpoint_test.go b/readers/api/endpoint_test.go index 6c77b46b2a..cbbee4d5a3 100644 --- a/readers/api/endpoint_test.go +++ b/readers/api/endpoint_test.go @@ -414,7 +414,6 @@ func TestReadAll(t *testing.T) { key: thingToken, status: http.StatusBadRequest, }, - { desc: "read page with non-float to as thing", url: fmt.Sprintf("%s/channels/%s/messages?to=ABCD", ts.URL, chanID), @@ -431,6 +430,68 @@ func TestReadAll(t *testing.T) { Messages: messages[5:15], }, }, + { + desc: "read page with aggregation as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX", ts.URL, chanID), + key: thingToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with interval as thing", + url: fmt.Sprintf("%s/channels/%s/messages?interval=10h", ts.URL, chanID), + key: thingToken, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages)), + Messages: messages[0:10], + }, + }, + { + desc: "read page with aggregation and interval as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h", ts.URL, chanID), + key: thingToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval, to and from as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), + key: thingToken, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages[5:20])), + Messages: messages[5:15], + }, + }, + { + desc: "read page with invalid aggregation and valid interval, to and from as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=invalid&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), + key: thingToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with invalid interval and valid aggregation, to and from as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10hrs&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), + key: thingToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval and to with missing from as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=%f", ts.URL, chanID, messages[4].Time), + key: thingToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval and to with invalid from as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=ABCD&from=%f", ts.URL, chanID, messages[4].Time), + key: thingToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval and to with invalid to as thing", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=ABCD", ts.URL, chanID, messages[4].Time), + key: thingToken, + status: http.StatusBadRequest, + }, { desc: "read page with valid offset and limit as user", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), @@ -711,6 +772,68 @@ func TestReadAll(t *testing.T) { Messages: messages[5:15], }, }, + { + desc: "read page with aggregation as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX", ts.URL, chanID), + key: userToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with interval as user", + url: fmt.Sprintf("%s/channels/%s/messages?interval=10h", ts.URL, chanID), + key: userToken, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages)), + Messages: messages[0:10], + }, + }, + { + desc: "read page with aggregation and interval as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h", ts.URL, chanID), + key: userToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval, to and from as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), + key: userToken, + status: http.StatusOK, + res: pageRes{ + Total: uint64(len(messages[5:20])), + Messages: messages[5:15], + }, + }, + { + desc: "read page with invalid aggregation and valid interval, to and from as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=invalid&interval=10h&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), + key: userToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with invalid interval and valid aggregation, to and from as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10hrs&from=%f&to=%f", ts.URL, chanID, messages[19].Time, messages[4].Time), + key: userToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval and to with missing from as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=%f", ts.URL, chanID, messages[4].Time), + key: userToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval and to with invalid from as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&to=ABCD&from=%f", ts.URL, chanID, messages[4].Time), + key: userToken, + status: http.StatusBadRequest, + }, + { + desc: "read page with aggregation, interval and to with invalid to as user", + url: fmt.Sprintf("%s/channels/%s/messages?aggregation=MAX&interval=10h&from=%f&to=ABCD", ts.URL, chanID, messages[4].Time), + key: userToken, + status: http.StatusBadRequest, + }, } for _, tc := range cases { diff --git a/readers/api/requests.go b/readers/api/requests.go index b157d106e1..b1386a71fa 100644 --- a/readers/api/requests.go +++ b/readers/api/requests.go @@ -4,12 +4,18 @@ package api import ( + "slices" + "strings" + "time" + "github.com/absmach/magistrala/internal/apiutil" "github.com/absmach/magistrala/readers" ) const maxLimitSize = 1000 +var validAggregations = []string{"MAX", "MIN", "AVG", "SUM", "COUNT"} + type listMessagesReq struct { chanID string token string @@ -39,5 +45,23 @@ func (req listMessagesReq) validate() error { return apiutil.ErrInvalidComparator } + if req.pageMeta.Aggregation != "" { + if req.pageMeta.From == 0 { + return apiutil.ErrMissingFrom + } + + if req.pageMeta.To == 0 { + return apiutil.ErrMissingTo + } + + if !slices.Contains(validAggregations, strings.ToUpper(req.pageMeta.Aggregation)) { + return apiutil.ErrInvalidAggregation + } + + if _, err := time.ParseDuration(req.pageMeta.Interval); err != nil { + return apiutil.ErrInvalidInterval + } + } + return nil } diff --git a/readers/api/transport.go b/readers/api/transport.go index 5cab942a80..2c55d8a98b 100644 --- a/readers/api/transport.go +++ b/readers/api/transport.go @@ -36,6 +36,9 @@ const ( comparatorKey = "comparator" fromKey = "from" toKey = "to" + aggregationKey = "aggregation" + intervalKey = "interval" + defInterval = "1s" defLimit = 10 defOffset = 0 defFormat = "messages" @@ -141,6 +144,19 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) { return nil, errors.Wrap(apiutil.ErrValidation, err) } + aggregation, err := apiutil.ReadStringQuery(r, aggregationKey, "") + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + + var interval string + if aggregation != "" { + interval, err = apiutil.ReadStringQuery(r, intervalKey, defInterval) + if err != nil { + return nil, errors.Wrap(apiutil.ErrValidation, err) + } + } + req := listMessagesReq{ chanID: chi.URLParam(r, "chanID"), token: apiutil.ExtractBearerToken(r), @@ -160,6 +176,8 @@ func decodeList(_ context.Context, r *http.Request) (interface{}, error) { BoolValue: vb, From: from, To: to, + Aggregation: aggregation, + Interval: interval, }, } return req, nil @@ -196,7 +214,11 @@ func encodeError(_ context.Context, err error, w http.ResponseWriter) { errors.Contains(err, apiutil.ErrMissingID), errors.Contains(err, apiutil.ErrLimitSize), errors.Contains(err, apiutil.ErrOffsetSize), - errors.Contains(err, apiutil.ErrInvalidComparator): + errors.Contains(err, apiutil.ErrInvalidComparator), + errors.Contains(err, apiutil.ErrInvalidAggregation), + errors.Contains(err, apiutil.ErrInvalidInterval), + errors.Contains(err, apiutil.ErrMissingFrom), + errors.Contains(err, apiutil.ErrMissingTo): w.WriteHeader(http.StatusBadRequest) case errors.Contains(err, svcerr.ErrAuthentication), errors.Contains(err, svcerr.ErrAuthorization), diff --git a/readers/messages.go b/readers/messages.go index acda83d776..eb14ebeb0a 100644 --- a/readers/messages.go +++ b/readers/messages.go @@ -55,6 +55,8 @@ type PageMetadata struct { From float64 `json:"from,omitempty"` To float64 `json:"to,omitempty"` Format string `json:"format,omitempty"` + Aggregation string `json:"aggregation,omitempty"` + Interval string `json:"interval,omitempty"` } // ParseValueComparator convert comparison operator keys into mathematic anotation. diff --git a/readers/timescale/messages.go b/readers/timescale/messages.go index 26857a56ae..e16304ab5e 100644 --- a/readers/timescale/messages.go +++ b/readers/timescale/messages.go @@ -37,7 +37,14 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( format = rpm.Format } - q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC LIMIT :limit OFFSET :offset;`, format, fmtCondition(chanID, rpm), order) + q := fmt.Sprintf(`SELECT * FROM %s WHERE %s ORDER BY %s DESC LIMIT :limit OFFSET :offset;`, format, fmtCondition(rpm), order) + totalQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(rpm)) + + // If aggregation is provided, add time_bucket and aggregation to the query + if rpm.Aggregation != "" { + q = fmt.Sprintf(`SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/1000))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1 ORDER BY time DESC LIMIT :limit OFFSET :offset;`, rpm.Interval, rpm.Aggregation, format, fmtCondition(rpm)) + totalQuery = fmt.Sprintf(`SELECT COUNT(*) FROM (SELECT EXTRACT(epoch FROM time_bucket('%s', to_timestamp(time/1000))) AS time, %s(value) AS value FROM %s WHERE %s GROUP BY 1) AS subquery;`, rpm.Interval, rpm.Aggregation, format, fmtCondition(rpm)) + } params := map[string]interface{}{ "channel": chanID, @@ -94,8 +101,7 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( } } - q = fmt.Sprintf(`SELECT COUNT(*) FROM %s WHERE %s;`, format, fmtCondition(chanID, rpm)) - rows, err = tr.db.NamedQuery(q, params) + rows, err = tr.db.NamedQuery(totalQuery, params) if err != nil { return readers.MessagesPage{}, errors.Wrap(readers.ErrReadMessages, err) } @@ -112,7 +118,7 @@ func (tr timescaleRepository) ReadAll(chanID string, rpm readers.PageMetadata) ( return page, nil } -func fmtCondition(chanID string, rpm readers.PageMetadata) string { +func fmtCondition(rpm readers.PageMetadata) string { condition := `channel = :channel` var query map[string]interface{}