Merge pull request #816 from AntelopeIO/GH-815-ship-hang
[1.0.2] SHiP: Fix hang on shutdown
heifner authored Sep 24, 2024
2 parents a5f0a7f + 6df1c11 commit 94fea6a
Showing 4 changed files with 153 additions and 17 deletions.
@@ -327,7 +327,7 @@ class session final : public session_base {
    std::optional<log_catalog>& chain_state_log;
    std::optional<log_catalog>& finality_data_log;
 
-   GetBlockID get_block_id;
+   GetBlockID get_block_id; // call from main app thread
    GetBlock get_block;
 
    ///these items might be used on either the strand or main thread
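The comment added above records a threading contract rather than a behavior change: get_block_id reads chain state, so a session must invoke it only from the main application thread and hop back to its own strand with the result. A minimal, self-contained sketch of that hop-to-main-and-back pattern in plain Boost.Asio (the names main_io, ship_pool, and the toy get_block_id are illustrative stand-ins, not from this codebase):

#include <boost/asio.hpp>
#include <iostream>

namespace asio = boost::asio;

int main() {
   asio::io_context main_io;                    // stand-in for app().get_io_service()
   auto guard = asio::make_work_guard(main_io); // keep main_io.run() alive until stopped
   asio::thread_pool ship_pool{1};              // stand-in for the SHiP thread pool
   auto strand = asio::make_strand(ship_pool.get_executor());

   auto get_block_id = [](unsigned block_num) { return block_num * 2; }; // main-thread-only in the plugin

   // Work begins on the session strand...
   asio::post(strand, [&] {
      // ...hops to the main thread for the main-thread-only call...
      asio::post(main_io, [&] {
         auto id = get_block_id(42);
         // ...then hops back to the strand with the result.
         asio::post(strand, [&, id] {
            std::cout << "block id: " << id << "\n";
            main_io.stop();                     // thread-safe; unblocks main_io.run()
         });
      });
   });

   main_io.run();
   ship_pool.join();
}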
34 changes: 18 additions & 16 deletions plugins/state_history_plugin/state_history_plugin.cpp
@@ -100,22 +100,24 @@ struct state_history_plugin_impl {
    template <typename Protocol>
    void create_listener(const std::string& address) {
       const boost::posix_time::milliseconds accept_timeout(200);
-      // connections set must only be modified by main thread; run listener on main thread to avoid needing another post()
-      fc::create_listener<Protocol>(app().get_io_service(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
-         catch_and_log([this, &socket]() {
-            connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
-                                            trace_log, chain_state_log, finality_data_log,
-                                            [this](const chain::block_num_type block_num) {
-                                               return get_block_id(block_num);
-                                            },
-                                            [this](const chain::block_id_type& block_id) {
-                                               return chain_plug->chain().fetch_block_by_id(block_id);
-                                            },
-                                            [this](session_base* conn) {
-                                               boost::asio::post(app().get_io_service(), [conn, this]() {
-                                                  connections.erase(connections.find(conn));
-                                               });
-                                            }, _log));
+      // connections set must only be modified by main thread; run listener on ship thread so sockets use default executor of the ship thread
+      fc::create_listener<Protocol>(thread_pool.get_executor(), _log, accept_timeout, address, "", [this](Protocol::socket&& socket) {
+         boost::asio::post(app().get_io_service(), [this, socket{std::move(socket)}]() mutable {
+            catch_and_log([this, &socket]() {
+               connections.emplace(new session(std::move(socket), boost::asio::make_strand(thread_pool.get_executor()), chain_plug->chain(),
+                                               trace_log, chain_state_log, finality_data_log,
+                                               [this](const chain::block_num_type block_num) {
+                                                  return get_block_id(block_num);
+                                               },
+                                               [this](const chain::block_id_type& block_id) {
+                                                  return chain_plug->chain().fetch_block_by_id(block_id);
+                                               },
+                                               [this](session_base* conn) {
+                                                  boost::asio::post(app().get_io_service(), [conn, this]() {
+                                                     connections.erase(connections.find(conn));
+                                                  });
+                                               }, _log));
+            });
          });
       });
    }
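The substantive fix here is where the listener runs: accepting on the SHiP thread pool means each socket is created with that pool's executor (so its I/O stays off the main thread), while insertion into the main-thread-owned connections set is posted back to the application thread. A small, self-contained Boost.Asio sketch of this accept-on-worker, hand-off-to-main pattern (worker_pool, main_io, and the loopback client are illustrative stand-ins, not the plugin's actual types):

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <set>
#include <thread>

namespace asio = boost::asio;
using asio::ip::tcp;

int main() {
   asio::io_context main_io;                    // stand-in for app().get_io_service()
   auto guard = asio::make_work_guard(main_io); // keep main_io.run() alive until stopped
   asio::thread_pool worker_pool{2};            // stand-in for the SHiP thread pool

   std::set<std::shared_ptr<tcp::socket>> connections; // modified only on the main thread

   // Accept on the worker pool: the accepted socket's default executor is the pool's.
   tcp::acceptor acceptor{worker_pool, tcp::endpoint{tcp::v4(), 0}};
   acceptor.async_accept([&](boost::system::error_code ec, tcp::socket socket) {
      if (ec) return;
      // Hand the socket to the main thread before touching `connections`.
      asio::post(main_io, [&, s = std::make_shared<tcp::socket>(std::move(socket))]() mutable {
         connections.insert(std::move(s));      // safe: only the main thread mutates the set
         std::cout << "connections: " << connections.size() << "\n";
         main_io.stop();
      });
   });

   // Drive one loopback client so the accept completes.
   std::thread client{[port = acceptor.local_endpoint().port()] {
      asio::io_context io;
      tcp::socket s{io};
      s.connect({asio::ip::make_address("127.0.0.1"), port});
   }};

   main_io.run();
   client.join();
   worker_pool.join();
}

The new ship_kill_client_test below exercises exactly this split: clients are killed while nodeos itself is being shut down.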
3 changes: 3 additions & 0 deletions tests/CMakeLists.txt
@@ -55,6 +55,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cli_test.py ${CMAKE_CURRENT_BINARY_DI
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_reqs_across_svnn_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_reqs_across_svnn_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_streamer_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_streamer_test.py COPYONLY)
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/ship_kill_client_test.py ${CMAKE_CURRENT_BINARY_DIR}/ship_kill_client_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/bridge_for_fork_test_shape.json ${CMAKE_CURRENT_BINARY_DIR}/bridge_for_fork_test_shape.json COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/lib_advance_test.py ${CMAKE_CURRENT_BINARY_DIR}/lib_advance_test.py COPYONLY)
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/http_plugin_test.py ${CMAKE_CURRENT_BINARY_DIR}/http_plugin_test.py COPYONLY)
@@ -187,6 +188,8 @@ add_test(NAME ship_streamer_if_test COMMAND tests/ship_streamer_test.py -v --num
 set_property(TEST ship_streamer_if_test PROPERTY LABELS long_running_tests)
 add_test(NAME ship_streamer_if_fetch_finality_data_test COMMAND tests/ship_streamer_test.py -v --num-clients 10 --activate-if --finality-data-history ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 set_property(TEST ship_streamer_if_fetch_finality_data_test PROPERTY LABELS long_running_tests)
+add_test(NAME ship_kill_client_test COMMAND tests/ship_kill_client_test.py -v --num-clients 20 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
+set_property(TEST ship_kill_client_test PROPERTY LABELS nonparallelizable_tests)
 
 add_test(NAME p2p_dawn515_test COMMAND tests/p2p_tests/dawn_515/test.sh WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
 set_property(TEST p2p_dawn515_test PROPERTY LABELS nonparallelizable_tests)
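The new test carries the nonparallelizable_tests label used elsewhere in this file; once the build is configured, standard CTest name filtering such as `ctest -R ship_kill_client_test` (run from the build directory) should select just this test.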
131 changes: 131 additions & 0 deletions tests/ship_kill_client_test.py
@@ -0,0 +1,131 @@
#!/usr/bin/env python3

import time
import json
import os
import shutil
import signal
import sys

from TestHarness import Account, Cluster, TestHelper, Utils, WalletMgr
from TestHarness.TestHelper import AppArgs

###############################################################
# ship_kill_client_test
#
# Set up a nodeos with SHiP (state_history_plugin).
# Connect a number of clients, then kill the clients and shut down nodeos.
# nodeos should exit cleanly and not hang or SEGFAULT.
#
###############################################################

Print=Utils.Print

appArgs = AppArgs()
extraArgs = appArgs.add(flag="--num-clients", type=int, help="How many ship_streamers should be started", default=1)
args = TestHelper.parse_args({"--dump-error-details","--keep-logs","-v","--leave-running","--unshared"}, applicationSpecificArgs=appArgs)

Utils.Debug=args.v
cluster=Cluster(unshared=args.unshared, keepRunning=args.leave_running, keepLogs=args.keep_logs)
dumpErrorDetails=args.dump_error_details
walletPort=TestHelper.DEFAULT_WALLET_PORT

# simpler to have two producer nodes than to set up different accounts for the trx generator
totalProducerNodes=2
totalNonProducerNodes=1
totalNodes=totalProducerNodes+totalNonProducerNodes

walletMgr=WalletMgr(True, port=walletPort)
testSuccessful=False

WalletdName=Utils.EosWalletName
shipTempDir=None

try:
    TestHelper.printSystemInfo("BEGIN")

    cluster.setWalletMgr(walletMgr)
    Print("Stand up cluster")

    shipNodeNum = 2
    specificExtraNodeosArgs={}
    specificExtraNodeosArgs[shipNodeNum]="--plugin eosio::state_history_plugin --trace-history --chain-state-history --finality-data-history --state-history-stride 200 --plugin eosio::net_api_plugin --plugin eosio::producer_api_plugin "

    if cluster.launch(pnodes=totalProducerNodes, loadSystemContract=False,
                      totalNodes=totalNodes, totalProducers=totalProducerNodes, activateIF=True, biosFinalizer=False,
                      specificExtraNodeosArgs=specificExtraNodeosArgs) is False:
        Utils.cmdError("launcher")
        Utils.errorExit("Failed to stand up cluster.")

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=5)
    Print("Cluster in Sync")

    prodNode0 = cluster.getNode(0)
    prodNode1 = cluster.getNode(1)
    shipNode = cluster.getNode(shipNodeNum)

    start_block_num = shipNode.getBlockNum()

    # verify nodes are in sync and advancing
    cluster.waitOnClusterSync(blockAdvancing=3)
    Print("Shutdown unneeded bios node")
    cluster.biosNode.kill(signal.SIGTERM)

    Print("Configure and launch txn generators")
    targetTpsPerGenerator = 10
    testTrxGenDurationSec=60*60
    numTrxGenerators=2
    cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[cluster.defproduceraAccount.name, cluster.defproducerbAccount.name],
                                acctPrivKeysList=[cluster.defproduceraAccount.activePrivateKey,cluster.defproducerbAccount.activePrivateKey], nodeId=prodNode1.nodeId,
                                tpsPerGenerator=targetTpsPerGenerator, numGenerators=numTrxGenerators, durationSec=testTrxGenDurationSec,
                                waitToComplete=False)

    status = cluster.waitForTrxGeneratorsSpinup(nodeId=prodNode1.nodeId, numGenerators=numTrxGenerators)
    assert status is not None and status is not False, "ERROR: Failed to spinup Transaction Generators"

    prodNode1.waitForProducer("defproducera")

    block_range = 100000 # we are going to kill the clients, so just make this a huge number
    end_block_num = start_block_num + block_range

    shipClient = "tests/ship_streamer"
    cmd = f"{shipClient} --start-block-num {start_block_num} --end-block-num {end_block_num} --fetch-block --fetch-traces --fetch-deltas --fetch-finality-data"
    if Utils.Debug: Utils.Print(f"cmd: {cmd}")
    clients = []
    files = []
    shipTempDir = os.path.join(Utils.DataDir, "ship")
    os.makedirs(shipTempDir, exist_ok = True)
    shipClientFilePrefix = os.path.join(shipTempDir, "client")

    for i in range(0, args.num_clients):
        outFile = open(f"{shipClientFilePrefix}{i}.out", "w")
        errFile = open(f"{shipClientFilePrefix}{i}.err", "w")
        Print(f"Start client {i}")
        popen = Utils.delayedCheckOutput(cmd, stdout=outFile, stderr=errFile)
        clients.append((popen, cmd))
        files.append((outFile, errFile))
        Print(f"Client {i} started, Ship node head is: {shipNode.getBlockNum()}")

    # allow time for all clients to connect
    shipNode.waitForHeadToAdvance(5)
    shipNode.waitForLibToAdvance()

    Print(f"Kill all {args.num_clients} clients and ship node")
    for index, (popen, _) in enumerate(clients):
        popen.kill()
        # SIGTERM the ship node partway through, so it shuts down while some
        # clients are still connected and others are disconnecting
        if index == len(clients)//2:
            shipNode.kill(signal.SIGTERM)
    assert not shipNode.verifyAlive(), "ship node did not shutdown"

    testSuccessful = True
finally:
    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful, dumpErrorDetails=dumpErrorDetails)
    if shipTempDir is not None:
        if testSuccessful and not args.keep_logs:
            shutil.rmtree(shipTempDir, ignore_errors=True)

errorCode = 0 if testSuccessful else 1
exit(errorCode)
