Skip to content

Commit

Permalink
Merge branch 'main' into config-entry
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf authored Dec 25, 2024
2 parents 5e6ceb1 + 173de24 commit 803b893
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

conf.set(MAPRED_OUTPUT_SCHEMA.getAttribute, OrcFileFormat.getQuotedSchemaString(dataSchema))

// Pass compression to job conf so that the file extension can be aware of it.
conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)

conf
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") == "true") {
val nativeConf =
GlutenFormatFactory(shortName()).nativeConf(options, orcOptions.compressionCodec)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {

// pass compression to job conf so that the file extension can be aware of it.
if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") == "true") {
// Pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
Expand All @@ -103,7 +102,6 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
context: TaskAttemptContext): OutputWriter = {
GlutenFormatFactory(shortName())
.createOutputWriter(path, dataSchema, context, nativeConf)

}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable

val conf = job.getConfiguration

// Pass compression to job conf so that the file extension can be aware of it.
conf.set(COMPRESS.getAttribute, orcOptions.compressionCodec)

conf
.asInstanceOf[JobConf]
.setOutputFormat(classOf[org.apache.orc.mapred.OrcOutputFormat[OrcStruct]])

if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {
// pass compression to job conf so that the file extension can be aware of it.
if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") == "true") {
val nativeConf =
GlutenFormatFactory(shortName()).nativeConf(options, orcOptions.compressionCodec)

Expand All @@ -93,7 +93,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val name = context.getConfiguration.get(COMPRESS.getAttribute)
OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "")
}

compressionExtension + ".orc"
}

Expand All @@ -119,7 +118,6 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
val name = context.getConfiguration.get(COMPRESS.getAttribute)
OrcUtils.extensionsForCompressionCodecNames.getOrElse(name, "")
}

compressionExtension + ".orc"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
if ("true" == sparkSession.sparkContext.getLocalProperty("isNativeApplicable")) {

// pass compression to job conf so that the file extension can be aware of it.
if (sparkSession.sparkContext.getLocalProperty("isNativeApplicable") == "true") {
// Pass compression to job conf so that the file extension can be aware of it.
val conf = ContextUtil.getConfiguration(job)
val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
Expand Down

0 comments on commit 803b893

Please sign in to comment.