diff --git a/.github/workflows/build-push.yaml b/.github/workflows/build-push.yaml index 3bda9bbc67..bd3fe3beee 100644 --- a/.github/workflows/build-push.yaml +++ b/.github/workflows/build-push.yaml @@ -71,6 +71,8 @@ jobs: dockerfile_path: packages/tenant-outbound-writer - image_name: compute-agreements-consumer dockerfile_path: packages/compute-agreements-consumer + - image_name: selfcare-onboarding-consumer + dockerfile_path: packages/selfcare-onboarding-consumer - image_name: producer-key-events-writer dockerfile_path: packages/producer-key-events-writer - image_name: producer-key-readmodel-writer diff --git a/packages/commons/src/config/consumerServiceConfig.ts b/packages/commons/src/config/consumerServiceConfig.ts index 25b126951a..726073e919 100644 --- a/packages/commons/src/config/consumerServiceConfig.ts +++ b/packages/commons/src/config/consumerServiceConfig.ts @@ -9,10 +9,12 @@ export const KafkaConsumerConfig = KafkaConfig.and( TOPIC_STARTING_OFFSET: z .union([z.literal("earliest"), z.literal("latest")]) .default("latest"), + RESET_CONSUMER_OFFSETS: z.string().default("false"), }) .transform((c) => ({ kafkaGroupId: c.KAFKA_GROUP_ID, topicStartingOffset: c.TOPIC_STARTING_OFFSET, + resetConsumerOffsets: c.RESET_CONSUMER_OFFSETS.toLowerCase() === "true", })) ); export type KafkaConsumerConfig = z.infer; diff --git a/packages/commons/src/config/kafkaConfig.ts b/packages/commons/src/config/kafkaConfig.ts index e30ff5c09b..84a6de802d 100644 --- a/packages/commons/src/config/kafkaConfig.ts +++ b/packages/commons/src/config/kafkaConfig.ts @@ -14,6 +14,7 @@ export const KafkaConfig = z .number() .default(20) .transform((n) => n * 1000), + KAFKA_BROKER_CONNECTION_STRING: z.string().optional(), }) .and(AWSConfig) .transform((c) => ({ @@ -23,5 +24,6 @@ export const KafkaConfig = z kafkaDisableAwsIamAuth: c.KAFKA_DISABLE_AWS_IAM_AUTH === "true", kafkaLogLevel: logLevel[c.KAFKA_LOG_LEVEL], kafkaReauthenticationThreshold: c.KAFKA_REAUTHENTICATION_THRESHOLD, + kafkaBrokerConnectionString: c.KAFKA_BROKER_CONNECTION_STRING, })); export type KafkaConfig = z.infer; diff --git a/packages/commons/src/config/producerServiceConfig.ts b/packages/commons/src/config/producerServiceConfig.ts index c08b71e9d3..dd6f7686ab 100644 --- a/packages/commons/src/config/producerServiceConfig.ts +++ b/packages/commons/src/config/producerServiceConfig.ts @@ -14,6 +14,7 @@ export const KafkaProducerConfig = AWSConfig.and( .number() .default(20) .transform((n) => n * 1000), + PRODUCER_KAFKA_BROKER_CONNECTION_STRING: z.string().optional(), }) ).transform((c) => ({ awsRegion: c.awsRegion, @@ -24,5 +25,7 @@ export const KafkaProducerConfig = AWSConfig.and( producerKafkaLogLevel: logLevel[c.PRODUCER_KAFKA_LOG_LEVEL], producerKafkaReauthenticationThreshold: c.PRODUCER_KAFKA_REAUTHENTICATION_THRESHOLD, + producerKafkaBrokerConnectionString: + c.PRODUCER_KAFKA_BROKER_CONNECTION_STRING, })); export type KafkaProducerConfig = z.infer; diff --git a/packages/commons/src/context/index.ts b/packages/commons/src/context/index.ts index 45f75471b1..7e4ff8f41c 100644 --- a/packages/commons/src/context/index.ts +++ b/packages/commons/src/context/index.ts @@ -1 +1,2 @@ export * from "./context.js"; +export * from "./interopHeaders.js"; diff --git a/packages/commons/src/context/interopHeaders.ts b/packages/commons/src/context/interopHeaders.ts new file mode 100644 index 0000000000..b691237764 --- /dev/null +++ b/packages/commons/src/context/interopHeaders.ts @@ -0,0 +1,19 @@ +import { z } from "zod"; + +export const InteropHeaders = z.object({ + "X-Correlation-Id": z.string(), + Authorization: z.string(), +}); + +export type InteropHeaders = z.infer; + +export const getInteropHeaders = ({ + token, + correlationId, +}: { + token: string; + correlationId: string; +}): InteropHeaders => ({ + "X-Correlation-Id": correlationId, + Authorization: `Bearer ${token}`, +}); diff --git a/packages/eservice-descriptors-archiver/package.json b/packages/eservice-descriptors-archiver/package.json index 29953d5feb..94ca71141d 100644 --- a/packages/eservice-descriptors-archiver/package.json +++ b/packages/eservice-descriptors-archiver/package.json @@ -23,7 +23,6 @@ "@pagopa/eslint-config": "3.0.0", "@types/node": "20.14.6", "@types/uuid": "9.0.8", - "openapi-endpoint-trimmer": "2.0.0", "openapi-zod-client": "1.18.1", "prettier": "2.8.8", "testcontainers": "10.9.0", diff --git a/packages/kafka-iam-auth/src/index.ts b/packages/kafka-iam-auth/src/index.ts index 8f31c0c4b9..75272e53e1 100644 --- a/packages/kafka-iam-auth/src/index.ts +++ b/packages/kafka-iam-auth/src/index.ts @@ -111,6 +111,28 @@ const kafkaCommitMessageOffsets = async ( ); }; +export async function resetPartitionsOffsets( + topics: string[], + kafka: Kafka, + consumer: Consumer +): Promise { + const admin = kafka.admin(); + + await admin.connect(); + + const fetchedTopics = await admin.fetchTopicMetadata({ topics }); + fetchedTopics.topics.forEach((t) => + t.partitions.forEach((p) => + consumer.seek({ + topic: t.name, + partition: p.partitionId, + offset: "-2", + }) + ) + ); + await admin.disconnect(); +} + async function oauthBearerTokenProvider( region: string, logger: Logger @@ -131,17 +153,33 @@ async function oauthBearerTokenProvider( } const initKafka = (config: InteropKafkaConfig): Kafka => { - const kafkaConfig: KafkaConfig = config.kafkaDisableAwsIamAuth + const commonConfigProps = { + clientId: config.kafkaClientId, + brokers: config.kafkaBrokers, + logLevel: config.kafkaLogLevel, + }; + + const connectionStringKafkaConfig: KafkaConfig | undefined = + config.kafkaBrokerConnectionString + ? { + ...commonConfigProps, + reauthenticationThreshold: config.kafkaReauthenticationThreshold, + ssl: true, + sasl: { + mechanism: "plain", + username: "$ConnectionString", + password: config.kafkaBrokerConnectionString, + }, + } + : undefined; + + const iamAuthKafkaConfig: KafkaConfig = config.kafkaDisableAwsIamAuth ? { - clientId: config.kafkaClientId, - brokers: config.kafkaBrokers, - logLevel: config.kafkaLogLevel, + ...commonConfigProps, ssl: false, } : { - clientId: config.kafkaClientId, - brokers: config.kafkaBrokers, - logLevel: config.kafkaLogLevel, + ...commonConfigProps, reauthenticationThreshold: config.kafkaReauthenticationThreshold, ssl: true, sasl: { @@ -151,6 +189,15 @@ const initKafka = (config: InteropKafkaConfig): Kafka => { }, }; + if (connectionStringKafkaConfig) { + genericLogger.warn( + "Using connection string mechanism for Kafka Broker authentication - this will override other mechanisms. If that is not desired, remove Kafka broker connection string from env variables." + ); + } + + const kafkaConfig: KafkaConfig = + connectionStringKafkaConfig ?? iamAuthKafkaConfig; + return new Kafka({ ...kafkaConfig, logCreator: @@ -207,6 +254,10 @@ const initConsumer = async ( }, }); + if (config.resetConsumerOffsets) { + await resetPartitionsOffsets(topics, kafka, consumer); + } + consumerKafkaEventsListener(consumer); errorEventsListener(consumer); @@ -262,6 +313,7 @@ export const initProducer = async ( kafkaReauthenticationThreshold: config.producerKafkaReauthenticationThreshold, awsRegion: config.awsRegion, + kafkaBrokerConnectionString: config.producerKafkaBrokerConnectionString, }); const producer = kafka.producer({ diff --git a/packages/selfcare-onboarding-consumer/.env b/packages/selfcare-onboarding-consumer/.env new file mode 100644 index 0000000000..ed0fe628e1 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/.env @@ -0,0 +1,25 @@ +LOG_LEVEL=info +KAFKA_LOG_LEVEL=INFO + +KAFKA_CLIENT_ID="selfcare-onboarding-consumer-client" +KAFKA_GROUP_ID="selfcare-onboarding-consumer-group-local" +KAFKA_BROKERS="localhost:9092" +KAFKA_BROKER_CONNECTION_STRING="placeholder" +SELFCARE_TOPIC="placeholder.selfcare.topic" +TOPIC_STARTING_OFFSET="earliest" +AWS_REGION="eu-central-1" + +RESET_CONSUMER_OFFSETS="false" + +INTEROP_PRODUCT="interop-product-placeholder" +ALLOWED_ORIGINS="IPA" + +TENANT_PROCESS_URL="http://localhost:3500" + +INTERNAL_JWT_KID=ffcc9b5b-4612-49b1-9374-9d203a3834f2 +INTERNAL_JWT_SUBJECT="dev-refactor.interop-selfcare-onboarding-consumer" +INTERNAL_JWT_ISSUER="dev-refactor.interop.pagopa.it" +INTERNAL_JWT_AUDIENCE="refactor.dev.interop.pagopa.it/internal" +INTERNAL_JWT_SECONDS_DURATION=60 + +AWS_CONFIG_FILE=aws.config.local diff --git a/packages/selfcare-onboarding-consumer/Dockerfile b/packages/selfcare-onboarding-consumer/Dockerfile new file mode 100644 index 0000000000..8c1c3eec88 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/Dockerfile @@ -0,0 +1,47 @@ +FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as build + +RUN corepack enable + +WORKDIR /app +COPY package.json /app/ +COPY pnpm-lock.yaml /app/ +COPY pnpm-workspace.yaml /app/ + +COPY ./packages/selfcare-onboarding-consumer/package.json /app/packages/selfcare-onboarding-consumer/package.json +COPY ./packages/commons/package.json /app/packages/commons/package.json +COPY ./packages/models/package.json /app/packages/models/package.json +COPY ./packages/kafka-iam-auth/package.json /app/packages/kafka-iam-auth/package.json +COPY ./packages/api-clients/package.json /app/packages/api-clients/package.json + +RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile + +COPY tsconfig.json /app/ +COPY turbo.json /app/ +COPY ./packages/selfcare-onboarding-consumer /app/packages/selfcare-onboarding-consumer +COPY ./packages/commons /app/packages/commons +COPY ./packages/models /app/packages/models +COPY ./packages/kafka-iam-auth /app/packages/kafka-iam-auth +COPY ./packages/api-clients /app/packages/api-clients + +RUN pnpm build && \ + rm -rf /app/node_modules/.modules.yaml && \ + rm -rf /app/node_modules/.cache && \ + mkdir /out && \ + cp -a --parents -t /out \ + node_modules packages/selfcare-onboarding-consumer/node_modules \ + package*.json packages/selfcare-onboarding-consumer/package*.json \ + packages/commons \ + packages/models \ + packages/kafka-iam-auth \ + packages/api-clients \ + packages/selfcare-onboarding-consumer/dist && \ + find /out -exec touch -h --date=@0 {} \; + +FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as final + +COPY --from=build /out /app + +WORKDIR /app/packages/selfcare-onboarding-consumer +EXPOSE 3000 + +CMD ["node", "."] diff --git a/packages/selfcare-onboarding-consumer/README.md b/packages/selfcare-onboarding-consumer/README.md new file mode 100644 index 0000000000..79ac96051f --- /dev/null +++ b/packages/selfcare-onboarding-consumer/README.md @@ -0,0 +1,3 @@ +# Selfcare Onboarding Consumer + +This service imports tenants that have joined Interop through Selfcare. diff --git a/packages/selfcare-onboarding-consumer/aws.config.local b/packages/selfcare-onboarding-consumer/aws.config.local new file mode 100644 index 0000000000..9c57dc948e --- /dev/null +++ b/packages/selfcare-onboarding-consumer/aws.config.local @@ -0,0 +1,9 @@ +[default] +aws_access_key_id=testawskey +aws_secret_access_key=testawssecret +region=eu-central-1 +services=local + +[services local] +kms= + endpoint_url=http://localhost:4566 diff --git a/packages/selfcare-onboarding-consumer/package.json b/packages/selfcare-onboarding-consumer/package.json new file mode 100644 index 0000000000..eaf9394332 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/package.json @@ -0,0 +1,43 @@ +{ + "name": "pagopa-interop-selfcare-onboarding-consumer", + "private": true, + "version": "1.0.0", + "description": "PagoPA Interoperability Selfcare onboarding consumer: service that imports tenants that have joined Interop through Selfcare", + "main": "dist", + "type": "module", + "scripts": { + "test": "vitest", + "test:it": "vitest integration", + "lint": "eslint . --ext .ts,.tsx", + "lint:autofix": "eslint . --ext .ts,.tsx --fix", + "format:check": "prettier --check src", + "format:write": "prettier --write src", + "start": "node --loader ts-node/esm -r 'dotenv-flow/config' --watch ./src/index.ts", + "build": "tsc", + "check": "tsc --project tsconfig.check.json" + }, + "keywords": [], + "author": "", + "license": "Apache-2.0", + "devDependencies": { + "@pagopa/eslint-config": "3.0.0", + "@types/node": "20.14.6", + "@types/uuid": "9.0.8", + "prettier": "2.8.8", + "ts-node": "10.9.2", + "typescript": "5.4.5", + "vitest": "1.6.0" + }, + "dependencies": { + "dotenv-flow": "4.1.0", + "kafka-iam-auth": "workspace:*", + "kafkajs": "2.2.4", + "pagopa-interop-commons": "workspace:*", + "pagopa-interop-commons-test": "workspace:*", + "pagopa-interop-models": "workspace:*", + "pagopa-interop-api-clients": "workspace:*", + "ts-pattern": "5.2.0", + "uuid": "10.0.0", + "zod": "3.23.8" + } +} diff --git a/packages/selfcare-onboarding-consumer/src/clients/tenantProcessClient.ts b/packages/selfcare-onboarding-consumer/src/clients/tenantProcessClient.ts new file mode 100644 index 0000000000..96a3dbdad7 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/src/clients/tenantProcessClient.ts @@ -0,0 +1,11 @@ +import { tenantApi } from "pagopa-interop-api-clients"; + +export type TenantProcessClient = { + selfcare: ReturnType; +}; + +export const tenantProcessClientBuilder = ( + url: string +): TenantProcessClient => ({ + selfcare: tenantApi.createSelfcareApiClient(url), +}); diff --git a/packages/selfcare-onboarding-consumer/src/config/config.ts b/packages/selfcare-onboarding-consumer/src/config/config.ts new file mode 100644 index 0000000000..a5b57e22f3 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/src/config/config.ts @@ -0,0 +1,32 @@ +import { z } from "zod"; +import { + KafkaConsumerConfig, + TokenGenerationConfig, +} from "pagopa-interop-commons"; + +export const SelfcareOnboardingConsumerConfig = KafkaConsumerConfig.and( + TokenGenerationConfig +).and( + z + .object({ + SELFCARE_TOPIC: z.string(), + + INTEROP_PRODUCT: z.string(), + ALLOWED_ORIGINS: z.string(), + + TENANT_PROCESS_URL: z.string(), + }) + .transform((c) => ({ + selfcareTopic: c.SELFCARE_TOPIC, + interopProduct: c.INTEROP_PRODUCT, + allowedOrigins: c.ALLOWED_ORIGINS.split(","), + tenantProcessUrl: c.TENANT_PROCESS_URL, + })) +); + +export type SelfcareOnboardingConsumerConfig = z.infer< + typeof SelfcareOnboardingConsumerConfig +>; + +export const config: SelfcareOnboardingConsumerConfig = + SelfcareOnboardingConsumerConfig.parse(process.env); diff --git a/packages/selfcare-onboarding-consumer/src/index.ts b/packages/selfcare-onboarding-consumer/src/index.ts new file mode 100644 index 0000000000..48d9d57a7a --- /dev/null +++ b/packages/selfcare-onboarding-consumer/src/index.ts @@ -0,0 +1,23 @@ +import { runConsumer } from "kafka-iam-auth"; +import { + InteropTokenGenerator, + RefreshableInteropToken, +} from "pagopa-interop-commons"; +import { config } from "./config/config.js"; +import { tenantProcessClientBuilder } from "./clients/tenantProcessClient.js"; +import { selfcareOnboardingProcessorBuilder } from "./services/selfcareOnboardingProcessor.js"; + +const tokenGenerator = new InteropTokenGenerator(config); +const refreshableToken = new RefreshableInteropToken(tokenGenerator); +await refreshableToken.init(); + +const tenantProcessClient = tenantProcessClientBuilder(config.tenantProcessUrl); + +const processor = selfcareOnboardingProcessorBuilder( + refreshableToken, + tenantProcessClient, + config.interopProduct, + config.allowedOrigins +); + +await runConsumer(config, [config.selfcareTopic], processor.processMessage); diff --git a/packages/selfcare-onboarding-consumer/src/model/constants.ts b/packages/selfcare-onboarding-consumer/src/model/constants.ts new file mode 100644 index 0000000000..ce9d1157ba --- /dev/null +++ b/packages/selfcare-onboarding-consumer/src/model/constants.ts @@ -0,0 +1 @@ +export const ORIGIN_IPA = "IPA"; diff --git a/packages/selfcare-onboarding-consumer/src/model/institutionEvent.ts b/packages/selfcare-onboarding-consumer/src/model/institutionEvent.ts new file mode 100644 index 0000000000..8c425279cf --- /dev/null +++ b/packages/selfcare-onboarding-consumer/src/model/institutionEvent.ts @@ -0,0 +1,25 @@ +import { z } from "zod"; + +const SubUnitType = z.enum(["AOO", "UO"]); +type SubUnitType = z.infer; + +const InstitutionEvent = z.object({ + description: z.string().trim().min(1), + origin: z.string().trim().min(1), + originId: z.string().trim().min(1), + taxCode: z.string().trim().min(1).nullish(), + subUnitCode: z.string().optional().nullish(), // AOO/UO ID + subUnitType: SubUnitType.optional().nullish(), + digitalAddress: z.string().trim().min(1), +}); +export type InstitutionEvent = z.infer; + +export const InstitutionEventPayload = z.object({ + id: z.string(), + internalIstitutionID: z.string().trim().min(1), // Selfcare ID + product: z.string().trim().min(1), + onboardingTokenId: z.string(), + institution: InstitutionEvent, + createdAt: z.string(), +}); +export type InstitutionEventPayload = z.infer; diff --git a/packages/selfcare-onboarding-consumer/src/services/selfcareOnboardingProcessor.ts b/packages/selfcare-onboarding-consumer/src/services/selfcareOnboardingProcessor.ts new file mode 100644 index 0000000000..bf2511c3cd --- /dev/null +++ b/packages/selfcare-onboarding-consumer/src/services/selfcareOnboardingProcessor.ts @@ -0,0 +1,106 @@ +import { + getInteropHeaders, + logger, + RefreshableInteropToken, +} from "pagopa-interop-commons"; +import { EachMessagePayload } from "kafkajs"; +import { v4 as uuidv4 } from "uuid"; +import { tenantApi } from "pagopa-interop-api-clients"; +import { genericInternalError } from "pagopa-interop-models"; +import { TenantProcessClient } from "../clients/tenantProcessClient.js"; +import { InstitutionEventPayload } from "../model/institutionEvent.js"; +import { ORIGIN_IPA } from "../model/constants.js"; + +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export function selfcareOnboardingProcessorBuilder( + refreshableToken: RefreshableInteropToken, + tenantProcessClient: TenantProcessClient, + productName: string, + allowedOrigins: string[] +) { + return { + async processMessage({ + message, + partition, + }: EachMessagePayload): Promise { + const correlationId = uuidv4(); + + const loggerInstance = logger({ + serviceName: "selfcare-onboarding-consumer", + correlationId, + }); + + try { + loggerInstance.info( + `Consuming message for partition ${partition} with offset ${message.offset}` + ); + + if (!message.value) { + loggerInstance.warn( + `Empty message for partition ${partition} with offset ${message.offset}` + ); + return; + } + + const stringPayload = message.value.toString(); + const jsonPayload = JSON.parse(stringPayload); + + // Process only messages of our product + // Note: doing this before parsing to avoid errors on messages of other products + if (jsonPayload.product !== productName) { + loggerInstance.info( + `Skipping message for partition ${partition} with offset ${message.offset} - Not required product: ${jsonPayload.product}` + ); + return; + } + + const eventPayload = InstitutionEventPayload.parse(jsonPayload); + + const institution = eventPayload.institution; + if (!allowedOrigins.includes(institution.origin)) { + loggerInstance.warn( + `Skipping message for partition ${partition} with offset ${message.offset} - Not allowed origin. SelfcareId: ${eventPayload.internalIstitutionID} Origin: ${institution.origin} OriginId: ${institution.originId}` + ); + return; + } + + const externalIdValue = + institution.origin === ORIGIN_IPA + ? institution.subUnitCode || institution.originId + : institution.taxCode || institution.originId; + + const seed: tenantApi.SelfcareTenantSeed = { + externalId: { + origin: institution.origin, + value: externalIdValue, + }, + selfcareId: eventPayload.internalIstitutionID, + name: institution.description, + onboardedAt: eventPayload.createdAt, + digitalAddress: { + kind: tenantApi.MailKind.Values.DIGITAL_ADDRESS, + description: "Domicilio digitale", + address: institution.digitalAddress, + }, + subUnitType: institution.subUnitType || undefined, + }; + + const token = (await refreshableToken.get()).serialized; + + const headers = getInteropHeaders({ token, correlationId }); + + await tenantProcessClient.selfcare.selfcareUpsertTenant(seed, { + headers, + }); + + loggerInstance.info( + `Message in partition ${partition} with offset ${message.offset} correctly consumed. SelfcareId: ${eventPayload.internalIstitutionID} Origin: ${institution.origin} OriginId: ${institution.originId}` + ); + } catch (err) { + throw genericInternalError( + `Error consuming message in partition ${partition} with offset ${message.offset}. Reason: ${err}` + ); + } + }, + }; +} diff --git a/packages/selfcare-onboarding-consumer/test/selfcareOnboardingProcessor.test.ts b/packages/selfcare-onboarding-consumer/test/selfcareOnboardingProcessor.test.ts new file mode 100644 index 0000000000..038d33ecbc --- /dev/null +++ b/packages/selfcare-onboarding-consumer/test/selfcareOnboardingProcessor.test.ts @@ -0,0 +1,374 @@ +/* eslint-disable prefer-const */ +/* eslint-disable functional/no-let */ +import { + vi, + afterEach, + beforeAll, + beforeEach, + describe, + MockInstance, + it, + expect, +} from "vitest"; +import { + getInteropHeaders, + InteropTokenGenerator, + RefreshableInteropToken, +} from "pagopa-interop-commons"; +import { EachMessagePayload } from "kafkajs"; +import { selfcareOnboardingProcessorBuilder } from "../src/services/selfcareOnboardingProcessor.js"; +import { + TenantProcessClient, + tenantProcessClientBuilder, +} from "../src/clients/tenantProcessClient.js"; +import { config } from "../src/config/config.js"; +import { + allowedOrigins, + correctEventPayload, + correctInstitutionEventField, + generateInternalTokenMock, + interopProductName, + interopToken, + kafkaMessagePayload, + selfcareUpsertTenantMock, + uuidRegexp, +} from "./utils.js"; + +describe("Message processor", () => { + let tenantProcessClientMock: TenantProcessClient = tenantProcessClientBuilder( + config.tenantProcessUrl + ); + let tokenGeneratorMock = new InteropTokenGenerator(config); + let refreshableTokenMock = new RefreshableInteropToken(tokenGeneratorMock); + let selfcareOnboardingProcessor: ReturnType< + typeof selfcareOnboardingProcessorBuilder + >; + + beforeAll(async () => { + selfcareOnboardingProcessor = selfcareOnboardingProcessorBuilder( + refreshableTokenMock, + tenantProcessClientMock, + interopProductName, + allowedOrigins + ); + }); + + let refreshableInternalTokenSpy: MockInstance; + let selfcareUpsertTenantSpy: MockInstance; + + beforeEach(() => { + vi.spyOn(tokenGeneratorMock, "generateInternalToken").mockImplementation( + generateInternalTokenMock + ); + refreshableInternalTokenSpy = vi + .spyOn(refreshableTokenMock, "get") + .mockImplementation(generateInternalTokenMock); + + selfcareUpsertTenantSpy = vi + .spyOn(tenantProcessClientMock.selfcare, "selfcareUpsertTenant") + .mockImplementation(selfcareUpsertTenantMock); + }); + + afterEach(async () => { + vi.clearAllMocks(); + }); + + it("should skip empty message", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { ...kafkaMessagePayload.message, value: null }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(0); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(0); + }); + + it("should throw an error if message is malformed", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from('{ not-a : "correct-json"'), + }, + }; + + await expect(() => + selfcareOnboardingProcessor.processMessage(message) + ).rejects.toThrowError(/Error.*partition.*offset.*Reason/); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(0); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(0); + }); + + it("should skip message not containing required product", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ ...correctEventPayload, product: "another-product" }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(0); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(0); + }); + + it("should throw an error if message has unexpected schema", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + `{ "product" : "${interopProductName}", "this-schema" : "was-unexpected" }` + ), + }, + }; + + await expect(() => + selfcareOnboardingProcessor.processMessage(message) + ).rejects.toThrowError(/Error.*partition.*offset.*Reason/); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(0); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(0); + }); + + it("should upsert tenant on correct message", async () => { + const message = kafkaMessagePayload; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(1); + }); + + it("should upsert PA tenant - Main institution", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ + ...correctEventPayload, + institution: { + ...correctInstitutionEventField, + origin: "IPA", + originId: "ipa_123", + subUnitType: null, + subUnitCode: null, + }, + }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toHaveBeenCalledWith( + expect.objectContaining({ + externalId: { origin: "IPA", value: "ipa_123" }, + selfcareId: correctEventPayload.internalIstitutionID, + name: correctInstitutionEventField.description, + }), + expect.objectContaining({ + headers: getInteropHeaders({ + token: interopToken.serialized, + correlationId: expect.stringMatching(uuidRegexp), + }), + }) + ); + }); + + it("should upsert PA tenant - AOO/UO", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ + ...correctEventPayload, + institution: { + ...correctInstitutionEventField, + origin: "IPA", + originId: "ipa_123", + subUnitType: "AOO", + subUnitCode: "AOO_456", + }, + }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toHaveBeenCalledWith( + expect.objectContaining({ + externalId: { origin: "IPA", value: "AOO_456" }, + selfcareId: correctEventPayload.internalIstitutionID, + name: correctInstitutionEventField.description, + }), + expect.objectContaining({ + headers: getInteropHeaders({ + token: interopToken.serialized, + correlationId: expect.stringMatching(uuidRegexp), + }), + }) + ); + }); + + it("should upsert non-PA tenant with allowed origin", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ + ...correctEventPayload, + institution: { + ...correctInstitutionEventField, + origin: "ANAC", + originId: "ipa_123", + taxCode: "tax789", + subUnitType: null, + subUnitCode: null, + }, + }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toHaveBeenCalledWith( + expect.objectContaining({ + externalId: { origin: "ANAC", value: "tax789" }, + selfcareId: correctEventPayload.internalIstitutionID, + name: correctInstitutionEventField.description, + }), + expect.objectContaining({ + headers: getInteropHeaders({ + token: interopToken.serialized, + correlationId: expect.stringMatching(uuidRegexp), + }), + }) + ); + }); + + it("should upsert non-PA tenant with missing tax code", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ + ...correctEventPayload, + institution: { + ...correctInstitutionEventField, + origin: "ANAC", + originId: "anac_123", + taxCode: undefined, + subUnitType: null, + subUnitCode: null, + }, + }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toHaveBeenCalledWith( + expect.objectContaining({ + externalId: { origin: "ANAC", value: "anac_123" }, + selfcareId: correctEventPayload.internalIstitutionID, + name: correctInstitutionEventField.description, + }), + expect.objectContaining({ + headers: getInteropHeaders({ + token: interopToken.serialized, + correlationId: expect.stringMatching(uuidRegexp), + }), + }) + ); + }); + + it("should upsert non-PA tenant with null tax code", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ + ...correctEventPayload, + institution: { + ...correctInstitutionEventField, + origin: "ANAC", + originId: "anac_123", + taxCode: null, + subUnitType: null, + subUnitCode: null, + }, + }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(1); + expect(selfcareUpsertTenantSpy).toHaveBeenCalledWith( + expect.objectContaining({ + externalId: { origin: "ANAC", value: "anac_123" }, + selfcareId: correctEventPayload.internalIstitutionID, + name: correctInstitutionEventField.description, + }), + expect.objectContaining({ + headers: getInteropHeaders({ + token: interopToken.serialized, + correlationId: expect.stringMatching(uuidRegexp), + }), + }) + ); + }); + + it("should skip upsert of tenant with not allowed origin", async () => { + const message: EachMessagePayload = { + ...kafkaMessagePayload, + message: { + ...kafkaMessagePayload.message, + value: Buffer.from( + JSON.stringify({ + ...correctEventPayload, + institution: { + ...correctInstitutionEventField, + origin: "not-allowed", + originId: "ipa_123", + taxCode: "tax789", + subUnitType: null, + subUnitCode: null, + }, + }) + ), + }, + }; + + await selfcareOnboardingProcessor.processMessage(message); + + expect(refreshableInternalTokenSpy).toBeCalledTimes(0); + expect(selfcareUpsertTenantSpy).toBeCalledTimes(0); + }); +}); diff --git a/packages/selfcare-onboarding-consumer/test/tsconfig.json b/packages/selfcare-onboarding-consumer/test/tsconfig.json new file mode 100644 index 0000000000..379a994d81 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/test/tsconfig.json @@ -0,0 +1,4 @@ +{ + "extends": "../tsconfig.json", + "include": ["."] +} diff --git a/packages/selfcare-onboarding-consumer/test/utils.ts b/packages/selfcare-onboarding-consumer/test/utils.ts new file mode 100644 index 0000000000..61afc889a3 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/test/utils.ts @@ -0,0 +1,101 @@ +/* eslint-disable @typescript-eslint/no-empty-function */ +import { EachMessagePayload } from "kafkajs"; +import { tenantApi } from "pagopa-interop-api-clients"; +import { InteropToken } from "pagopa-interop-commons"; + +export const interopProductName = "test-interop-product"; +export const allowedOrigins = ["IPA", "ANAC", "IVASS"]; + +export const selfcareUpsertTenantMock = (): Promise => + Promise.resolve({ id: "tenant-id" }); + +export const correctInstitutionEventField = { + institutionType: "PA", + description: "Somewhere", + digitalAddress: "somewhere@wonderland", + address: "123 Street", + taxCode: "12345678987", + origin: "IPA", + originId: "ipa_code", + zipCode: "12345", + paymentServiceProvider: null, + istatCode: "123456", + city: "somewhere", + country: "wonderland", + county: "WL", + subUnitCode: null, + subUnitType: null, + rootParent: { + id: null, + description: null, + }, +}; + +export const correctEventPayload = { + id: "cfb4f57f-8d93-4e30-8c87-37a29c3c6dac", + internalIstitutionID: "b730fbb7-fffe-4090-a3ea-53ee7e07a4b9", + product: interopProductName, + state: "ACTIVE", + fileName: "", + contentType: "application/json", + onboardingTokenId: "8e73950f-b51d-46df-92a1-057907f2cb98", + institution: correctInstitutionEventField, + billing: { + vatNumber: "12345678987", + recipientCode: "11111", + publicServices: false, + }, + createdAt: "2023-08-04T09:08:09.723118Z", + updatedAt: "2023-08-04T09:08:09.723137Z", + notificationType: "ADD", +}; + +export const kafkaMessagePayload: EachMessagePayload = { + topic: "kafka-test-topic", + partition: 0, + message: { + key: Buffer.from("kafka-message-key"), + value: Buffer.from(JSON.stringify(correctEventPayload)), + timestamp: "0", + attributes: 0, + offset: "10", + size: 100, + }, + heartbeat: async () => {}, + pause: () => () => {}, +}; + +export const selfcareUpsertTenantSeed = { + externalId: { + origin: correctEventPayload.institution.origin, + value: correctEventPayload.institution.originId, + }, + selfcareId: correctEventPayload.internalIstitutionID, + name: correctEventPayload.institution.description, +}; + +export const generateInternalTokenMock = (): Promise => + Promise.resolve(interopToken); + +export const interopToken: InteropToken = { + header: { + alg: "algorithm", + use: "use", + typ: "type", + kid: "key-id", + }, + payload: { + jti: "token-id", + iss: "issuer", + aud: ["audience1"], + sub: "subject", + iat: 0, + nbf: 0, + exp: 10, + role: "role1", + }, + serialized: "the-token", +}; + +export const uuidRegexp = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/; diff --git a/packages/selfcare-onboarding-consumer/test/vitestGlobalSetup.ts b/packages/selfcare-onboarding-consumer/test/vitestGlobalSetup.ts new file mode 100644 index 0000000000..32972b5c32 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/test/vitestGlobalSetup.ts @@ -0,0 +1,3 @@ +import { setupTestContainersVitestGlobal } from "pagopa-interop-commons-test"; + +export default setupTestContainersVitestGlobal(); diff --git a/packages/selfcare-onboarding-consumer/tsconfig.check.json b/packages/selfcare-onboarding-consumer/tsconfig.check.json new file mode 100644 index 0000000000..a19f84bcb7 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/tsconfig.check.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "noEmit": true, + }, + "include": ["src", "test"] +} diff --git a/packages/selfcare-onboarding-consumer/tsconfig.json b/packages/selfcare-onboarding-consumer/tsconfig.json new file mode 100644 index 0000000000..039e0b4d16 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/tsconfig.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": ["src"] +} diff --git a/packages/selfcare-onboarding-consumer/vitest.config.ts b/packages/selfcare-onboarding-consumer/vitest.config.ts new file mode 100644 index 0000000000..9ece1be991 --- /dev/null +++ b/packages/selfcare-onboarding-consumer/vitest.config.ts @@ -0,0 +1,11 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + globalSetup: ["./test/vitestGlobalSetup.ts"], + testTimeout: 60000, + hookTimeout: 60000, + fileParallelism: false, + pool: "forks", + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8d26da241f..05f7bf7b8c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1371,9 +1371,6 @@ importers: '@types/uuid': specifier: 9.0.8 version: 9.0.8 - openapi-endpoint-trimmer: - specifier: 2.0.0 - version: 2.0.0 openapi-zod-client: specifier: 1.18.1 version: 1.18.1 @@ -2025,6 +2022,61 @@ importers: specifier: 1.6.0 version: 1.6.0(@types/node@20.14.6) + packages/selfcare-onboarding-consumer: + dependencies: + dotenv-flow: + specifier: 4.1.0 + version: 4.1.0 + kafka-iam-auth: + specifier: workspace:* + version: link:../kafka-iam-auth + kafkajs: + specifier: 2.2.4 + version: 2.2.4 + pagopa-interop-api-clients: + specifier: workspace:* + version: link:../api-clients + pagopa-interop-commons: + specifier: workspace:* + version: link:../commons + pagopa-interop-commons-test: + specifier: workspace:* + version: link:../commons-test + pagopa-interop-models: + specifier: workspace:* + version: link:../models + ts-pattern: + specifier: 5.2.0 + version: 5.2.0 + uuid: + specifier: 10.0.0 + version: 10.0.0 + zod: + specifier: 3.23.8 + version: 3.23.8 + devDependencies: + '@pagopa/eslint-config': + specifier: 3.0.0 + version: 3.0.0(typescript@5.4.5) + '@types/node': + specifier: 20.14.6 + version: 20.14.6 + '@types/uuid': + specifier: 9.0.8 + version: 9.0.8 + prettier: + specifier: 2.8.8 + version: 2.8.8 + ts-node: + specifier: 10.9.2 + version: 10.9.2(@types/node@20.14.6)(typescript@5.4.5) + typescript: + specifier: 5.4.5 + version: 5.4.5 + vitest: + specifier: 1.6.0 + version: 1.6.0(@types/node@20.14.6) + packages/tenant-outbound-writer: dependencies: '@pagopa/interop-outbound-models': @@ -4468,11 +4520,6 @@ packages: engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0, npm: '>=6.14.13'} dev: true - /@fastify/busboy@2.1.1: - resolution: {integrity: sha512-vBZP4NlzfOlerQTnba4aqZoMhE/a9HY7HRqoOPaETQcSQuWEIyZMHGfVu6w9wGtGK5fED5qRs2DteVCjOH60sA==} - engines: {node: '>=14'} - dev: true - /@humanwhocodes/config-array@0.11.14: resolution: {integrity: sha512-3T8LkOmg45BV5FICb15QQMsyUSWrQ8AygVfC7ZG32zOalnqrilm018ZVCw0eapXux8FtA33q8PSRSstjee3jSg==} engines: {node: '>=10.10.0'} @@ -6367,11 +6414,6 @@ packages: ansi-styles: 4.3.0 supports-color: 7.2.0 - /chalk@5.3.0: - resolution: {integrity: sha512-dLitG79d+GV1Nb/VYcCDFivJeK1hiukt9QjRNVOsUtTy1rR1YJsmpGGTZ3qJos+uw7WmWF4wUwBd9jxjocFC2w==} - engines: {node: ^12.17.0 || ^14.13 || >=16.0.0} - dev: true - /check-error@1.0.3: resolution: {integrity: sha512-iKEoDYaRmd1mxM90a2OEfWhjsjPpYPuQ+lMYsoxB126+t8fw7ySEO48nmDg5COTjxDI65/Y2OWpeEHk3ZOe8zg==} dependencies: @@ -6454,11 +6496,6 @@ packages: dependencies: delayed-stream: 1.0.0 - /commander@10.0.1: - resolution: {integrity: sha512-y4Mg2tXshplEbSGzx7amzPwKKOCGuoSRP/CjEdwwk0FOGlUbq6lKuoyDZTNZkmxHdJtp54hdfY/JUrdL7Xfdug==} - engines: {node: '>=14'} - dev: true - /comment-parser@1.3.1: resolution: {integrity: sha512-B52sN2VNghyq5ofvUsqZjmk6YkihBX5vMSChmSK9v4ShjKf3Vk5Xcmgpw4o+iIgtrnM/u5FiMpz9VKb8lpBveA==} engines: {node: '>= 12.0.0'} @@ -8922,17 +8959,6 @@ packages: mimic-fn: 4.0.0 dev: true - /openapi-endpoint-trimmer@2.0.0: - resolution: {integrity: sha512-0eHcIbHkItgk9S8mHPMn8Zn+jsdBztc2KHEaUmJYdq70vzgDr6WF9k4npisb7LhUeH3GN2d6YmKy30Xzs7GXAQ==} - hasBin: true - dependencies: - chalk: 5.3.0 - commander: 10.0.1 - js-yaml: 4.1.0 - undici: 5.28.4 - zod: 3.23.8 - dev: true - /openapi-types@12.1.3: resolution: {integrity: sha512-N4YtSYJqghVu4iek2ZUvcN/0aqH1kRDuNqzcycDxhOUpg7GdvLa2F3DgS6yBNhInhv2r/6I0Flkn7CqL8+nIcw==} @@ -10409,13 +10435,6 @@ packages: /undici-types@5.26.5: resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==} - /undici@5.28.4: - resolution: {integrity: sha512-72RFADWFqKmUb2hmmvNODKL3p9hcB6Gt2DOQMis1SEBaV6a4MH8soBvzg+95CYhCKPFedut2JY9bMfrDl9D23g==} - engines: {node: '>=14.0'} - dependencies: - '@fastify/busboy': 2.1.1 - dev: true - /universalify@2.0.1: resolution: {integrity: sha512-gptHNQghINnc/vTGIk0SOFGFNXw7JVrlRUtConJRlvaw6DuX0wO5Jeko9sWrMBhh+PsYAZ7oXAiOnf/UKogyiw==} engines: {node: '>= 10.0.0'}