Skip to content

Commit

Permalink
Merge pull request #92 from AndreiIvanitckii/search-type-descriptor-i…
Browse files Browse the repository at this point in the history
…n-all-proto-files

Search type descriptor in all proto files
  • Loading branch information
ekoutanov authored Mar 10, 2020
2 parents 28335f3 + 84db3ea commit d91be16
Showing 1 changed file with 7 additions and 29 deletions.
36 changes: 7 additions & 29 deletions src/main/java/kafdrop/util/ProtobufMessageDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,64 +33,42 @@ public class ProtobufMessageDeserializer implements MessageDeserializer {
private String topic;
private String fullDescFile;
private String msgTypeName;
private String descFileName;

private static final Logger LOG = LoggerFactory.getLogger(ProtobufMessageDeserializer.class);

public ProtobufMessageDeserializer(String topic, String fullDescFile, String msgTypeName) {
this.topic = topic;
this.fullDescFile = fullDescFile;
Path path = Paths.get(fullDescFile);
descFileName = path.getFileName().toString();
this.msgTypeName = msgTypeName;
}

@Override
public String deserializeMessage(ByteBuffer buffer) {

try (InputStream input = new FileInputStream(new File(fullDescFile))) {
// LOG.info("decoding message: " +
// Base64.getEncoder().encodeToString(buffer.array()));

FileDescriptorSet set = FileDescriptorSet.parseFrom(input);
String protoFileName = descFileName.replace(".desc", ".proto");

Predicate<FileDescriptorProto> byName = desc -> protoFileName.equals(desc.getName());
var results = set.getFileList().stream().filter(byName).collect(Collectors.toList());
if (CollectionUtils.isEmpty(results)) {
final String errorMsg = "Can't find descriptor in provided descriptor file: " + protoFileName;
LOG.error(errorMsg);
throw new DeserializationException(errorMsg);
}
List<FileDescriptor> descs = new ArrayList<>();

for (FileDescriptorProto ffdp : set.getFileList()) {
FileDescriptor fd = Descriptors.FileDescriptor.buildFrom(ffdp,
(FileDescriptor[]) descs.toArray(new FileDescriptor[descs.size()]));
descs.add(fd);
}

Predicate<FileDescriptor> fdByName = desc -> protoFileName.equals(desc.getName());
var fd = descs.stream().filter(fdByName).collect(Collectors.toList()).get(0);

Predicate<Descriptor> byMsgTypeName = desc -> msgTypeName.equals(desc.getName());
final var descriptors = descs.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList());

var msgTypes = fd.getMessageTypes().stream().filter(byMsgTypeName).collect(Collectors.toList());
if (CollectionUtils.isEmpty(msgTypes)) {
final var messageDescriptor = descriptors.stream().filter(desc -> msgTypeName.equals(desc.getName())).findFirst();
if (messageDescriptor.isEmpty()) {
final String errorMsg = "Can't find specific message type: " + msgTypeName;
LOG.error(errorMsg);
throw new DeserializationException(errorMsg);
}
Descriptor messageType = msgTypes.get(0);

DynamicMessage dMsg = DynamicMessage.parseFrom(messageType, CodedInputStream.newInstance(buffer));
DynamicMessage message = DynamicMessage.parseFrom(messageDescriptor.get(), CodedInputStream.newInstance(buffer));

JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder()
.add(descs.stream().flatMap(desc -> desc.getMessageTypes().stream()).collect(Collectors.toList()))
.build();
JsonFormat.TypeRegistry typeRegistry = JsonFormat.TypeRegistry.newBuilder().add(descriptors).build();
Printer printer = JsonFormat.printer().usingTypeRegistry(typeRegistry);

return printer.print(dMsg).replaceAll("\n", ""); // must remove line break so it defaults to collapse mode
return printer.print(message).replaceAll("\n", ""); // must remove line break so it defaults to collapse mode
} catch (FileNotFoundException e) {
final String errorMsg = "Couldn't open descriptor file: " + fullDescFile;
LOG.error(errorMsg, e);
Expand All @@ -100,7 +78,7 @@ public String deserializeMessage(ByteBuffer buffer) {
LOG.error(errorMsg, e);
throw new DeserializationException(errorMsg);
} catch (DescriptorValidationException e) {
final String errorMsg = "Can't compile proto message type" + msgTypeName;
final String errorMsg = "Can't compile proto message type: " + msgTypeName;
LOG.error(errorMsg, e);
throw new DeserializationException(errorMsg);
}
Expand Down

0 comments on commit d91be16

Please sign in to comment.