Migrate Redis Caching to Coordinator
darunrs committed Sep 27, 2023
1 parent e7d31bd commit 6d6b20d
Showing 5 changed files with 144 additions and 201 deletions.
17 changes: 17 additions & 0 deletions indexer/queryapi_coordinator/src/main.rs
@@ -1,3 +1,4 @@
use actix_web::cookie::Expiration;
use cached::SizedCache;
use futures::stream::{self, StreamExt};
use near_jsonrpc_client::JsonRpcClient;
@@ -27,6 +28,7 @@ pub(crate) const INTERVAL: std::time::Duration = std::time::Duration::from_milli
pub(crate) const MAX_DELAY_TIME: std::time::Duration = std::time::Duration::from_millis(4000);
pub(crate) const RETRY_COUNT: usize = 2;


type SharedIndexerRegistry = std::sync::Arc<Mutex<IndexerRegistry>>;

#[derive(Debug, Default, Clone, Copy)]
@@ -146,6 +148,21 @@ async fn handle_streamer_message(

let block_height: BlockHeight = context.streamer_message.block.header.height;

// Cache streamer message block and shards for use in real time processing
storage::setEx(
context.redis_connection_manager,
format!(
"{}{}{}:{}",
storage::STREAMER_MESSAGE_HASH_KEY_BASE,
storage::LAKE_BUCKET_PREFIX,
context.chain_id,
block_height
),
180,
serde_json::to_string(&context.streamer_message)?,
)
.await?;

let spawned_indexers = indexer_registry::index_registry_changes(
block_height,
&mut indexer_registry_locked,
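For reference, the SETEX call above composes its key from the two new storage constants plus the chain ID and block height, then caches the serialized streamer message for 180 seconds. A minimal standalone Rust sketch of the resulting key format (not part of this commit; it assumes the chain ID renders as "mainnet"):

```rust
// Standalone sketch: the key format produced by the SETEX call in
// handle_streamer_message, assuming chain_id formats as "mainnet".
const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:cache:";
const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";

fn streamer_message_cache_key(chain_id: &str, block_height: u64) -> String {
    format!(
        "{}{}{}:{}",
        STREAMER_MESSAGE_HASH_KEY_BASE, LAKE_BUCKET_PREFIX, chain_id, block_height
    )
}

fn main() {
    // Prints "streamer:message:cache:near-lake-data-mainnet:85233529".
    println!("{}", streamer_message_cache_key("mainnet", 85_233_529));
}
```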
18 changes: 18 additions & 0 deletions indexer/storage/src/lib.rs
@@ -2,7 +2,9 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};

const STORAGE: &str = "storage_alertexer";

pub const LAKE_BUCKET_PREFIX: &str = "near-lake-data-";
pub const STREAMS_SET_KEY: &str = "streams";
pub const STREAMER_MESSAGE_HASH_KEY_BASE: &str = "streamer:message:cache:";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
@@ -57,6 +59,22 @@ pub async fn set(
Ok(())
}

pub async fn setEx(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
expiration: usize,
value: impl ToRedisArgs + std::fmt::Debug,
) -> anyhow::Result<()> {
redis::cmd("SETEX")
.arg(&key)
.arg(expiration)
.arg(&value)
.query_async(&mut redis_connection_manager.clone())
.await?;
tracing::debug!(target: STORAGE, "SETEX: {:?}: {:?} with expiration {:?}s", key, value, expiration);
Ok(())
}

pub async fn get<V: FromRedisValue + std::fmt::Debug>(
redis_connection_manager: &ConnectionManager,
key: impl ToRedisArgs + std::fmt::Debug,
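The new setEx helper mirrors the existing set/get helpers but adds a TTL via Redis SETEX. Below is a hedged sketch of a complementary read path, not part of this commit: it assumes the reader rebuilds the same key from the chain ID and block height, and uses the raw redis API rather than the crate's get helper.

```rust
// Hedged sketch: a read path that would pair with the SETEX write above,
// assuming the reader reconstructs the same cache key.
use redis::aio::ConnectionManager;

pub async fn get_cached_streamer_message(
    redis_connection_manager: &ConnectionManager,
    chain_id: &str,
    block_height: u64,
) -> anyhow::Result<Option<String>> {
    let key = format!(
        "streamer:message:cache:near-lake-data-{}:{}",
        chain_id, block_height
    );
    // GET returns nil (None) once the 180-second TTL set by SETEX has lapsed,
    // at which point the caller would fall back to fetching block/shards from S3.
    let cached: Option<String> = redis::cmd("GET")
        .arg(&key)
        .query_async(&mut redis_connection_manager.clone())
        .await?;
    Ok(cached)
}
```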
226 changes: 83 additions & 143 deletions runner/src/indexer/indexer.test.ts
@@ -164,10 +164,7 @@ CREATE TABLE
});

const transparentRedis = {
getStreamerBlockFromCache: jest.fn(),
addStreamerBlockToCache: jest.fn(),
getStreamerShardFromCache: jest.fn(),
addStreamerShardToCache: jest.fn()
getStreamerMessageFromCache: jest.fn()
} as unknown as RedisClient;

beforeEach(() => {
@@ -191,16 +188,20 @@ CREATE TABLE
}));
const blockHeight = 456;
const mockData = jest.fn().mockResolvedValue(
JSON.stringify({
chunks: [],
header: {
height: blockHeight
JSON.stringify(
{
block: {
chunks: [],
header: {
height: blockHeight
}
},
shards: {}
}
})
)
);
const mockRedis = {
getStreamerShardFromCache: mockData,
getStreamerBlockFromCache: mockData
getStreamerMessageFromCache: mockData
} as unknown as RedisClient;

const indexer = new Indexer('mainnet', { fetch: mockFetch as unknown as typeof fetch, redisClient: mockRedis });
@@ -218,30 +219,7 @@ CREATE TABLE
expect(mockFetch.mock.calls).toMatchSnapshot();
});

test('Indexer.fetchBlock() should fetch a block from cache', async () => {
const author = 'dokiacapital.poolv1.near';
const mockData = jest.fn().mockResolvedValue(
JSON.stringify({
author
})
);
const mockRedis = {
getStreamerBlockFromCache: mockData
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { redisClient: mockRedis });

const blockHeight = 84333960;
const block = await indexer.fetchBlockPromise(blockHeight, false);

expect(mockRedis.getStreamerBlockFromCache).toHaveBeenCalledTimes(1);
expect(mockRedis.getStreamerBlockFromCache).toHaveBeenCalledWith(
'near-lake-data-mainnet',
`${blockHeight.toString().padStart(12, '0')}/block.json`
);
expect(block.author).toEqual(author);
});

test('Indexer.fetchBlock() should fetch a block from the S3 after cache miss', async () => {
test('Indexer.fetchBlock() should fetch a block from S3', async () => {
const author = 'dokiacapital.poolv1.near';
const mockData = JSON.stringify({
author
@@ -258,69 +236,18 @@ CREATE TABLE
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis });

const blockHeight = 84333960;
const block = await indexer.fetchBlockPromise(blockHeight, false);
const block = await indexer.fetchBlockPromise(blockHeight);
const params = {
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/block.json`
};

expect(mockS3.send).toHaveBeenCalledTimes(1);
expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params)));
expect(transparentRedis.getStreamerBlockFromCache).toHaveBeenCalledTimes(1);
expect(transparentRedis.addStreamerBlockToCache).toHaveBeenCalledWith(params.Bucket, params.Key, mockData);
expect(block.author).toEqual(author);
});

test('Indexer.fetchBlock() should fetch a block not from cache but from the S3 if historical', async () => {
const author = 'dokiacapital.poolv1.near';
const mockSend = jest.fn().mockResolvedValue({
Body: {
transformToString: () => JSON.stringify({
author
})
}
});
const mockS3 = {
send: mockSend,
} as unknown as S3Client;
const mockRedis = {
getStreamerBlockFromCache: jest.fn(),
addStreamerBlockToCache: jest.fn()
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis });

const blockHeight = 84333960;
const block = await indexer.fetchBlockPromise(blockHeight, true);

expect(mockS3.send).toHaveBeenCalledTimes(1);
expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand({
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/block.json`
})));
expect(mockRedis.getStreamerBlockFromCache).toHaveBeenCalledTimes(0);
expect(mockRedis.addStreamerBlockToCache).toHaveBeenCalledTimes(0);
expect(block.author).toEqual(author);
});

test('Indexer.fetchShard() should fetch the steamer message from cache', async () => {
const mockData = jest.fn().mockResolvedValue(JSON.stringify({}));
const mockRedis = {
getStreamerShardFromCache: mockData
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { redisClient: mockRedis });

const blockHeight = 82699904;
const shard = 0;
await indexer.fetchShardPromise(blockHeight, shard, false);

expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledTimes(1);
expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledWith(
'near-lake-data-mainnet',
`${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json`
);
});

test('Indexer.fetchShard() should fetch the steamer message from S3 after cache miss', async () => {
test('Indexer.fetchShard() should fetch a shard from S3', async () => {
const mockData = JSON.stringify({});
const mockSend = jest.fn().mockResolvedValue({
Body: {
@@ -330,88 +257,53 @@ CREATE TABLE
const mockS3 = {
send: mockSend,
} as unknown as S3Client;
const mockRedis = {
getStreamerShardFromCache: jest.fn(),
addStreamerShardToCache: jest.fn()
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis });
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: transparentRedis });

const blockHeight = 82699904;
const shard = 0;
const params = {
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json`
};
await indexer.fetchShardPromise(blockHeight, shard, false);
await indexer.fetchShardPromise(blockHeight, shard);

expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand(params)));
expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledTimes(1);
expect(mockRedis.addStreamerShardToCache).toHaveBeenCalledWith(params.Bucket, params.Key, mockData);
});

test('Indexer.fetchShard() should fetch the steamer message not from cache but from S3 if historical', async () => {
const mockSend = jest.fn().mockResolvedValue({
Body: {
transformToString: () => JSON.stringify({})
}
});
const mockS3 = {
send: mockSend,
} as unknown as S3Client;
const mockRedis = {
getStreamerShardFromCache: jest.fn(),
addStreamerShardToCache: jest.fn()
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis });

const blockHeight = 82699904;
const shard = 0;
await indexer.fetchShardPromise(blockHeight, shard, true);

expect(mockS3.send).toHaveBeenCalledTimes(1);
expect(JSON.stringify(mockSend.mock.calls[0][0])).toMatch(JSON.stringify(new GetObjectCommand({
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/shard_${shard}.json`
})));
expect(mockRedis.getStreamerShardFromCache).toHaveBeenCalledTimes(0);
expect(mockRedis.addStreamerShardToCache).toHaveBeenCalledTimes(0);
});

test('Indexer.fetchStreamerMessage() should fetch the block/shards from cache and construct the streamer message', async () => {
test('Indexer.fetchStreamerMessage() should fetch the message from cache and use it directly', async () => {
const blockHeight = 85233529;
const blockHash = 'xyz';
const getBlockFromCache = jest.fn()
.mockReturnValueOnce(JSON.stringify({
chunks: [0],
header: {
height: blockHeight,
hash: blockHash,
const getMessageFromCache = jest.fn()
.mockReturnValueOnce(JSON.stringify(
{
block: {
chunks: [0],
header: {
height: blockHeight,
hash: blockHash,
}
},
shards: {}
}
}));
const getShardFromCache = jest.fn().mockReturnValue(JSON.stringify({}));
));
const mockRedis = {
getStreamerBlockFromCache: getBlockFromCache,
getStreamerShardFromCache: getShardFromCache
getStreamerMessageFromCache: getMessageFromCache
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { redisClient: mockRedis });

const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, false);

expect(getBlockFromCache).toHaveBeenCalledTimes(1);
expect(getShardFromCache).toHaveBeenCalledTimes(4);
expect(JSON.stringify(getBlockFromCache.mock.calls[0])).toEqual(
`["near-lake-data-mainnet","${blockHeight.toString().padStart(12, '0')}/block.json"]`
);
expect(JSON.stringify(getShardFromCache.mock.calls[1])).toEqual(
`["near-lake-data-mainnet","${blockHeight.toString().padStart(12, '0')}/shard_1.json"]`,
expect(getMessageFromCache).toHaveBeenCalledTimes(1);
expect(JSON.stringify(getMessageFromCache.mock.calls[0])).toEqual(
`["near-lake-data-mainnet",${blockHeight}]`
);
const block = Block.fromStreamerMessage(streamerMessage);

expect(block.blockHeight).toEqual(blockHeight);
expect(block.blockHash).toEqual(blockHash);
});

test('Indexer.fetchStreamerMessage() should fetch the block/shards from S3 and construct the streamer message', async () => {
test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 upon cache miss', async () => {
const blockHeight = 85233529;
const blockHash = 'xyz';
const mockSend = jest.fn()
@@ -447,6 +339,54 @@ CREATE TABLE
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json`
})));
expect(transparentRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(1);

const block = Block.fromStreamerMessage(streamerMessage);

expect(block.blockHeight).toEqual(blockHeight);
expect(block.blockHash).toEqual(blockHash);
});

test('Indexer.fetchStreamerMessage() should fetch the block and shards from S3 and not cache and construct the streamer message if historical', async () => {
const blockHeight = 85233529;
const blockHash = 'xyz';
const mockSend = jest.fn()
.mockReturnValueOnce({ // block
Body: {
transformToString: () => JSON.stringify({
chunks: [0],
header: {
height: blockHeight,
hash: blockHash,
}
})
}
})
.mockReturnValue({ // shard
Body: {
transformToString: () => JSON.stringify({})
}
});
const mockS3 = {
send: mockSend,
} as unknown as S3Client;
const mockRedis = {
getStreamerMessageFromCache: jest.fn()
} as unknown as RedisClient;
const indexer = new Indexer('mainnet', { s3: mockS3, redisClient: mockRedis });

const streamerMessage = await indexer.fetchStreamerMessage(blockHeight, true);

expect(mockSend).toHaveBeenCalledTimes(5);
expect(JSON.stringify(mockSend.mock.calls[0][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/block.json`
})));
expect(JSON.stringify(mockSend.mock.calls[1][0])).toStrictEqual(JSON.stringify(new GetObjectCommand({
Bucket: 'near-lake-data-mainnet',
Key: `${blockHeight.toString().padStart(12, '0')}/shard_0.json`
})));
expect(mockRedis.getStreamerMessageFromCache).toHaveBeenCalledTimes(0);

const block = Block.fromStreamerMessage(streamerMessage);

