Skip to content

Commit

Permalink
feat(amqp): implement amqp binding proposal
Browse files Browse the repository at this point in the history
  • Loading branch information
timonback committed Aug 27, 2024
1 parent 24f00a1 commit 1f369d9
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ public class ReferenceUtil {
private static final String FORBIDDEN_ID_CHARACTER = "/";

public static String toValidId(String name) {
return name.replaceAll(FORBIDDEN_ID_CHARACTER, "_");
return name.replaceAll(FORBIDDEN_ID_CHARACTER, "_"); // TODO: easier to verify correct usage +"_id";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
import io.github.springwolf.asyncapi.v3.bindings.ChannelBinding;
import io.github.springwolf.asyncapi.v3.bindings.MessageBinding;
import io.github.springwolf.asyncapi.v3.bindings.OperationBinding;
import io.github.springwolf.asyncapi.v3.model.ReferenceUtil;
import io.github.springwolf.asyncapi.v3.model.schema.SchemaObject;

import java.util.Map;

public interface BindingFactory<T> {
String getChannelName(T annotation);
default String getChannelId(T annotation) {
return ReferenceUtil.toValidId(getChannelName(annotation));
}
String getChannelName(T annotation);

Map<String, ChannelBinding> buildChannelBinding(T annotation);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private ChannelObject buildChannelItem(ClassAnnotation classAnnotation, Map<Stri
Map<String, ChannelBinding> chBinding = channelBinding != null ? new HashMap<>(channelBinding) : null;
String channelName = bindingFactory.getChannelName(classAnnotation);
return ChannelObject.builder()
.channelId(ReferenceUtil.toValidId(channelName))
.channelId(bindingFactory.getChannelId(classAnnotation))
.address(channelName)
.bindings(chBinding)
.messages(new HashMap<>(messages))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private ChannelObject buildChannelItem(MethodAnnotation annotation, MessageObjec
Map<String, ChannelBinding> chBinding = channelBinding != null ? new HashMap<>(channelBinding) : null;
String channelName = bindingFactory.getChannelName(annotation);
return ChannelObject.builder()
.channelId(ReferenceUtil.toValidId(channelName))
.channelId(bindingFactory.getChannelId(annotation))
.address(channelName)
.messages(Map.of(message.getMessageId(), MessageReference.toComponentMessage(message)))
.bindings(chBinding)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ protected Map<String, MessageReference> buildMessages(
.collect(toSet());

if (messageType == MessageType.OPERATION) {
String channelName = bindingFactory.getChannelName(classAnnotation);
return toOperationsMessagesMap(channelName, messages);
String channelId = bindingFactory.getChannelId(classAnnotation);
return toOperationsMessagesMap(channelId, messages);
}
return toMessagesMap(messages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public static Map<String, MessageReference> toMessagesMap(Set<MessageObject> mes
}

public static Map<String, MessageReference> toOperationsMessagesMap(
String channelName, Set<MessageObject> messages) {
if (channelName == null || channelName.isBlank()) {
throw new IllegalArgumentException("channelName must not be empty");
String channelId, Set<MessageObject> messages) {
if (channelId == null || channelId.isBlank()) {
throw new IllegalArgumentException("channelId must not be empty");
}

if (messages.isEmpty()) {
Expand All @@ -46,6 +46,6 @@ public static Map<String, MessageReference> toOperationsMessagesMap(
.collect(Collectors.toMap(
MessageObject::getMessageId,
e -> MessageReference.toChannelMessage(
ReferenceUtil.toValidId(channelName), e.getMessageId())));
channelId, e.getMessageId())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ private Stream<Map.Entry<String, Operation>> mapClassToOperation(Class<?> compon
return Stream.empty();
}

String channelName = bindingFactory.getChannelName(classAnnotation);
String channelId = bindingFactory.getChannelId(classAnnotation);
String operationId = StringUtils.joinWith(
"_", ReferenceUtil.toValidId(channelName), OperationAction.RECEIVE, component.getSimpleName());
"_", channelId, OperationAction.RECEIVE, component.getSimpleName());

Operation operation = buildOperation(classAnnotation, annotatedMethods);
annotatedMethods.forEach(method -> customizers.forEach(customizer -> customizer.customize(operation, method)));
Expand All @@ -89,11 +89,11 @@ private Operation buildOperation(ClassAnnotation classAnnotation, Set<Method> me
private Operation buildOperation(ClassAnnotation classAnnotation, Map<String, MessageReference> messages) {
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(classAnnotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelName = bindingFactory.getChannelName(classAnnotation);
String channelId = bindingFactory.getChannelId(classAnnotation);

return Operation.builder()
.action(OperationAction.RECEIVE)
.channel(ChannelReference.fromChannel(ReferenceUtil.toValidId(channelName)))
.channel(ChannelReference.fromChannel(channelId))
.messages(messages.values().stream().toList())
.bindings(opBinding)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ private Map.Entry<String, Operation> mapMethodToOperation(Method method) {

MethodAnnotation annotation = AnnotationScannerUtil.findAnnotationOrThrow(methodAnnotationClass, method);

String channelName = bindingFactory.getChannelName(annotation);
String channelId = bindingFactory.getChannelId(annotation);
String operationId = StringUtils.joinWith(
"_", ReferenceUtil.toValidId(channelName), OperationAction.RECEIVE, method.getName());
"_", channelId, OperationAction.RECEIVE, method.getName());

NamedSchemaObject payloadSchema = payloadMethodParameterService.extractSchema(method);
SchemaObject headerSchema = headerClassExtractor.extractHeader(method, payloadSchema);
Expand All @@ -87,7 +87,7 @@ private Operation buildOperation(
MessageObject message = buildMessage(annotation, payloadType, headerSchema);
Map<String, OperationBinding> operationBinding = bindingFactory.buildOperationBinding(annotation);
Map<String, OperationBinding> opBinding = operationBinding != null ? new HashMap<>(operationBinding) : null;
String channelId = ReferenceUtil.toValidId(bindingFactory.getChannelName(annotation));
String channelId = bindingFactory.getChannelId(annotation);

return Operation.builder()
.action(OperationAction.RECEIVE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public String getChannelName(RabbitListener annotation) {
return RabbitListenerUtil.getChannelName(annotation, stringValueResolver);
}

@Override
public String getChannelId(RabbitListener annotation) {
return RabbitListenerUtil.getChannelId(annotation, stringValueResolver);
}

@Override
public Map<String, ChannelBinding> buildChannelBinding(RabbitListener annotation) {
return RabbitListenerUtil.buildChannelBinding(annotation, stringValueResolver, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ public static String getChannelName(RabbitListener annotation, StringValueResolv
"No channel name was found in @RabbitListener annotation (neither in queues nor bindings property)"));
}

public static String getChannelId(RabbitListener annotation, StringValueResolver resolver) {
Stream<String> annotationBindingChannelNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> channelIdFromAnnotationBindings(binding, resolver));

return Stream.concat(streamQueueNames(annotation), annotationBindingChannelNames)
.map(resolver::resolveStringValue)
.filter(Objects::nonNull)
.peek(queue -> log.debug("Resolved channel id: {}", queue))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException(
"No channel id was found in @RabbitListener annotation (neither in queues nor bindings property)"));
}

private static String getQueueName(RabbitListener annotation, StringValueResolver resolver) {
Stream<String> annotationBindingChannelNames = Arrays.stream(annotation.bindings())
.flatMap(binding -> Stream.of(binding.value().name()));
Expand All @@ -96,7 +110,22 @@ private static Stream<String> channelNameFromAnnotationBindings(

return Arrays.stream(routingKeys)
.map(resolver::resolveStringValue)
.map(routingKey -> String.join("_", queueName, routingKey, exchangeName));
.map(routingKey -> exchangeName);
}

private static Stream<String> channelIdFromAnnotationBindings(
QueueBinding binding, StringValueResolver resolver) {
String queueName = resolver.resolveStringValue(binding.value().name());
String exchangeName = resolver.resolveStringValue(binding.exchange().name());

String[] routingKeys = binding.key();
if (routingKeys.length == 0) {
routingKeys = List.of(DEFAULT_EXCHANGE_ROUTING_KEY).toArray(new String[0]);
}

return Arrays.stream(routingKeys)
.map(resolver::resolveStringValue)
.map(routingKey -> String.join("_", queueName+"-id", routingKey, exchangeName+"-id"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ public Map<String, ChannelObject> scan() {
return queues.stream()
.map(RabbitListenerUtil::buildChannelObject)
.collect(Collectors.toMap(
o -> ((AMQPChannelBinding) o.getBindings().get(RabbitListenerUtil.BINDING_NAME))
.getQueue()
.getName(),
ChannelObject::getChannelId,
c -> c,
(a, b) -> a));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class SendToCustomizer implements OperationCustomizer {
public void customize(Operation operation, Method method) {
SendTo annotation = AnnotationScannerUtil.findAnnotation(SendTo.class, method);
if (annotation != null) {
String channelId = ReferenceUtil.toValidId(bindingFactory.getChannelName(annotation));
String channelId = bindingFactory.getChannelId(annotation);
String payloadName = payloadService.extractSchema(method).name();

operation.setReply(OperationReply.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class SendToUserCustomizer implements OperationCustomizer {
public void customize(Operation operation, Method method) {
SendToUser annotation = AnnotationScannerUtil.findAnnotation(SendToUser.class, method);
if (annotation != null) {
String channelId = ReferenceUtil.toValidId(bindingFactory.getChannelName(annotation));
String channelId = bindingFactory.getChannelId(annotation);
String payloadName = payloadService.extractSchema(method).name();

operation.setReply(OperationReply.builder()
Expand Down

0 comments on commit 1f369d9

Please sign in to comment.