Skip to content

Commit

Permalink
update: fastjson --> 1.2.49 & 序列化反序列化全部改为fastjson来实现
Browse files Browse the repository at this point in the history
  • Loading branch information
Wizzercn committed Aug 6, 2018
1 parent 1ca2b5e commit 05bc228
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 25 deletions.
2 changes: 1 addition & 1 deletion mqtt-auth/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>mqtt-wk</artifactId>
<groupId>cn.wizzer</groupId>
<version>1.0.2-netty</version>
<version>1.0.3-netty</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion mqtt-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>mqtt-wk</artifactId>
<groupId>cn.wizzer</groupId>
<version>1.0.2-netty</version>
<version>1.0.3-netty</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package cn.wizzer.iot.mqtt.server.broker.cluster;

import cn.hutool.core.util.HexUtil;
import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.broker.internal.InternalMessage;
import cn.wizzer.iot.mqtt.server.common.subscribe.SubscribeStore;
import cn.wizzer.iot.mqtt.server.store.message.MessageIdService;
import cn.wizzer.iot.mqtt.server.store.session.SessionStoreService;
import cn.wizzer.iot.mqtt.server.store.subscribe.SubscribeStoreService;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
Expand All @@ -16,7 +16,6 @@
import org.nutz.integration.jedis.pubsub.PubSubService;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.lang.Lang;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -52,13 +51,13 @@ public void init() {

@Override
public void onMessage(String channel, String message) {
InternalMessage internalMessage = Lang.fromBytes(HexUtil.decodeHex(message), InternalMessage.class);
InternalMessage internalMessage = JSONObject.parseObject(message, InternalMessage.class);
this.sendPublishMessage(internalMessage.getClientId(), internalMessage.getTopic(), MqttQoS.valueOf(internalMessage.getMqttQoS()), internalMessage.getMessageBytes(), internalMessage.isRetain(), internalMessage.isDup());
}

@Async
public void sendMessage(InternalMessage internalMessage) {
pubSubService.fire(CLUSTER_TOPIC, HexUtil.encodeHexStr(Lang.toBytes(internalMessage)));
pubSubService.fire(CLUSTER_TOPIC, JSONObject.toJSONString(internalMessage));
}

private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) {
Expand All @@ -73,7 +72,7 @@ private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS,
new MqttPublishVariableHeader(topic, 0), ByteBuffer.wrap(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value());
ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
if(channelId!=null) {
if (channelId != null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
Expand All @@ -85,7 +84,7 @@ private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS,
new MqttPublishVariableHeader(topic, messageId), ByteBuffer.wrap(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
if(channelId!=null) {
if (channelId != null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
Expand All @@ -97,7 +96,7 @@ private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS,
new MqttPublishVariableHeader(topic, messageId), ByteBuffer.wrap(messageBytes));
LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId);
ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId());
if(channelId!=null) {
if (channelId != null) {
Channel channel = channelGroup.find(channelId);
if (channel != null) channel.writeAndFlush(publishMessage);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package cn.wizzer.iot.mqtt.server.broker.service;

import cn.hutool.core.util.HexUtil;
import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.broker.internal.InternalMessage;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -29,7 +29,7 @@ public class KafkaService {
public void send(InternalMessage internalMessage) {
try {
//消息体转换为Hex字符串进行转发
ProducerRecord<String, String> data = new ProducerRecord<>(brokerProperties.getProducerTopic(), internalMessage.getTopic(), HexUtil.encodeHexStr(internalMessage.getMessageBytes()));
ProducerRecord<String, String> data = new ProducerRecord<>(brokerProperties.getProducerTopic(), internalMessage.getTopic(), JSONObject.toJSONString(internalMessage));
kafkaProducer.send(data,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
Expand Down
2 changes: 1 addition & 1 deletion mqtt-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>mqtt-wk</artifactId>
<groupId>cn.wizzer</groupId>
<version>1.0.2-netty</version>
<version>1.0.3-netty</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion mqtt-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>mqtt-wk</artifactId>
<groupId>cn.wizzer</groupId>
<version>1.0.2-netty</version>
<version>1.0.3-netty</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
Expand Down
6 changes: 3 additions & 3 deletions mqtt-zoo/mqtt-test-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>cn.wizzer</groupId>
<artifactId>mqtt-test-kafka</artifactId>
<version>1.0.0-netty</version>
<version>1.0.3-netty</version>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<properties>
<mqttwk.version>1.0.0-netty</mqttwk.version>
<mqttwk.version>1.0.3-netty</mqttwk.version>
<nutzboot.version>2.3-SNAPSHOT</nutzboot.version>
<netty.version>4.1.28.Final</netty.version>
<fastjson.version>1.2.47</fastjson.version>
<fastjson.version>1.2.49</fastjson.version>
<hutool.version>4.1.2</hutool.version>
<kafka_2.12.version>2.0.0</kafka_2.12.version>
<slf4j-log4j12.version>1.7.25</slf4j-log4j12.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package cn.wizzer.iot.mqtt.server.test;

import java.io.Serializable;
/**
* 内部消息
*/
public class InternalMessage implements Serializable {

private static final long serialVersionUID = -1L;

//当前频道clientId
private String clientId;

private String topic;

private int mqttQoS;

private byte[] messageBytes;

private boolean retain;

private boolean dup;

public String getClientId() {
return clientId;
}

public InternalMessage setClientId(String clientId) {
this.clientId = clientId;
return this;
}

public String getTopic() {
return topic;
}

public InternalMessage setTopic(String topic) {
this.topic = topic;
return this;
}

public int getMqttQoS() {
return mqttQoS;
}

public InternalMessage setMqttQoS(int mqttQoS) {
this.mqttQoS = mqttQoS;
return this;
}

public byte[] getMessageBytes() {
return messageBytes;
}

public InternalMessage setMessageBytes(byte[] messageBytes) {
this.messageBytes = messageBytes;
return this;
}

public boolean isRetain() {
return retain;
}

public InternalMessage setRetain(boolean retain) {
this.retain = retain;
return this;
}

public boolean isDup() {
return dup;
}

public InternalMessage setDup(boolean dup) {
this.dup = dup;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package cn.wizzer.iot.mqtt.server.test;

import cn.hutool.core.util.HexUtil;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.nutz.boot.NbApp;
import org.nutz.ioc.impl.PropertiesProxy;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import org.nutz.mvc.annotation.Modules;
Expand Down Expand Up @@ -44,15 +43,15 @@ public Properties getProperties() {
}

public void init() {
KafkaConsumer kafkaConsumer=new KafkaConsumer(getProperties());
KafkaConsumer kafkaConsumer = new KafkaConsumer(getProperties());
//kafka消费消息,接收MQTT发来的消息
kafkaConsumer.subscribe(Arrays.asList(conf.get("mqttwk.broker.kafka.producer.topic")));
int sum=0;
int sum = 0;
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
log.debugf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), new String(HexUtil.decodeHex(record.value())));
log.debugf("总计收到 %s条",++sum);
log.debugf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), JSONObject.parseObject(record.value(), InternalMessage.class));
log.debugf("总计收到 %s条", ++sum);
}
}

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<groupId>cn.wizzer</groupId>
<artifactId>mqtt-wk</artifactId>
<packaging>pom</packaging>
<version>1.0.2-netty</version>
<version>1.0.3-netty</version>
<name>MqttWk</name>
<modules>
<module>mqtt-common</module>
Expand All @@ -15,10 +15,10 @@
<module>mqtt-store</module>
</modules>
<properties>
<mqttwk.version>1.0.2-netty</mqttwk.version>
<mqttwk.version>1.0.3-netty</mqttwk.version>
<nutzboot.version>2.3-SNAPSHOT</nutzboot.version>
<netty.version>4.1.28.Final</netty.version>
<fastjson.version>1.2.47</fastjson.version>
<fastjson.version>1.2.49</fastjson.version>
<hutool.version>4.1.2</hutool.version>
<kafka_2.12.version>2.0.0</kafka_2.12.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down

0 comments on commit 05bc228

Please sign in to comment.