Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implements seata-tcp-transport and format some codes #23

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions packages/seata-js/src/seata-common/byte-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

import { Buffer } from 'node:buffer'

const DEFAULT_ALLOC_SIZE = 1024

export interface ByteBufferProp {
buffer?: Buffer
defaultAllocSize?: number
}

export interface ReadWriteProp {
/**
* set read or write index
Expand All @@ -44,8 +45,6 @@ export interface ReadWriteProp {
len?: number
}

const DEFAULT_ALLOC_SIZE = 1024

/**
* BufferBuffer is a buffer wrapper class
* which can be used to read and write data to buffer.
Expand Down
20 changes: 20 additions & 0 deletions packages/seata-js/src/seata-common/retry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

describe('restry test suite', () => {
it('test init retry', () => {})
})
99 changes: 99 additions & 0 deletions packages/seata-js/src/seata-common/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import debug from 'debug'
import { noop } from './util'

const MAX_RETRIES = 10
const dlog = debug('seata-common:retry~')

export interface RetryProps {
initialDelay?: number
period?: number
maxRetry?: number
onFailedEnd?: Function
run: (onSuccess: Function, onFailed: Function) => void
}

export class Retry {
private retryTime: number
private readonly initialDelay: number
private readonly period: number
private readonly maxRetryCount: number

private run: (onSuccess: Function, onFailed: Function) => void
private onFailedEnd: Function

static from(props: RetryProps) {
return new Retry(props)
}

constructor(props: RetryProps) {
this.maxRetryCount = props.maxRetry || MAX_RETRIES
this.retryTime = this.maxRetryCount

this.initialDelay = props.initialDelay || 0
this.period = props.period || 1000

dlog(`init props: %j`, {
initialDelay: this.initialDelay,
period: this.period,
maxRetry: this.maxRetryCount,
})

this.run = props.run
this.onFailedEnd = props.onFailedEnd || noop
}

start() {
dlog(`starting retry, current retry num:%d`, this.retryTime)

// first run
if (this.retryTime === this.maxRetryCount) {
setTimeout(() => {
this.run(this.onSuccess, this.onFailed)
this.retryTime--
}, this.initialDelay)
return
}

// stop retry
if (this.retryTime === 0) {
this.onFailedEnd()
return
}

// retry
setTimeout(() => {
this.run(this.onSuccess, this.onFailed)
this.retryTime--
}, this.period)
}

reset() {
dlog('reset')
this.retryTime = this.maxRetryCount
}

private onSuccess() {
this.reset()
}

private onFailed() {
this.start()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ export class AbstractConfiguration {
getConfig(dataId: string, defaultValue: string) {
return ''
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,4 @@
* limitations under the License.
*/


export class ConfigurationCache {

}
export class ConfigurationCache {}
3 changes: 1 addition & 2 deletions packages/seata-js/src/seata-config/config-core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ export class ConfigurationFactory {

static instance = null

private static load() {
}
private static load() {}

/**
* Gets instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { AbstractConfiguration } from '../config-core/abstract-configuration'

/**
* the type Nacos configuration
*
*
* @author godkun
*/
export class NacosConfiguration extends AbstractConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

import { RegistryService, RegistryType } from "../../discovery"
import { RegistryService, RegistryType } from '../../discovery'

import { NacosRegistryProvider } from '../../discovery-nacos/nacos-registry-provider'

Expand All @@ -37,6 +37,6 @@ export default class RegistryFactory {
private static buildRegistryService(): RegistryService {
let registryType: RegistryType
registryType = RegistryType.Nacos
return (new NacosRegistryProvider).provide()
return new NacosRegistryProvider().provide()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import { NacosRegistryService } from './nacos-registry-service'

/**
* the nacos registry provider
*
*
* @author godkun
*/
export class NacosRegistryProvider implements RegistryProvider {
provide(): RegistryService {
return NacosRegistryService.getInstance()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ export class NacosRegistryService implements RegistryService {

private static instance: NacosRegistryService


static getInstance(): NacosRegistryService {
if (!NacosRegistryService.instance) {
NacosRegistryService.instance = new NacosRegistryService()
Expand All @@ -67,10 +66,10 @@ export class NacosRegistryService implements RegistryService {
unsubscribe(cluster: string, listener) {
// TODO:
}

lookup(key: string): Array<string> {
return ['1']
}

close() {}
}
}
2 changes: 1 addition & 1 deletion packages/seata-js/src/seata-rpc-client/seata-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

import { SeataQueue } from './seata-queue'
import config from '../seata-config/config'
import { RpcMessage } from '../seata-protocol/rpc-message'
import { SeataQueue } from './seata-queue'

describe('seata queue test suites', () => {
// set max req timeout
Expand Down
6 changes: 3 additions & 3 deletions packages/seata-js/src/seata-rpc-client/seata-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import config from '../seata-config/config'
import { RpcMessage } from '../seata-protocol/rpc-message'
import { SeataContext } from './seata-context'

// init log
const log = debug('seata:rpc:seata-queue')

export type SeataQueueId = number
export type SeataRpcResponse = {
err: Error | null
res: any
}
export type SeataQueueSubscribe = (id: SeataQueueId, msg: SeataContext) => void

// init log
const log = debug('seata:rpc:seata-queue')

/**
* seata-rpc-queue
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,47 @@
* limitations under the License.
*/

import { Socket } from 'net'
import { genNextId } from '../seata-id'
import prot from '../../seata-protocol/protocol-constants'
import { RpcMessage } from '../../seata-protocol/rpc-message'
import { Socket } from 'net'
import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message'
import { ProtocolV1Encoder } from '../v1/protocol-v1-encoder'
import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message'

export default class AbstractSeataRemoting {
export default abstract class AbstractSeataRemoting {
protected transport: Socket

constructor(transport: Socket) {
this.transport = transport
}

protected buildRequestMessage(msg: Object, messageType: number) {
const rpcMessage = new RpcMessage()
rpcMessage.setId(genNextId())
rpcMessage.setMessageType(messageType)
rpcMessage.setCodec(prot.CONFIGURED_CODEC)
rpcMessage.setCompressor(prot.CONFIGURED_COMPRESSOR)
rpcMessage.setBody(msg)
return rpcMessage
return new RpcMessage()
.setId(genNextId())
.setMessageType(messageType)
.setCodec(prot.CONFIGURED_CODEC)
.setCompressor(prot.CONFIGURED_COMPRESSOR)
.setBody(msg)
}

protected buildResponseMessage(
rpcMessage: RpcMessage,
msg: Object,
messageType: number,
) {
const rpcMsg = new RpcMessage()
rpcMsg.setMessageType(messageType)
rpcMsg.setCodec(rpcMessage.getCodec()) // same with request
rpcMsg.setCompressor(rpcMessage.getCompressor())
rpcMsg.setBody(msg)
rpcMsg.setId(rpcMessage.getId())
return rpcMsg
return new RpcMessage()
.setMessageType(messageType)
.setCodec(rpcMessage.getCodec()) // same with request
.setCompressor(rpcMessage.getCompressor())
.setBody(msg)
.setId(rpcMessage.getId())
}

protected send(msg: Object) {
if (!this.transport) {
return
}

const rpcMessage = this.buildRequestMessage(
msg,
msg instanceof HeartbeatMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
* limitations under the License.
*/

import { Socket } from 'net'
import { EventEmitter } from 'events'

import { Socket } from 'node:net'
import { EventEmitter } from 'node:events'
import SeataTcpBuffer from './seata-tcp-buffer'
import { RpcMessage } from '../../seata-protocol/rpc-message'
import { MessageType } from '../../seata-protocol/message-type'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@
* limitations under the License.
*/

import { Socket } from 'net'
import { Socket } from 'node:net'
import { Buffer } from 'node:buffer'
import debug from 'debug'

import { noop } from '../../seata-common/util'
import ByteBuffer from '../../seata-common/byte-buffer'
import prot from '../../seata-protocol/protocol-constants'

const log = debug('seata:tcp-buffer')

export interface SeataTcpBufferSubscriber {
(data: Buffer): void
}

const log = debug('seata:tcp-buffer')

/**
* 在并发的tcp数据传输中,会出现少包,粘包的现象
* 好在tcp的传输是可以保证顺序的
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/

import { Socket } from 'net'
import debug from 'debug'
import { HeartbeatMessage } from '../../seata-protocol/heartbeat-message'
import AbstractSeataRemoting from './seata-abstract-remoting'

const log = debug('seata:heartbeat~')
// reference: NettyBasicConfig.java
const DEFAULT_WRITE_IDLE_SECONDS = 5_000

Expand Down Expand Up @@ -60,8 +62,9 @@ export class SeataHeartBeat extends AbstractSeataRemoting {
/**
* receive heartbeat response message
*/
async receive(): Promise<void> {
receive() {
this.lastActivityTime = Date.now()
log(`receive heartbeat pong`)
}

destroy() {
Expand Down
Loading