Commit
Fixes to utxo/timestamp data generation scripts (#221)
maciejka authored Sep 25, 2024
1 parent f485f76 commit 3522db8
Showing 4 changed files with 113 additions and 49 deletions.
2 changes: 1 addition & 1 deletion scripts/data/generate_data.py
@@ -220,7 +220,7 @@ def format_block_with_transactions(block: dict):


def fetch_block_header(block_hash: str):
"""Downloads block header (without trasnasction) from RPC given the block hash."""
"""Downloads block header (without transaction) from RPC given the block hash."""
return request_rpc("getblockheader", [block_hash])


76 changes: 59 additions & 17 deletions scripts/data/generate_timestamp_data.py
@@ -1,13 +1,20 @@
#!/usr/bin/env python

import json
import os
import requests
from google.cloud import storage
from tqdm import tqdm
from collections import defaultdict
from functools import lru_cache

INDEX_SIZE = 50000

BASE_DIR = "previous_outputs_data"

GCS_BASE_URL = "https://storage.cloud.google.com/shinigami-consensus/previous_outputs/"
BASE_DIR = "previous_outputs"
bucket_name = "shinigami-consensus"
folder_prefix = "previous_outputs/"
GCS_BUCKET_NAME = "shinigami-consensus"
GCS_FOLDER_NAME = "previous_outputs"
GCS_BASE_URL = f"https://storage.googleapis.com/shinigami-consensus/{GCS_FOLDER_NAME}/"


def download_timestamp(file_name: str):
@@ -19,6 +26,7 @@ def download_timestamp(file_name: str):
return

url = f"{GCS_BASE_URL}{file_name}"

response = requests.get(url)
if response.status_code != 200:
raise Exception(f"Failed to download {file_name}")
@@ -29,16 +37,16 @@ def download_timestamp(file_name: str):

def create_index(folder_path):
index = {}
for filename in os.listdir(folder_path):
for filename in tqdm(os.listdir(folder_path), "Creating index"):
if filename.endswith(".json"):
with open(os.path.join(folder_path, filename), "r") as file:
data = [json.loads(line.rstrip()) for line in file]
for entry in data:
block_number = entry["block_number"]
index[block_number] = {
"previous_timestamps": entry["previous_timestamps"],
"median_timestamp": entry["median_timestamp"],
}
index[block_number] = [
entry["median_timestamp"],
entry["previous_timestamps"],
]
return index
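Note the index entry format changes here from a dict to a compact two-element list, which shrinks the serialized files. A sketch of one entry in the resulting JSON, with invented values for illustration:

# Hypothetical entry (values are illustrative only):
# "845000": [1717000000, [1716998000, 1716998600, 1716999200]]
# i.e. index[block_number] == [median_timestamp, previous_timestamps]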


@@ -47,17 +55,51 @@ def list_files_in_gcs(bucket_name: str, prefix: str):
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blobs = bucket.list_blobs(prefix=prefix)
return [blob.name for blob in blobs if blob.name.endswith(".json")]
return [
os.path.basename(blob.name) for blob in blobs if blob.name.endswith(".json")
]


if __name__ == "__main__":
def index_file_name(key):
return f"{BASE_DIR}/timestamp_index_{key}.json"


def partition_and_dump(index, partition_size):
"""Partition the index and dump each partition to a separate file."""

partitions = defaultdict(dict)
for key, value in tqdm(index.items(), "Partitioning index"):
group = int(key) // partition_size
partitions[group][key] = value

for key, partition in tqdm(partitions.items(), "Saving partitions"):
with open(index_file_name(key), "w") as f:
json.dump(partition, f)
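With INDEX_SIZE = 50000, integer division assigns each block number to a partition file. A quick check of the grouping, mirroring the code above:

# Assumed behaviour of the partitioning arithmetic:
assert int("123456") // 50000 == 2
# so block 123456 is written to previous_outputs/timestamp_index_2.json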

bucket_name = "shinigami-consensus"
folder_prefix = "previous_outputs/"
file_names = list_files_in_gcs(bucket_name, folder_prefix)
for file_name in file_names:

@lru_cache(maxsize=None)
def load_index(file_name):
if not os.path.exists(file_name):
raise Exception(f"Index file {file_name} not found")

with open(file_name, "r") as f:
return json.load(f)


def get_timestamp_data(block_number):
"""Get the timestamp data for a given block number."""
file_name = index_file_name(int(block_number) // INDEX_SIZE)
index = load_index(file_name)
median, previous_timestamps = index[block_number]
return median, previous_timestamps
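A minimal usage sketch; note the lookup key must be a string, since JSON object keys are strings after the round-trip through json.dump and json.load:

# Hypothetical call, assuming the partition files were generated by a prior run:
median, previous_timestamps = get_timestamp_data("845000")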


if __name__ == "__main__":
file_names = list_files_in_gcs(GCS_BUCKET_NAME, GCS_FOLDER_NAME)
for file_name in tqdm(file_names, "Downloading files"):
download_timestamp(file_name)

index = create_index(BASE_DIR)
with open("timestamp_data.json", "w") as outfile:
json.dump(index, outfile, indent=4)
print(f"Index contains {len(index)} entries.")
partition_and_dump(index, INDEX_SIZE)
print("Done.")
83 changes: 52 additions & 31 deletions scripts/data/generate_utxo_data.py
@@ -1,16 +1,21 @@
#!/usr/bin/env python

import os
import sys
import json
import requests
import subprocess
from typing import Dict, Any
import argparse
from tqdm import tqdm
from functools import lru_cache
from collections import defaultdict

# Constants
GCS_BASE_URL = "https://storage.cloud.google.com/shinigami-consensus/utxos/"
GCS_BASE_URL = "https://storage.googleapis.com/shinigami-consensus/utxos/"
BASE_DIR = "utxo_data"
CHUNK_SIZE = 10000
INDEX_FILE = "utxo_index.json"
CHUNK_SIZE = 10
INDEX_SIZE = 50000


def download_and_split(file_name: str):
@@ -24,28 +29,43 @@ def download_and_split(file_name: str):
if response.status_code != 200:
raise Exception(f"Failed to download {file_name}")

if response.headers.get("Content-Encoding") == "gzip":
print("Content is GZIP encoded")

file_path = os.path.join(BASE_DIR, file_name)
with open(file_path, "wb") as f:
f.write(response.content)

# Split file
split_cmd = f"split -l {CHUNK_SIZE} {file_path} {file_dir}/chunk_"
split_cmd = f"split -l {CHUNK_SIZE} {file_path} {file_dir}/"
subprocess.run(split_cmd, shell=True, check=True)

# Remove original file
os.remove(file_path)
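Dropping the chunk_ prefix means split writes its default suffix names (aa, ab, ...) directly under file_dir, and CHUNK_SIZE = 10 keeps each chunk to ten JSON lines. A sketch of the assumed layout, for a hypothetical source file:

# Assumed result of download_and_split("000000000001.json"),
# with file_dir derived from the file name (derivation elided above):
# utxo_data/000000000001/aa  <- lines 1-10
# utxo_data/000000000001/ab  <- lines 11-20
# ...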


def index_file_name(key):
return f"{BASE_DIR}/utxo_index_{key}.json"


def partition_and_dump(index, partition_size):
"""Partition the index and dump each partition to a separate file."""

partitions = defaultdict(dict)
for key, value in tqdm(index.items(), "Partitioning index"):
group = int(key) // partition_size
partitions[group][key] = value

for key, partition in tqdm(partitions.items(), "Saving partitions"):
with open(index_file_name(key), "w") as f:
json.dump(partition, f)


def create_index():
"""Create or update an index mapping block numbers to chunk files."""
index: Dict[int, str] = {}

# Load existing index if it exists
if os.path.exists(INDEX_FILE):
with open(INDEX_FILE, "r") as f:
index = json.load(f)

for dir_name in os.listdir(BASE_DIR):
for dir_name in tqdm(os.listdir(BASE_DIR), "Creating index"):
dir_path = os.path.join(BASE_DIR, dir_name)
if not os.path.isdir(dir_path):
continue
@@ -58,44 +78,44 @@ def create_index():
data = json.loads(line.strip())
block_number = data["block_number"]
if block_number in index:
print(f"Warning: Duplicate block number {block_number}")
index[block_number] = chunk_path
print(f"Error: Duplicate block number {block_number}")
sys.exit(1)
index[block_number] = os.path.relpath(chunk_path, BASE_DIR)
except json.JSONDecodeError as e:
print(
f"Error decoding JSON in file {chunk_path}, line {line_num}: {e}"
)
print(f"Problematic line: {line.strip()}")
print(f"Problematic line: {line.strip()[:50]}")
sys.exit(1)
except KeyError as e:
print(f"KeyError in file {chunk_path}, line {line_num}: {e}")
print(f"Data: {data}")
sys.exit(1)

with open(INDEX_FILE, "w") as f:
json.dump(index, f)
partition_and_dump(index, INDEX_SIZE)

print(f"Index created or updated with {len(index)} entries")


def get_utxo_set(block_number: int) -> Dict[str, Any]:
"""Get the UTXO set for a given block number."""
# Load index
if not os.path.exists(INDEX_FILE):
create_index()
@lru_cache(maxsize=None)
def load_index(file_name):
if not os.path.exists(file_name):
raise Exception(f"Index file {file_name} not found")

with open(file_name, "r") as f:
return json.load(f)

with open(INDEX_FILE, "r") as f:
index = json.load(f)

def get_utxo_set(block_number: int) -> Dict[str, Any]:
index = load_index(index_file_name(int(block_number) // INDEX_SIZE))

# Find chunk file
chunk_file = index.get(str(block_number))
if not chunk_file:
raise Exception(f"Block number {block_number} not found in index")

# If chunk file doesn't exist, download and split
if not os.path.exists(chunk_file):
file_name = os.path.basename(os.path.dirname(chunk_file)) + ".json"
download_and_split(file_name)

# Find and return data for the block
with open(chunk_file, "r") as f:
with open(BASE_DIR + "/" + chunk_file, "r") as f:
for line in f:
data = json.loads(line.strip())
if data["block_number"] == block_number:
@@ -106,17 +126,18 @@ def get_utxo_set(block_number: int) -> Dict[str, Any]:

def process_file_range(start_file: str, end_file: str):
"""Process a range of files from start_file to end_file."""

os.makedirs(BASE_DIR, exist_ok=True)

start_num = int(start_file.split(".")[0])
end_num = int(end_file.split(".")[0])

for file_num in tqdm(range(start_num, end_num + 1), desc="Processing files"):
file_name = f"{file_num:012d}.json"
print(f"\nProcessing file: {file_name}")
# print(f"\nProcessing file: {file_name}")
download_and_split(file_name)

print("\nCreating index...")
create_index()
print("Index creation completed.")


# usage
1 change: 1 addition & 0 deletions scripts/data/requirements.txt
@@ -4,3 +4,4 @@ flake8==7.1.1
flake8-black==0.3.6
poseidon_py==0.1.5
tqdm==4.66.5
google-cloud-storage==2.18.2
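The new google-cloud-storage pin backs the storage.Client list_blobs calls added in both scripts; pip install -r scripts/data/requirements.txt picks it up. Note that storage.Client() looks for default credentials at construction time; for a public bucket, storage.Client.create_anonymous_client() is a possible alternative when no credentials are configured.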
