From 1475242c46a9e97ffc0f745917bb03c5fc0dcdd3 Mon Sep 17 00:00:00 2001 From: Brett Onions Date: Mon, 25 Nov 2024 11:17:31 +0200 Subject: [PATCH 1/6] adding config props --- docker-compose.yml | 2 ++ src/index.ts | 3 ++- src/openhim/mediatorConfig.json | 16 +++++++++++++--- src/openhim/openhim.ts | 4 ++++ 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1b59fdf..0f9c4f7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,6 +35,8 @@ services: mongo-db: container_name: mongo-db image: mongo:4.0 + ports: + - 27017:27017 networks: - openhim restart: unless-stopped diff --git a/src/index.ts b/src/index.ts index f1a4c22..1241493 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,5 +1,6 @@ import express from 'express'; import path from 'path'; +import { readFile } from 'fs/promises'; import { getConfig } from './config/config'; import logger from './logger'; import routes from './routes/index'; @@ -14,7 +15,7 @@ app.use('/', routes); createMinioBucketListeners(); -app.listen(getConfig().port, () => { +app.listen(getConfig().port, async() => { logger.info(`Server is running on port - ${getConfig().port}`); if (getConfig().runningMode !== 'testing' && getConfig().registerMediator) { diff --git a/src/openhim/mediatorConfig.json b/src/openhim/mediatorConfig.json index 2d82cea..ace4a2e 100644 --- a/src/openhim/mediatorConfig.json +++ b/src/openhim/mediatorConfig.json @@ -10,7 +10,7 @@ "routes": [ { "name": "Climate Mediator Endpoint", - "host": "climate-mediator", + "host": "172.17.0.1", "port": "3000", "primary": true, "type": "http" @@ -28,12 +28,22 @@ "endpoints": [ { "name": "Climate Endpoint", - "host": "climate-mediator", + "host": "172.17.0.1", "path": "/test", "port": "3000", "primary": true, "type": "http" } ], - "configDefs": [] + "configDefs": [{ + "param": "minio_buckets_registry", + "displayName": "Minio Buckets Registry", + "description": "The available Minio buckets and their configurations", + "type": "struct", + "array": true, + "template":[{ + "param": "bucket", + "type": "string" + }] + }] } diff --git a/src/openhim/openhim.ts b/src/openhim/openhim.ts index 59d0019..d2f6887 100644 --- a/src/openhim/openhim.ts +++ b/src/openhim/openhim.ts @@ -53,6 +53,10 @@ export const setupMediator = (mediatorConfigFilePath: string) => { emitter.on('error', (err: Error) => { logger.error(`Heartbeat failed: ${JSON.stringify(err)}`); }); + + emitter.on('config', (config: any) => { + logger.info(`Config: ${JSON.stringify(config)}`); + }); }); }); } catch (err) { From 79a9245512e7dd6d5fa502cf0a74848e83949521 Mon Sep 17 00:00:00 2001 From: Brett Onions Date: Mon, 25 Nov 2024 21:24:01 +0200 Subject: [PATCH 2/6] listening and fetching configs from OpenHIM --- package-lock.json | 56 ++++++++++++++++ package.json | 1 + src/index.ts | 24 ++++--- src/openhim/mediatorConfig.json | 30 ++++++--- src/openhim/openhim.ts | 94 +++++++++++++++++++++++++-- src/routes/index.ts | 58 +++++++++-------- src/utils/clickhouse.ts | 14 ++-- src/utils/minioClient.ts | 67 ++++++++++++------- tests/services/table-creation.test.ts | 5 +- 9 files changed, 264 insertions(+), 85 deletions(-) diff --git a/package-lock.json b/package-lock.json index d9ee5a8..b223fdf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "ISC", "dependencies": { "@clickhouse/client": "^1.8.0", + "axios": "^1.7.7", "dotenv": "^16.4.5", "express": "^4.18.2", "express-async-handler": "^1.2.0", @@ -2039,6 +2040,29 @@ "resolved": "https://registry.npmjs.org/aws4/-/aws4-1.13.2.tgz", "integrity": "sha512-lHe62zvbTB5eEABUVi/AwVh0ZKY9rMMDhmm+eeyuuUQbQ3+J+fONVQOZyj+DdrvD4BY33uYniyRJ4UJIaSKAfw==" }, + "node_modules/axios": { + "version": "1.7.7", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.7.tgz", + "integrity": "sha512-S4kL7XrjgBmvdGut0sN3yJxqYzrDOnivkBiN0OFs6hLiUam3UPvswUo0kqGyhqUZGEOytHyumEdXsAkgCOUf3Q==", + "dependencies": { + "follow-redirects": "^1.15.6", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + } + }, + "node_modules/axios/node_modules/form-data": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.1.tgz", + "integrity": "sha512-tzN8e4TX8+kkxGPK8D5u0FNmjPUjw3lwC9lSLxxoB/+GtsJG91CO8bSWy73APlgAZzZbXEYZJuxjkHH2w+Ezhw==", + "dependencies": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + }, + "engines": { + "node": ">= 6" + } + }, "node_modules/babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", @@ -6721,6 +6745,11 @@ "node": ">= 0.10" } }, + "node_modules/proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "node_modules/psl": { "version": "1.9.0", "resolved": "https://registry.npmjs.org/psl/-/psl-1.9.0.tgz", @@ -9829,6 +9858,28 @@ "resolved": "https://registry.npmjs.org/aws4/-/aws4-1.13.2.tgz", "integrity": "sha512-lHe62zvbTB5eEABUVi/AwVh0ZKY9rMMDhmm+eeyuuUQbQ3+J+fONVQOZyj+DdrvD4BY33uYniyRJ4UJIaSKAfw==" }, + "axios": { + "version": "1.7.7", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.7.7.tgz", + "integrity": "sha512-S4kL7XrjgBmvdGut0sN3yJxqYzrDOnivkBiN0OFs6hLiUam3UPvswUo0kqGyhqUZGEOytHyumEdXsAkgCOUf3Q==", + "requires": { + "follow-redirects": "^1.15.6", + "form-data": "^4.0.0", + "proxy-from-env": "^1.1.0" + }, + "dependencies": { + "form-data": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/form-data/-/form-data-4.0.1.tgz", + "integrity": "sha512-tzN8e4TX8+kkxGPK8D5u0FNmjPUjw3lwC9lSLxxoB/+GtsJG91CO8bSWy73APlgAZzZbXEYZJuxjkHH2w+Ezhw==", + "requires": { + "asynckit": "^0.4.0", + "combined-stream": "^1.0.8", + "mime-types": "^2.1.12" + } + } + } + }, "babel-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/babel-jest/-/babel-jest-29.7.0.tgz", @@ -13282,6 +13333,11 @@ "ipaddr.js": "1.9.1" } }, + "proxy-from-env": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", + "integrity": "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==" + }, "psl": { "version": "1.9.0", "resolved": "https://registry.npmjs.org/psl/-/psl-1.9.0.tgz", diff --git a/package.json b/package.json index c60024c..b8577b2 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ }, "dependencies": { "@clickhouse/client": "^1.8.0", + "axios": "^1.7.7", "dotenv": "^16.4.5", "express": "^4.18.2", "express-async-handler": "^1.2.0", diff --git a/src/index.ts b/src/index.ts index 1241493..2b2a203 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,25 +1,29 @@ import express from 'express'; import path from 'path'; -import { readFile } from 'fs/promises'; import { getConfig } from './config/config'; import logger from './logger'; import routes from './routes/index'; -import { setupMediator } from './openhim/openhim'; -import { - createMinioBucketListeners, -} from './utils/minioClient'; +import { getRegisterBuckets, setupMediator } from './openhim/openhim'; +import { createMinioBucketListeners, ensureBucketExists } from './utils/minioClient'; const app = express(); app.use('/', routes); -createMinioBucketListeners(); - -app.listen(getConfig().port, async() => { +app.listen(getConfig().port, async () => { logger.info(`Server is running on port - ${getConfig().port}`); if (getConfig().runningMode !== 'testing' && getConfig().registerMediator) { - setupMediator(path.resolve(__dirname, './openhim/mediatorConfig.json')); + setupMediator(); } -}); + const buckets = await getRegisterBuckets(); + + buckets.length === 0 && logger.warn('No buckets specified in the configuration'); + + for await (const { bucket, region } of buckets) { + await ensureBucketExists(bucket, region); + } + + createMinioBucketListeners(buckets.map((bucket) => bucket.bucket)); +}); diff --git a/src/openhim/mediatorConfig.json b/src/openhim/mediatorConfig.json index ace4a2e..983b3e2 100644 --- a/src/openhim/mediatorConfig.json +++ b/src/openhim/mediatorConfig.json @@ -20,7 +20,8 @@ "instant" ], "methods": [ - "POST", "GET" + "POST", + "GET" ], "type": "http" } @@ -35,15 +36,26 @@ "type": "http" } ], - "configDefs": [{ - "param": "minio_buckets_registry", + "configDefs": [ + { + "param": "minio_buckets_registry", "displayName": "Minio Buckets Registry", "description": "The available Minio buckets and their configurations", "type": "struct", "array": true, - "template":[{ - "param": "bucket", - "type": "string" - }] - }] -} + "template": [ + { + "param": "bucket", + "displayName": "Bucket", + "type": "string" + }, + { + "param": "region", + "displayName": "Region (optional)", + "type": "string", + "optional": true + } + ] + } + ] +} \ No newline at end of file diff --git a/src/openhim/openhim.ts b/src/openhim/openhim.ts index d2f6887..7660cb3 100644 --- a/src/openhim/openhim.ts +++ b/src/openhim/openhim.ts @@ -2,14 +2,28 @@ import logger from '../logger'; import { MediatorConfig } from '../types/mediatorConfig'; import { RequestOptions } from '../types/request'; import { getConfig } from '../config/config'; -import { activateHeartbeat, fetchConfig, registerMediator } from 'openhim-mediator-utils'; +import axios, { AxiosError } from 'axios'; +import https from 'https'; +import { + activateHeartbeat, + fetchConfig, + registerMediator, + authenticate, + genAuthHeaders, +} from 'openhim-mediator-utils'; +import { Bucket, createMinioBucketListeners, ensureBucketExists } from '../utils/minioClient'; +import path from 'path'; -const { openhimUsername, openhimPassword, openhimMediatorUrl, trustSelfSigned } = getConfig(); +const { openhimUsername, openhimPassword, openhimMediatorUrl, trustSelfSigned, runningMode } = + getConfig(); -const resolveMediatorConfig = (mediatorConfigFilePath: string): MediatorConfig => { +const mediatorConfigFilePath = path.resolve(__dirname, './mediatorConfig.json'); + +const resolveMediatorConfig = (): MediatorConfig => { let mediatorConfigFile; try { + logger.info(`Loading mediator config from: ${mediatorConfigFilePath}`); mediatorConfigFile = require(mediatorConfigFilePath); } catch (error) { logger.error(`Failed to parse JSON: ${error}`); @@ -29,9 +43,9 @@ const resolveOpenhimConfig = (urn: string): RequestOptions => { }; }; -export const setupMediator = (mediatorConfigFilePath: string) => { +export const setupMediator = () => { try { - const mediatorConfig = resolveMediatorConfig(mediatorConfigFilePath); + const mediatorConfig = resolveMediatorConfig(); const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn); registerMediator(openhimConfig, mediatorConfig, (error: Error) => { @@ -54,8 +68,16 @@ export const setupMediator = (mediatorConfigFilePath: string) => { logger.error(`Heartbeat failed: ${JSON.stringify(err)}`); }); - emitter.on('config', (config: any) => { - logger.info(`Config: ${JSON.stringify(config)}`); + emitter.on('config', async (config: any) => { + logger.info('Received config from OpenHIM'); + + const buckets = config.minio_buckets_registry as Bucket[]; + + for await (const { bucket, region } of buckets) { + await ensureBucketExists(bucket, region); + } + + createMinioBucketListeners(buckets.map((bucket) => bucket.bucket)); }); }); }); @@ -63,3 +85,61 @@ export const setupMediator = (mediatorConfigFilePath: string) => { logger.error('Unable to register mediator', err); } }; + +//TODO: Add Typing and error handling. +async function getMediatorConfig() { + logger.info('Fetching mediator config from OpenHIM'); + const mediatorConfig = resolveMediatorConfig(); + const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn); + + const { apiURL, urn, username, password, trustSelfSigned } = openhimConfig; + + try { + const request = await axios.get(`${apiURL}/mediators/urn:mediator:climate-mediator`, { + auth: { + username, + password, + }, + httpsAgent: new https.Agent({ + rejectUnauthorized: !trustSelfSigned, + }), + }); + + return request.data; + } catch (e) { + const error = e as AxiosError; + logger.error(`Failed to fetch mediator config: ${JSON.stringify(error)}`); + error.status === 404 && logger.warn('Mediator config not found in OpenHIM, '); + return null; + } +} + +export async function getRegisterBuckets(): Promise { + if (runningMode !== 'testing') { + logger.info('Fetching registered buckets from OpenHIM'); + const mediatorConfig = await getMediatorConfig(); + + //TODO: Handle errors, and undefined response. + const buckets = mediatorConfig.config?.minio_buckets_registry as Bucket[]; + if (!buckets) { + return []; + } + return buckets; + } else { + logger.info('Running in testing mode, reading buckets from ENV'); + const buckets = getConfig().minio.buckets.split(','); + return buckets.map((bucket) => ({ bucket, region: '' })); + } +} + +export async function registerBucket(bucket: string, region?: string) { + if (runningMode !== 'testing') { + return true; + } + const mediatorConfig = await getMediatorConfig(); + const existingBuckets = mediatorConfig.config?.minio_buckets_registry; + + if (!existingBuckets) { + return []; + } +} diff --git a/src/routes/index.ts b/src/routes/index.ts index 7e7515c..88aee91 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -10,7 +10,7 @@ import { uploadToMinio } from '../utils/minioClient'; // Constants const VALID_MIME_TYPES = ['text/csv', 'application/json'] as const; -type ValidMimeType = typeof VALID_MIME_TYPES[number]; +type ValidMimeType = (typeof VALID_MIME_TYPES)[number]; interface UploadResponse { status: 'success' | 'error'; @@ -20,34 +20,34 @@ interface UploadResponse { const routes = express.Router(); const bodySizeLimit = getConfig().bodySizeLimit; -const upload = multer({ +const upload = multer({ storage: multer.memoryStorage(), fileFilter: (_, file, cb) => { cb(null, VALID_MIME_TYPES.includes(file.mimetype as ValidMimeType)); - } + }, }); // Helper functions const createErrorResponse = (code: string, message: string): UploadResponse => ({ status: 'error', code, - message + message, }); const createSuccessResponse = (code: string, message: string): UploadResponse => ({ status: 'success', code, - message + message, }); const saveCsvToTmp = async (fileBuffer: Buffer, fileName: string): Promise => { const tmpDir = path.join(process.cwd(), 'tmp'); await fs.mkdir(tmpDir, { recursive: true }); - + const fileUrl = path.join(tmpDir, fileName); await fs.writeFile(fileUrl, fileBuffer); logger.info(`File saved: ${fileUrl}`); - + return fileUrl; }; @@ -62,8 +62,9 @@ const validateJsonFile = (buffer: Buffer): boolean => { // File handlers const handleCsvFile = async ( - file: Express.Multer.File, - bucket: string + file: Express.Multer.File, + bucket: string, + region: string ): Promise => { const headers = getCsvHeaders(file.buffer); if (!headers) { @@ -72,7 +73,13 @@ const handleCsvFile = async ( const fileUrl = await saveCsvToTmp(file.buffer, file.originalname); try { - const uploadResult = await uploadToMinio(fileUrl, file.originalname, bucket, file.mimetype); + const uploadResult = await uploadToMinio( + fileUrl, + file.originalname, + bucket, + region, + file.mimetype + ); await fs.unlink(fileUrl); return uploadResult.success @@ -96,36 +103,35 @@ routes.post('/upload', upload.single('file'), async (req, res) => { try { const file = req.file; const bucket = req.query.bucket as string; + const region = req.query.region as string; if (!file) { logger.error('No file uploaded'); - return res.status(400).json( - createErrorResponse('FILE_MISSING', 'No file uploaded') - ); + return res.status(400).json(createErrorResponse('FILE_MISSING', 'No file uploaded')); } if (!bucket) { logger.error('No bucket provided'); - return res.status(400).json( - createErrorResponse('BUCKET_MISSING', 'No bucket provided') - ); + return res.status(400).json(createErrorResponse('BUCKET_MISSING', 'No bucket provided')); } - const response = file.mimetype === 'text/csv' - ? await handleCsvFile(file, bucket) - : handleJsonFile(file); + const response = + file.mimetype === 'text/csv' + ? await handleCsvFile(file, bucket, region) + : handleJsonFile(file); const statusCode = response.status === 'success' ? 201 : 400; return res.status(statusCode).json(response); - } catch (error) { logger.error('Error processing upload:', error); - return res.status(500).json( - createErrorResponse( - 'INTERNAL_SERVER_ERROR', - error instanceof Error ? error.message : 'Unknown error' - ) - ); + return res + .status(500) + .json( + createErrorResponse( + 'INTERNAL_SERVER_ERROR', + error instanceof Error ? error.message : 'Unknown error' + ) + ); } }); diff --git a/src/utils/clickhouse.ts b/src/utils/clickhouse.ts index 68c11b2..a29a695 100644 --- a/src/utils/clickhouse.ts +++ b/src/utils/clickhouse.ts @@ -5,7 +5,6 @@ import logger from '../logger'; const { clickhouse } = getConfig(); const { url, password } = clickhouse; - export async function createTable(fields: string[], tableName: string) { const client = createClient({ url, @@ -69,10 +68,14 @@ export function flattenJson(json: any, prefix = ''): string[] { return Array.from(fieldsSet); } -export async function insertFromS3(tableName: string, s3Path: string, s3Config: { - accessKey: string, - secretKey: string -}) { +export async function insertFromS3( + tableName: string, + s3Path: string, + s3Config: { + accessKey: string; + secretKey: string; + } +) { logger.info(`Inside the insertFromS3 function`); const client = createClient({ url, @@ -104,4 +107,3 @@ export async function insertFromS3(tableName: string, s3Path: string, s3Config: await client.close(); } } - diff --git a/src/utils/minioClient.ts b/src/utils/minioClient.ts index e48ff0f..f57bfc0 100644 --- a/src/utils/minioClient.ts +++ b/src/utils/minioClient.ts @@ -5,11 +5,20 @@ import crypto from 'crypto'; import { readFile, rm } from 'fs/promises'; import { createTable, insertFromS3 } from './clickhouse'; import { validateJsonFile, getCsvHeaders } from './file-validators'; +import { registerBucket } from '../openhim/openhim'; + +export interface Bucket { + bucket: string; + region?: string; +} const { endPoint, port, useSSL, bucketRegion, accessKey, secretKey, buckets, prefix, suffix } = getConfig().minio; +const registeredBuckets: Set = new Set(); + // Create a shared Minio client instance +//TODO: Add error handling and connection check const minioClient = new Minio.Client({ endPoint, port, @@ -30,14 +39,22 @@ interface FileExistsResponse extends MinioResponse { /** * Ensures a bucket exists, creates it if it doesn't * @param {string} bucket - Bucket name + * @param {string} [region] - Bucket region * @returns {Promise} */ -async function ensureBucketExists(bucket: string): Promise { +//TODO: use the bucket interface +export async function ensureBucketExists(bucket: string, region?: string): Promise { const exists = await minioClient.bucketExists(bucket); if (!exists) { - await minioClient.makeBucket(bucket, bucketRegion); - logger.info(`Bucket ${bucket} created in "${bucketRegion}"`); + //TODO: We Could add a flag to allow the user to create the bucket if it doesn't exist. maybe it could be on the url parameters so we know its an explicit action. + //TODO: Have the bucket region be optional and come from http request. + await minioClient.makeBucket(bucket, region); + logger.info( + `Bucket ${bucket} created${region ? `in "${region}"` : ' no region specified'}` + ); } + + await registerBucket(bucket); } /** @@ -104,12 +121,13 @@ export async function uploadToMinio( sourceFile: string, destinationObject: string, bucket: string, + region: string, fileType: string, customMetadata = {} ): Promise { try { logger.info(`Uploading file ${sourceFile} to bucket ${bucket}`); - await ensureBucketExists(bucket); + await ensureBucketExists(bucket, region); const fileCheck = await checkFileExists(destinationObject, bucket, fileType); if (fileCheck.exists) { @@ -134,6 +152,7 @@ export async function uploadToMinio( message: successMessage, }; } catch (error) { + //TODO: see if we can make a specific error for failing to register the bucket within the mediator. const errorMessage = `Error uploading file: ${error instanceof Error ? error.message : String(error)}`; logger.error(errorMessage); return { @@ -143,33 +162,31 @@ export async function uploadToMinio( } } -export async function createMinioBucketListeners() { - const minioClient = new Minio.Client({ - endPoint, - port, - useSSL, - accessKey, - secretKey, - }); - - try { - // Test connection by attempting to list buckets - await minioClient.listBuckets(); - logger.info(`Successfully connected to Minio at ${endPoint}:${port}`); - } catch (error) { - logger.error(`Failed to connect to Minio: ${error}`); - throw error; - } - - const listOfBuckets = buckets.split(','); - - listOfBuckets.length === 0 && logger.warn('No buckets specified in the configuration'); +export async function createMinioBucketListeners(listOfBuckets: string[]) { + // try { + // // Test connection by attempting to list buckets + // await minioClient.listBuckets(); + // logger.info(`Successfully connected to Minio at ${endPoint}:${port}`); + // } catch (error) { + // logger.error(`Failed to connect to Minio: ${error}`); + // throw error; + // } for (const bucket of listOfBuckets) { + /*TODO: + Check if the buckets actually exist before registering listeners. if not then log an error. + */ + if (registeredBuckets.has(bucket)) { + logger.info(`Bucket ${bucket} already registered`); + continue; + } + const listener = minioClient.listenBucketNotification(bucket, prefix, suffix, [ 's3:ObjectCreated:*', ]); + registeredBuckets.add(bucket); + logger.info(`Listening for notifications on bucket ${bucket}`); listener.on('notification', async (notification) => { diff --git a/tests/services/table-creation.test.ts b/tests/services/table-creation.test.ts index 5ca5128..947784a 100644 --- a/tests/services/table-creation.test.ts +++ b/tests/services/table-creation.test.ts @@ -2,7 +2,6 @@ import { expect } from 'chai'; import { getCsvHeaders } from '../../src/utils/file-validators'; import { createTable, flattenJson, generateDDL } from '../../src/utils/clickhouse'; - describe('Create Tables based on files', function () { this.timeout(60_000); @@ -33,7 +32,9 @@ describe('Create Tables based on files', function () { const fields = getCsvHeaders(csvFile); if (!fields) throw new Error('No fields found'); const result = generateDDL(fields, 'test'); - expect(result).to.equal('CREATE TABLE test (table_id UUID DEFAULT generateUUIDv4(),id VARCHAR, name VARCHAR, age VARCHAR) ENGINE = MergeTree ORDER BY (table_id)'); + expect(result).to.equal( + 'CREATE TABLE test (table_id UUID DEFAULT generateUUIDv4(),id VARCHAR, name VARCHAR, age VARCHAR) ENGINE = MergeTree ORDER BY (table_id)' + ); }); it('should create a table based on a csv file', async () => { From ce61c079fd01ab5195b35bfbc25472d672509443 Mon Sep 17 00:00:00 2001 From: Brett Onions Date: Tue, 26 Nov 2024 16:37:34 +0200 Subject: [PATCH 3/6] resolved issues with mediator registration --- src/index.ts | 6 +- src/openhim/mediatorConfig.json | 9 ++- src/openhim/openhim.ts | 117 +++++++++++++++++++++++++----- src/routes/index.ts | 26 +++++-- src/types/index.ts | 1 + src/types/openhim-api/mediator.ts | 79 ++++++++++++++++++++ src/utils/minioClient.ts | 30 +++++--- 7 files changed, 228 insertions(+), 40 deletions(-) create mode 100644 src/types/index.ts create mode 100644 src/types/openhim-api/mediator.ts diff --git a/src/index.ts b/src/index.ts index 2b2a203..229618f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,7 +3,7 @@ import path from 'path'; import { getConfig } from './config/config'; import logger from './logger'; import routes from './routes/index'; -import { getRegisterBuckets, setupMediator } from './openhim/openhim'; +import { getRegisteredBuckets, setupMediator } from './openhim/openhim'; import { createMinioBucketListeners, ensureBucketExists } from './utils/minioClient'; const app = express(); @@ -14,10 +14,10 @@ app.listen(getConfig().port, async () => { logger.info(`Server is running on port - ${getConfig().port}`); if (getConfig().runningMode !== 'testing' && getConfig().registerMediator) { - setupMediator(); + await setupMediator(); } - const buckets = await getRegisterBuckets(); + const buckets = await getRegisteredBuckets(); buckets.length === 0 && logger.warn('No buckets specified in the configuration'); diff --git a/src/openhim/mediatorConfig.json b/src/openhim/mediatorConfig.json index 983b3e2..09f6880 100644 --- a/src/openhim/mediatorConfig.json +++ b/src/openhim/mediatorConfig.json @@ -10,7 +10,7 @@ "routes": [ { "name": "Climate Mediator Endpoint", - "host": "172.17.0.1", + "host": "climate-mediator", "port": "3000", "primary": true, "type": "http" @@ -29,7 +29,7 @@ "endpoints": [ { "name": "Climate Endpoint", - "host": "172.17.0.1", + "host": "climate-mediator", "path": "/test", "port": "3000", "primary": true, @@ -57,5 +57,8 @@ } ] } - ] + ], + "config": { + "minio_buckets_registry": [] + } } \ No newline at end of file diff --git a/src/openhim/openhim.ts b/src/openhim/openhim.ts index 7660cb3..2862c99 100644 --- a/src/openhim/openhim.ts +++ b/src/openhim/openhim.ts @@ -1,16 +1,11 @@ import logger from '../logger'; +import { MinioBucketsRegistry, Mediator as OpenHimAPIMediator } from '../types'; import { MediatorConfig } from '../types/mediatorConfig'; import { RequestOptions } from '../types/request'; import { getConfig } from '../config/config'; import axios, { AxiosError } from 'axios'; import https from 'https'; -import { - activateHeartbeat, - fetchConfig, - registerMediator, - authenticate, - genAuthHeaders, -} from 'openhim-mediator-utils'; +import { activateHeartbeat, fetchConfig, registerMediator } from 'openhim-mediator-utils'; import { Bucket, createMinioBucketListeners, ensureBucketExists } from '../utils/minioClient'; import path from 'path'; @@ -23,7 +18,7 @@ const resolveMediatorConfig = (): MediatorConfig => { let mediatorConfigFile; try { - logger.info(`Loading mediator config from: ${mediatorConfigFilePath}`); + logger.debug(`Loading mediator config from: ${mediatorConfigFilePath}`); mediatorConfigFile = require(mediatorConfigFilePath); } catch (error) { logger.error(`Failed to parse JSON: ${error}`); @@ -43,12 +38,12 @@ const resolveOpenhimConfig = (urn: string): RequestOptions => { }; }; -export const setupMediator = () => { +export const setupMediator = async () => { try { const mediatorConfig = resolveMediatorConfig(); const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn); - registerMediator(openhimConfig, mediatorConfig, (error: Error) => { + await registerMediator(openhimConfig, mediatorConfig, (error: Error) => { if (error) { logger.error(`Failed to register mediator: ${JSON.stringify(error)}`); throw error; @@ -87,8 +82,8 @@ export const setupMediator = () => { }; //TODO: Add Typing and error handling. -async function getMediatorConfig() { - logger.info('Fetching mediator config from OpenHIM'); +async function getMediatorConfig(): Promise { + logger.debug('Fetching mediator config from OpenHIM'); const mediatorConfig = resolveMediatorConfig(); const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn); @@ -108,17 +103,65 @@ async function getMediatorConfig() { return request.data; } catch (e) { const error = e as AxiosError; - logger.error(`Failed to fetch mediator config: ${JSON.stringify(error)}`); - error.status === 404 && logger.warn('Mediator config not found in OpenHIM, '); + + switch (error.status) { + case 401: + logger.error(`Failed to authenticate with OpenHIM, check your credentials`); + break; + case 404: + logger.debug( + 'Mediator config not found in OpenHIM, This is expected on initial setup' + ); + break; + default: + logger.error(`Failed to fetch mediator config: ${JSON.stringify(error)}`); + break; + } return null; } } -export async function getRegisterBuckets(): Promise { +async function putMediatorConfig(mediatorUrn: string, mediatorConfig: MinioBucketsRegistry[]) { + const openhimConfig = resolveOpenhimConfig(mediatorUrn); + const { apiURL, username, password, trustSelfSigned } = openhimConfig; + await axios.put( + `${apiURL}/mediators/urn:mediator:climate-mediator/config`, + { + minio_buckets_registry: mediatorConfig, + }, + { + auth: { username, password }, + httpsAgent: new https.Agent({ + rejectUnauthorized: !trustSelfSigned, + }), + } + ); + + try { + logger.info('Successfully updated mediator config in OpenHIM'); + } catch (error) { + const axiosError = error as AxiosError; + switch (axiosError.status) { + case 401: + logger.error(`Failed to authenticate with OpenHIM, check your credentials`); + break; + default: + logger.error( + `Failed to update mediator config in OpenHIM: ${JSON.stringify(axiosError)}` + ); + break; + } + } +} + +export async function getRegisteredBuckets(): Promise { if (runningMode !== 'testing') { logger.info('Fetching registered buckets from OpenHIM'); const mediatorConfig = await getMediatorConfig(); + if (!mediatorConfig) { + return []; + } //TODO: Handle errors, and undefined response. const buckets = mediatorConfig.config?.minio_buckets_registry as Bucket[]; if (!buckets) { @@ -133,13 +176,49 @@ export async function getRegisterBuckets(): Promise { } export async function registerBucket(bucket: string, region?: string) { - if (runningMode !== 'testing') { + // If we are in testing mode, we don't need to have the registered buckets persisted + if (runningMode === 'testing') { + logger.debug('Running in testing mode, skipping bucket registration'); return true; } + + //get the mediator config from OpenHIM const mediatorConfig = await getMediatorConfig(); - const existingBuckets = mediatorConfig.config?.minio_buckets_registry; - if (!existingBuckets) { - return []; + //TODO: Change this to a debug log + logger.debug(`Mediator config: ${JSON.stringify(mediatorConfig)}`); + + //if the mediator config is not found, log the issue and return false + if (mediatorConfig === null) { + logger.error('Mediator config not found in OpenHIM, unable to register bucket'); + return false; } + + const newBucket = { + bucket, + region: region || '', + }; + + //get the existing buckets from the mediator config + const existingConfig = mediatorConfig.config; + + if (existingConfig === undefined) { + logger.info('Mediator config does not have a config section, creating new config'); + mediatorConfig['config'] = { + minio_buckets_registry: [newBucket], + }; + } else { + const existingBucket = existingConfig.minio_buckets_registry.find( + (bucket) => bucket.bucket === newBucket.bucket + ); + if (existingBucket) { + logger.debug(`Bucket ${bucket} already exists in the config`); + return false; + } + logger.info(`Adding bucket ${bucket} to OpenHIM config`); + existingConfig.minio_buckets_registry.push(newBucket); + await putMediatorConfig(mediatorConfig.urn, existingConfig.minio_buckets_registry); + } + + return true; } diff --git a/src/routes/index.ts b/src/routes/index.ts index 88aee91..1404180 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -5,8 +5,12 @@ import { getCsvHeaders } from '../utils/file-validators'; import logger from '../logger'; import fs from 'fs/promises'; import path from 'path'; -import e from 'express'; -import { uploadToMinio } from '../utils/minioClient'; +import { + BucketDoesNotExistError, + ensureBucketExists, + uploadToMinio, +} from '../utils/minioClient'; +import { registerBucket } from '../openhim/openhim'; // Constants const VALID_MIME_TYPES = ['text/csv', 'application/json'] as const; @@ -77,7 +81,6 @@ const handleCsvFile = async ( fileUrl, file.originalname, bucket, - region, file.mimetype ); await fs.unlink(fileUrl); @@ -104,6 +107,7 @@ routes.post('/upload', upload.single('file'), async (req, res) => { const file = req.file; const bucket = req.query.bucket as string; const region = req.query.region as string; + const createBucketIfNotExists = req.query.createBucketIfNotExists === 'true'; if (!file) { logger.error('No file uploaded'); @@ -115,21 +119,31 @@ routes.post('/upload', upload.single('file'), async (req, res) => { return res.status(400).json(createErrorResponse('BUCKET_MISSING', 'No bucket provided')); } + await ensureBucketExists(bucket, region, createBucketIfNotExists); + const response = file.mimetype === 'text/csv' ? await handleCsvFile(file, bucket, region) : handleJsonFile(file); + createBucketIfNotExists && (await registerBucket(bucket, region)); + const statusCode = response.status === 'success' ? 201 : 400; return res.status(statusCode).json(response); - } catch (error) { - logger.error('Error processing upload:', error); + } catch (e) { + logger.error('Error processing upload:', e); + + if (e instanceof BucketDoesNotExistError) { + const error = e as BucketDoesNotExistError; + return res.status(400).json(createErrorResponse('BUCKET_DOES_NOT_EXIST', error.message)); + } + return res .status(500) .json( createErrorResponse( 'INTERNAL_SERVER_ERROR', - error instanceof Error ? error.message : 'Unknown error' + e instanceof Error ? e.message : 'Unknown error' ) ); } diff --git a/src/types/index.ts b/src/types/index.ts new file mode 100644 index 0000000..5ad22e8 --- /dev/null +++ b/src/types/index.ts @@ -0,0 +1 @@ +export * from './openhim-api/mediator'; diff --git a/src/types/openhim-api/mediator.ts b/src/types/openhim-api/mediator.ts new file mode 100644 index 0000000..d79b710 --- /dev/null +++ b/src/types/openhim-api/mediator.ts @@ -0,0 +1,79 @@ +export interface Mediator { + _id: string; + urn: string; + version: string; + name: string; + description: string; + endpoints: Endpoint[]; + defaultChannelConfig: DefaultChannelConfig[]; + configDefs: ConfigDef[]; + __v: number; + _lastHeartbeat: Date; + _uptime: number; + _configModifiedTS: Date; + config?: Config; +} + +interface Config { + minio_buckets_registry: MinioBucketsRegistry[]; +} + +export interface MinioBucketsRegistry { + bucket: string; + region?: string; +} + +interface ConfigDef { + param: string; + displayName: string; + description: string; + type: string; + values: any[]; + template: Template[]; + array: boolean; + _id: string; +} + +interface Template { + param: string; + displayName: string; + type: string; + optional?: boolean; +} + +interface DefaultChannelConfig { + name: string; + urlPattern: string; + isAsynchronousProcess: boolean; + methods: string[]; + type: string; + allow: string[]; + whitelist: any[]; + authType: string; + routes: Endpoint[]; + matchContentTypes: any[]; + properties: any[]; + txViewAcl: any[]; + txViewFullAcl: any[]; + txRerunAcl: any[]; + status: string; + rewriteUrls: boolean; + addAutoRewriteRules: boolean; + autoRetryEnabled: boolean; + autoRetryPeriodMinutes: number; + _id: string; + alerts: any[]; + rewriteUrlsConfig: any[]; +} + +interface Endpoint { + name: string; + type: string; + status: string; + host: string; + port: number; + primary: boolean; + forwardAuthHeader: boolean; + _id: string; + path?: string; +} diff --git a/src/utils/minioClient.ts b/src/utils/minioClient.ts index f57bfc0..4aca00c 100644 --- a/src/utils/minioClient.ts +++ b/src/utils/minioClient.ts @@ -5,13 +5,18 @@ import crypto from 'crypto'; import { readFile, rm } from 'fs/promises'; import { createTable, insertFromS3 } from './clickhouse'; import { validateJsonFile, getCsvHeaders } from './file-validators'; -import { registerBucket } from '../openhim/openhim'; export interface Bucket { bucket: string; region?: string; } +export class BucketDoesNotExistError extends Error { + constructor(message: string) { + super(message); + } +} + const { endPoint, port, useSSL, bucketRegion, accessKey, secretKey, buckets, prefix, suffix } = getConfig().minio; @@ -40,21 +45,28 @@ interface FileExistsResponse extends MinioResponse { * Ensures a bucket exists, creates it if it doesn't * @param {string} bucket - Bucket name * @param {string} [region] - Bucket region + * @param {boolean} [createBucketIfNotExists] - Whether to create the bucket if it doesn't exist * @returns {Promise} */ //TODO: use the bucket interface -export async function ensureBucketExists(bucket: string, region?: string): Promise { +export async function ensureBucketExists( + bucket: string, + region?: string, + createBucketIfNotExists = false +): Promise { + //TODO: make sure the bucket name and region are valid and conform to the expected format. const exists = await minioClient.bucketExists(bucket); - if (!exists) { - //TODO: We Could add a flag to allow the user to create the bucket if it doesn't exist. maybe it could be on the url parameters so we know its an explicit action. - //TODO: Have the bucket region be optional and come from http request. + if (!exists && createBucketIfNotExists) { await minioClient.makeBucket(bucket, region); logger.info( `Bucket ${bucket} created${region ? `in "${region}"` : ' no region specified'}` ); + await createMinioBucketListeners([bucket]); } - await registerBucket(bucket); + if (!exists && !createBucketIfNotExists) { + throw new BucketDoesNotExistError(`Bucket ${bucket} does not exist`); + } } /** @@ -121,15 +133,14 @@ export async function uploadToMinio( sourceFile: string, destinationObject: string, bucket: string, - region: string, fileType: string, customMetadata = {} ): Promise { try { logger.info(`Uploading file ${sourceFile} to bucket ${bucket}`); - await ensureBucketExists(bucket, region); const fileCheck = await checkFileExists(destinationObject, bucket, fileType); + if (fileCheck.exists) { return { success: false, @@ -219,7 +230,8 @@ export async function createMinioBucketListeners(listOfBuckets: string[]) { await createTable(fields, tableName); // If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server - const host = getConfig().runningMode === 'testing' ? 'minio' : endPoint; + //TODO: This is not working the way I want it to, fix it later. + const host = 'minio'; // Construct the S3-style URL for the file const minioUrl = `http://${host}:${port}/${bucket}/${file}`; From 550d7e7806b0c3a9b3de44af0f308fe23d16f1cc Mon Sep 17 00:00:00 2001 From: Brett Onions Date: Wed, 27 Nov 2024 12:13:53 +0200 Subject: [PATCH 4/6] clean up --- docker-compose.yml | 2 ++ src/openhim/openhim.ts | 34 +++++++++++++++---------------- src/routes/index.ts | 21 +++++++++++++++++++ src/types/mediatorConfig.ts | 30 ++++++++++++++++++++++++++- src/types/openhim-api/mediator.ts | 15 ++------------ src/utils/minioClient.ts | 12 ++--------- 6 files changed, 72 insertions(+), 42 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 0f9c4f7..fbd0163 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,6 +5,7 @@ services: - 9000:9000 - 9001:9001 command: server /data --console-address ":9001" + network_mode: host clickhouse: image: clickhouse/clickhouse-server:23.8.14.6 @@ -13,6 +14,7 @@ services: - 9002:9000 environment: - CLICKHOUSE_PASSWORD=dev_password_only + network_mode: host openhim-console: image: jembi/openhim-console:v1.18.2 diff --git a/src/openhim/openhim.ts b/src/openhim/openhim.ts index 2862c99..34a6655 100644 --- a/src/openhim/openhim.ts +++ b/src/openhim/openhim.ts @@ -1,5 +1,5 @@ import logger from '../logger'; -import { MinioBucketsRegistry, Mediator as OpenHimAPIMediator } from '../types'; +import { MinioBucketsRegistry } from '../types'; import { MediatorConfig } from '../types/mediatorConfig'; import { RequestOptions } from '../types/request'; import { getConfig } from '../config/config'; @@ -81,8 +81,7 @@ export const setupMediator = async () => { } }; -//TODO: Add Typing and error handling. -async function getMediatorConfig(): Promise { +async function getMediatorConfig(): Promise { logger.debug('Fetching mediator config from OpenHIM'); const mediatorConfig = resolveMediatorConfig(); const openhimConfig = resolveOpenhimConfig(mediatorConfig.urn); @@ -155,24 +154,24 @@ async function putMediatorConfig(mediatorUrn: string, mediatorConfig: MinioBucke } export async function getRegisteredBuckets(): Promise { - if (runningMode !== 'testing') { - logger.info('Fetching registered buckets from OpenHIM'); - const mediatorConfig = await getMediatorConfig(); - - if (!mediatorConfig) { - return []; - } - //TODO: Handle errors, and undefined response. - const buckets = mediatorConfig.config?.minio_buckets_registry as Bucket[]; - if (!buckets) { - return []; - } - return buckets; - } else { + if (runningMode === 'testing') { logger.info('Running in testing mode, reading buckets from ENV'); const buckets = getConfig().minio.buckets.split(','); return buckets.map((bucket) => ({ bucket, region: '' })); } + + logger.info('Fetching registered buckets from OpenHIM'); + const mediatorConfig = await getMediatorConfig(); + + if (!mediatorConfig) { + return []; + } + + const buckets = mediatorConfig.config?.minio_buckets_registry as Bucket[]; + if (buckets) { + return buckets; + } + return []; } export async function registerBucket(bucket: string, region?: string) { @@ -185,7 +184,6 @@ export async function registerBucket(bucket: string, region?: string) { //get the mediator config from OpenHIM const mediatorConfig = await getMediatorConfig(); - //TODO: Change this to a debug log logger.debug(`Mediator config: ${JSON.stringify(mediatorConfig)}`); //if the mediator config is not found, log the issue and return false diff --git a/src/routes/index.ts b/src/routes/index.ts index 1404180..fed182c 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -101,6 +101,15 @@ const handleJsonFile = (file: Express.Multer.File): UploadResponse => { return createSuccessResponse('JSON_VALID', 'JSON file is valid - Future implementation'); }; +const validateBucketName = (bucket: string): boolean => { + // Bucket names must be between 3 (min) and 63 (max) characters long. + // Bucket names can consist only of lowercase letters, numbers, dots (.), and hyphens (-). + // Bucket names must not start with the prefix xn--. + // Bucket names must not end with the suffix -s3alias. This suffix is reserved for access point alias names. + const regex = new RegExp(/^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$/); + return regex.test(bucket); +}; + // Main route handler routes.post('/upload', upload.single('file'), async (req, res) => { try { @@ -119,6 +128,18 @@ routes.post('/upload', upload.single('file'), async (req, res) => { return res.status(400).json(createErrorResponse('BUCKET_MISSING', 'No bucket provided')); } + if (!validateBucketName(bucket)) { + logger.error(`Invalid bucket name ${bucket}`); + return res + .status(400) + .json( + createErrorResponse( + 'INVALID_BUCKET_NAME', + 'Bucket names must be between 3 (min) and 63 (max) characters long. Can consist only of lowercase letters, numbers, dots (.), and hyphens (-). Must not start with the prefix xn--. Must not end with the suffix -s3alias. This suffix is reserved for access point alias names.' + ) + ); + } + await ensureBucketExists(bucket, region, createBucketIfNotExists); const response = diff --git a/src/types/mediatorConfig.ts b/src/types/mediatorConfig.ts index d1f2c28..ccbdbc1 100644 --- a/src/types/mediatorConfig.ts +++ b/src/types/mediatorConfig.ts @@ -5,7 +5,35 @@ export interface MediatorConfig { description: string; defaultChannelConfig: ChannelConfig[]; endpoints: Route[]; - configDefs?: string[]; + configDefs?: ConfigDef[]; + config?: Config; +} + +interface ConfigDef { + param: string; + displayName: string; + description: string; + type: string; + values: any[]; + template: Template[]; + array: boolean; + _id: string; +} + +interface Template { + param: string; + displayName: string; + type: string; + optional?: boolean; +} + +interface Config { + minio_buckets_registry: MinioBucketsRegistry[]; +} + +export interface MinioBucketsRegistry { + bucket: string; + region?: string; } interface ChannelConfig { diff --git a/src/types/openhim-api/mediator.ts b/src/types/openhim-api/mediator.ts index d79b710..775743e 100644 --- a/src/types/openhim-api/mediator.ts +++ b/src/types/openhim-api/mediator.ts @@ -1,4 +1,4 @@ -export interface Mediator { +export interface OpenhimMediatorConfiguration { _id: string; urn: string; version: string; @@ -6,7 +6,7 @@ export interface Mediator { description: string; endpoints: Endpoint[]; defaultChannelConfig: DefaultChannelConfig[]; - configDefs: ConfigDef[]; + configDefs: any[]; __v: number; _lastHeartbeat: Date; _uptime: number; @@ -23,17 +23,6 @@ export interface MinioBucketsRegistry { region?: string; } -interface ConfigDef { - param: string; - displayName: string; - description: string; - type: string; - values: any[]; - template: Template[]; - array: boolean; - _id: string; -} - interface Template { param: string; displayName: string; diff --git a/src/utils/minioClient.ts b/src/utils/minioClient.ts index 4aca00c..d776f1e 100644 --- a/src/utils/minioClient.ts +++ b/src/utils/minioClient.ts @@ -23,7 +23,6 @@ const { endPoint, port, useSSL, bucketRegion, accessKey, secretKey, buckets, pre const registeredBuckets: Set = new Set(); // Create a shared Minio client instance -//TODO: Add error handling and connection check const minioClient = new Minio.Client({ endPoint, port, @@ -48,13 +47,11 @@ interface FileExistsResponse extends MinioResponse { * @param {boolean} [createBucketIfNotExists] - Whether to create the bucket if it doesn't exist * @returns {Promise} */ -//TODO: use the bucket interface export async function ensureBucketExists( bucket: string, region?: string, createBucketIfNotExists = false ): Promise { - //TODO: make sure the bucket name and region are valid and conform to the expected format. const exists = await minioClient.bucketExists(bucket); if (!exists && createBucketIfNotExists) { await minioClient.makeBucket(bucket, region); @@ -163,7 +160,6 @@ export async function uploadToMinio( message: successMessage, }; } catch (error) { - //TODO: see if we can make a specific error for failing to register the bucket within the mediator. const errorMessage = `Error uploading file: ${error instanceof Error ? error.message : String(error)}`; logger.error(errorMessage); return { @@ -184,9 +180,6 @@ export async function createMinioBucketListeners(listOfBuckets: string[]) { // } for (const bucket of listOfBuckets) { - /*TODO: - Check if the buckets actually exist before registering listeners. if not then log an error. - */ if (registeredBuckets.has(bucket)) { logger.info(`Bucket ${bucket} already registered`); continue; @@ -230,10 +223,9 @@ export async function createMinioBucketListeners(listOfBuckets: string[]) { await createTable(fields, tableName); // If running locally and using docker compose, the minio host is 'minio'. This is to allow clickhouse to connect to the minio server - //TODO: This is not working the way I want it to, fix it later. - const host = 'minio'; + // Construct the S3-style URL for the file - const minioUrl = `http://${host}:${port}/${bucket}/${file}`; + const minioUrl = `http://${endPoint}:${port}/${bucket}/${file}`; // Insert data into clickhouse await insertFromS3(tableName, minioUrl, { From c16d4bf6d885f889eccbaf5097a15b6692d37a83 Mon Sep 17 00:00:00 2001 From: Brett Onions Date: Wed, 27 Nov 2024 12:17:45 +0200 Subject: [PATCH 5/6] removing dead code --- src/openhim/openhim.ts | 3 +- src/types/index.ts | 1 - src/types/openhim-api/mediator.ts | 68 ------------------------------- 3 files changed, 1 insertion(+), 71 deletions(-) delete mode 100644 src/types/index.ts delete mode 100644 src/types/openhim-api/mediator.ts diff --git a/src/openhim/openhim.ts b/src/openhim/openhim.ts index 34a6655..8d2248a 100644 --- a/src/openhim/openhim.ts +++ b/src/openhim/openhim.ts @@ -1,6 +1,5 @@ import logger from '../logger'; -import { MinioBucketsRegistry } from '../types'; -import { MediatorConfig } from '../types/mediatorConfig'; +import { MediatorConfig, MinioBucketsRegistry } from '../types/mediatorConfig'; import { RequestOptions } from '../types/request'; import { getConfig } from '../config/config'; import axios, { AxiosError } from 'axios'; diff --git a/src/types/index.ts b/src/types/index.ts deleted file mode 100644 index 5ad22e8..0000000 --- a/src/types/index.ts +++ /dev/null @@ -1 +0,0 @@ -export * from './openhim-api/mediator'; diff --git a/src/types/openhim-api/mediator.ts b/src/types/openhim-api/mediator.ts deleted file mode 100644 index 775743e..0000000 --- a/src/types/openhim-api/mediator.ts +++ /dev/null @@ -1,68 +0,0 @@ -export interface OpenhimMediatorConfiguration { - _id: string; - urn: string; - version: string; - name: string; - description: string; - endpoints: Endpoint[]; - defaultChannelConfig: DefaultChannelConfig[]; - configDefs: any[]; - __v: number; - _lastHeartbeat: Date; - _uptime: number; - _configModifiedTS: Date; - config?: Config; -} - -interface Config { - minio_buckets_registry: MinioBucketsRegistry[]; -} - -export interface MinioBucketsRegistry { - bucket: string; - region?: string; -} - -interface Template { - param: string; - displayName: string; - type: string; - optional?: boolean; -} - -interface DefaultChannelConfig { - name: string; - urlPattern: string; - isAsynchronousProcess: boolean; - methods: string[]; - type: string; - allow: string[]; - whitelist: any[]; - authType: string; - routes: Endpoint[]; - matchContentTypes: any[]; - properties: any[]; - txViewAcl: any[]; - txViewFullAcl: any[]; - txRerunAcl: any[]; - status: string; - rewriteUrls: boolean; - addAutoRewriteRules: boolean; - autoRetryEnabled: boolean; - autoRetryPeriodMinutes: number; - _id: string; - alerts: any[]; - rewriteUrlsConfig: any[]; -} - -interface Endpoint { - name: string; - type: string; - status: string; - host: string; - port: number; - primary: boolean; - forwardAuthHeader: boolean; - _id: string; - path?: string; -} From 356a69c5848f39d370ac36b113226d3b6425693e Mon Sep 17 00:00:00 2001 From: Brett Onions Date: Thu, 28 Nov 2024 08:45:52 +0200 Subject: [PATCH 6/6] rm dead code --- src/utils/minioClient.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/utils/minioClient.ts b/src/utils/minioClient.ts index d776f1e..e2663db 100644 --- a/src/utils/minioClient.ts +++ b/src/utils/minioClient.ts @@ -170,15 +170,6 @@ export async function uploadToMinio( } export async function createMinioBucketListeners(listOfBuckets: string[]) { - // try { - // // Test connection by attempting to list buckets - // await minioClient.listBuckets(); - // logger.info(`Successfully connected to Minio at ${endPoint}:${port}`); - // } catch (error) { - // logger.error(`Failed to connect to Minio: ${error}`); - // throw error; - // } - for (const bucket of listOfBuckets) { if (registeredBuckets.has(bucket)) { logger.info(`Bucket ${bucket} already registered`);