author     Davies Liu <davies@databricks.com>  2015-08-06 09:12:41 -0700
committer  Davies Liu <davies.liu@gmail.com>   2015-08-06 09:12:41 -0700
commit     93085c992e40dbc06714cb1a64c838e25e683a6f (patch)
tree       0b673dc432d13714dcc69f33c4d8849efc0a1c93
parent     5b965d64ee1687145ba793da749659c8f67384e8 (diff)
[SPARK-9482] [SQL] Fix thread-safety issue of using UnsafeProjection in join

This PR also changes `lazy val` to `def` for UnsafeProjection, because UnsafeProjection is not thread-safe.

TODO: clean up the debug code once the flaky test has passed 100 times.

Author: Davies Liu <davies@databricks.com>

Closes #7940 from davies/semijoin and squashes the following commits:

93baac7 [Davies Liu] fix outerjoin
5c40ded [Davies Liu] address comments
aa3de46 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
7590a25 [Davies Liu] Merge branch 'master' of github.com:apache/spark into semijoin
2d4085b [Davies Liu] use def for resultProjection
0833407 [Davies Liu] Merge branch 'semijoin' of github.com:davies/spark into semijoin
e0d8c71 [Davies Liu] use lazy val
6a59e8f [Davies Liu] Update HashedRelation.scala
0fdacaf [Davies Liu] fix broadcast and thread-safety of UnsafeProjection
2fc3ef6 [Davies Liu] reproduce failure in semijoin
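The core of the fix: an UnsafeProjection reuses an internal row buffer across calls, so a single instance held in a `lazy val` and shared by concurrent tasks in one JVM can clobber its own output. A `def` hands each caller a fresh instance instead. A minimal standalone sketch of the hazard, using a hypothetical stand-in class rather than Spark's generated projections (Scala 2.12+ for the Runnable lambda):

    // Hypothetical stand-in for a generated projection: like UnsafeProjection,
    // it writes every result into one mutable buffer that it reuses.
    class ReusedBufferProjection {
      private val buffer = new Array[Int](1)
      def apply(value: Int): Array[Int] = { buffer(0) = value; buffer }
    }

    object LazyValVsDef {
      // Unsafe: one instance shared by every thread that reads this field.
      lazy val sharedProjection = new ReusedBufferProjection

      // Safe: each caller gets its own instance with its own buffer.
      def freshProjection: ReusedBufferProjection = new ReusedBufferProjection

      def main(args: Array[String]): Unit = {
        val threads = (1 to 4).map { i =>
          new Thread(() => {
            val proj = freshProjection // per-thread instance, as in this patch
            val out = proj(i)
            assert(out(0) == i) // flaky if `sharedProjection` were used instead
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
      }
    }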
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala         6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala   20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala                  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala            23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala              8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala            4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala     6
8 files changed, 44 insertions, 44 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index ec1a148342..f7a68e4f5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -20,14 +20,14 @@ package org.apache.spark.sql.execution.joins
import scala.concurrent._
import scala.concurrent.duration._
-import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
import org.apache.spark.util.ThreadUtils
+import org.apache.spark.{InternalAccumulator, TaskContext}
/**
* :: DeveloperApi ::
@@ -102,6 +102,6 @@ case class BroadcastHashJoin(
object BroadcastHashJoin {
- private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
+ private[joins] val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
}
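Widening the execution context from `private` to `private[joins]` lets BroadcastHashOuterJoin (next file) drop its own dedicated pool and share this one. A minimal standalone sketch of the pattern, approximating Spark's ThreadUtils.newDaemonCachedThreadPool with a plain fixed pool; names are illustrative, not Spark's:

    import java.util.concurrent.Executors
    import scala.concurrent.ExecutionContext

    object SharedBroadcastPool {
      // One capped pool of daemon threads (daemon so it never blocks JVM
      // shutdown), shared by all operators that build broadcast relations.
      private val pool = Executors.newFixedThreadPool(8, (r: Runnable) => {
        val t = new Thread(r, "broadcast-hash-join")
        t.setDaemon(true)
        t
      })
      val context: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    }

    // Usage: any join operator submits its build-side work to the shared pool,
    // e.g. Future { buildHashedRelation() }(SharedBroadcastPool.context)
    // where buildHashedRelation is a hypothetical build-side step.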
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index e342fd914d..a3626de49a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.joins
import scala.concurrent._
import scala.concurrent.duration._
-import org.apache.spark.{InternalAccumulator, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, Partitioning, UnspecifiedDistribution}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
-import org.apache.spark.sql.execution.{BinaryNode, SparkPlan, SQLExecution}
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.sql.execution.{BinaryNode, SQLExecution, SparkPlan}
+import org.apache.spark.{InternalAccumulator, TaskContext}
/**
* :: DeveloperApi ::
@@ -76,7 +75,7 @@ case class BroadcastHashOuterJoin(
val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
sparkContext.broadcast(hashed)
}
- }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
+ }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
}
protected override def doPrepare(): Unit = {
@@ -98,19 +97,20 @@ case class BroadcastHashOuterJoin(
case _ =>
}
+ val resultProj = resultProjection
joinType match {
case LeftOuter =>
streamedIter.flatMap(currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey))
+ leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj)
})
case RightOuter =>
streamedIter.flatMap(currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow)
+ rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj)
})
case x =>
@@ -120,9 +120,3 @@ case class BroadcastHashOuterJoin(
}
}
}
-
-object BroadcastHashOuterJoin {
-
- private val broadcastHashOuterJoinExecutionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-outer-join", 128))
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 83b726a8e2..23aebf4b06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -47,7 +47,7 @@ case class BroadcastNestedLoopJoin(
override def outputsUnsafeRows: Boolean = left.outputsUnsafeRows || right.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true
- @transient private[this] lazy val resultProjection: InternalRow => InternalRow = {
+ private[this] def genResultProjection: InternalRow => InternalRow = {
if (outputsUnsafeRows) {
UnsafeProjection.create(schema)
} else {
@@ -88,6 +88,7 @@ case class BroadcastNestedLoopJoin(
val leftNulls = new GenericMutableRow(left.output.size)
val rightNulls = new GenericMutableRow(right.output.size)
+ val resultProj = genResultProjection
streamedIter.foreach { streamedRow =>
var i = 0
@@ -97,11 +98,11 @@ case class BroadcastNestedLoopJoin(
val broadcastedRow = broadcastedRelation.value(i)
buildSide match {
case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) =>
- matchedRows += resultProjection(joinedRow(streamedRow, broadcastedRow)).copy()
+ matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy()
streamRowMatched = true
includedBroadcastTuples += i
case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) =>
- matchedRows += resultProjection(joinedRow(broadcastedRow, streamedRow)).copy()
+ matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy()
streamRowMatched = true
includedBroadcastTuples += i
case _ =>
@@ -111,9 +112,9 @@ case class BroadcastNestedLoopJoin(
(streamRowMatched, joinType, buildSide) match {
case (false, LeftOuter | FullOuter, BuildRight) =>
- matchedRows += resultProjection(joinedRow(streamedRow, rightNulls)).copy()
+ matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
case (false, RightOuter | FullOuter, BuildLeft) =>
- matchedRows += resultProjection(joinedRow(leftNulls, streamedRow)).copy()
+ matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
case _ =>
}
}
@@ -127,6 +128,8 @@ case class BroadcastNestedLoopJoin(
val leftNulls = new GenericMutableRow(left.output.size)
val rightNulls = new GenericMutableRow(right.output.size)
+ val resultProj = genResultProjection
+
/** Rows from broadcasted joined with nulls. */
val broadcastRowsWithNulls: Seq[InternalRow] = {
val buf: CompactBuffer[InternalRow] = new CompactBuffer()
@@ -138,7 +141,7 @@ case class BroadcastNestedLoopJoin(
joinedRow.withLeft(leftNulls)
while (i < rel.length) {
if (!allIncludedBroadcastTuples.contains(i)) {
- buf += resultProjection(joinedRow.withRight(rel(i))).copy()
+ buf += resultProj(joinedRow.withRight(rel(i))).copy()
}
i += 1
}
@@ -147,7 +150,7 @@ case class BroadcastNestedLoopJoin(
joinedRow.withRight(rightNulls)
while (i < rel.length) {
if (!allIncludedBroadcastTuples.contains(i)) {
- buf += resultProjection(joinedRow.withLeft(rel(i))).copy()
+ buf += resultProj(joinedRow.withLeft(rel(i))).copy()
}
i += 1
}
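The recurring pattern in this file: `genResultProjection` is now a `def`, and each task materializes it once (`val resultProj = genResultProjection`) before looping, so the stateful projection is task-local without being rebuilt per row. A standalone sketch of that discipline, with simplified stand-in types (not Spark's):

    object PerTaskProjection {
      type Row = Array[Any]

      // Stand-in for UnsafeProjection.create: costly to build, cheap to call,
      // and stateful (it reuses `buffer`), so it must not cross threads.
      def genResultProjection: Row => Row = {
        val buffer = new Array[Any](2)
        row => { buffer(0) = row(0); buffer(1) = row(1); buffer }
      }

      def processPartition(rows: Iterator[Row]): Iterator[Row] = {
        val resultProj = genResultProjection // once per task, outside the loop
        // The buffer is reused across calls, so buffered results must be
        // copied, mirroring the `.copy()` calls kept in the diff above.
        rows.map(row => resultProj(row).clone())
      }
    }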
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 6b3d165292..5e9cd9fd23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -52,14 +52,14 @@ trait HashJoin {
override def canProcessUnsafeRows: Boolean = isUnsafeMode
override def canProcessSafeRows: Boolean = !isUnsafeMode
- @transient protected lazy val buildSideKeyGenerator: Projection =
+ protected def buildSideKeyGenerator: Projection =
if (isUnsafeMode) {
UnsafeProjection.create(buildKeys, buildPlan.output)
} else {
newMutableProjection(buildKeys, buildPlan.output)()
}
- @transient protected lazy val streamSideKeyGenerator: Projection =
+ protected def streamSideKeyGenerator: Projection =
if (isUnsafeMode) {
UnsafeProjection.create(streamedKeys, streamedPlan.output)
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
index a323aea4ea..346337e642 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashOuterJoin.scala
@@ -76,14 +76,14 @@ trait HashOuterJoin {
override def canProcessUnsafeRows: Boolean = isUnsafeMode
override def canProcessSafeRows: Boolean = !isUnsafeMode
- @transient protected lazy val buildKeyGenerator: Projection =
+ protected def buildKeyGenerator: Projection =
if (isUnsafeMode) {
UnsafeProjection.create(buildKeys, buildPlan.output)
} else {
newMutableProjection(buildKeys, buildPlan.output)()
}
- @transient protected[this] lazy val streamedKeyGenerator: Projection = {
+ protected[this] def streamedKeyGenerator: Projection = {
if (isUnsafeMode) {
UnsafeProjection.create(streamedKeys, streamedPlan.output)
} else {
@@ -91,7 +91,7 @@ trait HashOuterJoin {
}
}
- @transient private[this] lazy val resultProjection: InternalRow => InternalRow = {
+ protected[this] def resultProjection: InternalRow => InternalRow = {
if (isUnsafeMode) {
UnsafeProjection.create(self.schema)
} else {
@@ -113,7 +113,8 @@ trait HashOuterJoin {
protected[this] def leftOuterIterator(
key: InternalRow,
joinedRow: JoinedRow,
- rightIter: Iterable[InternalRow]): Iterator[InternalRow] = {
+ rightIter: Iterable[InternalRow],
+ resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = if (rightIter != null) {
@@ -124,12 +125,12 @@ trait HashOuterJoin {
List.empty
}
if (temp.isEmpty) {
- resultProjection(joinedRow.withRight(rightNullRow)).copy :: Nil
+ resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
} else {
temp
}
} else {
- resultProjection(joinedRow.withRight(rightNullRow)).copy :: Nil
+ resultProjection(joinedRow.withRight(rightNullRow)) :: Nil
}
}
ret.iterator
@@ -138,24 +139,24 @@ trait HashOuterJoin {
protected[this] def rightOuterIterator(
key: InternalRow,
leftIter: Iterable[InternalRow],
- joinedRow: JoinedRow): Iterator[InternalRow] = {
+ joinedRow: JoinedRow,
+ resultProjection: InternalRow => InternalRow): Iterator[InternalRow] = {
val ret: Iterable[InternalRow] = {
if (!key.anyNull) {
val temp = if (leftIter != null) {
leftIter.collect {
- case l if boundCondition(joinedRow.withLeft(l)) =>
- resultProjection(joinedRow).copy()
+ case l if boundCondition(joinedRow.withLeft(l)) => resultProjection(joinedRow).copy()
}
} else {
List.empty
}
if (temp.isEmpty) {
- resultProjection(joinedRow.withLeft(leftNullRow)).copy :: Nil
+ resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
} else {
temp
}
} else {
- resultProjection(joinedRow.withLeft(leftNullRow)).copy :: Nil
+ resultProjection(joinedRow.withLeft(leftNullRow)) :: Nil
}
}
ret.iterator
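The signature change above threads the task-local projection through as an explicit argument, so these shared trait methods never read shared state themselves. A simplified sketch of the shape of the new leftOuterIterator, with stand-in types and without the join-condition plumbing (not Spark's actual logic):

    object OuterIteratorShape {
      type Row = Seq[Any]

      def leftOuterIterator(
          key: Row,
          matches: Iterable[Row],
          rightNullRow: Row,
          resultProjection: Row => Row): Iterator[Row] = {
        val out: List[Row] =
          if (key.contains(null) || matches == null || matches.isEmpty) {
            // No match: emit the stream row padded with nulls on the right.
            resultProjection(rightNullRow) :: Nil
          } else {
            matches.map(resultProjection).toList
          }
        out.iterator
      }
    }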
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 97fde8f975..47a7d370f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -43,14 +43,14 @@ trait HashSemiJoin {
override def canProcessUnsafeRows: Boolean = supportUnsafe
override def canProcessSafeRows: Boolean = !supportUnsafe
- @transient protected lazy val leftKeyGenerator: Projection =
+ protected def leftKeyGenerator: Projection =
if (supportUnsafe) {
UnsafeProjection.create(leftKeys, left.output)
} else {
newMutableProjection(leftKeys, left.output)()
}
- @transient protected lazy val rightKeyGenerator: Projection =
+ protected def rightKeyGenerator: Projection =
if (supportUnsafe) {
UnsafeProjection.create(rightKeys, right.output)
} else {
@@ -62,12 +62,11 @@ trait HashSemiJoin {
protected def buildKeyHashSet(buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
val hashSet = new java.util.HashSet[InternalRow]()
- var currentRow: InternalRow = null
// Create a Hash set of buildKeys
val rightKey = rightKeyGenerator
while (buildIter.hasNext) {
- currentRow = buildIter.next()
+ val currentRow = buildIter.next()
val rowKey = rightKey(currentRow)
if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
@@ -76,6 +75,7 @@ trait HashSemiJoin {
}
}
}
+
hashSet
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 58b4236f7b..3f257ecdd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -17,12 +17,11 @@
package org.apache.spark.sql.execution.joins
-import java.io.{IOException, Externalizable, ObjectInput, ObjectOutput}
+import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.nio.ByteOrder
import java.util.{HashMap => JavaHashMap}
import org.apache.spark.shuffle.ShuffleMemoryManager
-import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
@@ -31,6 +30,7 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.{SparkConf, SparkEnv}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
index eee8ad800f..6a8c35efca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala
@@ -60,19 +60,21 @@ case class ShuffledHashOuterJoin(
case LeftOuter =>
val hashed = HashedRelation(rightIter, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
+ val resultProj = resultProjection
leftIter.flatMap( currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withLeft(currentRow)
- leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey))
+ leftOuterIterator(rowKey, joinedRow, hashed.get(rowKey), resultProj)
})
case RightOuter =>
val hashed = HashedRelation(leftIter, buildKeyGenerator)
val keyGenerator = streamedKeyGenerator
+ val resultProj = resultProjection
rightIter.flatMap ( currentRow => {
val rowKey = keyGenerator(currentRow)
joinedRow.withRight(currentRow)
- rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow)
+ rightOuterIterator(rowKey, hashed.get(rowKey), joinedRow, resultProj)
})
case FullOuter =>