diff --git a/i18n/en/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md b/i18n/en/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md new file mode 100644 index 0000000000..dfe9a5c048 --- /dev/null +++ b/i18n/en/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md @@ -0,0 +1,208 @@ +--- +title: Seata's RPC Communication Source Code Analysis 01(Transport) +author: Xie Minghua +keywords: [Seata, RPC Module, Protocol ] +date: 2024/08/15 +--- +# Seata's RPC Communication Source Code Analysis 01(Transport) + +## Overview + +In a distributed system, the design of the communication protocol directly affects the reliability and scalability of the system. apache Seata's RPC communication protocol provides the basis for data transfer between components, and source code analysis in this regard is another good way to gain a deeper understanding of seata. In the recent version 2.2.0, I refactored Seata's communication mechanism to support multi-version protocol compatibility, now that the transformation is complete, I will analyze the source code in the new version from the two aspects of the transport mechanism and communication protocol. +This article is the first one to introduce the Seata transport mechanism. + +The main characters of RPC communication in seata are `TC`, `TM` and `RM`, of course, the process may also involve other network interactions such as the registration center and even the configuration center, but these relative contents of the communication mechanism is relatively independent, and will not be discussed in this article. + +I will take you on an exploration following a few intuitive questions I asked when I first learned about the source code. + +## Netty in Seata (who's transmitting) +First question: what is the underlying layer of seata communication responsible for the sending of request messages and receiving of request messages? The answer is Netty, and how does Netty work inside Seata? We will explore the core package org.apache.seata.core.rpc.netty to find out. + + + +From this inheritance hierarchy we can see that `AbstractNettyRemoting` acts as the parent class of the core, which is implemented by RM and TM and Server(TC), and in fact the core send and receive are already implemented inside this class. + +The synchronous sending logic is implemented in `sendSync`, the logic for asynchronous sending `sendAsync` is similar and simpler, so I won't repeat it here, just get the channel and send it. +```java +protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException { + // Non-critical code omitted here + + MessageFuture messageFuture = new MessageFuture(); + messageFuture.setRequestMessage(rpcMessage); + messageFuture.setTimeout(timeoutMillis); + futures.put(rpcMessage.getId(), messageFuture); + + channelWritableCheck(channel, rpcMessage.getBody()); + + String remoteAddr = ChannelUtil.getAddressFromChannel(channel); + doBeforeRpcHooks(remoteAddr, rpcMessage); + + // (netty write) + channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); + if (messageFuture1 != null) { + messageFuture1.setResultMessage(future.cause()); + } + destroyChannel(future.channel()); + } + }); + + try { + Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); + doAfterRpcHooks(remoteAddr, rpcMessage, result); + return result; + } catch (Exception exx) { + // Non-critical code omitted here + } + } +``` +And the way messages are received is mainly in the processMessage method, which is called by the classes `AbstractNettyRemotingClient.ClientHandler` and `AbstractNettyRemotingServer.ServerHandler`. ChannelRead, both of which are subclasses of `ChannelDuplexHandler`, are each registered in the client and server bootstrap (why register to the bootstrap to be able to do the receiving?). You have to move to the netty principle for this one) + + + +Once the message is received it is called into the `processMessage` method of the parent class, let's take a look at the source code +```java +protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + // Non-critical code + Object body = rpcMessage.getBody(); + if (body instanceof MessageTypeAware) { + MessageTypeAware messageTypeAware = (MessageTypeAware) body; + final Pair pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); + if (pair != null) { + // FIRST is Processor for normal processing, and SECOND is Thread Pool for pooled processing. + if (pair.getSecond() != null) { + try { + pair.getSecond().execute(() -> { + try { + pair.getFirst().process(ctx, rpcMessage); + } catch (Throwable th) { + LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); + } finally { + MDC.clear(); + } + }); + } catch (RejectedExecutionException e) { + // Non-critical code + } + } else { + try { + pair.getFirst().process(ctx, rpcMessage); + } catch (Throwable th) { + LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); + } + } + } else { + LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); + } + } else { + LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); + } + } +``` +These processors and executors are actually processors registered by the client and server: here are some of the processors, which correspond to different MessageTypes, and here is an example of the registration of some of them (they are registered in the NettyRemotingServer# registerProcessor) +```java + super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor); + super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); +``` +You can see that these processors are actually the processors for seata's various commit rollbacks and so on. + +## NettyChannel in Seata (how channels are managed) +So, the second question, since netty relies on a channel to send and receive, how will this channel come about? Will it always be held? If it breaks, how do we reconnect it? The answer can be found in the `ChannelManager` and the `processor` of the two regs above. + +When RM/TM gets the address of the server and registers (the first time it communicates), if the server can successfully parse the message and find it is a REG message, it will enter `regRmProcessor`/`regTmProcessor`, take TM as an example here. + +```java +// server RegTmProcessor + private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { + RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody(); + String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + boolean isSuccess = false; + String errorInfo = StringUtils.EMPTY; + try { + if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) { + // Register the channel in the ChannelManager, it can be expected that after the registration, the server will be able to get the channel when it sendsSync(channel,xxx). + ChannelManager.registerTMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + isSuccess = true; + } + } catch (Exception exx) { + isSuccess = false; + errorInfo = exx.getMessage(); + LOGGER.error("TM register fail, error message:{}", errorInfo); + } + RegisterTMResponse response = new RegisterTMResponse(isSuccess); + // async response + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); + // ... + } + +// ChannelManager + public static void registerTMChannel(RegisterTMRequest request, Channel channel) + throws IncompatibleVersionException { + RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(), + request.getApplicationId(), + request.getTransactionServiceGroup(), + null, channel); + rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS); + String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR + ChannelUtil.getClientIpFromChannel(channel); + ConcurrentMap clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS, clientIdentified, key -> new ConcurrentHashMap<>()); + rpcContext.holdInClientChannels(clientIdentifiedMap); + } +``` +The ChannelManager manages `RM_CHANNELS` and `RM_CHANNELS`, two complex maps, especially RM_CHANNELS which has 4 layers (resourceId -> applicationId -> ip -> port -> RpcContext). + +Having said that the server manages the channel, what about the client? This map management is a little simpler, that is, after successful registration in the onRegisterMsgSuccess also use a `NettyClientChannelManager` in registerChannel, subsequent interaction with the server as much as possible with this channel. + +The third problem is that the client can create a new channel if the channel is not available, +but what if the server receives it and realizes that it is a new channel? +Or what if the server realizes that the channel is not available when it replies asynchronously? +The answer is still in the `NettyClientChannelManager`, which is relatively complex, the client side need to use the channel, +in fact, managed by an object pool `nettyClientKeyPool`, which is an apache object pool, +so when the channel is unavailable, it will also be managed by this pool. +This is an Apache objectPool, Thus, when the channel is unavailable, it will be created with the help of this pool and then returned to the pool after use. +This object pool actually holds the `RegisterTMRequest` at all times, just as it did when it first came in, +so every time a channel is created , a registration occurs. +```java +// NettyClientChannelManager + public Channel makeObject(NettyPoolKey key) { + InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("NettyPool create channel to " + key); + } + Channel tmpChannel = clientBootstrap.getNewChannel(address); + Object response; + Channel channelToServer = null; + // key RegisterTMRequest + if (key.getMessage() == null) { + throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name()); + } + try { + // a register operation + response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage()); + if (!isRegisterSuccess(response, key.getTransactionRole())) { + rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage()); + } else { + channelToServer = tmpChannel; + rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage()); + } + } + // ... + + return channelToServer; + } +``` + +## Summarize +In this article we learned how seata transfers data with the help of netty, to better see the full picture of netty processing, I created a hierarchical diagram + + + +We have already talked about the processing of serverHandler/clientHandler and NettyRemoting (including RM, TM, TC) when the request is sent, and we know the process from the external to the netty processor and then to the internal DefaultCoordinator, but we are still missing Decoder/Encoder. Didn't talk about it, the parsing/encapsulation of the protocol will be done here, serialization and deserialization will also be done, see [Seata's RPC Communication Source Code Analysis 02(Protocol)](seata-rpc-multi-protocol02.md) + diff --git a/i18n/en/docusaurus-plugin-content-blog/seata-rpc-multi-protocol02.md b/i18n/en/docusaurus-plugin-content-blog/seata-rpc-multi-protocol02.md new file mode 100644 index 0000000000..dfc1bcac88 --- /dev/null +++ b/i18n/en/docusaurus-plugin-content-blog/seata-rpc-multi-protocol02.md @@ -0,0 +1,304 @@ +--- +title: Seata's RPC Communication Source Code Analysis 02(Multi-Version Protocols) +author: Xie Minghua +keywords: [Seata, RPC Module, Protocol ] +date: 2024/08/15 +--- +# Seata's RPC Communication Source Code Analysis 02(Multi-Version Protocols) + +### Overview + +In the previous article,[Seata's RPC Communication Source Code Analysis 01(Transport)](seata-rpc-multi-protocol01.md)we introduced the transmission mechanism of RPC communication. In this article, we will continue with the protocol part, completing the unaddressed encode/decode sections in the diagram. + + + +Similarly, we will delve into the topic using a question-driven approach. In this article, we aim not only to understand how binary data is parsed into the rpcMsg type but also to explore how different protocol versions are supported. So, the first question is: What does the protocol look like? +## Structure of Protocol + + +The diagram illustrates the changes in the protocol before and after version 0.7.1 (you can also refer to the comments in ProtocolDecoderV1, and for older versions, check ProtocolV1Decoder). In the new version, the protocol consists of the following components: + +- magic-code: 0xdada +- protocol-version: Version number +- full-length: Total length +- head-length: Header length +- msgtype: Message type +- serializer/codecType: Serialization method +- compress: Compression method +- requestid: Request ID + + +Here, we will explain the differences in protocol handling across various versions of Seata's server: +- version`<`0.7.1 : Can only handle the v0 version of the protocol (the upper part of the diagram, which includes the flag section) and cannot recognize other protocol versions. +- 0.7.1`<=`version`<`2.2.0 : Can only handle the v1 version of the protocol (the lower part of the diagram) and cannot recognize other protocol versions. +- version`>=`2.2.0 : Can recognize and process both v0 and v1 versions of the protocol. + +So, how does version 2.2.0 achieve compatibility? Let's keep that a mystery for now. Before explaining this, let's first take a look at how the v1 encoder and decoder operate. It is important to note that, just like the transmission mechanism we discussed earlier, protocol handling is also shared between the client and server. Therefore, the logic we will discuss below applies to both. +## From ByteBuf to RpcMessage (What the Encoder/Decoder Does) +First`ProtocolDecoderV1` +```java + public RpcMessage decodeFrame(ByteBuf frame) { + byte b0 = frame.readByte(); + byte b1 = frame.readByte(); + + // get version + byte version = frame.readByte(); + // get header,body,... + int fullLength = frame.readInt(); + short headLength = frame.readShort(); + byte messageType = frame.readByte(); + byte codecType = frame.readByte(); + byte compressorType = frame.readByte(); + int requestId = frame.readInt(); + + ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); + rpcMessage.setCodec(codecType); + rpcMessage.setId(requestId); + rpcMessage.setCompressor(compressorType); + rpcMessage.setMessageType(messageType); + + // header + int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH; + if (headMapLength > 0) { + Map map = HeadMapSerializer.getInstance().decode(frame, headMapLength); + rpcMessage.getHeadMap().putAll(map); + } + + if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) { + rpcMessage.setBody(HeartbeatMessage.PING); + } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + rpcMessage.setBody(HeartbeatMessage.PONG); + } else { + int bodyLength = fullLength - headLength; + if (bodyLength > 0) { + byte[] bs = new byte[bodyLength]; + frame.readBytes(bs); + // According to the previously extracted compressorType, decompression is performed as needed. + Compressor compressor = CompressorFactory.getCompressor(compressorType); + bs = compressor.decompress(bs); + SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec()); + if (this.supportDeSerializerTypes.contains(protocolType)) { + // Since this is the ProtocolDecoderV1 specifically for version 1, the serializer can directly use version1 as input. + Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1); + rpcMessage.setBody(serializer.deserialize(bs)); + } else { + throw new IllegalArgumentException("SerializerType not match"); + } + } + } + return rpcMessage.protocolMsg2RpcMsg(); + } +``` +Since the encode operation is the exact reverse of the decode operation, we won’t go over it again. Let’s continue by examining the serialize operation. +the serialize comes from `SerializerServiceLoader` +```java + public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException { + // PROTOBUF + if (type == SerializerType.PROTOBUF) { + try { + ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME); + } catch (ClassNotFoundException e) { + throw new EnhancedServiceNotFoundException("'ProtobufSerializer' not found. " + + "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency ", e); + } + } + + String key = serialzerKey(type, version); + //Here is a SERIALIZER_MAP, which acts as a cache for serializer classes. The reason for caching is that the scope of SeataSerializer is set to Scope.PROTOTYPE, which prevents the class from being created multiple times. + Serializer serializer = SERIALIZER_MAP.get(key); + if (serializer == null) { + if (type == SerializerType.SEATA) { + // SPI of seata + serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version}); + } else { + serializer = EnhancedServiceLoader.load(Serializer.class, type.name()); + } + SERIALIZER_MAP.put(key, serializer); + } + return serializer; + } + + public SeataSerializer(Byte version) { + if (version == ProtocolConstants.VERSION_0) { + versionSeataSerializer = SeataSerializerV0.getInstance(); + } else if (version == ProtocolConstants.VERSION_1) { + versionSeataSerializer = SeataSerializerV1.getInstance(); + } + if (versionSeataSerializer == null) { + throw new UnsupportedOperationException("version is not supported"); + } + } +``` +With this, the decoder obtains a Serializer. When the program reaches`rpcMessage.setBody(serializer.deserialize(bs))`, +let's take a look at how the deserialize method processes the data. +```java + public T deserialize(byte[] bytes) { + return deserializeByVersion(bytes, ProtocolConstants.VERSION_0); + } + private static T deserializeByVersion(byte[] bytes, byte version) { + //The previous part involves validity checks, which we will skip here. + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + short typecode = byteBuffer.getShort(); + ByteBuffer in = byteBuffer.slice(); + //create Codec + AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typecode); + MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode, version); + //codec decode + messageCodec.decode(abstractMessage, in); + return (T) abstractMessage; + } +``` +This serialize does not contain much logic, the key components is in the MessageCodecFactory and Codec, let's delve deeper. +You can see that `MessageCodecFactory` has quite a lot of content, but in a single form, they all return message and codec according to MessageType, +so we won't show the content of factory here, we will directly look at message and codec, that is, `messageCodec.decode( abstractMessage, in)`, +although there are still a lot of codec types, but we can see that their structure is similar, parsing each field: +```java + // BranchRegisterRequestCodec decode + public void decode(T t, ByteBuffer in) { + BranchRegisterRequest branchRegisterRequest = (BranchRegisterRequest)t; + + // get xid + short xidLen = in.getShort(); + if (xidLen > 0) { + byte[] bs = new byte[xidLen]; + in.get(bs); + branchRegisterRequest.setXid(new String(bs, UTF8)); + } + // get branchType + branchRegisterRequest.setBranchType(BranchType.get(in.get())); + short len = in.getShort(); + if (len > 0) { + byte[] bs = new byte[len]; + in.get(bs); + branchRegisterRequest.setResourceId(new String(bs, UTF8)); + } + // get lockKey + int iLen = in.getInt(); + if (iLen > 0) { + byte[] bs = new byte[iLen]; + in.get(bs); + branchRegisterRequest.setLockKey(new String(bs, UTF8)); + } + // get applicationData + int applicationDataLen = in.getInt(); + if (applicationDataLen > 0) { + byte[] bs = new byte[applicationDataLen]; + in.get(bs); + branchRegisterRequest.setApplicationData(new String(bs, UTF8)); + } + } +``` +Well, by this point, we've got the branchRegisterRequest, which can be handed off to the TCInboundHandler for processing. + +But the problem is again, we only see the client (RM/TM) has the following kind of code to add encoder/decoder, that is, we know the client are using the current version of encoder/decoder processing: +```java + bootstrap.handler( + new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds())) + .addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); + if (channelHandlers != null) { + addChannelPipelineLast(ch, channelHandlers); + } + } + }); +``` +But how does server handle it? And what about the promised multi-version protocol? + +## Multi-version protocol (version recognition and binding) +Let's start by looking at a class diagram for encoder/decoder: + + + +ProtocolDecoderV1 we have analyzed, ProtocolEncoderV1 is the reverse operation, it should be better understood, as for ProtocolDecoderV0 and ProtocolEncoderV0, from the diagram you can also see that they are in parallel with v1, except for the operation of v0 (although so far we haven't put him to use yet), they are both subclasses of the typical encode and decode in netty, but what about MultiProtocolDecoder? He's the protagonist of the MultiProtocolDecoder and is registered into the server's bootstrap at startup. + +```java + protected boolean isV0(ByteBuf in) { + boolean isV0 = false; + in.markReaderIndex(); + byte b0 = in.readByte(); + byte b1 = in.readByte(); + // In fact, identifying the protocol relies on the 3rd byte (b2), as long as it is a normal new version, b2 is the version number greater than 0. For versions below 0.7, b2 is the first bit of the FLAG, which just so happens to be 0 in either case! + // v1/v2/v3 : b2 = version + // v0 : b2 = 0 ,1st byte in FLAG(2byte:0x10/0x20/0x40/0x80) + byte b2 = in.readByte(); + if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0 && ProtocolConstants.MAGIC_CODE_BYTES[1] == b1 && 0 == b2) { + isV0 = true; + } + // The read bytes have to be reset back in order for each version of the decoder to parse them from scratch. + in.resetReaderIndex(); + return isV0; + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + ByteBuf frame; + Object decoded; + byte version; + try { + // Identify the version number and get the current version number + if (isV0(in)) { + decoded = in; + version = ProtocolConstants.VERSION_0; + } else { + decoded = super.decode(ctx, in); + version = decideVersion(decoded); + } + + if (decoded instanceof ByteBuf) { + frame = (ByteBuf) decoded; + ProtocolDecoder decoder = protocolDecoderMap.get(version); + ProtocolEncoder encoder = protocolEncoderMap.get(version); + try { + if (decoder == null || encoder == null) { + throw new UnsupportedOperationException("Unsupported version: " + version); + } + // First time invoke ,use a well-judged decoder for the operation + return decoder.decodeFrame(frame); + } finally { + if (version != ProtocolConstants.VERSION_0) { + frame.release(); + } + // First time invoke , bind the encoder and decoder corresponding to the version, which is equivalent to binding the channel + ctx.pipeline().addLast((ChannelHandler)decoder); + ctx.pipeline().addLast((ChannelHandler)encoder); + if (channelHandlers != null) { + ctx.pipeline().addLast(channelHandlers); + } + // After binding, remove itself and do not judge it subsequently + ctx.pipeline().remove(this); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + + protected byte decideVersion(Object in) { + if (in instanceof ByteBuf) { + ByteBuf frame = (ByteBuf) in; + frame.markReaderIndex(); + byte b0 = frame.readByte(); + byte b1 = frame.readByte(); + if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { + throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); + } + + byte version = frame.readByte(); + frame.resetReaderIndex(); + return version; + } + return -1; + } +``` +With the above analysis, v0 finally comes in handy (when a client with an older version registers, +the server assigns it a lower version of encoder/decoder), and we've figured out how multi-version protocols are recognized and bound. + + + + + + diff --git a/i18n/en/docusaurus-plugin-content-docs/current/user/quickstart.md b/i18n/en/docusaurus-plugin-content-docs/current/user/quickstart.md index 6dd024c2f7..9ef2ae57a7 100644 --- a/i18n/en/docusaurus-plugin-content-docs/current/user/quickstart.md +++ b/i18n/en/docusaurus-plugin-content-docs/current/user/quickstart.md @@ -218,3 +218,26 @@ sh seata-server.sh -p 8091 -h 127.0.0.1 -m file ### Step 5: Run example Go to samples repo: [seata-samples/at-samples](https://github.com/apache/incubator-seata-samples/tree/master/at-sample), and find a suitable dependency setup. Start `Account`, `Storage`, `Order`, `Business` services accordingly. + +## RocketMQ Integration to Seata + +Using RocketMQ as a participant in seata global transaction is simple, +First, make sure you have introduced seata-all or springboot-starter of seata dependency. + +Create the producer by `SeataMQProducerFactory`, then send messages by `SeataMQProducer`. Here is an example: +```java +public class BusinessServiceImpl implements BusinessService { + private static final String NAME_SERVER = "127.0.0.1:9876"; + private static final String PRODUCER_GROUP = "test-group"; + private static final String TOPIC = "test-topic"; + private static SeataMQProducer producer= SeataMQProducerFactory.createSingle(NAME_SERVER, PRODUCER_GROUP); + + public void purchase(String userId, String commodityCode, int orderCount) { + producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8))); + //do something + } +} +``` +The effect of this approach is that the production message acts as a participant RM in the seata global transaction. When the 1st phase of the global transaction is completed, the MQ message will be committed or rollback based on the transaction 2nd phase’s request, +the message will not be consumed until then. +Note: If there is no xid in the current thread, the producer will degrade to a normal send instead of sending a half-message. diff --git a/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md b/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md index 1fc0c3c1a7..6c85163f28 100644 --- a/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md +++ b/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md @@ -197,6 +197,6 @@ protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) -上面已经讲了请求发送时,serverHandler/clientHandler和NettyRemoting(包括RM、TM、TC)的处理,知道了从外部到netty处理器再到内部的DefaultCoodinator的过程,但我们还缺Decoder/Encoder没讲,这里面会进行协议的解析/封装,也会进行序列化和反序列化,请看 [Seata的RPC通信源码分析02:协议篇](seata-rpc-multi-protocol02.md) +上面已经讲了请求发送时,serverHandler/clientHandler和NettyRemoting(包括RM、TM、TC)的处理,知道了从外部到netty处理器再到内部的DefaultCoordinator的过程,但我们还缺Decoder/Encoder没讲,这里面会进行协议的解析/封装,也会进行序列化和反序列化,请看 [Seata的RPC通信源码分析02:协议篇](seata-rpc-multi-protocol02.md) diff --git a/i18n/zh-cn/docusaurus-plugin-content-docs/current/user/quickstart.md b/i18n/zh-cn/docusaurus-plugin-content-docs/current/user/quickstart.md index a64a8fa360..7f1e327ccd 100644 --- a/i18n/zh-cn/docusaurus-plugin-content-docs/current/user/quickstart.md +++ b/i18n/zh-cn/docusaurus-plugin-content-docs/current/user/quickstart.md @@ -216,3 +216,27 @@ sh seata-server.sh -p 8091 -h 127.0.0.1 -m file ### 步骤 5: 运行示例 示例仓库: [seata-samples/at-samples](https://github.com/apache/incubator-seata-samples/tree/master/at-sample)。找到合适的依赖项设置,按顺序启动 `Account`, `Storage`, `Order`, `Business` 服务。 + + +## RocketMQ 接入 Seata + +使用RocketMQ作为Seata分布式事务的参与者很简单,先确保已经引入了seata-all或者seata的springboot-starter依赖。 + +然后通过`SeataMQProducerFactory`创建生产者,然后通过 `SeataMQProducer` 可以直接使用 RocketMQ 发送消息。以下是一个例子: + + +```java +public class BusinessServiceImpl implements BusinessService { + private static final String NAME_SERVER = "127.0.0.1:9876"; + private static final String PRODUCER_GROUP = "test-group"; + private static final String TOPIC = "test-topic"; + private static SeataMQProducer producer= SeataMQProducerFactory.createSingle(NAME_SERVER, PRODUCER_GROUP); + + public void purchase(String userId, String commodityCode, int orderCount) { + producer.send(new Message(TOPIC, "testMessage".getBytes(StandardCharsets.UTF_8))); + //do something + } +} +``` +这样达到的效果是:生产消息作为Seata分布式事务的参与者RM,当全局事务的一阶段完成,这个MQ消息会根据二阶段要求commit/rollback进行消息的提交或撤回,在此之前消息不会被消费。 +注: 当前线程中如果没有xid,该producer会退化为普通的send,而不是发送半消息