Skip to content

Commit

Permalink
IMN-710 - selfcare-onboarding-consumer (#959)
Browse files Browse the repository at this point in the history
Co-authored-by: AsterITA <[email protected]>
  • Loading branch information
ecamellini and AsterITA authored Sep 24, 2024
1 parent 6cd859b commit 611d8ca
Show file tree
Hide file tree
Showing 27 changed files with 975 additions and 44 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
dockerfile_path: packages/tenant-outbound-writer
- image_name: compute-agreements-consumer
dockerfile_path: packages/compute-agreements-consumer
- image_name: selfcare-onboarding-consumer
dockerfile_path: packages/selfcare-onboarding-consumer
- image_name: producer-key-events-writer
dockerfile_path: packages/producer-key-events-writer
- image_name: producer-key-readmodel-writer
Expand Down
2 changes: 2 additions & 0 deletions packages/commons/src/config/consumerServiceConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ export const KafkaConsumerConfig = KafkaConfig.and(
TOPIC_STARTING_OFFSET: z
.union([z.literal("earliest"), z.literal("latest")])
.default("latest"),
RESET_CONSUMER_OFFSETS: z.string().default("false"),
})
.transform((c) => ({
kafkaGroupId: c.KAFKA_GROUP_ID,
topicStartingOffset: c.TOPIC_STARTING_OFFSET,
resetConsumerOffsets: c.RESET_CONSUMER_OFFSETS.toLowerCase() === "true",
}))
);
export type KafkaConsumerConfig = z.infer<typeof KafkaConsumerConfig>;
Expand Down
2 changes: 2 additions & 0 deletions packages/commons/src/config/kafkaConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const KafkaConfig = z
.number()
.default(20)
.transform((n) => n * 1000),
KAFKA_BROKER_CONNECTION_STRING: z.string().optional(),
})
.and(AWSConfig)
.transform((c) => ({
Expand All @@ -23,5 +24,6 @@ export const KafkaConfig = z
kafkaDisableAwsIamAuth: c.KAFKA_DISABLE_AWS_IAM_AUTH === "true",
kafkaLogLevel: logLevel[c.KAFKA_LOG_LEVEL],
kafkaReauthenticationThreshold: c.KAFKA_REAUTHENTICATION_THRESHOLD,
kafkaBrokerConnectionString: c.KAFKA_BROKER_CONNECTION_STRING,
}));
export type KafkaConfig = z.infer<typeof KafkaConfig>;
3 changes: 3 additions & 0 deletions packages/commons/src/config/producerServiceConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const KafkaProducerConfig = AWSConfig.and(
.number()
.default(20)
.transform((n) => n * 1000),
PRODUCER_KAFKA_BROKER_CONNECTION_STRING: z.string().optional(),
})
).transform((c) => ({
awsRegion: c.awsRegion,
Expand All @@ -24,5 +25,7 @@ export const KafkaProducerConfig = AWSConfig.and(
producerKafkaLogLevel: logLevel[c.PRODUCER_KAFKA_LOG_LEVEL],
producerKafkaReauthenticationThreshold:
c.PRODUCER_KAFKA_REAUTHENTICATION_THRESHOLD,
producerKafkaBrokerConnectionString:
c.PRODUCER_KAFKA_BROKER_CONNECTION_STRING,
}));
export type KafkaProducerConfig = z.infer<typeof KafkaProducerConfig>;
1 change: 1 addition & 0 deletions packages/commons/src/context/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./context.js";
export * from "./interopHeaders.js";
19 changes: 19 additions & 0 deletions packages/commons/src/context/interopHeaders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { z } from "zod";

export const InteropHeaders = z.object({
"X-Correlation-Id": z.string(),
Authorization: z.string(),
});

export type InteropHeaders = z.infer<typeof InteropHeaders>;

export const getInteropHeaders = ({
token,
correlationId,
}: {
token: string;
correlationId: string;
}): InteropHeaders => ({
"X-Correlation-Id": correlationId,
Authorization: `Bearer ${token}`,
});
1 change: 0 additions & 1 deletion packages/eservice-descriptors-archiver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"@pagopa/eslint-config": "3.0.0",
"@types/node": "20.14.6",
"@types/uuid": "9.0.8",
"openapi-endpoint-trimmer": "2.0.0",
"openapi-zod-client": "1.18.1",
"prettier": "2.8.8",
"testcontainers": "10.9.0",
Expand Down
66 changes: 59 additions & 7 deletions packages/kafka-iam-auth/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,28 @@ const kafkaCommitMessageOffsets = async (
);
};

export async function resetPartitionsOffsets(
topics: string[],
kafka: Kafka,
consumer: Consumer
): Promise<void> {
const admin = kafka.admin();

await admin.connect();

const fetchedTopics = await admin.fetchTopicMetadata({ topics });
fetchedTopics.topics.forEach((t) =>
t.partitions.forEach((p) =>
consumer.seek({
topic: t.name,
partition: p.partitionId,
offset: "-2",
})
)
);
await admin.disconnect();
}

async function oauthBearerTokenProvider(
region: string,
logger: Logger
Expand All @@ -131,17 +153,33 @@ async function oauthBearerTokenProvider(
}

const initKafka = (config: InteropKafkaConfig): Kafka => {
const kafkaConfig: KafkaConfig = config.kafkaDisableAwsIamAuth
const commonConfigProps = {
clientId: config.kafkaClientId,
brokers: config.kafkaBrokers,
logLevel: config.kafkaLogLevel,
};

const connectionStringKafkaConfig: KafkaConfig | undefined =
config.kafkaBrokerConnectionString
? {
...commonConfigProps,
reauthenticationThreshold: config.kafkaReauthenticationThreshold,
ssl: true,
sasl: {
mechanism: "plain",
username: "$ConnectionString",
password: config.kafkaBrokerConnectionString,
},
}
: undefined;

const iamAuthKafkaConfig: KafkaConfig = config.kafkaDisableAwsIamAuth
? {
clientId: config.kafkaClientId,
brokers: config.kafkaBrokers,
logLevel: config.kafkaLogLevel,
...commonConfigProps,
ssl: false,
}
: {
clientId: config.kafkaClientId,
brokers: config.kafkaBrokers,
logLevel: config.kafkaLogLevel,
...commonConfigProps,
reauthenticationThreshold: config.kafkaReauthenticationThreshold,
ssl: true,
sasl: {
Expand All @@ -151,6 +189,15 @@ const initKafka = (config: InteropKafkaConfig): Kafka => {
},
};

if (connectionStringKafkaConfig) {
genericLogger.warn(
"Using connection string mechanism for Kafka Broker authentication - this will override other mechanisms. If that is not desired, remove Kafka broker connection string from env variables."
);
}

const kafkaConfig: KafkaConfig =
connectionStringKafkaConfig ?? iamAuthKafkaConfig;

return new Kafka({
...kafkaConfig,
logCreator:
Expand Down Expand Up @@ -207,6 +254,10 @@ const initConsumer = async (
},
});

if (config.resetConsumerOffsets) {
await resetPartitionsOffsets(topics, kafka, consumer);
}

consumerKafkaEventsListener(consumer);
errorEventsListener(consumer);

Expand Down Expand Up @@ -262,6 +313,7 @@ export const initProducer = async (
kafkaReauthenticationThreshold:
config.producerKafkaReauthenticationThreshold,
awsRegion: config.awsRegion,
kafkaBrokerConnectionString: config.producerKafkaBrokerConnectionString,
});

const producer = kafka.producer({
Expand Down
25 changes: 25 additions & 0 deletions packages/selfcare-onboarding-consumer/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
LOG_LEVEL=info
KAFKA_LOG_LEVEL=INFO

KAFKA_CLIENT_ID="selfcare-onboarding-consumer-client"
KAFKA_GROUP_ID="selfcare-onboarding-consumer-group-local"
KAFKA_BROKERS="localhost:9092"
KAFKA_BROKER_CONNECTION_STRING="placeholder"
SELFCARE_TOPIC="placeholder.selfcare.topic"
TOPIC_STARTING_OFFSET="earliest"
AWS_REGION="eu-central-1"

RESET_CONSUMER_OFFSETS="false"

INTEROP_PRODUCT="interop-product-placeholder"
ALLOWED_ORIGINS="IPA"

TENANT_PROCESS_URL="http://localhost:3500"

INTERNAL_JWT_KID=ffcc9b5b-4612-49b1-9374-9d203a3834f2
INTERNAL_JWT_SUBJECT="dev-refactor.interop-selfcare-onboarding-consumer"
INTERNAL_JWT_ISSUER="dev-refactor.interop.pagopa.it"
INTERNAL_JWT_AUDIENCE="refactor.dev.interop.pagopa.it/internal"
INTERNAL_JWT_SECONDS_DURATION=60

AWS_CONFIG_FILE=aws.config.local
47 changes: 47 additions & 0 deletions packages/selfcare-onboarding-consumer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as build

RUN corepack enable

WORKDIR /app
COPY package.json /app/
COPY pnpm-lock.yaml /app/
COPY pnpm-workspace.yaml /app/

COPY ./packages/selfcare-onboarding-consumer/package.json /app/packages/selfcare-onboarding-consumer/package.json
COPY ./packages/commons/package.json /app/packages/commons/package.json
COPY ./packages/models/package.json /app/packages/models/package.json
COPY ./packages/kafka-iam-auth/package.json /app/packages/kafka-iam-auth/package.json
COPY ./packages/api-clients/package.json /app/packages/api-clients/package.json

RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile

COPY tsconfig.json /app/
COPY turbo.json /app/
COPY ./packages/selfcare-onboarding-consumer /app/packages/selfcare-onboarding-consumer
COPY ./packages/commons /app/packages/commons
COPY ./packages/models /app/packages/models
COPY ./packages/kafka-iam-auth /app/packages/kafka-iam-auth
COPY ./packages/api-clients /app/packages/api-clients

RUN pnpm build && \
rm -rf /app/node_modules/.modules.yaml && \
rm -rf /app/node_modules/.cache && \
mkdir /out && \
cp -a --parents -t /out \
node_modules packages/selfcare-onboarding-consumer/node_modules \
package*.json packages/selfcare-onboarding-consumer/package*.json \
packages/commons \
packages/models \
packages/kafka-iam-auth \
packages/api-clients \
packages/selfcare-onboarding-consumer/dist && \
find /out -exec touch -h --date=@0 {} \;

FROM node:20.14.0-slim@sha256:5e8ac65a0231d76a388683d07ca36a9769ab019a85d85169fe28e206f7a3208e as final

COPY --from=build /out /app

WORKDIR /app/packages/selfcare-onboarding-consumer
EXPOSE 3000

CMD ["node", "."]
3 changes: 3 additions & 0 deletions packages/selfcare-onboarding-consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Selfcare Onboarding Consumer

This service imports tenants that have joined Interop through Selfcare.
9 changes: 9 additions & 0 deletions packages/selfcare-onboarding-consumer/aws.config.local
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[default]
aws_access_key_id=testawskey
aws_secret_access_key=testawssecret
region=eu-central-1
services=local

[services local]
kms=
endpoint_url=http://localhost:4566
43 changes: 43 additions & 0 deletions packages/selfcare-onboarding-consumer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"name": "pagopa-interop-selfcare-onboarding-consumer",
"private": true,
"version": "1.0.0",
"description": "PagoPA Interoperability Selfcare onboarding consumer: service that imports tenants that have joined Interop through Selfcare",
"main": "dist",
"type": "module",
"scripts": {
"test": "vitest",
"test:it": "vitest integration",
"lint": "eslint . --ext .ts,.tsx",
"lint:autofix": "eslint . --ext .ts,.tsx --fix",
"format:check": "prettier --check src",
"format:write": "prettier --write src",
"start": "node --loader ts-node/esm -r 'dotenv-flow/config' --watch ./src/index.ts",
"build": "tsc",
"check": "tsc --project tsconfig.check.json"
},
"keywords": [],
"author": "",
"license": "Apache-2.0",
"devDependencies": {
"@pagopa/eslint-config": "3.0.0",
"@types/node": "20.14.6",
"@types/uuid": "9.0.8",
"prettier": "2.8.8",
"ts-node": "10.9.2",
"typescript": "5.4.5",
"vitest": "1.6.0"
},
"dependencies": {
"dotenv-flow": "4.1.0",
"kafka-iam-auth": "workspace:*",
"kafkajs": "2.2.4",
"pagopa-interop-commons": "workspace:*",
"pagopa-interop-commons-test": "workspace:*",
"pagopa-interop-models": "workspace:*",
"pagopa-interop-api-clients": "workspace:*",
"ts-pattern": "5.2.0",
"uuid": "10.0.0",
"zod": "3.23.8"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { tenantApi } from "pagopa-interop-api-clients";

export type TenantProcessClient = {
selfcare: ReturnType<typeof tenantApi.createSelfcareApiClient>;
};

export const tenantProcessClientBuilder = (
url: string
): TenantProcessClient => ({
selfcare: tenantApi.createSelfcareApiClient(url),
});
32 changes: 32 additions & 0 deletions packages/selfcare-onboarding-consumer/src/config/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { z } from "zod";
import {
KafkaConsumerConfig,
TokenGenerationConfig,
} from "pagopa-interop-commons";

export const SelfcareOnboardingConsumerConfig = KafkaConsumerConfig.and(
TokenGenerationConfig
).and(
z
.object({
SELFCARE_TOPIC: z.string(),

INTEROP_PRODUCT: z.string(),
ALLOWED_ORIGINS: z.string(),

TENANT_PROCESS_URL: z.string(),
})
.transform((c) => ({
selfcareTopic: c.SELFCARE_TOPIC,
interopProduct: c.INTEROP_PRODUCT,
allowedOrigins: c.ALLOWED_ORIGINS.split(","),
tenantProcessUrl: c.TENANT_PROCESS_URL,
}))
);

export type SelfcareOnboardingConsumerConfig = z.infer<
typeof SelfcareOnboardingConsumerConfig
>;

export const config: SelfcareOnboardingConsumerConfig =
SelfcareOnboardingConsumerConfig.parse(process.env);
23 changes: 23 additions & 0 deletions packages/selfcare-onboarding-consumer/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { runConsumer } from "kafka-iam-auth";
import {
InteropTokenGenerator,
RefreshableInteropToken,
} from "pagopa-interop-commons";
import { config } from "./config/config.js";
import { tenantProcessClientBuilder } from "./clients/tenantProcessClient.js";
import { selfcareOnboardingProcessorBuilder } from "./services/selfcareOnboardingProcessor.js";

const tokenGenerator = new InteropTokenGenerator(config);
const refreshableToken = new RefreshableInteropToken(tokenGenerator);
await refreshableToken.init();

const tenantProcessClient = tenantProcessClientBuilder(config.tenantProcessUrl);

const processor = selfcareOnboardingProcessorBuilder(
refreshableToken,
tenantProcessClient,
config.interopProduct,
config.allowedOrigins
);

await runConsumer(config, [config.selfcareTopic], processor.processMessage);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const ORIGIN_IPA = "IPA";
Loading

0 comments on commit 611d8ca

Please sign in to comment.