aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorLianhui Wang <lianhuiwang09@gmail.com>2016-04-25 12:33:32 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-04-25 12:33:32 -0700
commit6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b (patch)
treed98096dd1f3cba95289ed51a2f65accc38e6d7e4 /yarn
parent425f6916462ca5d0595c61101d52686006ed6b8b (diff)
downloadspark-6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b.tar.gz
spark-6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b.tar.bz2
spark-6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b.zip
[SPARK-14731][shuffle]Revert SPARK-12130 to make 2.0 shuffle service compatible with 1.x
## What changes were proposed in this pull request? SPARK-12130 make 2.0 shuffle service incompatible with 1.x. So from discussion: [http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html](url) we should maintain compatibility between Spark 1.x and Spark 2.x's shuffle service. I put string comparison into executor's register at first avoid string comparison in getBlockData every time. ## How was this patch tested? N/A Author: Lianhui Wang <lianhuiwang09@gmail.com> Closes #12568 from lianhuiwang/SPARK-14731.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala13
1 files changed, 7 insertions, 6 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 5a426b86d1..0e433f6c1b 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
+ private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
override def beforeEach(): Unit = {
super.beforeEach()
@@ -87,8 +88,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
- val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
- val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+ val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+ val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -158,8 +159,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null)
- val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
- val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+ val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+ val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -186,7 +187,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s1.initializeApplication(app1Data)
val execStateFile = s1.registeredExecutorFile
- val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
+ val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val blockHandler = s1.blockHandler
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -218,7 +219,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val app2Data: ApplicationInitializationContext =
new ApplicationInitializationContext("user", app2Id, null)
s2.initializeApplication(app2Data)
- val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+ val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
s2.stop()