author    Cheng Lian <lian@databricks.com>    2015-07-10 18:15:36 -0700
committer Cheng Lian <lian@databricks.com>    2015-07-10 18:15:36 -0700
commit    33630883685eafcc3ee4521ea8363be342f6e6b4 (patch)
tree      0e0ca8d10b5e5027e5917e1b5f61ac6f20cd653f /sql
parent    b6fc0adf6874fc26ab27cdaa8ebb28474c0681f0 (diff)
[SPARK-8961] [SQL] Makes BaseWriterContainer.outputWriterForRow accept InternalRow instead of Row
This is a follow-up of [SPARK-8888] [1], which also aims to optimize writing dynamic partitions. Three more changes are made here:

1. Use `InternalRow` instead of `Row` in `BaseWriterContainer.outputWriterForRow`.
2. Use `Cast` expressions to convert partition columns to strings, so that we can leverage code generation.
3. Replace the FP-style `zip` and `map` calls with a faster imperative `while` loop.

[1]: https://issues.apache.org/jira/browse/SPARK-8888

Author: Cheng Lian <lian@databricks.com>

Closes #7331 from liancheng/spark-8961 and squashes the following commits:

b5ab9ae [Cheng Lian] Casts Java iterator to Scala iterator explicitly
719e63b [Cheng Lian] Makes BaseWriterContainer.outputWriterForRow accept InternalRow instead of Row
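As a rough illustration of change 3 above, the following self-contained Scala sketch contrasts the `zip`/`map` construction this patch removes with the imperative `while` loop it adds. The `escapePathName` and `defaultPartitionName` members are simplified stubs standing in for the real helpers used by `commands.scala`, and partition values are passed in as plain strings rather than read from a row, so this is an approximation of the technique, not the patched code itself.

object PartitionPathSketch {
  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"
  def escapePathName(s: String): String = s  // the real helper escapes special characters

  // Before (removed by this patch): FP-style zip/map builds intermediate
  // collections and one small string per partition column for every row.
  def pathWithZip(cols: Seq[String], values: Seq[String]): String =
    cols.zip(values).map { case (col, value) =>
      val str = if (value == null || value.isEmpty) defaultPartitionName else escapePathName(value)
      s"/$col=$str"
    }.mkString.stripPrefix("/")

  // After (added by this patch): a single StringBuilder and an imperative
  // while loop, avoiding the per-row intermediate allocations.
  def pathWithWhile(cols: Array[String], values: Array[String]): String = {
    val builder = new StringBuilder
    var i = 0
    while (i < cols.length) {
      val value = values(i)
      val str = if (value == null || value.isEmpty) defaultPartitionName else escapePathName(value)
      if (i > 0) builder.append('/')
      builder.append(cols(i)).append('=').append(str)
      i += 1
    }
    builder.toString()
  }

  def main(args: Array[String]): Unit = {
    val cols = Array("year", "month")
    val values = Array("2015", "07")
    assert(pathWithZip(cols, values) == pathWithWhile(cols, values))
    println(pathWithWhile(cols, values))  // prints: year=2015/month=07
  }
}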
Diffstat (limited to 'sql')
-rw-r--r--    sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala    73
1 file changed, 42 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9189d17611..5c6ef2dc90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.sources
import java.util.{Date, UUID}
+import scala.collection.JavaConversions.asScalaIterator
+
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -26,15 +28,14 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceF
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration
private[sql] case class InsertIntoDataSource(
@@ -170,14 +171,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
try {
writerContainer.executorSideSetup(taskContext)
- val converter = if (needsConversion) {
+ val converter: InternalRow => Row = if (needsConversion) {
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
} else {
r: InternalRow => r.asInstanceOf[Row]
}
while (iterator.hasNext) {
- val row = converter(iterator.next())
- writerContainer.outputWriterForRow(row).write(row)
+ val internalRow = iterator.next()
+ writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
}
writerContainer.commitTask()
@@ -239,7 +240,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
try {
writerContainer.executorSideSetup(taskContext)
- val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
+ // Projects all partition columns and casts them to strings to build partition directories.
+ val partitionCasts = partitionOutput.map(Cast(_, StringType))
+ val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
val dataProj = newProjection(codegenEnabled, dataOutput, output)
val dataConverter: InternalRow => Row = if (needsConversion) {
@@ -247,15 +250,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
} else {
r: InternalRow => r.asInstanceOf[Row]
}
- val partitionSchema = StructType.fromAttributes(partitionOutput)
- val partConverter: InternalRow => Row =
- CatalystTypeConverters.createToScalaConverter(partitionSchema)
- .asInstanceOf[InternalRow => Row]
while (iterator.hasNext) {
- val row = iterator.next()
- val partitionPart = partConverter(partitionProj(row))
- val dataPart = dataConverter(dataProj(row))
+ val internalRow = iterator.next()
+ val partitionPart = partitionProj(internalRow)
+ val dataPart = dataConverter(dataProj(internalRow))
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
}
@@ -435,7 +434,7 @@ private[sql] abstract class BaseWriterContainer(
}
// Called on executor side when writing rows
- def outputWriterForRow(row: Row): OutputWriter
+ def outputWriterForRow(row: InternalRow): OutputWriter
protected def initWriters(): Unit
@@ -477,7 +476,7 @@ private[sql] class DefaultWriterContainer(
writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}
- override def outputWriterForRow(row: Row): OutputWriter = writer
+ override def outputWriterForRow(row: InternalRow): OutputWriter = writer
override def commitTask(): Unit = {
try {
@@ -518,23 +517,36 @@ private[sql] class DynamicPartitionWriterContainer(
outputWriters = new java.util.HashMap[String, OutputWriter]
}
- override def outputWriterForRow(row: Row): OutputWriter = {
- // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
- val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
- val string = if (rawValue == null) null else String.valueOf(rawValue)
- val valueString = if (string == null || string.isEmpty) {
- defaultPartitionName
- } else {
- PartitioningUtils.escapePathName(string)
+ // The `row` argument is supposed to contain only partition column values that have been cast
+ // to strings.
+ override def outputWriterForRow(row: InternalRow): OutputWriter = {
+ val partitionPath = {
+ val partitionPathBuilder = new StringBuilder
+ var i = 0
+
+ while (i < partitionColumns.length) {
+ val col = partitionColumns(i)
+ val partitionValueString = {
+ val string = row.getString(i)
+ if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
+ }
+
+ if (i > 0) {
+ partitionPathBuilder.append(Path.SEPARATOR_CHAR)
+ }
+
+ partitionPathBuilder.append(s"$col=$partitionValueString")
+ i += 1
}
- s"/$col=$valueString"
- }.mkString.stripPrefix(Path.SEPARATOR)
+
+ partitionPathBuilder.toString()
+ }
val writer = outputWriters.get(partitionPath)
if (writer.eq(null)) {
val path = new Path(getWorkPath, partitionPath)
- taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
- new Path(outputPath, partitionPath).toString)
+ taskAttemptContext.getConfiguration.set(
+ "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
outputWriters.put(partitionPath, newWriter)
newWriter
@@ -545,8 +557,7 @@ private[sql] class DynamicPartitionWriterContainer(
private def clearOutputWriters(): Unit = {
if (!outputWriters.isEmpty) {
- val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
- iter.foreach(_.close())
+ asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
outputWriters.clear()
}
}
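To make change 2 concrete, here is a conceptual Scala sketch of why casting partition columns to strings inside the projection (`partitionOutput.map(Cast(_, StringType))` in the hunk above) lets `outputWriterForRow` rely on `row.getString(i)` alone. `StringRow` and `stringPartitionProjection` are stand-ins invented for this sketch rather than Spark's Catalyst API; in the actual patch the projection is built by `newProjection` and may be code-generated.

object CastProjectionSketch {
  // Stand-in for an InternalRow that already holds pre-cast string partition values.
  final class StringRow(values: Array[String]) {
    def getString(i: Int): String = values(i)
  }

  // Built once per task: picks the partition columns out of the full row and casts
  // each value to its string form. Catalyst does this with Cast(_, StringType) plus
  // projection code; this is plain Scala for illustration only.
  def stringPartitionProjection(partitionOrdinals: Array[Int]): Array[Any] => StringRow =
    fullRow => {
      val out = new Array[String](partitionOrdinals.length)
      var i = 0
      while (i < partitionOrdinals.length) {
        val value = fullRow(partitionOrdinals(i))
        out(i) = if (value == null) null else value.toString
        i += 1
      }
      new StringRow(out)
    }

  def main(args: Array[String]): Unit = {
    // Full row layout: (id, year, month, payload); partition columns are year and month.
    val project = stringPartitionProjection(Array(1, 2))
    val partitionRow = project(Array(42L, 2015, 7, "some payload"))
    // outputWriterForRow only needs getString(i) on the projected row.
    println(partitionRow.getString(0) + "/" + partitionRow.getString(1))  // prints: 2015/7
  }
}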