This repository has been archived by the owner on Jun 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathStreamletNodeGST.cpp
1024 lines (840 loc) · 38.9 KB
/
StreamletNodeGST.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include "grpcpp/grpcpp.h"
#include "streamlet.grpc.pb.h"
#include <iostream>
#include <mutex>
#include <thread>
#include <fstream>
#include <sstream>
#include "structs.h"
#include "utils.h"
#include "NetworkInterposer.h"
#include "CryptoManager.h"
#include "ThroughputLossStateMachine.h"
/*
* Implements the Streamlet protocol in which votes are allowed to arrive after the epoch in
* which a block is proposed. However, epochs still track physical time, and in particular,
* a new block cannot be proposed until the duration of physical time corresponding to an
* epoch has elapsed, even if the block proposed in a prior epoch has been notarized.
*
* The service implements the RPC server using the synchronous interface and acts as an RPC
* client using the asynchronous interface so that broadcasts to other nodes do not block
* request processing.
*/
class StreamletNodeGST : public Streamlet::Service {
public:
/*
* Params
* id: Index in peers that is the ID of this node
* peers: Vector of peers in the including the local node
* priv: Private key of local node
* pub: Public key of local node
* epoch_len: Duration of the epoch in milliseconds
* client_app: Distributed application implementing a ReplicatedStateMachine
*/
StreamletNodeGST(
uint32_t id,
const std::vector<Peer> &peers,
const Key &priv,
const gpr_timespec &epoch_len,
ReplicatedStateMachine &client_app,
std::ofstream ¬e_file,
std::ofstream &fin_file
);
~StreamletNodeGST();
grpc::Status NotifyVote(
grpc::ServerContext* context,
const Vote* vote,
Response* response
) override;
grpc::Status ProposeBlock(
grpc::ServerContext* context,
const Proposal* proposal,
Response* response
) override;
/*
* Processes finished RPCs and epoch advancement, intended to take over the main
* thread. Finished RPCs only need to be cleaned up since, for now, the server side
* of the Streamlet service is not expected to return any information. A deadline
* is set at the start of the next epoch wait on the completion queue so
* that it does not miss the start of an epoch if the queue is empty. Regardless,
* advancement of the epoch is always checked first whenever anything is dequeued.
*
* Params
* epoch_sync: Start of the initial epoch in local time. Must be the same, relative
* to the local time zone, as the epoch synchronization point of other
* nodes in the system.
*/
void Run(gpr_timespec epoch_sync);
private:
void broadcast_vote(const Proposal* proposal, const std::string &hash);
void record_proposal(const Proposal *proposal, uint64_t epoch);
/*
* Links block proposed in note_epoch to the notarized chain under parent proposed in par_epoch.
* Called from ProposeBlock once the block contents are known. This should only be called once
* for a given note_epoch once the block for that epoch gains enough votes.
*
* Notarization of a block is recorded by storing a pointer to a ChainElement that has its
* block fields filled into successors. If a block's parent has not been notarized, then a
* placeholder ChainElement is constructed with only the parent's epoch number and a pointer
* to it is stored into successors. This allows later notarizations to be recorded and later
* relinked as the predecessors get notarized.
*
* If the chain index of the notarized block is known, which is the case if its parent has already
* been notarized and the same holds inductively stretching back to the genesis block, a BFS is
* run from the notarized block over its successors to update their chain indexes. Any prior notarized
* blocks that have formed a "floating" part of the chain have their chain indexes updated in this way
* and this is also how max_chainlen is tracked and updated. This BFS procedure maintains the invariant
* that a block's chain position is known if and only if its parent's is known.
*
* Finalization is also detected during the BFS. If a finalization is detected during BFS, a walk
* backwards up the chain is made. If last_finalized can be reached, then it is known that the
* finalized chain is not missing any notarizations, and any branches extending from blocks in the
* finalized chain that are not part of the finalized chain are removed (via repeated BFS as the
* walk progresses up the chain). ChainElements on these branches are garbage collected, and future
* RPCs will know to discard messages for that block (including additional votes, which are no
* longer needed). The distributed application's (client's) callback is then invoked for each newly
* finalized block after last_finalized, which is also updated to the latest finalized block.
* Otherwise, there is at least one block on the finalized chain whose notarization has not yet been
* seen, and last_finalized remains unchanged.
*/
void notarize_block(
const Block ¬e_block,
const std::string ¬e_hash,
uint64_t note_epoch,
uint64_t par_epoch
);
NetworkInterposer network;
CryptoManager crypto;
ReplicatedStateMachine &client;
// Completion queue for outbound RPCs
grpc::CompletionQueue req_queue;
// Blocks that have been seen or proposed and that are awaiting votes
std::unordered_map<uint64_t, Candidate*> candidates;
// Mutex for candidates. Short reads in common case so better not
// having reader/writer lock.
std::mutex candidates_m;
// Mapping from a block to its dependents so that the chain can
// be constructed with later blocks that were notarized first and
// were waiting on a given earlier block
std::unordered_map<uint64_t, ChainElement*> successors;
// Length of longest notarized chain, also guarded by successors_m
uint64_t max_chainlen;
// last element of longest notarized chain (not necessarily unique),
// also guarded by successors_m
const ChainElement *last_chainlen;
// Pointer to one past last finalized ChainElement in successors,
// which is the block at which to start notifying the client app
// of new finalizations, also guarded by successors_m
const ChainElement *last_finalized;
// Mutex for successors
std::mutex successors_m;
// Mutex to serialize client notifications, and also to
// ensure finalization notifications occur in order
std::mutex notification_m;
// Genesis block
ChainElement genesis_block;
// Address of this server
const std::string local_addr;
// The ID of this node
const uint32_t local_id;
// Number of peers, which is one greater than the max ID
const uint32_t num_peers;
// Number of votes at which a block is notarized
const uint32_t note_threshold;
// Duration of each epoch
const gpr_timespec epoch_duration;
// Current epoch number
std::atomic_uint64_t epoch_counter;
#ifdef DEBUG_PRINTS
std::mutex print_m;
#endif
// Files for recording notarized and finalized blocks
std::ofstream ¬arizations;
std::ofstream &finalizations;
gpr_timespec start_time;
};
StreamletNodeGST::StreamletNodeGST(
uint32_t id,
const std::vector<Peer> &peers,
const Key &priv,
const gpr_timespec &epoch_len,
ReplicatedStateMachine &client_app,
std::ofstream ¬e_file,
std::ofstream &fin_file)
: network{peers, id},
crypto{peers, priv, id},
client{client_app},
max_chainlen{0},
genesis_block{0},
local_addr{make_loopback(peers.at(id).addr)},
local_id{id},
num_peers{static_cast<uint32_t>(peers.size())},
note_threshold{2 * ((num_peers - 1) / 3) + 1}, // given a value of 3f + 1, calculate 2f + 1
epoch_duration{epoch_len},
epoch_counter{0},
notarizations{note_file},
finalizations{fin_file}
{
// Create entry for dependents of genesis block
successors.emplace(0, &genesis_block);
// To suppress hash_block arning about phash not being 256 bits in length
genesis_block.block.mutable_phash()->resize(32);
// Set the hash of the empty block
genesis_block.hash = crypto.hash_block(genesis_block.block);
// Set pointer once genesis block is constructed
last_finalized = &genesis_block;
last_chainlen = &genesis_block;
}
StreamletNodeGST::~StreamletNodeGST() {
}
grpc::Status StreamletNodeGST::NotifyVote(
grpc::ServerContext* context,
const Vote* vote,
Response* response
) {
const uint64_t cur_epoch = epoch_counter.load(std::memory_order_relaxed);
const uint64_t b_epoch = vote->epoch();
const std::string& b_hash = vote->hash();
const uint32_t voter = vote->node();
if (!crypto.verify_vote(voter, vote->signature(), vote)) {
std::cout << "Warning: Signature verification failed on vote for block " << b_epoch
<< " by node " << voter << ", discarding message" << std::endl;
return grpc::Status::OK; // discard message
}
// Cleanup the block
bool remove = false;
// The number of votes including the vote from the message being
// processed if it is valid
uint32_t votes = 0;
// Pointer to candidate set only when the vote is in scope for the epoch
Candidate *cand = nullptr;
// discard and ignore or report error if b_epoch > cur_epoch
if (b_epoch <= cur_epoch) {
candidates_m.lock();
std::unordered_map<uint64_t, Candidate*>::iterator iter
= candidates.find(b_epoch);
if (iter == candidates.end()) {
cand = new Candidate{num_peers};
candidates.emplace(b_epoch, cand);
} else {
cand = iter->second;
}
cand->ref_count.fetch_add(1, std::memory_order_relaxed);
candidates_m.unlock();
}
if (cand != nullptr) {
cand->m.lock();
// Empty hash means the proposal has not yet been seen
if (cand->hash.empty()) {
cand->vote_hashes.emplace(voter, b_hash);
} else if (cand->hash == b_hash && cand->voters[voter] != true) {
cand->voters[voter] = true;
votes = ++cand->votes;
}
// This shows some unexpected looping echoing behavior that stalls progress
/*else if (cand->hash != b_hash) {
std::cout << "VOTE for block " << b_epoch << " from node " << voter << " hash mismatch" << std::endl;
}*/
remove = (cand->removed
&& (cand->ref_count.fetch_sub(1, std::memory_order_relaxed) == 1));
cand->m.unlock();
if (remove) {
delete cand;
} else if (votes > 0) {
// Votes can only be non-zero if the proposal has already been seen. Furthermore, if this
// invocation of NotifyVote has incremented the number of votes to meet the notarization
// threshold, then the block must be notarized by the same invocation.
if (votes == note_threshold) {
// can lock candidates map, remove from map, add entry to some "finished" set, and then unlock
// then call notarize_block() with the Candidate's block
// then lock the Candidate, set the remove flag, record ref_count.load(std::memory_order_relaxed),
// and then unlock, and finally deallocate the candidate if ref_count was zero
// but for now just notarize
notarize_block(cand->block, b_hash, b_epoch, vote->parent());
}
// votes is 0 when the message is a duplicate or unanticipated, echo otherwise
network.broadcast(*vote, &req_queue);
}
#ifdef DEBUG_PRINTS
print_m.lock();
std::cout << "VOTE for block " << b_epoch << " from node " << voter
<< ", current epoch is " << cur_epoch << std::endl;
if (votes == 0) std::cout << "\t^ IGNORED" << std::endl;
print_m.unlock();
#endif
// As future work, NotifyVote could also notarize blocks whose proposal has not been seen if a
// mechanism to catch up by recursively requesting missing proposals and parents is implemented.
}
return grpc::Status::OK;
}
grpc::Status StreamletNodeGST::ProposeBlock(
grpc::ServerContext* context,
const Proposal* proposal,
Response* response
) {
const uint64_t cur_epoch = epoch_counter.load(std::memory_order_relaxed);
const uint64_t b_epoch = proposal->block().epoch();
const uint64_t b_parent = proposal->block().parent();
const uint32_t proposer = proposal->node();
if (proposer != crypto.hash_epoch(b_epoch)) {
std::cout << "Warning: Received proposal for epoch " << b_epoch
<< " from wrong leader (node " << proposer
<< "), discarding message" << std::endl;
return grpc::Status::OK; // discard mesasge
}
std::string hash{
crypto.hash_block(proposal->block())
};
if (!crypto.verify_block(proposer, proposal->signature(), proposal->block())) {
std::cout << "Warning: Signature verification failed for block " << b_epoch
<< " proposed by node " << proposer << ", discarding message" << std::endl;
return grpc::Status::OK; // discard mesasge
}
// Check that the block extends the longest chain. Since messages can be delayed
// and the proposal might not have been seen by the time a block is notarized,
// it is not always possible to check this. A proposal can be definitely ruled out
// if its parent's index is known and is less than the maximum notarized chain length.
// Since a ChainElement's index is 0 when its position is not known, and the genesis
// block's index is also 0, this definite condition is checked in two cases, one for
// when the parent block is the genesis block and one for when it is not. The one
// other definite condition is when a proposed block's parent has an epoch number
// earlier than the most recent finalized block. This also ensures that when blocks
// not in the finalized chain are deleted in notarize_block() below, no later messages
// will allocate a new ChainElement for that parent.
bool outdated_length = false;
bool index_known = false;
successors_m.lock();
if (b_parent < last_finalized->block.epoch()) {
outdated_length = true;
} else {
std::unordered_map<uint64_t, ChainElement*>::iterator iter
= successors.find(b_parent);
if (iter != successors.end() && b_parent != 0 && iter->second->index != 0) {
if (iter->second->index < max_chainlen) {
outdated_length = true;
} else if (iter->second->hash == proposal->block().phash()) {
index_known = true;
} // else the parent hash did not match so do not vote
} else if (iter != successors.end() && b_parent == 0) {
if (max_chainlen > 0) {
outdated_length = true;
} else if (iter->second->hash == proposal->block().phash()) {
index_known = true;
}
}
}
successors_m.unlock();
// This is misleading if an echoed proposal that is stale is discarded, which
// is the right behavior, so keep this commented out.
//
// if (outdated_length) {
// std::cout << "Proposal for epoch " << b_epoch
// << " does not extend longest chain, discarding message" << std::endl;
// return grpc::Status::OK; // discard message
// }
// Check that the block is a valid extension according to the application
// by invoking a callback on the ReplicatedStateMachine.
if (!client.ValidateTransactions(proposal->block().txns(), proposal->block().epoch())) {
std::cout << "Warning: Client rejected proposal for epoch " << b_epoch
<< ", discarding message" << std::endl;
return grpc::Status::OK; // discard message
}
bool remove = false;
uint32_t votes = 0;
Candidate *cand = nullptr;
// discard and ignore or report error if b_epoch > cur_epoch
if (b_epoch <= cur_epoch) {
candidates_m.lock();
std::unordered_map<uint64_t, Candidate*>::iterator iter
= candidates.find(b_epoch);
if (iter == candidates.end()) {
cand = new Candidate{num_peers};
candidates.emplace(b_epoch, cand);
} else {
cand = iter->second;
}
cand->ref_count.fetch_add(1, std::memory_order_relaxed);
candidates_m.unlock();
}
if (cand != nullptr) {
// An empty hash means the proposal has not yet been seen. If it has been
// seen, then this is a duplicate or invalid message and must be ignored.
cand->m.lock();
if (cand->hash.empty()) {
cand->hash = hash;
// Proposer's vote is always recorded below. Add the local vote if the epoch
// matches and the chain length has been verified. Otherwise, only count
// the proposers vote,
if (b_epoch == cur_epoch && index_known) {
votes = 2;
cand->voters[local_id] = true;
} else {
votes = 1;
}
cand->voters[proposer] = true;
for (std::pair<const uint32_t, std::string> &p : cand->vote_hashes) {
if (p.second == hash) {
cand->voters[p.first] = true;
votes++;
}
}
cand->votes = votes;
cand->vote_hashes.clear();
cand->block.CopyFrom(proposal->block());
}
remove = (cand->removed
&& (cand->ref_count.fetch_sub(1, std::memory_order_relaxed) == 1));
cand->m.unlock();
// See comment above identical branch in NotifyVote. Notarization will not
// be applied a second time if the message is a duplicate since votes is 0.
if (remove) {
delete cand;
} else if (votes >= note_threshold) {
// can lock candidates map, remove from map, add entry to some "finished" set, and then unlock
// then call notarize_block() with the Candidate's block
// then lock the Candidate, set the remove flag, record ref_count.load(std::memory_order_relaxed),
// and then unlock, and finally deallocate the candidate if ref_count was zero
// but for now just notarize
notarize_block(proposal->block(), hash, b_epoch, b_parent);
}
#ifdef DEBUG_PRINTS
print_m.lock();
std::cout << "PROPOSAL for block " << b_epoch << " (parent " << b_parent
<< ") from node " << proposer << ", current epoch is " << cur_epoch << std::endl;
if (votes == 0) std::cout << "\t^ IGNORED" << std::endl;
print_m.unlock();
#endif
// votes is 0 when the message is a duplicate or unanticipated, echo otherwise
if (!remove && votes > 0) {
network.broadcast(*proposal, &req_queue);
if (b_epoch == cur_epoch && index_known) {
broadcast_vote(proposal, hash);
}
}
}
return grpc::Status::OK;
}
void StreamletNodeGST::notarize_block(
const Block ¬e_block,
const std::string ¬e_hash,
uint64_t note_epoch,
uint64_t par_epoch
) {
notification_m.lock();
// ==== Begin Nusret's benchmarking code ====
gpr_timespec t_diff = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
double seconds_elapsed = static_cast<double>(t_diff.tv_sec);
seconds_elapsed += static_cast<double>(t_diff.tv_nsec) / 1e9;
std::string print_hash{};
write_hexstring(print_hash, note_hash);
notarizations << "block hash: " << print_hash << " current epoch: " << note_epoch << " time: "
<< seconds_elapsed << " chain len: " << max_chainlen + 1 << std::endl;
// ==== End Nusret's benchmarking code ====
client.TransactionsNotarized(note_block.txns(), note_epoch);
notification_m.unlock();
std::list<ChainElement*> queue;
const ChainElement *prev_finalized = nullptr;
ChainElement *new_finalized = nullptr;
if (note_epoch == 0) {
std::cerr << "Genesis block should never be passed to notarize_block" << std::endl;
return;
}
successors_m.lock();
// In the time that ProposeBlock was running, it is possible that a block on a different branch
// has extended the finalized chain, in which case a ChainElement for note_epoch must not be
// constructed because the ChainElement at par_epoch has been deleted and the new ChainElement
// would not later be garbage collected.
if (par_epoch < last_finalized->block.epoch()) {
successors_m.unlock();
return;
}
ChainElement *&p_elem = successors[par_epoch];
if (p_elem == nullptr) {
p_elem = new ChainElement{par_epoch};
}
ChainElement *&new_elem = successors[note_epoch];
if (new_elem == nullptr) {
new_elem = new ChainElement{note_epoch};
new_elem->block.CopyFrom(note_block);
new_elem->hash = note_hash;
} else if (new_elem->index == 0) {
// ChainElement already existed but block was previously not notarized
new_elem->block.CopyFrom(note_block);
new_elem->hash = note_hash;
} else {
successors_m.unlock();
std::cerr << "Duplicate attempt to notarize block " << note_epoch << std::endl;
return;
}
new_elem->plink = p_elem;
p_elem->links.push_back(new_elem);
// If a finalization is detected, this is set to reference the last element
// of a sequence of three or more contiguous epochs
std::list<ChainElement*>::iterator finalize_end = queue.end();
// If the parent is already linked into the chain, or the parent is the genesis
// block, then set the chain index of new_elem and look for any children
// that can be linked and check if they extend the longest chain at their position.
if (p_elem->index != 0 || par_epoch == 0) {
new_elem->index = p_elem->index + 1;
queue.push_back(new_elem);
// Check whether new_elem itself ends a finalized chain
if (new_elem->epoch == p_elem->epoch + 1
&& (p_elem->plink != nullptr && p_elem->epoch == p_elem->plink->epoch + 1)) {
finalize_end = queue.begin();
}
// Run a BFS to relink previously notarized blocks and detect finalization
std::list<ChainElement*>::iterator queue_iter = queue.begin();
while (queue_iter != queue.end()) {
const ChainElement *elem = *queue_iter;
bool parent_contiguous = elem->epoch == elem->plink->epoch + 1;
for (ChainElement *child : elem->links) {
child->index = elem->index + 1;
queue.push_back(child);
// If the child's epoch is contiguous with its parent's, and the parent
// was contiguous with the grandparent, then the child is now the first
// non-finalized element.
if (parent_contiguous && child->epoch == elem->epoch + 1) {
finalize_end = --queue.end();
}
}
++queue_iter;
}
// Because queue is expanded in order of increasing BFS depth, the
// last element must contain the length of the longest notarized
// chain, although this chain is not necessarily unique
last_chainlen = queue.back();
max_chainlen = last_chainlen->index;
}
if (finalize_end != queue.end()) {
// The last finalized element is second to last element in the
// contiguous sequence of epochs. This will be used to update
// last_finalized below if the tail of the finalized chain can
// be walked all the way back to last_finalized.
new_finalized = (*finalize_end)->plink;
queue.clear();
std::list<ChainElement*>::iterator queue_iter = queue.end();
// In this branch, since finalize_end was set, its plink is known
// not to be null as is its grandparent (*finalize_end)->plink->plink,
// so that the first iteration of the loop below is safe
ChainElement *elem = new_finalized;
while (elem != last_finalized && elem->index != 0) {
ChainElement *p = elem->plink;
std::list<ChainElement*>::iterator pchild = p->links.begin();
std::list<ChainElement*>::iterator rm;
// This loop pushes siblings of elem, which are now known not to be
// in the finalized chain, into queue to be deleted below
while (pchild != p->links.end()) {
if (*pchild != elem) {
queue.push_back(*pchild);
rm = pchild;
++pchild;
p->links.erase(rm);
} else {
++pchild;
}
}
// queue_iter will still be queue.end() on next iteration
// if nothing was added to queue
if (queue_iter == queue.end()) {
queue_iter = queue.begin();
} else {
++queue_iter;
}
// Run a BFS over the added elements to collect all elements
// that are known not to be in the finalized chain
while (queue_iter != queue.end()) {
for (ChainElement *child : (*queue_iter)->links) {
queue.push_back(child);
}
++queue_iter;
}
// Move iterator back to last element in queue so that it can
// continue from the given position to newly added elements
// in the next iteration of the outer while loop
--queue_iter;
elem = p;
}
if (elem == last_finalized) {
prev_finalized = last_finalized;
last_finalized = new_finalized;
// Remove outdated chains from the successors map
for (ChainElement *rm : queue) {
successors.erase(rm->epoch);
}
}
}
// Overlap locks because, in case of concurrent finalizations, the thread that
// will notify the client of earlier finalized nodes must run first
notification_m.lock();
successors_m.unlock();
std::list<const ChainElement*> finalized_elems;
if (prev_finalized != nullptr) {
// Deletion can be done outside the lock since elements are deleted
// only when last_finalized has been updated, so no later blocks
// can be notarized on any chain not extending from the new last
// finalized block.
for (ChainElement *elem : queue) {
delete elem;
}
// Notify state machine client of finalized transactions. Similar to
// the above, no new messages will change the state of these elements.
// Moreover, notification is read-only, so notification does not require
// taking hold of the lock. There also should be only one child per
// element in the finalized chain, and the notifications begin from the
// child of the previous last finalized element.
if (prev_finalized->links.size() != 1) {
std::cerr << "Block " << prev_finalized->epoch
<< " on finalized chain does not have exactly one successor" << std::endl;
}
do {
prev_finalized = prev_finalized->links.front();
// ==== Begin Nusret's benchmarking code ====
gpr_timespec t_diff = gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), start_time);
double seconds_elapsed = static_cast<double>(t_diff.tv_sec);
seconds_elapsed += static_cast<double>(t_diff.tv_nsec) / 1e9;
std::string print_hash;
write_hexstring(print_hash, prev_finalized->hash);
finalizations << "block hash: " << print_hash << " block epoch: " << prev_finalized->block.epoch() << " time: "
<< seconds_elapsed << " chain len: " << prev_finalized->index << std::endl;
// ==== End Nusret's benchmarking code ====
client.TransactionsFinalized(prev_finalized->block.txns(), prev_finalized->block.epoch());
} while (prev_finalized != new_finalized);
}
notification_m.unlock();
}
void StreamletNodeGST::broadcast_vote(const Proposal* proposal, const std::string &hash) {
Vote v;
v.set_node(local_id);
v.set_parent(proposal->block().parent());
v.set_epoch(proposal->block().epoch());
v.set_hash(hash);
// sign_vote computes a signature over only the parent, epoch, and hash,
// which are set above, so it's ok to sign a partially filled vote
v.set_signature(crypto.sign_vote(&v));
network.broadcast(v, &req_queue);
}
void StreamletNodeGST::record_proposal(const Proposal *proposal, uint64_t epoch) {
bool remove = false;
uint32_t votes = 0;
Candidate *cand = nullptr;
std::string hash{
crypto.hash_block(proposal->block())
};
candidates_m.lock();
std::unordered_map<uint64_t, Candidate*>::iterator iter
= candidates.find(epoch);
if (iter == candidates.end()) {
cand = new Candidate{num_peers};
candidates.emplace(epoch, cand);
} else {
cand = iter->second;
}
cand->ref_count.fetch_add(1, std::memory_order_relaxed);
candidates_m.unlock();
// Unlike in ProposeBlock, the check on hash.empty() is to prevent double counting
// in case an echo from a remote node has returned the local proposal before this
// method was run.
cand->m.lock();
if (cand->hash.empty()) {
cand->hash = hash;
// Since the local node proposed the block, only count one vote
votes = 1;
cand->voters[local_id] = true;
// Tally other votes in case votes from other nodes started rolling in
// before local execution reached this point
for (std::pair<const uint32_t, std::string> &p : cand->vote_hashes) {
if (p.second == hash) {
cand->voters[p.first] = true;
votes++;
}
}
cand->votes = votes;
cand->vote_hashes.clear();
cand->block.CopyFrom(proposal->block());
}
remove = (cand->removed
&& (cand->ref_count.fetch_sub(1, std::memory_order_relaxed) == 1));
cand->m.unlock();
// See comment above identical branch in NotifyVote. Notarization will not
// be applied a second time if the message is a duplicate since votes is 0.
if (remove) {
delete cand;
} else if (votes >= note_threshold) {
// can lock candidates map, remove from map, add entry to some "finished" set, and then unlock
// then call notarize_block() with the Candidate's block
// then lock the Candidate, set the remove flag, record ref_count.load(std::memory_order_relaxed),
// and then unlock, and finally deallocate the candidate if ref_count was zero
// but for now just notarize
notarize_block(proposal->block(), hash, epoch, proposal->block().parent());
}
}
void StreamletNodeGST::Run(gpr_timespec epoch_sync) {
std::cout << "Notarization threshold: " << note_threshold << std::endl;
// Build server and run
std::string server_address{ local_addr };
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.RegisterService(this);
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
// Copy of start time for printing time differences
start_time = epoch_sync;
// Poll for completed async requests and monitor epoch progress
grpc::CompletionQueue::NextStatus status;
void *tag;
bool ok;
status = req_queue.AsyncNext(&tag, &ok, epoch_sync);
// Client should only begin tracking time after the initial sync is complete
client.BeginTime();
while (status != grpc::CompletionQueue::NextStatus::SHUTDOWN) {
// Always check if the epoch advanced
gpr_timespec t_now = gpr_now(GPR_CLOCK_MONOTONIC);
if (gpr_time_cmp(gpr_time_sub(t_now, epoch_sync), epoch_duration) >= 0) {
epoch_sync = gpr_time_add(epoch_sync, epoch_duration);
// Get delay after updating epoch_sync
// gpr_timespec delay = gpr_time_sub(t_now, epoch_sync);
// std::cout << "Timeout delay: " << delay.tv_nsec << std::endl;
// Since the increment specifies relaxed memory order, be careful
// that no code below this statement changes other data shared among
// threads that must be sequenced with epoch_counter.
uint64_t cur_epoch = epoch_counter.fetch_add(1, std::memory_order_relaxed);
cur_epoch++; // Must be incremented because the atomic op returns the previous value
// Run leader logic when the hash of the epoch matches the local node id
if (crypto.hash_epoch(cur_epoch) == local_id) {
Proposal p;
p.set_node(local_id);
p.mutable_block()->set_parent(max_chainlen);
successors_m.lock();
p.mutable_block()->set_phash(last_chainlen->hash);
successors_m.unlock();
p.mutable_block()->set_epoch(cur_epoch);
#ifdef BYZANTINE
for (uint32_t peer = 0; peer < num_peers; peer++) {
client.GetTransactions(peer, p.mutable_block()->mutable_txns(), cur_epoch);
#ifdef DEBUG_PRINTS
print_m.lock();
std::cout << "Epoch " << cur_epoch << ", adversarial leader " << local_id
<< " to node " << peer << ": " << p.block().txns() << std::endl;
print_m.unlock();
#endif
// This must be run after all fields of the block have been filled out
p.set_signature(crypto.sign_block(p.block()));
// Only send if not the local id. This also allows the adversarial client to
// customize its own record.
if (peer != local_id) {
network.send_single(peer, p, &req_queue);
} else {
record_proposal(&p, cur_epoch);
}
}
#else
client.GetTransactions(p.mutable_block()->mutable_txns(), cur_epoch);
#ifdef DEBUG_PRINTS
print_m.lock();
std::cout << "Epoch " << cur_epoch << ", leader " << local_id
<< ": " << p.block().txns() << std::endl;
print_m.unlock();
#endif
// This must be run after all fields of the block have been filled out
p.set_signature(crypto.sign_block(p.block()));
network.broadcast(p, &req_queue);
// Broadcast first then propose locally to overlap the network requests
// with local exection. record_proposal is designed to be safe to call
// with the possiblity of from remote nodes echoing the proposal so that
// ProposeBlock is called before or concurrently with record_proposal.
record_proposal(&p, cur_epoch);
#endif
}
}
// Nothing to process if the status is TIMEOUT. Otherwise, tag is a
// completed async request that must be cleaned up.
if (status == grpc::CompletionQueue::NextStatus::GOT_EVENT) {
NetworkInterposer::Pending *req = static_cast<NetworkInterposer::Pending*>(tag);
// optionally check status and response (currently an empty struct)
// Deallocate the Pending structure. As stated in the comment above NetworkInterposer's
// broadcast methods, we are not tracking outstanding requests or associated memory
// for simplicity. In principle, the completion queue should eventually give us back
// a pointer to each of our allocations, including if a request was cancelled. If the server
// is intended to run indefinitely, there is no point in tracking the outstanding memory
// since we would expect to see these pointers. In reality, we aren't familiar with GRPC's
// cancellation behavior or guarantees on the completion queue, and this is a demo that will
// be Ctrl-C'd anyway.
delete req;
}
status = req_queue.AsyncNext(&tag, &ok, epoch_sync);
}
server->Shutdown();
server->Wait();
}
int main(const int argc, const char *argv[]) {
if (argc != 5) {
std::cout << "Usage: StreamletNodeGST <sync_time> <epoch_len> <config_file> <local_id>\n"
<< "where\n"
<< "\tsync_time is of the form HH:MM:SS specifying at time in UTC at which to start counting epochs\n"
<< "\tepoch_len is the duration of each epoch in milliseconds\n"
<< "\tconfig_file is a path to a copy of the configuration file supplied to all nodes in the system\n"
<< "\tlocal_id is the node ID used by the instance being started\n"
<< std::endl;
return 1;
}
// read command line: sync_time epoch_len config_file local_id
// parse config into std::list<Peer>
int status = 0;
const uint32_t id = strtoul(argv[4], nullptr, 10);
const uint32_t epoch = strtoul(argv[2], nullptr, 10);
std::vector<Peer> peers;
Key privkey;
status = load_config(argv[3], id, peers, privkey);
if (status == 1) {
std::cerr << "Unable to open or read " << argv[3] << std::endl;
return 1;
} else if (status == 2) {
std::cerr << "Configuration in " << argv[3] << " is malformed" << std::endl;
return 1;
}
gpr_time_init();
// Files for recording notarized and finalized blocks
std::ofstream notarizations;
notarizations.exceptions(std::ofstream::failbit);
std::ofstream finalizations;
finalizations.exceptions(std::ofstream::failbit);
std::ostringstream fmt;
fmt << "notarizations_" << id;
notarizations.open(fmt.str());
fmt.str(std::string{}); // clear
fmt << "finalizations_" << id;
finalizations.open(fmt.str());
// Set up client application and server
ThroughputLossStateMachine rsm{
id,
static_cast<uint32_t>(peers.size()),
gpr_time_from_millis(1000, GPR_TIMESPAN),
notarizations,
finalizations
};
std::thread input_thread = rsm.SpawnThread();
StreamletNodeGST service{
id,
peers,
privkey,
gpr_time_from_millis(epoch, GPR_TIMESPAN),
rsm,
notarizations,
finalizations