-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala        2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala      2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala     2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala           4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala         4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala         4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                        17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala            2
9 files changed, 29 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index d7ce9a0ce8..0e6b9855c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -140,7 +140,7 @@ private[sql] class CsvOutputWriter(
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.csv$extension")
}
}.getRecordWriter(context)
}
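
With this change, a gzip-compressed CSV part file carries the ".csv" suffix in front of the codec
extension. A minimal sketch of the naming expression in plain Scala (the split number and job UUID
below are hypothetical placeholders):

    // Minimal sketch of the new CSV naming scheme; values are hypothetical.
    val split = 3
    val uniqueWriteJobId = "1a2b3c"   // normally taken from spark.sql.sources.writeJobUUID
    val extension = ".gz"             // supplied by the configured compression codec
    val name = f"part-r-$split%05d-$uniqueWriteJobId.csv$extension"
    // name == "part-r-00003-1a2b3c.csv.gz"
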
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 497e3c59e9..05b44d1a2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -167,7 +167,7 @@ private[json] class JsonOutputWriter(
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString.json$extension")
}
}.getRecordWriter(context)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 82404b8499..f1060074d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -379,6 +379,9 @@ private[sql] class ParquetOutputWriter(
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
+ // It has the `.parquet` extension at the end because (de)compression tools
+ // such as gunzip would not be able to decompress this as the compression
+ // is not applied on this whole file but on each "page" in Parquet format.
new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
}
}
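
For Parquet the interpolated $extension already ends in ".parquet" (with any codec suffix in front
of it), so only the comment is new here: it records that a name such as part-r-00000-<uuid>.gz.parquet
is intentional, since gunzip could not unpack the file anyway. A hedged usage sketch, assuming the
session-level Parquet codec setting and a hypothetical output path:

    // Hedged sketch: with the gzip codec, part files are named like
    // part-r-00000-<uuid>.gz.parquet. gunzip cannot decompress them because
    // compression is applied per Parquet page, not to the file as a whole.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
    sqlContext.range(10).write.parquet("/tmp/parquet-naming-demo")  // hypothetical path
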
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index b3297254cb..2869a6a1ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -140,7 +140,7 @@ class TextOutputWriter(path: String, dataSchema: StructType, context: TaskAttemp
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
- new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+ new Path(path, f"part-r-$split%05d-$uniqueWriteJobId.txt$extension")
}
}.getRecordWriter(context)
}
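
The text source follows the same pattern, and the test changes below exercise it through the
"compression" write option. A hedged usage sketch, assuming an existing DataFrame df and a
hypothetical output path:

    import org.apache.spark.sql.SaveMode

    // Hedged sketch: gzip-compressed text output now lands in files ending
    // in ".txt.gz" rather than just ".gz".
    df.write.option("compression", "gzip").mode(SaveMode.Overwrite).text("/tmp/text-naming-demo")
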
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 076fe5e041..680759d457 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -404,7 +404,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.save(csvDir)
val compressedFiles = new File(csvDir).listFiles()
- assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+ assert(compressedFiles.exists(_.getName.endsWith(".csv.gz")))
val carsCopy = sqlContext.read
.format("csv")
@@ -439,7 +439,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.save(csvDir)
val compressedFiles = new File(csvDir).listFiles()
- assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+ assert(compressedFiles.exists(!_.getName.endsWith(".csv.gz")))
val carsCopy = sqlContext.read
.format("csv")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 02b173d30a..097ece3525 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1441,7 +1441,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.save(jsonDir)
val compressedFiles = new File(jsonDir).listFiles()
- assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+ assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
val jsonCopy = sqlContext.read
.format("json")
@@ -1479,7 +1479,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
.save(jsonDir)
val compressedFiles = new File(jsonDir).listFiles()
- assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+ assert(compressedFiles.exists(!_.getName.endsWith(".json.gz")))
val jsonCopy = sqlContext.read
.format("json")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
index 9eb1016b64..ee398721c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/text/TextSuite.scala
@@ -74,7 +74,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
val tempDirPath = tempDir.getAbsolutePath
testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
- assert(compressedFiles.exists(_.getName.endsWith(extension)))
+ assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
verifyFrame(sqlContext.read.text(tempDirPath))
}
@@ -102,7 +102,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
val tempDirPath = tempDir.getAbsolutePath
testDf.write.option("compression", "none").mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
- assert(compressedFiles.exists(!_.getName.endsWith(".gz")))
+ assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
verifyFrame(sqlContext.read.text(tempDirPath))
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 041e0fb477..8a39d95fc5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -161,7 +161,14 @@ private[orc] class OrcOutputWriter(
val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
- val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString.orc"
+ val compressionExtension = {
+ val name = conf.get(OrcTableProperties.COMPRESSION.getPropName)
+ OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+ }
+ // It has the `.orc` extension at the end because (de)compression tools
+ // such as gunzip would not be able to decompress this as the compression
+ // is not applied on this whole file but on each "stream" in ORC format.
+ val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
new OrcOutputFormat().getRecordWriter(
new Path(path, filename).getFileSystem(conf),
@@ -328,5 +335,13 @@ private[orc] object OrcRelation {
"snappy" -> CompressionKind.SNAPPY,
"zlib" -> CompressionKind.ZLIB,
"lzo" -> CompressionKind.LZO)
+
+ // The extensions for ORC compression codecs
+ val extensionsForCompressionCodecNames = Map(
+ CompressionKind.NONE.name -> "",
+ CompressionKind.SNAPPY.name -> ".snappy",
+ CompressionKind.ZLIB.name -> ".zlib",
+ CompressionKind.LZO.name -> ".lzo"
+ )
}
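
With the new extensionsForCompressionCodecNames map, the codec name read from the orc.compress
table property is translated into a suffix placed in front of ".orc". A minimal sketch of the
resulting name under ZLIB (partition, UUID and bucket string are hypothetical):

    // Minimal sketch of the new ORC naming scheme; values are hypothetical.
    val partition = 0
    val uniqueWriteJobId = "1a2b3c"
    val bucketString = ""
    val compressionExtension = ".zlib"  // extensionsForCompressionCodecNames("ZLIB")
    val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc"
    // filename == "part-r-00000-1a2b3c.zlib.orc"
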
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 823b52af1b..2345c1cf9c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -96,7 +96,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
// Check if this is compressed as ZLIB.
val conf = sparkContext.hadoopConfiguration
val fs = FileSystem.getLocal(conf)
- val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".orc"))
+ val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
assert(maybeOrcFile.isDefined)
val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))
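
The surrounding test presumably goes on to check the codec through the ORC reader. A hedged sketch
of such an assertion, assuming hive-exec's Reader.getCompression is available on the classpath:

    // Hedged sketch: verify the file really was written with ZLIB.
    assert(orcReader.getCompression == CompressionKind.ZLIB)
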