author     Reynold Xin <rxin@databricks.com>    2015-09-07 10:42:30 -1000
committer  Reynold Xin <rxin@databricks.com>    2015-09-07 10:42:30 -1000
commit     5ffe752b59e468d55363f1f24b17a3677927ca8f (patch)
tree       57877752db2b3618ae764aed63d1298ff6f2db6a /streaming
parent     871764c6ce531af5b1ac7ccccb32e7a903b59a2a (diff)
[SPARK-9767] Remove ConnectionManager.
We introduced the Netty network module for shuffle in Spark 1.2, and it has been on by default for three releases. The old ConnectionManager is difficult to maintain. If we merge this patch now, ConnectionManager will have been off by default for a full year by the time the change is released. It's time to remove it.

Author: Reynold Xin <rxin@databricks.com>

Closes #8161 from rxin/SPARK-9767.
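For code that constructed the transfer service directly, as the streaming test below does, the migration is mechanical: swap the class and pass a core count. A REPL-style sketch; the constructor arguments match the ones used in the diff below, but the surrounding setup is illustrative:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.netty.NettyBlockTransferService

    val conf = new SparkConf().set("spark.app.id", "test-app")  // illustrative id; see note after the diff
    val securityMgr = new SecurityManager(conf)

    // Before this patch (org.apache.spark.network.nio, now removed):
    //   val transfer = new NioBlockTransferService(conf, securityMgr)
    // After: Netty is the sole implementation; numCores sizes its thread pools.
    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)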
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 10
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 6c0c926755..13cfe29d7b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.KryoSerializer
@@ -47,7 +47,9 @@ class ReceivedBlockHandlerSuite
with Matchers
with Logging {
- val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
+ val conf = new SparkConf()
+ .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
+ .set("spark.app.id", "streaming-test")
val hadoopConf = new Configuration()
val streamId = 1
val securityMgr = new SecurityManager(conf)
@@ -184,7 +186,7 @@ class ReceivedBlockHandlerSuite
}
test("Test Block - isFullyConsumed") {
- val sparkConf = new SparkConf()
+ val sparkConf = new SparkConf().set("spark.app.id", "streaming-test")
sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
// spark.storage.unrollFraction set to 0.4 for BlockManager
sparkConf.set("spark.storage.unrollFraction", "0.4")
@@ -251,7 +253,7 @@ class ReceivedBlockHandlerSuite
maxMem: Long,
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- val transfer = new NioBlockTransferService(conf, securityMgr)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")
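A side effect visible throughout this diff: the Netty transfer service resolves the application id from the configuration during initialization, which appears to be why the test confs now set spark.app.id explicitly. A minimal sketch, assuming conf.getAppId simply reads the spark.app.id key (as it does in this era of Spark):

    import org.apache.spark.SparkConf

    // Without spark.app.id set, conf.getAppId throws NoSuchElementException
    // when the Netty transfer service asks for it during initialization.
    val conf = new SparkConf().set("spark.app.id", "streaming-test")
    assert(conf.getAppId == "streaming-test")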