diff options
author | Davies Liu <davies@databricks.com> | 2015-08-06 09:12:41 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-08-06 09:12:41 -0700 |
commit | 93085c992e40dbc06714cb1a64c838e25e683a6f (patch) | |
tree | 0b673dc432d13714dcc69f33c4d8849efc0a1c93 | |
parent | 5b965d64ee1687145ba793da749659c8f67384e8 (diff) | |
download | spark-93085c992e40dbc06714cb1a64c838e25e683a6f.tar.gz spark-93085c992e40dbc06714cb1a64c838e25e683a6f.tar.bz2 spark-93085c992e40dbc06714cb1a64c838e25e683a6f.zip |
[SPARK-9482] [SQL] Fix thread-safey issue of using UnsafeProjection in join
This PR also change to use `def` instead of `lazy val` for UnsafeProjection, because it's not thread safe.
TODO: cleanup the debug code once the flaky test passed 100 times.
Author: Davies Liu <davies@databricks.com>
Closes #7940 from davies/semijoin and squashes the following commits:
93baac7 [Davies Liu] fix outerjoin
5c40ded [Davies Liu] address comments
aa3de46 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
7590a25 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
2d4085b [Davies Liu] use def for resultProjection
0833407 [Davies Liu] Merge branch 'semijoin' of github.com:davies/spark into semijoin
e0d8c71 [Davies Liu] use lazy val
6a59e8f [Davies Liu] Update HashedRelation.scala
0fdacaf [Davies Liu] fix broadcast and thread-safety of UnsafeProjection
2fc3ef6 [Davies Liu] reproduce failure in semijoin
8 files changed, 44 insertions, 44 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index ec1a148342..f7a68e4f5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -20,14 +20,14 @@ package org.apache.spark.sql.execution.joins import scala.concurrent._ import scala.concurrent.duration._ -import org.apache.spark.{InternalAccumulator, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution} -import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan} import org.apache.spark.util.ThreadUtils +import org.apache.spark.{InternalAccumulator, TaskContext} /** * :: DeveloperApi :: @@ -102,6 +102,6 @@ case class BroadcastHashJoin( object BroadcastHashJoin { - private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService( + private[joins] val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala index e342fd914d..a3626de49a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala @@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.joins import scala.concurrent._
import scala.concurrent.duration._
-import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.{InternalAccumulator, TaskContext}
/**
* :: DeveloperApi ::
@@ -76,7 +75,7 @@ case class BroadcastHashOuterJoin( val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
- }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
+ }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
}
protected override def doPrepare(): Unit = {
@@ -98,19 +97,20 @@ case class BroadcastHashOuterJoin( case _ =>
}
+ val resultProj = resultProjection
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey))
+ leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj)
})
case RightOuter =>
streamedIter.flatMap(currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow)
+ rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj)
})
case x =>
@@ -120,9 +120,3 @@ case class BroadcastHashOuterJoin( }
}
}
-
-object BroadcastHashOuterJoin {
-
- private val broadcastHashOuterJoinExecutionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-outer-join", 128))
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 83b726a8e2..23aebf4b06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -47,7 +47,7 @@ case class BroadcastNestedLoopJoin( override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows override def canProcessUnsafeRows: Boolean = true - @transient private[this] lazy val resultProjection: InternalRow => InternalRow = { + private[this] def genResultProjection: InternalRow => InternalRow = { if (outputsUnsafeRows) { UnsafeProjection.create(schema) } else { @@ -88,6 +88,7 @@ case class BroadcastNestedLoopJoin( val leftNulls = new GenericMutableRow(left.output.size) val rightNulls = new GenericMutableRow(right.output.size) + val resultProj = genResultProjection streamedIter.foreach { streamedRow => var i = 0 @@ -97,11 +98,11 @@ case class BroadcastNestedLoopJoin( val broadcastedRow = broadcastedRelation.value(i) buildSide match { case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) => - matchedRows += resultProjection(joinedRow(streamedRow, broadcastedRow)).copy() + matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy() streamRowMatched = true includedBroadcastTuples += i case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) => - matchedRows += resultProjection(joinedRow(broadcastedRow, streamedRow)).copy() + matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy() streamRowMatched = true includedBroadcastTuples += i case _ => @@ -111,9 +112,9 @@ case class BroadcastNestedLoopJoin( (streamRowMatched, joinType, buildSide) match { case (false, LeftOuter | FullOuter, BuildRight) => - matchedRows += resultProjection(joinedRow(streamedRow, rightNulls)).copy() + matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy() case (false, RightOuter | FullOuter, BuildLeft) => - matchedRows += resultProjection(joinedRow(leftNulls, streamedRow)).copy() + matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy() case _ => } } @@ -127,6 +128,8 @@ case class BroadcastNestedLoopJoin( val leftNulls = new GenericMutableRow(left.output.size) val rightNulls = new GenericMutableRow(right.output.size) + val resultProj = genResultProjection + /** Rows from broadcasted joined with nulls. */ val broadcastRowsWithNulls: Seq[InternalRow] = { val buf: CompactBuffer[InternalRow] = new CompactBuffer() @@ -138,7 +141,7 @@ case class BroadcastNestedLoopJoin( joinedRow.withLeft(leftNulls) while (i < rel.length) { if (!allIncludedBroadcastTuples.contains(i)) { - buf += resultProjection(joinedRow.withRight(rel(i))).copy() + buf += resultProj(joinedRow.withRight(rel(i))).copy() } i += 1 } @@ -147,7 +150,7 @@ case class BroadcastNestedLoopJoin( joinedRow.withRight(rightNulls) while (i < rel.length) { if (!allIncludedBroadcastTuples.contains(i)) { - buf += resultProjection(joinedRow.withLeft(rel(i))).copy() + buf += resultProj(joinedRow.withLeft(rel(i))).copy() } i += 1 } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 6b3d165292..5e9cd9fd23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -52,14 +52,14 @@ trait HashJoin { override def canProcessUnsafeRows: Boolean = isUnsafeMode override def canProcessSafeRows: Boolean = !isUnsafeMode - @transient protected lazy val buildSideKeyGenerator: Projection = + protected def buildSideKeyGenerator: Projection = if (isUnsafeMode) { UnsafeProjection.create(buildKeys, buildPlan.output) } else { newMutableProjection(buildKeys, buildPlan.output)() } - @transient protected lazy val streamSideKeyGenerator: Projection = + protected def streamSideKeyGenerator: Projection = if (isUnsafeMode) { UnsafeProjection.create(streamedKeys, streamedPlan.output) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala index a323aea4ea..346337e642 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala @@ -76,14 +76,14 @@ trait HashOuterJoin { override def canProcessUnsafeRows: Boolean = isUnsafeMode override def canProcessSafeRows: Boolean = !isUnsafeMode - @transient protected lazy val buildKeyGenerator: Projection = + protected def buildKeyGenerator: Projection = if (isUnsafeMode) { UnsafeProjection.create(buildKeys, buildPlan.output) } else { newMutableProjection(buildKeys, buildPlan.output)() } - @transient protected[this] lazy val streamedKeyGenerator: Projection = { + protected[this] def streamedKeyGenerator: Projection = { if (isUnsafeMode) { UnsafeProjection.create(streamedKeys, streamedPlan.output) } else { @@ -91,7 +91,7 @@ trait HashOuterJoin { } } - @transient private[this] lazy val resultProjection: InternalRow => InternalRow = { + protected[this] def resultProjection: InternalRow => InternalRow = { if (isUnsafeMode) { UnsafeProjection.create(self.schema) } else { @@ -113,7 +113,8 @@ trait HashOuterJoin { protected[this] def leftOuterIterator( key: InternalRow, joinedRow: JoinedRow, - rightIter: Iterable[InternalRow]): Iterator[InternalRow] = { + rightIter: Iterable[InternalRow], + resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = { val ret: Iterable[InternalRow] = { if (!key.anyNull) { val temp = if (rightIter != null) { @@ -124,12 +125,12 @@ trait HashOuterJoin { List.empty } if (temp.isEmpty) { - resultProjection(joinedRow.withRight(rightNullRow)).copy :: Nil + resultProjection(joinedRow.withRight(rightNullRow)) :: Nil } else { temp } } else { - resultProjection(joinedRow.withRight(rightNullRow)).copy :: Nil + resultProjection(joinedRow.withRight(rightNullRow)) :: Nil } } ret.iterator @@ -138,24 +139,24 @@ trait HashOuterJoin { protected[this] def rightOuterIterator( key: InternalRow, leftIter: Iterable[InternalRow], - joinedRow: JoinedRow): Iterator[InternalRow] = { + joinedRow: JoinedRow, + resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = { val ret: Iterable[InternalRow] = { if (!key.anyNull) { val temp = if (leftIter != null) { leftIter.collect { - case l if boundCondition(joinedRow.withLeft(l)) => - resultProjection(joinedRow).copy() + case l if boundCondition(joinedRow.withLeft(l)) => resultProjection(joinedRow).copy() } } else { List.empty } if (temp.isEmpty) { - resultProjection(joinedRow.withLeft(leftNullRow)).copy :: Nil + resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil } else { temp } } else { - resultProjection(joinedRow.withLeft(leftNullRow)).copy :: Nil + resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil } } ret.iterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala index 97fde8f975..47a7d370f5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala @@ -43,14 +43,14 @@ trait HashSemiJoin { override def canProcessUnsafeRows: Boolean = supportUnsafe override def canProcessSafeRows: Boolean = !supportUnsafe - @transient protected lazy val leftKeyGenerator: Projection = + protected def leftKeyGenerator: Projection = if (supportUnsafe) { UnsafeProjection.create(leftKeys, left.output) } else { newMutableProjection(leftKeys, left.output)() } - @transient protected lazy val rightKeyGenerator: Projection = + protected def rightKeyGenerator: Projection = if (supportUnsafe) { UnsafeProjection.create(rightKeys, right.output) } else { @@ -62,12 +62,11 @@ trait HashSemiJoin { protected def buildKeyHashSet(buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = { val hashSet = new java.util.HashSet[InternalRow]() - var currentRow: InternalRow = null // Create a Hash set of buildKeys val rightKey = rightKeyGenerator while (buildIter.hasNext) { - currentRow = buildIter.next() + val currentRow = buildIter.next() val rowKey = rightKey(currentRow) if (!rowKey.anyNull) { val keyExists = hashSet.contains(rowKey) @@ -76,6 +75,7 @@ trait HashSemiJoin { } } } + hashSet } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 58b4236f7b..3f257ecdd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.execution.joins -import java.io.{IOException, Externalizable, ObjectInput, ObjectOutput} +import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} import java.nio.ByteOrder import java.util.{HashMap => JavaHashMap} import org.apache.spark.shuffle.ShuffleMemoryManager -import org.apache.spark.{SparkConf, SparkEnv, TaskContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkSqlSerializer @@ -31,6 +30,7 @@ import org.apache.spark.unsafe.map.BytesToBytesMap import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager} import org.apache.spark.util.Utils import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.{SparkConf, SparkEnv} /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala index eee8ad800f..6a8c35efca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala @@ -60,19 +60,21 @@ case class ShuffledHashOuterJoin( case LeftOuter =>
val hashed = HashedRelation(rightIter, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
+ val resultProj = resultProjection
leftIter.flatMap( currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey))
+ leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj)
})
case RightOuter =>
val hashed = HashedRelation(leftIter, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
+ val resultProj = resultProjection
rightIter.flatMap ( currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow)
+ rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj)
})
case FullOuter =>
|