Skip to content

Commit

Permalink
Merge pull request #955 from near/main
Browse files Browse the repository at this point in the history
Prod Release 31/07/2024
  • Loading branch information
darunrs authored Jul 31, 2024
2 parents 8034220 + 73d1fdd commit c8b17d0
Show file tree
Hide file tree
Showing 13 changed files with 664 additions and 383 deletions.
8 changes: 6 additions & 2 deletions coordinator/src/handlers/block_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::indexer_config::IndexerConfig;
use crate::redis::{KeyProvider, RedisClient};
use crate::utils::exponential_retry;

const RESTART_TIMEOUT_SECONDS: u64 = 600;

#[derive(Clone)]
pub struct BlockStreamsHandler {
client: BlockStreamerClient<Channel>,
Expand Down Expand Up @@ -258,11 +260,13 @@ impl BlockStreamsHandler {
tracing::info!(stale, stalled, "Restarting stalled block stream");
}
} else {
tracing::info!("Restarting stalled block stream");
tracing::info!(
"Restarting stalled block stream after {RESTART_TIMEOUT_SECONDS} seconds"
);
}

self.stop(block_stream.stream_id.clone()).await?;

tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
let height = self.get_continuation_block_height(config).await?;
self.start(height, config).await?;

Expand Down
2 changes: 1 addition & 1 deletion coordinator/src/handlers/data_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::indexer_config::IndexerConfig;

type TaskId = String;

const TASK_TIMEOUT_SECONDS: u64 = 300; // 5 minutes
const TASK_TIMEOUT_SECONDS: u64 = 600; // 10 minutes

#[derive(Clone)]
pub struct DataLayerHandler {
Expand Down
5 changes: 4 additions & 1 deletion coordinator/src/handlers/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use crate::indexer_config::IndexerConfig;
use crate::redis::KeyProvider;
use crate::utils::exponential_retry;

const RESTART_TIMEOUT_SECONDS: u64 = 600;

#[derive(Clone)]
pub struct ExecutorsHandler {
client: RunnerClient<Channel>,
Expand Down Expand Up @@ -136,9 +138,10 @@ impl ExecutorsHandler {
}
}

tracing::info!("Restarting stalled executor");
tracing::info!("Restarting stalled executor after {RESTART_TIMEOUT_SECONDS} seconds");

self.stop(executor.executor_id).await?;
tokio::time::sleep(tokio::time::Duration::from_secs(RESTART_TIMEOUT_SECONDS)).await;
self.start(config).await?;

Ok(())
Expand Down
64 changes: 34 additions & 30 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,13 @@ impl<'a> LifecycleManager<'a> {
#[tracing::instrument(name = "repairing", skip_all)]
async fn handle_repairing(
&self,
config: Option<&IndexerConfig>,
_config: Option<&IndexerConfig>,
_state: &IndexerState,
) -> LifecycleState {
if config.is_none() {
return LifecycleState::Deleting;
}
// TODO: Re-enable auto deprovision once guard rails in place
// if config.is_none() {
// return LifecycleState::Deleting;
// }

// TODO Add more robust error handling, for now just stop
LifecycleState::Repairing
Expand All @@ -228,33 +229,36 @@ impl<'a> LifecycleManager<'a> {
warn!(?error, "Failed to stop executor");
}

if self.state_manager.delete_state(state).await.is_err() {
// Retry
return LifecycleState::Deleting;
}

info!("Clearing block stream");

if self
.redis_client
.del(state.get_redis_stream_key())
.await
.is_err()
{
// Retry
return LifecycleState::Deleting;
}

if self
.data_layer_handler
.ensure_deprovisioned(state.account_id.clone(), state.function_name.clone())
.await
.is_err()
{
return LifecycleState::Deleted;
}
tracing::error!("Temporarily preventing indexer deprovision due to service instability");
LifecycleState::Repairing

LifecycleState::Deleted
// if self.state_manager.delete_state(state).await.is_err() {
// // Retry
// return LifecycleState::Deleting;
// }
//
// info!("Clearing block stream");
//
// if self
// .redis_client
// .del(state.get_redis_stream_key())
// .await
// .is_err()
// {
// // Retry
// return LifecycleState::Deleting;
// }
//
// if self
// .data_layer_handler
// .ensure_deprovisioned(state.account_id.clone(), state.function_name.clone())
// .await
// .is_err()
// {
// return LifecycleState::Deleted;
// }
//
// LifecycleState::Deleted
}

#[tracing::instrument(
Expand Down
60 changes: 58 additions & 2 deletions frontend/src/components/Editor/EditorComponents/Editor.tsx
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { request, useInitialPayload } from 'near-social-bridge';
import type { ReactElement } from 'react';
import type { Method, Event } from '@/pages/api/generateCode';

import React, { useContext, useEffect, useMemo, useRef, useState } from 'react';
import { Alert } from 'react-bootstrap';
import { useDebouncedCallback } from 'use-debounce';
Expand Down Expand Up @@ -33,6 +35,15 @@ const SCHEMA_TAB_NAME = 'schema.sql';
const originalSQLCode = formatSQL(defaultSchema);
const originalIndexingCode = formatIndexingCode(defaultCode);
const pgSchemaTypeGen = new PgSchemaTypeGen();
interface WizardResponse {
wizardContractFilter: string;
wizardMethods: Method[];
wizardEvents?: Event[];
}

const fetchWizardData = (req: string): Promise<WizardResponse> => {
return request<WizardResponse>('launchpad-create-indexer', req);
};

const Editor: React.FC = (): ReactElement => {
const { indexerDetails, isCreateNewIndexer } = useContext(IndexerDetailsContext);
Expand Down Expand Up @@ -105,6 +116,51 @@ const Editor: React.FC = (): ReactElement => {
return;
};

const generateCode = async (contractFilter: string, selectedMethods: Method[], selectedEvents?: Event[]) => {
try {
const response = await fetch('/api/generateCode', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ contractFilter, selectedMethods, selectedEvents }),
});
if (!response.ok) {
throw new Error('Network response was not ok');
}
const data = await response.json();

if (!data.hasOwnProperty('jsCode') || !data.hasOwnProperty('sqlCode')) {
throw new Error('No code was returned from the server');
}

return data;
} catch (error) {
throw error;
}
};

useEffect(() => {
const fetchData = async () => {
try {
const response = await fetchWizardData('');
const { wizardContractFilter, wizardMethods } = response;

if (wizardContractFilter === 'noFilter') {
return;
}

const codeResponse = await generateCode(wizardContractFilter, wizardMethods);
setIndexingCode(codeResponse.jsCode);
setSchema(codeResponse.sqlCode);
} catch (error: unknown) {
//todo: figure out best course of action for user if api fails
console.error(error);
}
};
fetchData();
}, []);

useEffect(() => {
//* Load saved code from local storage if it exists else load code from context
const savedCode = storageManager?.getIndexerCode();
Expand All @@ -117,7 +173,6 @@ const Editor: React.FC = (): ReactElement => {
//* Load saved cursor position from local storage if it exists else set cursor to start
const savedCursorPosition = storageManager?.getCursorPosition();
if (savedCursorPosition) setCursorPosition(savedCursorPosition);

if (monacoEditorRef.current && fileName === INDEXER_TAB_NAME) {
monacoEditorRef.current.setPosition(savedCursorPosition || { lineNumber: 1, column: 1 });
monacoEditorRef.current.focus();
Expand Down Expand Up @@ -282,8 +337,9 @@ const Editor: React.FC = (): ReactElement => {
`${primitives}}`,
'file:///node_modules/@near-lake/primitives/index.d.ts',
);

monaco.languages.typescript.typescriptDefaults.setCompilerOptions({
target: monaco.languages.typescript.ScriptTarget.ES2016,
target: monaco.languages.typescript.ScriptTarget.ES2018,
allowNonTsExtensions: true,
moduleResolution: monaco.languages.typescript.ModuleResolutionKind.NodeJs,
});
Expand Down
161 changes: 161 additions & 0 deletions frontend/src/pages/api/WizardCodeGenerator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import type { Schema } from 'genson-js/dist/types';

import type { Event, Method } from '@/pages/api/generateCode';

export interface GeneratedCode {
jsCode: string;
sqlCode: string;
}

interface Column {
name: string;
sql: string;
}

function sanitizeTableName(tableName: string): string {
// Convert to PascalCase
let pascalCaseTableName = tableName
// Replace special characters with underscores
.replace(/[^a-zA-Z0-9_]/g, '_')
// Makes first letter and any letters following an underscore upper case
.replace(/^([a-zA-Z])|_([a-zA-Z])/g, (match: string) => match.toUpperCase())
// Removes all underscores
.replace(/_/g, '');

// Add underscore if first character is a number
if (/^[0-9]/.test(pascalCaseTableName)) {
pascalCaseTableName = '_' + pascalCaseTableName;
}

return pascalCaseTableName;
}

const createColumn = (columnName: string, schema: Schema): Column => {
let type: string;
switch (schema.type) {
case 'string':
type = 'TEXT';
break;
case 'integer':
type = 'INT';
break;
case 'number':
type = 'FLOAT';
break;
case 'boolean':
type = 'BOOLEAN';
break;
case 'array':
type = 'TEXT[]';
break;
case 'object':
type = 'JSONB';
break;
default:
type = 'TEXT';
}
return { name: columnName, sql: `"${columnName}" ${type}` };
};

export class WizardCodeGenerator {
constructor(private contractFilter: string, private selectedMethods: Method[], private selectedEvents?: Event[]) {}

private getColumns(method: Method): Column[] {
if (!method.schema.properties) {
return [];
}
return Object.entries(method.schema.properties).map(([k, v]) => createColumn(k, v));
}

private getTableName(method: Method): { contextDbName: string; tableName: string } {
const tableName = `calls_to_${method.method_name}`;
return { tableName, contextDbName: sanitizeTableName(tableName) };
}

private generateSQLForMethod(method: Method): string {
if (!method.schema.properties) {
return '';
}
const { tableName } = this.getTableName(method);
const columns = this.getColumns(method);

// TODO: add NULLABLE for optional fields
return `
CREATE TABLE ${tableName}
(
"block_height" INT,
"block_timestamp" TIMESTAMP,
"signer_id" TEXT,
"receipt_id" TEXT,
${columns.map((c) => ` ${c.sql},`).join('\n')}
PRIMARY KEY ("receipt_id")
);
-- Consider adding an index (https://www.postgresql.org/docs/14/sql-createindex.html) on a frequently queried column, e.g.:
${columns.map((c) => `-- CREATE INDEX "${tableName}_${c.name}_key" ON "${tableName}" ("${c.name}" ASC);`).join('\n')}
`;
}

private generateJSForMethod(method: Method): string {
const columnNames = this.getColumns(method).map((c) => c.name);
const primaryKeys = ['receipt_id'];
const { contextDbName } = this.getTableName(method);
const methodName = method.method_name;
return `
// Extract and upsert ${methodName} function calls
const callsTo${methodName} = extractFunctionCallEntity("${this.contractFilter}", "${methodName}", ${JSON.stringify(
columnNames,
)});
try {
await context.db.${contextDbName}.upsert(callsTo${methodName}, ${JSON.stringify(primaryKeys)}, ${JSON.stringify(
columnNames,
)});
} catch(e) {
context.error(\`Unable to upsert ${methodName} function calls: \$\{e.message\}\`);
}
`;
}

private generateJSCode(): string {
return `
function extractFunctionCallEntity(contractFilter, methodName, argsToInclude) {
const jsonify = (v) => {
if ((typeof v === "object" && v !== null) || Array.isArray(v))
return JSON.stringify(v);
return v;
};
return block
.functionCallsToReceiver(contractFilter, methodName)
.map((fc) => {
let fcArgs = {};
try {
fcArgs = fc.argsAsJSON();
} catch (e) {
console.log(
\`Failed to parse args \$\{fc.args\} into JSON for \$\{fc.methodName\}\`
);
}
const extractedArgs = Object.fromEntries(
Object.entries(fcArgs)
.filter(([k]) => argsToInclude.includes(k))
.map(([k, v]) => [k, jsonify(v)])
);
return {
block_height: block.blockHeight,
block_timestamp: block.timestamp,
signer_id: fc.signerId,
receipt_id: fc.receiptId,
...extractedArgs,
};
});
}
${this.selectedMethods.map((m) => this.generateJSForMethod(m)).join('\n')}
`;
}

public generateCode(): GeneratedCode {
const jsCode = this.generateJSCode();
const sqlCode = this.selectedMethods.map((m) => this.generateSQLForMethod(m)).join('\n');
return { jsCode, sqlCode };
}
}
Loading

0 comments on commit c8b17d0

Please sign in to comment.