Skip to content

Commit

Permalink
Replace panic with error handling
Browse files Browse the repository at this point in the history
Starting from the processor.Block.Process all methods now return errors
if something goes wrong with unpacking of the blocks and reading the
transactions. In each function where the error is being propagated back
to client it is wrapped in a message with the function name. This makes
it easier to track down the error and see the propagation chain. Finally
the error is logged to the terminal and the go routine shuts down
gracefully. The graceful shutdown executes all deferred functions which
close the context, the checkpointer and the gateway.

Before panics were used everywhere which was an issue because the
unpacking of the blocks happened in a go routine. When a panic happens
in a go routine only the deferred functions of the go routine are called
but not those of the client which lead to unexpected behavior.

Signed-off-by: Stanislav Jakuschevskij <[email protected]>
  • Loading branch information
twoGiants committed Jan 8, 2025
1 parent ed3737d commit 0b8ba91
Show file tree
Hide file tree
Showing 13 changed files with 380 additions and 155 deletions.
4 changes: 2 additions & 2 deletions off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ func listen(clientConnection *grpc.ClientConn) {
channelName,
)

if err := blockProcessor.Process(); err == store.ErrExpected {
fmt.Println(err)
if err := blockProcessor.Process(); err != nil {
fmt.Println("\033[31m[ERROR]\033[0m", err)
return
}
}
Expand Down
86 changes: 57 additions & 29 deletions off_chain_data/application-go/parser/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,74 +14,102 @@ type Block struct {
}

func ParseBlock(block *common.Block) *Block {
return &Block{block, nil}
return &Block{block, []*Transaction{}}
}

func (b *Block) Number() uint64 {
header := utils.AssertDefined(b.block.GetHeader(), "missing block header")
return header.GetNumber()
func (b *Block) Number() (uint64, error) {
header, err := utils.AssertDefined(b.block.GetHeader(), "missing block header")
if err != nil {
return 0, fmt.Errorf("in Number: %w", err)
}
return header.GetNumber(), nil
}

func (b *Block) Transactions() []*Transaction {
return utils.Cache(func() []*Transaction {
envelopes := b.unmarshalEnvelopesFromBlockData()
func (b *Block) Transactions() ([]*Transaction, error) {
return utils.Cache(func() ([]*Transaction, error) {
funcName := "Transactions"
envelopes, err := b.unmarshalEnvelopesFromBlockData()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

commonPayloads := b.unmarshalPayloadsFrom(envelopes)
commonPayloads, err := b.unmarshalPayloadsFrom(envelopes)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

payloads := b.parse(commonPayloads)
payloads, err := b.parse(commonPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

return b.createTransactionsFrom(payloads)
return b.createTransactionsFrom(payloads), nil
})()
}

func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope {
func (b *Block) unmarshalEnvelopesFromBlockData() ([]*common.Envelope, error) {
result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil {
panic(err)
return nil, fmt.Errorf("in unmarshalEnvelopesFromBlockData: %w", err)
}
result = append(result, envelope)
}
return result
return result, nil
}

func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload {
func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) ([]*common.Payload, error) {
result := []*common.Payload{}
for _, envelope := range envelopes {
commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
panic(err)
return nil, fmt.Errorf("in unmarshalPayloadsFrom: %w", err)
}
result = append(result, commonPayload)
}
return result
return result, nil
}

func (b *Block) parse(commonPayloads []*common.Payload) []*payload {
validationCodes := b.extractTransactionValidationCodes()
func (b *Block) parse(commonPayloads []*common.Payload) ([]*payload, error) {
funcName := "parse"

validationCodes, err := b.extractTransactionValidationCodes()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

result := []*payload{}
for i, commonPayload := range commonPayloads {
payload := parsePayload(
commonPayload,
int32(utils.AssertDefined(
validationCodes[i],
fmt.Sprint("missing validation code index", i),
),
),
statusCode, err := utils.AssertDefined(
validationCodes[i],
fmt.Sprint("missing validation code index", i),
)
if payload.isEndorserTransaction() {
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

payload := parsePayload(commonPayload, int32(statusCode))
is, err := payload.isEndorserTransaction()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}
if is {
result = append(result, payload)
}
}
return result

return result, nil
}

func (b *Block) extractTransactionValidationCodes() []byte {
metadata := utils.AssertDefined(
func (b *Block) extractTransactionValidationCodes() ([]byte, error) {
metadata, err := utils.AssertDefined(
b.block.GetMetadata(),
"missing block metadata",
)
if err != nil {
return nil, fmt.Errorf("in extractTransactionValidationCodes: %w", err)
}

return utils.AssertDefined(
metadata.GetMetadata()[common.BlockMetadataIndex_TRANSACTIONS_FILTER],
Expand Down
16 changes: 12 additions & 4 deletions off_chain_data/application-go/parser/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@ func Test_GetReadWriteSetsFromEndorserTransaction(t *testing.T) {
}

parsedEndorserTransaction := parseEndorserTransaction(transaction)
if len(parsedEndorserTransaction.readWriteSets()) != 1 {
t.Fatal("expected 1 ReadWriteSet, got", len(parsedEndorserTransaction.readWriteSets()))
readWriteSets, err := parsedEndorserTransaction.readWriteSets()
if err != nil {
t.Fatal("unexpected error:", err)
}

if len(readWriteSets) != 1 {
t.Fatal("expected 1 ReadWriteSet, got", len(readWriteSets))
}

assertReadWriteSet(
parsedEndorserTransaction.readWriteSets()[0].namespaceReadWriteSets()[0],
readWriteSets[0].namespaceReadWriteSets()[0],
expectedNamespace,
expectedAsset,
t,
Expand All @@ -57,7 +62,10 @@ func assertReadWriteSet(
t.Errorf("expected namespace %s, got %s", expectedNamespace, parsedNsRwSet.Namespace())
}

actualKVRWSet := parsedNsRwSet.ReadWriteSet()
actualKVRWSet, err := parsedNsRwSet.ReadWriteSet()
if err != nil {
t.Fatal("unexpected error:", err)
}
if len(actualKVRWSet.Writes) != 1 {
t.Fatal("expected 1 write, got", len(actualKVRWSet.Writes))
}
Expand Down
74 changes: 48 additions & 26 deletions off_chain_data/application-go/parser/endorserTransaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parser

import (
"fmt"
"offChainData/utils"

"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
Expand All @@ -16,83 +17,104 @@ func parseEndorserTransaction(transaction *peer.Transaction) *endorserTransactio
return &endorserTransaction{transaction}
}

func (p *endorserTransaction) readWriteSets() []*readWriteSet {
return utils.Cache(func() []*readWriteSet {
chaincodeActionPayloads := p.unmarshalChaincodeActionPayloads()
func (p *endorserTransaction) readWriteSets() ([]*readWriteSet, error) {
return utils.Cache(func() ([]*readWriteSet, error) {
funcName := "readWriteSets"
chaincodeActionPayloads, err := p.unmarshalChaincodeActionPayloads()
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

chaincodeEndorsedActions := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads)
chaincodeEndorsedActions, err := p.extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

proposalResponsePayloads := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions)
proposalResponsePayloads, err := p.unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

chaincodeActions := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads)
chaincodeActions, err := p.unmarshalChaincodeActionsFrom(proposalResponsePayloads)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

txReadWriteSets := p.unmarshalTxReadWriteSetsFrom(chaincodeActions)
txReadWriteSets, err := p.unmarshalTxReadWriteSetsFrom(chaincodeActions)
if err != nil {
return nil, fmt.Errorf("in %s: %w", funcName, err)
}

return p.parseReadWriteSets(txReadWriteSets)
return p.parseReadWriteSets(txReadWriteSets), nil
})()
}

func (p *endorserTransaction) unmarshalChaincodeActionPayloads() []*peer.ChaincodeActionPayload {
func (p *endorserTransaction) unmarshalChaincodeActionPayloads() ([]*peer.ChaincodeActionPayload, error) {
result := []*peer.ChaincodeActionPayload{}
for _, transactionAction := range p.transaction.GetActions() {
chaincodeActionPayload := &peer.ChaincodeActionPayload{}
if err := proto.Unmarshal(transactionAction.GetPayload(), chaincodeActionPayload); err != nil {
panic(err)
return nil, fmt.Errorf("in unmarshalChaincodeActionPayloads: %w", err)
}

result = append(result, chaincodeActionPayload)
}
return result
return result, nil
}

func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) []*peer.ChaincodeEndorsedAction {
func (*endorserTransaction) extractChaincodeEndorsedActionsFrom(chaincodeActionPayloads []*peer.ChaincodeActionPayload) ([]*peer.ChaincodeEndorsedAction, error) {
result := []*peer.ChaincodeEndorsedAction{}
for _, payload := range chaincodeActionPayloads {
chaincodeEndorsedAction, err := utils.AssertDefined(
payload.GetAction(),
"missing chaincode endorsed action",
)
if err != nil {
return nil, fmt.Errorf("in extractChaincodeEndorsedActionsFrom: %w", err)
}

result = append(
result,
utils.AssertDefined(
payload.GetAction(),
"missing chaincode endorsed action",
),
chaincodeEndorsedAction,
)
}
return result
return result, nil
}

func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) []*peer.ProposalResponsePayload {
func (*endorserTransaction) unmarshalProposalResponsePayloadsFrom(chaincodeEndorsedActions []*peer.ChaincodeEndorsedAction) ([]*peer.ProposalResponsePayload, error) {
result := []*peer.ProposalResponsePayload{}
for _, endorsedAction := range chaincodeEndorsedActions {
proposalResponsePayload := &peer.ProposalResponsePayload{}
if err := proto.Unmarshal(endorsedAction.GetProposalResponsePayload(), proposalResponsePayload); err != nil {
panic(err)
return nil, fmt.Errorf("in unmarshalProposalResponsePayloadsFrom: %w", err)
}
result = append(result, proposalResponsePayload)
}
return result
return result, nil
}

func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) []*peer.ChaincodeAction {
func (*endorserTransaction) unmarshalChaincodeActionsFrom(proposalResponsePayloads []*peer.ProposalResponsePayload) ([]*peer.ChaincodeAction, error) {
result := []*peer.ChaincodeAction{}
for _, proposalResponsePayload := range proposalResponsePayloads {
chaincodeAction := &peer.ChaincodeAction{}
if err := proto.Unmarshal(proposalResponsePayload.GetExtension(), chaincodeAction); err != nil {
panic(err)
return nil, fmt.Errorf("in unmarshalChaincodeActionsFrom: %w", err)
}
result = append(result, chaincodeAction)
}
return result
return result, nil
}

func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) []*rwset.TxReadWriteSet {
func (*endorserTransaction) unmarshalTxReadWriteSetsFrom(chaincodeActions []*peer.ChaincodeAction) ([]*rwset.TxReadWriteSet, error) {
result := []*rwset.TxReadWriteSet{}
for _, chaincodeAction := range chaincodeActions {
txReadWriteSet := &rwset.TxReadWriteSet{}
if err := proto.Unmarshal(chaincodeAction.GetResults(), txReadWriteSet); err != nil {
continue
return nil, fmt.Errorf("in unmarshalTxReadWriteSetsFrom: %w", err)
}
result = append(result, txReadWriteSet)
}
return result
return result, nil
}

func (*endorserTransaction) parseReadWriteSets(txReadWriteSets []*rwset.TxReadWriteSet) []*readWriteSet {
Expand Down
9 changes: 5 additions & 4 deletions off_chain_data/application-go/parser/namespaceReadWriteSet.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parser

import (
"fmt"
"offChainData/utils"

"github.com/hyperledger/fabric-protos-go-apiv2/ledger/rwset"
Expand All @@ -20,13 +21,13 @@ func (p *NamespaceReadWriteSet) Namespace() string {
return p.nsReadWriteSet.GetNamespace()
}

func (p *NamespaceReadWriteSet) ReadWriteSet() *kvrwset.KVRWSet {
return utils.Cache(func() *kvrwset.KVRWSet {
func (p *NamespaceReadWriteSet) ReadWriteSet() (*kvrwset.KVRWSet, error) {
return utils.Cache(func() (*kvrwset.KVRWSet, error) {
result := kvrwset.KVRWSet{}
if err := proto.Unmarshal(p.nsReadWriteSet.GetRwset(), &result); err != nil {
panic(err)
return nil, fmt.Errorf("in ReadWriteSet: %w", err)
}

return &result
return &result, nil
})()
}
Loading

0 comments on commit 0b8ba91

Please sign in to comment.