Skip to content

Commit

Permalink
support redis74 hash with ttl (#71)
Browse files Browse the repository at this point in the history
* support hash listpack ex

Signed-off-by: catcherwong <[email protected]>

* support redis 74 hash metadata

Signed-off-by: catcherwong <[email protected]>

* read hash metadata with encoding ht

Signed-off-by: catcherwong <[email protected]>

---------

Signed-off-by: catcherwong <[email protected]>
  • Loading branch information
catcherwong authored Oct 2, 2024
1 parent d7ba809 commit 3427613
Show file tree
Hide file tree
Showing 13 changed files with 199 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/RDBCli/Callbacks/KeysOnlyCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
{
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/RDBCli/Callbacks/MemoryCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
{
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
var lenOfElem = ElementLength(field) + ElementLength(value);
if (lenOfElem > _currentRecord.LenOfLargestElem)
Expand Down
25 changes: 22 additions & 3 deletions src/RDBParser/BinaryReaderRDBParser.Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ public void ReadObject(BinaryReader br, byte[] key, int encType, long expiry, In
{
ReadStream(br, encType);
}
else if (encType == Constant.DataType.HASH_LISTPACK)
else if (encType == Constant.DataType.HASH_LISTPACK
|| encType == Constant.DataType.HASH_LISTPACK_EX
|| encType == Constant.DataType.HASH_LISTPACK_EX_PRE_GA)
{
ReadHashFromListPack(br);
ReadHashFromListPack(br, encType);
}
else if (encType == Constant.DataType.ZSET_LISTPACK)
{
Expand All @@ -125,6 +127,11 @@ public void ReadObject(BinaryReader br, byte[] key, int encType, long expiry, In
{
ReadSetFromListPack(br);
}
else if(encType == Constant.DataType.HASH_METADATA_PRE_GA
|| encType == Constant.DataType.HASH_METADATA)
{
ReadHashMetadata(br, encType);
}
else
{
throw new RDBParserException($"Invalid object type {encType} for {key} ");
Expand Down Expand Up @@ -205,8 +212,15 @@ private void SkipObject(BinaryReader br, int encType)
{
SkipStream(br, encType);
}
else if (encType == Constant.DataType.HASH_LISTPACK)
else if (encType == Constant.DataType.HASH_LISTPACK
|| encType == Constant.DataType.HASH_LISTPACK_EX
|| encType == Constant.DataType.HASH_LISTPACK_EX_PRE_GA)
{
if (encType == Constant.DataType.HASH_LISTPACK_EX)
{
_ = br.ReadUInt64();
}

skip = 1;
}
else if (encType == Constant.DataType.ZSET_LISTPACK)
Expand All @@ -217,6 +231,11 @@ private void SkipObject(BinaryReader br, int encType)
{
skip = 1;
}
else if (encType == Constant.DataType.HASH_METADATA_PRE_GA
|| encType == Constant.DataType.HASH_METADATA)
{
SkipHashMetadata(br, encType);
}
else
{
throw new RDBParserException($"Invalid object type {encType} for {_key} ");
Expand Down
72 changes: 72 additions & 0 deletions src/RDBParser/BinaryReaderRDBParser.HashMetadata.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System.IO;

namespace RDBParser
{
public partial class BinaryReaderRDBParser : IRDBParser
{
private const ulong EB_EXPIRE_TIME_MAX = 0x0000FFFFFFFFFFFF;
private const ulong EB_EXPIRE_TIME_INVALID = EB_EXPIRE_TIME_MAX + 1;

private void ReadHashMetadata(BinaryReader br, int encType)
{
// https://github.com/redis/redis/blob/c9d29f6a918c335bc1778d9f68e521c1bbb36a0f/src/rdb.c#L2265
ulong minExpire = EB_EXPIRE_TIME_INVALID;
if (encType == Constant.DataType.HASH_METADATA)
{
minExpire = br.ReadUInt64();
}

var len = br.ReadLength();

Info info = new Info();
info.Idle = _idle;
info.Freq = _freq;
info.Encoding = Constant.ObjEncoding.HT;
info.SizeOfValue = (int)len;
_callback.StartHash(_key, (int)len, _expiry, info);

ulong expireAt = 0;
while (len > 0)
{
var ttl = br.ReadLength();

if (encType == Constant.DataType.HASH_METADATA)
{
expireAt = ttl != 0 ? ttl + minExpire - 1 : 0;
}
else
{
expireAt = ttl;
}

var field = br.ReadStr();
var value = br.ReadStr();

_callback.HSet(_key, field, value, (long)expireAt);

len--;
}

_callback.EndHash(_key);
}

private void SkipHashMetadata(BinaryReader br, int encType)
{
// https://github.com/redis/redis/blob/c9d29f6a918c335bc1778d9f68e521c1bbb36a0f/src/rdb.c#L2265
if (encType == Constant.DataType.HASH_METADATA)
{
_ = br.ReadUInt64();
}

var len = br.ReadLength();
while (len > 0)
{
_ = br.ReadLength();
_ = br.ReadStr();
_ = br.ReadStr();

len--;
}
}
}
}
32 changes: 26 additions & 6 deletions src/RDBParser/BinaryReaderRDBParser.ListPack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,14 @@ public partial class BinaryReaderRDBParser
}
}

private void ReadHashFromListPack(BinaryReader br)
private void ReadHashFromListPack(BinaryReader br, int encType)
{
// If Hash TTLs, Load next/min expiration time before the `encoded`
if (encType == Constant.DataType.HASH_LISTPACK_EX)
{
_ = br.ReadUInt64();
}

// https://github.com/redis/redis/blob/7.0.8/src/rdb.c#L1703
// https://github.com/redis/redis/blob/7.0.8/src/listpack.c#L1282
// <total_bytes><size><entry><entry>..<entry><end>
Expand All @@ -164,24 +170,38 @@ private void ReadHashFromListPack(BinaryReader br)
var bytes = lpGetTotalBytes(rd);
// <size>
var numEle = lpGetNumElements(rd);
if (numEle % 2 != 0) throw new RDBParserException($"Expected even number of elements, but found {numEle} for key {_key}");

var numEntries = (ushort)(numEle / 2);
var isListpackEx = encType == Constant.DataType.HASH_LISTPACK_EX || encType == Constant.DataType.HASH_LISTPACK_EX_PRE_GA;
var tupleLen = isListpackEx ? 3 : 2;

if (numEle % tupleLen != 0) throw new RDBParserException($"Unexpected number of elements, isListpackEx = {isListpackEx}, found {numEle} for key {_key}");
var numEntries = (ushort)(numEle / tupleLen);


Info info = new Info();
info.Idle = _idle;
info.Freq = _freq;
info.Encoding = Constant.ObjEncoding.LISTPACK;
info.Encoding = isListpackEx ? Constant.ObjEncoding.LISTPACK_EX : Constant.ObjEncoding.LISTPACK;
info.SizeOfValue = rawString.Length;
_callback.StartHash(_key, numEntries, _expiry, info);

// <entry>
// simple hash field-value pair
// hash with TTL field-value-ttl pair
for (int i = 0; i < numEntries; i++)
{
// <encode><val><backlen>
var field = ReadListPackEntry(rd);
var value = ReadListPackEntry(rd);
_callback.HSet(_key, field.data, value.data);

long ttl = 0;
if(isListpackEx)
{
var tmp = ReadListPackEntry(rd);
ttl = long.Parse(Encoding.UTF8.GetString(tmp.data));
}

_callback.HSet(_key, field.data, value.data, ttl);
}

var lpEnd = rd.ReadByte();
Expand Down
4 changes: 2 additions & 2 deletions src/RDBParser/Callbacks/DefaultConsoleReaderCallBack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
Console.WriteLine($"HandleModuleData, Key={GetString(key)}, opCode={opCode}, data={GetString(data)}");
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
Console.WriteLine($"HSet, Key={GetString(key)}, Field={GetString(field)}, Value={GetString(value)}");
Console.WriteLine($"HSet, Key={GetString(key)}, Field={GetString(field)}, Value={GetString(value)}, expire={expiry}");
}

public void RPush(byte[] key, byte[] value)
Expand Down
3 changes: 2 additions & 1 deletion src/RDBParser/Callbacks/IReaderCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public interface IReaderCallback
/// <param name="key">the redis key for this hash</param>
/// <param name="field">a string</param>
/// <param name="value">the value to store for this field</param>
void HSet(byte[] key, byte[] field, byte[] value);
/// <param name="expiry">a `datetime` object. 0 means the object does not expire, work for >7.4</param>
void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0);

/// <summary>
/// Called when there are no more elements in the hash
Expand Down
2 changes: 1 addition & 1 deletion src/RDBParser/Callbacks/NoOpReaderCallBack.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
{
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
}

Expand Down
6 changes: 4 additions & 2 deletions src/RDBParser/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ public static class RdbVersion

// Redis 6.2.14 9
// Redis 7.0-rc1~~7.0.15 10
// Redis 7.2-rc1~~7.4.0 11
// Redis 7.2-rc1~~7.2.5 11
// Valkey 7.2.4-rc1~~8.0.0 11
public const int Max = 11;
// Redis 7.4-rc1~~7.4.0 12
public const int Max = 12;
}

public static class MagicCount
Expand Down Expand Up @@ -154,6 +155,7 @@ public static class ObjEncoding
public const string SKIPLIST = "skiplist";
public const string QUICKLIST = "quicklist";
public const string LISTPACK = "listpack";
public const string LISTPACK_EX = "listpack_ex";
public const string LINKEDLIST = "linkedlist";
}
}
Expand Down
58 changes: 58 additions & 0 deletions tests/RDBParserTests/HashTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using RDBParser;
using System.Collections.Generic;
using System.Text;
using Xunit;
using Xunit.Abstractions;
Expand Down Expand Up @@ -161,5 +162,62 @@ public void TestHashWithRedis70ListPack()
Assert.Equal(Encoding.UTF8.GetBytes("pppppppppppppppppppppppppppppp"), hashs[0][key][Encoding.UTF8.GetBytes("1234566")]);
Assert.Equal(Encoding.UTF8.GetBytes("2.60"), hashs[0][key][Encoding.UTF8.GetBytes("abc")]);
}

[Fact]
public void TestHashWithRedis74ListPackEx()
{
// hset mykey f1 v1 f2 v2
// hexpire mykey 3000 FIELDS 1 f1
var path = TestHelper.GetRDBPath("redis74_hash_ttl_listpack_ex.rdb");

var callback = new TestReaderCallback(_output);
var parser = new BinaryReaderRDBParser(callback);
parser.Parse(path);

var lengths = callback.GetLengths();
var hashs = callback.GetHashs();
var expires = callback.GetExpiries();

Assert.Equal(2, lengths[0][Encoding.UTF8.GetBytes("mykey")]);
var key = Encoding.UTF8.GetBytes("mykey");

Assert.Equal(Encoding.UTF8.GetBytes("v1"), hashs[0][key][Encoding.UTF8.GetBytes("f1")]);
Assert.Equal(Encoding.UTF8.GetBytes("v2"), hashs[0][key][Encoding.UTF8.GetBytes("f2")]);

var expiryKey = new byte[key.Length + Encoding.UTF8.GetBytes("f1").Length];
key.CopyTo(expiryKey, 0);
Encoding.UTF8.GetBytes("f1").CopyTo(expiryKey, key.Length);
var f1Exp = expires[0][expiryKey];
Assert.True(f1Exp > 0);
}

[Fact]
public void TestHashWithRedis74HashMetadata()
{
// config set hash-max-listpack-entries 0
// hset mykey f1 v1 f2 v2
// hexpire mykey 3000 FIELDS 1 f1
var path = TestHelper.GetRDBPath("redis74_hash_ttl_hash_metadata.rdb");

var callback = new TestReaderCallback(_output);
var parser = new BinaryReaderRDBParser(callback);
parser.Parse(path);

var lengths = callback.GetLengths();
var hashs = callback.GetHashs();
var expires = callback.GetExpiries();

Assert.Equal(2, lengths[0][Encoding.UTF8.GetBytes("mykey")]);
var key = Encoding.UTF8.GetBytes("mykey");

Assert.Equal(Encoding.UTF8.GetBytes("v1"), hashs[0][key][Encoding.UTF8.GetBytes("f1")]);
Assert.Equal(Encoding.UTF8.GetBytes("v2"), hashs[0][key][Encoding.UTF8.GetBytes("f2")]);

var expiryKey = new byte[key.Length + Encoding.UTF8.GetBytes("f1").Length];
key.CopyTo(expiryKey, 0);
Encoding.UTF8.GetBytes("f1").CopyTo(expiryKey, key.Length);
var f1Exp = expires[0][expiryKey];
Assert.True(f1Exp > 0);
}
}
}
11 changes: 10 additions & 1 deletion tests/RDBParserTests/TestReaderCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,24 @@ public void HandleModuleData(byte[] key, ulong opCode, byte[] data)
//throw new System.NotImplementedException();
}

public void HSet(byte[] key, byte[] field, byte[] value)
public void HSet(byte[] key, byte[] field, byte[] value, long expiry = 0)
{
if (!_hashs[_database].ContainsKey(key))
throw new System.Exception("0");

_output.WriteLine(Encoding.UTF8.GetString(key));
_output.WriteLine(Encoding.UTF8.GetString(field));
_output.WriteLine(Encoding.UTF8.GetString(value));
_output.WriteLine(expiry.ToString());
_hashs[_database][key][field] = value;

if(expiry > 0)
{
var expiryKey = new byte[key.Length + field.Length];
key.CopyTo(expiryKey, 0);
field.CopyTo(expiryKey, key.Length);
_expiries[_database][expiryKey] = expiry;
}
}

public void RPush(byte[] key, byte[] value)
Expand Down
Binary file added tests/dumps/redis74_hash_ttl_hash_metadata.rdb
Binary file not shown.
Binary file added tests/dumps/redis74_hash_ttl_listpack_ex.rdb
Binary file not shown.

0 comments on commit 3427613

Please sign in to comment.