diff --git a/featurestore-ee/src/main/java/io/hops/hopsworks/featurestore/databricks/client/DatabricksClient.java b/featurestore-ee/src/main/java/io/hops/hopsworks/featurestore/databricks/client/DatabricksClient.java index 2937039fae..11746e711f 100644 --- a/featurestore-ee/src/main/java/io/hops/hopsworks/featurestore/databricks/client/DatabricksClient.java +++ b/featurestore-ee/src/main/java/io/hops/hopsworks/featurestore/databricks/client/DatabricksClient.java @@ -5,7 +5,6 @@ package io.hops.hopsworks.featurestore.databricks.client; import com.damnhandy.uri.template.UriTemplate; -import com.twitter.bijection.codec.Base64; import io.hops.hopsworks.common.proxies.client.HttpClient; import io.hops.hopsworks.common.proxies.client.HttpRetryableAction; import io.hops.hopsworks.common.proxies.client.NotRetryableClientProtocolException; @@ -17,6 +16,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; +import org.glassfish.jersey.internal.util.Base64; import javax.ejb.ConcurrencyManagement; import javax.ejb.ConcurrencyManagementType; @@ -251,7 +251,7 @@ private void uploadStream(HttpHost dbInstanceHost, DbfsClose dbfsClose, InputStr int read = 0; while ((read = inputStream.read(data)) > -1) { // Send any pending block data - sendBlock(dbInstanceHost, token, dbfsClose, Base64.encodeBase64String(Arrays.copyOf(data, read))); + sendBlock(dbInstanceHost, token, dbfsClose, Base64.encodeAsString(Arrays.copyOf(data, read))); } } diff --git a/hopsworks-common/pom.xml b/hopsworks-common/pom.xml index 1b14e77338..1ccfa8213c 100644 --- a/hopsworks-common/pom.xml +++ b/hopsworks-common/pom.xml @@ -171,10 +171,6 @@ io.hops.hive hive-standalone-metastore - - com.twitter - bijection-avro_2.12 - org.freemarker freemarker diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/inference/logger/KafkaInferenceLogger.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/inference/logger/KafkaInferenceLogger.java index dcff495cfe..f658cfae52 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/inference/logger/KafkaInferenceLogger.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/serving/inference/logger/KafkaInferenceLogger.java @@ -16,8 +16,6 @@ package io.hops.hopsworks.common.serving.inference.logger; -import com.twitter.bijection.Injection; -import com.twitter.bijection.avro.GenericAvroCodecs; import io.hops.hopsworks.common.dao.kafka.KafkaConst; import io.hops.hopsworks.common.kafka.KafkaBrokers; import io.hops.hopsworks.persistence.entity.project.Project; @@ -28,7 +26,10 @@ import io.hops.hopsworks.exceptions.CryptoPasswordNotFoundException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -41,6 +42,7 @@ import javax.ejb.Asynchronous; import javax.ejb.EJB; import javax.ejb.Stateless; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.Properties; @@ -107,8 +109,7 @@ public void logInferenceRequest(Serving serving, String inferenceRequest, //Get the schema for the topic and the serializer Schema avroSchema = new Schema.Parser().parse(serving.getKafkaTopic().getSubjects().getSchema().getSchema()); - Injection recordSerializer = GenericAvroCodecs.toBinary(avroSchema); - + // Create the GenericRecord from the avroSchema GenericData.Record inferenceRecord = new GenericData.Record(avroSchema); @@ -116,14 +117,16 @@ public void logInferenceRequest(Serving serving, String inferenceRequest, populateInfererenceRecord(serving, inferenceRequest, responseHttpCode, inferenceResponse, inferenceRecord, schemaVersion); - // Serialize record to byte - byte[] inferenceRecordBytes = recordSerializer.apply(inferenceRecord); + // Serialize record to byte array and send it to kafka + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + DatumWriter writer = new GenericDatumWriter<>(avroSchema); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + writer.write(inferenceRecord, encoder); + encoder.flush(); - // Push the record to the topic - ProducerRecord inferenceKakfaRecord = new ProducerRecord<>( - serving.getKafkaTopic().getTopicName(), inferenceRecordBytes); - - try { + // Push the record to the topic + ProducerRecord inferenceKakfaRecord = new ProducerRecord<>( + serving.getKafkaTopic().getTopicName(), out.toByteArray()); kafkaProducer.send(inferenceKakfaRecord); } catch (Exception e) { LOGGER.log(Level.FINE, "Cannot write to topic: " + serving.getKafkaTopic().getTopicName(), e); diff --git a/pom.xml b/pom.xml index baf5e8e25f..56a38f879e 100644 --- a/pom.xml +++ b/pom.xml @@ -92,7 +92,6 @@ 2.17.19 1.69 1.69 - 0.9.6 2.8.0 1.32.0 1.32.0 @@ -259,11 +258,6 @@ oauth2-oidc-sdk ${oauth2-oidc-sdk.version} - - com.twitter - bijection-avro_2.12 - ${bijection-avro_2.12.version} - commons-fileupload commons-fileupload