Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: hard quote firehose streams #286

Merged
merged 8 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 185 additions & 1 deletion bin/stacks/analytics-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ enum RS_DATA_TYPES {

export interface AnalyticsStackProps extends cdk.NestedStackProps {
quoteLambda: aws_lambda_nodejs.NodejsFunction;
hardQuoteLambda: aws_lambda_nodejs.NodejsFunction;
envVars: Record<string, string>;
analyticsStreamArn: string;
stage: string;
chatbotSNSArn?: string;
}

/**
Expand All @@ -59,12 +61,14 @@ export class AnalyticsStack extends cdk.NestedStack {

constructor(scope: Construct, id: string, props: AnalyticsStackProps) {
super(scope, id, props);
const { quoteLambda, analyticsStreamArn, stage } = props;
const { quoteLambda, hardQuoteLambda, analyticsStreamArn, stage, chatbotSNSArn } = props;

/* S3 Initialization */
const rfqRequestBucket = new aws_s3.Bucket(this, 'RfqRequestBucket');
const hardRequestBucket = new aws_s3.Bucket(this, 'HardRequestBucket');
const unifiedRoutingRequestBucket = new aws_s3.Bucket(this, 'UnifiedRoutingRequestBucket');
const rfqResponseBucket = new aws_s3.Bucket(this, 'RfqResponseBucket');
const hardResponseBucket = new aws_s3.Bucket(this, 'HardResponseBucket');
const unifiedRoutingResponseBucket = new aws_s3.Bucket(this, 'UnifiedRoutingResponseBucket');
const fillBucket = new aws_s3.Bucket(this, 'FillBucket');
const ordersBucket = new aws_s3.Bucket(this, 'OrdersBucket');
Expand All @@ -75,6 +79,8 @@ export class AnalyticsStack extends cdk.NestedStack {
const dsRole = aws_iam.Role.fromRoleArn(this, 'DsRole', 'arn:aws:iam::867401673276:user/bq-load-sa');
rfqRequestBucket.grantRead(dsRole);
rfqResponseBucket.grantRead(dsRole);
hardRequestBucket.grantRead(dsRole);
hardResponseBucket.grantRead(dsRole);
unifiedRoutingRequestBucket.grantRead(dsRole);
unifiedRoutingResponseBucket.grantRead(dsRole);
fillBucket.grantRead(dsRole);
Expand Down Expand Up @@ -183,6 +189,28 @@ export class AnalyticsStack extends cdk.NestedStack {
],
});

const hardRequestTable = new aws_rs.Table(this, 'HardRequestTable', {
cluster: rsCluster,
adminUser: creds,
databaseName: RS_DATABASE_NAME,
tableName: 'HardRequests',
tableColumns: [
{ name: 'requestId', dataType: RS_DATA_TYPES.UUID, distKey: true },
{ name: 'quoteId', dataType: RS_DATA_TYPES.UUID },
{ name: 'offerer', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'tokenInChainId', dataType: RS_DATA_TYPES.INTEGER },
{ name: 'tokenOutChainId', dataType: RS_DATA_TYPES.INTEGER },
{ name: 'tokenIn', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'tokenOut', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'amount', dataType: RS_DATA_TYPES.UINT256 },
{ name: 'type', dataType: RS_DATA_TYPES.TRADE_TYPE },
{ name: 'numOutputs', dataType: RS_DATA_TYPES.INTEGER },
{ name: 'cosigner', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'createdAt', dataType: RS_DATA_TYPES.TIMESTAMP },
{ name: 'createdAtMs', dataType: RS_DATA_TYPES.TIMESTAMP_MS },
],
});

const uraResponseTable = new aws_rs.Table(this, 'UnifiedRoutingResponseTable', {
cluster: rsCluster,
adminUser: creds,
Expand Down Expand Up @@ -233,6 +261,27 @@ export class AnalyticsStack extends cdk.NestedStack {
],
});

const hardResponseTable = new aws_rs.Table(this, 'HardResponseTable', {
cluster: rsCluster,
adminUser: creds,
databaseName: RS_DATABASE_NAME,
tableName: 'HardResponses',
tableColumns: [
{ name: 'quoteId', dataType: RS_DATA_TYPES.UUID },
{ name: 'requestId', dataType: RS_DATA_TYPES.UUID, distKey: true },
{ name: 'offerer', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'tokenIn', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'tokenOut', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'amountIn', dataType: RS_DATA_TYPES.UINT256 },
{ name: 'amountOut', dataType: RS_DATA_TYPES.UINT256 },
{ name: 'tokenInChainId', dataType: RS_DATA_TYPES.INTEGER },
{ name: 'tokenOutChainId', dataType: RS_DATA_TYPES.INTEGER },
{ name: 'filler', dataType: RS_DATA_TYPES.ADDRESS },
{ name: 'createdAt', dataType: RS_DATA_TYPES.TIMESTAMP },
{ name: 'createdAtMs', dataType: RS_DATA_TYPES.TIMESTAMP_MS },
],
});

const archivedOrdersTable = new aws_rs.Table(this, 'archivedOrdersTable', {
cluster: rsCluster,
adminUser: creds,
Expand Down Expand Up @@ -470,6 +519,61 @@ export class AnalyticsStack extends cdk.NestedStack {
],
})
);
let chatBotTopic: cdk.aws_sns.ITopic;
if (chatbotSNSArn) {
chatBotTopic = cdk.aws_sns.Topic.fromTopicArn(this, 'ChatbotTopic', chatbotSNSArn);
}

/* log processor alarms */
[quoteProcessorLambda, fillEventProcessorLambda, postOrderProcessorLambda, botOrderEventsProcessorLambda].forEach(
(lambda) => {
const successRateSev2Name = `${lambda.node.id}-SEV2-SuccessRate`;
const successRateSev3Name = `${lambda.node.id}-SEV3-SuccessRate`;

const errors = lambda.metricErrors({
period: cdk.Duration.minutes(5),
statistic: cdk.aws_cloudwatch.Stats.SUM,
label: `${lambda.node.id} Errors`,
});

const invocations = lambda.metricInvocations({
period: cdk.Duration.minutes(5),
statistic: cdk.aws_cloudwatch.Stats.SUM,
label: `${lambda.node.id} Invocations`,
});

const successRate = new cdk.aws_cloudwatch.MathExpression({
expression: '100 - 100 * errors / MAX([errors, invocations])',
usingMetrics: {
errors,
invocations,
},
label: `${lambda.node.id} Success Rate`,
});

const successRateSev2 = new cdk.aws_cloudwatch.Alarm(this, successRateSev2Name, {
metric: successRate,
threshold: 60,
evaluationPeriods: 1,
comparisonOperator: cdk.aws_cloudwatch.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
actionsEnabled: true,
});

const successRateSev3 = new cdk.aws_cloudwatch.Alarm(this, successRateSev3Name, {
metric: successRate,
threshold: 90,
evaluationPeriods: 1,
comparisonOperator: cdk.aws_cloudwatch.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
actionsEnabled: true,
});

if (chatBotTopic) {
successRateSev2.addAlarmAction(new cdk.aws_cloudwatch_actions.SnsAction(chatBotTopic));
successRateSev3.addAlarmAction(new cdk.aws_cloudwatch_actions.SnsAction(chatBotTopic));
}
}
);

// CDK doesn't have this implemented yet, so have to use the CloudFormation resource (lower level of abstraction)
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisfirehose-deliverystream.html
const uraRequestStream = new aws_firehose.CfnDeliveryStream(this, 'uraRequestStream', {
Expand Down Expand Up @@ -538,6 +642,72 @@ export class AnalyticsStack extends cdk.NestedStack {
},
});

const hardRequestFirehoseStream = new aws_firehose.CfnDeliveryStream(this, 'HardRequestStream', {
redshiftDestinationConfiguration: {
clusterJdbcurl: `jdbc:redshift://${rsCluster.clusterEndpoint.hostname}:${rsCluster.clusterEndpoint.port}/${RS_DATABASE_NAME}`,
username: 'admin',
password: creds.secretValueFromJson('password').toString(),
s3Configuration: {
bucketArn: rfqRequestBucket.bucketArn,
roleArn: firehoseRole.roleArn,
compressionFormat: 'UNCOMPRESSED',
},
roleArn: firehoseRole.roleArn,
copyCommand: {
copyOptions: "JSON 'auto ignorecase'",
dataTableName: hardRequestTable.tableName,
dataTableColumns: hardRequestTable.tableColumns.map((column) => column.name).toString(),
},
processingConfiguration: {
enabled: true,
processors: [
{
type: 'Lambda',
parameters: [
{
parameterName: 'LambdaArn',
parameterValue: quoteProcessorLambda.functionArn,
},
],
},
],
},
},
});

const hardResponseFirehoseStream = new aws_firehose.CfnDeliveryStream(this, 'HardResponseStream', {
redshiftDestinationConfiguration: {
clusterJdbcurl: `jdbc:redshift://${rsCluster.clusterEndpoint.hostname}:${rsCluster.clusterEndpoint.port}/${RS_DATABASE_NAME}`,
username: 'admin',
password: creds.secretValueFromJson('password').toString(),
s3Configuration: {
bucketArn: rfqRequestBucket.bucketArn,
roleArn: firehoseRole.roleArn,
compressionFormat: 'UNCOMPRESSED',
},
roleArn: firehoseRole.roleArn,
copyCommand: {
copyOptions: "JSON 'auto ignorecase'",
dataTableName: hardResponseTable.tableName,
dataTableColumns: hardResponseTable.tableColumns.map((column) => column.name).toString(),
},
processingConfiguration: {
enabled: true,
processors: [
{
type: 'Lambda',
parameters: [
{
parameterName: 'LambdaArn',
parameterValue: quoteProcessorLambda.functionArn,
},
],
},
],
},
},
});

const uraResponseStream = new aws_firehose.CfnDeliveryStream(this, 'UnifiedRoutingResponseStream', {
redshiftDestinationConfiguration: {
clusterJdbcurl: `jdbc:redshift://${rsCluster.clusterEndpoint.hostname}:${rsCluster.clusterEndpoint.port}/${RS_DATABASE_NAME}`,
Expand Down Expand Up @@ -956,6 +1126,20 @@ export class AnalyticsStack extends cdk.NestedStack {
roleArn: subscriptionRole.roleArn,
});

new aws_logs.CfnSubscriptionFilter(this, 'HardRequestSub', {
destinationArn: hardRequestFirehoseStream.attrArn,
filterPattern: '{ $.eventType = "HardRequest" }',
logGroupName: hardQuoteLambda.logGroup.logGroupName,
roleArn: subscriptionRole.roleArn,
});

new aws_logs.CfnSubscriptionFilter(this, 'HardResponseSub', {
destinationArn: hardResponseFirehoseStream.attrArn,
filterPattern: '{ $.eventType = "HardResponse" }',
logGroupName: hardQuoteLambda.logGroup.logGroupName,
roleArn: subscriptionRole.roleArn,
});

new CfnOutput(this, 'fillDestinationName', {
value: fillDestination.attrArn,
});
Expand Down
10 changes: 6 additions & 4 deletions bin/stacks/api-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import * as aws_lambda from 'aws-cdk-lib/aws-lambda';
import * as aws_lambda_nodejs from 'aws-cdk-lib/aws-lambda-nodejs';
import * as aws_logs from 'aws-cdk-lib/aws-logs';
import * as aws_waf from 'aws-cdk-lib/aws-wafv2';
import { KmsStack } from './kms-stack'
import { Construct } from 'constructs';
import * as path from 'path';
import { KmsStack } from './kms-stack';

import { Metric } from '../../lib/entities';
import { STAGE } from '../../lib/util/stage';
Expand Down Expand Up @@ -156,7 +156,7 @@ export class APIStack extends cdk.Stack {
});

// KMS initialization
const kmsStack = new KmsStack(this, `${SERVICE_NAME}HardQuoteCosignerKey-1`)
const kmsStack = new KmsStack(this, `${SERVICE_NAME}HardQuoteCosignerKey-1`);

/*
* Firehose Initialization
Expand Down Expand Up @@ -184,7 +184,7 @@ export class APIStack extends cdk.Stack {
actions: ['kms:GetPublicKey', 'kms:Sign'],
effect: aws_iam.Effect.ALLOW,
})
)
);

lambdaRole.addToPolicy(
new aws_iam.PolicyStatement({
Expand Down Expand Up @@ -307,7 +307,7 @@ export class APIStack extends cdk.Stack {
sourceMap: true,
},
environment: {
VERSION: '2',
VERSION: '3',
NODE_OPTIONS: '--enable-source-maps',
...props.envVars,
stage,
Expand Down Expand Up @@ -458,9 +458,11 @@ export class APIStack extends cdk.Stack {
*/
const analyticsStack = new AnalyticsStack(this, 'AnalyticsStack', {
quoteLambda,
hardQuoteLambda,
envVars: props.envVars,
analyticsStreamArn: firehoseStack.analyticsStreamArn,
stage,
chatbotSNSArn,
});

const cronStack = new CronStack(this, 'CronStack', {
Expand Down
2 changes: 1 addition & 1 deletion lib/entities/HardQuoteRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { TradeType } from '@uniswap/sdk-core';
import { UnsignedV2DutchOrder } from '@uniswap/uniswapx-sdk';
import { BigNumber, ethers, utils } from 'ethers';

import { HardQuoteRequestBody } from '../handlers/hard-quote';
import { QuoteRequest, QuoteRequestDataJSON } from '.';
import { HardQuoteRequestBody } from '../handlers/hard-quote';

export class HardQuoteRequest {
public order: UnsignedV2DutchOrder;
Expand Down
21 changes: 14 additions & 7 deletions lib/handlers/hard-quote/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,28 @@ export class QuoteHandler extends APIGLambdaHandler<
throw new UnknownOrderCosignerError();
}

// TODO: finalize on v2 metrics logging
// Instead of decoding the order, we rely on frontend passing in the requestId
// from indicative quote
log.info({
eventType: 'HardQuoteRequest',
eventType: 'HardRequest',
body: {
requestId: request.requestId,
quoteId: request.quoteId,
ConjunctiveNormalForm marked this conversation as resolved.
Show resolved Hide resolved
tokenInChainId: request.tokenInChainId,
tokenOutChainId: request.tokenInChainId,
encoded: requestBody.encodedInnerOrder,
sig: requestBody.innerSig,
tokenOutChainId: request.tokenOutChainId,
tokenIn: request.tokenIn,
tokenOut: request.tokenOut,
offerer: request.swapper,
amount: request.amount.toString(),
type: TradeType[request.type],
numOutputs: request.numOutputs,
cosigner: request.order.info.cosigner,
createdAt: timestampInMstoSeconds(start),
createdAtMs: start.toString(),
},
});

const bestQuote = await getBestQuote(quoters, request.toQuoteRequest(), log, metric);
const bestQuote = await getBestQuote(quoters, request.toQuoteRequest(), log, metric, 'HardResponse');
if (!bestQuote) {
metric.putMetric(Metric.HARD_QUOTE_404, 1, MetricLoggerUnit.Count);
throw new NoQuotesAvailable();
Expand All @@ -75,7 +82,7 @@ export class QuoteHandler extends APIGLambdaHandler<
const cosignedOrder = CosignedV2DutchOrder.fromUnsignedOrder(request.order, cosignerData, cosignature);

try {
await orderServiceProvider.postOrder(cosignedOrder, request.innerSig, request.quoteId);
await orderServiceProvider.postOrder(cosignedOrder, request.innerSig, bestQuote.quoteId);
} catch (e) {
metric.putMetric(Metric.HARD_QUOTE_400, 1, MetricLoggerUnit.Count);
throw new OrderPostError();
Expand Down
7 changes: 5 additions & 2 deletions lib/handlers/quote/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { APIHandleRequestParams, ErrorResponse, Response } from '../base/api-han
import { ContainerInjected, RequestInjected } from './injector';
import { PostQuoteRequestBody, PostQuoteRequestBodyJoi, PostQuoteResponse, URAResponseJoi } from './schema';

export type EventType = 'QuoteResponse' | 'HardResponse';

export class QuoteHandler extends APIGLambdaHandler<
ContainerInjected,
RequestInjected,
Expand Down Expand Up @@ -83,7 +85,8 @@ export async function getBestQuote(
quoters: Quoter[],
quoteRequest: QuoteRequest,
log: Logger,
metric: IMetric
metric: IMetric,
eventType: EventType = 'QuoteResponse'
): Promise<QuoteResponse | null> {
const responses: QuoteResponse[] = (await Promise.all(quoters.map((q) => q.quote(quoteRequest)))).flat();
switch (responses.length) {
Expand All @@ -107,7 +110,7 @@ export async function getBestQuote(
// return the response with the highest amountOut value
return responses.reduce((bestQuote: QuoteResponse | null, quote: QuoteResponse) => {
log.info({
eventType: 'QuoteResponse',
eventType: eventType,
body: { ...quote.toLog(), offerer: quote.swapper },
});

Expand Down
2 changes: 1 addition & 1 deletion test/handlers/hard-quote/handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
} from '../../../lib/handlers/hard-quote';
import { getCosignerData } from '../../../lib/handlers/hard-quote/handler';
import { MockOrderServiceProvider } from '../../../lib/providers';
import { MOCK_FILLER_ADDRESS, MockQuoter, Quoter } from '../../../lib/quoters';
import { MockQuoter, MOCK_FILLER_ADDRESS, Quoter } from '../../../lib/quoters';

jest.mock('axios');

Expand Down
Loading