diff options
author | Reynold Xin <rxin@databricks.com> | 2015-09-07 10:42:30 -1000 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-09-07 10:42:30 -1000 |
commit | 5ffe752b59e468d55363f1f24b17a3677927ca8f (patch) | |
tree | 57877752db2b3618ae764aed63d1298ff6f2db6a /streaming/src | |
parent | 871764c6ce531af5b1ac7ccccb32e7a903b59a2a (diff) | |
download | spark-5ffe752b59e468d55363f1f24b17a3677927ca8f.tar.gz spark-5ffe752b59e468d55363f1f24b17a3677927ca8f.tar.bz2 spark-5ffe752b59e468d55363f1f24b17a3677927ca8f.zip |
[SPARK-9767] Remove ConnectionManager.
We introduced the Netty network module for shuffle in Spark 1.2, and has turned it on by default for 3 releases. The old ConnectionManager is difficult to maintain. If we merge the patch now, by the time it is released, it would be 1 yr for which ConnectionManager is off by default. It's time to remove it.
Author: Reynold Xin <rxin@databricks.com>
Closes #8161 from rxin/SPARK-9767.
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 6c0c926755..13cfe29d7b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ -import org.apache.spark.network.nio.NioBlockTransferService +import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.KryoSerializer @@ -47,7 +47,9 @@ class ReceivedBlockHandlerSuite with Matchers with Logging { - val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") + val conf = new SparkConf() + .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1") + .set("spark.app.id", "streaming-test") val hadoopConf = new Configuration() val streamId = 1 val securityMgr = new SecurityManager(conf) @@ -184,7 +186,7 @@ class ReceivedBlockHandlerSuite } test("Test Block - isFullyConsumed") { - val sparkConf = new SparkConf() + val sparkConf = new SparkConf().set("spark.app.id", "streaming-test") sparkConf.set("spark.storage.unrollMemoryThreshold", "512") // spark.storage.unrollFraction set to 0.4 for BlockManager sparkConf.set("spark.storage.unrollFraction", "0.4") @@ -251,7 +253,7 @@ class ReceivedBlockHandlerSuite maxMem: Long, conf: SparkConf, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { - val transfer = new NioBlockTransferService(conf, securityMgr) + val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) manager.initialize("app-id") |