Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
number571 committed Aug 18, 2024
1 parent ff121a6 commit 4e5b4af
Show file tree
Hide file tree
Showing 19 changed files with 4,475 additions and 4,391 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
### CHANGES

- Update `pkg/network/anonymity`: delete GetRetryEnqueue
- Update `pkg/network/anonymity/queue`: change EnqueueMessage, rename IMessageQueue -> IMessageQueueProcessor
- Update `pkg/network/anonymity/queue`: change EnqueueMessage, rename IMessageQueue -> IQBTaskProcessor
- Update `pkg/network/anonymity/queue`: delete GetQBTDisabled
- Update `cmd/hidden_lake/service`: qbt_disabled=true -> queue_period_ms=0
- Update `cmd/hidden_lake/service`: append fetch_timeout_ms param
Expand Down
3,228 changes: 1,614 additions & 1,614 deletions cmd/hidden_lake/_test/result/coverage.out

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/hidden_lake/service/internal/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func testNewNode(dbPath, addr string) anonymity.INode {
),
db,
testNewNetworkNode(addr),
queue.NewMessageQueueProcessor(
queue.NewQBTaskProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: testutils.TCWorkSize,
Expand Down
2 changes: 1 addition & 1 deletion cmd/hidden_lake/service/pkg/app/init_anon_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (p *sApp) initAnonNode() error {
}),
lru.NewLRUCache(hls_settings.CNetworkQueueCapacity),
),
queue.NewMessageQueueProcessor(
queue.NewQBTaskProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: hls_settings.CNetworkMask,
FWorkSizeBits: cfgSettings.GetWorkSizeBits(),
Expand Down
4 changes: 4 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (p *sClient) EncryptMessage(pRecv asymmetric.IPubKey, pMsg []byte) ([]byte,
resultSize = uint64(len(pMsg))
)

if pRecv.GetSize() != p.GetPubKey().GetSize() {
return nil, ErrInvalidPubKeySize
}

if resultSize > msgLimitSize {
return nil, ErrLimitMessageSize
}
Expand Down
1 change: 1 addition & 0 deletions pkg/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ var (
ErrDecodePayload = &SClientError{"decode payload"}
ErrEncryptSymmetricKey = &SClientError{"encrypt symmetric key"}
ErrDecodeBytesJoiner = &SClientError{"decode bytes joiner"}
ErrInvalidPubKeySize = &SClientError{"invalid pub key size"}
)
6 changes: 3 additions & 3 deletions pkg/network/anonymity/anonymity.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type sNode struct {
fLogger logger.ILogger
fKVDatavase database.IKVDatabase
fNetwork network.INode
fQueue queue.IMessageQueueProcessor
fQueue queue.IQBTaskProcessor
fFriends asymmetric.IListPubKeys
fHandleRoutes map[uint32]IHandlerF
fHandleActions map[string]chan []byte
Expand All @@ -44,7 +44,7 @@ func NewNode(
pLogger logger.ILogger,
pKVDatavase database.IKVDatabase,
pNetwork network.INode,
pQueue queue.IMessageQueueProcessor,
pQueue queue.IQBTaskProcessor,
pFriends asymmetric.IListPubKeys,
) INode {
return &sNode{
Expand Down Expand Up @@ -124,7 +124,7 @@ func (p *sNode) GetNetworkNode() network.INode {
return p.fNetwork
}

func (p *sNode) GetMessageQueue() queue.IMessageQueueProcessor {
func (p *sNode) GetMessageQueue() queue.IQBTaskProcessor {
return p.fQueue
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/anonymity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func testNewNode(timeWait time.Duration, addr string, typeDB, numDB int, f2fDisa
}),
lru.NewLRUCache(testutils.TCCapacity),
),
queue.NewMessageQueueProcessor(
queue.NewQBTaskProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: testutils.TCWorkSize,
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/examples/echo/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newNode(serviceName, address string) anonymity.INode {
newVSettings(networkKey),
lru.NewLRUCache(1024),
),
queue.NewMessageQueueProcessor(
queue.NewQBTaskProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: workSize,
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/examples/ping-ping/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newNode(serviceName, address string) anonymity.INode {
newVSettings(networkKey),
lru.NewLRUCache(1024),
),
queue.NewMessageQueueProcessor(
queue.NewQBTaskProcessor(
queue.NewSettings(&queue.SSettings{
FNetworkMask: networkMask,
FWorkSizeBits: workSize,
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/queue/examples/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
)

func main() {
q := queue.NewMessageQueueProcessor(
q := queue.NewQBTaskProcessor(
queue.NewSettings(&queue.SSettings{
FQueuePeriod: time.Second,
FMainPoolCapacity: 1 << 5,
Expand Down
36 changes: 18 additions & 18 deletions pkg/network/anonymity/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
)

var (
_ IMessageQueueProcessor = &sMessageQueueProcessor{}
_ IQBTaskProcessor = &sQBTaskProcessor{}
)

type sMessageQueueProcessor struct {
type sQBTaskProcessor struct {
fMutex sync.RWMutex
fState state.IState

Expand All @@ -45,18 +45,18 @@ type sRandPool struct {
fReceiver asymmetric.IPubKey
}

func NewMessageQueueProcessor(
func NewQBTaskProcessor(
pSettings ISettings,
pVSettings IVSettings,
pClient client.IClient,
pReceiver asymmetric.IPubKey,
) IMessageQueueProcessor {
) IQBTaskProcessor {
if pSettings.GetQueuePeriod() != 0 {
if pClient.GetPubKey().GetSize() != pReceiver.GetSize() {
panic("pClient.GetPubKey().GetSize() != pReceiver.GetSize()")
}
}
return &sMessageQueueProcessor{
return &sQBTaskProcessor{
fState: state.NewBoolState(),
fSettings: pSettings,
fVSettings: pVSettings,
Expand All @@ -72,19 +72,19 @@ func NewMessageQueueProcessor(
}
}

func (p *sMessageQueueProcessor) GetSettings() ISettings {
func (p *sQBTaskProcessor) GetSettings() ISettings {
return p.fSettings
}

func (p *sMessageQueueProcessor) GetVSettings() IVSettings {
func (p *sQBTaskProcessor) GetVSettings() IVSettings {
return p.getVSettings()
}

func (p *sMessageQueueProcessor) GetClient() client.IClient {
func (p *sQBTaskProcessor) GetClient() client.IClient {
return p.fClient
}

func (p *sMessageQueueProcessor) Run(pCtx context.Context) error {
func (p *sQBTaskProcessor) Run(pCtx context.Context) error {
if err := p.fState.Enable(nil); err != nil {
return utils.MergeErrors(ErrRunning, err)
}
Expand All @@ -109,7 +109,7 @@ func (p *sMessageQueueProcessor) Run(pCtx context.Context) error {
return utils.MergeErrors(errList...)
}

func (p *sMessageQueueProcessor) runRandPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
func (p *sQBTaskProcessor) runRandPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
defer pWg.Done()

if p.fSettings.GetQueuePeriod() == 0 { // if QB=false
Expand All @@ -132,7 +132,7 @@ func (p *sMessageQueueProcessor) runRandPoolFiller(pCtx context.Context, pWg *sy
}
}

func (p *sMessageQueueProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
func (p *sQBTaskProcessor) runMainPoolFiller(pCtx context.Context, pWg *sync.WaitGroup, chErr chan<- error) {
defer pWg.Done()
for {
select {
Expand All @@ -148,7 +148,7 @@ func (p *sMessageQueueProcessor) runMainPoolFiller(pCtx context.Context, pWg *sy
}
}

func (p *sMessageQueueProcessor) SetVSettings(pVSettings IVSettings) {
func (p *sQBTaskProcessor) SetVSettings(pVSettings IVSettings) {
p.fMutex.Lock()
defer p.fMutex.Unlock()

Expand All @@ -169,7 +169,7 @@ func (p *sMessageQueueProcessor) SetVSettings(pVSettings IVSettings) {
}
}

func (p *sMessageQueueProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes []byte) error {
func (p *sQBTaskProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pBytes []byte) error {
incCount := atomic.AddInt64(&p.fMainPool.fCount, 1)
if uint64(incCount) > p.fSettings.GetMainPoolCapacity() {
atomic.AddInt64(&p.fMainPool.fCount, -1)
Expand All @@ -184,7 +184,7 @@ func (p *sMessageQueueProcessor) EnqueueMessage(pPubKey asymmetric.IPubKey, pByt
return nil
}

func (p *sMessageQueueProcessor) DequeueMessage(pCtx context.Context) net_message.IMessage {
func (p *sQBTaskProcessor) DequeueMessage(pCtx context.Context) net_message.IMessage {
if p.fSettings.GetQueuePeriod() == 0 { // if QB=false
select {
case <-pCtx.Done():
Expand Down Expand Up @@ -223,7 +223,7 @@ func (p *sMessageQueueProcessor) DequeueMessage(pCtx context.Context) net_messag
}
}

func (p *sMessageQueueProcessor) fillMainPool(pCtx context.Context, pMsg []byte) error {
func (p *sQBTaskProcessor) fillMainPool(pCtx context.Context, pMsg []byte) error {
oldVSettings := p.getVSettings()
chNetMsg := make(chan net_message.IMessage)

Expand All @@ -250,7 +250,7 @@ func (p *sMessageQueueProcessor) fillMainPool(pCtx context.Context, pMsg []byte)
}
}

func (p *sMessageQueueProcessor) fillRandPool(pCtx context.Context) error {
func (p *sQBTaskProcessor) fillRandPool(pCtx context.Context) error {
incCount := atomic.AddInt64(&p.fRandPool.fCount, 1)
if uint64(incCount) > p.fSettings.GetRandPoolCapacity() {
atomic.AddInt64(&p.fRandPool.fCount, -1)
Expand Down Expand Up @@ -296,14 +296,14 @@ func (p *sMessageQueueProcessor) fillRandPool(pCtx context.Context) error {
}
}

func (p *sMessageQueueProcessor) getVSettings() IVSettings {
func (p *sQBTaskProcessor) getVSettings() IVSettings {
p.fMutex.RLock()
defer p.fMutex.RUnlock()

return p.fVSettings
}

func (p *sMessageQueueProcessor) vSettingsNotChanged(oldVSettings IVSettings) bool {
func (p *sQBTaskProcessor) vSettingsNotChanged(oldVSettings IVSettings) bool {
currVSettings := p.getVSettings()
return currVSettings.GetNetworkKey() == oldVSettings.GetNetworkKey()
}
10 changes: 5 additions & 5 deletions pkg/network/anonymity/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func testSettings(t *testing.T, n int) {
func TestQueueVoidDisabled(t *testing.T) {
t.Parallel()

queue := NewMessageQueueProcessor(
queue := NewQBTaskProcessor(
NewSettings(&SSettings{
FNetworkMask: 1,
FWorkSizeBits: 10,
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestRunStopQueue(t *testing.T) {
}),
asymmetric.LoadRSAPrivKey(testutils.Tc1PrivKey1024),
)
queue := NewMessageQueueProcessor(
queue := NewQBTaskProcessor(
NewSettings(&SSettings{
FMainPoolCapacity: testutils.TCQueueCapacity,
FRandPoolCapacity: 1,
Expand All @@ -122,7 +122,7 @@ func TestRunStopQueue(t *testing.T) {

err := testutils.TryN(50, 10*time.Millisecond, func() error {
sett := queue.GetSettings()
sQueue := queue.(*sMessageQueueProcessor)
sQueue := queue.(*sQBTaskProcessor)
if len(sQueue.fRandPool.fQueue) == int(sett.GetRandPoolCapacity()) {
return nil
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func TestRunStopQueue(t *testing.T) {
func TestQueue(t *testing.T) {
t.Parallel()

queue := NewMessageQueueProcessor(
queue := NewQBTaskProcessor(
NewSettings(&SSettings{
FNetworkMask: 1,
FWorkSizeBits: 10,
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestQueue(t *testing.T) {
}
}

func testQueue(queue IMessageQueueProcessor) error {
func testQueue(queue IQBTaskProcessor) error {
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/queue/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
net_message "github.com/number571/go-peer/pkg/network/message"
)

type IMessageQueueProcessor interface {
type IQBTaskProcessor interface {
types.IRunner

SetVSettings(IVSettings)
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/anonymity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type INode interface {
GetSettings() ISettings
GetKVDatabase() database.IKVDatabase
GetNetworkNode() network.INode
GetMessageQueue() queue.IMessageQueueProcessor
GetMessageQueue() queue.IQBTaskProcessor
GetListPubKeys() asymmetric.IListPubKeys

SendPayload(asymmetric.IPubKey, payload.IPayload64) error
Expand Down
2 changes: 1 addition & 1 deletion test/result/badge_codelines.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion test/result/badge_coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 4e5b4af

Please sign in to comment.