author    Cheng Lian <lian@databricks.com>  2015-07-02 21:30:13 -0700
committer Cheng Lian <lian@databricks.com>  2015-07-02 21:30:13 -0700
commit    de0802499a188a249bd5faf2f411b2e8ee522c33 (patch)
tree      4ac1fc51c863ca085d1e0dca104525545fdfb977
parent    3f1e4efbd8f67a3bb1a9e33be91e2a07e0aa01b0 (diff)
[SPARK-8501] [SQL] Avoids reading schema from empty ORC files (backport to 1.4)
This PR backports #7199 to branch-1.4.

Author: Cheng Lian <lian@databricks.com>

Closes #7200 from liancheng/spark-8501-for-1.4 and squashes the following commits:

725e9e3 [Cheng Lian] Addresses comments
0fa25af [Cheng Lian] Avoids reading schema from empty ORC files
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala | 62
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala     | 44
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala   | 56
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala  | 22
4 files changed, 136 insertions(+), 48 deletions(-)
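
For illustration only (not part of this commit): a minimal spark-shell sketch of the behavior this patch introduces, assuming a hypothetical directory /tmp/empty_orc that contains only zero-row ORC files written by Hive. Before this change, schema discovery could pick up the bogus empty struct<> schema; with the change, the load fails fast with a descriptive AnalysisException.

// Hypothetical spark-shell session on branch-1.4 with this patch applied.
// `/tmp/empty_orc` is an assumed path holding only empty ORC files written by Hive.
import org.apache.spark.sql.AnalysisException

try {
  sqlContext.read.format("orc").load("/tmp/empty_orc")
} catch {
  case e: AnalysisException =>
    // Expected message: "Failed to discover schema from ORC files stored in /tmp/empty_orc. ..."
    println(e.getMessage)
}
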
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index e5e92e6caf..bc5cc55b7b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -24,30 +24,70 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType
-private[orc] object OrcFileOperator extends Logging{
- def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
+private[orc] object OrcFileOperator extends Logging {
+ /**
+ * Retrieves an ORC file reader from a given path. The path can point to either a directory or a
+ * single ORC file. If it points to a directory, it picks any non-empty ORC file within that
+ * directory.
+ *
+ * The reader returned by this method is mainly used for two purposes:
+ *
+ * 1. Retrieving file metadata (schema and compression codecs, etc.)
+ * 2. Reading the actual file content (in this case, the given path should point to the target file)
+ *
+ * @note As recorded by SPARK-8501, ORC writes an empty schema (<code>struct&lt;&gt;</code>) to an
+ * ORC file if the file contains zero rows. This is OK for Hive since the schema of the
+ * table is managed by metastore. But this becomes a problem when reading ORC files
+ * directly from HDFS via Spark SQL, because we have to discover the schema from raw ORC
+ * files. So this method always tries to find an ORC file whose schema is non-empty, and
+ * creates the result reader from that file. If no such file is found, it returns `None`.
+ *
+ * @todo Needs to consider all files when schema evolution is taken into account.
+ */
+ def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+ def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+ reader.getObjectInspector match {
+ case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
+ logInfo(
+ s"ORC file $path has empty schema, it probably contains no rows. " +
+ "Trying to read another ORC file to figure out the schema.")
+ false
+ case _ => true
+ }
+ }
+
val conf = config.getOrElse(new Configuration)
- val fspath = new Path(pathStr)
- val fs = fspath.getFileSystem(conf)
- val orcFiles = listOrcFiles(pathStr, conf)
- logDebug(s"Creating ORC Reader from ${orcFiles.head}")
- // TODO Need to consider all files when schema evolution is taken into account.
- OrcFile.createReader(fs, orcFiles.head)
+ val fs = {
+ val hdfsPath = new Path(basePath)
+ hdfsPath.getFileSystem(conf)
+ }
+
+ listOrcFiles(basePath, conf).iterator.map { path =>
+ path -> OrcFile.createReader(fs, path)
+ }.collectFirst {
+ case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+ }
}
def readSchema(path: String, conf: Option[Configuration]): StructType = {
- val reader = getFileReader(path, conf)
+ val reader = getFileReader(path, conf).getOrElse {
+ throw new AnalysisException(
+ s"Failed to discover schema from ORC files stored in $path. " +
+ "Probably there are either no ORC files or only empty ORC files.")
+ }
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}
- def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
- getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
+ def getObjectInspector(
+ path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
+ getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])
}
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
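
One detail worth noting about the new getFileReader: it maps a lazy iterator over listOrcFiles and then uses collectFirst, so files are only opened until the first non-empty schema is found. A self-contained sketch of that laziness, with hypothetical file names and plain Scala rather than ORC APIs:

object LazyCollectFirstSketch extends App {
  // Stand-ins for ORC files; an empty string plays the role of an empty "struct<>" schema.
  val files = Seq(
    "part-0.orc" -> "",
    "part-1.orc" -> "struct<key:int>",
    "part-2.orc" -> "struct<key:int>")

  var opened = 0
  val firstUsable = files.iterator.map { case (name, schema) =>
    opened += 1               // counts how many files we actually "open"
    name -> schema
  }.collectFirst {
    case (name, schema) if schema.nonEmpty => name
  }

  assert(firstUsable == Some("part-1.orc"))
  assert(opened == 2)         // part-2.orc is never opened: the mapped iterator is lazy
  println(s"picked $firstUsable after opening $opened file(s)")
}
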
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 3713f6fd94..204bbe03af 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -241,26 +241,34 @@ private[orc] case class OrcTableScan(
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[Row] = {
val deserializer = new OrcSerde
- val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
- val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
- case (attr, ordinal) =>
- soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
- }.unzip
- val unwrappers = fieldRefs.map(unwrapperFor)
- // Map each tuple to a row object
- iterator.map { value =>
- val raw = deserializer.deserialize(value)
- var i = 0
- while (i < fieldRefs.length) {
- val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
- if (fieldValue == null) {
- mutableRow.setNullAt(fieldOrdinals(i))
- } else {
- unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+
+ // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
+ // rows, so such a file cannot provide a proper ObjectInspector. In this case we simply return an
+ // empty partition, since we know the file is empty.
+ maybeStructOI.map { soi =>
+ val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+ case (attr, ordinal) =>
+ soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+ }.unzip
+ val unwrappers = fieldRefs.map(unwrapperFor)
+ // Map each tuple to a row object
+ iterator.map { value =>
+ val raw = deserializer.deserialize(value)
+ var i = 0
+ while (i < fieldRefs.length) {
+ val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+ if (fieldValue == null) {
+ mutableRow.setNullAt(fieldOrdinals(i))
+ } else {
+ unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ }
+ i += 1
}
- i += 1
+ mutableRow: Row
}
- mutableRow: Row
+ }.getOrElse {
+ Iterator.empty
}
}
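
The maybeStructOI.map { ... }.getOrElse(Iterator.empty) shape used above is a small Option idiom: build the row iterator only when an ObjectInspector is available, and otherwise return an empty partition. A tiny standalone sketch of the same idiom, with made-up types instead of the ORC classes:

// Hypothetical sketch: `maybeSchema` plays the role of the optional ObjectInspector,
// `raw` plays the role of the underlying record iterator.
def rowsFor(maybeSchema: Option[Seq[String]], raw: Iterator[String]): Iterator[Seq[String]] =
  maybeSchema.map { fields =>
    // Schema known: turn each raw record into a row with the expected number of fields.
    raw.map(line => line.split(",", fields.length).toSeq)
  }.getOrElse(Iterator.empty)  // No schema: the "file" is empty, so emit no rows at all.

// rowsFor(Some(Seq("key", "value")), Iterator("1,foo")).toList yields a single row Seq("1", "foo");
// rowsFor(None, Iterator("never read")).toList yields Nil.
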
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 57c23fe77f..dd63a8e1bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -23,9 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -54,8 +52,6 @@ case class Person(name: String, age: Int, contacts: Seq[Contact])
class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
override val sqlContext = TestHive
- import TestHive.read
-
def getTempFilePath(prefix: String, suffix: String = ""): File = {
val tempFile = File.createTempFile(prefix, suffix)
tempFile.delete()
@@ -173,7 +169,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
test("Default compression options for writing to an ORC file") {
withOrcFile((1 to 100).map(i => (i, s"val_$i"))) { file =>
assertResult(CompressionKind.ZLIB) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
}
@@ -186,21 +182,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
withOrcFile(data) { file =>
assertResult(CompressionKind.SNAPPY) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "NONE")
withOrcFile(data) { file =>
assertResult(CompressionKind.NONE) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "LZO")
withOrcFile(data) { file =>
assertResult(CompressionKind.LZO) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
}
@@ -292,4 +288,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
List(Row("same", "run_5", 100)))
}
}
+
+ test("SPARK-8501: Avoids discovery schema from empty ORC files") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ withTable("empty_orc") {
+ withTempTable("empty", "single") {
+ sqlContext.sql(
+ s"""CREATE TABLE empty_orc(key INT, value STRING)
+ |STORED AS ORC
+ |LOCATION '$path'
+ """.stripMargin)
+
+ val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+ emptyDF.registerTempTable("empty")
+
+ // This creates a single empty ORC file with the Hive ORC SerDe. We resort to this trick
+ // because the Spark SQL ORC data source never writes out empty ORC files itself.
+ sqlContext.sql(
+ s"""INSERT INTO TABLE empty_orc
+ |SELECT key, value FROM empty
+ """.stripMargin)
+
+ val errorMessage = intercept[AnalysisException] {
+ sqlContext.read.format("orc").load(path)
+ }.getMessage
+
+ assert(errorMessage.contains("Failed to discover schema from ORC files"))
+
+ val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+ singleRowDF.registerTempTable("single")
+
+ sqlContext.sql(
+ s"""INSERT INTO TABLE empty_orc
+ |SELECT key, value FROM single
+ """.stripMargin)
+
+ val df = sqlContext.read.format("orc").load(path)
+ assert(df.schema === singleRowDF.schema.asNullable)
+ checkAnswer(df, singleRowDF)
+ }
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 433ea9b853..82e08caf46 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -44,7 +44,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
import org.apache.spark.sql.hive.test.TestHive.implicits._
sparkContext
- .makeRDD(1 to 100)
+ .makeRDD(1 to 10)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
@@ -70,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}
test("create temporary orc table") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 100).map(i => Row(i, s"part-$i")))
+ (1 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
- (6 to 100).map(i => Row(i, s"part-$i")))
+ (6 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 100).map(i => Row(1, s"part-$i")))
+ (1 to 10).map(i => Row(1, s"part-$i")))
}
test("create temporary orc table as") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 100).map(i => Row(i, s"part-$i")))
+ (1 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
- (6 to 100).map(i => Row(i, s"part-$i")))
+ (6 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 100).map(i => Row(1, s"part-$i")))
+ (1 to 10).map(i => Row(1, s"part-$i")))
}
test("appending insert") {
@@ -106,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
+ (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -119,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
- (6 to 100).map(i => Row(i, s"part-$i")))
+ (6 to 10).map(i => Row(i, s"part-$i")))
}
}