aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-07-08 10:56:31 -0700
committerReynold Xin <rxin@databricks.com>2015-07-08 10:56:31 -0700
commitf61c989b404808f79a58b6503cf3835cf602528a (patch)
tree3663f739af3905493320db42c88ce9475aa8eed1
parent0ba98c04c726a827df8cb19b0db17c352a647960 (diff)
downloadspark-f61c989b404808f79a58b6503cf3835cf602528a.tar.gz
spark-f61c989b404808f79a58b6503cf3835cf602528a.tar.bz2
spark-f61c989b404808f79a58b6503cf3835cf602528a.zip
[SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
Just a baby step towards making it more efficient. Author: Reynold Xin <rxin@databricks.com> Closes #7282 from rxin/SPARK-8888 and squashes the following commits: 3da51ae [Reynold Xin] [SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala36
1 files changed, 23 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a97142d4a1..ecbc889770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
import java.util.{Date, UUID}
-import scala.collection.mutable
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -110,7 +108,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
!exists
}
// If we are appending data to an existing dir.
- val isAppend = (pathExists) && (mode == SaveMode.Append)
+ val isAppend = pathExists && (mode == SaveMode.Append)
if (doInsertion) {
val job = new Job(hadoopConf)
@@ -142,9 +140,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
}
}
- Seq.empty[InternalRow]
+ Seq.empty[Row]
}
+ /**
+ * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+ */
private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
// Uses local vals for serialization
val needsConversion = relation.needConversion
@@ -188,6 +189,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
}
}
+ /**
+ * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+ */
private def insertWithDynamicPartitions(
sqlContext: SQLContext,
writerContainer: BaseWriterContainer,
@@ -497,13 +501,14 @@ private[sql] class DynamicPartitionWriterContainer(
extends BaseWriterContainer(relation, job, isAppend) {
// All output writers are created on executor side.
- @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
+ @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
override protected def initWriters(): Unit = {
- outputWriters = mutable.Map.empty[String, OutputWriter]
+ outputWriters = new java.util.HashMap[String, OutputWriter]
}
override def outputWriterForRow(row: Row): OutputWriter = {
+ // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
val string = if (rawValue == null) null else String.valueOf(rawValue)
val valueString = if (string == null || string.isEmpty) {
@@ -514,18 +519,23 @@ private[sql] class DynamicPartitionWriterContainer(
s"/$col=$valueString"
}.mkString.stripPrefix(Path.SEPARATOR)
- outputWriters.getOrElseUpdate(partitionPath, {
+ val writer = outputWriters.get(partitionPath)
+ if (writer.eq(null)) {
val path = new Path(getWorkPath, partitionPath)
- taskAttemptContext.getConfiguration.set(
- "spark.sql.sources.output.path",
+ taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
- outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
- })
+ val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+ outputWriters.put(partitionPath, newWriter)
+ newWriter
+ } else {
+ writer
+ }
}
private def clearOutputWriters(): Unit = {
- if (outputWriters.nonEmpty) {
- outputWriters.values.foreach(_.close())
+ if (!outputWriters.isEmpty) {
+ val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
+ iter.foreach(_.close())
outputWriters.clear()
}
}