From e4c0ac3905c32575b641003bd9d728f13622dfda Mon Sep 17 00:00:00 2001 From: Meyer Zinn <6132034+meyerzinn@users.noreply.github.com> Date: Sun, 21 Apr 2024 21:22:36 -0500 Subject: [PATCH] add parallel binary edge list shuffler script (#67) --- scripts/CMakeLists.txt | 3 - scripts/shufbel.cpp | 77 ---------------------- scripts/shufbel/go.mod | 5 ++ scripts/shufbel/go.sum | 2 + scripts/shufbel/shufbel.go | 128 +++++++++++++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 80 deletions(-) delete mode 100644 scripts/shufbel.cpp create mode 100644 scripts/shufbel/go.mod create mode 100644 scripts/shufbel/go.sum create mode 100644 scripts/shufbel/shufbel.go diff --git a/scripts/CMakeLists.txt b/scripts/CMakeLists.txt index 3a2a221..ce61f29 100644 --- a/scripts/CMakeLists.txt +++ b/scripts/CMakeLists.txt @@ -7,6 +7,3 @@ target_link_libraries(divide-into-batches PRIVATE Boost::program_options) add_executable(count-batched count_batched.cpp) target_link_libraries(count-batched PRIVATE Boost::program_options) - -add_executable(shufbel shufbel.cpp) -target_link_libraries(shufbel PRIVATE Boost::program_options) diff --git a/scripts/shufbel.cpp b/scripts/shufbel.cpp deleted file mode 100644 index a0ced83..0000000 --- a/scripts/shufbel.cpp +++ /dev/null @@ -1,77 +0,0 @@ -// SPDX-License-Identifier: BSD-2-Clause -// Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
- -#include -#include - -#include -#include -#include - -#include -#include -#include - -struct Edge { - uint64_t src; - uint64_t dst; - - friend inline void swap(Edge& lhs, Edge& rhs) { - using std::swap; - uint64_t src = lhs.src; - uint64_t dst = lhs.dst; - lhs.src = rhs.src; - lhs.dst = rhs.dst; - rhs.src = src; - rhs.dst = dst; - } -} __attribute__((packed)); - -static_assert(sizeof(Edge) == 16, "Edge must be 16 bytes."); - -namespace bip = boost::interprocess; -namespace po = boost::program_options; - -int main(int argc, char const* argv[]) { - po::options_description desc("Shuffle binary edge list in-place"); - desc.add_options() // - ("help,h", "Print help messages") // - ("file", po::value(), "Input file path") // - ("rseed", po::value()->default_value(0), - "Fixed random seed"); - - po::variables_map vm; - try { - // Parse command line arguments - po::store(po::parse_command_line(argc, argv, desc), vm); - po::notify(vm); - } catch (po::error& e) { - std::cout << e.what() << std::endl; - return 1; - } - - if (vm.count("help")) { - std::cout << desc << std::endl; - return 1; - } - - if (vm.count("file") != 1) { - std::cerr << "Must specify an input file with --file" << std::endl; - return 1; - } - - bip::file_mapping input_file_mapping(vm["file"].as().c_str(), - bip::read_write); - bip::mapped_region mapped_rgn(input_file_mapping, bip::read_write, 0, 0, - nullptr, MAP_POPULATE); - mapped_rgn.advise(bip::mapped_region::advice_willneed); - - // rng - std::mt19937 rng(vm["rseed"].as()); - std::shuffle(reinterpret_cast(mapped_rgn.get_address()), - reinterpret_cast( - reinterpret_cast(mapped_rgn.get_address()) + - mapped_rgn.get_size()), - rng); - return 0; -} diff --git a/scripts/shufbel/go.mod b/scripts/shufbel/go.mod new file mode 100644 index 0000000..dd25e69 --- /dev/null +++ b/scripts/shufbel/go.mod @@ -0,0 +1,5 @@ +module shufbel + +go 1.18 + +require github.com/vmunoz82/shuffle v1.0.2 // indirect diff --git a/scripts/shufbel/go.sum 
b/scripts/shufbel/go.sum
new file mode 100644
index 0000000..542ce73
--- /dev/null
+++ b/scripts/shufbel/go.sum
@@ -0,0 +1,2 @@
+github.com/vmunoz82/shuffle v1.0.2 h1:9QxvJN9F1OdsnRVrPu1+N1F+IU14PPtHATtvlxTSDVI=
+github.com/vmunoz82/shuffle v1.0.2/go.mod h1:MvogPEkxTyK+LU0v+nYDCzWiD8Jlu/rocUCJOvs8mcA=
diff --git a/scripts/shufbel/shufbel.go b/scripts/shufbel/shufbel.go
new file mode 100644
index 0000000..49a154d
--- /dev/null
+++ b/scripts/shufbel/shufbel.go
@@ -0,0 +1,179 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"math/rand"
+	"os"
+	"runtime"
+	"sync"
+	"syscall"
+	"unsafe"
+
+	"github.com/vmunoz82/shuffle"
+)
+
+// edgeSize is the size in bytes of one record in the binary edge list.
+const edgeSize = 16
+
+// roundFunction is the per-round mixing function given to the Feistel network.
+// It can be anything and does not need to be reversible. In Go, `>>` and `*`
+// have equal precedence and associate left to right, so the shift happens
+// before the multiplication; the inner parentheses just make that explicit.
+func roundFunction(v, key shuffle.FeistelWord) shuffle.FeistelWord {
+	return (v * 941083987) ^ ((key >> (v & 7)) * 104729)
+}
+
+// main parses the command line and runs the shuffle. Failures are reported on
+// stderr with a non-zero exit status so that calling scripts can detect them.
+func main() {
+	// parse CLI options
+	threads := flag.Int("threads", runtime.NumCPU(), "number of threads")
+	rseed := flag.Int64("rseed", 0, "random seed")
+	flag.Parse()
+
+	files := flag.Args()
+	if len(files) != 2 {
+		fmt.Fprintln(os.Stderr, "Usage: shufbel [options] input output ...")
+		flag.PrintDefaults()
+		os.Exit(1)
+	}
+
+	// limit go runtime threads; a value below 1 keeps the current setting
+	workers := *threads
+	if workers < 1 {
+		workers = runtime.GOMAXPROCS(0)
+	}
+	runtime.GOMAXPROCS(workers)
+
+	if err := shuffleFile(files[0], files[1], *rseed, workers); err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+}
+
+// shuffleFile writes the edges of inputFile to outputFile in a permuted order.
+// Both files are memory-mapped; output edge j is copied from the input edge
+// whose index shuffle.GetIndex returns for j, using Feistel keys drawn from a
+// generator seeded with seed. The copying is spread over `workers` goroutines.
+// It returns the first error encountered, or nil on success.
+func shuffleFile(inputFile, outputFile string, seed int64, workers int) (err error) {
+	if workers < 1 {
+		workers = 1
+	}
+
+	fin, err := os.Open(inputFile)
+	if err != nil {
+		return fmt.Errorf("Error opening file %s: %v", inputFile, err)
+	}
+	defer fin.Close()
+
+	// Validate the input before creating or resizing the output.
+	fi, err := fin.Stat()
+	if err != nil {
+		// fi must not be used when Stat fails, so stop here.
+		return fmt.Errorf("Error getting file info: %v", err)
+	}
+	sizeBytes := uint64(fi.Size())
+	if sizeBytes == 0 {
+		return fmt.Errorf("Error: file size is 0: %s", inputFile)
+	}
+	if sizeBytes%edgeSize != 0 {
+		return fmt.Errorf("Error: file size is not divisible by 16")
+	}
+
+	fout, err := os.OpenFile(outputFile, os.O_RDWR|os.O_CREATE, 0644)
+	if err != nil {
+		return fmt.Errorf("Error opening file %s: %v", outputFile, err)
+	}
+	defer func() {
+		if cerr := fout.Close(); cerr != nil && err == nil {
+			err = fmt.Errorf("Error closing file %s: %v", outputFile, cerr)
+		}
+	}()
+
+	// The copy is out-of-place: each output edge is read from an arbitrary
+	// input offset, so input and output must not be the same file.
+	fo, err := fout.Stat()
+	if err != nil {
+		return fmt.Errorf("Error getting file info: %v", err)
+	}
+	if os.SameFile(fi, fo) {
+		return fmt.Errorf("Error: input and output are the same file: %s", inputFile)
+	}
+
+	if err = fout.Truncate(int64(sizeBytes)); err != nil {
+		return fmt.Errorf("Error resizing file %s: %v", outputFile, err)
+	}
+
+	// mmap input file
+	data, err := syscall.Mmap(int(fin.Fd()), 0, int(sizeBytes), syscall.PROT_READ, syscall.MAP_PRIVATE|syscall.MAP_NORESERVE)
+	if err != nil {
+		return fmt.Errorf("Error mmaping file %s: %v", inputFile, err)
+	}
+	defer func() {
+		if uerr := syscall.Munmap(data); uerr != nil && err == nil {
+			err = fmt.Errorf("Error unmapping file %s: %v", inputFile, uerr)
+		}
+	}()
+
+	pageSize := syscall.Getpagesize()
+	if base := uintptr(unsafe.Pointer(&data[0])); base%uintptr(pageSize) != 0 {
+		return fmt.Errorf("mmap returned a non-page-aligned address: %#x", base)
+	}
+	// madv willneed
+	if err = syscall.Madvise(data, syscall.MADV_WILLNEED); err != nil {
+		return fmt.Errorf("Error madvise: %v", err)
+	}
+
+	// mmap output file
+	output, err := syscall.Mmap(int(fout.Fd()), 0, int(sizeBytes), syscall.PROT_WRITE, syscall.MAP_SHARED)
+	if err != nil {
+		return fmt.Errorf("Error mmaping file %s: %v", outputFile, err)
+	}
+	defer func() {
+		if uerr := syscall.Munmap(output); uerr != nil && err == nil {
+			err = fmt.Errorf("Error unmapping file %s: %v", outputFile, uerr)
+		}
+	}()
+
+	// keys are 4 random uint64s drawn from a private generator, so they depend
+	// only on seed
+	rng := rand.New(rand.NewSource(seed))
+	keys := make([]shuffle.FeistelWord, 4)
+	for k := range keys {
+		keys[k] = shuffle.FeistelWord(rng.Uint64())
+	}
+
+	numEdges := sizeBytes / edgeSize
+	edgesPerPage := uint64(pageSize / edgeSize)
+	stride := edgesPerPage * uint64(workers)
+
+	// Run a fixed pool of workers rather than one goroutine per page, so the
+	// goroutine count is bounded by -threads however large the file is.
+	// Worker w fills output pages w, w+workers, w+2*workers, ...
+	var wg sync.WaitGroup
+	for w := 0; w < workers; w++ {
+		wg.Add(1)
+		go func(first uint64) {
+			defer wg.Done()
+			fn := shuffle.NewFeistel(keys, roundFunction)
+			for start := first; start < numEdges; start += stride {
+				end := start + edgesPerPage
+				if end > numEdges {
+					end = numEdges
+				}
+				for j := start; j < end; j++ {
+					// NOTE(review): GetIndex's second result is ignored;
+					// confirm it can never signal failure for j < numEdges.
+					src, _ := shuffle.GetIndex(shuffle.FeistelWord(j), shuffle.FeistelWord(numEdges), fn)
+					// move one edge (16 bytes) from data to output
+					copy(output[j*edgeSize:], data[src*edgeSize:src*edgeSize+edgeSize])
+				}
+			}
+		}(uint64(w) * edgesPerPage)
+	}
+	wg.Wait()
+
+	return nil
+}