path: root/core
author     Reynold Xin <rxin@apache.org>    2014-08-28 14:08:07 -0700
committer  Reynold Xin <rxin@apache.org>    2014-08-28 14:08:07 -0700
commit     be53c54b5c685e1d04d49bd554e05029a5a106e1 (patch)
tree       9ca20f367ebcb6176453321cbf0617c31843697e /core
parent     41dc5987d9abeca6fc0f5935c780d48f517cdf95 (diff)
[SPARK-3281] Remove Netty specific code in BlockManager / shuffle
Netty functionality will be added back in subsequent PRs by using the BlockTransferService interface.

Author: Reynold Xin <rxin@apache.org>

Closes #2181 from rxin/SPARK-3281 and squashes the following commits:

5494b0e [Reynold Xin] Fix extra port.
ff6d1e1 [Reynold Xin] [SPARK-3281] Remove Netty specific code in BlockManager.
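The API change that ripples through every file below is the removal of BlockManagerId's fourth nettyPort parameter. A minimal before/after sketch of a call site (executor id, host and port values are illustrative):

    import org.apache.spark.storage.BlockManagerId

    // Before this patch: callers supplied a Netty shuffle port (0 when Netty was off).
    //   val id = BlockManagerId("exec-1", "host-a", 7077, 0)

    // After this patch: the Netty port is gone from the public factory.
    val id = BlockManagerId("exec-1", "host-a", 7077)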
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala       | 73
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala               | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala             | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                  |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala              | 30
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala        |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala  | 16
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala          |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageSuite.scala               | 10
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala         |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala                | 17
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala             | 15
13 files changed, 64 insertions(+), 178 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index ca60ec78b6..4ab8ec8f0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -33,16 +33,8 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
/**
- * A block fetcher iterator interface. There are two implementations:
- *
- * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
- * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
- *
- * Eventually we would like the two to converge and use a single NIO-based communication layer,
- * but extensive tests show that under some circumstances (e.g. large shuffles with lots of cores),
- * NIO would perform poorly and thus the need for the Netty OIO one.
+ * A block fetcher iterator interface for fetching shuffle blocks.
*/
-
private[storage]
trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
def initialize()
@@ -262,67 +254,4 @@ object BlockFetcherIterator {
}
}
// End of BasicBlockFetcherIterator
-
- class NettyBlockFetcherIterator(
- blockManager: BlockManager,
- blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
- serializer: Serializer,
- readMetrics: ShuffleReadMetrics)
- extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) {
-
- override protected def sendRequest(req: FetchRequest) {
- logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
- val cmId = new ConnectionManagerId(req.address.host, req.address.port)
-
- bytesInFlight += req.size
- val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
-
- // This could throw a TimeoutException. In that case we will just retry the task.
- val client = blockManager.nettyBlockClientFactory.createClient(
- cmId.host, req.address.nettyPort)
- val blocks = req.blocks.map(_._1.toString)
-
- client.fetchBlocks(
- blocks,
- new BlockClientListener {
- override def onFetchFailure(blockId: String, errorMsg: String): Unit = {
- logError(s"Could not get block(s) from $cmId with error: $errorMsg")
- for ((blockId, size) <- req.blocks) {
- results.put(new FetchResult(blockId, -1, null))
- }
- }
-
- override def onFetchSuccess(blockId: String, data: ReferenceCountedBuffer): Unit = {
- // Increment the reference count so the buffer won't be recycled.
- // TODO: This could result in memory leaks when the task is stopped due to exception
- // before the iterator is exhausted.
- data.retain()
- val buf = data.byteBuffer()
- val blockSize = buf.remaining()
- val bid = BlockId(blockId)
-
- // TODO: remove code duplication between here and BlockManager.dataDeserialization.
- results.put(new FetchResult(bid, sizeMap(bid), () => {
- def createIterator: Iterator[Any] = {
- val stream = blockManager.wrapForCompression(bid, data.inputStream())
- serializer.newInstance().deserializeStream(stream).asIterator
- }
- new LazyInitIterator(createIterator) {
- // Release the buffer when we are done traversing it.
- override def close(): Unit = data.release()
- }
- }))
-
- readMetrics.synchronized {
- readMetrics.remoteBytesRead += blockSize
- readMetrics.remoteBlocksFetched += 1
- }
- logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
- }
- }
- )
- }
- }
- // End of NettyBlockFetcherIterator
}
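The BasicBlockFetcherIterator that survives this file is consumed through the trait kept in the first hunk. A hedged sketch of draining such an iterator (drain and its fetcher argument are hypothetical names; the element type comes from the trait declaration above):

    import org.apache.spark.storage.BlockId

    // Sketch only: each element pairs a BlockId with the block's deserialized
    // values, or None when the fetch failed and the task should be retried.
    def drain(fetcher: Iterator[(BlockId, Option[Iterator[Any]])]): Unit =
      fetcher.foreach {
        case (blockId, Some(values)) => values.foreach(v => println(s"$blockId -> $v"))
        case (blockId, None)         => println(s"failed to fetch $blockId")
      }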
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 12a92d44f4..1eb622c12a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -32,8 +32,6 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
-import org.apache.spark.network.netty.client.BlockFetchingClientFactory
-import org.apache.spark.network.netty.server.BlockServer
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
@@ -90,27 +88,8 @@ private[spark] class BlockManager(
new TachyonStore(this, tachyonBlockManager)
}
- private val useNetty = conf.getBoolean("spark.shuffle.use.netty", false)
-
- // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
- private[storage] val nettyBlockClientFactory: BlockFetchingClientFactory = {
- if (useNetty) new BlockFetchingClientFactory(conf) else null
- }
-
- private val nettyBlockServer: BlockServer = {
- if (useNetty) {
- val server = new BlockServer(conf, this)
- logInfo(s"Created NettyBlockServer binding to port: ${server.port}")
- server
- } else {
- null
- }
- }
-
- private val nettyPort: Int = if (useNetty) nettyBlockServer.port else 0
-
val blockManagerId = BlockManagerId(
- executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
+ executorId, connectionManager.id.host, connectionManager.id.port)
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
@@ -572,14 +551,8 @@ private[spark] class BlockManager(
blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
serializer: Serializer,
readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
- val iter =
- if (conf.getBoolean("spark.shuffle.use.netty", false)) {
- new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer,
- readMetrics)
- } else {
- new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
- readMetrics)
- }
+ val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer,
+ readMetrics)
iter.initialize()
iter
}
@@ -1092,14 +1065,6 @@ private[spark] class BlockManager(
connectionManager.stop()
shuffleBlockManager.stop()
diskBlockManager.stop()
-
- if (nettyBlockClientFactory != null) {
- nettyBlockClientFactory.stop()
- }
- if (nettyBlockServer != null) {
- nettyBlockServer.stop()
- }
-
actorSystem.stop(slaveActor)
blockInfo.clear()
memoryStore.clear()
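For readability, here is getMultiple as it stands after the middle hunk above, now one unconditional path with no spark.shuffle.use.netty branch (restated from the diff, not new behavior):

    def getMultiple(
        blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
        serializer: Serializer,
        readMetrics: ShuffleReadMetrics): BlockFetcherIterator = {
      // Only the basic NIO-based fetcher remains; Netty returns later via
      // the BlockTransferService interface mentioned in the commit message.
      val iter = new BlockFetcherIterator.BasicBlockFetcherIterator(
        this, blocksByAddress, serializer, readMetrics)
      iter.initialize()
      iter
    }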
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index b1585bd819..b7bcb2d85d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -36,11 +36,10 @@ import org.apache.spark.util.Utils
class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
- private var port_ : Int,
- private var nettyPort_ : Int
+ private var port_ : Int
) extends Externalizable {
- private def this() = this(null, null, 0, 0) // For deserialization only
+ private def this() = this(null, null, 0) // For deserialization only
def executorId: String = executorId_
@@ -60,32 +59,28 @@ class BlockManagerId private (
def port: Int = port_
- def nettyPort: Int = nettyPort_
-
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
- out.writeInt(nettyPort_)
}
override def readExternal(in: ObjectInput) {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
- nettyPort_ = in.readInt()
}
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
+ override def toString = s"BlockManagerId($executorId, $host, $port)"
- override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
+ override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
override def equals(that: Any) = that match {
case id: BlockManagerId =>
- executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
+ executorId == id.executorId && port == id.port && host == id.host
case _ =>
false
}
@@ -100,11 +95,10 @@ private[spark] object BlockManagerId {
* @param execId ID of the executor.
* @param host Host name of the block manager.
* @param port Port of the block manager.
- * @param nettyPort Optional port for the Netty-based shuffle sender.
* @return A new [[org.apache.spark.storage.BlockManagerId]].
*/
- def apply(execId: String, host: String, port: Int, nettyPort: Int) =
- getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))
+ def apply(execId: String, host: String, port: Int) =
+ getCachedBlockManagerId(new BlockManagerId(execId, host, port))
def apply(in: ObjectInput) = {
val obj = new BlockManagerId()
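Note that apply still interns ids through getCachedBlockManagerId, and equals/hashCode now cover only executorId, host and port. A small illustration of the caching contract (the same property BlockManagerSuite asserts further down; field values are arbitrary):

    import org.apache.spark.storage.BlockManagerId

    val a = BlockManagerId("e1", "host", 1)
    val b = BlockManagerId("e1", "host", 1) // equal fields => same cached object
    val c = BlockManagerId("e1", "host", 2) // different port => different object

    assert(a eq b)  // reference equality via getCachedBlockManagerId
    assert(a != c)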
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index db7384705f..a7543454ec 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -295,8 +295,7 @@ private[spark] object JsonProtocol {
def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
("Executor ID" -> blockManagerId.executorId) ~
("Host" -> blockManagerId.host) ~
- ("Port" -> blockManagerId.port) ~
- ("Netty Port" -> blockManagerId.nettyPort)
+ ("Port" -> blockManagerId.port)
}
def jobResultToJson(jobResult: JobResult): JValue = {
@@ -644,8 +643,7 @@ private[spark] object JsonProtocol {
val executorId = (json \ "Executor ID").extract[String]
val host = (json \ "Host").extract[String]
val port = (json \ "Port").extract[Int]
- val nettyPort = (json \ "Netty Port").extract[Int]
- BlockManagerId(executorId, host, port, nettyPort)
+ BlockManagerId(executorId, host, port)
}
def jobResultFromJson(json: JValue): JobResult = {
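After this change a serialized BlockManagerId carries exactly three keys. A hedged round-trip sketch with json4s, the DSL that JsonProtocol itself builds on (example values are arbitrary):

    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods._

    // Matches blockManagerIdToJson above:
    //   {"Executor ID":"exec-1","Host":"host-a","Port":7077}
    val json =
      ("Executor ID" -> "exec-1") ~
      ("Host" -> "host-a") ~
      ("Port" -> 7077)
    println(compact(render(json)))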
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 9702838085..5369169811 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -69,13 +69,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
val statuses = tracker.getServerStatuses(10, 0)
- assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000, 0), size1000),
- (BlockManagerId("b", "hostB", 1000, 0), size10000)))
+ assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
+ (BlockManagerId("b", "hostB", 1000), size10000)))
tracker.stop()
}
@@ -86,9 +86,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize10000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000)))
assert(tracker.containsShuffle(10))
assert(tracker.getServerStatuses(10, 0).nonEmpty)
@@ -105,14 +105,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
- tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
+ tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000),
Array(compressedSize1000, compressedSize1000, compressedSize1000)))
- tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
+ tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000),
Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simultaneous fetch failures
- tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
- tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
+ tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
// The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
@@ -145,13 +145,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+ BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
- Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
- masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
+ masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
@@ -174,7 +174,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// Frame size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("88", "mph", 1000, 0), Array.fill[Byte](10)(0)))
+ BlockManagerId("88", "mph", 1000), Array.fill[Byte](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))
}
@@ -195,7 +195,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new MapStatus(
- BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+ BlockManagerId("999", "mps", 1000), Array.fill[Byte](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f5fed988ad..1a42fc1b23 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -736,7 +736,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
private def makeBlockManagerId(host: String): BlockManagerId =
- BlockManagerId("exec-" + host, host, 12345, 0)
+ BlockManagerId("exec-" + host, host, 12345)
private def assertDataStructuresEmpty = {
assert(scheduler.activeJobs.isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
index bcbfe8baf3..1591284383 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala
@@ -41,7 +41,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
val blockManager = mock(classOf[BlockManager])
val connManager = mock(classOf[ConnectionManager])
doReturn(connManager).when(blockManager).connectionManager
- doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId
+ doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
@@ -66,7 +66,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
- val bmId = BlockManagerId("test-client", "test-client",1 , 0)
+ val bmId = BlockManagerId("test-client", "test-client", 1)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
)
@@ -97,7 +97,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
val blockManager = mock(classOf[BlockManager])
val connManager = mock(classOf[ConnectionManager])
doReturn(connManager).when(blockManager).connectionManager
- doReturn(BlockManagerId("test-client", "test-client", 1, 0)).when(blockManager).blockManagerId
+ doReturn(BlockManagerId("test-client", "test-client", 1)).when(blockManager).blockManagerId
doReturn((48 * 1024 * 1024).asInstanceOf[Long]).when(blockManager).maxBytesInFlight
@@ -117,7 +117,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(3)), any())
doReturn(optItr).when(blockManager).getLocalFromDisk(meq(blIds(4)), any())
- val bmId = BlockManagerId("test-client", "test-client",1 , 0)
+ val bmId = BlockManagerId("test-client", "test-client", 1)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq)
)
@@ -155,12 +155,12 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
when(blockManager.futureExecContext).thenReturn(global)
when(blockManager.blockManagerId).thenReturn(
- BlockManagerId("test-client", "test-client", 1, 0))
+ BlockManagerId("test-client", "test-client", 1))
when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024)
val blId1 = ShuffleBlockId(0,0,0)
val blId2 = ShuffleBlockId(0,1,0)
- val bmId = BlockManagerId("test-server", "test-server",1 , 0)
+ val bmId = BlockManagerId("test-server", "test-server", 1)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(bmId, Seq((blId1, 1L), (blId2, 1L)))
)
@@ -211,10 +211,10 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
when(blockManager.futureExecContext).thenReturn(global)
when(blockManager.blockManagerId).thenReturn(
- BlockManagerId("test-client", "test-client", 1, 0))
+ BlockManagerId("test-client", "test-client", 1))
when(blockManager.maxBytesInFlight).thenReturn(48 * 1024 * 1024)
- val bmId = BlockManagerId("test-server", "test-server",1 , 0)
+ val bmId = BlockManagerId("test-server", "test-server", 1)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
(bmId, Seq((blId1, 1L), (blId2, 1L)))
)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f32ce6f9fc..bdcea07e57 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -139,9 +139,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("BlockManagerId object caching") {
- val id1 = BlockManagerId("e1", "XXX", 1, 0)
- val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1
- val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object
+ val id1 = BlockManagerId("e1", "XXX", 1)
+ val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
+ val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
assert(id2 === id1, "id2 is not same as id1")
assert(id2.eq(id1), "id2 is not the same object as id1")
assert(id3 != id1, "id3 is same as id1")
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 7671cb969a..4e022a69c8 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -26,8 +26,8 @@ import org.apache.spark.scheduler._
* Test the behavior of StorageStatusListener in response to all relevant events.
*/
class StorageStatusListenerSuite extends FunSuite {
- private val bm1 = BlockManagerId("big", "dog", 1, 1)
- private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+ private val bm1 = BlockManagerId("big", "dog", 1)
+ private val bm2 = BlockManagerId("fat", "duck", 2)
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
index 38678bbd1d..ef5c55f91c 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -27,7 +27,7 @@ class StorageSuite extends FunSuite {
// For testing add, update, and remove (for non-RDD blocks)
private def storageStatus1: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
assert(status.blocks.isEmpty)
assert(status.rddBlocks.isEmpty)
assert(status.memUsed === 0L)
@@ -78,7 +78,7 @@ class StorageSuite extends FunSuite {
// For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
private def storageStatus2: StorageStatus = {
- val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
assert(status.rddBlocks.isEmpty)
status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
@@ -271,9 +271,9 @@ class StorageSuite extends FunSuite {
// For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
private def stockStorageStatuses: Seq[StorageStatus] = {
- val status1 = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
- val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2, 2), 2000L)
- val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3, 3), 3000L)
+ val status1 = new StorageStatus(BlockManagerId("big", "dog", 1), 1000L)
+ val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2), 2000L)
+ val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3), 3000L)
status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index a537c72ce7..d9e9c70a8a 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -39,7 +39,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
- private val bm1 = BlockManagerId("big", "dog", 1, 1)
+ private val bm1 = BlockManagerId("big", "dog", 1)
before {
bus = new LiveListenerBus
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index c4765e53de..76bf4cfd11 100644
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -17,13 +17,16 @@
package org.apache.spark.util
+import scala.concurrent.Await
+
import akka.actor._
+
+import org.scalatest.FunSuite
+
import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.scalatest.FunSuite
-import scala.concurrent.Await
/**
* Test the AkkaUtils with various security settings.
@@ -35,7 +38,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
conf.set("spark.authenticate", "true")
conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf);
+ val securityManager = new SecurityManager(conf)
val hostname = "localhost"
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
conf = conf, securityManager = securityManager)
@@ -106,13 +109,13 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+ BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
// this should succeed since security off
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
- Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
actorSystem.shutdown()
slaveSystem.shutdown()
@@ -157,13 +160,13 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
- BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+ BlockManagerId("a", "hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementEpoch()
slaveTracker.updateEpoch(masterTracker.getEpoch)
// this should succeed since security on and passwords match
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
- Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+ Seq((BlockManagerId("a", "hostA", 1000), size1000)))
actorSystem.shutdown()
slaveSystem.shutdown()
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2fd3b9cfd2..66a17de9ec 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -53,9 +53,9 @@ class JsonProtocolSuite extends FunSuite {
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
val blockManagerAdded = SparkListenerBlockManagerAdded(
- BlockManagerId("Stars", "In your multitude...", 300, 400), 500)
+ BlockManagerId("Stars", "In your multitude...", 300), 500)
val blockManagerRemoved = SparkListenerBlockManagerRemoved(
- BlockManagerId("Scarce", "to be counted...", 100, 200))
+ BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
@@ -81,7 +81,7 @@ class JsonProtocolSuite extends FunSuite {
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
- testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
+ testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -104,7 +104,7 @@ class JsonProtocolSuite extends FunSuite {
testJobResult(jobFailed)
// TaskEndReason
- val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15, 16), 17, 18, 19)
+ val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19)
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
testTaskEndReason(Success)
testTaskEndReason(Resubmitted)
@@ -343,7 +343,6 @@ class JsonProtocolSuite extends FunSuite {
assert(bm1.executorId === bm2.executorId)
assert(bm1.host === bm2.host)
assert(bm1.port === bm2.port)
- assert(bm1.nettyPort === bm2.nettyPort)
}
private def assertEquals(result1: JobResult, result2: JobResult) {
@@ -944,8 +943,7 @@ class JsonProtocolSuite extends FunSuite {
| "Block Manager ID": {
| "Executor ID": "Stars",
| "Host": "In your multitude...",
- | "Port": 300,
- | "Netty Port": 400
+ | "Port": 300
| },
| "Maximum Memory": 500
|}
@@ -958,8 +956,7 @@ class JsonProtocolSuite extends FunSuite {
| "Block Manager ID": {
| "Executor ID": "Scarce",
| "Host": "to be counted...",
- | "Port": 100,
- | "Netty Port": 200
+ | "Port": 100
| }
|}
"""