author      Liang-Chi Hsieh <viirya@appier.com>        2015-09-08 23:07:34 +0800
committer   Cheng Lian <lian@databricks.com>           2015-09-08 23:07:34 +0800
commit      990c9f79c28db501018a0a3af446ff879962475d (patch)
tree        ef18018fb463f7ed0348bdaabb3f4509d8e06cd5 /sql
parent      6ceed852ab716d8acc46ce90cba9cfcff6d3616f (diff)
download    spark-990c9f79c28db501018a0a3af446ff879962475d.tar.gz
            spark-990c9f79c28db501018a0a3af446ff879962475d.tar.bz2
            spark-990c9f79c28db501018a0a3af446ff879962475d.zip
[SPARK-9170] [SQL] Use OrcStructInspector to be case preserving when writing ORC files
JIRA: https://issues.apache.org/jira/browse/SPARK-9170

`StandardStructObjectInspector` implicitly lowercases column names, but the ORC format has no such requirement. In fact, there is an `OrcStructInspector` intended for the ORC format; we should use it when serializing rows to ORC files, so that column-name case is preserved when writing ORC files.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #7520 from viirya/use_orcstruct.
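For illustration, here is a minimal, self-contained sketch of the behaviour the patch relies on: the standard Java object inspector lowercases struct field names, whereas the ORC-specific inspector obtained via `OrcStruct.createObjectInspector` keeps the original casing. The `CasePreservingInspectorSketch` object and its `fieldNames` helper are hypothetical names introduced only for this example (they are not part of the patch), and the expected output assumes the Hive serde classes behave as the commit message describes.

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.io.orc.OrcStruct
import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}

object CasePreservingInspectorSketch {

  // Returns the field names reported by the old (standard) and new (ORC) inspectors
  // for the same Hive type string, e.g. "struct<Acol:bigint>".
  def fieldNames(typeString: String): (Seq[String], Seq[String]) = {
    val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString)

    // Old code path: the standard inspector lowercases field names internally.
    val standardOI = TypeInfoUtils
      .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
      .asInstanceOf[StructObjectInspector]

    // New code path (as in the patch): ORC's own inspector preserves the casing.
    val orcOI = OrcStruct
      .createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
      .asInstanceOf[SettableStructObjectInspector]

    (standardOI.getAllStructFieldRefs.asScala.map(_.getFieldName),
      orcOI.getAllStructFieldRefs.asScala.map(_.getFieldName))
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the mixed-case column "Acol" used in the new test.
    val (standard, orc) = fieldNames("struct<Acol:bigint>")
    println(s"standard inspector fields: $standard") // expected: acol (lowercased)
    println(s"orc inspector fields:      $orc")      // expected: Acol (case preserved)
  }
}
```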
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala   | 47
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 14
2 files changed, 40 insertions(+), 21 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4eeca9aec1..7e89109259 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -25,9 +25,9 @@ import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit}
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSerde, OrcSplit, OrcStruct}
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
+import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, StructTypeInfo}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -89,21 +89,10 @@ private[orc] class OrcOutputWriter(
TypeInfoUtils.getTypeInfoFromTypeString(
HiveMetastoreTypes.toMetastoreType(dataSchema))
- TypeInfoUtils
- .getStandardJavaObjectInspectorFromTypeInfo(typeInfo)
- .asInstanceOf[StructObjectInspector]
+ OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
+ .asInstanceOf[SettableStructObjectInspector]
}
- // Used to hold temporary `Writable` fields of the next row to be written.
- private val reusableOutputBuffer = new Array[Any](dataSchema.length)
-
- // Used to convert Catalyst values into Hadoop `Writable`s.
- private val wrappers = structOI.getAllStructFieldRefs.asScala
- .zip(dataSchema.fields.map(_.dataType))
- .map { case (ref, dt) =>
- wrapperFor(ref.getFieldObjectInspector, dt)
- }.toArray
-
// `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
// flag to decide whether `OrcRecordWriter.close()` needs to be called.
private var recordWriterInstantiated = false
@@ -127,16 +116,32 @@ private[orc] class OrcOutputWriter(
override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
- override protected[sql] def writeInternal(row: InternalRow): Unit = {
+ private def wrapOrcStruct(
+ struct: OrcStruct,
+ oi: SettableStructObjectInspector,
+ row: InternalRow): Unit = {
+ val fieldRefs = oi.getAllStructFieldRefs
var i = 0
- while (i < row.numFields) {
- reusableOutputBuffer(i) = wrappers(i)(row.get(i, dataSchema(i).dataType))
+ while (i < fieldRefs.size) {
+ oi.setStructFieldData(
+ struct,
+ fieldRefs.get(i),
+ wrap(
+ row.get(i, dataSchema(i).dataType),
+ fieldRefs.get(i).getFieldObjectInspector,
+ dataSchema(i).dataType))
i += 1
}
+ }
+
+ val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+ override protected[sql] def writeInternal(row: InternalRow): Unit = {
+ wrapOrcStruct(cachedOrcStruct, structOI, row)
recordWriter.write(
NullWritable.get(),
- serializer.serialize(reusableOutputBuffer, structOI))
+ serializer.serialize(cachedOrcStruct, structOI))
}
override def close(): Unit = {
@@ -259,7 +264,7 @@ private[orc] case class OrcTableScan(
maybeStructOI.map { soi =>
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map {
case (attr, ordinal) =>
- soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal
+ soi.getStructFieldRef(attr.name) -> ordinal
}.unzip
val unwrappers = fieldRefs.map(unwrapperFor)
// Map each tuple to a row object
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 744d462938..8bc33fcf5d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -287,6 +287,20 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}
}
+ test("SPARK-9170: Don't implicitly lowercase of user-provided columns") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ sqlContext.range(0, 10).select('id as "Acol").write.format("orc").save(path)
+ sqlContext.read.format("orc").load(path).schema("Acol")
+ intercept[IllegalArgumentException] {
+ sqlContext.read.format("orc").load(path).schema("acol")
+ }
+ checkAnswer(sqlContext.read.format("orc").load(path).select("acol").sort("acol"),
+ (0 until 10).map(Row(_)))
+ }
+ }
+
test("SPARK-8501: Avoids discovery schema from empty ORC files") {
withTempPath { dir =>
val path = dir.getCanonicalPath