author | Reynold Xin <rxin@databricks.com> | 2015-07-08 10:56:31 -0700
---|---|---
committer | Reynold Xin <rxin@databricks.com> | 2015-07-08 10:56:31 -0700
commit | f61c989b404808f79a58b6503cf3835cf602528a |
tree | 3663f739af3905493320db42c88ce9475aa8eed1 /sql |
parent | 0ba98c04c726a827df8cb19b0db17c352a647960 |
[SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
Just a baby step towards making it more efficient.
Author: Reynold Xin <rxin@databricks.com>
Closes #7282 from rxin/SPARK-8888 and squashes the following commits:
3da51ae [Reynold Xin] [SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
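The change itself is small: the per-row writer lookup switches from Scala's `mutable.Map.getOrElseUpdate` to an explicit `get`/null-check/`put` on `java.util.HashMap`, trading the per-call `Option` and by-name-closure allocation of the Scala API for a plain null probe on the hot path. A minimal standalone sketch of that pattern (the `Writer` class and names below are illustrative, not from the patch):

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for Spark's OutputWriter; only here to make the sketch run.
final class Writer(val partitionPath: String) {
  def close(): Unit = ()
}

object WriterCacheSketch {
  private val outputWriters = new JHashMap[String, Writer]

  // java.util.HashMap.get returns null on a miss, so the hot per-row path
  // avoids the Option boxing and thunk allocation that
  // mutable.Map.getOrElseUpdate would incur on every call.
  def writerForPartition(partitionPath: String): Writer = {
    val writer = outputWriters.get(partitionPath)
    if (writer ne null) {
      writer
    } else {
      val newWriter = new Writer(partitionPath) // expensive setup runs at most once per key
      outputWriters.put(partitionPath, newWriter)
      newWriter
    }
  }
}
```

On a hit, both versions do a single hash probe; the win is that the hit case allocates nothing, which matters when the lookup runs once per row.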
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 36 |
1 file changed, 23 insertions, 13 deletions
```diff
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a97142d4a1..ecbc889770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
-import scala.collection.mutable
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -110,7 +108,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         !exists
     }
     // If we are appending data to an existing dir.
-    val isAppend = (pathExists) && (mode == SaveMode.Append)
+    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -142,9 +140,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
-    Seq.empty[InternalRow]
+    Seq.empty[Row]
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   */
   private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
     // Uses local vals for serialization
     val needsConversion = relation.needConversion
@@ -188,6 +189,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
     }
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   */
   private def insertWithDynamicPartitions(
       sqlContext: SQLContext,
       writerContainer: BaseWriterContainer,
@@ -497,13 +501,14 @@ private[sql] class DynamicPartitionWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
-  @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
   override protected def initWriters(): Unit = {
-    outputWriters = mutable.Map.empty[String, OutputWriter]
+    outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = {
+    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
     val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
       val string = if (rawValue == null) null else String.valueOf(rawValue)
       val valueString = if (string == null || string.isEmpty) {
@@ -514,18 +519,23 @@ private[sql] class DynamicPartitionWriterContainer(
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
 
-    outputWriters.getOrElseUpdate(partitionPath, {
+    val writer = outputWriters.get(partitionPath)
+    if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path",
+      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-    })
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
   }
 
   private def clearOutputWriters(): Unit = {
-    if (outputWriters.nonEmpty) {
-      outputWriters.values.foreach(_.close())
+    if (!outputWriters.isEmpty) {
+      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
+      iter.foreach(_.close())
       outputWriters.clear()
     }
   }
```
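One consequence of the switch shows up in `clearOutputWriters`: `java.util.HashMap` exposes no Scala `foreach`, so the patch adapts the values iterator with `scala.collection.JavaConversions` (the idiom current in the codebase at the time; `scala.collection.JavaConverters` with `.asScala` is the later, explicit replacement). A self-contained sketch of that cleanup idiom, with a hypothetical `Resource` type in place of `OutputWriter`:

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions.asScalaIterator

object CleanupSketch {
  // Hypothetical closeable resource standing in for OutputWriter.
  final class Resource {
    def close(): Unit = ()
  }

  private val cache = new JHashMap[String, Resource]

  def clearAll(): Unit = {
    if (!cache.isEmpty) {
      // Adapt java.util.Iterator so Scala's foreach can close every cached value.
      asScalaIterator(cache.values().iterator()).foreach(_.close())
      cache.clear()
    }
  }
}
```

This keeps cleanup a cold-path concern: the adapter allocation happens once per task teardown, not per row.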