diff --git a/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java b/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java index ce8d84f4..8eb9554d 100644 --- a/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java +++ b/src/main/java/kafdrop/util/ProtobufMessageDeserializer.java @@ -33,15 +33,12 @@ public class ProtobufMessageDeserializer implements MessageDeserializer { private String topic; private String fullDescFile; private String msgTypeName; - private String descFileName; private static final Logger LOG = LoggerFactory.getLogger(ProtobufMessageDeserializer.class); public ProtobufMessageDeserializer(String topic, String fullDescFile, String msgTypeName) { this.topic = topic; this.fullDescFile = fullDescFile; - Path path = Paths.get(fullDescFile); - descFileName = path.getFileName().toString(); this.msgTypeName = msgTypeName; } @@ -49,48 +46,29 @@ public ProtobufMessageDeserializer(String topic, String fullDescFile, String msg public String deserializeMessage(ByteBuffer buffer) { try (InputStream input = new FileInputStream(new File(fullDescFile))) { - // LOG.info("decoding message: " + - // Base64.getEncoder().encodeToString(buffer.array())); - FileDescriptorSet set = FileDescriptorSet.parseFrom(input); - String protoFileName = descFileName.replace(".desc", ".proto"); - Predicate byName = desc -> protoFileName.equals(desc.getName()); - var results = set.getFileList().stream().filter(byName).collect(Collectors.toList()); - if (CollectionUtils.isEmpty(results)) { - final String errorMsg = "Can't find descriptor in provided descriptor file: " + protoFileName; - LOG.error(errorMsg); - throw new DeserializationException(errorMsg); - } List descs = new ArrayList<>(); - for (FileDescriptorProto ffdp : set.getFileList()) { FileDescriptor fd = Descriptors.FileDescriptor.buildFrom(ffdp, (FileDescriptor[]) descs.toArray(new FileDescriptor[descs.size()])); descs.add(fd); } - Predicate fdByName = desc -> protoFileName.equals(desc.getName()); - var fd = descs.stream().filter(fdByName).collect(Collectors.toList()).get(0); - - Predicate byMsgTypeName = desc -> msgTypeName.equals(desc.getName()); + final var descriptors = descs.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList()); - var msgTypes = fd.getMessageTypes().stream().filter(byMsgTypeName).collect(Collectors.toList()); - if (CollectionUtils.isEmpty(msgTypes)) { + final var messageDescriptor = descriptors.stream().filter(desc -> msgTypeName.equals(desc.getName())).findFirst(); + if (messageDescriptor.isEmpty()) { final String errorMsg = "Can't find specific message type: " + msgTypeName; LOG.error(errorMsg); throw new DeserializationException(errorMsg); } - Descriptor messageType = msgTypes.get(0); - - DynamicMessage dMsg = DynamicMessage.parseFrom(messageType, CodedInputStream.newInstance(buffer)); + DynamicMessage message = DynamicMessage.parseFrom(messageDescriptor.get(), CodedInputStream.newInstance(buffer)); - JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder() - .add(descs.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList())) - .build(); + JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder().add(descriptors).build(); Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry); - return printer.print(dMsg).replaceAll("\n", ""); // must remove line break so it defaults to collapse mode + return printer.print(message).replaceAll("\n", ""); // must remove line break so it defaults to collapse mode } catch (FileNotFoundException e) { final String errorMsg = "Couldn't open descriptor file: " + fullDescFile; LOG.error(errorMsg, e); @@ -100,7 +78,7 @@ public String deserializeMessage(ByteBuffer buffer) { LOG.error(errorMsg, e); throw new DeserializationException(errorMsg); } catch (DescriptorValidationException e) { - final String errorMsg = "Can't compile proto message type" + msgTypeName; + final String errorMsg = "Can't compile proto message type: " + msgTypeName; LOG.error(errorMsg, e); throw new DeserializationException(errorMsg); }