From 94fa4ba37f84fddb7c452554fd170fa263bea4ee Mon Sep 17 00:00:00 2001 From: Guanghua Guo <1536310027@qq.com> Date: Fri, 20 Oct 2017 10:04:04 +0800 Subject: [PATCH] Dev (#61) * Specifiy the branches of CI It's enough to test on master and dev branch. Furthermore, no need to clone dev branch particularly in script part * Fix a bug(#46) * Close(#41) * Replace the basic log statement with logrus * Finish the basic log replacement * Add block process to build accountutxos db (#53) * Add block process to build accountutxos db * Update 544bf9be2752abaa5f1ff58f392ba41fc4126a72 * Update init pin.Store * Rename some variables * Update gofmt * Modify io_test to not support bytom/log && Add logrus in glide.lock * Replace the basic log in p2p (#60) * Account bind key && Asset bind key (#59) * Add bindAccount && bindAsset function in bytomcli * Add signTransactions in bytomcli --- .travis.yml | 20 ++- blockchain/account/accounts.go | 132 +++++++++------- blockchain/account/builder.go | 16 +- blockchain/account/indexer.go | 204 +++++++++++------------- blockchain/hsm.go | 23 --- blockchain/pin/pin.go | 272 ++++++++++++++++++++++++++++++++ blockchain/query.go | 138 +++++++++++----- blockchain/reactor.go | 17 +- blockchain/transact.go | 16 +- cmd/bytomcli/main.go | 131 ++++++++++++--- cmd/bytomd/commands/init.go | 3 +- cmd/bytomd/commands/run_node.go | 3 +- cmd/bytomd/main.go | 36 ++++- common/bytes.go | 6 +- config/config.go | 1 + crypto/crypto.go | 10 -- glide.lock | 1 + mining/cpuminer/cpuminer.go | 14 +- mining/mining.go | 6 +- net/http/httpjson/io.go | 4 +- net/http/httpjson/io_test.go | 21 --- node/node.go | 66 ++++++-- p2p/addrbook.go | 48 +++--- p2p/connection.go | 65 +++++--- p2p/listener.go | 24 +-- p2p/peer.go | 3 +- p2p/peer_test.go | 10 +- p2p/pex_reactor.go | 28 ++-- p2p/switch.go | 61 +++++-- protocol/bc/legacy/block.go | 1 - protocol/block.go | 36 ----- protocol/protocol.go | 7 - version/version.go | 4 +- 33 files changed, 959 insertions(+), 468 deletions(-) create mode 100644 blockchain/pin/pin.go diff --git a/.travis.yml b/.travis.yml index 12060277c..e2ba27903 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,13 +1,19 @@ language: go dist: trusty sudo: false + go: -- 1.8.3 -- 1.9 -- tip + - 1.8.3 + - 1.9 + - tip + +branches: + only: + - master + - dev script: -- git clone -b dev https://github.com/Bytom/bytom.git $GOPATH/src/github.com/bytom -- cd $GOPATH/src/github.com/bytom -- make install -- make test + - git clone https://github.com/Bytom/bytom.git $GOPATH/src/github.com/bytom + - cd $GOPATH/src/github.com/bytom + - make install + - make test diff --git a/blockchain/account/accounts.go b/blockchain/account/accounts.go index 027df8298..e466b9089 100644 --- a/blockchain/account/accounts.go +++ b/blockchain/account/accounts.go @@ -2,25 +2,23 @@ package account import ( - "context" - // stdsql "database/sql" - "encoding/json" "fmt" "sync" "time" + "context" + "encoding/json" + "github.com/bytom/log" + "github.com/bytom/errors" + "github.com/bytom/protocol" + "github.com/bytom/blockchain/pin" "github.com/golang/groupcache/lru" - //"github.com/lib/pq" - - // "chain/core/pin" + "github.com/bytom/crypto/sha3pool" + "github.com/bytom/protocol/vm/vmutil" "github.com/bytom/blockchain/signers" "github.com/bytom/blockchain/txbuilder" "github.com/bytom/crypto/ed25519/chainkd" - // "chain/database/pg" - "github.com/bytom/errors" - "github.com/bytom/log" - "github.com/bytom/protocol" - "github.com/bytom/protocol/vm/vmutil" + dbm "github.com/tendermint/tmlibs/db" ) @@ -31,12 +29,12 @@ var ( ErrBadIdentifier = errors.New("either ID or alias must be specified, and not both") ) -func NewManager(db dbm.DB, chain *protocol.Chain /*, pinStore *pin.Store*/) *Manager { +func NewManager(db dbm.DB, chain *protocol.Chain , pinStore *pin.Store) *Manager { return &Manager{ db: db, chain: chain, - utxoDB: newReserver(db, chain /*, pinStore*/), - // pinStore: pinStore, + utxoDB: newReserver(db, chain), + pinStore: pinStore, cache: lru.New(maxAccountCache), aliasCache: lru.New(maxAccountCache), delayedACPs: make(map[*txbuilder.TemplateBuilder][]*controlProgram), @@ -49,7 +47,7 @@ type Manager struct { chain *protocol.Chain utxoDB *reserver indexer Saver - // pinStore *pin.Store + pinStore *pin.Store cacheMu sync.Mutex cache *lru.Cache @@ -102,7 +100,7 @@ func (m *Manager) Create(ctx context.Context, xpubs []chainkd.XPub, quorum int, return nil, errors.Wrap(err) } - account_id := []byte(accountSigner.ID) + account_id := json.RawMessage(accountSigner.ID) account := &Account{ Signer: accountSigner, Alias: alias, @@ -114,8 +112,8 @@ func (m *Manager) Create(ctx context.Context, xpubs []chainkd.XPub, quorum int, return nil, errors.Wrap(err, "failed marshal account") } if len(acc) > 0 { - m.db.Set(account_id, json.RawMessage(acc)) - m.db.Set([]byte(alias), account_id) + m.db.Set(account_id, acc) + m.db.Set(json.RawMessage("ali"+alias), account_id) } err = m.indexAnnotatedAccount(ctx, account) @@ -138,7 +136,7 @@ func (m *Manager) UpdateTags(ctx context.Context, id, alias *string, tags map[st if alias != nil { key_id = m.db.Get([]byte(*alias)) } else { - key_id = []byte(*id) + key_id = json.RawMessage(*id) } bytes := m.db.Get(key_id) @@ -173,7 +171,7 @@ func (m *Manager) UpdateTags(ctx context.Context, id, alias *string, tags map[st } else { - m.db.Set(key_id, json.RawMessage(acc)) + m.db.Set(key_id, acc) return nil } @@ -214,14 +212,23 @@ func (m *Manager) findByID(ctx context.Context, id string) (*signers.Signer, err if ok { return cached.(*signers.Signer), nil } - account, err := signers.Find(ctx, m.db, "account", id) + + bytes := m.db.Get(json.RawMessage(id)) + if bytes == nil { + return nil,errors.New("not find this account.") + } + + var account Account + err := json.Unmarshal(bytes, &account) if err != nil { - return nil, err + return nil,errors.New("failed unmarshal this account.") } + + m.cacheMu.Lock() - m.cache.Add(id, account) + m.cache.Add(id, account.Signer) m.cacheMu.Unlock() - return account, nil + return account.Signer, nil } type controlProgram struct { @@ -273,31 +280,40 @@ func (m *Manager) CreateControlProgram(ctx context.Context, accountID string, ch return cp.controlProgram, nil } +type ControlProgram struct { + AccountID string + KeyIndex uint64 + ControlProgram []byte + Change bool + ExpiresAt time.Time +} + func (m *Manager) insertAccountControlProgram(ctx context.Context, progs ...*controlProgram) error { - /*const q = ` - INSERT INTO account_control_programs (signer_id, key_index, control_program, change, expires_at) - SELECT unnest($1::text[]), unnest($2::bigint[]), unnest($3::bytea[]), unnest($4::boolean[]), - unnest($5::timestamp with time zone[]) - ` - var ( - accountIDs pq.StringArray - keyIndexes pq.Int64Array - controlProgs pq.ByteaArray - change pq.BoolArray - expirations []stdsql.NullString - ) + + var b32 [32]byte for _, p := range progs { - accountIDs = append(accountIDs, p.accountID) - keyIndexes = append(keyIndexes, int64(p.keyIndex)) - controlProgs = append(controlProgs, p.controlProgram) - change = append(change, p.change) - expirations = append(expirations, stdsql.NullString{ - String: p.expiresAt.Format(time.RFC3339), - Valid: !p.expiresAt.IsZero(), - }) - }*/ - - // _, err := m.dbm.ExecContext(ctx, q, accountIDs, keyIndexes, controlProgs, change, pq.Array(expirations)) + + acp, err := json.Marshal(&struct{ + AccountID string + KeyIndex uint64 + ControlProgram []byte + Change bool + ExpiresAt time.Time}{ + AccountID: p.accountID, + KeyIndex: p.keyIndex, + ControlProgram: p.controlProgram, + Change: p.change, + ExpiresAt: p.expiresAt}) + + if err != nil { + return errors.Wrap(err, "failed marshal controlProgram") + } + if len(acp) > 0 { + sha3pool.Sum256(b32[:], p.controlProgram) + m.db.Set(json.RawMessage("acp"+string(b32[:])), acp) + } + } + return errors.Wrap(nil) } @@ -306,15 +322,14 @@ func (m *Manager) nextIndex(ctx context.Context) (uint64, error) { defer m.acpMu.Unlock() if m.acpIndexNext >= m.acpIndexCap { - /*var cap uint64 - const incrby = 10000 // account_control_program_seq increments by 10,000 - const q = `SELECT nextval('account_control_program_seq')` - err := m.db.QueryRowContext(ctx, q).Scan(&cap) - if err != nil { - return 0, errors.Wrap(err, "scan") + + const incrby = 10000 // start 1,increments by 10,000 + if(m.acpIndexCap <= incrby){ + m.acpIndexCap = incrby + 1 + }else{ + m.acpIndexCap += incrby } - m.acpIndexCap = cap - m.acpIndexNext = cap - incrby*/ + m.acpIndexNext = m.acpIndexCap - incrby } n := m.acpIndexNext @@ -327,12 +342,11 @@ func (m *Manager) QueryAll(ctx context.Context) (interface{}, error) { iter := m.db.Iterator() for iter.Next() { - value := string(iter.Value()) - if value[:3] == "acc" { + key := string(iter.Key()) + if key[:3] != "acc" { continue } - ret = append(ret, value) - //log.Printf(ctx,"%s\t", value) + ret = append(ret, string(iter.Value())) } return ret, nil diff --git a/blockchain/account/builder.go b/blockchain/account/builder.go index b1ca0d85d..ec015ceac 100644 --- a/blockchain/account/builder.go +++ b/blockchain/account/builder.go @@ -126,10 +126,17 @@ func (a *spendUTXOAction) Build(ctx context.Context, b *txbuilder.TemplateBuilde } b.OnRollback(canceler(ctx, a.accounts, res.ID)) - acct, err := a.accounts.findByID(ctx, res.Source.AccountID) - if err != nil { - return err + var acct *signers.Signer + if res.Source.AccountID == ""{ + //TODO coinbase + acct = &signers.Signer{} + }else{ + acct, err = a.accounts.findByID(ctx, res.Source.AccountID) + if err != nil { + return err + } } + txInput, sigInst, err := utxoToInputs(ctx, acct, res.UTXOs[0], a.ReferenceData) if err != nil { return err @@ -231,7 +238,6 @@ func (m *Manager) insertControlProgramDelayed(ctx context.Context, b *txbuilder. if len(acps) == 0 { return nil } - // return m.insertAccountControlProgram(ctx, acps...) - return nil + return m.insertAccountControlProgram(ctx, acps...) }) } diff --git a/blockchain/account/indexer.go b/blockchain/account/indexer.go index 7a943e192..7631ba6e7 100644 --- a/blockchain/account/indexer.go +++ b/blockchain/account/indexer.go @@ -1,19 +1,18 @@ package account import ( + "time" "context" "encoding/json" - "fmt" - // "github.com/lib/pq" - - "github.com/bytom/blockchain/query" - "github.com/bytom/blockchain/signers" - //"github.com/blockchain/database/pg" - chainjson "github.com/bytom/encoding/json" "github.com/bytom/errors" "github.com/bytom/protocol/bc" + "github.com/bytom/crypto/sha3pool" + "github.com/bytom/blockchain/query" "github.com/bytom/protocol/bc/legacy" + "github.com/bytom/blockchain/signers" + + chainjson "github.com/bytom/encoding/json" ) const ( @@ -28,6 +27,20 @@ const ( DeleteSpentsPinName = "delete-account-spents" ) +type AccountUTXOs struct { + OutputID []byte + AssetID []byte + Amount int64 + AccountID string + CpIndex int64 + Program []byte + Confirmed int64 + SourceID []byte + SourcePos int64 + RefData []byte + Change bool +} + var emptyJSONObject = json.RawMessage(`{}`) // A Saver is responsible for saving an annotated account object. @@ -100,41 +113,28 @@ type accountOutput struct { } func (m *Manager) ProcessBlocks(ctx context.Context) { - /*if m.pinStore == nil { + if m.pinStore == nil { return } - go m.pinStore.ProcessBlocks(ctx, m.chain, ExpirePinName, func(ctx context.Context, b *legacy.Block) error { - <-m.pinStore.PinWaiter(PinName, b.Height) - return m.expireControlPrograms(ctx, b) - }) + go m.pinStore.ProcessBlocks(ctx, m.chain, DeleteSpentsPinName, func(ctx context.Context, b *legacy.Block) error { <-m.pinStore.PinWaiter(PinName, b.Height) - <-m.pinStore.PinWaiter(query.TxPinName, b.Height) return m.deleteSpentOutputs(ctx, b) }) m.pinStore.ProcessBlocks(ctx, m.chain, PinName, m.indexAccountUTXOs) - */ -} -/* -func (m *Manager) expireControlPrograms(ctx context.Context, b *legacy.Block) error { - // Delete expired account control programs. - const deleteQ = `DELETE FROM account_control_programs WHERE expires_at IS NOT NULL AND expires_at < $1` - _, err := m.db.ExecContext(ctx, deleteQ, b.Time()) - return err } func (m *Manager) deleteSpentOutputs(ctx context.Context, b *legacy.Block) error { // Delete consumed account UTXOs. delOutputIDs := prevoutDBKeys(b.Transactions...) - const delQ = ` - DELETE FROM account_utxos - WHERE output_id IN (SELECT unnest($1::bytea[])) - ` - _, err := m.db.ExecContext(ctx, delQ, delOutputIDs) - return errors.Wrap(err, "deleting spent account utxos") + for _,delOutputID := range delOutputIDs{ + m.pinStore.DB.Delete(json.RawMessage("acu"+string(delOutputID.Bytes()))) + } + + return errors.Wrap(nil, "deleting spent account utxos") } -*/ + func (m *Manager) indexAccountUTXOs(ctx context.Context, b *legacy.Block) error { // Upsert any UTXOs belonging to accounts managed by this Core. @@ -161,123 +161,105 @@ func (m *Manager) indexAccountUTXOs(ctx context.Context, b *legacy.Block) error outs = append(outs, out) } } - accOuts, err := m.loadAccountInfo(ctx, outs) - if err != nil { - return errors.Wrap(err, "loading account info from control programs") - } - fmt.Printf("accOuts:%v", accOuts) + accOuts := m.loadAccountInfo(ctx, outs) - //err = m.upsertConfirmedAccountOutputs(ctx, accOuts, blockPositions, b) + err := m.upsertConfirmedAccountOutputs(ctx, accOuts, blockPositions, b) return errors.Wrap(err, "upserting confirmed account utxos") } -/* -func prevoutDBKeys(txs ...*legacy.Tx) (outputIDs pq.ByteaArray) { + +func prevoutDBKeys(txs ...*legacy.Tx) (outputIDs []bc.Hash) { for _, tx := range txs { for _, inpID := range tx.Tx.InputIDs { if sp, err := tx.Spend(inpID); err == nil { - outputIDs = append(outputIDs, sp.SpentOutputId.Bytes()) + outputIDs = append(outputIDs, *sp.SpentOutputId) } } } return } -*/ + // loadAccountInfo turns a set of output IDs into a set of // outputs by adding account annotations. Outputs that can't be // annotated are excluded from the result. -func (m *Manager) loadAccountInfo(ctx context.Context, outs []*rawOutput) ([]*accountOutput, error) { +func (m *Manager) loadAccountInfo(ctx context.Context, outs []*rawOutput) ([]*accountOutput) { outsByScript := make(map[string][]*rawOutput, len(outs)) for _, out := range outs { scriptStr := string(out.ControlProgram) outsByScript[scriptStr] = append(outsByScript[scriptStr], out) } - /*var scripts pq.ByteaArray + result := make([]*accountOutput, 0, len(outs)) + cp := struct { + AccountID string + KeyIndex uint64 + ControlProgram []byte + Change bool + ExpiresAt time.Time + }{} + + var b32 [32]byte for s := range outsByScript { - scripts = append(scripts, []byte(s)) - } - */ + sha3pool.Sum256(b32[:], []byte(s)) + bytes := m.db.Get(json.RawMessage("acp"+string(b32[:]))) + if bytes == nil { + continue + } - result := make([]*accountOutput, 0, len(outs)) + err := json.Unmarshal(bytes, &cp) + if err != nil { + continue + } - /* - const q = ` - SELECT signer_id, key_index, control_program, change - FROM account_control_programs - WHERE control_program IN (SELECT unnest($1::bytea[])) - ` - err := pg.ForQueryRows(ctx, m.db, q, scripts, func(accountID string, keyIndex uint64, program []byte, change bool) { - for _, out := range outsByScript[string(program)] { - newOut := &accountOutput{ - rawOutput: *out, - AccountID: accountID, - keyIndex: keyIndex, - change: change, - } - result = append(result, newOut) + for _, out := range outsByScript[s] { + newOut := &accountOutput{ + rawOutput: *out, + AccountID: cp.AccountID, + keyIndex: cp.KeyIndex, + change: cp.Change, } - }) - if err != nil { - return nil, err + result = append(result, newOut) } - */ + } - return result, nil + return result } -/* + // upsertConfirmedAccountOutputs records the account data for confirmed utxos. // If the account utxo already exists (because it's from a local tx), the // block confirmation data will in the row will be updated. -func (m *Manager) upsertConfirmedAccountOutputs(ctx context.Context, outs []*accountOutput, pos map[bc.Hash]uint32, block *legacy.Block) error { - var ( - outputID pq.ByteaArray - assetID pq.ByteaArray - amount pq.Int64Array - accountID pq.StringArray - cpIndex pq.Int64Array - program pq.ByteaArray - sourceID pq.ByteaArray - sourcePos pq.Int64Array - refData pq.ByteaArray - change pq.BoolArray - ) +func (m *Manager) upsertConfirmedAccountOutputs(ctx context.Context, + outs []*accountOutput, + pos map[bc.Hash]uint32, + block *legacy.Block) error { + + var au *AccountUTXOs for _, out := range outs { - outputID = append(outputID, out.OutputID.Bytes()) - assetID = append(assetID, out.AssetId.Bytes()) - amount = append(amount, int64(out.Amount)) - accountID = append(accountID, out.AccountID) - cpIndex = append(cpIndex, int64(out.keyIndex)) - program = append(program, out.ControlProgram) - sourceID = append(sourceID, out.sourceID.Bytes()) - sourcePos = append(sourcePos, int64(out.sourcePos)) - refData = append(refData, out.refData.Bytes()) - change = append(change, out.change) + au = &AccountUTXOs{OutputID: out.OutputID.Bytes(), + AssetID: out.AssetId.Bytes(), + Amount: int64(out.Amount), + AccountID: out.AccountID, + CpIndex: int64(out.keyIndex), + Program: out.ControlProgram, + Confirmed: int64(block.Height), + SourceID: out.sourceID.Bytes(), + SourcePos: int64(out.sourcePos), + RefData: out.refData.Bytes(), + Change: out.change} + + accountutxo, err := json.Marshal(au) + if err != nil { + return errors.Wrap(err, "failed marshal accountutxo") + } + + if len(accountutxo) > 0 { + m.pinStore.DB.Set(json.RawMessage("acu"+string(au.OutputID)), accountutxo) + } + } - const q = ` - INSERT INTO account_utxos (output_id, asset_id, amount, account_id, control_program_index, - control_program, confirmed_in, source_id, source_pos, ref_data_hash, change) - SELECT unnest($1::bytea[]), unnest($2::bytea[]), unnest($3::bigint[]), - unnest($4::text[]), unnest($5::bigint[]), unnest($6::bytea[]), $7, - unnest($8::bytea[]), unnest($9::bigint[]), unnest($10::bytea[]), unnest($11::boolean[]) - ON CONFLICT (output_id) DO NOTHING - ` - _, err := m.db.ExecContext(ctx, q, - outputID, - assetID, - amount, - accountID, - cpIndex, - program, - block.Height, - sourceID, - sourcePos, - refData, - change, - ) - return errors.Wrap(err) + return nil } -*/ + diff --git a/blockchain/hsm.go b/blockchain/hsm.go index cdf704334..0a8ff393e 100644 --- a/blockchain/hsm.go +++ b/blockchain/hsm.go @@ -15,29 +15,6 @@ func init() { errorFormatter.Errors[pseudohsm.ErrTooManyAliasesToList] = httperror.Info{400, "BTM802", "Too many aliases to list"} } -/* -// PseudoHSM configures the Core to expose the PseudoHSM endpoints. It -// is only included in non-production builds. -func PseudoHSM(hsm *Pseudohsm.HSM) RunOption { - return func(api *API) { - - h := &pseudoHSMHandler{PseudoHSM: hsm} - needConfig := api.needConfig() - api.mux.Handle("/hsm/create-key", needConfig(h.pseudohsmCreateKey)) - api.mux.Handle("/hsm/list-keys", needConfig(h.pseudohsmListKeys)) - api.mux.Handle("/hsm/delete-key", needConfig(h.pseudohsmDeleteKey)) - api.mux.Handle("/hsm/sign-transaction", needConfig(h.pseudohsmSignTemplates)) - api.mux.Handle("/hsm/reset-password", needConfig(h.pseudohsmResetPassword)) - api.mux.Handle("/hsm/update-alias", needConfig(h.pseudohsmUpdateAlias)) - } -} - - -type pseudoHSMHandler struct { - PseudoHSM *Pseudohsm.HSM -} -*/ - func (a *BlockchainReactor) pseudohsmCreateKey(ctx context.Context, in struct{ Alias, Password string }) (result *pseudohsm.XPub, err error) { return a.hsm.XCreate(in.Password, in.Alias) } diff --git a/blockchain/pin/pin.go b/blockchain/pin/pin.go new file mode 100644 index 000000000..d8b70bf0c --- /dev/null +++ b/blockchain/pin/pin.go @@ -0,0 +1,272 @@ +package pin + +import ( + "context" + "encoding/json" + "sort" + "sync" + + "github.com/bytom/errors" + "github.com/bytom/log" + "github.com/bytom/protocol" + "github.com/bytom/protocol/bc/legacy" + + dbm "github.com/tendermint/tmlibs/db" +) + +const processorWorkers = 10 + +type Store struct { + DB dbm.DB + + mu sync.Mutex + cond sync.Cond + pins map[string]*pin +} + +func NewStore(db dbm.DB) *Store { + s := &Store{ + DB: db, + pins: make(map[string]*pin), + } + s.cond.L = &s.mu + return s +} + +func (s *Store) ProcessBlocks(ctx context.Context, c *protocol.Chain, pinName string, cb func(context.Context, *legacy.Block) error) { + p := <-s.pin(pinName) + height := p.getHeight() + for { + select { + case <-ctx.Done(): + log.Error(ctx, ctx.Err()) + return + case <-c.BlockWaiter(height + 1): + select { + case <-ctx.Done(): + log.Error(ctx, ctx.Err()) + return + case p.sem <- true: + go p.processBlock(ctx, c, height+1, cb) + height++ + } + } + } +} + +func (s *Store) CreatePin(ctx context.Context, name string, height uint64) error { + s.mu.Lock() + defer s.mu.Unlock() + if _, ok := s.pins[name]; ok { + return nil + } + + block_processor, err := json.Marshal(&struct { + Name string + Height uint64 + }{Name: name, + Height: height}) + if err != nil { + return errors.Wrap(err, "failed marshal block_processor") + } + if len(block_processor) > 0 { + s.DB.Set(json.RawMessage("blp"+name), block_processor) + } + + s.pins[name] = newPin(s.DB, name, height) + s.cond.Broadcast() + return nil +} + +func (s *Store) Height(name string) uint64 { + p := <-s.pin(name) + return p.getHeight() +} + +func (s *Store) LoadAll(ctx context.Context) error { + s.mu.Lock() + defer s.mu.Unlock() + + var block_processor = struct { + Name string + Height uint64 + }{} + iter := s.DB.Iterator() + for iter.Next() { + key := string(iter.Key()) + if key[:3] != "blp" { + continue + } + err := json.Unmarshal(iter.Value(), &block_processor) + if err != nil { + return errors.New("failed unmarshal this block_processor.") + } + + s.pins[block_processor.Name] = newPin(s.DB, + block_processor.Name[3:], + block_processor.Height) + + } + + s.cond.Broadcast() + return nil +} + +func (s *Store) pin(name string) <-chan *pin { + ch := make(chan *pin, 1) + go func() { + s.mu.Lock() + defer s.mu.Unlock() + for s.pins[name] == nil { + s.cond.Wait() + } + ch <- s.pins[name] + }() + return ch +} + +func (s *Store) PinWaiter(pinName string, height uint64) <-chan struct{} { + ch := make(chan struct{}, 1) + p := <-s.pin(pinName) + go func() { + p.mu.Lock() + defer p.mu.Unlock() + for p.height < height { + p.cond.Wait() + } + ch <- struct{}{} + }() + return ch +} + +func (s *Store) AllWaiter(height uint64) <-chan struct{} { + ch := make(chan struct{}, 1) + go func() { + var pins []string + s.mu.Lock() + for name := range s.pins { + pins = append(pins, name) + } + s.mu.Unlock() + for _, name := range pins { + <-s.PinWaiter(name, height) + } + ch <- struct{}{} + }() + return ch +} + +type pin struct { + mu sync.Mutex + cond sync.Cond + height uint64 + completed []uint64 + + db dbm.DB + name string + sem chan bool +} + +func newPin(db dbm.DB, name string, height uint64) *pin { + p := &pin{db: db, name: name, height: height, sem: make(chan bool, processorWorkers)} + p.cond.L = &p.mu + return p +} + +func (p *pin) getHeight() uint64 { + p.mu.Lock() + defer p.mu.Unlock() + return p.height +} + +func (p *pin) processBlock(ctx context.Context, c *protocol.Chain, height uint64, cb func(context.Context, *legacy.Block) error) { + defer func() { <-p.sem }() + for { + block, err := c.GetBlock(height) + if err != nil { + log.Error(ctx, err) + continue + } + + err = cb(ctx, block) + if err != nil { + log.Error(ctx, errors.Wrapf(err, "pin %q callback", p.name)) + continue + } + + err = p.complete(ctx, block.Height) + if err != nil { + log.Error(ctx, err) + } + break + } +} + +func (p *pin) complete(ctx context.Context, height uint64) error { + p.mu.Lock() + defer p.mu.Unlock() + + p.completed = append(p.completed, height) + sort.Sort(uint64s(p.completed)) + + var ( + max = p.height + i int + ) + for i = 0; i < len(p.completed); i++ { + if p.completed[i] <= max { + continue + } else if p.completed[i] > max+1 { + break + } + max = p.completed[i] + } + + if max == p.height { + return nil + } + + var ( + block_processor = struct { + Name string + Height uint64 + }{} + err error + ) + + bytes := p.db.Get(json.RawMessage("blp" + p.name)) + if bytes != nil { + err = json.Unmarshal(bytes, &block_processor) + if err == nil && block_processor.Height >= max { + goto Noupdate + } + } + + block_processor = struct { + Name string + Height uint64 + }{ + Name: p.name, + Height: max} + + bytes, err = json.Marshal(&block_processor) + if err != nil { + goto Noupdate + } + if len(bytes) > 0 { + p.db.Set(json.RawMessage("blp"+p.name), bytes) + } + +Noupdate: + p.completed = p.completed[i:] + p.height = max + p.cond.Broadcast() + + return nil +} + +type uint64s []uint64 + +func (a uint64s) Len() int { return len(a) } +func (a uint64s) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a uint64s) Less(i, j int) bool { return a[i] < a[j] } diff --git a/blockchain/query.go b/blockchain/query.go index 7935150fe..e0b948220 100644 --- a/blockchain/query.go +++ b/blockchain/query.go @@ -2,19 +2,31 @@ package blockchain import ( "context" + "encoding/json" + "fmt" "math" + "sort" + "github.com/bytom/blockchain/account" "github.com/bytom/blockchain/query" - //"github.com/bytom/blockchain/query/filter" "github.com/bytom/errors" "github.com/bytom/net/http/httpjson" - //"github.com/bytom/log" ) const ( defGenericPageSize = 100 ) +var ( + AccountUTXOFmt = ` + { + "OutputID":"%x","AssetID":"%x","Amount":"%d", + "AccountID":"%s","CpIndex":"%d","Program":"%x", + "Confirmed":"%d","SourceID":"%x","SourcePos":"%d", + "RefData":"%x","Change":"%t" + }` +) + // // POST /list-accounts func (bcr *BlockchainReactor) listAccounts(ctx context.Context, in requestQuery) interface{} { @@ -34,16 +46,81 @@ func (bcr *BlockchainReactor) listAssets(ctx context.Context, in requestQuery) i return response } +func (bcr *BlockchainReactor) GetAccountUTXOs() []account.AccountUTXOs { + + var ( + au = account.AccountUTXOs{} + accutoxs = []account.AccountUTXOs{} + ) + + iter := bcr.pinStore.DB.Iterator() + for iter.Next() { + key := string(iter.Key()) + if key[:3] != "acu" { + continue + } + + err := json.Unmarshal(iter.Value(), &au) + if err != nil { + continue + } + + accutoxs = append(accutoxs, au) + } + + return accutoxs +} + // POST /list-balances func (bcr *BlockchainReactor) listBalances(ctx context.Context, in requestQuery) interface{} { - response := bcr.chain.GetAssetsAmount() - if len(response) == 0 { - return nil - } else { - return response + type assetAmount struct { + AssetID string + Amount int64 + } + var ( + aa = assetAmount{} + accBalances = make(map[string][]assetAmount, 0) + accBalancesSort = make(map[string][]assetAmount, 0) + keys = make([]string, 0) + response = make([]interface{}, 0) + ) + + accoutUTXOs := bcr.GetAccountUTXOs() + + for _, res := range accoutUTXOs { + + aa.AssetID = fmt.Sprintf("%x", res.AssetID) + aa.Amount = res.Amount + if _, ok := accBalances[res.AccountID]; ok { + for _, amentry := range accBalances[res.AccountID] { + if amentry.AssetID == aa.AssetID { + amentry.Amount += aa.Amount + } else { + accBalances[res.AccountID] = append(accBalances[res.AccountID], aa) + } + } + } else { + accBalances[res.AccountID] = append(accBalances[res.AccountID], aa) + } + + } + + for k := range accBalances { + keys = append(keys, k) + } + + sort.Strings(keys) + + for _, k := range keys { + accBalancesSort[k] = accBalances[k] } + if len(accBalancesSort) != 0 { + response = append(response, accBalancesSort) + } + + return response } // listTransactions is an http handler for listing transactions matching @@ -125,36 +202,25 @@ func (bcr *BlockchainReactor) listTxFeeds(ctx context.Context, in requestQuery) } // POST /list-unspent-outputs -func (bcr *BlockchainReactor) listUnspentOutputs(ctx context.Context, in requestQuery) (result page, err error) { - limit := in.PageSize - if limit == 0 { - limit = defGenericPageSize - } +func (bcr *BlockchainReactor) listUnspentOutputs(ctx context.Context, in requestQuery) interface{} { - // var after *query.OutputsAfter - if in.After != "" { - _, err = query.DecodeOutputsAfter(in.After) - if err != nil { - return result, errors.Wrap(err, "decoding `after`") - } - } + var ( + response = make([]string, 0) + restring = "" + ) + + accoutUTXOs := bcr.GetAccountUTXOs() + + for _, res := range accoutUTXOs { - timestampMS := in.TimestampMS - if timestampMS == 0 { - timestampMS = math.MaxInt64 - } else if timestampMS > math.MaxInt64 { - return result, errors.WithDetail(httpjson.ErrBadRequest, "timestamp is too large") + restring = fmt.Sprintf(AccountUTXOFmt, + res.OutputID, res.AssetID, res.Amount, + res.AccountID, res.CpIndex, res.Program, + res.Confirmed, res.SourceID, res.SourcePos, + res.RefData, res.Change) + + response = append(response, restring) } - /* outputs, nextAfter, err := bcr.indexer.Outputs(ctx, in.Filter, in.FilterParams, timestampMS, after, limit) - if err != nil { - return result, errors.Wrap(err, "querying outputs") - } - */ - outQuery := in - // outQuery.After = nextAfter.String() - return page{ - // Items: httpjson.Array(outputs), - // LastPage: len(outputs) < limit, - Next: outQuery, - }, nil + + return response } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 177706660..5e71dc40b 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -17,14 +17,15 @@ import ( "github.com/bytom/encoding/json" "github.com/bytom/log" "github.com/bytom/mining/cpuminer" + "github.com/bytom/net/http/httpjson" "github.com/bytom/p2p" "github.com/bytom/protocol" "github.com/bytom/protocol/bc/legacy" "github.com/bytom/types" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - "github.com/bytom/net/http/httpjson" + "github.com/bytom/blockchain/pin" "github.com/bytom/errors" ) @@ -53,6 +54,7 @@ type BlockchainReactor struct { chain *protocol.Chain store *txdb.Store + pinStore *pin.Store accounts *account.Manager assets *asset.Registry accesstoken *accesstoken.Token @@ -126,7 +128,6 @@ func (bcr *BlockchainReactor) createblockkey(ctx context.Context) { log.Printf(ctx, "creat-block-key") } - func maxBytes(h http.Handler) http.Handler { const maxReqSize = 1e7 // 10MB return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -141,7 +142,7 @@ func maxBytes(h http.Handler) http.Handler { func (bcr *BlockchainReactor) BuildHander() { m := bcr.mux - if bcr.accounts != nil{ + if bcr.accounts != nil { m.Handle("/create-account", jsonHandler(bcr.createAccount)) m.Handle("/update-account-tags", jsonHandler(bcr.updateAccountTags)) m.Handle("/create-account-receiver", jsonHandler(bcr.createAccountReceiver)) @@ -233,7 +234,14 @@ type page struct { LastPage bool `json:"last_page"` } -func NewBlockchainReactor(store *txdb.Store, chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, hsm *pseudohsm.HSM, fastSync bool) *BlockchainReactor { +func NewBlockchainReactor(store *txdb.Store, + chain *protocol.Chain, + txPool *protocol.TxPool, + accounts *account.Manager, + assets *asset.Registry, + hsm *pseudohsm.HSM, + fastSync bool, + pinStore *pin.Store) *BlockchainReactor { requestsCh := make(chan BlockRequest, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity) pool := NewBlockPool( @@ -245,6 +253,7 @@ func NewBlockchainReactor(store *txdb.Store, chain *protocol.Chain, txPool *prot bcR := &BlockchainReactor{ chain: chain, store: store, + pinStore: pinStore, accounts: accounts, assets: assets, pool: pool, diff --git a/blockchain/transact.go b/blockchain/transact.go index 4356b4315..d12fe1644 100644 --- a/blockchain/transact.go +++ b/blockchain/transact.go @@ -224,21 +224,21 @@ func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbu return nil } - _, err = a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight) + height, err := a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight) if err != nil { return err } if waitUntil == "confirmed" { return nil } - /* - select { - case <-ctx.Done(): - return ctx.Err() - case <-a.pinStore.AllWaiter(height): - } - */ + + select { + case <-ctx.Done(): + return ctx.Err() + case <-a.pinStore.AllWaiter(height): + } + return nil } diff --git a/cmd/bytomcli/main.go b/cmd/bytomcli/main.go index 3e801b7ff..9957011e6 100644 --- a/cmd/bytomcli/main.go +++ b/cmd/bytomcli/main.go @@ -60,9 +60,11 @@ var commands = map[string]*command{ "grant": {grant}, "revoke": {revoke}, "wait": {wait}, - "create-account": {createAccount}, + "create-account": {createAccount}, + "bind-account": {bindAccount}, "update-account-tags": {updateAccountTags}, "create-asset": {createAsset}, + "bind-asset": {bindAsset}, "update-asset-tags": {updateAssetTags}, "build-transaction": {buildTransaction}, "create-control-program": {createControlProgram}, @@ -336,7 +338,36 @@ func createAccount(client *rpc.Client, args []string) { ins.ClientToken = args[0] account := make([]query.AnnotatedAccount, 1) client.Call(context.Background(), "/create-account", &[]Ins{ins}, &account) - //dieOnRPCError(err) + fmt.Printf("responses:%v\n", account[0]) + fmt.Printf("account id:%v\n", account[0].ID) +} + +func bindAccount(client *rpc.Client, args []string) { + if len(args) != 2 { + fatalln("error: bindAccount need args [account alias name] [account pub key]") + } + var xpub chainkd.XPub + err := xpub.UnmarshalText([]byte(args[1])) + if err == nil { + fmt.Printf("xpub:%v\n", xpub) + } else { + fmt.Printf("xpub unmarshal error:%v\n", xpub) + } + type Ins struct { + RootXPubs []chainkd.XPub `json:"root_xpubs"` + Quorum int + Alias string + Tags map[string]interface{} + ClientToken string `json:"client_token"` + } + var ins Ins + ins.RootXPubs = []chainkd.XPub{xpub} + ins.Quorum = 1 + ins.Alias = args[0] + ins.Tags = map[string]interface{}{"test_tag": "v0"} + ins.ClientToken = args[0] + account := make([]query.AnnotatedAccount, 1) + client.Call(context.Background(), "/create-account", &[]Ins{ins}, &account) fmt.Printf("responses:%v\n", account[0]) fmt.Printf("account id:%v\n", account[0].ID) } @@ -371,11 +402,44 @@ func createAsset(client *rpc.Client, args []string) { ins.ClientToken = args[0] assets := make([]query.AnnotatedAsset, 1) client.Call(context.Background(), "/create-asset", &[]Ins{ins}, &assets) + fmt.Printf("responses:%v\n", assets) + fmt.Printf("asset id:%v\n", assets[0].ID.String()) +} + +func bindAsset(client *rpc.Client, args []string) { + if len(args) != 2 { + fatalln("error: bindAsset need args [asset name] [asset xpub]") + } + var xpub chainkd.XPub + err := xpub.UnmarshalText([]byte(args[1])) + if err == nil { + fmt.Printf("xpub:%v\n", xpub) + } else { + fmt.Printf("xpub unmarshal error:%v\n", xpub) + } + type Ins struct { + RootXPubs []chainkd.XPub `json:"root_xpubs"` + Quorum int + Alias string + Tags map[string]interface{} + Definition map[string]interface{} + ClientToken string `json:"client_token"` + } + var ins Ins + ins.RootXPubs = []chainkd.XPub{xpub} + ins.Quorum = 1 + ins.Alias = args[0] + ins.Tags = map[string]interface{}{"test_tag": "v0"} + ins.Definition = map[string]interface{}{} + ins.ClientToken = args[0] + assets := make([]query.AnnotatedAsset, 1) + client.Call(context.Background(), "/create-asset", &[]Ins{ins}, &assets) //dieOnRPCError(err) fmt.Printf("responses:%v\n", assets) fmt.Printf("asset id:%v\n", assets[0].ID.String()) } + func updateAccountTags(client *rpc.Client, args []string) { if len(args) != 2 { fatalln("update-account-tags [|] [tags_key:]") @@ -430,8 +494,8 @@ func updateAssetTags(client *rpc.Client, args []string) { } func buildTransaction(client *rpc.Client, args []string) { - if len(args) != 2 { - fatalln("error: need args: [account id] [asset id]") + if len(args) != 3 { + fatalln("error: need args: [account id] [asset id] [file name]") } // Build Transaction. fmt.Printf("To build transaction:\n") @@ -456,7 +520,11 @@ func buildTransaction(client *rpc.Client, args []string) { tpl := make([]txbuilder.Template, 1) client.Call(context.Background(), "/build-transaction", []*blockchain.BuildRequest{&buildReq}, &tpl) - fmt.Printf("tpl:%v\n", tpl[0]) + marshalTpl, _ := stdjson.Marshal(tpl[0]) + fmt.Printf("tpl:%v\n", string(marshalTpl)) + file, _ := os.Create(args[2]) + defer file.Close() + file.Write(marshalTpl) } func submitCreateIssueTransaction(client *rpc.Client, args []string) { @@ -762,10 +830,14 @@ func listUnspentOutputs(client *rpc.Client, args []string) { Aliases []string `json:"aliases,omitempty"` } var in requestQuery - after := in.After - out := in - out.After = after - client.Call(context.Background(), "/list-unspent-outputs", &[]requestQuery{in}, nil) + responses := make([]interface{}, 0) + + client.Call(context.Background(), "/list-unspent-outputs", in, &responses) + if len(responses) > 0 { + for i, item := range responses { + fmt.Println(i, "-----", item) + } + } } func createAccessToken(client *rpc.Client, args []string) { @@ -889,23 +961,36 @@ func listKeys(client *rpc.Client, args []string) { } func signTransactions(client *rpc.Client, args []string) { - if len(args) != 2 { - fatalln("error: signTransaction need args: [tpl] [xPub]") + if len(args) != 3 { + fatalln("error: signTransaction need args: [tpl file name] [xPub] [password], 3 args not equal ", len(args)) } - var tpl txbuilder.Template - var xprv_asset chainkd.XPrv - xprv_asset.UnmarshalText([]byte(args[1])) // sign-transaction - err := txbuilder.Sign(context.Background(), &tpl, []chainkd.XPub{xprv_asset.XPub()}, "", func(_ context.Context, _ chainkd.XPub, path [][]byte, data [32]byte, _ string) ([]byte, error) { - derived := xprv_asset.Derive(path) - return derived.Sign(data[:]), nil - }) - if err != nil { - fmt.Printf("sign-transaction error. err:%v\n", err) + type param struct { + Auth string + Txs []*txbuilder.Template `json:"transactions"` + XPubs []chainkd.XPub `json:"xpubs"` + } + var in param + in.Auth = args[2] + var xpub chainkd.XPub + err := xpub.UnmarshalText([]byte(args[1])) + if err == nil { + fmt.Printf("xpub:%v\n", xpub) + } else { + fmt.Printf("xpub unmarshal error:%v\n", xpub) } - fmt.Printf("sign tpl:%v\n", tpl) - fmt.Printf("sign tpl's SigningInstructions:%v\n", tpl.SigningInstructions[0]) - fmt.Printf("SigningInstructions's SignatureWitnesses:%v\n", tpl.SigningInstructions[0].SignatureWitnesses[0]) + in.XPubs = []chainkd.XPub{xpub} + var tpl txbuilder.Template + file, _ := os.Open(args[0]) + tpl_byte := make([]byte, 10000) + file.Read(tpl_byte) + fmt.Printf("tpl:%v\n", string(tpl_byte)) + stdjson.Unmarshal(tpl_byte, &tpl) + in.Txs = []*txbuilder.Template{&tpl} + + var response map[string][]interface{} + client.Call(context.Background(), "/sign-transactions", &in, &response) + fmt.Printf("sign response:%v\n", response) } func resetPassword(client *rpc.Client, args []string) { diff --git a/cmd/bytomd/commands/init.go b/cmd/bytomd/commands/init.go index bddde4f89..97e605a79 100644 --- a/cmd/bytomd/commands/init.go +++ b/cmd/bytomd/commands/init.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/bytom/types" + log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" ) @@ -30,5 +31,5 @@ func initFiles(cmd *cobra.Command, args []string) { genDoc.SaveAs(genFile) } - logger.Info("Initialized bytom", "genesis", config.GenesisFile()) + log.WithField("genesis", config.GenesisFile()).Info("Initialized bytom") } diff --git a/cmd/bytomd/commands/run_node.go b/cmd/bytomd/commands/run_node.go index 5d4167721..5a2ce7796 100644 --- a/cmd/bytomd/commands/run_node.go +++ b/cmd/bytomd/commands/run_node.go @@ -8,6 +8,7 @@ import ( "github.com/bytom/node" "github.com/bytom/types" + log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" ) @@ -54,7 +55,7 @@ func runNode(cmd *cobra.Command, args []string) error { if _, err := n.Start(); err != nil { return fmt.Errorf("Failed to start node: %v", err) } else { - logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo()) + log.WithField("nodeInfo", n.Switch().NodeInfo()).Info("Started node") } // Trap signal, run forever. diff --git a/cmd/bytomd/main.go b/cmd/bytomd/main.go index 0f4ccff61..59c74975b 100644 --- a/cmd/bytomd/main.go +++ b/cmd/bytomd/main.go @@ -1,13 +1,45 @@ package main import ( - "os" - "github.com/bytom/cmd/bytomd/commands" + "github.com/sirupsen/logrus" "github.com/tendermint/tmlibs/cli" + "os" + "path" + "runtime" + "strings" ) +type ContextHook struct{} + +func (hook ContextHook) Levels() []logrus.Level { + return logrus.AllLevels +} + +func (hook ContextHook) Fire(entry *logrus.Entry) error { + pc := make([]uintptr, 3, 3) + cnt := runtime.Callers(6, pc) + + for i := 0; i < cnt; i++ { + fu := runtime.FuncForPC(pc[i] - 1) + name := fu.Name() + if !strings.Contains(name, "github.com/Sirupsen/logrus") { + file, line := fu.FileLine(pc[i] - 1) + entry.Data["file"] = path.Base(file) + entry.Data["func"] = path.Base(name) + entry.Data["line"] = line + break + } + } + return nil +} + +func init() { + logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true}) +} + func main() { + logrus.AddHook(ContextHook{}) cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv("./.bytomd")) cmd.Execute() } diff --git a/common/bytes.go b/common/bytes.go index 4fb016a97..f71fdd867 100644 --- a/common/bytes.go +++ b/common/bytes.go @@ -21,7 +21,7 @@ import ( "bytes" "encoding/binary" "encoding/hex" - "fmt" + log "github.com/sirupsen/logrus" "math/big" "strings" ) @@ -55,7 +55,7 @@ func NumberToBytes(num interface{}, bits int) []byte { buf := new(bytes.Buffer) err := binary.Write(buf, binary.BigEndian, num) if err != nil { - fmt.Println("NumberToBytes failed:", err) + log.WithField("err", err).Error("NumberToBytes failed") } return buf.Bytes()[buf.Len()-(bits/8):] @@ -74,7 +74,7 @@ func BytesToNumber(b []byte) uint64 { buf := bytes.NewReader(data) err := binary.Read(buf, binary.BigEndian, &number) if err != nil { - fmt.Println("BytesToNumber failed:", err) + log.WithField("err", err).Error("BytesToNumber failed") } return number diff --git a/config/config.go b/config/config.go index 3f9cbd384..cfea436dd 100644 --- a/config/config.go +++ b/config/config.go @@ -191,6 +191,7 @@ func DefaultP2PConfig() *P2PConfig { ListenAddress: "tcp://0.0.0.0:46656", AddrBook: "addrbook.json", AddrBookStrict: true, + SkipUPNP: false, MaxNumPeers: 50, } } diff --git a/crypto/crypto.go b/crypto/crypto.go index 46c46007a..24fb10db1 100644 --- a/crypto/crypto.go +++ b/crypto/crypto.go @@ -17,15 +17,6 @@ package crypto import ( - "fmt" - //"io" - //"io/ioutil" - //"math/big" - //"os" - - //"encoding/hex" - //"errors" - "github.com/bytom/common" "golang.org/x/crypto/ripemd160" "golang.org/x/crypto/sha3" @@ -60,7 +51,6 @@ func Ripemd160(data []byte) []byte { func PubkeyToAddress(pubBytes []byte) common.Address { address, _ := common.AddressEncode("bm", 1, toInt(Ripemd160(Sha3(pubBytes)))) - fmt.Printf(address) return common.StringToAddress(address) } diff --git a/glide.lock b/glide.lock index c311e7292..6e1a3a4d5 100644 --- a/glide.lock +++ b/glide.lock @@ -151,6 +151,7 @@ imports: - name: github.com/kr/secureheader - name: github.com/pborman/uuid - name: github.com/rjeczalik/notify +- name: github.com/sirupsen/logrus - name: golang.org/x/sys version: e62c3de784db939836898e5c19ffd41bece347da subpackages: diff --git a/mining/cpuminer/cpuminer.go b/mining/cpuminer/cpuminer.go index 5456fd9d9..8dfbefb81 100644 --- a/mining/cpuminer/cpuminer.go +++ b/mining/cpuminer/cpuminer.go @@ -5,7 +5,7 @@ package cpuminer import ( - "fmt" + log "github.com/sirupsen/logrus" "sync" "time" @@ -89,15 +89,21 @@ out: payToAddr := []byte{} block, err := mining.NewBlockTemplate(m.chain, m.txPool, payToAddr) if err != nil { - fmt.Printf("Failed to create new block template: %v \n", err) + log.WithField("error", err).Error("Failed to create new block template") continue } if m.solveBlock(block, ticker, quit) { if err := m.chain.AddBlock(nil, block); err == nil { - fmt.Printf("finish commit block heigh %d, # of tx %d \n", block.BlockHeader.Height, len(block.Transactions)) + log.WithFields(log.Fields{ + "height": block.BlockHeader.Height, + "tx": len(block.Transactions), + }).Info("Finish committing block height") } else { - fmt.Printf("fail commit block heigh %d, err: %v \n", block.BlockHeader.Height, err) + log.WithFields(log.Fields{ + "height": block.BlockHeader.Height, + "error": err, + }).Error("Failed to commit block height") } } } diff --git a/mining/mining.go b/mining/mining.go index ecf0a3fad..ea3ea87ec 100644 --- a/mining/mining.go +++ b/mining/mining.go @@ -5,7 +5,7 @@ package mining import ( - "fmt" + log "github.com/sirupsen/logrus" "time" "github.com/bytom/blockchain/txbuilder" @@ -95,12 +95,12 @@ func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, addr []byte) ( break } if err := newSnap.ApplyTx(tx); err != nil { - fmt.Println("mining block generate skip tx due to %v", err) + log.WithField("error", err).Error("mining block generate skip tx due to") txPool.RemoveTransaction(&tx.ID) continue } if _, err := validation.ValidateTx(tx, preBcBlock); err != nil { - fmt.Println("mining block generate skip tx due to %v", err) + log.WithField("error", err).Error("mining block generate skip tx due to") txPool.RemoveTransaction(&tx.ID) continue } diff --git a/net/http/httpjson/io.go b/net/http/httpjson/io.go index 7cc39ace2..b27227208 100644 --- a/net/http/httpjson/io.go +++ b/net/http/httpjson/io.go @@ -8,7 +8,7 @@ import ( "reflect" "github.com/bytom/errors" - "github.com/bytom/log" + log "github.com/sirupsen/logrus" ) // ErrBadRequest indicates the user supplied malformed JSON input, @@ -42,7 +42,7 @@ func Write(ctx context.Context, w http.ResponseWriter, status int, v interface{} err := json.NewEncoder(w).Encode(Array(v)) if err != nil { - log.Error(ctx, err) + log.WithField("error", err).Error("Error encountered during writing the Content-Type header using status") } } diff --git a/net/http/httpjson/io_test.go b/net/http/httpjson/io_test.go index 321989388..19ac63893 100644 --- a/net/http/httpjson/io_test.go +++ b/net/http/httpjson/io_test.go @@ -1,15 +1,10 @@ package httpjson import ( - "bytes" "context" - "errors" "net/http/httptest" - "os" "strings" "testing" - - "github.com/bytom/log" ) func TestWriteArray(t *testing.T) { @@ -32,22 +27,6 @@ func TestWriteArray(t *testing.T) { } } -func TestWriteErr(t *testing.T) { - var buf bytes.Buffer - log.SetOutput(&buf) - defer log.SetOutput(os.Stderr) - - want := "test-error" - - ctx := context.Background() - resp := &errResponse{httptest.NewRecorder(), errors.New(want)} - Write(ctx, resp, 200, "ok") - got := buf.String() - if !strings.Contains(got, want) { - t.Errorf("log = %v; should contain %q", got, want) - } -} - type errResponse struct { *httptest.ResponseRecorder err error diff --git a/node/node.go b/node/node.go index 7a79d8c76..600388881 100644 --- a/node/node.go +++ b/node/node.go @@ -10,33 +10,33 @@ import ( "sync" "time" - bc "github.com/bytom/blockchain" "github.com/bytom/blockchain/account" "github.com/bytom/blockchain/asset" + "github.com/bytom/blockchain/pin" "github.com/bytom/blockchain/pseudohsm" "github.com/bytom/blockchain/txdb" - cfg "github.com/bytom/config" "github.com/bytom/consensus" + "github.com/bytom/env" + "github.com/bytom/errors" "github.com/bytom/net/http/reqid" - p2p "github.com/bytom/p2p" "github.com/bytom/protocol" "github.com/bytom/protocol/bc/legacy" + "github.com/bytom/types" + "github.com/bytom/version" + "github.com/kr/secureheader" + "github.com/tendermint/tmlibs/log" + + bc "github.com/bytom/blockchain" + cfg "github.com/bytom/config" + bytomlog "github.com/bytom/log" + p2p "github.com/bytom/p2p" rpccore "github.com/bytom/rpc/core" grpccore "github.com/bytom/rpc/grpc" rpcserver "github.com/bytom/rpc/lib/server" - "github.com/bytom/types" - "github.com/bytom/version" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tmlibs/log" - - "github.com/bytom/env" - "github.com/bytom/errors" - bytomlog "github.com/bytom/log" - "github.com/kr/secureheader" - _ "net/http/pprof" ) @@ -158,6 +158,8 @@ func rpcInit(h *bc.BlockchainReactor, config *cfg.Config) { } func NewNode(config *cfg.Config, logger log.Logger) *Node { + ctx := context.Background() + // Get store tx_db := dbm.NewDB("txdb", config.DBBackend, config.DBDir()) store := txdb.NewStore(tx_db) @@ -186,7 +188,7 @@ func NewNode(config *cfg.Config, logger log.Logger) *Node { genesisBlock.UnmarshalText(consensus.InitBlock()) txPool := protocol.NewTxPool() - chain, err := protocol.NewChain(context.Background(), genesisBlock.Hash(), store, txPool, nil) + chain, err := protocol.NewChain(ctx, genesisBlock.Hash(), store, txPool, nil) if store.Height() < 1 { if err := chain.AddBlock(nil, genesisBlock); err != nil { @@ -196,13 +198,37 @@ func NewNode(config *cfg.Config, logger log.Logger) *Node { var accounts *account.Manager = nil var assets *asset.Registry = nil + var pinStore *pin.Store = nil + if config.Wallet.Enable { accounts_db := dbm.NewDB("account", config.DBBackend, config.DBDir()) + acc_utxos_db := dbm.NewDB("accountutxos", config.DBBackend, config.DBDir()) + pinStore = pin.NewStore(acc_utxos_db) + err = pinStore.LoadAll(ctx) + if err != nil { + bytomlog.Error(ctx, err) + return nil + } + + pinHeight := store.Height() + if pinHeight > 0 { + pinHeight = pinHeight - 1 + } + + pins := []string{account.PinName, account.DeleteSpentsPinName} + for _, p := range pins { + err = pinStore.CreatePin(ctx, p, pinHeight) + if err != nil { + bytomlog.Fatalkv(ctx, bytomlog.KeyError, err) + } + } + + accounts = account.NewManager(accounts_db, chain, pinStore) + go accounts.ProcessBlocks(ctx) + assets_db := dbm.NewDB("asset", config.DBBackend, config.DBDir()) - accounts = account.NewManager(accounts_db, chain) assets = asset.NewRegistry(assets_db, chain) } - //Todo HSM /* if config.HsmUrl != ""{ @@ -220,7 +246,15 @@ func NewNode(config *cfg.Config, logger log.Logger) *Node { if err != nil { cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err)) } - bcReactor := bc.NewBlockchainReactor(store, chain, txPool, accounts, assets, hsm, fastSync) + bcReactor := bc.NewBlockchainReactor( + store, + chain, + txPool, + accounts, + assets, + hsm, + fastSync, + pinStore) bcReactor.SetLogger(logger.With("module", "blockchain")) sw.AddReactor("BLOCKCHAIN", bcReactor) diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 1df0817ee..daa4ffecf 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -14,6 +14,7 @@ import ( "sync" "time" + log "github.com/sirupsen/logrus" crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" ) @@ -147,7 +148,8 @@ func (a *AddrBook) Wait() { func (a *AddrBook) AddOurAddress(addr *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - a.Logger.Info("Add our address to book", "addr", addr) + log.WithField("addr", addr).Info("Add our address to book") + a.ourAddrs[addr.String()] = addr } @@ -163,7 +165,10 @@ func (a *AddrBook) OurAddresses() []*NetAddress { func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() - a.Logger.Info("Add address to book", "addr", addr, "src", src) + log.WithFields(log.Fields{ + "addr": addr, + "src": src, + }).Info("Add address to book") a.addAddress(addr, src) } @@ -271,7 +276,7 @@ func (a *AddrBook) RemoveAddress(addr *NetAddress) { if ka == nil { return } - a.Logger.Info("Remove address from book", "addr", addr) + log.WithField("addr", addr).Info("Remove address from book") a.removeFromAllBuckets(ka) } @@ -318,7 +323,7 @@ type addrBookJSON struct { } func (a *AddrBook) saveToFile(filePath string) { - a.Logger.Info("Saving AddrBook to file", "size", a.Size()) + log.WithField("size", a.Size()).Info("Saving AddrBook to file") a.mtx.Lock() defer a.mtx.Unlock() @@ -335,12 +340,15 @@ func (a *AddrBook) saveToFile(filePath string) { jsonBytes, err := json.MarshalIndent(aJSON, "", "\t") if err != nil { - a.Logger.Error("Failed to save AddrBook to file", "err", err) + log.WithField("err", err).Error("Failed to save AddrBook to file") return } err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644) if err != nil { - a.Logger.Error("Failed to save AddrBook to file", "file", filePath, "error", err) + log.WithFields(log.Fields{ + "file": filePath, + "err": err, + }).Error("Failed to save AddrBook to file") } } @@ -387,7 +395,7 @@ func (a *AddrBook) loadFromFile(filePath string) bool { // Save saves the book. func (a *AddrBook) Save() { - a.Logger.Info("Saving AddrBook to file", "size", a.Size()) + log.WithField("size", a.Size()).Info("Saving AddrBook to file") a.saveToFile(a.filePath) } @@ -407,7 +415,7 @@ out: dumpAddressTicker.Stop() a.saveToFile(a.filePath) a.wg.Done() - a.Logger.Info("Address handler done") + log.Info("Address handler done") } func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { @@ -427,7 +435,7 @@ func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Sanity check if ka.isOld() { - a.Logger.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka)) + log.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka)) return false } @@ -441,7 +449,7 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Enforce max addresses. if len(bucket) > newBucketSize { - a.Logger.Info("new bucket is full, expiring old ") + log.Info("new bucket is full, expiring old ") a.expireNew(bucketIdx) } @@ -461,11 +469,11 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { // Sanity check if ka.isNew() { - a.Logger.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka)) + log.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka)) return false } if len(ka.Buckets) != 0 { - a.Logger.Error(cmn.Fmt("Cannot add already old address to another old bucket: %v", ka)) + log.Error(cmn.Fmt("Cannot add already old address to another old bucket: %v", ka)) return false } @@ -496,7 +504,7 @@ func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool { func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) { if ka.BucketType != bucketType { - a.Logger.Error(cmn.Fmt("Bucket type mismatch: %v", ka)) + log.Error(cmn.Fmt("Bucket type mismatch: %v", ka)) return } bucket := a.getBucket(bucketType, bucketIdx) @@ -538,7 +546,7 @@ func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { func (a *AddrBook) addAddress(addr, src *NetAddress) { if a.routabilityStrict && !addr.Routable() { - a.Logger.Error(cmn.Fmt("Cannot add non-routable address %v", addr)) + log.Error(cmn.Fmt("Cannot add non-routable address %v", addr)) return } if _, ok := a.ourAddrs[addr.String()]; ok { @@ -569,7 +577,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { bucket := a.calcNewBucket(addr, src) a.addToNewBucket(ka, bucket) - a.Logger.Info("Added new address", "address", addr, "total", a.size()) + log.Info("Added new address", "address", addr, "total", a.size()) } // Make space in the new buckets by expiring the really bad entries. @@ -578,7 +586,7 @@ func (a *AddrBook) expireNew(bucketIdx int) { for addrStr, ka := range a.addrNew[bucketIdx] { // If an entry is bad, throw it away if ka.isBad() { - a.Logger.Info(cmn.Fmt("expiring bad address %v", addrStr)) + log.Info(cmn.Fmt("expiring bad address %v", addrStr)) a.removeFromBucket(ka, bucketTypeNew, bucketIdx) return } @@ -595,11 +603,11 @@ func (a *AddrBook) expireNew(bucketIdx int) { func (a *AddrBook) moveToOld(ka *knownAddress) { // Sanity check if ka.isOld() { - a.Logger.Error(cmn.Fmt("Cannot promote address that is already old %v", ka)) + log.Error(cmn.Fmt("Cannot promote address that is already old %v", ka)) return } if len(ka.Buckets) == 0 { - a.Logger.Error(cmn.Fmt("Cannot promote address that isn't in any new buckets %v", ka)) + log.Error(cmn.Fmt("Cannot promote address that isn't in any new buckets %v", ka)) return } @@ -624,13 +632,13 @@ func (a *AddrBook) moveToOld(ka *knownAddress) { if !added { added := a.addToNewBucket(oldest, freedBucket) if !added { - a.Logger.Error(cmn.Fmt("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket)) + log.Error(cmn.Fmt("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket)) } } // Finally, add to bucket again. added = a.addToOldBucket(ka, oldBucketIdx) if !added { - a.Logger.Error(cmn.Fmt("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx)) + log.Error(cmn.Fmt("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx)) } } } diff --git a/p2p/connection.go b/p2p/connection.go index 36f15abb7..dfdcc4249 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + log "github.com/sirupsen/logrus" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" flow "github.com/tendermint/tmlibs/flowrate" @@ -177,10 +178,10 @@ func (c *MConnection) String() string { } func (c *MConnection) flush() { - c.Logger.Debug("Flush", "conn", c) + log.WithField("conn", c).Debug("Flush") err := c.bufWriter.Flush() if err != nil { - c.Logger.Error("MConnection flush failed", "error", err) + log.WithField("error", err).Error("MConnection flush failed") } } @@ -208,12 +209,16 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool { return false } - c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg)) + log.WithFields(log.Fields{ + "chID": chID, + "conn": c, + "msg": msg, + }).Debug("Send") // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { - c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID)) + log.WithField("chID", chID).Error(cmn.Fmt("Cannot send bytes, unknown channel")) return false } @@ -225,7 +230,11 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool { default: } } else { - c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg) + log.WithFields(log.Fields{ + "chID": chID, + "conn": c, + "msg": msg, + }).Error("Send failed") } return success } @@ -237,12 +246,16 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool { return false } - c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg) + log.WithFields(log.Fields{ + "chID": chID, + "conn": c, + "msg": msg, + }).Debug("TrySend") // Send message to channel. channel, ok := c.channelsIdx[chID] if !ok { - c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID)) + log.WithField("chID", chID).Error(cmn.Fmt("cannot send bytes, unknown channel")) return false } @@ -267,7 +280,7 @@ func (c *MConnection) CanSend(chID byte) bool { channel, ok := c.channelsIdx[chID] if !ok { - c.Logger.Error(cmn.Fmt("Unknown channel %X", chID)) + log.WithField("chID", chID).Error(cmn.Fmt("Unknown channel")) return false } return channel.canSend() @@ -291,12 +304,12 @@ FOR_LOOP: channel.updateStats() } case <-c.pingTimer.Ch: - c.Logger.Debug("Send Ping") + log.Debug("Send Ping") wire.WriteByte(packetTypePing, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) c.flush() case <-c.pong: - c.Logger.Debug("Send Pong") + log.Debug("Send Pong") wire.WriteByte(packetTypePong, c.bufWriter, &n, &err) c.sendMonitor.Update(int(n)) c.flush() @@ -318,7 +331,10 @@ FOR_LOOP: break FOR_LOOP } if err != nil { - c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "error", err) + log.WithFields(log.Fields{ + "conn": c, + "error": err, + }).Error("Connection failed @ sendRoutine") c.stopForError(err) break FOR_LOOP } @@ -373,7 +389,7 @@ func (c *MConnection) sendMsgPacket() bool { // Make & send a msgPacket from this channel n, err := leastChannel.writeMsgPacketTo(c.bufWriter) if err != nil { - c.Logger.Error("Failed to write msgPacket", "error", err) + log.WithField("error", err).Error("Failed to write msgPacket") c.stopForError(err) return true } @@ -415,7 +431,10 @@ FOR_LOOP: c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { - c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err) + log.WithFields(log.Fields{ + "conn": c, + "error": err, + }).Error("Connection failed @ recvRoutine (reading byte)") c.stopForError(err) } break FOR_LOOP @@ -425,18 +444,21 @@ FOR_LOOP: switch pktType { case packetTypePing: // TODO: prevent abuse, as they cause flush()'s. - c.Logger.Debug("Receive Ping") + log.Debug("Receive Ping") c.pong <- struct{}{} case packetTypePong: // do nothing - c.Logger.Debug("Receive Pong") + log.Debug("Receive Pong") case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { if c.IsRunning() { - c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err) + log.WithFields(log.Fields{ + "conn": c, + "error": err, + }).Error("Connection failed @ recvRoutine") c.stopForError(err) } break FOR_LOOP @@ -448,13 +470,20 @@ FOR_LOOP: msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { if c.IsRunning() { - c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err) + log.WithFields(log.Fields{ + "conn": c, + "error": err, + }).Error("Connection failed @ recvRoutine") c.stopForError(err) } break FOR_LOOP } if msgBytes != nil { c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes) + log.WithFields(log.Fields{ + "channelID": pkt.ChannelID, + "msgBytes": msgBytes, + }).Debug("Received bytes") c.onReceive(pkt.ChannelID, msgBytes) } default: @@ -626,7 +655,6 @@ func (ch *Channel) nextMsgPacket() msgPacket { // Not goroutine-safe func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() - // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet) wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteBinary(packet, w, &n, &err) if err == nil { @@ -638,7 +666,6 @@ func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { // Handles incoming msgPackets. Returns a msg bytes if msg is complete. // Not goroutine-safe func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { - // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet) if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) { return nil, wire.ErrBinaryReadOverflow } diff --git a/p2p/listener.go b/p2p/listener.go index 8415bcc5b..668e641e2 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -7,8 +7,9 @@ import ( "time" "github.com/bytom/p2p/upnp" + log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" + tlog "github.com/tendermint/tmlibs/log" ) type Listener interface { @@ -48,7 +49,7 @@ func splitHostPort(addr string) (host string, port int) { } // skipUPNP: If true, does not try getUPNPExternalAddress() -func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log.Logger) Listener { +func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) Listener { // Local listen IP & port lAddrIP, lAddrPort := splitHostPort(lAddr) @@ -68,7 +69,10 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log } // Actual listener local IP & port listenerIP, listenerPort := splitHostPort(listener.Addr().String()) - logger.Info("Local listener", "ip", listenerIP, "port", listenerPort) + log.WithFields(log.Fields{ + "ip": listenerIP, + "port": listenerPort, + }).Info("Local listener") // Determine internal address... var intAddr *NetAddress @@ -82,7 +86,7 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log if !skipUPNP { // If the lAddrIP is INADDR_ANY, try UPnP if lAddrIP == "" || lAddrIP == "0.0.0.0" { - extAddr = getUPNPExternalAddress(lAddrPort, listenerPort, logger) + extAddr = getUPNPExternalAddress(lAddrPort, listenerPort) } } // Otherwise just use the local address... @@ -167,17 +171,17 @@ func (l *DefaultListener) String() string { /* external address helpers */ // UPNP external address discovery & port mapping -func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *NetAddress { - logger.Info("Getting UPNP external address") +func getUPNPExternalAddress(externalPort, internalPort int) *NetAddress { + log.Info("Getting UPNP external address") nat, err := upnp.Discover() if err != nil { - logger.Info("Could not perform UPNP discover", "error", err) + log.WithField("error", err).Error("Could not perform UPNP discover") return nil } ext, err := nat.GetExternalAddress() if err != nil { - logger.Info("Could not get UPNP external address", "error", err) + log.WithField("error", err).Error("Could not perform UPNP external address") return nil } @@ -188,11 +192,11 @@ func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) * externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "tendermint", 0) if err != nil { - logger.Info("Could not add UPNP port mapping", "error", err) + log.WithField("error", err).Error("Could not add UPNP port mapping") return nil } - logger.Info("Got UPNP external address", "address", ext) + log.WithField("address", ext).Info("Got UPNP external address") return NewNetAddressIPPort(ext, uint16(externalPort)) } diff --git a/p2p/peer.go b/p2p/peer.go index 2602206c1..89e0a28fc 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" @@ -156,7 +157,7 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er func() { var n int wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) - p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) + log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake") }) if err1 != nil { return errors.Wrap(err1, "Error during handshake/write") diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 0ac776347..fcd48f951 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -1,7 +1,6 @@ package p2p import ( - golog "log" "net" "testing" "time" @@ -9,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + log "github.com/sirupsen/logrus" crypto "github.com/tendermint/go-crypto" ) @@ -116,7 +116,7 @@ func (p *remotePeer) PubKey() crypto.PubKeyEd25519 { func (p *remotePeer) Start() { l, e := net.Listen("tcp", "127.0.0.1:0") // any available address if e != nil { - golog.Fatalf("net.Listen tcp :0: %+v", e) + log.Fatalf("net.Listen tcp :0: %+v", e) } p.addr = NewNetAddress(l.Addr()) p.quit = make(chan struct{}) @@ -131,11 +131,11 @@ func (p *remotePeer) accept(l net.Listener) { for { conn, err := l.Accept() if err != nil { - golog.Fatalf("Failed to accept conn: %+v", err) + log.Fatalf("Failed to accept conn: %+v", err) } peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) if err != nil { - golog.Fatalf("Failed to create a peer: %+v", err) + log.Fatalf("Failed to create a peer: %+v", err) } err = peer.HandshakeTimeout(&NodeInfo{ PubKey: p.PrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519), @@ -144,7 +144,7 @@ func (p *remotePeer) accept(l net.Listener) { Version: "123.123.123", }, 1*time.Second) if err != nil { - golog.Fatalf("Failed to perform handshake: %+v", err) + log.Fatalf("Failed to perform handshake: %+v", err) } select { case <-p.quit: diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 269a8d006..75c383fe9 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -7,6 +7,7 @@ import ( "reflect" "time" + log "github.com/sirupsen/logrus" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" ) @@ -105,7 +106,10 @@ func (r *PEXReactor) AddPeer(p *Peer) { addr, err := NewNetAddressString(p.ListenAddr) if err != nil { // this should never happen - r.Logger.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err) + log.WithFields(log.Fields{ + "addr": p.ListenAddr, + "error": err, + }).Error("Error in AddPeer: Invalid peer address") return } r.book.AddAddress(addr, addr) @@ -125,17 +129,17 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { r.IncrementMsgCountForPeer(srcAddrStr) if r.ReachedMaxMsgCountForPeer(srcAddrStr) { - r.Logger.Error("Maximum number of messages reached for peer", "peer", srcAddrStr) + log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer") // TODO remove src from peers? return } _, msg, err := DecodeMessage(msgBytes) if err != nil { - r.Logger.Error("Error decoding message", "error", err) + log.WithField("error", err).Error("Error decoding message") return } - r.Logger.Info("Received message", "msg", msg) + log.WithField("msg", msg).Info("Reveived message") switch msg := msg.(type) { case *pexRequestMessage: @@ -150,7 +154,7 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) { } } default: - r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type") } } @@ -230,7 +234,11 @@ func (r *PEXReactor) ensurePeersRoutine() { func (r *PEXReactor) ensurePeers() { numOutPeers, _, numDialing := r.Switch.NumPeers() numToDial := minNumOutboundPeers - (numOutPeers + numDialing) - r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) + log.WithFields(log.Fields{ + "numOutPeers": numOutPeers, + "numDialing": numDialing, + "numToDial": numToDial, + }).Info("Ensure peers") if numToDial <= 0 { return } @@ -257,13 +265,9 @@ func (r *PEXReactor) ensurePeers() { alreadyDialing := r.Switch.IsDialing(try) alreadyConnected := r.Switch.Peers().Has(try.IP.String()) if alreadySelected || alreadyDialing || alreadyConnected { - // r.Logger.Info("Cannot dial address", "addr", try, - // "alreadySelected", alreadySelected, - // "alreadyDialing", alreadyDialing, - // "alreadyConnected", alreadyConnected) continue } else { - r.Logger.Info("Will dial address", "addr", try) + log.WithField("addr", try).Info("Will dial address") picked = try break } @@ -289,7 +293,7 @@ func (r *PEXReactor) ensurePeers() { if peers := r.Switch.Peers().List(); len(peers) > 0 { i := rand.Int() % len(peers) peer := peers[i] - r.Logger.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer) + log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer") r.RequestPEX(peer) } } diff --git a/p2p/switch.go b/p2p/switch.go index e896b67e7..097900780 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -8,6 +8,7 @@ import ( "time" cfg "github.com/bytom/config" + log "github.com/sirupsen/logrus" crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" ) @@ -238,7 +239,7 @@ func (sw *Switch) AddPeer(peer *Peer) error { return err } - sw.Logger.Info("Added peer", "peer", peer) + log.WithField("peer", peer).Info("Added peer") return nil } @@ -309,9 +310,9 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { func (sw *Switch) dialSeed(addr *NetAddress) { peer, err := sw.DialPeerWithAddress(addr, true) if err != nil { - sw.Logger.Error("Error dialing seed", "error", err) + log.WithField("error", err).Error("Error dialing seed") } else { - sw.Logger.Info("Connected to seed", "peer", peer) + log.WithField("peer", peer).Info("Connected to seed") } } @@ -319,10 +320,13 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) - sw.Logger.Info("Dialing peer", "address", addr) + log.WithField("address", addr).Info("Dialing peer") peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { - sw.Logger.Error("Failed to dial peer", "address", addr, "error", err) + log.WithFields(log.Fields{ + "address": addr, + "error": err, + }).Info("Failed to dial peer") return nil, err } peer.SetLogger(sw.Logger.With("peer", addr)) @@ -331,11 +335,17 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, } err = sw.AddPeer(peer) if err != nil { - sw.Logger.Error("Failed to add peer", "address", addr, "error", err) + log.WithFields(log.Fields{ + "address": addr, + "error": err, + }).Info("Failed to add peer") peer.CloseConn() return nil, err } - sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer) + log.WithFields(log.Fields{ + "address": addr, + "error": err, + }).Info("Dialed and added peer") return peer, nil } @@ -349,7 +359,10 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) - sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) + log.WithFields(log.Fields{ + "chID": chID, + "msg": msg, + }).Debug("Broadcast") for _, peer := range sw.peers.List() { go func(peer *Peer) { success := peer.Send(chID, msg) @@ -381,12 +394,15 @@ func (sw *Switch) Peers() IPeerSet { // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { addr := NewNetAddress(peer.Addr()) - sw.Logger.Info("Stopping peer for error", "peer", peer, "error", reason) + log.WithFields(log.Fields{ + "peer": peer, + "error": reason, + }).Info("Stopping peer due to error") sw.stopAndRemovePeer(peer, reason) if peer.IsPersistent() { go func() { - sw.Logger.Info("Reconnecting to peer", "peer", peer) + log.WithField("peer", peer).Info("Reconnecting to peer") for i := 1; i < reconnectAttempts; i++ { if !sw.IsRunning() { return @@ -395,15 +411,21 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { peer, err := sw.DialPeerWithAddress(addr, true) if err != nil { if i == reconnectAttempts { - sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "error", err) + log.WithFields(log.Fields{ + "retries": i, + "error": err, + }).Info("Error reconnecting to peer. Giving up") return } - sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "error", err) + log.WithFields(log.Fields{ + "retries": i, + "error": err, + }).Info("Error reconnecting to peer. Trying again") time.Sleep(reconnectInterval) continue } - sw.Logger.Info("Reconnected to peer", "peer", peer) + log.WithField("peer", peer).Info("Reconnected to peer") return } }() @@ -413,7 +435,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { // Disconnect from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { - sw.Logger.Info("Stopping peer gracefully") + log.Info("Stopping peer gracefully") sw.stopAndRemovePeer(peer, nil) } @@ -435,14 +457,21 @@ func (sw *Switch) listenerRoutine(l Listener) { // ignore connection if we already have enough maxPeers := sw.config.MaxNumPeers if maxPeers <= sw.peers.Size() { - sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers) + log.WithFields(log.Fields{ + "address": inConn.RemoteAddr().String(), + "numPeers": sw.peers.Size(), + "max": maxPeers, + }).Info("Ignoring inbound connection: already have enough peers") continue } // New inbound connection! err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) if err != nil { - sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err) + log.WithFields(log.Fields{ + "address": inConn.RemoteAddr().String(), + "error": err, + }).Info("Ignoring inbound connection: error while adding peer") continue } diff --git a/protocol/bc/legacy/block.go b/protocol/bc/legacy/block.go index 7d9ce9c70..ebce09c6a 100644 --- a/protocol/bc/legacy/block.go +++ b/protocol/bc/legacy/block.go @@ -93,7 +93,6 @@ func (b *Block) Value() (driver.Value, error) { } func (b *Block) readFrom(r *blockchain.Reader) error { - fmt.Printf("--------------block:%v", b) serflags, err := b.BlockHeader.readFrom(r) if err != nil { return err diff --git a/protocol/block.go b/protocol/block.go index 73759f1cf..49e346886 100644 --- a/protocol/block.go +++ b/protocol/block.go @@ -104,45 +104,9 @@ func (c *Chain) CommitAppliedBlock(ctx context.Context, block *legacy.Block, sna // it's not redundant. c.setState(block, snapshot) - go c.SetAssetsAmount(block) - return nil } -func (c *Chain) SetAssetsAmount(block *legacy.Block) { - assets_amount := c.assets_utxo.assets_amount - - if block.Transactions != nil { - c.assets_utxo.cond.L.Lock() - for _, item := range block.Transactions[1:] { - if item.Outputs != nil { - for _, utxo := range item.Outputs { - if _, ok := assets_amount[utxo.AssetId.String()]; ok { - assets_amount[utxo.AssetId.String()] += utxo.Amount - } else { - assets_amount[utxo.AssetId.String()] = utxo.Amount - } - - } - } - } - c.assets_utxo.cond.L.Unlock() - } -} - -func (c *Chain) GetAssetsAmount() []interface{} { - var result = make([]interface{}, 0) - - c.assets_utxo.cond.L.Lock() - defer c.assets_utxo.cond.L.Unlock() - - if len(c.assets_utxo.assets_amount) > 0 { - result = append(result, c.assets_utxo.assets_amount) - } - - return result -} - func (c *Chain) AddBlock(ctx context.Context, block *legacy.Block) error { currentBlock, _ := c.State() if err := c.ValidateBlock(block, currentBlock); err != nil { diff --git a/protocol/protocol.go b/protocol/protocol.go index dcb35cc11..d60c8b5a9 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -58,10 +58,6 @@ type Chain struct { pendingSnapshots chan pendingSnapshot txPool *TxPool - assets_utxo struct { - cond sync.Cond - assets_amount map[string]uint64 - } } type pendingSnapshot struct { @@ -79,9 +75,6 @@ func NewChain(ctx context.Context, initialBlockHash bc.Hash, store Store, txPool } c.state.cond.L = new(sync.Mutex) - c.assets_utxo.assets_amount = make(map[string]uint64, 1024) //prepared buffer 1024 key-values - c.assets_utxo.cond.L = new(sync.Mutex) - log.Printf(ctx, "bytom's Height:%v.", store.Height()) c.state.height = store.Height() diff --git a/version/version.go b/version/version.go index 7d9417658..beb5cfd80 100644 --- a/version/version.go +++ b/version/version.go @@ -1,12 +1,12 @@ package version const Maj = "0" -const Min = "1" +const Min = "5" const Fix = "0" var ( // The full version string - Version = "0.1.0" + Version = "0.5.0" // GitCommit is set with --ldflags "-X main.gitCommit=$(git rev-parse HEAD)" GitCommit string