aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2015-07-02 21:30:57 -0700
committerCheng Lian <lian@databricks.com>2015-07-02 21:30:57 -0700
commit20a4d7dbd18fd4d1e3fb9324749453123714f99f (patch)
treeab6fb1774cb63597fcc27c6628eb5e93365bafa0 /sql
parentdfd8bac8f5b4f2b733c1ddd58e53ee0ba431e6b3 (diff)
downloadspark-20a4d7dbd18fd4d1e3fb9324749453123714f99f.tar.gz
spark-20a4d7dbd18fd4d1e3fb9324749453123714f99f.tar.bz2
spark-20a4d7dbd18fd4d1e3fb9324749453123714f99f.zip
[SPARK-8501] [SQL] Avoids reading schema from empty ORC files
ORC writes empty schema (`struct<>`) to ORC files containing zero rows. This is OK for Hive since the table schema is managed by the metastore. But it causes trouble when reading raw ORC files via Spark SQL since we have to discover the schema from the files. Notice that the ORC data source always avoids writing empty ORC files, but it's still problematic when reading Hive tables which contain empty part-files. Author: Cheng Lian <lian@databricks.com> Closes #7199 from liancheng/spark-8501 and squashes the following commits: bb8cd95 [Cheng Lian] Addresses comments a290221 [Cheng Lian] Avoids reading schema from empty ORC files
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala60
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala44
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala55
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala28
4 files changed, 135 insertions, 52 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index e3ab9442b4..0f9a1a6ef3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -24,30 +24,70 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType
private[orc] object OrcFileOperator extends Logging {
- def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
+ /**
+ * Retrieves a ORC file reader from a given path. The path can point to either a directory or a
+ * single ORC file. If it points to an directory, it picks any non-empty ORC file within that
+ * directory.
+ *
+ * The reader returned by this method is mainly used for two purposes:
+ *
+ * 1. Retrieving file metadata (schema and compression codecs, etc.)
+ * 2. Read the actual file content (in this case, the given path should point to the target file)
+ *
+ * @note As recorded by SPARK-8501, ORC writes an empty schema (<code>struct&lt;&gt;</code) to an
+ * ORC file if the file contains zero rows. This is OK for Hive since the schema of the
+ * table is managed by metastore. But this becomes a problem when reading ORC files
+ * directly from HDFS via Spark SQL, because we have to discover the schema from raw ORC
+ * files. So this method always tries to find a ORC file whose schema is non-empty, and
+ * create the result reader from that file. If no such file is found, it returns `None`.
+ *
+ * @todo Needs to consider all files when schema evolution is taken into account.
+ */
+ def getFileReader(basePath: String, config: Option[Configuration] = None): Option[Reader] = {
+ def isWithNonEmptySchema(path: Path, reader: Reader): Boolean = {
+ reader.getObjectInspector match {
+ case oi: StructObjectInspector if oi.getAllStructFieldRefs.size() == 0 =>
+ logInfo(
+ s"ORC file $path has empty schema, it probably contains no rows. " +
+ "Trying to read another ORC file to figure out the schema.")
+ false
+ case _ => true
+ }
+ }
+
val conf = config.getOrElse(new Configuration)
- val fspath = new Path(pathStr)
- val fs = fspath.getFileSystem(conf)
- val orcFiles = listOrcFiles(pathStr, conf)
- logDebug(s"Creating ORC Reader from ${orcFiles.head}")
- // TODO Need to consider all files when schema evolution is taken into account.
- OrcFile.createReader(fs, orcFiles.head)
+ val fs = {
+ val hdfsPath = new Path(basePath)
+ hdfsPath.getFileSystem(conf)
+ }
+
+ listOrcFiles(basePath, conf).iterator.map { path =>
+ path -> OrcFile.createReader(fs, path)
+ }.collectFirst {
+ case (path, reader) if isWithNonEmptySchema(path, reader) => reader
+ }
}
def readSchema(path: String, conf: Option[Configuration]): StructType = {
- val reader = getFileReader(path, conf)
+ val reader = getFileReader(path, conf).getOrElse {
+ throw new AnalysisException(
+ s"Failed to discover schema from ORC files stored in $path. " +
+ "Probably there are either no ORC files or only empty ORC files.")
+ }
val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
val schema = readerInspector.getTypeName
logDebug(s"Reading schema from file $path, got Hive schema string: $schema")
HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
}
- def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = {
- getFileReader(path, conf).getObjectInspector.asInstanceOf[StructObjectInspector]
+ def getObjectInspector(
+ path: String, conf: Option[Configuration]): Option[StructObjectInspector] = {
+ getFileReader(path, conf).map(_.getObjectInspector.asInstanceOf[StructObjectInspector])
}
def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 300f83d914..9dc9fbb78e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -242,26 +242,34 @@ private[orc] case class OrcTableScan(
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[InternalRow] = {
val deserializer = new OrcSerde
- val soi = OrcFileOperator.getObjectInspector(path, Some(conf))
- val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
- case (attr, ordinal) =>
- soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
- }.unzip
- val unwrappers = fieldRefs.map(unwrapperFor)
- // Map each tuple to a row object
- iterator.map { value =>
- val raw = deserializer.deserialize(value)
- var i = 0
- while (i < fieldRefs.length) {
- val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
- if (fieldValue == null) {
- mutableRow.setNullAt(fieldOrdinals(i))
- } else {
- unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
+
+ // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
+ // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
+ // partition since we know that this file is empty.
+ maybeStructOI.map { soi =>
+ val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
+ case (attr, ordinal) =>
+ soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+ }.unzip
+ val unwrappers = fieldRefs.map(unwrapperFor)
+ // Map each tuple to a row object
+ iterator.map { value =>
+ val raw = deserializer.deserialize(value)
+ var i = 0
+ while (i < fieldRefs.length) {
+ val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+ if (fieldValue == null) {
+ mutableRow.setNullAt(fieldOrdinals(i))
+ } else {
+ unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ }
+ i += 1
}
- i += 1
+ mutableRow: InternalRow
}
- mutableRow: InternalRow
+ }.getOrElse {
+ Iterator.empty
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 267d22c6b5..ca131faaee 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -23,10 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.expressions.InternalRow
-import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -170,7 +167,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
test("Default compression options for writing to an ORC file") {
withOrcFile((1 to 100).map(i => (i, s"val_$i"))) { file =>
assertResult(CompressionKind.ZLIB) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
}
@@ -183,21 +180,21 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
withOrcFile(data) { file =>
assertResult(CompressionKind.SNAPPY) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "NONE")
withOrcFile(data) { file =>
assertResult(CompressionKind.NONE) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "LZO")
withOrcFile(data) { file =>
assertResult(CompressionKind.LZO) {
- OrcFileOperator.getFileReader(file).getCompression
+ OrcFileOperator.getFileReader(file).get.getCompression
}
}
}
@@ -289,4 +286,48 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
List(Row("same", "run_5", 100)))
}
}
+
+ test("SPARK-8501: Avoids discovery schema from empty ORC files") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ withTable("empty_orc") {
+ withTempTable("empty", "single") {
+ sqlContext.sql(
+ s"""CREATE TABLE empty_orc(key INT, value STRING)
+ |STORED AS ORC
+ |LOCATION '$path'
+ """.stripMargin)
+
+ val emptyDF = Seq.empty[(Int, String)].toDF("key", "value").coalesce(1)
+ emptyDF.registerTempTable("empty")
+
+ // This creates 1 empty ORC file with Hive ORC SerDe. We are using this trick because
+ // Spark SQL ORC data source always avoids write empty ORC files.
+ sqlContext.sql(
+ s"""INSERT INTO TABLE empty_orc
+ |SELECT key, value FROM empty
+ """.stripMargin)
+
+ val errorMessage = intercept[AnalysisException] {
+ sqlContext.read.format("orc").load(path)
+ }.getMessage
+
+ assert(errorMessage.contains("Failed to discover schema from ORC files"))
+
+ val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1)
+ singleRowDF.registerTempTable("single")
+
+ sqlContext.sql(
+ s"""INSERT INTO TABLE empty_orc
+ |SELECT key, value FROM single
+ """.stripMargin)
+
+ val df = sqlContext.read.format("orc").load(path)
+ assert(df.schema === singleRowDF.schema.asNullable)
+ checkAnswer(df, singleRowDF)
+ }
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index a0cdd0db42..82e08caf46 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -43,14 +43,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
orcTableDir.mkdir()
import org.apache.spark.sql.hive.test.TestHive.implicits._
- // Originally we were using a 10-row RDD for testing. However, when default parallelism is
- // greater than 10 (e.g., running on a node with 32 cores), this RDD contains empty partitions,
- // which result in empty ORC files. Unfortunately, ORC doesn't handle empty files properly and
- // causes build failure on Jenkins, which happens to have 32 cores. Please refer to SPARK-8501
- // for more details. To workaround this issue before fixing SPARK-8501, we simply increase row
- // number in this RDD to avoid empty partitions.
sparkContext
- .makeRDD(1 to 100)
+ .makeRDD(1 to 10)
.map(i => OrcData(i, s"part-$i"))
.toDF()
.registerTempTable(s"orc_temp_table")
@@ -76,35 +70,35 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
}
test("create temporary orc table") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(100))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 100).map(i => Row(i, s"part-$i")))
+ (1 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source where intField > 5"),
- (6 to 100).map(i => Row(i, s"part-$i")))
+ (6 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 100).map(i => Row(1, s"part-$i")))
+ (1 to 10).map(i => Row(1, s"part-$i")))
}
test("create temporary orc table as") {
- checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(100))
+ checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10))
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 100).map(i => Row(i, s"part-$i")))
+ (1 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT * FROM normal_orc_source WHERE intField > 5"),
- (6 to 100).map(i => Row(i, s"part-$i")))
+ (6 to 10).map(i => Row(i, s"part-$i")))
checkAnswer(
sql("SELECT COUNT(intField), stringField FROM normal_orc_source GROUP BY stringField"),
- (1 to 100).map(i => Row(1, s"part-$i")))
+ (1 to 10).map(i => Row(1, s"part-$i")))
}
test("appending insert") {
@@ -112,7 +106,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_source"),
- (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 100).flatMap { i =>
+ (1 to 5).map(i => Row(i, s"part-$i")) ++ (6 to 10).flatMap { i =>
Seq.fill(2)(Row(i, s"part-$i"))
})
}
@@ -125,7 +119,7 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll {
checkAnswer(
sql("SELECT * FROM normal_orc_as_source"),
- (6 to 100).map(i => Row(i, s"part-$i")))
+ (6 to 10).map(i => Row(i, s"part-$i")))
}
}