author     Ergin Seyfe <eseyfe@fb.com>        2016-05-21 16:08:31 -0700
committer  Yin Huai <yhuai@databricks.com>    2016-05-21 16:08:31 -0700
commit     c18fa464f404ed2612f8c4d355cb0544b355975b (patch)
tree       77bcad85d3a5fe2b32b60ecabde2947d9ee85c38
parent     201a51f36682726d78d9d2fe2c388093bb860ee0 (diff)
[SPARK-15280][Input/Output] Refactored OrcOutputWriter and moved serialization to a new class.
## What changes were proposed in this pull request?

Refactoring: separated the ORC serialization logic from OrcOutputWriter and moved it into a new class called OrcSerializer.

## How was this patch tested?

Manual tests & existing tests.

Author: Ergin Seyfe <eseyfe@fb.com>

Closes #13066 from seyfe/orc_serializer.
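To make the shape of the change easier to see before reading the patch, here is a minimal, self-contained sketch of the refactoring pattern: the row-serialization logic (in the real patch, the Hive `OrcSerde`, the `SettableStructObjectInspector`, and the reusable `OrcStruct` buffer) is pulled into its own class, and the output writer only keeps the record-writing concerns and delegates each row to the serializer. All names in the sketch (`RowSerializer`, `SketchOutputWriter`, `cached`) are illustrative stand-ins, not the actual Spark classes.

```scala
// Illustrative sketch of the refactoring pattern only; the real classes live in
// org.apache.spark.sql.hive.orc and use Hive's OrcSerde, ObjectInspectors, and
// Hadoop's RecordWriter.

// Serialization concerns live in their own class...
class RowSerializer(fieldNames: Seq[String]) {
  // A single buffer reused across rows, mirroring `cachedOrcStruct` in the patch.
  private[this] val cached = new StringBuilder

  def serialize(row: Seq[Any]): String = {
    cached.clear()
    fieldNames.zip(row).foreach { case (name, value) => cached.append(s"$name=$value;") }
    cached.toString
  }
}

// ...while the writer only decides when and where the serialized row goes.
class SketchOutputWriter(fieldNames: Seq[String]) {
  private[this] val serializer = new RowSerializer(fieldNames)
  def write(row: Seq[Any]): Unit = println(serializer.serialize(row))
}

object Demo extends App {
  new SketchOutputWriter(Seq("id", "name")).write(Seq(1, "spark"))
}
```

As in the patch below, the serializer owns the reusable buffer and the wrapping logic, and the writer's write path collapses to a one-line delegation (`serializer.serialize(row)`).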
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala  84
1 file changed, 45 insertions(+), 39 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 6e55137dd7..38f50c112a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -149,39 +149,70 @@ private[sql] class DefaultSource
}
}
-private[orc] class OrcOutputWriter(
- path: String,
- bucketId: Option[Int],
- dataSchema: StructType,
- context: TaskAttemptContext)
- extends OutputWriter with HiveInspectors {
+private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration)
+ extends HiveInspectors {
+
+ def serialize(row: InternalRow): Writable = {
+ wrapOrcStruct(cachedOrcStruct, structOI, row)
+ serializer.serialize(cachedOrcStruct, structOI)
+ }
- private val serializer = {
+ private[this] val serializer = {
val table = new Properties()
table.setProperty("columns", dataSchema.fieldNames.mkString(","))
table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))
val serde = new OrcSerde
- val configuration = context.getConfiguration
- serde.initialize(configuration, table)
+ serde.initialize(conf, table)
serde
}
- // Object inspector converted from the schema of the relation to be written.
- private val structOI = {
+ // Object inspector converted from the schema of the relation to be serialized.
+ private[this] val structOI = {
val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
.asInstanceOf[SettableStructObjectInspector]
}
+ private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
+
+ private[this] def wrapOrcStruct(
+ struct: OrcStruct,
+ oi: SettableStructObjectInspector,
+ row: InternalRow): Unit = {
+ val fieldRefs = oi.getAllStructFieldRefs
+ var i = 0
+ while (i < fieldRefs.size) {
+
+ oi.setStructFieldData(
+ struct,
+ fieldRefs.get(i),
+ wrap(
+ row.get(i, dataSchema(i).dataType),
+ fieldRefs.get(i).getFieldObjectInspector,
+ dataSchema(i).dataType))
+ i += 1
+ }
+ }
+}
+
+private[orc] class OrcOutputWriter(
+ path: String,
+ bucketId: Option[Int],
+ dataSchema: StructType,
+ context: TaskAttemptContext)
+ extends OutputWriter {
+
+ private[this] val conf = context.getConfiguration
+
+ private[this] val serializer = new OrcSerializer(dataSchema, conf)
+
// `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this
// flag to decide whether `OrcRecordWriter.close()` needs to be called.
private var recordWriterInstantiated = false
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
-
- val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
@@ -206,33 +237,8 @@ private[orc] class OrcOutputWriter(
override def write(row: Row): Unit =
throw new UnsupportedOperationException("call writeInternal")
- private def wrapOrcStruct(
- struct: OrcStruct,
- oi: SettableStructObjectInspector,
- row: InternalRow): Unit = {
- val fieldRefs = oi.getAllStructFieldRefs
- var i = 0
- while (i < fieldRefs.size) {
-
- oi.setStructFieldData(
- struct,
- fieldRefs.get(i),
- wrap(
- row.get(i, dataSchema(i).dataType),
- fieldRefs.get(i).getFieldObjectInspector,
- dataSchema(i).dataType))
- i += 1
- }
- }
-
- val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct]
-
override protected[sql] def writeInternal(row: InternalRow): Unit = {
- wrapOrcStruct(cachedOrcStruct, structOI, row)
-
- recordWriter.write(
- NullWritable.get(),
- serializer.serialize(cachedOrcStruct, structOI))
+ recordWriter.write(NullWritable.get(), serializer.serialize(row))
}
override def close(): Unit = {