diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 52f6a7b..d8832b0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -49,6 +49,7 @@ repos: rev: v2.2.6 hooks: - id: codespell + args: [-L, orgin] - repo: https://github.com/pocc/pre-commit-hooks rev: v1.3.5 hooks: diff --git a/CMakeLists.txt b/CMakeLists.txt index 95a33f5..5343e9a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,13 @@ project( LANGUAGES C CXX ) +if(PROJECT_IS_TOP_LEVEL) + set(CMAKE_CXX_FLAGS_COVERAGE "-g -O0 --coverage -fprofile-abs-path") + set(CMAKE_EXE_LINKER_FLAGS_COVERAGE "--coverage") + set(CMAKE_SHARED_LINKER_FLAGS_COVERAGE "--coverage") + set(CMAKE_MODULE_LINKER_FLAGS_COVERAGE "--coverage") +endif() + # helper functions include(cmake/GaloisCompilerOptions.cmake) @@ -37,6 +44,7 @@ if(PROJECT_IS_TOP_LEVEL) endif() set(GALOIS_ENABLE_DIST ON CACHE BOOL "" FORCE) +set(GALOIS_ENABLE_WMD ON) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD 17) @@ -57,11 +65,12 @@ set(BUILD_TESTING OFF) add_subdirectory(galois EXCLUDE_FROM_ALL) set(BUILD_TESTING "${BUILD_TESTING_SAVED}") -set(PHMAP_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/galois/external/parallel-hashmap) -set(PCG_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/galois/external/pcg-cpp/include) +set(PHMAP_DIRECTORY ${Galois_SOURCE_DIR}/external/parallel-hashmap) +set(PCG_DIRECTORY ${Galois_SOURCE_DIR}/external/pcg-cpp/include) add_subdirectory(microbench) add_subdirectory(scripts) +add_subdirectory(wf4) # tests option(BUILD_TESTING "Build tests." 
ON) diff --git a/Dockerfile.dev b/Dockerfile.dev index c30d907..56cc268 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -102,6 +102,7 @@ RUN if [ "${IS_CI}" != "true" ] ; then \ doxygen \ texlive-latex-extra \ texlive-font-utils \ + gcovr \ && apt clean \ && update-locale; fi @@ -121,12 +122,12 @@ USER ${UNAME} WORKDIR /home/${UNAME} ENV BUILD_DIR=${BUILD_DIR} -RUN pip3 install compdb pre-commit cpplint "clang-format>=12.0.1" +RUN pip3 install compdb pre-commit cpplint "clang-format>=14.0.0,<17.0.0" RUN echo "export SRC_DIR=${SRC_DIR}" >> /home/${UNAME}/.profile RUN echo "export BUILD_DIR=${BUILD_DIR}" >> /home/${UNAME}/.profile RUN echo "touch ${SRC_DIR}/env-docker.sh" >> /home/${UNAME}/.profile -RUN echo "bash ${SRC_DIR}/env-docker.sh" >> /home/${UNAME}/.profile +RUN echo "source ${SRC_DIR}/env-docker.sh" >> /home/${UNAME}/.profile RUN echo "PATH=/home/${UNAME}/.local/bin/:\$PATH" >> /home/${UNAME}/.zshenv WORKDIR ${SRC_DIR} diff --git a/Makefile b/Makefile index ea28fc8..c67dc03 100644 --- a/Makefile +++ b/Makefile @@ -17,7 +17,7 @@ CONTAINER_BUILD_DIR ?= /galois/build CONTAINER_WORKDIR ?= ${CONTAINER_SRC_DIR} CONTAINER_CONTEXT ?= default CONTAINER_OPTS ?= -CONTAINER_CMD ?= setarch `uname -m` -R bash -l +CONTAINER_CMD ?= bash -l INTERACTIVE ?= i BUILD_TYPE ?= Release @@ -33,6 +33,19 @@ GALOIS_CONTAINER_FLAGS ?= GALOIS_BUILD_TOOL ?= 'Unix Makefiles' GALOIS_CCACHE_DIR ?= ${SRC_DIR}/.ccache +# Variables for SonarQube +PROFILE ?= observability +CONTAINER_SONAR_DIR ?= /galois/build-sonar +CONTAINER_SONAR_CMD ?= ${CONTAINER_CMD} -c "make sonar-scan" +SONAR_EXE ?= sonar-scanner +SONAR_PROJECT_VERSION ?= -Dsonar.projectVersion=0.1.0 +define sonar_ip +$(shell kubectl --context ${PROFILE} get nodes --namespace sonarqube -o jsonpath="{.items[0].status.addresses[0].address}") +endef +define sonar_port +$(shell kubectl --context ${PROFILE} get --namespace sonarqube -o jsonpath="{.spec.ports[0].nodePort}" services sonarqube-sonarqube) +endef + dependencies: 
dependencies-asdf dependencies-asdf: @@ -129,6 +142,57 @@ test: run-tests: test +gcovr: + @echo "Should be run outside a container" + @echo file://${SRC_DIR}/build-sonar/coverage/html + @python -mwebbrowser file://${SRC_DIR}/build-sonar/coverage/html + +sonar: + @echo "Should be run outside a container" + @echo http://$(call sonar_ip):$(call sonar_port) + @python -mwebbrowser http://$(call sonar_ip):$(call sonar_port) + + +run-gcovr: + COVERAGE_CMD='echo done' ${MAKE} run-coverage + +run-sonar: + COVERAGE_CMD="${SONAR_EXE} -Dsonar.host.url=http://$(call sonar_ip):$(call sonar_port) ${SONAR_PROJECT_VERSION}" ${MAKE} run-coverage + +run-coverage: + @docker --context ${CONTAINER_CONTEXT} run --rm \ + -v ${SRC_DIR}/:${CONTAINER_SRC_DIR} \ + ${GALOIS_CONTAINER_MOUNTS} \ + ${GALOIS_CONTAINER_ENV} \ + -e=COVERAGE_CMD="${COVERAGE_CMD}" \ + --privileged \ + --net=host \ + --workdir=${CONTAINER_WORKDIR} ${CONTAINER_OPTS} -${INTERACTIVE}t \ + ${IMAGE_NAME}:${VERSION} \ + ${CONTAINER_SONAR_CMD} + +sonar-scan: + @mkdir -p ${CONTAINER_SONAR_DIR}/coverage + @cmake \ + -S ${SRC_DIR} \ + -B ${CONTAINER_SONAR_DIR} \ + -DCMAKE_C_COMPILER=gcc-12 \ + -DCMAKE_CXX_COMPILER=g++-12 \ + -DCMAKE_BUILD_TYPE=Coverage \ + -DBUILD_TESTING=ON + @cd ${CONTAINER_SONAR_DIR} && \ + make -j8 && \ + ctest --verbose --output-junit junit.xml + @cd ${CONTAINER_SONAR_DIR} && \ + gcovr \ + --gcov-executable gcov-12 \ + --root .. \ + --xml-pretty --xml cobertura.xml \ + --sonarqube sonarqube.xml \ + --html-details coverage/html \ + -j8 . 
+ @${COVERAGE_CMD} + # this command is slow since hooks are not stored in the container image # this is mostly for CI use docker-pre-commit: diff --git a/galois b/galois index c54db22..12c4e18 160000 --- a/galois +++ b/galois @@ -1 +1 @@ -Subproject commit c54db229836e759b8c87792c6933bf780d2f021d +Subproject commit 12c4e18ae750e3627b899c0187ae025a55af3f75 diff --git a/scripts/wf4/extract.py b/scripts/wf4/extract.py new file mode 100644 index 0000000..f601ab5 --- /dev/null +++ b/scripts/wf4/extract.py @@ -0,0 +1,131 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +##### Inputs ##### +# hosts = [8, 16, 32, 64] +# scales = ["31", "31", "31", "31"] +hosts = [8, 16, 32, 16, 32, 8, 16, 32] +scales = ["1", "1", "1", "2", "2", "3", "3", "3"] + + + +##### Parameters ##### +data_size = 8 +id_size = 8 +addr_size = 8 + + + +##### Processing ##### +for i, host in enumerate(hosts): + scale = scales[i] + + local_read = [] + master_read = [] + master_write = [] + mirror_read = [] + mirror_write = [] + dirty_mirror_to_remote = [] + + for host_id in range(host): + local_read.append([]) + master_read.append([]) + master_write.append([]) + mirror_read.append([]) + mirror_write.append([]) + dirty_mirror_to_remote.append([]) + + round_curr = -1 + round_total = 0 + + filename = "scale" + scale + "_" + str(host) + "procs_id" + str(host_id) + f = open(filename, "r") + lines = f.readlines() + + for row in lines: + if row.find('##### Round') != -1: + row_split = row.strip().split() + round_curr += 1 + round_total += 1 + + local_read[host_id].append([]) + master_read[host_id].append([]) + master_write[host_id].append([]) + mirror_read[host_id].append([]) + mirror_write[host_id].append([]) + dirty_mirror_to_remote[host_id].append([-1 for i in range(host)]) + elif row.find('local read (stream)') != -1: + row_split = row.strip().split() + num = row_split[5] + local_read[host_id][round_curr] = int(num) + elif 
row.find('master reads') != -1: + row_split = row.strip().split() + num = row_split[4] + master_read[host_id][round_curr] = int(num) + elif row.find('master writes') != -1: + row_split = row.strip().split() + num = row_split[4] + master_write[host_id][round_curr] = int(num) + elif row.find('mirror reads') != -1: + row_split = row.strip().split() + num = row_split[4] + mirror_read[host_id][round_curr] = int(num) + elif row.find('mirror writes') != -1: + row_split = row.strip().split() + num = row_split[4] + mirror_write[host_id][round_curr] = int(num) + elif row.find('remote communication for') != -1: + row_split = row.strip().split() + to_host = int(row_split[6][:-1]) + num = row_split[7] + dirty_mirror_to_remote[host_id][round_curr][to_host] = int(num) + + f.close() + + filename = "GAL_WF4_" + str(host) + "_1_" + scale + ".stats" + f = open(filename, "w") + + for round_num in range(round_total): + f.write("BSP " + str(2*round_num) + "\n") + + for src in range(host): + for dst in range(host): + if (src == dst): # only local in compute phase` + stream_read = local_read[src][round_num] + stream_read_bytes = stream_read * addr_size + + random_read = master_read[src][round_num] + mirror_read[src][round_num] + random_read_bytes = random_read * data_size + + random_write = master_write[src][round_num] + mirror_write[src][round_num] + random_write_bytes = random_write * data_size + + if stream_read != 0 or random_read != 0 or random_write != 0: + f.write("# " + str(src) + " " + str(dst) + "\n") + + if stream_read != 0: + f.write("STR RD " + str(stream_read) + " " + str(stream_read_bytes) + "\n") + + if random_read != 0: + f.write("RND RD " + str(random_read) + " " + str(random_read_bytes) + "\n") + + if random_write != 0: + f.write("RND WR " + str(random_write) + " " + str(random_write_bytes) + "\n") + + f.write("\n") + + f.write("BSP " + str(2*round_num+1) + "\n") + + for src in range(host): + for dst in range(host): + if (src != dst): # only remote in communication 
phase + random_write = dirty_mirror_to_remote[src][round_num][dst] + random_write_bytes = random_write * data_size + + if random_write != 0: + f.write("# " + str(src) + " " + str(dst) + "\n") + f.write("RND RMW " + str(random_write) + " " + str(random_write_bytes) + "\n") + + f.write("\n") + + f.close() diff --git a/scripts/wf4/extract_k4.py b/scripts/wf4/extract_k4.py new file mode 100644 index 0000000..f7bf492 --- /dev/null +++ b/scripts/wf4/extract_k4.py @@ -0,0 +1,131 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +##### Inputs ##### +# hosts = [8, 16, 32, 64] +# scales = ["31", "31", "31", "31"] +hosts = [8, 16, 32, 8, 16, 32, 8, 16, 32] +scales = ["1", "1", "1", "2", "2", "2", "3", "3", "3"] + + + +##### Parameters ##### +data_size = 8 +id_size = 8 +addr_size = 8 + + + +##### Processing ##### +for i, host in enumerate(hosts): + scale = scales[i] + + local_read = [] + master_read = [] + master_write = [] + mirror_read = [] + mirror_write = [] + dirty_mirror_to_remote = [] + + for host_id in range(host): + local_read.append([]) + master_read.append([]) + master_write.append([]) + mirror_read.append([]) + mirror_write.append([]) + dirty_mirror_to_remote.append([]) + + round_curr = -1 + round_total = 0 + + filename = "scale" + scale + "-kernel4_" + str(host) + "procs_id" + str(host_id) + f = open(filename, "r") + lines = f.readlines() + + for row in lines: + if row.find('##### Round') != -1: + row_split = row.strip().split() + round_curr += 1 + round_total += 1 + + local_read[host_id].append([]) + master_read[host_id].append([]) + master_write[host_id].append([]) + mirror_read[host_id].append([]) + mirror_write[host_id].append([]) + dirty_mirror_to_remote[host_id].append([-1 for i in range(host)]) + elif row.find('local read (stream)') != -1: + row_split = row.strip().split() + num = row_split[5] + local_read[host_id][round_curr] = int(num) + elif row.find('master reads') != -1: + row_split 
= row.strip().split() + num = row_split[4] + master_read[host_id][round_curr] = int(num) + elif row.find('master writes') != -1: + row_split = row.strip().split() + num = row_split[4] + master_write[host_id][round_curr] = int(num) + elif row.find('mirror reads') != -1: + row_split = row.strip().split() + num = row_split[4] + mirror_read[host_id][round_curr] = int(num) + elif row.find('mirror writes') != -1: + row_split = row.strip().split() + num = row_split[4] + mirror_write[host_id][round_curr] = int(num) + elif row.find('remote communication for') != -1: + row_split = row.strip().split() + to_host = int(row_split[6][:-1]) + num = row_split[7] + dirty_mirror_to_remote[host_id][round_curr][to_host] = int(num) + + f.close() + + filename = "GAL_WF4_" + str(host) + "_4_" + scale + ".stats" + f = open(filename, "w") + + for round_num in range(round_total): + f.write("BSP " + str(2*round_num) + "\n") + + for src in range(host): + for dst in range(host): + if (src == dst): # only local in compute phase` + stream_read = local_read[src][round_num] + stream_read_bytes = stream_read * addr_size + + random_read = master_read[src][round_num] + mirror_read[src][round_num] + random_read_bytes = random_read * data_size + + random_write = master_write[src][round_num] + mirror_write[src][round_num] + random_write_bytes = random_write * data_size + + if stream_read != 0 or random_read != 0 or random_write != 0: + f.write("# " + str(src) + " " + str(dst) + "\n") + + if stream_read != 0: + f.write("STR RD " + str(stream_read) + " " + str(stream_read_bytes) + "\n") + + if random_read != 0: + f.write("RND RD " + str(random_read) + " " + str(random_read_bytes) + "\n") + + if random_write != 0: + f.write("RND WR " + str(random_write) + " " + str(random_write_bytes) + "\n") + + f.write("\n") + + f.write("BSP " + str(2*round_num+1) + "\n") + + for src in range(host): + for dst in range(host): + if (src != dst): # only remote in communication phase + random_write = 
dirty_mirror_to_remote[src][round_num][dst] + random_write_bytes = random_write * data_size + + if random_write != 0: + f.write("# " + str(src) + " " + str(dst) + "\n") + f.write("RND RMW " + str(random_write) + " " + str(random_write_bytes) + "\n") + + f.write("\n") + + f.close() diff --git a/scripts/wf4/generate/commercial.py b/scripts/wf4/generate/commercial.py new file mode 100644 index 0000000..586cb24 --- /dev/null +++ b/scripts/wf4/generate/commercial.py @@ -0,0 +1,226 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +import os +import sys +import random +import numpy as np + +# command parameters: +# +# > +# +# 2000000 100 1000 10000 200000 30 50 70 0 +# + +SaleFile = open("wmd.data.csv", "r") # WMD standard data set +SaleOut = open("commercial.csv", "w") + +socialMap = dict() +num_persons = int( sys.argv[1] ) +np.random.seed(int( sys.argv[9] )) + +########### READ AND PRINT SALES RECORDS ########## +while True: + line = SaleFile.readline() + if line == "": break + if line[0] == '#': continue + + fields = line[:-1].split(',') + +# sale record + if fields[0] == "Sale": + if fields[6] == "8486": continue # discard coffee sales, coffee.py builds coffee subgraph + + P1 = int(fields[1]) + P2 = int(fields[2]) + + if P1 in socialMap: # if WMD id assigned a social id + fields[1] = str( socialMap[P1] ) # ... get social id + else: + T1 = int( num_persons * np.random.random() ) # ... chose a social ids from [1 .. 41,652,230] + socialMap[P1] = T1 # ... map WMD id to social id + fields[1] = str(T1) + + if P2 in socialMap: # if WMD id assigned a social id + fields[2] = str( socialMap[P2] ) # ... get social id + else: + T2 = int( num_persons * np.random.random() ) # ... chose a social ids from [1 .. 41,652,230] + socialMap[P2] = T2 # ... 
map WMD id to social id + fields[2] = str(T2) + + line = fields[0] + "," + fields[1] + "," + fields[2] + "," + fields[6] + "," + fields[7] + ",,," + SaleOut.write(line + "\n") + +#################################################### +########### CREATE COFFEE MARKET SUBGRAPH ########## +#################################################### + +chosenSet = set() +growerSet = set() +distributorSet = set() +wholesellerSet = set() +retailCustomerSet = set() + +num_persons = int(sys.argv[1]) +num_growers = int(sys.argv[2]) +num_distributors = int(sys.argv[3]) +num_wholesellers = int(sys.argv[4]) +num_retailCustomers = int(sys.argv[5]) + +random.seed(int( sys.argv[9] )) +np.random.seed(int( sys.argv[9] )) + +########### CHOSE PERSONS IN COFFEE SALE SUBGRAPH ########## +for i in range(0, num_growers): + person = int( num_persons * np.random.random() ) + while person in chosenSet: person = int( num_persons * np.random.random() ) + chosenSet.add(person) + growerSet.add(person) + +for i in range(0, num_distributors): + person = int( num_persons * np.random.random() ) + while person in chosenSet: person = int( num_persons * np.random.random() ) + chosenSet.add(person) + distributorSet.add(person) + +for i in range(0, num_wholesellers): + person = int( num_persons * np.random.random() ) + while person in chosenSet: person = int( num_persons * np.random.random() ) + chosenSet.add(person) + wholesellerSet.add(person) + +for i in range(0, num_retailCustomers): + person = int( num_persons * np.random.random() ) + while person in chosenSet: person = int( num_persons * np.random.random() ) + chosenSet.add(person) + retailCustomerSet.add(person) + +########### CONVERT SETS TO LISTS TO ACCESS BY RANDOM.CHOICE ########## +growers = list(growerSet) +distributors = list(distributorSet) +wholesellers = list(wholesellerSet) + +########### INIATIALIZE BUYS AND SELLS AMOUNTS ########## +buys = dict() +sells = dict() + +for i in range(0, num_persons): + buys[i] = 0 + sells[i] = 0 + +########### 
CONNECT RETAIL CUSTOMERS ########## +# Retail customers buy only from wholesellers, have 3 to 5 suppliers, and buy between 1 and 5 pounds of coffee +# from each supplier. +for customer in retailCustomerSet: + num_suppliers = int(3.0 * np.random.random() + 3.0) # 3 to 5 suppliers + suppliers = set() + + for j in range(0, num_suppliers): + supplier = random.choice(wholesellers) # supplier is a wholeseller + while supplier in suppliers: # buy only once from each supplier + supplier = random.choice(wholesellers) + + amount = 5.0 * np.random.random() + 1.0 # 1 to 5 pounds + sells[supplier] += amount + SaleOut.write("Sale," + str(supplier) + "," + str(customer) + ",8486,,,," + str(amount) + "\n") + +########### CONNECT WHOLESELLERS ########## +# Wholesellers buy from other wholesellers, distributors, and growers. They have 5 to 10 wholeseller suppliers +# from whom they buy 15 to 30 pounds of coffee. They buy arg[6] percent more coffee then they sell split 70/30 +# between distributors and growers. +multiplier = 1.0 + float(sys.argv[6]) / 100 + +for wholeseller in wholesellerSet: + num_suppliers = 5 + int(5.0 * np.random.random()) # 5 to 10 wholeseller suppliers + suppliers = set() + + for j in range(0, num_suppliers): + supplier = random.choice(wholesellers) # suppoer is a wholesellers + while (supplier == wholeseller) or (supplier in suppliers): # buy only once from each supplier + supplier = random.choice(wholesellers) + + amount = 15.0 * np.random.random() + 15.0 # 15 to 30 pounds + buys[wholeseller] += amount + sells[supplier] += amount + SaleOut.write("Sale," + str(supplier) + "," + str(wholeseller) + ",8486,,,," + str(amount) + "\n") + +for wholeseller in wholesellerSet: + to_buy = (sells[wholeseller] - buys[wholeseller]) * multiplier + suppliers = set() + + while to_buy > 0: + if np.random.random() < 0.70: # 70% of suppliers are distributors + supplier = random.choice(distributors) + while (supplier == wholeseller) or (supplier in suppliers): # ... 
buy only once from each supplier + supplier = random.choice(distributors) + else: # supplier is a grower 30% of the time + supplier = random.choice(growers) + while (supplier == wholeseller) or (supplier in suppliers): # ... buy only once from each supplier + supplier = random.choice(growers) + + amount = 15.0 * np.random.random() + 15.0 # 15 to 30 pounds + amount = min(amount, to_buy) # don't buy more than to_buy + to_buy -= amount + sells[supplier] += amount + SaleOut.write("Sale," + str(supplier) + "," + str(wholeseller) + ",8486,,,," + str(amount) + "\n") + +########### CONNECT DISTRIBUTORS ########## +# Distributors buy from other distributors and growers. They have 5 to 10 distributor suppliers from whom they +# buy 150 to 300 pounds of coffee. They buy arg[7] percent more coffee then they sell from growers. +multiplier = 1.0 + float(sys.argv[7]) / 100 + +for distributor in distributorSet: + num_suppliers = 5 + int(5.0 * np.random.random()) # 5 to 10 distributor suppliers + suppliers = set() + + for j in range(0, num_suppliers): + supplier = random.choice(distributors) # supplier is a distributors + while (supplier == distributor) or (supplier in suppliers): # buy only once from each supplier + supplier = random.choice(distributors) + + amount = 150.0 * np.random.random() + 150.0 # 150 to 300 pounds + buys[distributor] += amount + sells[supplier] += amount + SaleOut.write("Sale," + str(supplier) + "," + str(distributor) + ",8486,,,," + str(amount) + "\n") + +for distributor in distributorSet: + to_buy = (sells[distributor] - buys[distributor]) * multiplier + suppliers = set() + + while to_buy > 0: + supplier = random.choice(growers) + while (supplier == distributor) or (supplier in suppliers): # buy only once from each supplier + supplier = random.choice(growers) + + amount = 150.0 * np.random.random() + 150.0 # 150 to 300 pounds + amount = min(amount, to_buy) # don't buy more than to_buy + to_buy -= amount + sells[supplier] += amount + SaleOut.write("Sale," 
+ str(supplier) + "," + str(distributor) + ",8486,,,," + str(amount) + "\n") + +########### CONNECT GROWERS ########## +# Growers buy arg[8] percent more coffee than they sell from themselves (self_edge). +multiplier = 1.0 + float(sys.argv[8]) / 100 + +for grower in growerSet: + amount = sells[grower] * multiplier + SaleOut.write("Sale," + str(grower) + "," + str(grower) + ",8486,,,," + str(amount) + "\n") + +SaleOut.close() # close sale file +SaleOut = open("commercial.csv", "r") # reopen sale file to read + +lines = SaleOut.readlines() # read sales records +random.shuffle(lines) # shuffle sales records + +SaleOut.close() # close sale file +SaleOut = open("commercial.csv", "w") # reopen sale file to write + +line1 = "#delimiter: ,\n" # set schema information +line2 = "#columns:type,person,person,topic,date,lat,lon,amount\n" +line3 = "#types:STRING,UINT,UINT,UINT,USDATE,DOUBLE,DOUBLE,DOUBLE\n" + +SaleOut.write(line1) +SaleOut.write(line2) +SaleOut.write(line3) +SaleOut.writelines(lines) diff --git a/scripts/wf4/generate/cyber.py b/scripts/wf4/generate/cyber.py new file mode 100644 index 0000000..a4cb52a --- /dev/null +++ b/scripts/wf4/generate/cyber.py @@ -0,0 +1,38 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved.
+ +import os +import sys +import random +import numpy as np + +# command parameters: # +# +# 500000 2 +# + +CyberIn = open("netflow_day_3.csv", "r") +CyberOut = open("cyber.csv", "w") + +CyberOut.write(CyberIn.readline()) # copy first three lines +CyberOut.write(CyberIn.readline()) +CyberOut.write(CyberIn.readline()) + +num_servers = int(sys.argv[1]) +np.random.seed(int(sys.argv[2])) + +########### PRINT SOCIAL RECORDS ########## + +for S1 in range(0, num_servers): + num_connections = int( np.random.exponential(20.0) ) + + for i in range(0, num_connections): + S2 = int( np.random.random() * num_servers ) + while S1 == S2: S2 = int( np.random.random() * num_servers ) + + line = CyberIn.readline() # use next netflow record for properties + fields = line[:-1].split(',') + + line = str(S1) + "," + str(S2) + for field in fields[2:]: line += "," + field + CyberOut.write(line + "\n") diff --git a/scripts/wf4/generate/extract_nodes.py b/scripts/wf4/generate/extract_nodes.py new file mode 100644 index 0000000..ee1f31f --- /dev/null +++ b/scripts/wf4/generate/extract_nodes.py @@ -0,0 +1,75 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +import os +import sys +import random +import numpy as np + +# command parameters: +# +# + +csv_dir = sys.argv[1] +out_file = sys.argv[2] + +CommercialIn = open(csv_dir + "/commercial.csv", "r") +CyberIn = open(csv_dir + "/cyber.csv", "r") +SocialIn = open(csv_dir + "/social.csv", "r") +UsesIn = open(csv_dir + "/uses.csv", "r") +NodeOut = open(out_file, "w") + +person_ids = set() +device_ids = set() + +while True: + line = CommercialIn.readline() + if line == "": break + if line[0] == '#': continue + fields = line[:-1].split(',') + person_ids.add(fields[1]) + person_ids.add(fields[2]) +while True: + line = CyberIn.readline() + if line == "": break + if line[0] == '#': continue + fields = line[:-1].split(',') + device_ids.add(fields[0]) + device_ids.add(fields[1]) +while True: + line = SocialIn.readline() + if line == "": break + if line[0] == '#': continue + fields = line[:-1].split(',') + person_ids.add(fields[0]) + person_ids.add(fields[1]) +while True: + line = UsesIn.readline() + if line == "": break + if line[0] == '#': continue + fields = line[:-1].split(',') + person_ids.add(fields[0]) + device_ids.add(fields[1]) + +for person_id in person_ids: + NodeOut.write("Person," + person_id + ",,,,,,\n") +for device_id in device_ids: + NodeOut.write("Device," + device_id + ",,,,,,\n") + +NodeOut.close() # close node file +NodeOut = open(out_file, "r") # reopen node file to read + +lines = NodeOut.readlines() # read node records +random.shuffle(lines) # shuffle node records + +NodeOut.close() # close node file +NodeOut = open(out_file, "w") # reopen node file to write + +line1 = "#delimiter: ,\n" # set schema information +line2 = "#columns:type,person,person,topic,date,lat,lon\n" +line3 = "#types:STRING,UINT,UINT,UINT,USDATE,DOUBLE,DOUBLE\n" + +NodeOut.write(line1) +NodeOut.write(line2) +NodeOut.write(line3) +NodeOut.writelines(lines) diff --git a/scripts/wf4/generate/generate.sh b/scripts/wf4/generate/generate.sh new file mode 100644 index 0000000..c5de5ec --- 
/dev/null +++ b/scripts/wf4/generate/generate.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +SCRIPTS="${SCRIPTS:-scripts/generate}" +DATA="${DATA:-data}" + +GRAPH_NAME="${GRAPH_NAME:-graph}" +COMMERCIAL_PARAMETERS="${COMMERCIAL_PARAMETERS:-2000000 100 1000 10000 200000 30 50 70 0}" +CYBER_PARAMETERS="${CYBER_PARAMETERS:-500000 2}" +SOCIAL_PARAMETERS="${SOCIAL_PARAMETERS:-2000000 3}" +USES_PARAMETERS="${USES_PARAMETERS:-2000000 500000 4}" + +ln -s ${DATA}/workflow4-dataset-main/wmd.data.csv wmd.data.csv +ln -s ${DATA}/workflow4-dataset-main/netflow_day_3.csv netflow_day_3.csv +python ${SCRIPTS}/commercial.py ${COMMERCIAL_PARAMETERS} +python ${SCRIPTS}/cyber.py ${CYBER_PARAMETERS} +python ${SCRIPTS}/social.py ${SOCIAL_PARAMETERS} +python ${SCRIPTS}/uses.py ${USES_PARAMETERS} +python ${SCRIPTS}/extract_nodes.py . nodes.csv + +mkdir -p ${DATA}/${GRAPH_NAME} +mv commercial.csv ${DATA}/${GRAPH_NAME} +mv cyber.csv ${DATA}/${GRAPH_NAME} +mv social.csv ${DATA}/${GRAPH_NAME} +mv uses.csv ${DATA}/${GRAPH_NAME} +mv nodes.csv ${DATA}/${GRAPH_NAME} diff --git a/scripts/wf4/generate/generate_all.sh b/scripts/wf4/generate/generate_all.sh new file mode 100644 index 0000000..147b460 --- /dev/null +++ b/scripts/wf4/generate/generate_all.sh @@ -0,0 +1,28 @@ +#!/bin/bash + +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +echo "Make sure this is run from the root of the source directory" +echo "Must be run serially since these scripts suck and will conflict" + +export GRAPH_NAME="scale1" +export COMMERCIAL_PARAMETERS="2000000 100 1000 10000 200000 30 50 70 0" +export CYBER_PARAMETERS="500000 2" +export SOCIAL_PARAMETERS="2000000 3" +export USES_PARAMETERS="2000000 500000 4" +#bash scripts//generate/generate.sh + +export GRAPH_NAME="scale2" +export COMMERCIAL_PARAMETERS="4000000 200 2000 20000 400000 30 50 70 0" +export CYBER_PARAMETERS="1000000 2" +export SOCIAL_PARAMETERS="4000000 3" +export USES_PARAMETERS="4000000 1000000 4" +bash scripts//generate/generate.sh + +export GRAPH_NAME="scale3" +export COMMERCIAL_PARAMETERS="8000000 400 4000 40000 800000 30 50 70 0" +export CYBER_PARAMETERS="2000000 2" +export SOCIAL_PARAMETERS="8000000 3" +export USES_PARAMETERS="8000000 2000000 4" +#bash scripts//generate/generate.sh diff --git a/scripts/wf4/generate/social.py b/scripts/wf4/generate/social.py new file mode 100644 index 0000000..61283cb --- /dev/null +++ b/scripts/wf4/generate/social.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +import os +import sys +import random +import numpy as np + +# command parameters: # +# +# 2000000 3 +# + +SocialOut = open("social.csv", "w") + +line1 = "#delimieter: ,\n" +line2 = "#columns: person1,person2\n" +line3 = "#types: UINT,UINT\n" + +SocialOut.write(line1) +SocialOut.write(line2) +SocialOut.write(line3) + +num_persons = int(sys.argv[1]) +np.random.seed(int(sys.argv[2])) + +########### PRINT SOCIAL RECORDS ########## + +for person in range(0, num_persons): + num_friends = int( np.random.exponential(20.0) ) + if num_friends > 20: num_friends = int( num_friends / 10 ) # let's not generate too many edges + + for i in range(0, num_friends): + friend = int( np.random.random() * num_persons ) + while friend == person: friend = int( np.random.random() * num_persons ) + + SocialOut.write(str(person) + "," + str(friend) + "\n") diff --git a/scripts/wf4/generate/uses.py b/scripts/wf4/generate/uses.py new file mode 100644 index 0000000..1e779b1 --- /dev/null +++ b/scripts/wf4/generate/uses.py @@ -0,0 +1,36 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +import os +import sys +import random +import numpy as np + +# command parameters: # +# +# 2000000 500000 4 +# + +UsesOut = open("uses.csv", "w") + +line1 = "#delimieter: ,\n" +line2 = "#columns: person,server\n" +line3 = "#types: UINT,UINT\n" + +UsesOut.write(line1) +UsesOut.write(line2) +UsesOut.write(line3) + +num_persons = int(sys.argv[1]) +num_servers = int(sys.argv[2]) +np.random.seed(int(sys.argv[3])) + +########### PRINT SOCIAL RECORDS ########## + +for person in range(0, num_persons): + num_used = int( np.random.exponential(20.0) ) + if num_used > 20: num_used = int( num_used / 10 ) # let's not generate too many edges + + for i in range(0, num_used): + server = int( np.random.random() * num_servers ) + UsesOut.write(str(person) + "," + str(server) + "\n") diff --git a/scripts/wf4/run.sh b/scripts/wf4/run.sh new file mode 100644 index 0000000..6bb6f76 --- /dev/null +++ b/scripts/wf4/run.sh @@ -0,0 +1,70 @@ +#!/bin/bash + +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +TACC="${TACC:-}" +HOST_THREADS="${HOST_THREADS:-32}" + +HOSTS="${HOSTS:-1}" +PROCS="${PROCS:-4}" +TIME="${TIME:-0:15:00}" +QUEUE="${QUEUE:-normal}" +BUILD="${BUILD:-build}" +DATA="${DATA:-data}" +JOBS="${JOBN:-wf4-run}" +OUTS="${OUTS:-wf4}" + +THREADD=$((${HOST_THREADS} * ${HOSTS} / ${PROCS})) +THREADS="${THREADS:-${THREADD}}" +K="${K:-100}" +GRAPH_DIR="${GRAPH_DIR:-data/scale1}" +EPOCHS="${EPOCHS:-100}" +GRAPH_NAME="${GRAPH_NAME:-scale1}" +RRR_SETS="${RRR_SETS:-791625}" +INFLUENTIAL_THRESHOLD="${INFLUENTIAL_THRESHOLD:-0}" +SEED="${SEED:-9801}" +BIND="${BIND:-hwthread}" + +JOBN=${DATA}/${JOBS}_${HOSTS}_${PROCS}_${THREADS}_${GRAPH_NAME}_${K}_${EPOCHS}_${RRR_SETS}_${SEED} +OUTF=${DATA}/${OUTS}_${HOSTS}_${PROCS}_${THREADS}_${GRAPH_NAME}_${K}_${EPOCHS}_${RRR_SETS}_${SEED} +ENV=/scratch/09601/pkenney/pando-workflow4-galois/scripts/tacc_env.sh + +echo $HOSTS +echo $PROCS +echo $THREADS +echo $TIME +echo $GRAPH_DIR +echo $DATA +echo $QUEUE +echo $JOBN +echo $OUTF + +if [ ! -z "${TACC}" ]; then + sbatch < + +// Demonstrate some basic assertions. +TEST(HelloTest, BasicAssertions) { + // Expect two strings not to be equal. + EXPECT_STRNE("hello", "world"); + // Expect equality. + EXPECT_EQ(7 * 6, 42); +} diff --git a/wf4/CMakeLists.txt b/wf4/CMakeLists.txt new file mode 100644 index 0000000..9abc1ea --- /dev/null +++ b/wf4/CMakeLists.txt @@ -0,0 +1,14 @@ +# SPDX-License-Identifier: BSD-2-Clause +# Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +set(sources + src/import.cpp + src/influencer.cpp + src/main.cpp + src/quiesce.cpp +) + +add_executable(wf4) +target_sources(wf4 PRIVATE ${sources}) +target_include_directories(wf4 PRIVATE ${CMAKE_CURRENT_LIST_DIR}/include ${PHMAP_DIRECTORY} ${PCG_DIRECTORY} ${graph-log-sketch_SOURCE_DIR}/include) +target_link_libraries(wf4 PRIVATE Galois::cusp Galois::dist_async Galois::gluon Galois::wmd) diff --git a/wf4/include/full_graph.hpp b/wf4/include/full_graph.hpp new file mode 100644 index 0000000..c7f6cef --- /dev/null +++ b/wf4/include/full_graph.hpp @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +#pragma once + +#include +#include +#include + +#include "galois/graphs/CuSPPartitioner.h" +#include "galois/graphs/GenericPartitioners.h" +#include "galois/graphs/GluonSubstrate.h" +#include "galois/wmd/WMDPartitioner.h" +#include "galois/wmd/graphTypes.h" + +namespace wf4 { + +class FullNetworkNode; +class FullNetworkEdge; +typedef galois::graphs::WMDGraph + FullNetworkGraph; + +class FullNetworkEdge { +public: + FullNetworkEdge() = default; + FullNetworkEdge(time_t date, double amount) + : date_(date), amount_(amount), weight_(0) {} + explicit FullNetworkEdge(const std::vector& tokens) { + double amount_sold = 0; + if (tokens[7].size() > 0) { + amount_sold = std::stod(tokens[7]); + } + + struct std::tm tm; + std::istringstream ss(tokens[4]); + ss >> std::get_time(&tm, "%MM:%DD:%YYYY"); + std::time_t date = mktime(&tm); + + date_ = date; + amount_ = amount_sold; + weight_ = 0; + src = std::stoull(tokens[1]); + dst = std::stoull(tokens[2]); + if (tokens[3].size() > 0) { + topic = std::stoull(tokens[3]); + } + } + FullNetworkEdge(agile::workflow1::TYPES type_, + const std::vector& tokens) + : type(type_) { + src = std::stoull(tokens[0]); + dst = std::stoull(tokens[1]); + const uint64_t half_max = std::numeric_limits::max() / 2; + + if (type == agile::workflow1::TYPES::USES) { + 
src_type = agile::workflow1::TYPES::PERSON; + dst_type = agile::workflow1::TYPES::DEVICE; + dst = half_max + (dst % half_max); + } else if (type == agile::workflow1::TYPES::FRIEND) { + src_type = agile::workflow1::TYPES::PERSON; + dst_type = agile::workflow1::TYPES::PERSON; + } else if (type == agile::workflow1::TYPES::COMMUNICATION) { + src_type = agile::workflow1::TYPES::DEVICE; + dst_type = agile::workflow1::TYPES::DEVICE; + src = half_max + (src % half_max); + dst = half_max + (dst % half_max); + + epoch_time = std::stoull(tokens[2]); + duration = std::stoull(tokens[3]); + protocol = std::stoull(tokens[4]); + src_port = std::stoull(tokens[5]); + dst_port = std::stoull(tokens[6]); + src_packets = std::stoull(tokens[7]); + dst_packets = std::stoull(tokens[8]); + src_bytes = std::stoull(tokens[9]); + dst_bytes = std::stoull(tokens[10]); + } + } + + time_t date_; + double amount_; + double weight_; + agile::workflow1::TYPES type = agile::workflow1::TYPES::SALE; + agile::workflow1::TYPES src_type = agile::workflow1::TYPES::PERSON; + agile::workflow1::TYPES dst_type = agile::workflow1::TYPES::PERSON; + uint64_t src; + uint64_t dst; + uint64_t src_glbid = std::numeric_limits::max(); + uint64_t dst_glbid = std::numeric_limits::max(); + + uint64_t topic; + + uint64_t epoch_time; + uint64_t duration; + uint64_t protocol; + uint64_t src_port; + uint64_t dst_port; + uint64_t src_packets; + uint64_t dst_packets; + uint64_t src_bytes; + uint64_t dst_bytes; +}; + +struct FullNetworkNode { +public: + FullNetworkNode() = default; + FullNetworkNode(uint64_t id_, uint64_t, agile::workflow1::TYPES type) + : id(id_), glbid(id_), sold_(0), bought_(0), desired_(0), type_(type) {} + explicit FullNetworkNode(uint64_t id_) + : id(id_), glbid(id_), sold_(0), bought_(0), desired_(0) {} + + void serialize(galois::runtime::SerializeBuffer& buf) const { + galois::runtime::gSerialize(buf, id, type_); + } + void deserialize(galois::runtime::DeSerializeBuffer& buf) const { + 
galois::runtime::gDeserialize(buf, id, type_); + } + + uint64_t id; + uint64_t glbid; + galois::CopyableAtomic + frequency_; // number of occurrences in Reverse Reachable Sets + galois::CopyableAtomic sold_; // amount of coffee sold + galois::CopyableAtomic + bought_; // amount of coffee bought (>= coffee sold) + double desired_; // amount of coffee desired (>= coffee bought) + + agile::workflow1::TYPES type_; + uint64_t extra_data; +}; + +} // namespace wf4 + +template <> +struct galois::runtime::has_serialize { + static const bool value = true; +}; diff --git a/wf4/include/graph.hpp b/wf4/include/graph.hpp new file mode 100644 index 0000000..cec2f03 --- /dev/null +++ b/wf4/include/graph.hpp @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +#pragma once + +#include +#include +#include +#include +#include + +#include "full_graph.hpp" +#include "galois/AtomicWrapper.h" +#include "galois/DynamicBitset.h" +#include "galois/graphs/CuSPPartitioner.h" +#include "galois/graphs/GenericPartitioners.h" +#include "galois/graphs/GluonSubstrate.h" +#include "galois/runtime/SyncStructures.h" +#include "galois/wmd/WMDPartitioner.h" +#include "galois/wmd/graphTypes.h" + +class NodeData; + +namespace wf4 { + +using GlobalNodeID = uint64_t; + +class NetworkEdge; +using NetworkNode = NodeData; +typedef galois::graphs::WMDGraph + NetworkGraph; + +class NetworkEdge { +public: + NetworkEdge() = default; + NetworkEdge(time_t date, double amount) + : date_(date), amount_(amount), weight_(0) {} + explicit NetworkEdge(const std::vector& tokens) { + double amount_sold = 0; + if (tokens[7].size() > 0) { + amount_sold = std::stod(tokens[7]); + } + + struct std::tm tm; + std::istringstream ss(tokens[4]); + ss >> std::get_time(&tm, "%MM:%DD:%YYYY"); + std::time_t date = mktime(&tm); + + date_ = date; + amount_ = amount_sold; + weight_ = 0; + src = std::stoull(tokens[1]); + dst = std::stoull(tokens[2]); + } + 
explicit NetworkEdge(const wf4::FullNetworkEdge& full_edge) { + date_ = full_edge.date_; + amount_ = full_edge.amount_; + weight_ = full_edge.weight_; + type = full_edge.type; + src_type = full_edge.src_type; + dst_type = full_edge.dst_type; + src = full_edge.src; + dst = full_edge.dst; + src_glbid = full_edge.src_glbid; + dst_glbid = full_edge.dst_glbid; + } + + time_t date_; + double amount_; + double weight_; + agile::workflow1::TYPES type = agile::workflow1::TYPES::SALE; + agile::workflow1::TYPES src_type = agile::workflow1::TYPES::PERSON; + agile::workflow1::TYPES dst_type = agile::workflow1::TYPES::PERSON; + uint64_t src; + uint64_t dst; + uint64_t src_glbid = std::numeric_limits::max(); + uint64_t dst_glbid = std::numeric_limits::max(); +}; + +struct InputFiles { + InputFiles(std::string input_directory, std::string commercial_file, + std::string cyber_file, std::string social_file, + std::string uses_file, std::string nodes_file) { + if (input_directory.size() > 0) { + commercial_files.emplace_back(input_directory + "/commercial.csv"); + cyber_files.emplace_back(input_directory + "/cyber.csv"); + social_files.emplace_back(input_directory + "/social.csv"); + uses_files.emplace_back(input_directory + "/uses.csv"); + nodes_files.emplace_back(input_directory + "/nodes.csv"); + } + if (commercial_file.size() > 0) { + commercial_files.emplace_back(commercial_file); + } + if (cyber_file.size() > 0) { + cyber_files.emplace_back(cyber_file); + } + if (social_file.size() > 0) { + social_files.emplace_back(social_file); + } + if (uses_file.size() > 0) { + uses_files.emplace_back(uses_file); + } + if (nodes_file.size() > 0) { + nodes_files.emplace_back(nodes_file); + } + } + + void Print() { + for (const std::string& file : commercial_files) { + std::cout << "Commercial file: " << file << std::endl; + } + for (const std::string& file : cyber_files) { + std::cout << "Cyber file: " << file << std::endl; + } + for (const std::string& file : social_files) { + std::cout << 
"Social file: " << file << std::endl; + } + for (const std::string& file : uses_files) { + std::cout << "Uses file: " << file << std::endl; + } + for (const std::string& file : nodes_files) { + std::cout << "Node file: " << file << std::endl; + } + } + + std::vector commercial_files; + std::vector cyber_files; + std::vector social_files; + std::vector uses_files; + std::vector nodes_files; +}; + +} // namespace wf4 + +struct NodeData { +public: + NodeData() = default; + NodeData(uint64_t id_, uint64_t, agile::workflow1::TYPES) + : id(id_), sold_(0), bought_(0), desired_(0) {} + explicit NodeData(uint64_t id_) + : id(id_), sold_(0), bought_(0), desired_(0) {} + explicit NodeData(const wf4::FullNetworkNode& full_node) + : id(full_node.id), frequency_(full_node.frequency_), + sold_(full_node.sold_), bought_(full_node.bought_), + desired_(full_node.desired_) {} + + void serialize(galois::runtime::SerializeBuffer& buf) const { + galois::runtime::gSerialize(buf, id); + } + void deserialize(galois::runtime::DeSerializeBuffer& buf) const { + galois::runtime::gDeserialize(buf, id); + } + + void Cancel() { + sold_ = 0; + bought_ = 0; + desired_ = 0; + } + + uint64_t id; + galois::CopyableAtomic + frequency_; // number of occurrences in Reverse Reachable Sets + galois::CopyableAtomic sold_; // amount of coffee sold + galois::CopyableAtomic + bought_; // amount of coffee bought (>= coffee sold) + double desired_; // amount of coffee desired (>= coffee bought) +}; + +template <> +struct galois::runtime::has_serialize { + static const bool value = true; +}; + +extern galois::DynamicBitSet bitset_bought_; +extern galois::DynamicBitSet bitset_sold_; +GALOIS_SYNC_STRUCTURE_REDUCE_SET(sold_, double); +GALOIS_SYNC_STRUCTURE_REDUCE_ADD(sold_, double); +GALOIS_SYNC_STRUCTURE_BITSET(sold_); +GALOIS_SYNC_STRUCTURE_REDUCE_SET(bought_, double); +GALOIS_SYNC_STRUCTURE_REDUCE_ADD(bought_, double); +GALOIS_SYNC_STRUCTURE_BITSET(bought_); diff --git a/wf4/include/import.hpp 
b/wf4/include/import.hpp new file mode 100644 index 0000000..389509e --- /dev/null +++ b/wf4/include/import.hpp @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +#pragma once + +#include +#include +#include + +#include "graph.hpp" + +namespace wf4 { + +std::unique_ptr ImportData(const InputFiles& input_files); +std::unique_ptr +ProjectGraph(std::unique_ptr full_graph); + +namespace internal { + +std::unique_ptr ImportGraph(const InputFiles& input_files); + +struct NetworkGraphProjection { + bool KeepNode(FullNetworkGraph& graph, FullNetworkGraph::GraphNode node); + bool KeepEdgeLessMasters() const { return false; } + bool KeepEdge(FullNetworkGraph& graph, const FullNetworkEdge& edge, + FullNetworkGraph::GraphNode, FullNetworkGraph::GraphNode dst); + NetworkNode ProjectNode(FullNetworkGraph&, const FullNetworkNode& node, + FullNetworkGraph::GraphNode); + NetworkEdge ProjectEdge(FullNetworkGraph&, const FullNetworkEdge& edge, + FullNetworkGraph::GraphNode, + FullNetworkGraph::GraphNode); +}; + +class CyberParser : public galois::graphs::FileParser { +public: + explicit CyberParser(std::vector files) : files_(files) {} + + const std::vector& GetFiles() override { return files_; } + galois::graphs::ParsedGraphStructure + ParseLine(char* line, uint64_t lineLength) override; + +private: + uint64_t csv_fields_ = 11; + std::vector files_; +}; + +class SocialParser : public galois::graphs::FileParser { +public: + explicit SocialParser(std::vector files) : files_(files) {} + + const std::vector& GetFiles() override { return files_; } + galois::graphs::ParsedGraphStructure + ParseLine(char* line, uint64_t lineLength) override; + +private: + uint64_t csv_fields_ = 2; + std::vector files_; +}; + +class UsesParser : public galois::graphs::FileParser { +public: + explicit UsesParser(std::vector files) : files_(files) {} + + const std::vector& GetFiles() override { return files_; } + 
galois::graphs::ParsedGraphStructure + ParseLine(char* line, uint64_t lineLength) override; + +private: + uint64_t csv_fields_ = 2; + std::vector files_; +}; + +class NodeParser : public galois::graphs::FileParser { +public: + explicit NodeParser(std::vector files) : files_(files) {} + + const std::vector& GetFiles() override { return files_; } + galois::graphs::ParsedGraphStructure + ParseLine(char* line, uint64_t lineLength) override; + +private: + uint64_t csv_fields_ = 7; + std::vector files_; +}; + +} // namespace internal + +} // namespace wf4 diff --git a/wf4/include/influencer.hpp b/wf4/include/influencer.hpp new file mode 100644 index 0000000..a47126a --- /dev/null +++ b/wf4/include/influencer.hpp @@ -0,0 +1,120 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +#pragma once + +#include +#include +#include +#include + +#include "galois/PerThreadContainer.h" +#include "galois/graphs/GluonSubstrate.h" +#include "galois/gstl.h" +#include "graph.hpp" +#include "parallel_hashmap/phmap.h" + +namespace wf4 { + +typedef std::vector> ReverseReachableSet; + +void CalculateEdgeProbabilities( + NetworkGraph& graph, + galois::graphs::GluonSubstrate& substrate); +ReverseReachableSet GetRandomReverseReachableSets(NetworkGraph& graph, + uint64_t num_sets, + uint64_t seed, + uint64_t epochs); +std::vector +GetInfluentialNodes(wf4::NetworkGraph& graph, + ReverseReachableSet&& reachability_sets, uint64_t num_nodes, + uint64_t influential_node_threshold); + +namespace internal { + +const uint64_t INITIAL_SET_SIZE = 10; +struct PartialReachableSet { + PartialReachableSet() + : id(0), owner_host(0), + partial_reachable_set( + phmap::flat_hash_set(INITIAL_SET_SIZE)), + frontier(phmap::flat_hash_set(INITIAL_SET_SIZE)) {} + PartialReachableSet(uint64_t id_, uint32_t host_, wf4::GlobalNodeID root_); + + bool Finished() { return frontier.size() == 0; } + uint32_t NextHost(wf4::NetworkGraph& graph, bool finish); 
+ friend size_t hash_value(const PartialReachableSet& set) { + return phmap::HashState().combine(0, set.owner_host, set.id); + } + void serialize(galois::runtime::SerializeBuffer& buf); + void deserialize(galois::runtime::DeSerializeBuffer& buf); + + uint64_t id; + uint64_t owner_host; + phmap::flat_hash_set partial_reachable_set; + phmap::flat_hash_set frontier; +} typedef PartialReachableSet; + +struct LocalMaxNode { + LocalMaxNode() : max_influence(0) {} + LocalMaxNode(wf4::GlobalNodeID node, uint64_t influence) + : max_node(node), max_influence(influence) {} + + wf4::GlobalNodeID max_node; + uint64_t max_influence; +} typedef LocalMaxNode; + +void FillNodeValues(wf4::NetworkGraph& graph, + const wf4::NetworkGraph::GraphNode& node); +void CalculateEdgeProbability(wf4::NetworkGraph& graph, + const wf4::NetworkGraph::GraphNode& node, + galois::DGAccumulator& total_edge_weights, + galois::DGAccumulator& total_sales); + +void GenerateSeedNodes(galois::gstl::Vector& seed_nodes, + wf4::NetworkGraph& graph, uint64_t seed, uint64_t id, + uint64_t host); +void GenerateRandomReversibleReachableSets( + wf4::ReverseReachableSet& reachability_sets, wf4::NetworkGraph& graph, + galois::gstl::Vector&& partial_sets, + galois::gstl::Vector>& + remote_partial_sets, + bool finish); +void ContinueRandomReversibleReachableSet(wf4::NetworkGraph& graph, + PartialReachableSet& partial_set, + uint64_t random_noise, bool finish); + +wf4::GlobalNodeID GetMostInfluentialNode(wf4::NetworkGraph& graph); +void FindLocalMaxNode(wf4::NetworkGraph& graph, + wf4::NetworkGraph::GraphNode node, + galois::gstl::Vector& local_max_node, + galois::DGAccumulator& total_influence); +LocalMaxNode +GetMaxNode(galois::PerThreadVector& most_influential_nodes); +LocalMaxNode GetMaxNode(std::vector& most_influential_nodes); + +void RemoveReachableSetsWithInfluentialNode( + wf4::NetworkGraph& graph, + phmap::flat_hash_set& reachability_set, + const wf4::GlobalNodeID& influential_node, + galois::gstl::Vector< + 
phmap::parallel_flat_hash_map_m>& + remote_updates); +void RemoveNodesFromRemotes( + wf4::NetworkGraph& graph, + galois::gstl::Vector< + phmap::parallel_flat_hash_map_m>& + remote_updates); + +galois::gstl::Vector ExchangePartialSets( + galois::gstl::Vector>& + remote_partial_sets); +std::vector +ExchangeMostInfluentialNodes(const LocalMaxNode& local_max); +void SerializeMap( + galois::runtime::SendBuffer& buf, + phmap::parallel_flat_hash_map_m& updates); + +} // namespace internal + +} // namespace wf4 diff --git a/wf4/include/quiesce.hpp b/wf4/include/quiesce.hpp new file mode 100644 index 0000000..7999b14 --- /dev/null +++ b/wf4/include/quiesce.hpp @@ -0,0 +1,76 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +#pragma once + +#include + +#include "galois/graphs/GluonSubstrate.h" +#include "graph.hpp" +#include "parallel_hashmap/phmap.h" + +namespace wf4 { + +void CancelNodes(NetworkGraph& graph, + galois::graphs::GluonSubstrate& substrate, + std::vector nodes); +void QuiesceGraph(NetworkGraph& graph, + galois::graphs::GluonSubstrate& substrate); + +namespace internal { + +struct __attribute__((packed)) ProposedEdge { + ProposedEdge(wf4::GlobalNodeID src_, wf4::GlobalNodeID dst_, double weight_) + : src(src_), dst(dst_), weight(weight_) {} + friend size_t hash_value(const ProposedEdge& set) { + return phmap::HashState().combine(0, set.src, set.dst); + } + bool operator==(const ProposedEdge& other) const; + + wf4::GlobalNodeID src; + wf4::GlobalNodeID dst; + double weight; +} typedef ProposedEdge; + +struct IncomingEdges { + IncomingEdges() { empty = true; } + explicit IncomingEdges(galois::runtime::DeSerializeBuffer&& buffer); + + galois::runtime::DeSerializeBuffer buffer_; + ProposedEdge* edges; + uint64_t num_edges; + bool empty; +} typedef IncomingEdges; + +void CancelNode(wf4::NetworkGraph& graph, wf4::GlobalNodeID node_gid); + +void TryQuiesceNode( + wf4::NetworkGraph& graph, + 
galois::gstl::Vector>& + remote_edges, + wf4::NetworkGraph::GraphNode node); + +void TryAddEdges( + wf4::NetworkGraph& graph, + galois::gstl::Vector>& + remote_edges, + std::vector&& incoming_edges); +void TryAddEdge( + wf4::NetworkGraph& graph, + galois::gstl::Vector>& + remote_edges, + const ProposedEdge& edge); + +void AddPurchaseEdges(wf4::NetworkGraph& graph, + std::vector&& decided_edges); +void AddPurchaseEdge(wf4::NetworkGraph& graph, const ProposedEdge& edge); + +std::vector SendNewEdges( + galois::gstl::Vector>& + remote_edges); +void SerializeSet(galois::runtime::SendBuffer& buf, + phmap::parallel_flat_hash_set_m& edges); + +} // namespace internal + +} // namespace wf4 diff --git a/wf4/src/import.cpp b/wf4/src/import.cpp new file mode 100644 index 0000000..cd4726b --- /dev/null +++ b/wf4/src/import.cpp @@ -0,0 +1,135 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. + +#include "import.hpp" + +#include +#include +#include +#include +#include +#include + +#include "galois/wmd/schema.h" + +std::unique_ptr +wf4::ImportData(const InputFiles& input_files) { + std::unique_ptr full_graph = + internal::ImportGraph(input_files); + + return full_graph; +} + +std::unique_ptr +wf4::ProjectGraph(std::unique_ptr full_graph) { + internal::NetworkGraphProjection projection; + (void)projection; + std::unique_ptr projected_graph = + full_graph->Project( + projection); + return projected_graph; +} + +std::unique_ptr +wf4::internal::ImportGraph(const wf4::InputFiles& input_files) { + auto& net = galois::runtime::getSystemNetworkInterface(); + std::vector>> + parsers; + + parsers.emplace_back( + std::make_unique< + galois::graphs::WMDParser>( + 8, input_files.commercial_files)); + parsers.emplace_back(std::make_unique(input_files.cyber_files)); + parsers.emplace_back( + std::make_unique(input_files.social_files)); + parsers.emplace_back(std::make_unique(input_files.uses_files)); + 
parsers.emplace_back(std::make_unique(input_files.nodes_files)); + std::unique_ptr g = + std::make_unique(parsers, net.ID, net.Num); + galois::runtime::getHostBarrier().wait(); + return g; +} + +bool wf4::internal::NetworkGraphProjection::KeepNode( + wf4::FullNetworkGraph& graph, wf4::FullNetworkGraph::GraphNode node) { + return graph.getData(node).type_ == agile::workflow1::TYPES::PERSON; +} + +bool wf4::internal::NetworkGraphProjection::KeepEdge( + wf4::FullNetworkGraph& graph, const wf4::FullNetworkEdge& edge, + wf4::FullNetworkGraph::GraphNode, wf4::FullNetworkGraph::GraphNode dst) { + return (edge.type == agile::workflow1::TYPES::PURCHASE || + edge.type == agile::workflow1::TYPES::SALE) && + edge.topic == 8486 && edge.amount_ > 0 && KeepNode(graph, dst); +} + +wf4::NetworkNode wf4::internal::NetworkGraphProjection::ProjectNode( + wf4::FullNetworkGraph&, const wf4::FullNetworkNode& node, + wf4::FullNetworkGraph::GraphNode) { + return NetworkNode(node); +} +wf4::NetworkEdge wf4::internal::NetworkGraphProjection::ProjectEdge( + wf4::FullNetworkGraph&, const wf4::FullNetworkEdge& edge, + wf4::FullNetworkGraph::GraphNode, wf4::FullNetworkGraph::GraphNode) { + return NetworkEdge(edge); +} + +galois::graphs::ParsedGraphStructure +wf4::internal::CyberParser::ParseLine(char* line, uint64_t lineLength) { + std::vector tokens = + this->SplitLine(line, lineLength, ',', csv_fields_); + + std::vector edges; + edges.emplace_back( + wf4::FullNetworkEdge(agile::workflow1::TYPES::COMMUNICATION, tokens)); + + return galois::graphs::ParsedGraphStructure(edges); +} + +galois::graphs::ParsedGraphStructure +wf4::internal::SocialParser::ParseLine(char* line, uint64_t lineLength) { + std::vector tokens = + this->SplitLine(line, lineLength, ',', csv_fields_); + + std::vector edges; + edges.emplace_back( + wf4::FullNetworkEdge(agile::workflow1::TYPES::FRIEND, tokens)); + + return galois::graphs::ParsedGraphStructure(edges); +} + +galois::graphs::ParsedGraphStructure 
+wf4::internal::UsesParser::ParseLine(char* line, uint64_t lineLength) { + std::vector tokens = + this->SplitLine(line, lineLength, ',', csv_fields_); + + std::vector edges; + edges.emplace_back( + wf4::FullNetworkEdge(agile::workflow1::TYPES::USES, tokens)); + + return galois::graphs::ParsedGraphStructure(edges); +} + +galois::graphs::ParsedGraphStructure +wf4::internal::NodeParser::ParseLine(char* line, uint64_t lineLength) { + std::vector tokens = + this->SplitLine(line, lineLength, ',', csv_fields_); + + galois::graphs::ParsedUID uid = + shad::data_types::encode(tokens[1]); + agile::workflow1::TYPES type = agile::workflow1::TYPES::PERSON; + if (tokens[0] == "Device") { + const uint64_t half_max = std::numeric_limits::max() / 2; + uid = half_max + (uid % half_max); + type = agile::workflow1::TYPES::DEVICE; + } + wf4::FullNetworkNode node(uid, 0, type); + + return galois::graphs::ParsedGraphStructure(node); +} diff --git a/wf4/src/influencer.cpp b/wf4/src/influencer.cpp new file mode 100644 index 0000000..5d25a01 --- /dev/null +++ b/wf4/src/influencer.cpp @@ -0,0 +1,687 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +#include "influencer.hpp" + +#include +#include + +#include "galois/AtomicHelpers.h" +#include "parallel_hashmap/phmap_dump.h" +#include "pcg_random.hpp" + +namespace { + +static uint32_t EXCHANGE_INFLUENTIAL_NODES = 9801; +static uint32_t EXCHANGE_PARTIAL_SETS = 19602; +static uint32_t REMOVE_RRR_SETS = 39204; + +void dumpGraph(wf4::NetworkGraph& graph) { + for (auto src : graph.masterNodesRange()) { + std::cout << "LID: " << src << ", GID: " << graph.getGID(src) + << ", LID2: " << graph.getLID(graph.getGID(src)) + << ", UID: " << graph.getData(src).id + << ", Sold: " << graph.getData(src).sold_ << std::endl; + for (auto edge : graph.edges(src)) { + std::cout << "TOPOLOGY: SRC_LID: " << src + << ", DST_LID: " << graph.getEdgeDst(edge) + << ", DST_GID: " << graph.getGID(graph.getEdgeDst(edge)) + << ", DST_UID: " << graph.getData(graph.getEdgeDst(edge)).id + << std::endl; + auto data = graph.getEdgeData(edge); + std::cout << "EDGE: SRC_UID: " << data.src + << ", SRC_GID: " << data.src_glbid << ", DST_UID: " << data.dst + << ", DST_GID: " << data.dst_glbid + << ", Sold: " << graph.getData(graph.getEdgeDst(edge)).sold_ + << std::endl; + } + } +} + +void verifyInfluentialNodesDebug( + wf4::NetworkGraph& graph, + const std::vector& influential_nodes, + uint64_t influential_node_threshold) { + // This function only works with synthetically generated graphs + // from the `tools` directory and is for debugging ease only + + if (influential_node_threshold == 0) { + return; + } + auto& net = galois::runtime::getSystemNetworkInterface(); + galois::DGAccumulator non_influential_nodes; + non_influential_nodes.reset(); + for (const wf4::GlobalNodeID& influential_node : influential_nodes) { + if (graph.isOwned(influential_node)) { + const wf4::NetworkNode& node_data = + graph.getData(graph.getLID(influential_node)); + if (node_data.id > influential_node_threshold) { + non_influential_nodes += 1; + } + } + } + non_influential_nodes.reduce(); + if (net.ID == 0 && + 
non_influential_nodes.read() > influential_nodes.size() / 10) { + std::cout << "ERROR: Too many non-influential nodes found: " + << non_influential_nodes.read() << "/" << influential_nodes.size() + << std::endl; + std::cerr << "ERROR: Too many non-influential nodes found: " + << non_influential_nodes.read() << "/" << influential_nodes.size() + << std::endl; + std::cout << "The node threshold was: " << influential_node_threshold + << std::endl; + std::cerr << "The node threshold was: " << influential_node_threshold + << std::endl; + } +} + +} // end namespace + +void wf4::CalculateEdgeProbabilities( + wf4::NetworkGraph& graph, + galois::graphs::GluonSubstrate& substrate) { + const bool async = false; + substrate.set_num_round(0); + bitset_sold_.resize(graph.size()); + bitset_sold_.reset(); + galois::do_all( + galois::iterate(graph.masterNodesRange().begin(), + graph.masterNodesRange().end()), + [&](NetworkGraph::GraphNode node) { + internal::FillNodeValues(graph, node); + }, + galois::loopname("FillNodeValues")); + // update mirrors + substrate.sync("Update Mirrors"); + galois::runtime::getHostBarrier().wait(); + bitset_sold_.resize(0); + + galois::DGAccumulator total_edge_weights; + galois::DGAccumulator total_sales; + total_edge_weights.reset(); + total_sales.reset(); + galois::do_all( + galois::iterate(graph.masterNodesRange().begin(), + graph.masterNodesRange().end()), + [&](NetworkGraph::GraphNode node) { + internal::CalculateEdgeProbability(graph, node, total_edge_weights, + total_sales); + }, + galois::loopname("CalculateEdgeProbabilities")); + total_edge_weights.reduce(); + total_sales.reduce(); + if (galois::runtime::getSystemNetworkInterface().ID == 0) { + std::cout << "Total Edge weights: " << total_edge_weights.read() + << std::endl; + std::cout << "Total sold: " << total_sales.read() << std::endl; + } +} + +wf4::ReverseReachableSet +wf4::GetRandomReverseReachableSets(wf4::NetworkGraph& graph, + uint64_t global_sets, uint64_t seed, + uint64_t epochs) { + 
galois::Timer timer; + timer.start(); + galois::Timer seed_timer; + seed_timer.start(); + + uint32_t threads = galois::getActiveThreads(); + auto& net = galois::runtime::getSystemNetworkInterface(); + uint32_t host = net.ID; + uint32_t num_hosts = net.Num; + uint64_t host_sets = global_sets / num_hosts; + if (host == num_hosts - 1) { + host_sets = global_sets - ((num_hosts - 1) * (global_sets / num_hosts)); + } + // lazy allocation does not help, some solution should be put in place to + // limit memory overhead + ReverseReachableSet reachable_sets( + host_sets, + phmap::flat_hash_set(internal::INITIAL_SET_SIZE)); + galois::gstl::Vector partial_sets(host_sets); + galois::gstl::Vector> + remote_partial_sets(num_hosts); + bool finished = false; + + std::cout << "Started generating seed nodes" << std::endl; + galois::do_all( + galois::iterate(uint64_t(0), host_sets), + [&](uint64_t i) { + internal::GenerateSeedNodes(partial_sets, graph, seed, i, host); + }, + galois::loopname("GenerateSeedNodes")); + seed_timer.stop(); + std::cout << "Finished generating seed nodes, time: " + << seed_timer.get_usec() / 1000000.0f << "s" << std::endl; + + for (uint64_t epoch = 0; epoch < epochs; epoch++) { + if (partial_sets.size() > 0) { + std::cout << "Epoch: " << epoch + << ", Partial Sets: " << partial_sets.size() << std::endl; + } + internal::GenerateRandomReversibleReachableSets( + reachable_sets, graph, std::move(partial_sets), remote_partial_sets, + finished); + partial_sets = internal::ExchangePartialSets(remote_partial_sets); + } + + // clean up logic, any reachable sets not run to completion are sent back to + // their owner host where it is added as-is to reachable_sets + finished = true; + internal::GenerateRandomReversibleReachableSets( + reachable_sets, graph, std::move(partial_sets), remote_partial_sets, + finished); + partial_sets = internal::ExchangePartialSets(remote_partial_sets); + internal::GenerateRandomReversibleReachableSets( + reachable_sets, graph, 
std::move(partial_sets), remote_partial_sets, + finished); + + timer.stop(); + std::cout << "RRR Sets generated, time: " << timer.get_usec() / 1000000.0f + << "s" << std::endl; + galois::runtime::getHostBarrier().wait(); + + return reachable_sets; +} + +std::vector wf4::GetInfluentialNodes( + wf4::NetworkGraph& graph, wf4::ReverseReachableSet&& reachability_sets, + uint64_t num_nodes, uint64_t influential_node_threshold) { + galois::Timer timer; + timer.start(); + auto& net = galois::runtime::getSystemNetworkInterface(); + galois::gstl::Vector> + remote_updates(net.Num); + std::vector influential_nodes; + GlobalNodeID influential_node = internal::GetMostInfluentialNode(graph); + influential_nodes.emplace_back(influential_node); + + for (uint64_t i = 0; i < num_nodes - 1; i++) { + galois::do_all( + galois::iterate(uint64_t(0), reachability_sets.size()), + [&](uint64_t set) { + internal::RemoveReachableSetsWithInfluentialNode( + graph, reachability_sets[set], influential_node, remote_updates); + }, + galois::loopname("RemoveReachableSetsWithInfluentialNode")); + internal::RemoveNodesFromRemotes(graph, remote_updates); + + influential_node = internal::GetMostInfluentialNode(graph); + influential_nodes.emplace_back(influential_node); + } + // TODO(Patrick) remove debug statement + uint64_t uninfluenced_sets = 0; + for (uint64_t i = 0; i < reachability_sets.size(); i++) { + if (reachability_sets[i].size() > 0) { + uninfluenced_sets += 1; + } + } + timer.stop(); + std::cout << "Remaining uninfluenced sets: " << uninfluenced_sets + << ", time: " << timer.get_usec() / 1000000.0f << "s" << std::endl; + verifyInfluentialNodesDebug(graph, influential_nodes, + influential_node_threshold); + return influential_nodes; +} + +wf4::internal::PartialReachableSet::PartialReachableSet(uint64_t id_, + uint32_t host_, + wf4::GlobalNodeID root_) + : id(id_), owner_host(host_), + partial_reachable_set( + phmap::flat_hash_set(INITIAL_SET_SIZE)), + 
frontier(phmap::flat_hash_set(INITIAL_SET_SIZE)) { + partial_reachable_set.emplace(root_); + frontier.emplace(root_); +} + +uint32_t wf4::internal::PartialReachableSet::NextHost(wf4::NetworkGraph& graph, + bool finish) { + if (finish || Finished()) { + return owner_host; + } + return graph.getHostID(*frontier.begin()); +} + +void wf4::internal::PartialReachableSet::serialize( + galois::runtime::SerializeBuffer& buf) { + uint64_t length = + 1 + 1 + 1 + 1 + partial_reachable_set.size() + frontier.size(); + std::vector data(length); + uint64_t offset = 0; + data[offset++] = id; + data[offset++] = owner_host; + data[offset++] = partial_reachable_set.size(); + data[offset++] = frontier.size(); + for (wf4::GlobalNodeID gid : partial_reachable_set) { + data[offset++] = gid; + } + for (wf4::GlobalNodeID gid : frontier) { + data[offset++] = gid; + } + buf.insert(reinterpret_cast(&data[0]), length * sizeof(uint64_t)); +} + +void wf4::internal::PartialReachableSet::deserialize( + galois::runtime::DeSerializeBuffer& buf) { + buf.extract(reinterpret_cast(&id), sizeof(id)); + buf.extract(reinterpret_cast(&owner_host), sizeof(owner_host)); + uint64_t reachable_size; + uint64_t frontier_size; + buf.extract(reinterpret_cast(&reachable_size), + sizeof(reachable_size)); + buf.extract(reinterpret_cast(&frontier_size), + sizeof(frontier_size)); + for (uint64_t i = 0; i < reachable_size; i++) { + wf4::GlobalNodeID elt; + buf.extract(reinterpret_cast(&elt), sizeof(elt)); + partial_reachable_set.emplace(elt); + } + for (uint64_t i = 0; i < frontier_size; i++) { + wf4::GlobalNodeID elt; + buf.extract(reinterpret_cast(&elt), sizeof(elt)); + frontier.emplace(elt); + } +} + +void wf4::internal::FillNodeValues(wf4::NetworkGraph& graph, + const wf4::NetworkGraph::GraphNode& node) { + for (const auto& node_edge : graph.edges(node)) { + const wf4::NetworkEdge& edge = graph.getEdgeData(node_edge); + graph.getData(node).id = edge.src; + graph.getData(graph.getEdgeDst(node_edge)).id = edge.dst; + + 
if (edge.type == agile::workflow1::TYPES::SALE) { + galois::atomicAdd(graph.getData(node).sold_, edge.amount_); + } else if (edge.type == agile::workflow1::TYPES::PURCHASE) { + galois::atomicAdd(graph.getData(node).bought_, edge.amount_); + graph.getData(node).desired_ += edge.amount_; + } + } + if (graph.getData(node).sold_ > 0) { + bitset_sold_.set(node); + } +} + +void wf4::internal::CalculateEdgeProbability( + wf4::NetworkGraph& graph, const wf4::NetworkGraph::GraphNode& node, + galois::DGAccumulator& total_edge_weights, + galois::DGAccumulator& total_sales) { + wf4::NetworkNode& node_data = graph.getData(node); + + auto& net = galois::runtime::getSystemNetworkInterface(); + double amount_sold = node_data.sold_; + double amount_bought = node_data.bought_; + for (const auto& node_edge : graph.edges(node)) { + auto& edge = graph.getEdgeData(node_edge); + total_sales += edge.amount_; + if (edge.type == agile::workflow1::TYPES::SALE && amount_sold > 0) { + edge.weight_ = edge.amount_ / amount_sold; + total_edge_weights += edge.weight_; + } else if (edge.type == agile::workflow1::TYPES::PURCHASE) { + auto dst_sold = graph.getData(graph.getEdgeDst(node_edge)).sold_; + if (dst_sold > 0) { + edge.weight_ = edge.amount_ / dst_sold; + total_edge_weights += edge.weight_; + } + } + } +} + +void wf4::internal::GenerateSeedNodes( + galois::gstl::Vector& seed_nodes, + wf4::NetworkGraph& graph, uint64_t seed, uint64_t id, uint64_t host) { + pcg64 generator = pcg64(seed, (1 + host) * id); + std::uniform_int_distribution dist_nodes = + std::uniform_int_distribution(0, graph.numMasters() - 1); + seed_nodes[id] = + PartialReachableSet(id, host, graph.getGID(dist_nodes(generator))); +} + +void wf4::internal::GenerateRandomReversibleReachableSets( + wf4::ReverseReachableSet& reachability_sets, wf4::NetworkGraph& graph, + galois::gstl::Vector&& partial_sets, + galois::gstl::Vector< + galois::PerThreadVector>& + remote_partial_sets, + bool finish) { + auto& net = 
galois::runtime::getSystemNetworkInterface(); + galois::do_all( + galois::iterate(uint64_t(0), partial_sets.size()), + [&](uint64_t i) { + PartialReachableSet& partial_set = partial_sets[i]; + ContinueRandomReversibleReachableSet(graph, partial_set, i, finish); + if (partial_set.owner_host == net.ID && + (finish || partial_set.Finished())) { + reachability_sets[partial_set.id] = + std::move(partial_set.partial_reachable_set); + } else { + remote_partial_sets[partial_set.NextHost(graph, finish)] + .get() + .emplace_back(partial_set); + } + }, + galois::loopname("GenerateRandomReversibleReachableSets")); +} + +void wf4::internal::ContinueRandomReversibleReachableSet( + wf4::NetworkGraph& graph, wf4::internal::PartialReachableSet& partial_set, + uint64_t random_noise, bool finish) { + std::uniform_real_distribution dist_bfs = + std::uniform_real_distribution(0, 1); + pcg64 generator = pcg64(partial_set.id, random_noise); + + // do gymnastics since we cannot iterate and mutate the parallel hashmap + std::queue local_frontier; + phmap::flat_hash_set remote_frontier(INITIAL_SET_SIZE); + for (wf4::GlobalNodeID gid : partial_set.frontier) { + if (graph.isOwned(gid)) { + local_frontier.emplace(gid); + } else { + remote_frontier.emplace(gid); + } + } + while (!local_frontier.empty()) { + wf4::GlobalNodeID gid = local_frontier.front(); + local_frontier.pop(); + wf4::NetworkGraph::GraphNode lid = graph.getLID(gid); + + galois::atomicAdd(graph.getData(lid).frequency_, (uint64_t)1); + if (finish) { + continue; + } + for (auto& edge : graph.edges(lid)) { + if (dist_bfs(generator) <= graph.getEdgeData(edge).weight_) { + uint64_t reachable_node_gid = graph.getGID(graph.getEdgeDst(edge)); + if (partial_set.partial_reachable_set.find(reachable_node_gid) == + partial_set.partial_reachable_set.end()) { + partial_set.partial_reachable_set.emplace(reachable_node_gid); + if (graph.isOwned(reachable_node_gid)) { + local_frontier.emplace(reachable_node_gid); + } else { + 
remote_frontier.emplace(reachable_node_gid); + } + } + } + } + } + // remove dangling frontier nodes if traversal is forcefully terminated + if (finish) { + for (wf4::GlobalNodeID gid : remote_frontier) { + partial_set.partial_reachable_set.erase(gid); + } + remote_frontier.clear(); + } + partial_set.frontier = std::move(remote_frontier); +} + +wf4::GlobalNodeID +wf4::internal::GetMostInfluentialNode(wf4::NetworkGraph& graph) { + auto& net = galois::runtime::getSystemNetworkInterface(); + uint32_t threads = galois::getActiveThreads(); + galois::PerThreadVector most_influential_nodes; + galois::DGAccumulator total_influence; + total_influence.reset(); + + galois::do_all( + galois::iterate(graph.masterNodesRange().begin(), + graph.masterNodesRange().end()), + [&](wf4::NetworkGraph::GraphNode node) { + FindLocalMaxNode(graph, node, most_influential_nodes.get(), + total_influence); + }, + galois::loopname("FindMostInfluentialNode")); + + LocalMaxNode host_max = GetMaxNode(most_influential_nodes); + std::vector host_maxes = ExchangeMostInfluentialNodes(host_max); + LocalMaxNode global_max = GetMaxNode(host_maxes); + wf4::GlobalNodeID most_influential_node = global_max.max_node; + + total_influence.reduce(); + if (graph.isOwned(most_influential_node)) { + auto node_lid = graph.getLID(most_influential_node); + std::cout << "Most influential node " << EXCHANGE_INFLUENTIAL_NODES - 9801 + << " on " << net.ID << ": " << graph.getData(node_lid).id + << ", Occurred: " << global_max.max_influence << ", Degree: " + << std::distance(graph.edge_begin(node_lid), + graph.edge_end(node_lid)) + << ", Bought: " << graph.getData(node_lid).bought_ + << ", Sold: " << graph.getData(node_lid).sold_ + << ", Total Influence in Graph: " << total_influence.read() + << std::endl; + } + return most_influential_node; +} + +wf4::internal::LocalMaxNode +wf4::internal::GetMaxNode(galois::PerThreadVector& + most_influential_nodes) { + LocalMaxNode most_influential_node; + for (auto iter = 
most_influential_nodes.begin_all(); + iter != most_influential_nodes.end_all(); iter++) { + const LocalMaxNode& influential_node = *iter; + if (influential_node.max_influence > most_influential_node.max_influence) { + most_influential_node.max_node = influential_node.max_node; + most_influential_node.max_influence = influential_node.max_influence; + } + } + return most_influential_node; +} + +wf4::internal::LocalMaxNode wf4::internal::GetMaxNode( + std::vector& most_influential_nodes) { + LocalMaxNode most_influential_node; + for (const LocalMaxNode& influential_node : most_influential_nodes) { + if (influential_node.max_influence > most_influential_node.max_influence) { + most_influential_node.max_node = influential_node.max_node; + most_influential_node.max_influence = influential_node.max_influence; + } + } + return most_influential_node; +} + +void wf4::internal::RemoveReachableSetsWithInfluentialNode( + wf4::NetworkGraph& graph, + phmap::flat_hash_set& reachability_set, + const wf4::GlobalNodeID& influential_node, + galois::gstl::Vector< + phmap::parallel_flat_hash_map_m>& + remote_updates) { + using map = phmap::parallel_flat_hash_map_m; + if (reachability_set.find(influential_node) != reachability_set.end()) { + for (wf4::GlobalNodeID reachable_node_gid : reachability_set) { + if (graph.isOwned(reachable_node_gid)) { + galois::atomicSubtract( + graph.getData(graph.getLID(reachable_node_gid)).frequency_, + (uint64_t)1); + } else { + remote_updates[graph.getHostID(reachable_node_gid)].try_emplace_l( + reachable_node_gid, [&](map::value_type& p) { p.second++; }, + (uint64_t)0); // may need to change to 1 + } + } + reachability_set.clear(); + } +} + +void wf4::internal::RemoveNodesFromRemotes( + wf4::NetworkGraph& graph, + galois::gstl::Vector< + phmap::parallel_flat_hash_map_m>& + remote_updates) { + auto& net = galois::runtime::getSystemNetworkInterface(); + uint32_t host = net.ID; + uint32_t num_hosts = net.Num; + // TODO(Patrick): ensure efficient 
serialization of phmap maps + + std::vector nodes_to_remove(num_hosts); + + // send partial sets to other hosts + for (uint32_t h = 0; h < num_hosts; h++) { + if (h == host) { + continue; + } + galois::runtime::SendBuffer send_buffer; + SerializeMap(send_buffer, remote_updates[h]); + if (send_buffer.size() == 0) { + send_buffer.push('a'); + } + net.sendTagged(h, REMOVE_RRR_SETS, std::move(send_buffer)); + } + + // recv node range from other hosts + for (uint32_t h = 0; h < num_hosts - 1; h++) { + decltype(net.recieveTagged(REMOVE_RRR_SETS)) p; + do { + p = net.recieveTagged(REMOVE_RRR_SETS); + } while (!p); + uint32_t sending_host = p->first; + nodes_to_remove[sending_host] = std::move(p->second); + } + REMOVE_RRR_SETS++; + + for (galois::runtime::DeSerializeBuffer& buf : nodes_to_remove) { + if (buf.size() < 16) { + continue; + } + uint64_t* data = reinterpret_cast(buf.data()); + uint64_t size = data[0]; + data = &data[1]; + galois::do_all( + galois::iterate((uint64_t)0, size), + [&](uint64_t i) { + galois::atomicSubtract( + graph.getData(graph.getLID(data[i * 2])).frequency_, + data[i * 2 + 1]); + }, + galois::loopname("RemoveNodesFromRemotes")); + } + galois::runtime::getHostBarrier().wait(); +} + +void wf4::internal::FindLocalMaxNode( + wf4::NetworkGraph& graph, wf4::NetworkGraph::GraphNode node, + galois::gstl::Vector& local_max_node, + galois::DGAccumulator& total_influence) { + const wf4::NetworkNode& node_data = graph.getData(node); + total_influence += node_data.frequency_; + if (local_max_node.size() == 0) { + local_max_node.emplace_back( + LocalMaxNode(graph.getGID(node), node_data.frequency_)); + } else if (node_data.frequency_ > local_max_node[0].max_influence) { + local_max_node[0].max_node = graph.getGID(node); + local_max_node[0].max_influence = node_data.frequency_; + } +} + +galois::gstl::Vector +wf4::internal::ExchangePartialSets( + galois::gstl::Vector< + galois::PerThreadVector>& + remote_partial_sets) { + galois::gstl::Vector 
local_partial_sets; + auto& net = galois::runtime::getSystemNetworkInterface(); + uint32_t host = net.ID; + uint32_t num_hosts = net.Num; + // TODO(Patrick): ensure efficient serialization of phmap sets + + std::vector host_partial_sets(num_hosts); + + // send partial sets to other hosts + for (uint32_t h = 0; h < num_hosts; h++) { + if (h == host) { + continue; + } + galois::runtime::SendBuffer send_buffer; + for (auto iter = remote_partial_sets[h].begin_all(); + iter != remote_partial_sets[h].end_all(); iter++) { + (*iter).serialize(send_buffer); + } + if (send_buffer.size() == 0) { + send_buffer.push('a'); + } + net.sendTagged(h, EXCHANGE_PARTIAL_SETS, std::move(send_buffer)); + } + + // recv node range from other hosts + for (uint32_t h = 0; h < num_hosts - 1; h++) { + decltype(net.recieveTagged(EXCHANGE_PARTIAL_SETS)) p; + do { + p = net.recieveTagged(EXCHANGE_PARTIAL_SETS); + } while (!p); + uint32_t sending_host = p->first; + host_partial_sets[sending_host] = std::move(p->second); + } + EXCHANGE_PARTIAL_SETS++; + + for (galois::runtime::DeSerializeBuffer& buf : host_partial_sets) { + if (buf.size() < 16) { + continue; + } + while (buf.size() > 0) { + PartialReachableSet set; + set.deserialize(buf); + local_partial_sets.emplace_back(set); + } + } + for (galois::PerThreadVector& remote_sets : + remote_partial_sets) { + remote_sets.clear_all_parallel(); + } + galois::runtime::getHostBarrier().wait(); + + return local_partial_sets; +} + +std::vector +wf4::internal::ExchangeMostInfluentialNodes( + const wf4::internal::LocalMaxNode& local_max) { + auto& net = galois::runtime::getSystemNetworkInterface(); + uint32_t host = net.ID; + uint32_t num_hosts = net.Num; + std::vector host_maxes(num_hosts); + host_maxes[host] = local_max; + + // send local max to other hosts + for (uint32_t h = 0; h < num_hosts; h++) { + if (h == host) { + continue; + } + galois::runtime::SendBuffer send_buffer; + galois::runtime::gSerialize(send_buffer, local_max); + net.sendTagged(h, 
EXCHANGE_INFLUENTIAL_NODES, std::move(send_buffer)); + } + + // recv node range from other hosts + for (uint32_t h = 0; h < num_hosts - 1; h++) { + decltype(net.recieveTagged(EXCHANGE_INFLUENTIAL_NODES)) p; + do { + p = net.recieveTagged(EXCHANGE_INFLUENTIAL_NODES); + } while (!p); + uint32_t sending_host = p->first; + + galois::runtime::gDeserialize(p->second, host_maxes[sending_host]); + } + galois::runtime::getHostBarrier().wait(); + EXCHANGE_INFLUENTIAL_NODES++; + + return host_maxes; +} + +void wf4::internal::SerializeMap( + galois::runtime::SendBuffer& buf, + phmap::parallel_flat_hash_map_m& updates) { + uint64_t length = 1 + updates.size() * 2; + uint64_t data[length]; + uint64_t offset = 0; + data[offset++] = updates.size(); + for (const std::pair& update : updates) { + data[offset++] = update.first; + data[offset++] = update.second; + } + buf.insert(reinterpret_cast(&data), length * sizeof(uint64_t)); + updates.clear(); +} diff --git a/wf4/src/main.cpp b/wf4/src/main.cpp new file mode 100644 index 0000000..1b04856 --- /dev/null +++ b/wf4/src/main.cpp @@ -0,0 +1,262 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +#include + +#include "galois/DistGalois.h" +#include "galois/runtime/DataCommMode.h" +#include "import.hpp" +#include "influencer.hpp" +#include "quiesce.hpp" + +static const char* name = "Network of Networks"; + +galois::DynamicBitSet bitset_bought_; +galois::DynamicBitSet bitset_sold_; + +namespace { + +void printUsageExit(char* argv0) { + std::printf("Usage: %s " + "[-t ] " + "[-k ] " + "[-r ] " + "[-s ] " + "[-e ] " + "[-d ] " + "[-c ] " + "[-y ] " + "[-o ] " + "[-u ] " + "[-n ]\n", + argv0); + std::exit(EXIT_FAILURE); +} + +struct ProgramOptions { + ProgramOptions() = default; + + void Parse(int argc, char** argv) { + // Other libraries may have called getopt before, so we reset optind for + // correctness + optind = 0; + + int opt; + std::string file; + while ((opt = getopt(argc, argv, "t:k:r:s:e:d:c:y:o:u:n:")) != -1) { + switch (opt) { + case 't': + num_threads = strtoull(optarg, nullptr, 10); + break; + case 'k': + k = strtoull(optarg, nullptr, 10); + break; + case 'r': + rrr = strtoull(optarg, nullptr, 10); + break; + case 's': + seed = strtoull(optarg, nullptr, 10); + break; + case 'e': + epochs = strtoull(optarg, nullptr, 10); + break; + case 'd': + file = std::string(optarg); + input_directory = file; + break; + case 'c': + file = std::string(optarg); + commercial_input_file = file; + break; + case 'y': + file = std::string(optarg); + cyber_input_file = file; + break; + case 'o': + file = std::string(optarg); + social_input_file = file; + break; + case 'u': + file = std::string(optarg); + uses_input_file = file; + break; + case 'n': + file = std::string(optarg); + nodes_input_file = file; + break; + default: + printUsageExit(argv[0]); + exit(-1); + } + } + if (!Verify()) { + printUsageExit(argv[0]); + exit(-1); + } + } + + bool Verify() { + if (k <= 0) { + return false; + } + if (rrr <= 0) { + return false; + } + if (input_directory.empty() && commercial_input_file.empty() && + cyber_input_file.empty() && social_input_file.empty() && + 
uses_input_file.empty() && nodes_input_file.empty()) { + return false; + } + return true; + } + + uint64_t num_threads = 16; + uint64_t k = 100; + uint64_t rrr = 100000; + uint64_t seed = 98011089; + uint64_t epochs = 100; + + std::string input_directory; + + std::string commercial_input_file; + std::string cyber_input_file; + std::string social_input_file; + std::string uses_input_file; + std::string nodes_input_file; + + // debug argument for verifying correctness on artificial graphs, unused + uint64_t influential_node_threshold = 0; +} typedef ProgramOptions; + +template +uint64_t countEdges(T& graph) { + uint64_t edges = 0; + uint64_t person_nodes = 0; + uint64_t trading_edges = 0; + uint64_t trading_person_edges = 0; + + for (auto src : graph.masterNodesRange()) { + bool owned = graph.isOwned(graph.getGID(src)); + owned = graph.isLocal(graph.getGID(src)); + auto& node = graph.getData(src); + if (node.type_ == agile::workflow1::TYPES::PERSON) { + // person_nodes++; + } + // node.id = 1; + /* + std::cout << "LID: " << src << ", GID: " << graph.getGID(src) + << ", LID2: " << graph.getLID(graph.getGID(src)) + << ", UID: " << graph.getData(src).id << std::endl; + */ + edges += std::distance(graph.edges(src).begin(), graph.edges(src).end()); + bool has_valid_edges = false; + + for (auto edge : graph.edges(src)) { + try { + auto data = graph.getEdgeData(edge); + auto dst_lid = graph.getEdgeDst(edge); + auto gid = graph.getGID(graph.getEdgeDst(edge)); + owned = graph.isOwned(gid); + owned = graph.isLocal(gid); + auto dst_node = graph.getData(dst_lid); + if (data.type == agile::workflow1::TYPES::SALE || + data.type == agile::workflow1::TYPES::PURCHASE) { + if (data.topic = 8486 && data.amount_ > 0) { + trading_edges++; + + if (node.type_ == agile::workflow1::TYPES::PERSON) { + has_valid_edges = true; + trading_person_edges++; + } + } + } + } catch (std::out_of_range) { + std::cout << "Overflow DST: " + << "NaN" + << ", SRC: " << src << ", SIZE: " << graph.size() + << 
std::endl; + } + // node.id = 1; + /* + std::cout << "SRC_LID: " << src << ", DST_LID: " << graph.getEdgeDst(edge) + << ", DST_GID: " << graph.getGID(graph.getEdgeDst(edge)) + << ", DST_UID: " << graph.getData(graph.getEdgeDst(edge)).id + << std::endl; + std::cout << "EDGE: SRC_UID: " << data.src + << ", SRC_GID: " << data.src_glbid << ", DST_UID: " << data.dst + << ", DST_GID: " << data.dst_glbid + << ", AMOUNT: " << data.amount_ << std::endl; + */ + } + if (has_valid_edges) { + person_nodes++; + } + } + std::cout << "Person nodes: " << person_nodes << std::endl; + std::cout << "Trading edges: " << trading_edges << std::endl; + std::cout << "Trading person edges: " << trading_person_edges << std::endl; + return edges; +} + +template +void printGraphStatisticsDebug(T& graph) { + auto num_edges = countEdges(graph); + galois::DGAccumulator global_nodes; + global_nodes.reset(); + global_nodes += graph.numMasters(); + global_nodes.reduce(); + galois::DGAccumulator global_edges; + global_edges.reset(); + global_edges += num_edges; + global_edges.reduce(); + std::cout << "Total Nodes: " << global_nodes.read() << std::endl; + std::cout << "Total Edges: " << global_edges.read() << std::endl; + std::cout << "Masters in graph: " << graph.numMasters() << std::endl; + std::cout << "Local nodes in graph: " << graph.size() << std::endl; + std::cout << "Local edges in graph: " << num_edges << std::endl; +} + +} // end namespace + +int main(int argc, char* argv[]) { + ProgramOptions program_options; + program_options.Parse(argc, argv); + galois::DistMemSys G; + galois::setActiveThreads(program_options.num_threads); + auto& net = galois::runtime::getSystemNetworkInterface(); + + wf4::InputFiles input_files( + program_options.input_directory, program_options.commercial_input_file, + program_options.cyber_input_file, program_options.social_input_file, + program_options.uses_input_file, program_options.nodes_input_file); + if (net.ID == 0) { + input_files.Print(); + } + + 
std::unique_ptr full_graph = + wf4::ImportData(input_files); + galois::runtime::getHostBarrier().wait(); + printGraphStatisticsDebug(*full_graph); + std::unique_ptr projected_graph = + wf4::ProjectGraph(std::move(full_graph)); + std::unique_ptr> + sync_substrate = + std::make_unique>( + *projected_graph, net.ID, net.Num, + projected_graph->isTransposed(), + projected_graph->cartesianGrid()); + + // printGraphStatisticsDebug(*projected_graph); + + wf4::CalculateEdgeProbabilities(*projected_graph, *sync_substrate); + wf4::ReverseReachableSet reachability_sets = + wf4::GetRandomReverseReachableSets(*projected_graph, program_options.rrr, + program_options.seed, + program_options.epochs); + std::vector influencers = wf4::GetInfluentialNodes( + *projected_graph, std::move(reachability_sets), program_options.k, + program_options.influential_node_threshold); + + wf4::CancelNodes(*projected_graph, *sync_substrate, influencers); + wf4::QuiesceGraph(*projected_graph, *sync_substrate); +} diff --git a/wf4/src/quiesce.cpp b/wf4/src/quiesce.cpp new file mode 100644 index 0000000..ebfc11f --- /dev/null +++ b/wf4/src/quiesce.cpp @@ -0,0 +1,368 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Copyright (c) 2023. University of Texas at Austin. All rights reserved. 
+ +#include "quiesce.hpp" + +#include "galois/AtomicHelpers.h" + +// location based partitioning + +namespace { + +static uint32_t SEND_NEW_EDGES = 79408; + +void dumpUpdates(wf4::NetworkGraph& graph, wf4::GlobalNodeID node) { + if (!graph.isOwned(node)) { + return; + } + std::cout << std::endl << std::endl; + auto src = graph.getLID(node); + for (auto edge : graph.edges(src)) { + auto data = graph.getEdgeData(edge); + std::cout << "EDGE: SRC_UID: " << data.src << ", DST_UID: " << data.dst + << ", DST_GID: " << data.dst_glbid << ", Local: " + << graph.isOwned(graph.getGID(graph.getEdgeDst(edge))) + << ", Sale: " << (data.type == agile::workflow1::TYPES::SALE) + << ", Amount: " << data.amount_ + << ", Bought: " << graph.getData(graph.getEdgeDst(edge)).bought_ + << ", Sold: " << graph.getData(graph.getEdgeDst(edge)).sold_ + << std::endl; + } +} + +void verifyState(wf4::NetworkGraph& graph) { + galois::do_all( + galois::iterate(graph.allNodesRange()), + [&](wf4::GlobalNodeID gid) { + if (graph.isLocal(gid)) { + auto data = graph.getData(graph.getLID(gid)); + if (data.bought_ > 0 && data.sold_ > data.bought_ + 0.01) { + std::cout << "ERROR: NODE " << data.id + << " SOLD MORE THAN IT BOUGHT: " << data.sold_ << "/" + << data.bought_ << std::endl; + } + if (graph.isOwned(gid) && data.bought_ > data.desired_ + 0.01) { + std::cout << "ERROR: NODE " << data.id + << " BOUGHT MORE THAN IT DESIRED: " << data.bought_ << "/" + << data.desired_ << std::endl; + } + } + }, + galois::loopname("VerifyState")); +} + +} // end namespace + +void wf4::CancelNodes( + wf4::NetworkGraph& graph, + galois::graphs::GluonSubstrate& substrate, + std::vector nodes) { + const bool async = false; + bitset_bought_.resize(graph.size()); + bitset_bought_.reset(); + bitset_sold_.resize(graph.size()); + bitset_sold_.reset(); + + galois::do_all( + galois::iterate(nodes.begin(), nodes.end()), + [&](GlobalNodeID node) { internal::CancelNode(graph, node); }, + galois::loopname("CancelNodes")); + + // update 
mirrors + substrate.sync( + "Update bought values after cancellation"); + substrate.sync( + "Update sold values after cancellation"); + galois::runtime::getHostBarrier().wait(); +} + +void wf4::QuiesceGraph( + wf4::NetworkGraph& graph, + galois::graphs::GluonSubstrate& substrate) { + galois::Timer timer; + timer.start(); + auto& net = galois::runtime::getSystemNetworkInterface(); + galois::gstl::Vector> + remote_edges(net.Num); + const bool async = false; + bitset_bought_.resize(graph.size()); + bitset_bought_.reset(); + bitset_sold_.resize(graph.size()); + bitset_sold_.reset(); + + // will require multiple iterations + galois::do_all( + galois::iterate(graph.masterNodesRange().begin(), + graph.masterNodesRange().end()), + [&](NetworkGraph::GraphNode node) { + internal::TryQuiesceNode(graph, remote_edges, node); + }, + galois::loopname("TryQuiesceGraph")); + + std::vector potential_edges = + internal::SendNewEdges(remote_edges); + internal::TryAddEdges(graph, remote_edges, std::move(potential_edges)); + std::vector decided_edges = + internal::SendNewEdges(remote_edges); + internal::AddPurchaseEdges(graph, std::move(decided_edges)); + + substrate.sync( + "Update Mirrors with amount sold"); + substrate.sync( + "Update Mirrors with amount bought"); + + timer.stop(); + std::cout << "Quiescence time: " << timer.get_usec() / 1000000.0f << "s" + << std::endl; + + // verifyState(graph); +} + +bool wf4::internal::ProposedEdge::operator==( + const wf4::internal::ProposedEdge& other) const { + return src == other.src && dst == other.dst; +} + +wf4::internal::IncomingEdges::IncomingEdges( + galois::runtime::DeSerializeBuffer&& buffer) + : buffer_(std::move(buffer)) { + if (buffer_.size() < 16) { + empty = true; + } else { + empty = false; + uint64_t* data = reinterpret_cast(buffer_.data()); + num_edges = data[0]; + edges = reinterpret_cast(&data[1]); + } +} + +void wf4::internal::CancelNode(wf4::NetworkGraph& graph, + wf4::GlobalNodeID node_gid) { + if 
(!graph.isLocal(node_gid)) { + return; + } + wf4::NetworkGraph::GraphNode node = graph.getLID(node_gid); + graph.getData(node).Cancel(); + if (!graph.isOwned(node_gid)) { + return; + } + + for (auto& edge : graph.edges(node)) { + wf4::NetworkEdge edge_data = graph.getEdgeData(edge); + if (edge_data.type == agile::workflow1::TYPES::SALE) { + wf4::NetworkGraph::GraphNode dst_node = graph.getEdgeDst(edge); + // if destination is a mirror set it to 0 and subtract + if (!graph.isOwned(graph.getGID(dst_node))) { + galois::atomicMin(graph.getData(dst_node).bought_, 0.0); + } + galois::atomicSubtract(graph.getData(dst_node).bought_, + edge_data.amount_); + bitset_bought_.set(dst_node); + + // TODO(Patrick) + // graph.removeEdge(node, out_edge); + } else if (edge_data.type == agile::workflow1::TYPES::PURCHASE) { + wf4::NetworkGraph::GraphNode src_node = graph.getEdgeDst(edge); + // if source is a mirror set it to 0 and subtract + if (!graph.isOwned(graph.getGID(src_node))) { + galois::atomicMin(graph.getData(src_node).sold_, 0.0); + } + galois::atomicSubtract(graph.getData(src_node).sold_, edge_data.amount_); + bitset_sold_.set(src_node); + + // TODO(Patrick) + // graph.removeInEdge(node, in_edge); + } + } +} + +void wf4::internal::TryQuiesceNode( + wf4::NetworkGraph& graph, + galois::gstl::Vector< + phmap::parallel_flat_hash_set_m>& + remote_edges, + wf4::NetworkGraph::GraphNode node) { + using map = phmap::parallel_flat_hash_set_m; + wf4::NetworkNode& node_data = graph.getData(node); + double needed = node_data.desired_ - node_data.bought_; + for (auto& in_edge : graph.edges(node)) { + if (needed <= 0) { + return; + } + wf4::NetworkEdge edge_data = graph.getEdgeData(in_edge); + if (edge_data.type != agile::workflow1::TYPES::PURCHASE) { + continue; + } + wf4::NetworkGraph::GraphNode seller_node = graph.getEdgeDst(in_edge); + wf4::NetworkNode& seller_data = graph.getData(seller_node); + + // TODO(Patrick) explore negative case more closely + double seller_surplus = 
seller_data.bought_ - seller_data.sold_; + double trade_delta = std::min(needed, seller_surplus); + if (trade_delta <= 0) { + continue; + } + wf4::GlobalNodeID seller_gid = graph.getGID(seller_node); + + // TODO(Patrick) add new edges instead of increasing old ones + double seller_sold = + galois::atomicAdd(graph.getData(seller_node).sold_, trade_delta) + + trade_delta; + if (seller_sold > seller_data.bought_) { + galois::atomicSubtract(graph.getData(seller_node).sold_, trade_delta); + } else { + if (graph.isOwned(seller_gid)) { + edge_data.amount_ += trade_delta; + galois::atomicAdd(graph.getData(node).bought_, trade_delta); + bitset_bought_.set(node); + bitset_sold_.set(seller_node); + } else { + // defer updating state until remote edge confirmed + ProposedEdge proposed_edge = + ProposedEdge(graph.getGID(node), seller_gid, trade_delta); + remote_edges[graph.getHostID(seller_gid)].lazy_emplace_l( + std::move(proposed_edge), [&](map::value_type&) {}, + [&](const map::constructor& ctor) { + ctor(std::move(proposed_edge)); + }); + } + needed -= trade_delta; + } + } +} + +void wf4::internal::TryAddEdges( + wf4::NetworkGraph& graph, + galois::gstl::Vector< + phmap::parallel_flat_hash_set_m>& + remote_edges, + std::vector&& incoming_edges) { + auto& net = galois::runtime::getSystemNetworkInterface(); + for (const IncomingEdges& edges : incoming_edges) { + if (edges.empty) { + continue; + } + galois::do_all( + galois::iterate(uint64_t(0), edges.num_edges), + [&](uint64_t edge) { + TryAddEdge(graph, remote_edges, edges.edges[edge]); + }, + galois::loopname("TryAddEdges")); + } +} + +void wf4::internal::TryAddEdge( + wf4::NetworkGraph& graph, + galois::gstl::Vector< + phmap::parallel_flat_hash_set_m>& + remote_edges, + const wf4::internal::ProposedEdge& edge) { + using map = phmap::parallel_flat_hash_set_m; + wf4::NetworkGraph::GraphNode seller_lid = graph.getLID(edge.dst); + wf4::NetworkNode& seller_data = graph.getData(seller_lid); + // TODO(Patrick) explore negative 
case more closely + double seller_surplus = seller_data.bought_ - seller_data.sold_; + double trade_delta = std::min(edge.weight, seller_surplus); + if (trade_delta <= 0) { + return; + } + + double seller_sold = + galois::atomicAdd(graph.getData(seller_lid).sold_, trade_delta) + + trade_delta; + if (seller_sold > seller_data.bought_) { + galois::atomicSubtract(graph.getData(seller_lid).sold_, trade_delta); + } else { + // TODO(Patrick) add new edge instead of increasing old ones + // edge_data.amount_ += trade_delta; + bitset_sold_.set(seller_lid); + // defer updating buyer state on master when new edge is added + ProposedEdge proposed_edge = ProposedEdge(edge.src, edge.dst, trade_delta); + remote_edges[graph.getHostID(edge.src)].lazy_emplace_l( + std::move(proposed_edge), [&](map::value_type&) {}, + [&](const map::constructor& ctor) { ctor(std::move(proposed_edge)); }); + } +} + +void wf4::internal::AddPurchaseEdges( + wf4::NetworkGraph& graph, + std::vector&& decided_edges) { + auto& net = galois::runtime::getSystemNetworkInterface(); + for (const IncomingEdges& edges : decided_edges) { + if (edges.empty) { + continue; + } + galois::do_all( + galois::iterate(uint64_t(0), edges.num_edges), + [&](uint64_t edge) { AddPurchaseEdge(graph, edges.edges[edge]); }, + galois::loopname("AddPurchaseEdge")); + } +} + +void wf4::internal::AddPurchaseEdge(wf4::NetworkGraph& graph, + const wf4::internal::ProposedEdge& edge) { + wf4::NetworkGraph::GraphNode buyer_lid = graph.getLID(edge.src); + galois::atomicAdd(graph.getData(buyer_lid).bought_, edge.weight); + // TODO(Patrick) add new edge instead of increasing old ones + // edge_data.amount_ += trade_delta; + bitset_bought_.set(buyer_lid); +} + +std::vector wf4::internal::SendNewEdges( + galois::gstl::Vector< + phmap::parallel_flat_hash_set_m>& + remote_edges) { + auto& net = galois::runtime::getSystemNetworkInterface(); + uint32_t host = net.ID; + uint32_t num_hosts = net.Num; + // TODO(Patrick): ensure efficient serialization 
of phmap maps + + std::vector incoming_edges(num_hosts); + + // send partial sets to other hosts + for (uint32_t h = 0; h < num_hosts; h++) { + if (h == host) { + continue; + } + uint64_t size = remote_edges[h].size(); + galois::runtime::SendBuffer send_buffer; + SerializeSet(send_buffer, remote_edges[h]); + if (send_buffer.size() == 0) { + send_buffer.push('a'); + } + net.sendTagged(h, SEND_NEW_EDGES, std::move(send_buffer)); + } + + // recv node range from other hosts + for (uint32_t h = 0; h < num_hosts - 1; h++) { + decltype(net.recieveTagged(SEND_NEW_EDGES)) p; + do { + p = net.recieveTagged(SEND_NEW_EDGES); + } while (!p); + uint32_t sending_host = p->first; + incoming_edges[sending_host] = IncomingEdges(std::move(p->second)); + } + SEND_NEW_EDGES++; + + return incoming_edges; +} + +void wf4::internal::SerializeSet( + galois::runtime::SendBuffer& buf, + phmap::parallel_flat_hash_set_m& edges) { + uint64_t length = 1 + edges.size() * 3; + uint64_t data[length]; + uint64_t offset = 0; + data[offset++] = edges.size(); + for (const ProposedEdge& edge : edges) { + data[offset++] = edge.src; + data[offset++] = edge.dst; + data[offset++] = edge.weight; + } + buf.insert(reinterpret_cast(&data), length * sizeof(uint64_t)); + edges.clear(); +}