From f61c989b404808f79a58b6503cf3835cf602528a Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 8 Jul 2015 10:56:31 -0700
Subject: [SPARK-8888][SQL] Use java.util.HashMap in
 DynamicPartitionWriterContainer.

Just a baby step towards making it more efficient.

Author: Reynold Xin

Closes #7282 from rxin/SPARK-8888 and squashes the following commits:

3da51ae [Reynold Xin] [SPARK-8888][SQL] Use java.util.HashMap in DynamicPartitionWriterContainer.
---
 .../org/apache/spark/sql/sources/commands.scala    | 36 ++++++++++++++--------
 1 file changed, 23 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index a97142d4a1..ecbc889770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
-import scala.collection.mutable
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -110,7 +108,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
       !exists
     }
     // If we are appending data to an existing dir.
-    val isAppend = (pathExists) && (mode == SaveMode.Append)
+    val isAppend = pathExists && (mode == SaveMode.Append)
 
     if (doInsertion) {
       val job = new Job(hadoopConf)
@@ -142,9 +140,12 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
-    Seq.empty[InternalRow]
+    Seq.empty[Row]
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
+   */
   private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
     // Uses local vals for serialization
     val needsConversion = relation.needConversion
@@ -188,6 +189,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
     }
   }
 
+  /**
+   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
+   */
   private def insertWithDynamicPartitions(
       sqlContext: SQLContext,
       writerContainer: BaseWriterContainer,
@@ -497,13 +501,14 @@ private[sql] class DynamicPartitionWriterContainer(
   extends BaseWriterContainer(relation, job, isAppend) {
 
   // All output writers are created on executor side.
-  @transient protected var outputWriters: mutable.Map[String, OutputWriter] = _
+  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
 
   override protected def initWriters(): Unit = {
-    outputWriters = mutable.Map.empty[String, OutputWriter]
+    outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
   override def outputWriterForRow(row: Row): OutputWriter = {
+    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
     val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
       val string = if (rawValue == null) null else String.valueOf(rawValue)
       val valueString = if (string == null || string.isEmpty) {
@@ -514,18 +519,23 @@ private[sql] class DynamicPartitionWriterContainer(
       s"/$col=$valueString"
     }.mkString.stripPrefix(Path.SEPARATOR)
 
-    outputWriters.getOrElseUpdate(partitionPath, {
+    val writer = outputWriters.get(partitionPath)
+    if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path",
+      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
         new Path(outputPath, partitionPath).toString)
-      outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-    })
+      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      outputWriters.put(partitionPath, newWriter)
+      newWriter
+    } else {
+      writer
+    }
   }
 
   private def clearOutputWriters(): Unit = {
-    if (outputWriters.nonEmpty) {
-      outputWriters.values.foreach(_.close())
+    if (!outputWriters.isEmpty) {
+      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
+      iter.foreach(_.close())
      outputWriters.clear()
     }
   }
-- 
cgit v1.2.3
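
Note for readers skimming the diff: the sketch below distills the pattern the patch
adopts. It is a standalone approximation, not Spark code; Writer, writerFor, and
closeAll are hypothetical stand-ins for OutputWriter, outputWriterForRow, and
clearOutputWriters. The point of the change is that java.util.HashMap#get returns
null on a miss, so a single lookup plus a reference check replaces
mutable.Map#getOrElseUpdate, which routes every lookup through an Option and a
by-name default thunk on this per-row hot path.

  import java.util.HashMap

  object GetOrCreateSketch {
    // Hypothetical stand-in for Spark's OutputWriter.
    final class Writer(val path: String) {
      def close(): Unit = println(s"closed $path")
    }

    private val writers = new HashMap[String, Writer]

    // Get-or-create without Option allocation: get returns null on a miss.
    def writerFor(partitionPath: String): Writer = {
      val existing = writers.get(partitionPath)
      if (existing eq null) {
        val created = new Writer(partitionPath)  // pay creation cost only on a miss
        writers.put(partitionPath, created)
        created
      } else {
        existing
      }
    }

    // Close and drop all cached writers, iterating the Java map directly.
    def closeAll(): Unit = {
      if (!writers.isEmpty) {
        val iter = writers.values().iterator()
        while (iter.hasNext) iter.next().close()
        writers.clear()
      }
    }

    def main(args: Array[String]): Unit = {
      writerFor("year=2015/month=07")  // miss: creates and caches a writer
      writerFor("year=2015/month=07")  // hit: returns the cached writer
      closeAll()
    }
  }

Two small differences from the patch, by choice: the sketch drains the map with a
plain while loop where the patch wraps the Java iterator via
scala.collection.JavaConversions to reuse foreach, and both rely on the same
null contract, which is also why the patch tests with writer.eq(null), a bare
reference comparison with no equals dispatch.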