JSON deserializer considers a field to be required while it actually is not. #10417

Open
vszyndler opened this issue Aug 29, 2024 · 0 comments

Describe the bug
JSON deserializer considers a field to be required while it actually is not.

To Reproduce

  • Using version 7.7.0 of cp-ksqldb-server and CLI.
  • I create a stream from an existing topic that uses a JSON Schema from Schema Registry:
CREATE STREAM DATASET (
  proposalIdKey VARCHAR KEY
)
WITH (KAFKA_TOPIC='dataset', VALUE_FORMAT='JSON_SR');
  • Below is the JSON schema used by my topic:
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "title": "Dataset",
  "description": "Dataset",
  "type": "object",
  "properties": {
    "name": {
      "type": "string"
    },
    "description": {
      "type": "string"
    },
    "proposalId": {
      "type": "string"
    },
    "sessionId": {
      "type": "string"
    },
    "sampleId": {
      "type": "string"
    },
    "instrumentId": {
      "type": "string"
    },
    "beamline": {
      "type": "string"
    },
    "branch": {
      "type": "string"
    },
    "owner": {
      "type": "string"
    },
    "ownerOrcid": {
      "type": "string"
    },
    "contactEmail": {
      "type": "string"
    },
    "scientificMetadata": {
      "type": "string"
    },
    "sourceFolder": {
      "type": "string"
    },
    "dataFormat": {
      "type": "string"
    },
    "datafiles": {
      "type": "array",
      "items": {
        "$ref": "#/$defs/datafile"
      }
    }
  },
  "required": [
    "name",
    "owner",
    "beamline",
    "branch",
    "sourceFolder"
  ],
  "$defs": {
    "datafile": {
      "type": "object",
      "properties": {
        "path": {
          "type": "string"
        },
        "size": {
          "type": "number"
        },
        "time": {
          "type": "string",
          "format": "date-time"
        },
        "checksum": {
          "type": "string"
        },
        "uid": {
          "type": "string"
        },
        "gid": {
          "type": "string"
        },
        "permissions": {
          "type": "string"
        }
      },
      "required": [
        "path",
        "size",
        "time"
      ]
    }
  }
}
  • I run the following push query to print the data in ksqldb-cli:
    SELECT * FROM DATASET EMIT CHANGES;

  • Then, I produce the following data into my topic:

{
  "name": "my dataset",
  "proposalId": "20241601",
  "beamline": "my beamline",
  "branch": "my branch",
  "owner": "Dataset owner",
  "sourceFolder": "/my/source/folder",
  "dataFormat": "NeXus",
  "datafiles": [
    {
      "path": "S01000-01999/c2_01033_2024-08-25_18-04-11.h5",
      "size": 1000000000,
      "time": "2024-08-25T17:05:42.232000+00:00"
    },
    {
      "path": "S01000-01999/c2_01013_2024-08-25_17-37-23.h5",
      "size": 1000000000,
      "time": "2024-08-25T16:38:56.441000+00:00"
    }
  ]
}

Expected behavior

  • The data should be displayed in the ksqldb console.

Actual behavior

  • Nothing is printed out in the ksqldb console.
  • According to the ksqldb-server logs, the JSON deserializer considers the uid field to be required, even though it is not listed as required in the schema (a minimal illustration of the underlying Connect check follows the stack trace):
[2024-08-29 16:23:08,319] ERROR {"type":0,"deserializationError":{"target":"value","errorMessage":"Error deserializing message from topic: dataset","recordB64":null,"cause":["Invalid value: null used for required field: \"uid\", schema type: STRING"],"topic":"dataset"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CSAS_DATASET_SCICAT_23.KafkaTopic_Left.Source.deserializer)
[2024-08-29 16:23:08,320] WARN stream-thread [_confluent-ksql-default_query_CSAS_DATASET_SCICAT_23-35b599ae-5eb6-4e02-a0b4-a49c1fff6741-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[dataset] partition=[0] offset=[10] (org.apache.kafka.streams.processor.internals.RecordDeserializer)
org.apache.kafka.common.errors.SerializationException: Error deserializing message from topic: dataset
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:55)
        at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:239)
        at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:218)
        at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:204)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:128)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:284)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:1039)
        at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1782)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1254)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:955)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:669)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "uid", schema type: STRING
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
        at org.apache.kafka.connect.data.Struct.validate(Struct.java:233)
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:250)
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:255)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
        at io.confluent.connect.json.JsonSchemaData.lambda$static$11(JsonSchemaData.java:234)
        at io.confluent.connect.json.JsonSchemaData.toConnectData(JsonSchemaData.java:636)
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:135)
        at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:121)
        at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:49)
        ... 17 more
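
For reference, the Connect-level check that raises this error can be reproduced in isolation. The sketch below is a hypothetical standalone snippet (not the ksqlDB conversion code): it builds a Connect Struct whose uid field is non-optional, which is how the nested datafile fields appear to be treated once the JSON Schema has been translated. The field names and path value are placeholders.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class RequiredFieldDemo {
        public static void main(String[] args) {
            // A Connect schema in which "uid" is a required (non-optional) STRING field.
            // Schema.OPTIONAL_STRING_SCHEMA would mark it optional instead.
            Schema datafile = SchemaBuilder.struct()
                    .field("path", Schema.STRING_SCHEMA)
                    .field("uid", Schema.STRING_SCHEMA)
                    .build();

            // "uid" is deliberately left unset, i.e. null.
            Struct record = new Struct(datafile)
                    .put("path", "some/placeholder/file.h5");

            // Throws org.apache.kafka.connect.errors.DataException:
            //   Invalid value: null used for required field: "uid", schema type: STRING
            record.validate();
        }
    }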

Additional context

  • This was working fine with Confluent Platform 7.2.0 of cp-ksqldb-server and CLI.
  • The issue has been present since version 7.3.0.
  • As a workaround, I can add empty values for the optional fields, but this is not convenient at all (see the example after this list).
  • The problem only concerns the optional fields of the "datafile" sub-object; there is no problem with the optional fields of the top-level object.
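
For illustration, the workaround means each datafile entry has to carry explicit (empty) values for the otherwise-optional fields, roughly like this:

    {
      "path": "S01000-01999/c2_01033_2024-08-25_18-04-11.h5",
      "size": 1000000000,
      "time": "2024-08-25T17:05:42.232000+00:00",
      "checksum": "",
      "uid": "",
      "gid": "",
      "permissions": ""
    }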