author     Cheng Hao <hao.cheng@intel.com>             2014-08-11 20:45:14 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-08-11 20:45:14 -0700
commit     5d54d71ddbac1fbb26925a8c9138bbb8c0e81db8 (patch)
tree       01916ba2c12f4bd1a967adc46bb095eeac13d8a3
parent     bad21ed085a505559dccc06223b486170371ddd2 (diff)
[SQL] [SPARK-2826] Reduce the memory copy while building the hashmap for HashOuterJoin
This is a follow-up for #1147. This PR improves performance by about 10%-15% in my local tests.

```
Before:
LeftOuterJoin: took 16750 ms ([3000000] records)
LeftOuterJoin: took 15179 ms ([3000000] records)
RightOuterJoin: took 15515 ms ([3000000] records)
RightOuterJoin: took 15276 ms ([3000000] records)
FullOuterJoin: took 19150 ms ([6000000] records)
FullOuterJoin: took 18935 ms ([6000000] records)

After:
LeftOuterJoin: took 15218 ms ([3000000] records)
LeftOuterJoin: took 13503 ms ([3000000] records)
RightOuterJoin: took 13663 ms ([3000000] records)
RightOuterJoin: took 14025 ms ([3000000] records)
FullOuterJoin: took 16624 ms ([6000000] records)
FullOuterJoin: took 16578 ms ([6000000] records)
```

Besides the performance improvement, this PR also does some cleanup as suggested in #1147.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1765 from chenghao-intel/hash_outer_join_fixing and squashes the following commits:

ab1f9e0 [Cheng Hao] Reduce the memory copy while building the hashmap
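The copy that this patch removes is easiest to see side by side. In the sketch below (a simplified stand-in that uses String keys and values instead of Spark's Row and Projection, so it is illustrative only and not the operator's actual code), the old pattern pays for a full `.toMap` pass after building the table, while the new pattern returns the `java.util.HashMap` it filled:

```
import java.util.{HashMap => JavaHashMap}

import scala.collection.mutable.ArrayBuffer

object BuildHashTableSketch {
  // Old pattern: build a Scala mutable Map, then call .toMap, which walks
  // the whole table and copies every entry into a new immutable map.
  def buildWithCopy(rows: Iterator[(String, String)]): Map[String, ArrayBuffer[String]] = {
    val hashTable = scala.collection.mutable.Map[String, ArrayBuffer[String]]()
    while (rows.hasNext) {
      val (key, value) = rows.next()
      val matchList = hashTable.getOrElseUpdate(key, new ArrayBuffer[String]())
      matchList += value
    }
    hashTable.toMap // the extra O(n) copy the patch removes
  }

  // New pattern: fill a java.util.HashMap once and return it directly,
  // so no final copy is needed.
  def buildWithoutCopy(rows: Iterator[(String, String)]): JavaHashMap[String, ArrayBuffer[String]] = {
    val hashTable = new JavaHashMap[String, ArrayBuffer[String]]()
    while (rows.hasNext) {
      val (key, value) = rows.next()
      var matchList = hashTable.get(key)
      if (matchList == null) {
        matchList = new ArrayBuffer[String]()
        hashTable.put(key, matchList)
      }
      matchList += value
    }
    hashTable
  }
}
```

In the real patch, the `import scala.collection.JavaConversions._` added in `execute()` lets the returned Java map still be queried with Scala's `getOrElse` and `keysIterator`, as the diff below shows.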
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala | 54
1 file changed, 28 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index ea075f8c65..c86811e838 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution
+import java.util.{HashMap => JavaHashMap}
+
import scala.collection.mutable.{ArrayBuffer, BitSet}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
@@ -137,14 +139,6 @@ trait HashJoin {
}
/**
- * Constant Value for Binary Join Node
- */
-object HashOuterJoin {
- val DUMMY_LIST = Seq[Row](null)
- val EMPTY_LIST = Seq[Row]()
-}
-
-/**
* :: DeveloperApi ::
* Performs a hash based outer join for two child relations by shuffling the data using
* the join keys. This operator requires loading the associated partition in both side into memory.
@@ -181,6 +175,9 @@ case class HashOuterJoin(
}
}
+ @transient private[this] lazy val DUMMY_LIST = Seq[Row](null)
+ @transient private[this] lazy val EMPTY_LIST = Seq.empty[Row]
+
// TODO we need to rewrite all of the iterators with our own implementation instead of the Scala
// iterator for performance purpose.
@@ -199,8 +196,8 @@ case class HashOuterJoin(
joinedRow.copy
} else {
Nil
- }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
- // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+ }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+ // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// as we don't know whether we need to append it until finish iterating all of the
// records in right side.
// If we didn't get any proper row, then append a single row with empty right
@@ -224,8 +221,8 @@ case class HashOuterJoin(
joinedRow.copy
} else {
Nil
- }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
- // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+ }) ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
+ // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// as we don't know whether we need to append it until finish iterating all of the
// records in left side.
// If we didn't get any proper row, then append a single row with empty left.
@@ -259,10 +256,10 @@ case class HashOuterJoin(
rightMatchedSet.add(idx)
joinedRow.copy
}
- } ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+ } ++ DUMMY_LIST.filter(_ => !matched).map( _ => {
// 2. For those unmatched records in left, append additional records with empty right.
- // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
+ // DUMMY_LIST.filter(_ => !matched) is a tricky way to add additional row,
// as we don't know whether we need to append it until finish iterating all
// of the records in right side.
// If we didn't get any proper row, then append a single row with empty right.
@@ -287,18 +284,22 @@ case class HashOuterJoin(
}
private[this] def buildHashTable(
- iter: Iterator[Row], keyGenerator: Projection): Map[Row, ArrayBuffer[Row]] = {
- // TODO: Use Spark's HashMap implementation.
- val hashTable = scala.collection.mutable.Map[Row, ArrayBuffer[Row]]()
+ iter: Iterator[Row], keyGenerator: Projection): JavaHashMap[Row, ArrayBuffer[Row]] = {
+ val hashTable = new JavaHashMap[Row, ArrayBuffer[Row]]()
while (iter.hasNext) {
val currentRow = iter.next()
val rowKey = keyGenerator(currentRow)
- val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new ArrayBuffer[Row]()})
+ var existingMatchList = hashTable.get(rowKey)
+ if (existingMatchList == null) {
+ existingMatchList = new ArrayBuffer[Row]()
+ hashTable.put(rowKey, existingMatchList)
+ }
+
existingMatchList += currentRow.copy()
}
-
- hashTable.toMap[Row, ArrayBuffer[Row]]
+
+ hashTable
}
def execute() = {
@@ -309,21 +310,22 @@ case class HashOuterJoin(
// Build HashMap for current partition in right relation
val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+ import scala.collection.JavaConversions._
val boundCondition =
condition.map(newPredicate(_, left.output ++ right.output)).getOrElse((row: Row) => true)
joinType match {
case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
- leftOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
- rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+ leftOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
+ rightHashTable.getOrElse(key, EMPTY_LIST))
}
case RightOuter => rightHashTable.keysIterator.flatMap { key =>
- rightOuterIterator(key, leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
- rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+ rightOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST),
+ rightHashTable.getOrElse(key, EMPTY_LIST))
}
case FullOuter => (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
fullOuterIterator(key,
- leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
- rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+ leftHashTable.getOrElse(key, EMPTY_LIST),
+ rightHashTable.getOrElse(key, EMPTY_LIST))
}
case x => throw new Exception(s"HashOuterJoin should not take $x as the JoinType")
}
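As an aside on the `DUMMY_LIST.filter(_ => !matched)` idiom that the comments in this patch call "tricky": `DUMMY_LIST` holds exactly one (null) element, so filtering it on `!matched` yields one element when no match was found and none otherwise, which lets a single `++` chain append the null-padded row only when it is needed. A toy sketch of the idiom (plain Strings instead of Spark rows, hypothetical names, not the actual operator code):

```
object DummyListSketch {
  val DUMMY_LIST = Seq[String](null)  // a single placeholder element
  val EMPTY_LIST = Seq.empty[String]

  // Toy left outer join for a single key: pair the left row with every
  // right row; if there were none, emit one null-padded row instead.
  def leftOuterRows(leftRow: String, rightRows: Seq[String]): Seq[(String, String)] = {
    var matched = false
    val joined = rightRows.map { r =>
      matched = true
      (leftRow, r)
    }
    // filter(_ => !matched) keeps DUMMY_LIST's single element only when no
    // right row matched, so exactly one padded row is appended in that case.
    joined ++ DUMMY_LIST.filter(_ => !matched).map(_ => (leftRow, null: String))
  }

  def main(args: Array[String]): Unit = {
    println(leftOuterRows("a", Seq("x", "y")))  // List((a,x), (a,y))
    println(leftOuterRows("b", EMPTY_LIST))     // List((b,null))
  }
}
```

The cleanup part of the patch also moves `DUMMY_LIST` and `EMPTY_LIST` from the `HashOuterJoin` companion object into the operator itself as `@transient private[this] lazy val`s, presumably so they are re-created lazily on each executor rather than shipped with the serialized plan.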