diff --git a/scripts/data/generate_data.py b/scripts/data/generate_data.py
index 780edf9b..ea6daf1f 100755
--- a/scripts/data/generate_data.py
+++ b/scripts/data/generate_data.py
@@ -220,7 +220,7 @@ def format_block_with_transactions(block: dict):
 
 
 def fetch_block_header(block_hash: str):
-    """Downloads block header (without trasnasction) from RPC given the block hash."""
+    """Downloads block header (without transactions) from RPC given the block hash."""
     return request_rpc("getblockheader", [block_hash])
 
 
diff --git a/scripts/data/generate_timestamp_data.py b/scripts/data/generate_timestamp_data.py
index 215dfffd..b6c46e1c 100644
--- a/scripts/data/generate_timestamp_data.py
+++ b/scripts/data/generate_timestamp_data.py
@@ -1,13 +1,20 @@
+#!/usr/bin/env python
+
 import json
 import os
 
 import requests
 from google.cloud import storage
+from tqdm import tqdm
+from collections import defaultdict
+from functools import lru_cache
+
+INDEX_SIZE = 50000
+BASE_DIR = "previous_outputs_data"
 
-GCS_BASE_URL = "https://storage.cloud.google.com/shinigami-consensus/previous_outputs/"
-BASE_DIR = "previous_outputs"
-bucket_name = "shinigami-consensus"
-folder_prefix = "previous_outputs/"
+GCS_BUCKET_NAME = "shinigami-consensus"
+GCS_FOLDER_NAME = "previous_outputs"
+GCS_BASE_URL = f"https://storage.googleapis.com/shinigami-consensus/{GCS_FOLDER_NAME}/"
 
 
 def download_timestamp(file_name: str):
@@ -19,6 +26,7 @@ def download_timestamp(file_name: str):
         return
 
     url = f"{GCS_BASE_URL}{file_name}"
+
     response = requests.get(url)
    if response.status_code != 200:
         raise Exception(f"Failed to download {file_name}")
@@ -29,16 +37,16 @@
 
 def create_index(folder_path):
     index = {}
-    for filename in os.listdir(folder_path):
+    for filename in tqdm(os.listdir(folder_path), "Creating index"):
         if filename.endswith(".json"):
             with open(os.path.join(folder_path, filename), "r") as file:
                 data = [json.loads(line.rstrip()) for line in file]
                 for entry in data:
                     block_number = entry["block_number"]
-                    index[block_number] = {
-                        "previous_timestamps": entry["previous_timestamps"],
-                        "median_timestamp": entry["median_timestamp"],
-                    }
+                    index[block_number] = [
+                        entry["median_timestamp"],
+                        entry["previous_timestamps"],
+                    ]
 
 
     return index
@@ -47,17 +55,51 @@ def list_files_in_gcs(bucket_name: str, prefix: str):
     client = storage.Client()
     bucket = client.get_bucket(bucket_name)
     blobs = bucket.list_blobs(prefix=prefix)
-    return [blob.name for blob in blobs if blob.name.endswith(".json")]
+    return [
+        os.path.basename(blob.name) for blob in blobs if blob.name.endswith(".json")
+    ]
 
 
-if __name__ == "__main__":
+def index_file_name(key):
+    return f"{BASE_DIR}/timestamp_index_{key}.json"
+
+
+def partition_and_dump(index, partition_size):
+    """Partition the index and dump each partition to a separate file."""
+
+    partitions = defaultdict(dict)
+    for key, value in tqdm(index.items(), "Partitioning index"):
+        group = int(key) // partition_size
+        partitions[group][key] = value
+
+    for key, partition in tqdm(partitions.items(), "Saving partitions"):
+        with open(index_file_name(key), "w") as f:
+            json.dump(partition, f)
 
-    bucket_name = "shinigami-consensus"
-    folder_prefix = "previous_outputs/"
-    file_names = list_files_in_gcs(bucket_name, folder_prefix)
-    for file_name in file_names:
+
+@lru_cache(maxsize=None)
+def load_index(file_name):
+    if not os.path.exists(file_name):
+        raise Exception(f"Index file {file_name} not found")
+
+    with open(file_name, "r") as f:
+        return json.load(f)
+
+
+def get_timestamp_data(block_number):
+    """Get the timestamp data for a given block number."""
+    file_name = index_file_name(int(block_number) // INDEX_SIZE)
+    index = load_index(file_name)
+    median, previous_timestamps = index[str(block_number)]
+    return median, previous_timestamps
+
+
+if __name__ == "__main__":
+    file_names = list_files_in_gcs(GCS_BUCKET_NAME, GCS_FOLDER_NAME)
+    for file_name in tqdm(file_names, "Downloading files"):
         download_timestamp(file_name)
 
     index = create_index(BASE_DIR)
-    with open("timestamp_data.json", "w") as outfile:
-        json.dump(index, outfile, indent=4)
+    print(f"Index contains {len(index)} entries.")
+    partition_and_dump(index, INDEX_SIZE)
+    print("Done.")
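For context on the new lookup path: `partition_and_dump` buckets entries by `block_number // INDEX_SIZE`, so each `previous_outputs_data/timestamp_index_{n}.json` covers one 50,000-block window and maps stringified block numbers to `[median_timestamp, previous_timestamps]` pairs. A minimal consumer sketch under that assumption (the block number is illustrative):

```python
# Sketch only: assumes generate_timestamp_data.py has already been run,
# so previous_outputs_data/timestamp_index_*.json exist on disk.
from generate_timestamp_data import get_timestamp_data

# 123456 // 50000 == 2, so this opens timestamp_index_2.json once;
# the lru_cache on load_index keeps that partition in memory afterwards.
median, previous_timestamps = get_timestamp_data(123456)
print(median, previous_timestamps)
```

Because `load_index` is memoized, consecutive lookups that stay within one 50,000-block window cost a single disk read.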
"""Get the timestamp data for a given block number.""" + file_name = index_file_name(int(block_number) // INDEX_SIZE) + index = load_index(file_name) + median, previous_timestamps = index[block_number] + return median, previous_timestamps + + +if __name__ == "__main__": + file_names = list_files_in_gcs(GCS_BUCKET_NAME, GCS_FOLDER_NAME) + for file_name in tqdm(file_names, "Downloading files"): download_timestamp(file_name) index = create_index(BASE_DIR) - with open("timestamp_data.json", "w") as outfile: - json.dump(index, outfile, indent=4) + print(f"Index contains {len(index)} entries.") + partition_and_dump(index, INDEX_SIZE) + print("Done.") diff --git a/scripts/data/utxo_processor.py b/scripts/data/generate_utxo_data.py similarity index 64% rename from scripts/data/utxo_processor.py rename to scripts/data/generate_utxo_data.py index 86cd8829..a5b4bae8 100644 --- a/scripts/data/utxo_processor.py +++ b/scripts/data/generate_utxo_data.py @@ -1,16 +1,21 @@ +#!/usr/bin/env python + import os +import sys import json import requests import subprocess from typing import Dict, Any import argparse from tqdm import tqdm +from functools import lru_cache +from collections import defaultdict # Constants -GCS_BASE_URL = "https://storage.cloud.google.com/shinigami-consensus/utxos/" +GCS_BASE_URL = "https://storage.googleapis.com/shinigami-consensus/utxos/" BASE_DIR = "utxo_data" -CHUNK_SIZE = 10000 -INDEX_FILE = "utxo_index.json" +CHUNK_SIZE = 10 +INDEX_SIZE = 50000 def download_and_split(file_name: str): @@ -24,28 +29,43 @@ def download_and_split(file_name: str): if response.status_code != 200: raise Exception(f"Failed to download {file_name}") + if response.headers.get("Content-Encoding") == "gzip": + print("Content is GZIP encoded") + file_path = os.path.join(BASE_DIR, file_name) with open(file_path, "wb") as f: f.write(response.content) # Split file - split_cmd = f"split -l {CHUNK_SIZE} {file_path} {file_dir}/chunk_" + split_cmd = f"split -l {CHUNK_SIZE} {file_path} {file_dir}/" subprocess.run(split_cmd, shell=True, check=True) # Remove original file os.remove(file_path) +def index_file_name(key): + return f"{BASE_DIR}/utxo_index_{key}.json" + + +def partition_and_dump(index, partition_size): + """Partition the index and dump each partition to a separate file.""" + + partitions = defaultdict(dict) + for key, value in tqdm(index.items(), "Partitioning index"): + group = int(key) // partition_size + partitions[group][key] = value + + for key, partition in tqdm(partitions.items(), "Saving partitions"): + with open(index_file_name(key), "w") as f: + json.dump(partition, f) + + def create_index(): """Create or update an index mapping block numbers to chunk files.""" index: Dict[int, str] = {} - # Load existing index if it exists - if os.path.exists(INDEX_FILE): - with open(INDEX_FILE, "r") as f: - index = json.load(f) - - for dir_name in os.listdir(BASE_DIR): + for dir_name in tqdm(os.listdir(BASE_DIR), "Creating index"): dir_path = os.path.join(BASE_DIR, dir_name) if not os.path.isdir(dir_path): continue @@ -58,44 +78,44 @@ def create_index(): data = json.loads(line.strip()) block_number = data["block_number"] if block_number in index: - print(f"Warning: Duplicate block number {block_number}") - index[block_number] = chunk_path + print(f"Error: Duplicate block number {block_number}") + sys.exit(1) + index[block_number] = os.path.relpath(chunk_path, BASE_DIR) except json.JSONDecodeError as e: print( f"Error decoding JSON in file {chunk_path}, line {line_num}: {e}" ) - print(f"Problematic line: 
diff --git a/scripts/data/requirements.txt b/scripts/data/requirements.txt
index 4f1d06fa..d07be68e 100644
--- a/scripts/data/requirements.txt
+++ b/scripts/data/requirements.txt
@@ -4,3 +4,4 @@ flake8==7.1.1
 flake8-black==0.3.6
 poseidon_py==0.1.5
 tqdm==4.66.5
+google-cloud-storage==2.18.2
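A note on the new `google-cloud-storage` pin: it is only needed by `list_files_in_gcs`; the downloads themselves still go over plain `requests`. `storage.Client()` resolves application-default credentials, so if the bucket is meant to be world-readable, an anonymous client would be a possible alternative (a sketch, not what this PR does):

```python
from google.cloud import storage

# Anonymous client: works for publicly readable buckets, needs no ADC.
client = storage.Client.create_anonymous_client()
bucket = client.bucket("shinigami-consensus")
names = [blob.name for blob in bucket.list_blobs(prefix="previous_outputs")]
```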