使用前必须import com.github.kdyzm.queue.core.config.QueueConfig
<dependency>
<groupId>com.github.kdyzm.queue</groupId>
<artifactId>queue-core</artifactId>
<version>1.1.0</version>
<dependency>
添加MQ配置信息必须实现MQConnection
接口
@Component
public class ConnectionConfig implements MQConnection {
@Value("${mq.secretKey}")
private String secretKey;
@Value("${mq.accessKey}")
private String accessKey;
@Value("${mq.onsAddr}")
private String onsAddr;
@Value("${mq.prefix}")
private String prefix;
@Override
public String getAccessKey() {
return accessKey;
}
@Override
public String getSecretKey() {
return secretKey;
}
@Override
public String getONSAddr() {
return onsAddr;
}
@Override
public String getPrefix() {
return prefix;
}
}
消费者提供api供生产者发送消息到指定的Topic,并实现api以订阅该Topic;比如A发送消息给B,B定义api并实现订阅,过程中,A是生产者,B是消费者
定义模板如下
@Topic(name="kdyzm",producerId="kdyzm_producer")
public interface UserQueueResource {
@Tag("test1")
public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user);
@Tag("test2")
public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user);
}
注意,这里的topicName
物理上的名字为环境变量前缀_kdyzm
,比如QA环境上为QA_kdyzm
;producerId
物理上的名字为PID_环境变量前缀_kdyzm_producer
,比如,QA环境为PID_QA_kdyzm_producer
package org.server2.api.config;
import org.server2.api.queue.UserQueueResource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.github.kdyzm.queue.core.factory.QueueResourceFactory;
@Configuration
public class QueueConfig {
@Autowired
@Bean
public UserQueueResource userQueueResource() {
return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class);
}
}
实现类需要加上
@QueueResource
注解 实现类方法上必须加上@ConsumerAnnotation
注解以提供consumerId
package org.server2.server.queue.impl;
import org.server2.api.models.UserModel;
import org.server2.api.queue.UserQueueResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import com.google.gson.Gson;
import com.github.kdyzm.queue.core.annotation.ConsumerAnnotation;
import com.github.kdyzm.queue.core.annotation.QueueResource;
@Controller
@QueueResource
public class UserQueueResourceImpl implements UserQueueResource {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
@ConsumerAnnotation("kdyzm_consumer")
public void handleUserInfo(UserModel user) {
logger.info("收到消息1:{}", new Gson().toJson(user));
}
@Override
@ConsumerAnnotation("kdyzm_consumer1")
public void handleUserInfo1(UserModel user) {
logger.info("收到消息2:{}", new Gson().toJson(user));
}
}
注意,这里的consumerId
为物理上形式为CID_环境变量前缀_kdyzm_consumer
,比如QA环境为CID_QA_kdyzm_consumer
@Import({ org.server2.api.config.QueueConfig.class,com.github.kdyzm.queue.core.config.QueueConfig.class })
@Autowired
private UserQueueResource userQueueResource;
@Override
public void sendMessage() {
UserModel userModel=new UserModel();
userModel.setName("kdyzm");
userModel.setAge(25);
userQueueResource.handleUserInfo(userModel);
}
待定