diff --git a/deployment/cdk/opensearch-service-migration/README.md b/deployment/cdk/opensearch-service-migration/README.md index c441f336d..84ce25dc8 100644 --- a/deployment/cdk/opensearch-service-migration/README.md +++ b/deployment/cdk/opensearch-service-migration/README.md @@ -57,6 +57,9 @@ This optional stack will be used when the Domain is configured to be placed insi #### Migration Assistance Stack (OSServiceMigrationCDKStack-STAGE-REGION) This optional stack is used to house the migration assistance resources which are in the process of being developed to assist in migrating to an OpenSearch domain. It has dependencies on both the Domain and Network stacks. +#### Historical Capture Stack (OSServiceHistoricalCDKStack-STAGE-REGION) +This optional exploratory stack sets up a deployable Logstash ECS cluster for historical data migration. It is experimental and should only be used for development purposes. It has dependencies on both the Domain and Network stacks. + ### Configuration Options The available configuration options are listed below. The vast majority of these options do not need to be provided, with only `domainName` and `engineVersion` being required. All non-required options can be provided as an empty string `""` or simply not included, and in each of these cases the option will be allocated with the CDK Domain default value diff --git a/deployment/cdk/opensearch-service-migration/lib/historical-capture-stack.ts b/deployment/cdk/opensearch-service-migration/lib/historical-capture-stack.ts new file mode 100644 index 000000000..93f1408a4 --- /dev/null +++ b/deployment/cdk/opensearch-service-migration/lib/historical-capture-stack.ts @@ -0,0 +1,62 @@ +import {Stack, StackProps} from "aws-cdk-lib"; +import {IVpc} from "aws-cdk-lib/aws-ec2"; +import {Construct} from "constructs"; +import {Cluster, ContainerImage, FargateService, FargateTaskDefinition, LogDrivers} from "aws-cdk-lib/aws-ecs"; +import {DockerImageAsset} from "aws-cdk-lib/aws-ecr-assets"; +import {join} from "path"; +import { readFileSync } from "fs" + +export interface historicalCaptureStackProps extends StackProps { + readonly vpc: IVpc, + readonly logstashConfigFilePath: string, + readonly sourceEndpoint?: string, + readonly targetEndpoint: string +} + +/** + * This stack was a short exploratory task into having a deployable Logstash ECS cluster for historical data migration. + * NOTE: It should only be used for development purposes in its current state + */ +export class HistoricalCaptureStack extends Stack { + + constructor(scope: Construct, id: string, props: historicalCaptureStackProps) { + super(scope, id, props); + + const ecsCluster = new Cluster(this, "ecsHistoricalCaptureCluster", { + vpc: props.vpc + }); + + const historicalCaptureFargateTask = new FargateTaskDefinition(this, "historicalCaptureFargateTask", { + memoryLimitMiB: 2048, + cpu: 512 + }); + + let logstashConfigData: string = readFileSync(props.logstashConfigFilePath, 'utf8'); + if (props.sourceEndpoint) { + logstashConfigData = logstashConfigData.replace("", props.sourceEndpoint) + } + logstashConfigData = logstashConfigData.replace("", props.targetEndpoint + ":80") + // Temporary measure to allow multi-line env variable + logstashConfigData = logstashConfigData.replace(/(\n)/g, "PUT_LINE") + // Create Historical Capture Container + const historicalCaptureImage = new DockerImageAsset(this, "historicalCaptureImage", { + directory: join(__dirname, "../../..", "docker/logstash-setup") + }); + + const historicalCaptureContainer = historicalCaptureFargateTask.addContainer("historicalCaptureContainer", { + image: ContainerImage.fromDockerImageAsset(historicalCaptureImage), + // Add in region and stage + containerName: "logstash", + environment: {"LOGSTASH_CONFIG": '' + logstashConfigData}, + logging: LogDrivers.awsLogs({ streamPrefix: 'logstash-lg', logRetention: 30 }) + }); + + // Create Fargate Service + const historicalCaptureFargateService = new FargateService(this, "historicalCaptureFargateService", { + cluster: ecsCluster, + taskDefinition: historicalCaptureFargateTask, + desiredCount: 1 + }); + + } +} \ No newline at end of file diff --git a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts index eff73df51..cb49bcf44 100644 --- a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts +++ b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts @@ -7,6 +7,7 @@ import {AnyPrincipal, Effect, PolicyStatement} from "aws-cdk-lib/aws-iam"; import * as defaultValuesJson from "../default-values.json" import {NetworkStack} from "./network-stack"; import {MigrationAssistanceStack} from "./migration-assistance-stack"; +import {HistoricalCaptureStack} from "./historical-capture-stack"; import {MSKUtilityStack} from "./msk-utility-stack"; export interface StackPropsExt extends StackProps { @@ -58,6 +59,9 @@ export class StackComposer { const migrationAssistanceEnabled = getContextForType('migrationAssistanceEnabled', 'boolean') const mskARN = getContextForType('mskARN', 'string') const mskEnablePublicEndpoints = getContextForType('mskEnablePublicEndpoints', 'boolean') + const sourceClusterEndpoint = getContextForType('sourceClusterEndpoint', 'string') + const historicalCaptureEnabled = getContextForType('historicalCaptureEnabled', 'boolean') + const logstashConfigFilePath = getContextForType('logstashConfigFilePath', 'string') if (!domainName) { throw new Error("Domain name is not present and is a required field") @@ -183,6 +187,25 @@ export class StackComposer { this.stacks.push(mskUtilityStack) } + // Currently, placing a requirement on a VPC for a historical capture stack but this can be revisited + // Note: Future work to provide orchestration between historical capture and migration assistance as the current + // state will potentially have both stacks trying to add the same data + if (historicalCaptureEnabled && networkStack) { + const historicalCaptureStack = new HistoricalCaptureStack(scope, "historicalCaptureStack", { + vpc: networkStack.vpc, + logstashConfigFilePath: logstashConfigFilePath, + sourceEndpoint: sourceClusterEndpoint, + targetEndpoint: opensearchStack.domainEndpoint, + stackName: `OSServiceHistoricalCDKStack-${stage}-${region}`, + description: "This stack contains resources to assist migrating historical data to an OpenSearch Service domain", + ...props, + }) + + historicalCaptureStack.addDependency(opensearchStack) + this.stacks.push(historicalCaptureStack) + } + + function getContextForType(optionName: string, expectedType: string): any { const option = scope.node.tryGetContext(optionName)