diff options
author | Lianhui Wang <lianhuiwang09@gmail.com> | 2016-04-25 12:33:32 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-04-25 12:33:32 -0700 |
commit | 6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b (patch) | |
tree | d98096dd1f3cba95289ed51a2f65accc38e6d7e4 /yarn | |
parent | 425f6916462ca5d0595c61101d52686006ed6b8b (diff) | |
download | spark-6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b.tar.gz spark-6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b.tar.bz2 spark-6bfe42a3be4fbf8bc6f93a4709038fda8ad0610b.zip |
[SPARK-14731][shuffle]Revert SPARK-12130 to make 2.0 shuffle service compatible with 1.x
## What changes were proposed in this pull request?
SPARK-12130 make 2.0 shuffle service incompatible with 1.x. So from discussion: [http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html](url) we should maintain compatibility between Spark 1.x and Spark 2.x's shuffle service.
I put string comparison into executor's register at first avoid string comparison in getBlockData every time.
## How was this patch tested?
N/A
Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes #12568 from lianhuiwang/SPARK-14731.
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 5a426b86d1..0e433f6c1b 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration + private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager" override def beforeEach(): Unit = { super.beforeEach() @@ -87,8 +88,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd val execStateFile = s1.registeredExecutorFile execStateFile should not be (null) - val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort") - val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash") + val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) + val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) val blockHandler = s1.blockHandler val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) @@ -158,8 +159,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd val execStateFile = s1.registeredExecutorFile execStateFile should not be (null) - val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort") - val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash") + val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) + val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) val blockHandler = s1.blockHandler val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) @@ -186,7 +187,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s1.initializeApplication(app1Data) val execStateFile = s1.registeredExecutorFile - val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort") + val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) val blockHandler = s1.blockHandler val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler) @@ -218,7 +219,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd val app2Data: ApplicationInitializationContext = new ApplicationInitializationContext("user", app2Id, null) s2.initializeApplication(app2Data) - val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash") + val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2) ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2)) s2.stop() |