From 6bcb39a0a7283a92f0db4d31518305d0934fb555 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 8 Jul 2024 14:55:55 +0800 Subject: [PATCH 1/3] Fix oom --- .../com/nvidia/spark/rapids/DumpUtils.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index 21d2de6ad68..a42dd06e7c5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -149,30 +149,25 @@ object DumpUtils extends Logging { class ParquetDumper(private val outputStream: OutputStream, table: Table) extends HostBufferConsumer with AutoCloseable { private[this] val tempBuffer = new Array[Byte](128 * 1024) - private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]() +// private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]() def this(path: String, table: Table) = { this(new FileOutputStream(path), table) } - val tableWriter: TableWriter = { + private lazy val tableWriter: TableWriter = { // avoid anything conversion, just dump as it is val builder = ParquetDumper.parquetWriterOptionsFromTable(ParquetWriterOptions.builder(), table) .withCompressionType(ParquetDumper.COMPRESS_TYPE) Table.writeParquetChunked(builder.build(), this) } - override - def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit = - buffers += Tuple2(buffer, len) - - def writeBufferedData(): Unit = { - ColumnarOutputWriter.writeBufferedData(buffers, tempBuffer, outputStream) - } + override def handleBuffer(buffer: HostMemoryBuffer, len: Long): Unit = + ColumnarOutputWriter.writeBufferedData(mutable.Queue((buffer, len)), tempBuffer, + outputStream) def writeTable(table: Table): Unit = { tableWriter.write(table) - writeBufferedData() } /** @@ -181,7 +176,6 @@ class ParquetDumper(private val outputStream: OutputStream, table: Table) extend */ def 
close(): Unit = { tableWriter.close() - writeBufferedData() outputStream.close() } } From f5836c8eacaa44522b89278745cf098d35592aed Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 8 Jul 2024 15:51:24 +0800 Subject: [PATCH 2/3] Remove unused code Signed-off-by: liurenjie1024 --- .../src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index a42dd06e7c5..d3c3a1d81c5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -149,7 +149,6 @@ object DumpUtils extends Logging { class ParquetDumper(private val outputStream: OutputStream, table: Table) extends HostBufferConsumer with AutoCloseable { private[this] val tempBuffer = new Array[Byte](128 * 1024) -// private[this] val buffers = mutable.Queue[(HostMemoryBuffer, Long)]() def this(path: String, table: Table) = { this(new FileOutputStream(path), table) } From d8f98ef55445c4c2c2b9466a8b3a9e315ea5fbd7 Mon Sep 17 00:00:00 2001 From: liurenjie1024 Date: Mon, 8 Jul 2024 15:51:55 +0800 Subject: [PATCH 3/3] Update copyright --- .../src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala index d3c3a1d81c5..64b08162557 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DumpUtils.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2023, NVIDIA CORPORATION. + * Copyright (c) 2021-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.