Skip to content

Commit

Permalink
MG-2507 - Update auth in readers service (absmach#2514)
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru authored Nov 10, 2024
1 parent a5059a7 commit 3a02788
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 156 deletions.
11 changes: 10 additions & 1 deletion api/openapi/readers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ tags:
url: https://docs.magistrala.abstractmachines.fr/

paths:
/channels/{chanId}/messages:
/{domainID}/channels/{chanId}/messages:
get:
operationId: getMessages
summary: Retrieves messages sent to single channel
Expand All @@ -47,6 +47,7 @@ paths:
tags:
- readers
parameters:
- $ref: "#/components/parameters/DomainID"
- $ref: "#/components/parameters/ChanId"
- $ref: "#/components/parameters/Limit"
- $ref: "#/components/parameters/Offset"
Expand Down Expand Up @@ -141,6 +142,14 @@ components:
description: Time of updating measurement.

parameters:
DomainID:
name: domainID
description: Unique domain identifier.
in: path
schema:
type: string
format: uuid
required: true
ChanId:
name: chanId
description: Unique channel identifier.
Expand Down
8 changes: 4 additions & 4 deletions cli/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ var cmdMessages = []cobra.Command{
},
},
{
Use: "read <channel_id.subtopic> <user_token>",
Use: "read <channel_id.subtopic> <domain_id> <user_token>",
Short: "Read messages",
Long: "Reads all channel messages\n" +
"Usage:\n" +
"\tmagistrala-cli messages read <channel_id.subtopic> <user_token> --offset <offset> --limit <limit> - lists all messages with provided offset and limit\n",
"\tmagistrala-cli messages read <channel_id.subtopic> <domain_id> <user_token> --offset <offset> --limit <limit> - lists all messages with provided offset and limit\n",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
if len(args) != 3 {
logUsageCmd(*cmd, cmd.Use)
return
}
Expand All @@ -45,7 +45,7 @@ var cmdMessages = []cobra.Command{
},
}

m, err := sdk.ReadMessages(pageMetadata, args[0], args[1])
m, err := sdk.ReadMessages(pageMetadata, args[0], args[1], args[2])
if err != nil {
logErrorCmd(*cmd, err)
return
Expand Down
5 changes: 4 additions & 1 deletion cli/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func TestReadMesageCmd(t *testing.T) {
desc: "read message successfully",
args: []string{
channel.ID,
domainID,
validToken,
},
page: mgsdk.MessagesPage{
Expand All @@ -124,6 +125,7 @@ func TestReadMesageCmd(t *testing.T) {
desc: "read message with invalid args",
args: []string{
channel.ID,
domainID,
validToken,
extraArg,
},
Expand All @@ -133,6 +135,7 @@ func TestReadMesageCmd(t *testing.T) {
desc: "read message with invalid token",
args: []string{
channel.ID,
domainID,
invalidToken,
},
sdkErr: errors.NewSDKErrorWithStatus(svcerr.ErrAuthorization, http.StatusUnauthorized),
Expand All @@ -143,7 +146,7 @@ func TestReadMesageCmd(t *testing.T) {

for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
sdkCall := sdkMock.On("ReadMessages", mock.Anything, tc.args[0], tc.args[1]).Return(tc.page, tc.sdkErr)
sdkCall := sdkMock.On("ReadMessages", mock.Anything, tc.args[0], tc.args[1], tc.args[2]).Return(tc.page, tc.sdkErr)
out := executeCommand(t, rootCmd, append([]string{readCmd}, tc.args...)...)

switch tc.logType {
Expand Down
18 changes: 14 additions & 4 deletions cmd/postgres-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
"github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
pgclient "github.com/absmach/magistrala/pkg/postgres"
Expand Down Expand Up @@ -84,14 +85,14 @@ func main() {
}
defer db.Close()

authzCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&authzCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
clientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}

authz, authzHandler, err := authsvc.NewAuthorization(ctx, authzCfg)
authz, authzHandler, err := authsvc.NewAuthorization(ctx, clientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
Expand All @@ -100,6 +101,15 @@ func main() {
defer authzHandler.Close()
logger.Info("Authz successfully connected to auth gRPC server " + authzHandler.Secure())

authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, clientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("Authn successfully connected to auth gRPC server " + authnHandler.Secure())

thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
Expand All @@ -125,7 +135,7 @@ func main() {
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, authz, thingsClient, svcName, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, authn, authz, thingsClient, svcName, cfg.InstanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
Expand Down
18 changes: 14 additions & 4 deletions cmd/timescale-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
chclient "github.com/absmach/callhome/pkg/client"
"github.com/absmach/magistrala"
mglog "github.com/absmach/magistrala/logger"
authsvcAuthn "github.com/absmach/magistrala/pkg/authn/authsvc"
"github.com/absmach/magistrala/pkg/authz/authsvc"
"github.com/absmach/magistrala/pkg/grpcclient"
pgclient "github.com/absmach/magistrala/pkg/postgres"
Expand Down Expand Up @@ -84,14 +85,14 @@ func main() {

repo := newService(db, logger)

authzCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&authzCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
clientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&clientCfg, env.Options{Prefix: envPrefixAuth}); err != nil {
logger.Error(fmt.Sprintf("failed to load auth gRPC client configuration : %s", err))
exitCode = 1
return
}

authz, authzHandler, err := authsvc.NewAuthorization(ctx, authzCfg)
authz, authzHandler, err := authsvc.NewAuthorization(ctx, clientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
Expand All @@ -100,6 +101,15 @@ func main() {
defer authzHandler.Close()
logger.Info("Authz successfully connected to auth gRPC server " + authzHandler.Secure())

authn, authnHandler, err := authsvcAuthn.NewAuthentication(ctx, clientCfg)
if err != nil {
logger.Error(err.Error())
exitCode = 1
return
}
defer authnHandler.Close()
logger.Info("Authn successfully connected to auth gRPC server " + authnHandler.Secure())

thingsClientCfg := grpcclient.Config{}
if err := env.ParseWithOptions(&thingsClientCfg, env.Options{Prefix: envPrefixThings}); err != nil {
logger.Error(fmt.Sprintf("failed to load %s auth configuration : %s", svcName, err))
Expand All @@ -123,7 +133,7 @@ func main() {
exitCode = 1
return
}
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, authz, thingsClient, svcName, cfg.InstanceID), logger)
hs := httpserver.NewServer(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, authn, authz, thingsClient, svcName, cfg.InstanceID), logger)

if cfg.SendTelemetry {
chc := chclient.New(svcName, magistrala.Version, logger, cancel)
Expand Down
16 changes: 15 additions & 1 deletion pkg/sdk/go/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
bmocks "github.com/absmach/magistrala/bootstrap/mocks"
mglog "github.com/absmach/magistrala/logger"
authnmocks "github.com/absmach/magistrala/pkg/authn/mocks"
authzmocks "github.com/absmach/magistrala/pkg/authz/mocks"
"github.com/absmach/magistrala/pkg/errors"
sdk "github.com/absmach/magistrala/pkg/sdk/go"
readersapi "github.com/absmach/magistrala/readers/api"
readersmocks "github.com/absmach/magistrala/readers/mocks"
thmocks "github.com/absmach/magistrala/things/mocks"
"github.com/stretchr/testify/assert"
)

Expand All @@ -31,7 +35,7 @@ func TestHealth(t *testing.T) {
bootstrapTs := setupMinimalBootstrap()
defer bootstrapTs.Close()

readerTs, _, _ := setupReader()
readerTs := setupMinimalReader()
defer readerTs.Close()

httpAdapterTs, _, _ := setupMessages()
Expand Down Expand Up @@ -128,3 +132,13 @@ func setupMinimalBootstrap() *httptest.Server {

return httptest.NewServer(mux)
}

func setupMinimalReader() *httptest.Server {
repo := new(readersmocks.MessageRepository)
authz := new(authzmocks.Authorization)
authn := new(authnmocks.Authentication)
things := new(thmocks.ThingsServiceClient)

mux := readersapi.MakeHandler(repo, authn, authz, things, "test", "")
return httptest.NewServer(mux)
}
4 changes: 2 additions & 2 deletions pkg/sdk/go/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ func (sdk mgSDK) SendMessage(chanName, msg, key string) errors.SDKError {
return err
}

func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, token string) (MessagesPage, errors.SDKError) {
func (sdk mgSDK) ReadMessages(pm MessagePageMetadata, chanName, domainID, token string) (MessagesPage, errors.SDKError) {
chanNameParts := strings.SplitN(chanName, ".", channelParts)
chanID := chanNameParts[0]
subtopicPart := ""
if len(chanNameParts) == channelParts {
subtopicPart = fmt.Sprintf("?subtopic=%s", chanNameParts[1])
}

readMessagesEndpoint := fmt.Sprintf("channels/%s/messages%s", chanID, subtopicPart)
readMessagesEndpoint := fmt.Sprintf("%s/channels/%s/messages%s", domainID, chanID, subtopicPart)
msgURL, err := sdk.withMessageQueryParams(sdk.readerURL, readMessagesEndpoint, pm)
if err != nil {
return MessagesPage{}, errors.NewSDKError(err)
Expand Down
33 changes: 24 additions & 9 deletions pkg/sdk/go/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/absmach/magistrala/http/api"
mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/magistrala/pkg/apiutil"
mgauthn "github.com/absmach/magistrala/pkg/authn"
authnmocks "github.com/absmach/magistrala/pkg/authn/mocks"
authzmocks "github.com/absmach/magistrala/pkg/authz/mocks"
"github.com/absmach/magistrala/pkg/errors"
svcerr "github.com/absmach/magistrala/pkg/errors/service"
Expand Down Expand Up @@ -50,13 +52,14 @@ func setupMessages() (*httptest.Server, *thmocks.ThingsServiceClient, *pubsub.Pu
return httptest.NewServer(http.HandlerFunc(mp.ServeHTTP)), things, pub
}

func setupReader() (*httptest.Server, *authzmocks.Authorization, *readersmocks.MessageRepository) {
func setupReader() (*httptest.Server, *authzmocks.Authorization, *authnmocks.Authentication, *readersmocks.MessageRepository) {
repo := new(readersmocks.MessageRepository)
authz := new(authzmocks.Authorization)
authn := new(authnmocks.Authentication)
things := new(thmocks.ThingsServiceClient)

mux := readersapi.MakeHandler(repo, authz, things, "test", "")
return httptest.NewServer(mux), authz, repo
mux := readersapi.MakeHandler(repo, authn, authz, things, "test", "")
return httptest.NewServer(mux), authz, authn, repo
}

func TestSendMessage(t *testing.T) {
Expand Down Expand Up @@ -196,7 +199,7 @@ func TestSetContentType(t *testing.T) {
}

func TestReadMessages(t *testing.T) {
ts, authz, repo := setupReader()
ts, authz, authn, repo := setupReader()
defer ts.Close()

channelID := "channelID"
Expand All @@ -220,8 +223,10 @@ func TestReadMessages(t *testing.T) {
desc string
token string
chanName string
domainID string
messagePageMeta sdk.MessagePageMetadata
authErr error
authzErr error
authnErr error
repoRes readers.MessagesPage
repoErr error
response sdk.MessagesPage
Expand All @@ -231,6 +236,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages successfully",
token: validToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -257,6 +263,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages successfully with subtopic",
token: validToken,
chanName: channelID + ".subtopic",
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -281,6 +288,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages with invalid token",
token: invalidToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -289,7 +297,7 @@ func TestReadMessages(t *testing.T) {
Subtopic: "subtopic",
Publisher: validID,
},
authErr: svcerr.ErrAuthorization,
authzErr: svcerr.ErrAuthorization,
repoRes: readers.MessagesPage{},
response: sdk.MessagesPage{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(svcerr.ErrAuthorization, svcerr.ErrAuthorization), http.StatusUnauthorized),
Expand All @@ -298,6 +306,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages with empty token",
token: "",
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -306,7 +315,7 @@ func TestReadMessages(t *testing.T) {
Subtopic: "subtopic",
Publisher: validID,
},
authErr: svcerr.ErrAuthorization,
authnErr: svcerr.ErrAuthentication,
repoRes: readers.MessagesPage{},
response: sdk.MessagesPage{},
err: errors.NewSDKErrorWithStatus(errors.Wrap(apiutil.ErrValidation, apiutil.ErrBearerToken), http.StatusUnauthorized),
Expand All @@ -315,6 +324,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages with empty channel ID",
token: validToken,
chanName: "",
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -332,6 +342,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages with invalid message page metadata",
token: validToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -352,6 +363,7 @@ func TestReadMessages(t *testing.T) {
desc: "read messages with response that cannot be unmarshalled",
token: validToken,
chanName: channelID,
domainID: validID,
messagePageMeta: sdk.MessagePageMetadata{
PageMetadata: sdk.PageMetadata{
Offset: 0,
Expand All @@ -371,16 +383,19 @@ func TestReadMessages(t *testing.T) {
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
authCall := authz.On("Authorize", mock.Anything, mock.Anything).Return(tc.authErr)
authCall := authz.On("Authorize", mock.Anything, mock.Anything).Return(tc.authzErr)
authCall1 := authn.On("Authenticate", mock.Anything, tc.token).Return(mgauthn.Session{UserID: validID}, tc.authnErr)
repoCall := repo.On("ReadAll", channelID, mock.Anything).Return(tc.repoRes, tc.repoErr)
response, err := mgsdk.ReadMessages(tc.messagePageMeta, tc.chanName, tc.token)
response, err := mgsdk.ReadMessages(tc.messagePageMeta, tc.chanName, tc.domainID, tc.token)
fmt.Println(err)
assert.Equal(t, tc.err, err)
assert.Equal(t, tc.response, response)
if tc.err == nil {
ok := repoCall.Parent.AssertCalled(t, "ReadAll", channelID, mock.Anything)
assert.True(t, ok)
}
authCall.Unset()
authCall1.Unset()
repoCall.Unset()
})
}
Expand Down
Loading

0 comments on commit 3a02788

Please sign in to comment.