author | Ergin Seyfe <eseyfe@fb.com> | 2016-05-21 16:08:31 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-05-21 16:08:31 -0700 |
commit | c18fa464f404ed2612f8c4d355cb0544b355975b (patch) | |
tree | 77bcad85d3a5fe2b32b60ecabde2947d9ee85c38 | |
parent | 201a51f36682726d78d9d2fe2c388093bb860ee0 (diff) | |
[SPARK-15280][Input/Output] Refactored OrcOutputWriter and moved serialization to a new class.
## What changes were proposed in this pull request?
Refactoring: separated the ORC serialization logic from OrcOutputWriter and moved it into a new class, OrcSerializer. A short usage sketch follows this description.
## How was this patch tested?
Manual tests & existing tests.
Author: Ergin Seyfe <eseyfe@fb.com>
Closes #13066 from seyfe/orc_serializer.
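For illustration, a minimal sketch of using the new class on its own. This is hypothetical: `OrcSerializer` is `private[orc]`, so the snippet assumes code living inside `org.apache.spark.sql.hive.orc`; the constructor and the `serialize` signature are taken from the diff below, while the schema and row values are made up.

```scala
// Hypothetical usage from within org.apache.spark.sql.hive.orc
// (OrcSerializer is private[orc]); signatures come from the patch below.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

// All SerDe and ObjectInspector setup is encapsulated: callers need only
// a schema and a Hadoop Configuration, not a TaskAttemptContext.
val serializer = new OrcSerializer(schema, new Configuration())

// serialize() wraps the row into a cached OrcStruct and runs it through
// Hive's OrcSerde, yielding a Writable ready for an ORC RecordWriter.
val writable = serializer.serialize(InternalRow(1L, UTF8String.fromString("spark")))
```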
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 84 |
1 file changed, 45 insertions(+), 39 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 6e55137dd7..38f50c112a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -149,39 +149,70 @@ private[sql] class DefaultSource
   }
 }
 
-private[orc] class OrcOutputWriter(
-    path: String,
-    bucketId: Option[Int],
-    dataSchema: StructType,
-    context: TaskAttemptContext)
-  extends OutputWriter with HiveInspectors {
+private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
+  extends HiveInspectors {
+
+  def serialize(row: InternalRow): Writable = {
+    wrapOrcStruct(cachedOrcStruct, structOI, row)
+    serializer.serialize(cachedOrcStruct, structOI)
+  }
 
-  private val serializer = {
+  private[this] val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
     table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))
 
     val serde = new OrcSerde
-    val configuration = context.getConfiguration
-    serde.initialize(configuration, table)
+    serde.initialize(conf, table)
     serde
   }
 
-  // Object inspector converted from the schema of the relation to be written.
-  private val structOI = {
+  // Object inspector converted from the schema of the relation to be serialized.
+  private[this] val structOI = {
     val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }
 
+  private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+  private[this] def wrapOrcStruct(
+      struct: OrcStruct,
+      oi: SettableStructObjectInspector,
+      row: InternalRow): Unit = {
+    val fieldRefs = oi.getAllStructFieldRefs
+    var i = 0
+    while (i < fieldRefs.size) {
+
+      oi.setStructFieldData(
+        struct,
+        fieldRefs.get(i),
+        wrap(
+          row.get(i, dataSchema(i).dataType),
+          fieldRefs.get(i).getFieldObjectInspector,
+          dataSchema(i).dataType))
+      i += 1
+    }
+  }
+}
+
+private[orc] class OrcOutputWriter(
+    path: String,
+    bucketId: Option[Int],
+    dataSchema: StructType,
+    context: TaskAttemptContext)
+  extends OutputWriter {
+
+  private[this] val conf = context.getConfiguration
+
+  private[this] val serializer = new OrcSerializer(dataSchema, conf)
+
   // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
   // flag to decide whether `OrcRecordWriter.close()` needs to be called.
   private var recordWriterInstantiated = false
 
   private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
     recordWriterInstantiated = true
-
-    val conf = context.getConfiguration
     val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
     val taskAttemptId = context.getTaskAttemptID
     val partition = taskAttemptId.getTaskID.getId
@@ -206,33 +237,8 @@ private[orc] class OrcOutputWriter(
   override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
 
-  private def wrapOrcStruct(
-      struct: OrcStruct,
-      oi: SettableStructObjectInspector,
-      row: InternalRow): Unit = {
-    val fieldRefs = oi.getAllStructFieldRefs
-    var i = 0
-    while (i < fieldRefs.size) {
-
-      oi.setStructFieldData(
-        struct,
-        fieldRefs.get(i),
-        wrap(
-          row.get(i, dataSchema(i).dataType),
-          fieldRefs.get(i).getFieldObjectInspector,
-          dataSchema(i).dataType))
-      i += 1
-    }
-  }
-
-  val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-    wrapOrcStruct(cachedOrcStruct, structOI, row)
-
-    recordWriter.write(
-      NullWritable.get(),
-      serializer.serialize(cachedOrcStruct, structOI))
+    recordWriter.write(NullWritable.get(), serializer.serialize(row))
   }
 
   override def close(): Unit = {
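A design note on the extracted class (an observation about the code above, not part of the patch): `serialize` reuses one cached `OrcStruct`, and Hive's `OrcSerde` likewise appears to reuse its output row, so the returned `Writable` should be consumed before the next call, exactly as `writeInternal` does. A hedged sketch of the safe and unsafe call patterns (`rows` is a hypothetical collection of `InternalRow`s):

```scala
// Safe: mirrors OrcOutputWriter.writeInternal, consuming each Writable
// immediately after it is produced.
recordWriter.write(NullWritable.get(), serializer.serialize(row))

// Unsafe (hypothetical): because the serializer mutates a single cached
// OrcStruct in place, Writables held across calls would all reflect the
// last row serialized.
// val buffered = rows.map(serializer.serialize)  // do not buffer these
```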