diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
index 21d2de6ad68..64b08162557 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -149,30 +149,24 @@ object DumpUtils extends Logging {
 class ParquetDumper(private val outputStream: OutputStream, table: Table) extends
   HostBufferConsumer with AutoCloseable {
   private[this] val tempBuffer = new Array[Byte](128 * 1024)
-  private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]()
 
   def this(path: String, table: Table) = {
     this(new FileOutputStream(path), table)
   }
 
-  val tableWriter: TableWriter = {
+  private lazy val tableWriter: TableWriter = {
     // avoid anything conversion, just dump as it is
     val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table)
       .withCompressionType(ParquetDumper.COMPRESS_TYPE)
     Table.writeParquetChunked(builder.build(), this)
   }
 
-  override
-  def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit =
-    buffers += Tuple2(buffer, len)
-
-  def writeBufferedData(): Unit = {
-    ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream)
-  }
+  override def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit =
+    ColumnarOutputWriter.writeBufferedData(mutable.Queue((buffer, len)), tempBuffer,
+      outputStream)
 
   def writeTable(table: Table): Unit = {
     tableWriter.write(table)
-    writeBufferedData()
   }
 
   /**
@@ -181,7 +175,6 @@ class ParquetDumper(private val outputStream: OutputStream, table: Table) extend
    */
   def close(): Unit = {
     tableWriter.close()
-    writeBufferedData()
     outputStream.close()
   }
 }
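
The net effect of this diff is that `ParquetDumper` no longer accumulates chunks in an internal `mutable.Queue` that callers had to drain via `writeBufferedData()`; instead, `handleBuffer` flushes each chunk to the output stream as soon as the chunked Parquet writer hands it over, and `tableWriter` is created lazily on first use. Below is a minimal usage sketch (not part of the diff) showing how a caller might drive the refactored class; `ai.rapids.cudf.Table` matches the `Table` type used in the diff, while `ParquetDumpExample`, `dumpTable`, and their parameters are illustrative names rather than part of the plugin's API.

```scala
import ai.rapids.cudf.Table

object ParquetDumpExample {
  // Hypothetical helper for illustration only.
  def dumpTable(table: Table, path: String): Unit = {
    // The auxiliary constructor shown in the diff opens a FileOutputStream
    // for `path`.
    val dumper = new ParquetDumper(path, table)
    try {
      // writeTable() no longer needs a follow-up writeBufferedData() call:
      // handleBuffer() now streams each chunk straight to the output stream
      // via a transient single-element queue.
      dumper.writeTable(table)
    } finally {
      // close() closes the chunked writer (flushing any final chunks through
      // handleBuffer) and then closes the underlying stream.
      dumper.close()
    }
  }
}
```

One consequence of the write-through design is that buffer lifetimes are now bounded by a single `handleBuffer` call rather than by the lifetime of the dumper, which avoids holding a growing queue of `HostMemoryBuffer`s while a large table is being dumped.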