-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala | 175
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala | 140
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala | 114
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/Connection.scala | 619
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala | 1157
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala | 37
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/Message.scala | 114
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala | 41
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala | 83
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala | 217
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala | 160
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala | 296
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala | 6
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 10
-rw-r--r--  docs/configuration.md | 11
-rw-r--r--  project/MimaExcludes.scala | 1257
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 10
21 files changed, 651 insertions, 3855 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 0f1e2e0695..c6fef7f91f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -33,7 +33,6 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
-import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
import org.apache.spark.rpc.akka.AkkaRpcEnv
import org.apache.spark.scheduler.{OutputCommitCoordinator, LiveListenerBus}
@@ -326,15 +325,7 @@ object SparkEnv extends Logging {
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
- val blockTransferService =
- conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
- case "netty" =>
- new NettyBlockTransferService(conf, securityManager, numUsableCores)
- case "nio" =>
- logWarning("NIO-based block transfer service is deprecated, " +
- "and will be removed in Spark 1.6.0.")
- new NioBlockTransferService(conf, securityManager)
- }
+ val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
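
For context, a minimal usage sketch (not part of the patch; names below are illustrative): with the match on spark.shuffle.blockTransferService removed above, SparkEnv no longer consults that setting, so a configuration that previously selected "nio" now falls through to the Netty-based service.

```scala
import org.apache.spark.SparkConf

// Hypothetical sketch, not part of the patch: after this change the value of
// "spark.shuffle.blockTransferService" is no longer consulted by SparkEnv,
// so the "nio" setting below silently falls through to the Netty service.
object TransferServiceConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("transfer-service-example")
      .set("spark.shuffle.blockTransferService", "nio") // ignored after this patch
    println(conf.get("spark.shuffle.blockTransferService")) // still readable, just unused
  }
}
```
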
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index d5ad2c9ad0..4b851bcb36 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -149,7 +149,11 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
}
override def close(): Unit = {
- server.close()
- clientFactory.close()
+ if (server != null) {
+ server.close()
+ }
+ if (clientFactory != null) {
+ clientFactory.close()
+ }
}
}
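
The new close() above guards each resource with a null check so it is safe to call even if initialization never ran. A minimal standalone sketch of the same pattern, using hypothetical names rather than Spark's API:

```scala
import java.io.Closeable

// Minimal sketch of the null-guarded close() pattern added above; the class
// and field names here are illustrative, not Spark's.
class LazyResources {
  private var server: Closeable = _        // assigned only if an init() step ran
  private var clientFactory: Closeable = _ // assigned only if an init() step ran

  def close(): Unit = {
    if (server != null) {
      server.close()
    }
    if (clientFactory != null) {
      clientFactory.close()
    }
  }
}
```
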
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala
deleted file mode 100644
index 79cb0640c8..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
-
-import scala.collection.mutable.{ArrayBuffer, StringBuilder}
-
-// private[spark] because we need to register them in Kryo
-private[spark] case class GetBlock(id: BlockId)
-private[spark] case class GotBlock(id: BlockId, data: ByteBuffer)
-private[spark] case class PutBlock(id: BlockId, data: ByteBuffer, level: StorageLevel)
-
-private[nio] class BlockMessage() {
- // Un-initialized: typ = 0
- // GetBlock: typ = 1
- // GotBlock: typ = 2
- // PutBlock: typ = 3
- private var typ: Int = BlockMessage.TYPE_NON_INITIALIZED
- private var id: BlockId = null
- private var data: ByteBuffer = null
- private var level: StorageLevel = null
-
- def set(getBlock: GetBlock) {
- typ = BlockMessage.TYPE_GET_BLOCK
- id = getBlock.id
- }
-
- def set(gotBlock: GotBlock) {
- typ = BlockMessage.TYPE_GOT_BLOCK
- id = gotBlock.id
- data = gotBlock.data
- }
-
- def set(putBlock: PutBlock) {
- typ = BlockMessage.TYPE_PUT_BLOCK
- id = putBlock.id
- data = putBlock.data
- level = putBlock.level
- }
-
- def set(buffer: ByteBuffer) {
- typ = buffer.getInt()
- val idLength = buffer.getInt()
- val idBuilder = new StringBuilder(idLength)
- for (i <- 1 to idLength) {
- idBuilder += buffer.getChar()
- }
- id = BlockId(idBuilder.toString)
-
- if (typ == BlockMessage.TYPE_PUT_BLOCK) {
-
- val booleanInt = buffer.getInt()
- val replication = buffer.getInt()
- level = StorageLevel(booleanInt, replication)
-
- val dataLength = buffer.getInt()
- data = ByteBuffer.allocate(dataLength)
- if (dataLength != buffer.remaining) {
- throw new Exception("Error parsing buffer")
- }
- data.put(buffer)
- data.flip()
- } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
-
- val dataLength = buffer.getInt()
- data = ByteBuffer.allocate(dataLength)
- if (dataLength != buffer.remaining) {
- throw new Exception("Error parsing buffer")
- }
- data.put(buffer)
- data.flip()
- }
-
- }
-
- def set(bufferMsg: BufferMessage) {
- val buffer = bufferMsg.buffers.apply(0)
- buffer.clear()
- set(buffer)
- }
-
- def getType: Int = typ
- def getId: BlockId = id
- def getData: ByteBuffer = data
- def getLevel: StorageLevel = level
-
- def toBufferMessage: BufferMessage = {
- val buffers = new ArrayBuffer[ByteBuffer]()
- var buffer = ByteBuffer.allocate(4 + 4 + id.name.length * 2)
- buffer.putInt(typ).putInt(id.name.length)
- id.name.foreach((x: Char) => buffer.putChar(x))
- buffer.flip()
- buffers += buffer
-
- if (typ == BlockMessage.TYPE_PUT_BLOCK) {
- buffer = ByteBuffer.allocate(8).putInt(level.toInt).putInt(level.replication)
- buffer.flip()
- buffers += buffer
-
- buffer = ByteBuffer.allocate(4).putInt(data.remaining)
- buffer.flip()
- buffers += buffer
-
- buffers += data
- } else if (typ == BlockMessage.TYPE_GOT_BLOCK) {
- buffer = ByteBuffer.allocate(4).putInt(data.remaining)
- buffer.flip()
- buffers += buffer
-
- buffers += data
- }
-
- Message.createBufferMessage(buffers)
- }
-
- override def toString: String = {
- "BlockMessage [type = " + typ + ", id = " + id + ", level = " + level +
- ", data = " + (if (data != null) data.remaining.toString else "null") + "]"
- }
-}
-
-private[nio] object BlockMessage {
- val TYPE_NON_INITIALIZED: Int = 0
- val TYPE_GET_BLOCK: Int = 1
- val TYPE_GOT_BLOCK: Int = 2
- val TYPE_PUT_BLOCK: Int = 3
-
- def fromBufferMessage(bufferMessage: BufferMessage): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(bufferMessage)
- newBlockMessage
- }
-
- def fromByteBuffer(buffer: ByteBuffer): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(buffer)
- newBlockMessage
- }
-
- def fromGetBlock(getBlock: GetBlock): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(getBlock)
- newBlockMessage
- }
-
- def fromGotBlock(gotBlock: GotBlock): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(gotBlock)
- newBlockMessage
- }
-
- def fromPutBlock(putBlock: PutBlock): BlockMessage = {
- val newBlockMessage = new BlockMessage()
- newBlockMessage.set(putBlock)
- newBlockMessage
- }
-}
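
For reference, the deleted BlockMessage.toBufferMessage above serializes a header as [typ: Int][id length: Int][id as UTF-16 chars], followed by level and data for put/got messages. A self-contained sketch (illustrative only, not part of the patch) that re-encodes just the GetBlock header with plain java.nio:

```scala
import java.nio.ByteBuffer

// Illustrative re-encoding of the GetBlock header layout produced by the
// deleted BlockMessage.toBufferMessage: [typ: Int][idLength: Int][id chars].
// Standalone sketch only; it is not part of the patch.
object GetBlockHeaderSketch {
  val TYPE_GET_BLOCK = 1 // mirrors BlockMessage.TYPE_GET_BLOCK in the removed code

  def encode(blockIdName: String): ByteBuffer = {
    val buffer = ByteBuffer.allocate(4 + 4 + blockIdName.length * 2)
    buffer.putInt(TYPE_GET_BLOCK).putInt(blockIdName.length)
    blockIdName.foreach(buffer.putChar) // each Char written as 2 bytes, as in the removed code
    buffer.flip()
    buffer
  }

  def main(args: Array[String]): Unit = {
    // "test_0" matches the name of TestBlockId("0") used by the removed demo main().
    println(encode("test_0").remaining()) // 4 + 4 + 6 * 2 = 20 bytes
  }
}
```
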
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
deleted file mode 100644
index f1c9ea8b64..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.nio.ByteBuffer
-
-import org.apache.spark._
-import org.apache.spark.storage.{StorageLevel, TestBlockId}
-
-import scala.collection.mutable.ArrayBuffer
-
-private[nio]
-class BlockMessageArray(var blockMessages: Seq[BlockMessage])
- extends Seq[BlockMessage] with Logging {
-
- def this(bm: BlockMessage) = this(Array(bm))
-
- def this() = this(null.asInstanceOf[Seq[BlockMessage]])
-
- def apply(i: Int): BlockMessage = blockMessages(i)
-
- def iterator: Iterator[BlockMessage] = blockMessages.iterator
-
- def length: Int = blockMessages.length
-
- def set(bufferMessage: BufferMessage) {
- val startTime = System.currentTimeMillis
- val newBlockMessages = new ArrayBuffer[BlockMessage]()
- val buffer = bufferMessage.buffers(0)
- buffer.clear()
- while (buffer.remaining() > 0) {
- val size = buffer.getInt()
- logDebug("Creating block message of size " + size + " bytes")
- val newBuffer = buffer.slice()
- newBuffer.clear()
- newBuffer.limit(size)
- logDebug("Trying to convert buffer " + newBuffer + " to block message")
- val newBlockMessage = BlockMessage.fromByteBuffer(newBuffer)
- logDebug("Created " + newBlockMessage)
- newBlockMessages += newBlockMessage
- buffer.position(buffer.position() + size)
- }
- val finishTime = System.currentTimeMillis
- logDebug("Converted block message array from buffer message in " +
- (finishTime - startTime) / 1000.0 + " s")
- this.blockMessages = newBlockMessages
- }
-
- def toBufferMessage: BufferMessage = {
- val buffers = new ArrayBuffer[ByteBuffer]()
-
- blockMessages.foreach(blockMessage => {
- val bufferMessage = blockMessage.toBufferMessage
- logDebug("Adding " + blockMessage)
- val sizeBuffer = ByteBuffer.allocate(4).putInt(bufferMessage.size)
- sizeBuffer.flip
- buffers += sizeBuffer
- buffers ++= bufferMessage.buffers
- logDebug("Added " + bufferMessage)
- })
-
- logDebug("Buffer list:")
- buffers.foreach((x: ByteBuffer) => logDebug("" + x))
- Message.createBufferMessage(buffers)
- }
-}
-
-private[nio] object BlockMessageArray extends Logging {
-
- def fromBufferMessage(bufferMessage: BufferMessage): BlockMessageArray = {
- val newBlockMessageArray = new BlockMessageArray()
- newBlockMessageArray.set(bufferMessage)
- newBlockMessageArray
- }
-
- def main(args: Array[String]) {
- val blockMessages =
- (0 until 10).map { i =>
- if (i % 2 == 0) {
- val buffer = ByteBuffer.allocate(100)
- buffer.clear()
- BlockMessage.fromPutBlock(PutBlock(TestBlockId(i.toString), buffer,
- StorageLevel.MEMORY_ONLY_SER))
- } else {
- BlockMessage.fromGetBlock(GetBlock(TestBlockId(i.toString)))
- }
- }
- val blockMessageArray = new BlockMessageArray(blockMessages)
- logDebug("Block message array created")
-
- val bufferMessage = blockMessageArray.toBufferMessage
- logDebug("Converted to buffer message")
-
- val totalSize = bufferMessage.size
- val newBuffer = ByteBuffer.allocate(totalSize)
- newBuffer.clear()
- bufferMessage.buffers.foreach(buffer => {
- assert (0 == buffer.position())
- newBuffer.put(buffer)
- buffer.rewind()
- })
- newBuffer.flip
- val newBufferMessage = Message.createBufferMessage(newBuffer)
- logDebug("Copied to new buffer message, size = " + newBufferMessage.size)
-
- val newBlockMessageArray = BlockMessageArray.fromBufferMessage(newBufferMessage)
- logDebug("Converted back to block message array")
- // scalastyle:off println
- newBlockMessageArray.foreach(blockMessage => {
- blockMessage.getType match {
- case BlockMessage.TYPE_PUT_BLOCK => {
- val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
- println(pB)
- }
- case BlockMessage.TYPE_GET_BLOCK => {
- val gB = new GetBlock(blockMessage.getId)
- println(gB)
- }
- }
- })
- // scalastyle:on println
- }
-}
-
-
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala
deleted file mode 100644
index 9a9e22b0c2..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.storage.BlockManager
-
-
-private[nio]
-class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
- extends Message(Message.BUFFER_MESSAGE, id_) {
-
- val initialSize = currentSize()
- var gotChunkForSendingOnce = false
-
- def size: Int = initialSize
-
- def currentSize(): Int = {
- if (buffers == null || buffers.isEmpty) {
- 0
- } else {
- buffers.map(_.remaining).reduceLeft(_ + _)
- }
- }
-
- def getChunkForSending(maxChunkSize: Int): Option[MessageChunk] = {
- if (maxChunkSize <= 0) {
- throw new Exception("Max chunk size is " + maxChunkSize)
- }
-
- val security = if (isSecurityNeg) 1 else 0
- if (size == 0 && !gotChunkForSendingOnce) {
- val newChunk = new MessageChunk(
- new MessageChunkHeader(typ, id, 0, 0, ackId, hasError, security, senderAddress), null)
- gotChunkForSendingOnce = true
- return Some(newChunk)
- }
-
- while(!buffers.isEmpty) {
- val buffer = buffers(0)
- if (buffer.remaining == 0) {
- BlockManager.dispose(buffer)
- buffers -= buffer
- } else {
- val newBuffer = if (buffer.remaining <= maxChunkSize) {
- buffer.duplicate()
- } else {
- buffer.slice().limit(maxChunkSize).asInstanceOf[ByteBuffer]
- }
- buffer.position(buffer.position + newBuffer.remaining)
- val newChunk = new MessageChunk(new MessageChunkHeader(
- typ, id, size, newBuffer.remaining, ackId,
- hasError, security, senderAddress), newBuffer)
- gotChunkForSendingOnce = true
- return Some(newChunk)
- }
- }
- None
- }
-
- def getChunkForReceiving(chunkSize: Int): Option[MessageChunk] = {
- // STRONG ASSUMPTION: BufferMessage created when receiving data has ONLY ONE data buffer
- if (buffers.size > 1) {
- throw new Exception("Attempting to get chunk from message with multiple data buffers")
- }
- val buffer = buffers(0)
- val security = if (isSecurityNeg) 1 else 0
- if (buffer.remaining > 0) {
- if (buffer.remaining < chunkSize) {
- throw new Exception("Not enough space in data buffer for receiving chunk")
- }
- val newBuffer = buffer.slice().limit(chunkSize).asInstanceOf[ByteBuffer]
- buffer.position(buffer.position + newBuffer.remaining)
- val newChunk = new MessageChunk(new MessageChunkHeader(
- typ, id, size, newBuffer.remaining, ackId, hasError, security, senderAddress), newBuffer)
- return Some(newChunk)
- }
- None
- }
-
- def flip() {
- buffers.foreach(_.flip)
- }
-
- def hasAckId(): Boolean = ackId != 0
-
- def isCompletelyReceived: Boolean = !buffers(0).hasRemaining
-
- override def toString: String = {
- if (hasAckId) {
- "BufferAckMessage(aid = " + ackId + ", id = " + id + ", size = " + size + ")"
- } else {
- "BufferMessage(id = " + id + ", size = " + size + ")"
- }
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
deleted file mode 100644
index 8d9ebadaf7..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.net._
-import java.nio._
-import java.nio.channels._
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.LinkedList
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.util.control.NonFatal
-
-import org.apache.spark._
-import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
-
-private[nio]
-abstract class Connection(val channel: SocketChannel, val selector: Selector,
- val socketRemoteConnectionManagerId: ConnectionManagerId, val connectionId: ConnectionId,
- val securityMgr: SecurityManager)
- extends Logging {
-
- var sparkSaslServer: SparkSaslServer = null
- var sparkSaslClient: SparkSaslClient = null
-
- def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId,
- securityMgr_ : SecurityManager) = {
- this(channel_, selector_,
- ConnectionManagerId.fromSocketAddress(
- channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]),
- id_, securityMgr_)
- }
-
- channel.configureBlocking(false)
- channel.socket.setTcpNoDelay(true)
- channel.socket.setReuseAddress(true)
- channel.socket.setKeepAlive(true)
- /* channel.socket.setReceiveBufferSize(32768) */
-
- @volatile private var closed = false
- var onCloseCallback: Connection => Unit = null
- val onExceptionCallbacks = new ConcurrentLinkedQueue[(Connection, Throwable) => Unit]
- var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
-
- val remoteAddress = getRemoteAddress()
-
- def isSaslComplete(): Boolean
-
- def resetForceReregister(): Boolean
-
- // Read channels typically do not register for write and write does not for read
- // Now, we do have write registering for read too (temporarily), but this is to detect
- // channel close NOT to actually read/consume data on it !
- // How does this work if/when we move to SSL ?
-
- // What is the interest to register with selector for when we want this connection to be selected
- def registerInterest()
-
- // What is the interest to register with selector for when we want this connection to
- // be de-selected
- // Traditionally, 0 - but in our case, for example, for close-detection on SendingConnection hack,
- // it will be SelectionKey.OP_READ (until we fix it properly)
- def unregisterInterest()
-
- // On receiving a read event, should we change the interest for this channel or not ?
- // Will be true for ReceivingConnection, false for SendingConnection.
- def changeInterestForRead(): Boolean
-
- private def disposeSasl() {
- if (sparkSaslServer != null) {
- sparkSaslServer.dispose()
- }
-
- if (sparkSaslClient != null) {
- sparkSaslClient.dispose()
- }
- }
-
- // On receiving a write event, should we change the interest for this channel or not ?
- // Will be false for ReceivingConnection, true for SendingConnection.
- // Actually, for now, should not get triggered for ReceivingConnection
- def changeInterestForWrite(): Boolean
-
- def getRemoteConnectionManagerId(): ConnectionManagerId = {
- socketRemoteConnectionManagerId
- }
-
- def key(): SelectionKey = channel.keyFor(selector)
-
- def getRemoteAddress(): InetSocketAddress = {
- channel.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
- }
-
- // Returns whether we have to register for further reads or not.
- def read(): Boolean = {
- throw new UnsupportedOperationException(
- "Cannot read on connection of type " + this.getClass.toString)
- }
-
- // Returns whether we have to register for further writes or not.
- def write(): Boolean = {
- throw new UnsupportedOperationException(
- "Cannot write on connection of type " + this.getClass.toString)
- }
-
- def close() {
- closed = true
- val k = key()
- if (k != null) {
- k.cancel()
- }
- channel.close()
- disposeSasl()
- callOnCloseCallback()
- }
-
- protected def isClosed: Boolean = closed
-
- def onClose(callback: Connection => Unit) {
- onCloseCallback = callback
- }
-
- def onException(callback: (Connection, Throwable) => Unit) {
- onExceptionCallbacks.add(callback)
- }
-
- def onKeyInterestChange(callback: (Connection, Int) => Unit) {
- onKeyInterestChangeCallback = callback
- }
-
- def callOnExceptionCallbacks(e: Throwable) {
- onExceptionCallbacks.asScala.foreach {
- callback =>
- try {
- callback(this, e)
- } catch {
- case NonFatal(e) => {
- logWarning("Ignored error in onExceptionCallback", e)
- }
- }
- }
- }
-
- def callOnCloseCallback() {
- if (onCloseCallback != null) {
- onCloseCallback(this)
- } else {
- logWarning("Connection to " + getRemoteConnectionManagerId() +
- " closed and OnExceptionCallback not registered")
- }
-
- }
-
- def changeConnectionKeyInterest(ops: Int) {
- if (onKeyInterestChangeCallback != null) {
- onKeyInterestChangeCallback(this, ops)
- } else {
- throw new Exception("OnKeyInterestChangeCallback not registered")
- }
- }
-
- def printRemainingBuffer(buffer: ByteBuffer) {
- val bytes = new Array[Byte](buffer.remaining)
- val curPosition = buffer.position
- buffer.get(bytes)
- bytes.foreach(x => print(x + " "))
- buffer.position(curPosition)
- print(" (" + bytes.length + ")")
- }
-
- def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
- val bytes = new Array[Byte](length)
- val curPosition = buffer.position
- buffer.position(position)
- buffer.get(bytes)
- bytes.foreach(x => print(x + " "))
- print(" (" + position + ", " + length + ")")
- buffer.position(curPosition)
- }
-}
-
-
-private[nio]
-class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
- remoteId_ : ConnectionManagerId, id_ : ConnectionId,
- securityMgr_ : SecurityManager)
- extends Connection(SocketChannel.open, selector_, remoteId_, id_, securityMgr_) {
-
- def isSaslComplete(): Boolean = {
- if (sparkSaslClient != null) sparkSaslClient.isComplete() else false
- }
-
- private class Outbox {
- val messages = new LinkedList[Message]()
- val defaultChunkSize = 65536
- var nextMessageToBeUsed = 0
-
- def addMessage(message: Message) {
- messages.synchronized {
- messages.add(message)
- logDebug("Added [" + message + "] to outbox for sending to " +
- "[" + getRemoteConnectionManagerId() + "]")
- }
- }
-
- def getChunk(): Option[MessageChunk] = {
- messages.synchronized {
- while (!messages.isEmpty) {
- /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
- /* val message = messages(nextMessageToBeUsed) */
-
- val message = if (securityMgr.isAuthenticationEnabled() && !isSaslComplete()) {
- // only allow sending of security messages until sasl is complete
- var pos = 0
- var securityMsg: Message = null
- while (pos < messages.size() && securityMsg == null) {
- if (messages.get(pos).isSecurityNeg) {
- securityMsg = messages.remove(pos)
- }
- pos = pos + 1
- }
- // didn't find any security messages and auth isn't completed so return
- if (securityMsg == null) return None
- securityMsg
- } else {
- messages.removeFirst()
- }
-
- val chunk = message.getChunkForSending(defaultChunkSize)
- if (chunk.isDefined) {
- messages.add(message)
- nextMessageToBeUsed = nextMessageToBeUsed + 1
- if (!message.started) {
- logDebug(
- "Starting to send [" + message + "] to [" + getRemoteConnectionManagerId() + "]")
- message.started = true
- message.startTime = System.currentTimeMillis
- }
- logTrace(
- "Sending chunk from [" + message + "] to [" + getRemoteConnectionManagerId() + "]")
- return chunk
- } else {
- message.finishTime = System.currentTimeMillis
- logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
- "] in " + message.timeTaken )
- }
- }
- }
- None
- }
- }
-
- // outbox is used as a lock - ensure that it is always used as a leaf (since methods which
- // lock it are invoked in context of other locks)
- private val outbox = new Outbox()
- /*
- This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
- different purpose. This flag is to see if we need to force reregister for write even when we
- do not have any pending bytes to write to socket.
- This can happen due to a race between adding pending buffers, and checking for existing of
- data as detailed in https://github.com/mesos/spark/pull/791
- */
- private var needForceReregister = false
-
- val currentBuffers = new ArrayBuffer[ByteBuffer]()
-
- /* channel.socket.setSendBufferSize(256 * 1024) */
-
- override def getRemoteAddress(): InetSocketAddress = address
-
- val DEFAULT_INTEREST = SelectionKey.OP_READ
-
- override def registerInterest() {
- // Registering read too - does not really help in most cases, but for some
- // it does - so let us keep it for now.
- changeConnectionKeyInterest(SelectionKey.OP_WRITE | DEFAULT_INTEREST)
- }
-
- override def unregisterInterest() {
- changeConnectionKeyInterest(DEFAULT_INTEREST)
- }
-
- def registerAfterAuth(): Unit = {
- outbox.synchronized {
- needForceReregister = true
- }
- if (channel.isConnected) {
- registerInterest()
- }
- }
-
- def send(message: Message) {
- outbox.synchronized {
- outbox.addMessage(message)
- needForceReregister = true
- }
- if (channel.isConnected) {
- registerInterest()
- }
- }
-
- // return previous value after resetting it.
- def resetForceReregister(): Boolean = {
- outbox.synchronized {
- val result = needForceReregister
- needForceReregister = false
- result
- }
- }
-
- // MUST be called within the selector loop
- def connect() {
- try {
- channel.register(selector, SelectionKey.OP_CONNECT)
- channel.connect(address)
- logInfo("Initiating connection to [" + address + "]")
- } catch {
- case e: Exception =>
- logError("Error connecting to " + address, e)
- callOnExceptionCallbacks(e)
- }
- }
-
- def finishConnect(force: Boolean): Boolean = {
- try {
- // Typically, this should finish immediately since it was triggered by a connect
- // selection - though need not necessarily always complete successfully.
- val connected = channel.finishConnect
- if (!force && !connected) {
- logInfo(
- "finish connect failed [" + address + "], " + outbox.messages.size + " messages pending")
- return false
- }
-
- // Fallback to previous behavior - assume finishConnect completed
- // This will happen only when finishConnect failed for some repeated number of times
- // (10 or so)
- // Is highly unlikely unless there was an unclean close of socket, etc
- registerInterest()
- logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
- } catch {
- case e: Exception => {
- logWarning("Error finishing connection to " + address, e)
- callOnExceptionCallbacks(e)
- }
- }
- true
- }
-
- override def write(): Boolean = {
- try {
- while (true) {
- if (currentBuffers.size == 0) {
- outbox.synchronized {
- outbox.getChunk() match {
- case Some(chunk) => {
- val buffers = chunk.buffers
- // If we have 'seen' pending messages, then reset flag - since we handle that as
- // normal registering of event (below)
- if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
-
- currentBuffers ++= buffers
- }
- case None => {
- // changeConnectionKeyInterest(0)
- /* key.interestOps(0) */
- return false
- }
- }
- }
- }
-
- if (currentBuffers.size > 0) {
- val buffer = currentBuffers(0)
- val remainingBytes = buffer.remaining
- val writtenBytes = channel.write(buffer)
- if (buffer.remaining == 0) {
- currentBuffers -= buffer
- }
- if (writtenBytes < remainingBytes) {
- // re-register for write.
- return true
- }
- }
- }
- } catch {
- case e: Exception => {
- logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e)
- callOnExceptionCallbacks(e)
- close()
- return false
- }
- }
- // should not happen - to keep scala compiler happy
- true
- }
-
- // This is a hack to determine if remote socket was closed or not.
- // SendingConnection DOES NOT expect to receive any data - if it does, it is an error
- // For a bunch of cases, read will return -1 in case remote socket is closed : hence we
- // register for reads to determine that.
- override def read(): Boolean = {
- // We don't expect the other side to send anything; so, we just read to detect an error or EOF.
- try {
- val length = channel.read(ByteBuffer.allocate(1))
- if (length == -1) { // EOF
- close()
- } else if (length > 0) {
- logWarning(
- "Unexpected data read from SendingConnection to " + getRemoteConnectionManagerId())
- }
- } catch {
- case e: Exception =>
- logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
- e)
- callOnExceptionCallbacks(e)
- close()
- }
-
- false
- }
-
- override def changeInterestForRead(): Boolean = false
-
- override def changeInterestForWrite(): Boolean = ! isClosed
-}
-
-
-// Must be created within selector loop - else deadlock
-private[spark] class ReceivingConnection(
- channel_ : SocketChannel,
- selector_ : Selector,
- id_ : ConnectionId,
- securityMgr_ : SecurityManager)
- extends Connection(channel_, selector_, id_, securityMgr_) {
-
- def isSaslComplete(): Boolean = {
- if (sparkSaslServer != null) sparkSaslServer.isComplete() else false
- }
-
- class Inbox() {
- val messages = new HashMap[Int, BufferMessage]()
-
- def getChunk(header: MessageChunkHeader): Option[MessageChunk] = {
-
- def createNewMessage: BufferMessage = {
- val newMessage = Message.create(header).asInstanceOf[BufferMessage]
- newMessage.started = true
- newMessage.startTime = System.currentTimeMillis
- newMessage.isSecurityNeg = header.securityNeg == 1
- logDebug(
- "Starting to receive [" + newMessage + "] from [" + getRemoteConnectionManagerId() + "]")
- messages += ((newMessage.id, newMessage))
- newMessage
- }
-
- val message = messages.getOrElseUpdate(header.id, createNewMessage)
- logTrace(
- "Receiving chunk of [" + message + "] from [" + getRemoteConnectionManagerId() + "]")
- message.getChunkForReceiving(header.chunkSize)
- }
-
- def getMessageForChunk(chunk: MessageChunk): Option[BufferMessage] = {
- messages.get(chunk.header.id)
- }
-
- def removeMessage(message: Message) {
- messages -= message.id
- }
- }
-
- @volatile private var inferredRemoteManagerId: ConnectionManagerId = null
-
- override def getRemoteConnectionManagerId(): ConnectionManagerId = {
- val currId = inferredRemoteManagerId
- if (currId != null) currId else super.getRemoteConnectionManagerId()
- }
-
- // The receiver's remote address is the local socket on remote side : which is NOT
- // the connection manager id of the receiver.
- // We infer that from the messages we receive on the receiver socket.
- private def processConnectionManagerId(header: MessageChunkHeader) {
- val currId = inferredRemoteManagerId
- if (header.address == null || currId != null) return
-
- val managerId = ConnectionManagerId.fromSocketAddress(header.address)
-
- if (managerId != null) {
- inferredRemoteManagerId = managerId
- }
- }
-
-
- val inbox = new Inbox()
- val headerBuffer: ByteBuffer = ByteBuffer.allocate(MessageChunkHeader.HEADER_SIZE)
- var onReceiveCallback: (Connection, Message) => Unit = null
- var currentChunk: MessageChunk = null
-
- channel.register(selector, SelectionKey.OP_READ)
-
- override def read(): Boolean = {
- try {
- while (true) {
- if (currentChunk == null) {
- val headerBytesRead = channel.read(headerBuffer)
- if (headerBytesRead == -1) {
- close()
- return false
- }
- if (headerBuffer.remaining > 0) {
- // re-register for read event ...
- return true
- }
- headerBuffer.flip
- if (headerBuffer.remaining != MessageChunkHeader.HEADER_SIZE) {
- throw new Exception(
- "Unexpected number of bytes (" + headerBuffer.remaining + ") in the header")
- }
- val header = MessageChunkHeader.create(headerBuffer)
- headerBuffer.clear()
-
- processConnectionManagerId(header)
-
- header.typ match {
- case Message.BUFFER_MESSAGE => {
- if (header.totalSize == 0) {
- if (onReceiveCallback != null) {
- onReceiveCallback(this, Message.create(header))
- }
- currentChunk = null
- // re-register for read event ...
- return true
- } else {
- currentChunk = inbox.getChunk(header).orNull
- }
- }
- case _ => throw new Exception("Message of unknown type received")
- }
- }
-
- if (currentChunk == null) throw new Exception("No message chunk to receive data")
-
- val bytesRead = channel.read(currentChunk.buffer)
- if (bytesRead == 0) {
- // re-register for read event ...
- return true
- } else if (bytesRead == -1) {
- close()
- return false
- }
-
- /* logDebug("Read " + bytesRead + " bytes for the buffer") */
-
- if (currentChunk.buffer.remaining == 0) {
- /* println("Filled buffer at " + System.currentTimeMillis) */
- val bufferMessage = inbox.getMessageForChunk(currentChunk).get
- if (bufferMessage.isCompletelyReceived) {
- bufferMessage.flip()
- bufferMessage.finishTime = System.currentTimeMillis
- logDebug("Finished receiving [" + bufferMessage + "] from " +
- "[" + getRemoteConnectionManagerId() + "] in " + bufferMessage.timeTaken)
- if (onReceiveCallback != null) {
- onReceiveCallback(this, bufferMessage)
- }
- inbox.removeMessage(bufferMessage)
- }
- currentChunk = null
- }
- }
- } catch {
- case e: Exception => {
- logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
- callOnExceptionCallbacks(e)
- close()
- return false
- }
- }
- // should not happen - to keep scala compiler happy
- true
- }
-
- def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
-
- // override def changeInterestForRead(): Boolean = ! isClosed
- override def changeInterestForRead(): Boolean = true
-
- override def changeInterestForWrite(): Boolean = {
- throw new IllegalStateException("Unexpected invocation right now")
- }
-
- override def registerInterest() {
- // Registering read too - does not really help in most cases, but for some
- // it does - so let us keep it for now.
- changeConnectionKeyInterest(SelectionKey.OP_READ)
- }
-
- override def unregisterInterest() {
- changeConnectionKeyInterest(0)
- }
-
- // For read conn, always false.
- override def resetForceReregister(): Boolean = false
-}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala
deleted file mode 100644
index b3b281ff46..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-private[nio] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) {
- override def toString: String = {
- connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId
- }
-}
-
-private[nio] object ConnectionId {
-
- def createConnectionIdFromString(connectionIdString: String): ConnectionId = {
- val res = connectionIdString.split("_").map(_.trim())
- if (res.size != 3) {
- throw new Exception("Error converting ConnectionId string: " + connectionIdString +
- " to a ConnectionId Object")
- }
- new ConnectionId(new ConnectionManagerId(res(0), res(1).toInt), res(2).toInt)
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
deleted file mode 100644
index 9143918790..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ /dev/null
@@ -1,1157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.io.IOException
-import java.lang.ref.WeakReference
-import java.net._
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, SynchronizedMap, SynchronizedQueue}
-import scala.concurrent.duration._
-import scala.concurrent.{Await, ExecutionContext, Future, Promise}
-import scala.language.postfixOps
-
-import com.google.common.base.Charsets.UTF_8
-import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
-
-import org.apache.spark._
-import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-import scala.util.Try
-import scala.util.control.NonFatal
-
-private[nio] class ConnectionManager(
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager,
- name: String = "Connection manager")
- extends Logging {
-
- /**
- * Used by sendMessageReliably to track messages being sent.
- * @param message the message that was sent
- * @param connectionManagerId the connection manager that sent this message
- * @param completionHandler callback that's invoked when the send has completed or failed
- */
- class MessageStatus(
- val message: Message,
- val connectionManagerId: ConnectionManagerId,
- completionHandler: Try[Message] => Unit) {
-
- def success(ackMessage: Message) {
- if (ackMessage == null) {
- failure(new NullPointerException)
- }
- else {
- completionHandler(scala.util.Success(ackMessage))
- }
- }
-
- def failWithoutAck() {
- completionHandler(scala.util.Failure(new IOException("Failed without being ACK'd")))
- }
-
- def failure(e: Throwable) {
- completionHandler(scala.util.Failure(e))
- }
- }
-
- private val selector = SelectorProvider.provider.openSelector()
- private val ackTimeoutMonitor =
- new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor"))
-
- private val ackTimeout =
- conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
- conf.get("spark.network.timeout", "120s"))
-
- // Get the thread counts from the Spark Configuration.
- //
- // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
- // we only query for the minimum value because we are using LinkedBlockingDeque.
- //
- // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
- // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
- // parameter is necessary.
- private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
- private val ioThreadCount = conf.getInt("spark.core.connection.io.threads.min", 4)
- private val connectThreadCount = conf.getInt("spark.core.connection.connect.threads.min", 1)
-
- private val handleMessageExecutor = new ThreadPoolExecutor(
- handlerThreadCount,
- handlerThreadCount,
- conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable](),
- ThreadUtils.namedThreadFactory("handle-message-executor")) {
-
- override def afterExecute(r: Runnable, t: Throwable): Unit = {
- super.afterExecute(r, t)
- if (t != null && NonFatal(t)) {
- logError("Error in handleMessageExecutor is not handled properly", t)
- }
- }
- }
-
- private val handleReadWriteExecutor = new ThreadPoolExecutor(
- ioThreadCount,
- ioThreadCount,
- conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable](),
- ThreadUtils.namedThreadFactory("handle-read-write-executor")) {
-
- override def afterExecute(r: Runnable, t: Throwable): Unit = {
- super.afterExecute(r, t)
- if (t != null && NonFatal(t)) {
- logError("Error in handleReadWriteExecutor is not handled properly", t)
- }
- }
- }
-
- // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
- // which should be executed asap
- private val handleConnectExecutor = new ThreadPoolExecutor(
- connectThreadCount,
- connectThreadCount,
- conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable](),
- ThreadUtils.namedThreadFactory("handle-connect-executor")) {
-
- override def afterExecute(r: Runnable, t: Throwable): Unit = {
- super.afterExecute(r, t)
- if (t != null && NonFatal(t)) {
- logError("Error in handleConnectExecutor is not handled properly", t)
- }
- }
- }
-
- private val serverChannel = ServerSocketChannel.open()
- // used to track the SendingConnections waiting to do SASL negotiation
- private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection]
- with SynchronizedMap[ConnectionId, SendingConnection]
- private val connectionsByKey =
- new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
- private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
- with SynchronizedMap[ConnectionManagerId, SendingConnection]
- // Tracks sent messages for which we are awaiting acknowledgements. Entries are added to this
- // map when messages are sent and are removed when acknowledgement messages are received or when
- // acknowledgement timeouts expire
- private val messageStatuses = new HashMap[Int, MessageStatus] // [MessageId, MessageStatus]
- private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
- private val registerRequests = new SynchronizedQueue[SendingConnection]
-
- implicit val futureExecContext = ExecutionContext.fromExecutor(
- ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context"))
-
- @volatile
- private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
-
- private val authEnabled = securityManager.isAuthenticationEnabled()
-
- serverChannel.configureBlocking(false)
- serverChannel.socket.setReuseAddress(true)
- serverChannel.socket.setReceiveBufferSize(256 * 1024)
-
- private def startService(port: Int): (ServerSocketChannel, Int) = {
- serverChannel.socket.bind(new InetSocketAddress(port))
- (serverChannel, serverChannel.socket.getLocalPort)
- }
- Utils.startServiceOnPort[ServerSocketChannel](port, startService, conf, name)
- serverChannel.register(selector, SelectionKey.OP_ACCEPT)
-
- val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
- logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
-
- // used in combination with the ConnectionManagerId to create unique Connection ids
- // to be able to track asynchronous messages
- private val idCount: AtomicInteger = new AtomicInteger(1)
-
- private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
- private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
-
- @volatile private var isActive = true
- private val selectorThread = new Thread("connection-manager-thread") {
- override def run(): Unit = ConnectionManager.this.run()
- }
- selectorThread.setDaemon(true)
- // start this thread last, since it invokes run(), which accesses members above
- selectorThread.start()
-
- private def triggerWrite(key: SelectionKey) {
- val conn = connectionsByKey.getOrElse(key, null)
- if (conn == null) return
-
- writeRunnableStarted.synchronized {
- // So that we do not trigger more write events while processing this one.
- // The write method will re-register when done.
- if (conn.changeInterestForWrite()) conn.unregisterInterest()
- if (writeRunnableStarted.contains(key)) {
- // key.interestOps(key.interestOps() & ~ SelectionKey.OP_WRITE)
- return
- }
-
- writeRunnableStarted += key
- }
- handleReadWriteExecutor.execute(new Runnable {
- override def run() {
- try {
- var register: Boolean = false
- try {
- register = conn.write()
- } finally {
- writeRunnableStarted.synchronized {
- writeRunnableStarted -= key
- val needReregister = register || conn.resetForceReregister()
- if (needReregister && conn.changeInterestForWrite()) {
- conn.registerInterest()
- }
- }
- }
- } catch {
- case NonFatal(e) => {
- logError("Error when writing to " + conn.getRemoteConnectionManagerId(), e)
- conn.callOnExceptionCallbacks(e)
- }
- }
- }
- } )
- }
-
-
- private def triggerRead(key: SelectionKey) {
- val conn = connectionsByKey.getOrElse(key, null)
- if (conn == null) return
-
- readRunnableStarted.synchronized {
- // So that we do not trigger more read events while processing this one.
- // The read method will re-register when done.
- if (conn.changeInterestForRead())conn.unregisterInterest()
- if (readRunnableStarted.contains(key)) {
- return
- }
-
- readRunnableStarted += key
- }
- handleReadWriteExecutor.execute(new Runnable {
- override def run() {
- try {
- var register: Boolean = false
- try {
- register = conn.read()
- } finally {
- readRunnableStarted.synchronized {
- readRunnableStarted -= key
- if (register && conn.changeInterestForRead()) {
- conn.registerInterest()
- }
- }
- }
- } catch {
- case NonFatal(e) => {
- logError("Error when reading from " + conn.getRemoteConnectionManagerId(), e)
- conn.callOnExceptionCallbacks(e)
- }
- }
- }
- } )
- }
-
- private def triggerConnect(key: SelectionKey) {
- val conn = connectionsByKey.getOrElse(key, null).asInstanceOf[SendingConnection]
- if (conn == null) return
-
- // prevent other events from being triggered
- // Since we are still trying to connect, we do not need to do the additional steps in
- // triggerWrite
- conn.changeConnectionKeyInterest(0)
-
- handleConnectExecutor.execute(new Runnable {
- override def run() {
- try {
- var tries: Int = 10
- while (tries >= 0) {
- if (conn.finishConnect(false)) return
- // Sleep ?
- Thread.sleep(1)
- tries -= 1
- }
-
- // fallback to previous behavior : we should not really come here since this method was
- // triggered since channel became connectable : but at times, the first finishConnect need
- // not succeed : hence the loop to retry a few 'times'.
- conn.finishConnect(true)
- } catch {
- case NonFatal(e) => {
- logError("Error when finishConnect for " + conn.getRemoteConnectionManagerId(), e)
- conn.callOnExceptionCallbacks(e)
- }
- }
- }
- } )
- }
-
- // MUST be called within selector loop - else deadlock.
- private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
- try {
- key.interestOps(0)
- } catch {
- // ignore exceptions
- case e: Exception => logDebug("Ignoring exception", e)
- }
-
- val conn = connectionsByKey.getOrElse(key, null)
- if (conn == null) return
-
- // Pushing to connect threadpool
- handleConnectExecutor.execute(new Runnable {
- override def run() {
- try {
- conn.callOnExceptionCallbacks(e)
- } catch {
- // ignore exceptions
- case NonFatal(e) => logDebug("Ignoring exception", e)
- }
- try {
- conn.close()
- } catch {
- // ignore exceptions
- case NonFatal(e) => logDebug("Ignoring exception", e)
- }
- }
- })
- }
-
-
- def run() {
- try {
- while (isActive) {
- while (!registerRequests.isEmpty) {
- val conn: SendingConnection = registerRequests.dequeue()
- addListeners(conn)
- conn.connect()
- addConnection(conn)
- }
-
- while(!keyInterestChangeRequests.isEmpty) {
- val (key, ops) = keyInterestChangeRequests.dequeue()
-
- try {
- if (key.isValid) {
- val connection = connectionsByKey.getOrElse(key, null)
- if (connection != null) {
- val lastOps = key.interestOps()
- key.interestOps(ops)
-
- // hot loop - prevent materialization of string if trace not enabled.
- if (isTraceEnabled()) {
- def intToOpStr(op: Int): String = {
- val opStrs = ArrayBuffer[String]()
- if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
- if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
- if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
- if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
- if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
- }
-
- logTrace("Changed key for connection to [" +
- connection.getRemoteConnectionManagerId() + "] changed from [" +
- intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
- }
- }
- } else {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
- }
- case e: Exception => {
- logError("Exception processing key " + key, e)
- triggerForceCloseByException(key, e)
- }
- }
- }
-
- val selectedKeysCount =
- try {
- selector.select()
- } catch {
- // Explicitly only dealing with CancelledKeyException here since other exceptions
- // should be dealt with differently.
- case e: CancelledKeyException =>
- // Some keys within the selectors list are invalid/closed. clear them.
- val allKeys = selector.keys().iterator()
-
- while (allKeys.hasNext) {
- val key = allKeys.next()
- try {
- if (! key.isValid) {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
- }
- case e: Exception => {
- logError("Exception processing key " + key, e)
- triggerForceCloseByException(key, e)
- }
- }
- }
- 0
-
- case e: ClosedSelectorException =>
- logDebug("Failed select() as selector is closed.", e)
- return
- }
-
- if (selectedKeysCount == 0) {
- logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size +
- " keys")
- }
- if (selectorThread.isInterrupted) {
- logInfo("Selector thread was interrupted!")
- return
- }
-
- if (0 != selectedKeysCount) {
- val selectedKeys = selector.selectedKeys().iterator()
- while (selectedKeys.hasNext) {
- val key = selectedKeys.next
- selectedKeys.remove()
- try {
- if (key.isValid) {
- if (key.isAcceptable) {
- acceptConnection(key)
- } else
- if (key.isConnectable) {
- triggerConnect(key)
- } else
- if (key.isReadable) {
- triggerRead(key)
- } else
- if (key.isWritable) {
- triggerWrite(key)
- }
- } else {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- // weird, but we saw this happening - even though key.isValid was true,
- // key.isAcceptable would throw CancelledKeyException.
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
- }
- case e: Exception => {
- logError("Exception processing key " + key, e)
- triggerForceCloseByException(key, e)
- }
- }
- }
- }
- }
- } catch {
- case e: Exception => logError("Error in select loop", e)
- }
- }
-
- def acceptConnection(key: SelectionKey) {
- val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
-
- var newChannel = serverChannel.accept()
-
- // accept them all in a tight loop. non blocking accept with no processing, should be fine
- while (newChannel != null) {
- try {
- val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
- val newConnection = new ReceivingConnection(newChannel, selector, newConnectionId,
- securityManager)
- newConnection.onReceive(receiveMessage)
- addListeners(newConnection)
- addConnection(newConnection)
- logInfo("Accepted connection from [" + newConnection.remoteAddress + "]")
- } catch {
- // might happen in case of issues with registering with selector
- case e: Exception => logError("Error in accept loop", e)
- }
-
- newChannel = serverChannel.accept()
- }
- }
-
- private def addListeners(connection: Connection) {
- connection.onKeyInterestChange(changeConnectionKeyInterest)
- connection.onException(handleConnectionError)
- connection.onClose(removeConnection)
- }
-
- def addConnection(connection: Connection) {
- connectionsByKey += ((connection.key, connection))
- }
-
- def removeConnection(connection: Connection) {
- connectionsByKey -= connection.key
-
- try {
- connection match {
- case sendingConnection: SendingConnection =>
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
- logInfo("Removing SendingConnection to " + sendingConnectionManagerId)
-
- connectionsById -= sendingConnectionManagerId
- connectionsAwaitingSasl -= connection.connectionId
-
- messageStatuses.synchronized {
- messageStatuses.values.filter(_.connectionManagerId == sendingConnectionManagerId)
- .foreach(status => {
- logInfo("Notifying " + status)
- status.failWithoutAck()
- })
-
- messageStatuses.retain((i, status) => {
- status.connectionManagerId != sendingConnectionManagerId
- })
- }
- case receivingConnection: ReceivingConnection =>
- val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
- logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId)
-
- val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
- if (!sendingConnectionOpt.isDefined) {
- logError(s"Corresponding SendingConnection to ${remoteConnectionManagerId} not found")
- return
- }
-
- val sendingConnection = sendingConnectionOpt.get
- connectionsById -= remoteConnectionManagerId
- sendingConnection.close()
-
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
-
- assert(sendingConnectionManagerId == remoteConnectionManagerId)
-
- messageStatuses.synchronized {
- for (s <- messageStatuses.values
- if s.connectionManagerId == sendingConnectionManagerId) {
- logInfo("Notifying " + s)
- s.failWithoutAck()
- }
-
- messageStatuses.retain((i, status) => {
- status.connectionManagerId != sendingConnectionManagerId
- })
- }
- case _ => logError("Unsupported type of connection.")
- }
- } finally {
- // So that the selection keys can be removed.
- wakeupSelector()
- }
- }
-
- def handleConnectionError(connection: Connection, e: Throwable) {
- logInfo("Handling connection error on connection to " +
- connection.getRemoteConnectionManagerId())
- removeConnection(connection)
- }
-
- def changeConnectionKeyInterest(connection: Connection, ops: Int) {
- keyInterestChangeRequests += ((connection.key, ops))
- // so that registrations happen !
- wakeupSelector()
- }
-
- def receiveMessage(connection: Connection, message: Message) {
- val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress)
- logDebug("Received [" + message + "] from [" + connectionManagerId + "]")
- val runnable = new Runnable() {
- val creationTime = System.currentTimeMillis
- def run() {
- try {
- logDebug("Handler thread delay is " + (System.currentTimeMillis - creationTime) + " ms")
- handleMessage(connectionManagerId, message, connection)
- logDebug("Handling delay is " + (System.currentTimeMillis - creationTime) + " ms")
- } catch {
- case NonFatal(e) => {
- logError("Error when handling messages from " +
- connection.getRemoteConnectionManagerId(), e)
- connection.callOnExceptionCallbacks(e)
- }
- }
- }
- }
- handleMessageExecutor.execute(runnable)
- /* handleMessage(connection, message) */
- }
-
- private def handleClientAuthentication(
- waitingConn: SendingConnection,
- securityMsg: SecurityMessage,
- connectionId : ConnectionId) {
- if (waitingConn.isSaslComplete()) {
- logDebug("Client sasl completed for id: " + waitingConn.connectionId)
- connectionsAwaitingSasl -= waitingConn.connectionId
- waitingConn.registerAfterAuth()
- wakeupSelector()
- return
- } else {
- var replyToken : Array[Byte] = null
- try {
- replyToken = waitingConn.sparkSaslClient.response(securityMsg.getToken)
- if (waitingConn.isSaslComplete()) {
- logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId)
- connectionsAwaitingSasl -= waitingConn.connectionId
- waitingConn.registerAfterAuth()
- wakeupSelector()
- return
- }
- val securityMsgResp = SecurityMessage.fromResponse(replyToken,
- securityMsg.getConnectionId.toString)
- val message = securityMsgResp.toBufferMessage
- if (message == null) throw new IOException("Error creating security message")
- sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
- } catch {
- case e: Exception =>
- logError("Error handling sasl client authentication", e)
- waitingConn.close()
- throw new IOException("Error evaluating sasl response: ", e)
- }
- }
- }
-
- private def handleServerAuthentication(
- connection: Connection,
- securityMsg: SecurityMessage,
- connectionId: ConnectionId) {
- if (!connection.isSaslComplete()) {
- logDebug("saslContext not established")
- var replyToken : Array[Byte] = null
- try {
- connection.synchronized {
- if (connection.sparkSaslServer == null) {
- logDebug("Creating sasl Server")
- connection.sparkSaslServer = new SparkSaslServer(conf.getAppId, securityManager, false)
- }
- }
- replyToken = connection.sparkSaslServer.response(securityMsg.getToken)
- if (connection.isSaslComplete()) {
- logDebug("Server sasl completed: " + connection.connectionId +
- " for: " + connectionId)
- } else {
- logDebug("Server sasl not completed: " + connection.connectionId +
- " for: " + connectionId)
- }
- if (replyToken != null) {
- val securityMsgResp = SecurityMessage.fromResponse(replyToken,
- securityMsg.getConnectionId)
- val message = securityMsgResp.toBufferMessage
- if (message == null) throw new Exception("Error creating security Message")
- sendSecurityMessage(connection.getRemoteConnectionManagerId(), message)
- }
- } catch {
- case e: Exception => {
- logError("Error in server auth negotiation: " + e)
-          // It would probably be better to send an error message telling the other side that
-          // auth failed, but for now just close the connection
- connection.close()
- }
- }
- } else {
- logDebug("connection already established for this connection id: " + connection.connectionId)
- }
- }
-
-
- private def handleAuthentication(conn: Connection, bufferMessage: BufferMessage): Boolean = {
- if (bufferMessage.isSecurityNeg) {
- logDebug("This is security neg message")
-
- // parse as SecurityMessage
- val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage)
- val connectionId = ConnectionId.createConnectionIdFromString(securityMsg.getConnectionId)
-
- connectionsAwaitingSasl.get(connectionId) match {
- case Some(waitingConn) => {
- // Client - this must be in response to us doing Send
- logDebug("Client handleAuth for id: " + waitingConn.connectionId)
- handleClientAuthentication(waitingConn, securityMsg, connectionId)
- }
- case None => {
- // Server - someone sent us something and we haven't authenticated yet
- logDebug("Server handleAuth for id: " + connectionId)
- handleServerAuthentication(conn, securityMsg, connectionId)
- }
- }
- return true
- } else {
- if (!conn.isSaslComplete()) {
-        // We could handle this better and tell the client we need to do authentication
-        // negotiation, but for now just ignore the message.
- logError("message sent that is not security negotiation message on connection " +
- "not authenticated yet, ignoring it!!")
- return true
- }
- }
- false
- }
-
- private def handleMessage(
- connectionManagerId: ConnectionManagerId,
- message: Message,
- connection: Connection) {
- logDebug("Handling [" + message + "] from [" + connectionManagerId + "]")
- message match {
- case bufferMessage: BufferMessage => {
- if (authEnabled) {
- val res = handleAuthentication(connection, bufferMessage)
- if (res) {
- // message was security negotiation so skip the rest
- logDebug("After handleAuth result was true, returning")
- return
- }
- }
- if (bufferMessage.hasAckId()) {
- messageStatuses.synchronized {
- messageStatuses.get(bufferMessage.ackId) match {
- case Some(status) => {
- messageStatuses -= bufferMessage.ackId
- status.success(message)
- }
- case None => {
-              /**
-               * We can reach this branch in two cases:
-               *
-               * (1) An invalid ack was sent due to buggy code.
-               *
-               * (2) A late-arriving ack for a SendMessageStatus.
-               *     To avoid late-arriving acks caused by long pauses such as GC,
-               *     set spark.core.connection.ack.wait.timeout to a value larger
-               *     than the default.
-               */
- logWarning(s"Could not find reference for received ack Message ${message.id}")
- }
- }
- }
- } else {
- var ackMessage : Option[Message] = None
- try {
- ackMessage = if (onReceiveCallback != null) {
- logDebug("Calling back")
- onReceiveCallback(bufferMessage, connectionManagerId)
- } else {
- logDebug("Not calling back as callback is null")
- None
- }
-
- if (ackMessage.isDefined) {
- if (!ackMessage.get.isInstanceOf[BufferMessage]) {
- logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
- + ackMessage.get.getClass)
- } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
- logDebug("Response to " + bufferMessage + " does not have ack id set")
- ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
- }
- }
- } catch {
- case e: Exception => {
- logError(s"Exception was thrown while processing message", e)
- ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
- }
- } finally {
- sendMessage(connectionManagerId, ackMessage.getOrElse {
- Message.createBufferMessage(bufferMessage.id)
- })
- }
- }
- }
- case _ => throw new Exception("Unknown type message received")
- }
- }
-
- private def checkSendAuthFirst(connManagerId: ConnectionManagerId, conn: SendingConnection) {
-    // See if we need to do SASL before writing; this should only happen for the first
-    // negotiation, with this side acting as the client.
- if (!conn.isSaslComplete()) {
- conn.synchronized {
- if (conn.sparkSaslClient == null) {
- conn.sparkSaslClient = new SparkSaslClient(conf.getAppId, securityManager, false)
- var firstResponse: Array[Byte] = null
- try {
- firstResponse = conn.sparkSaslClient.firstToken()
- val securityMsg = SecurityMessage.fromResponse(firstResponse,
- conn.connectionId.toString())
- val message = securityMsg.toBufferMessage
- if (message == null) throw new Exception("Error creating security message")
- connectionsAwaitingSasl += ((conn.connectionId, conn))
- sendSecurityMessage(connManagerId, message)
- logDebug("adding connectionsAwaitingSasl id: " + conn.connectionId +
- " to: " + connManagerId)
- } catch {
- case e: Exception => {
- logError("Error getting first response from the SaslClient.", e)
- conn.close()
- throw new Exception("Error getting first response from the SaslClient")
- }
- }
- }
- }
- } else {
- logDebug("Sasl already established ")
- }
- }
-
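checkSendAuthFirst, handleClientAuthentication and handleServerAuthentication together implement a token-exchange loop: the client opens with a first token, each side answers the other's token, and the connection is only registered for normal traffic once both sides report SASL complete. A schematic sketch of that loop, using hypothetical SaslClientLike/SaslServerLike stand-ins rather than Spark's real SASL classes:

object SaslHandshakeSketch {
  // Hypothetical stand-ins; these are not Spark's SparkSaslClient/SparkSaslServer APIs.
  trait SaslClientLike {
    def firstToken(): Array[Byte]
    def response(challenge: Array[Byte]): Array[Byte]
    def isComplete: Boolean
  }
  trait SaslServerLike {
    def response(token: Array[Byte]): Array[Byte]
    def isComplete: Boolean
  }

  def negotiate(client: SaslClientLike, server: SaslServerLike): Unit = {
    var token = client.firstToken()                 // checkSendAuthFirst: the client opens
    while (!(client.isComplete && server.isComplete)) {
      val challenge = server.response(token)        // handleServerAuthentication
      if (!client.isComplete) {
        token = client.response(challenge)          // handleClientAuthentication
      }
    }
  }
}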
-  // Allow us to add messages to the inbox for SASL negotiation
- private def sendSecurityMessage(connManagerId: ConnectionManagerId, message: Message) {
- def startNewConnection(): SendingConnection = {
- val inetSocketAddress = new InetSocketAddress(connManagerId.host, connManagerId.port)
- val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
- val newConnection = new SendingConnection(inetSocketAddress, selector, connManagerId,
- newConnectionId, securityManager)
- logInfo("creating new sending connection for security! " + newConnectionId )
- registerRequests.enqueue(newConnection)
-
- newConnection
- }
-    // The lookupKey logic was removed as part of a merge; it was not found useful in testing.
-    // If it is ever re-added, it should be used consistently everywhere.
- message.senderAddress = id.toSocketAddress()
- logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
- val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())
-
-    // Send the security message on this connection even though it has not been authenticated yet
- connection.send(message)
-
- wakeupSelector()
- }
-
- private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
- def startNewConnection(): SendingConnection = {
- val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
- connectionManagerId.port)
- val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
- val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId,
- newConnectionId, securityManager)
- newConnection.onException {
- case (conn, e) => {
- logError("Exception while sending message.", e)
- reportSendingMessageFailure(message.id, e)
- }
- }
- logTrace("creating new sending connection: " + newConnectionId)
- registerRequests.enqueue(newConnection)
-
- newConnection
- }
- val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
-
- message.senderAddress = id.toSocketAddress()
- logDebug("Before Sending [" + message + "] to [" + connectionManagerId + "]" + " " +
- "connectionid: " + connection.connectionId)
-
- if (authEnabled) {
- try {
- checkSendAuthFirst(connectionManagerId, connection)
- } catch {
- case NonFatal(e) => {
- reportSendingMessageFailure(message.id, e)
- }
- }
- }
- logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
- connection.send(message)
- wakeupSelector()
- }
-
- private def reportSendingMessageFailure(messageId: Int, e: Throwable): Unit = {
- // need to tell sender it failed
- messageStatuses.synchronized {
- val s = messageStatuses.get(messageId)
- s match {
- case Some(msgStatus) => {
- messageStatuses -= messageId
- logInfo("Notifying " + msgStatus.connectionManagerId)
- msgStatus.failure(e)
- }
- case None => {
- logError("no messageStatus for failed message id: " + messageId)
- }
- }
- }
- }
-
- private def wakeupSelector() {
- selector.wakeup()
- }
-
- /**
-   * Send a message and return a Future that completes when an acknowledgment is received
-   * or an error occurs.
- * @param connectionManagerId the message's destination
- * @param message the message being sent
- * @return a Future that either returns the acknowledgment message or captures an exception.
- */
- def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
- : Future[Message] = {
- val promise = Promise[Message]()
-
- // It's important that the TimerTask doesn't capture a reference to `message`, which can cause
- // memory leaks since cancelled TimerTasks won't necessarily be garbage collected until the time
- // at which they would originally be scheduled to run. Therefore, extract the message id
- // from outside of the TimerTask closure (see SPARK-4393 for more context).
- val messageId = message.id
- // Keep a weak reference to the promise so that the completed promise may be garbage-collected
- val promiseReference = new WeakReference(promise)
- val timeoutTask: TimerTask = new TimerTask {
- override def run(timeout: Timeout): Unit = {
- messageStatuses.synchronized {
- messageStatuses.remove(messageId).foreach { s =>
- val e = new IOException("sendMessageReliably failed because ack " +
- s"was not received within $ackTimeout sec")
- val p = promiseReference.get
- if (p != null) {
- // Attempt to fail the promise with a Timeout exception
- if (!p.tryFailure(e)) {
-            // If we reach here, then someone else has already signalled success or failure
-            // on this promise, so just log the error:
- logError("Ignore error because promise is completed", e)
- }
- } else {
- // The WeakReference was empty, which should never happen because
- // sendMessageReliably's caller should have a strong reference to promise.future;
- logError("Promise was garbage collected; this should never happen!", e)
- }
- }
- }
- }
- }
-
- val timeoutTaskHandle = ackTimeoutMonitor.newTimeout(timeoutTask, ackTimeout, TimeUnit.SECONDS)
-
- val status = new MessageStatus(message, connectionManagerId, s => {
- timeoutTaskHandle.cancel()
- s match {
- case scala.util.Failure(e) =>
- // Indicates a failure where we either never sent or never got ACK'd
- if (!promise.tryFailure(e)) {
- logWarning("Ignore error because promise is completed", e)
- }
- case scala.util.Success(ackMessage) =>
- if (ackMessage.hasError) {
- val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
- val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
- errorMsgByteBuf.get(errorMsgBytes)
- val errorMsg = new String(errorMsgBytes, UTF_8)
- val e = new IOException(
- s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
- if (!promise.tryFailure(e)) {
- logWarning("Ignore error because promise is completed", e)
- }
- } else {
- if (!promise.trySuccess(ackMessage)) {
- logWarning("Drop ackMessage because promise is completed")
- }
- }
- }
- })
- messageStatuses.synchronized {
- messageStatuses += ((message.id, status))
- }
-
- sendMessage(connectionManagerId, message)
- promise.future
- }
-
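A typical call pattern for sendMessageReliably, mirroring the test suites removed later in this diff: build a BufferMessage, send it, and wait on the returned Future, which completes with the ack or fails with an IOException on error or ack timeout. A small sketch (the payload size is arbitrary):

import java.nio.ByteBuffer
import scala.concurrent.Await
import scala.concurrent.duration._

object SendReliablySketch {
  def pingOnce(manager: ConnectionManager): Unit = {
    val payload = ByteBuffer.wrap(Array.fill[Byte](1024)(1))
    val msg = Message.createBufferMessage(payload)
    // Completes with the ack message, or fails with an IOException on error or ack timeout.
    val ack = Await.result(manager.sendMessageReliably(manager.id, msg), 10.seconds)
    assert(!ack.hasError)
  }
}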
- def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) {
- onReceiveCallback = callback
- }
-
- def stop() {
- isActive = false
- ackTimeoutMonitor.stop()
- selector.close()
- selectorThread.interrupt()
- selectorThread.join()
- val connections = connectionsByKey.values
- connections.foreach(_.close())
- if (connectionsByKey.size != 0) {
- logWarning("All connections not cleaned up")
- }
- handleMessageExecutor.shutdown()
- handleReadWriteExecutor.shutdown()
- handleConnectExecutor.shutdown()
- logInfo("ConnectionManager stopped")
- }
-}
-
-
-private[spark] object ConnectionManager {
- import scala.concurrent.ExecutionContext.Implicits.global
-
- def main(args: Array[String]) {
- val conf = new SparkConf
- val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- // scalastyle:off println
- println("Received [" + msg + "] from [" + id + "]")
- // scalastyle:on println
- None
- })
-
- /* testSequentialSending(manager) */
- /* System.gc() */
-
- /* testParallelSending(manager) */
- /* System.gc() */
-
- /* testParallelDecreasingSending(manager) */
- /* System.gc() */
-
- testContinuousSending(manager)
- System.gc()
- }
-
- // scalastyle:off println
- def testSequentialSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Sequential Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- Await.result(manager.sendMessageReliably(manager.id, bufferMessage), Duration.Inf)
- })
- println("--------------------------")
- println()
- }
-
- def testParallelSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Parallel Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val startTime = System.currentTimeMillis
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliably(manager.id, bufferMessage)
- }).foreach(f => {
- f.onFailure {
- case e => println("Failed due to " + e)
- }
- Await.ready(f, 1 second)
- })
- val finishTime = System.currentTimeMillis
-
- val mb = size * count / 1024.0 / 1024.0
- val ms = finishTime - startTime
- val tput = mb * 1000.0 / ms
- println("--------------------------")
- println("Started at " + startTime + ", finished at " + finishTime)
- println("Sent " + count + " messages of size " + size + " in " + ms + " ms " +
- "(" + tput + " MB/s)")
- println("--------------------------")
- println()
- }
-
- def testParallelDecreasingSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Parallel Decreasing Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
- val buffers = Array.tabulate(count) { i =>
- val bufferLen = size * (i + 1)
- val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte)
- ByteBuffer.allocate(bufferLen).put(bufferContent)
- }
- buffers.foreach(_.flip)
- val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
-
- val startTime = System.currentTimeMillis
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffers(count - 1 - i).duplicate)
- manager.sendMessageReliably(manager.id, bufferMessage)
- }).foreach(f => {
- f.onFailure {
- case e => println("Failed due to " + e)
- }
- Await.ready(f, 1 second)
- })
- val finishTime = System.currentTimeMillis
-
- val ms = finishTime - startTime
- val tput = mb * 1000.0 / ms
- println("--------------------------")
- /* println("Started at " + startTime + ", finished at " + finishTime) */
- println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
- println("--------------------------")
- println()
- }
-
- def testContinuousSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Continuous Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val startTime = System.currentTimeMillis
- while(true) {
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliably(manager.id, bufferMessage)
- }).foreach(f => {
- f.onFailure {
- case e => println("Failed due to " + e)
- }
- Await.ready(f, 1 second)
- })
- val finishTime = System.currentTimeMillis
- Thread.sleep(1000)
- val mb = size * count / 1024.0 / 1024.0
- val ms = finishTime - startTime
- val tput = mb * 1000.0 / ms
- println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
- println("--------------------------")
- println()
- }
- }
- // scalastyle:on println
-}
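The ack-timeout handling above deliberately captures only the message id and a WeakReference to the promise inside the scheduled task, so a cancelled timeout cannot pin the message or the promise in memory (SPARK-4393). A generic sketch of that pattern, using a plain ScheduledExecutorService instead of the wheel timer used here:

import java.lang.ref.WeakReference
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.Promise

object AckTimeoutSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def withTimeout[T](promise: Promise[T], seconds: Long, error: => Throwable): Unit = {
    // Capture only a weak reference: if the caller drops the future, the promise can be
    // collected even though the (possibly long-lived) timeout task is still queued.
    val ref = new WeakReference(promise)
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        val p = ref.get()
        if (p != null) p.tryFailure(error)  // no-op if the promise is already completed
      }
    }, seconds, TimeUnit.SECONDS)
  }
}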
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala
deleted file mode 100644
index 1cd13d887c..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.net.InetSocketAddress
-
-import org.apache.spark.util.Utils
-
-private[nio] case class ConnectionManagerId(host: String, port: Int) {
- // DEBUG code
- Utils.checkHost(host)
- assert (port > 0)
-
- def toSocketAddress(): InetSocketAddress = new InetSocketAddress(host, port)
-}
-
-
-private[nio] object ConnectionManagerId {
- def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = {
- new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort)
- }
-}
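ConnectionManagerId was a thin (host, port) pair; its only real job was round-tripping through InetSocketAddress. A small usage sketch:

def roundTripId(): Unit = {
  val id = ConnectionManagerId("localhost", 9999)
  val addr = id.toSocketAddress()                            // InetSocketAddress("localhost", 9999)
  assert(ConnectionManagerId.fromSocketAddress(addr) == id)  // case-class equality on (host, port)
}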
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
deleted file mode 100644
index 85d2fe2bf9..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-import com.google.common.base.Charsets.UTF_8
-
-import org.apache.spark.util.Utils
-
-private[nio] abstract class Message(val typ: Long, val id: Int) {
- var senderAddress: InetSocketAddress = null
- var started = false
- var startTime = -1L
- var finishTime = -1L
- var isSecurityNeg = false
- var hasError = false
-
- def size: Int
-
- def getChunkForSending(maxChunkSize: Int): Option[MessageChunk]
-
- def getChunkForReceiving(chunkSize: Int): Option[MessageChunk]
-
- def timeTaken(): String = (finishTime - startTime).toString + " ms"
-
- override def toString: String = {
- this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")"
- }
-}
-
-
-private[nio] object Message {
- val BUFFER_MESSAGE = 1111111111L
-
- var lastId = 1
-
- def getNewId(): Int = synchronized {
- lastId += 1
- if (lastId == 0) {
- lastId += 1
- }
- lastId
- }
-
- def createBufferMessage(dataBuffers: Seq[ByteBuffer], ackId: Int): BufferMessage = {
- if (dataBuffers == null) {
- return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer], ackId)
- }
- if (dataBuffers.exists(_ == null)) {
- throw new Exception("Attempting to create buffer message with null buffer")
- }
- new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
- }
-
- def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage =
- createBufferMessage(dataBuffers, 0)
-
- def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = {
- if (dataBuffer == null) {
- createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
- } else {
- createBufferMessage(Array(dataBuffer), ackId)
- }
- }
-
- def createBufferMessage(dataBuffer: ByteBuffer): BufferMessage =
- createBufferMessage(dataBuffer, 0)
-
- def createBufferMessage(ackId: Int): BufferMessage = {
- createBufferMessage(new Array[ByteBuffer](0), ackId)
- }
-
- /**
- * Create a "negative acknowledgment" to notify a sender that an error occurred
- * while processing its message. The exception's stacktrace will be formatted
- * as a string, serialized into a byte array, and sent as the message payload.
- */
- def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
- val exceptionString = Utils.exceptionString(exception)
- val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes(UTF_8))
- val errorMessage = createBufferMessage(serializedExceptionString, ackId)
- errorMessage.hasError = true
- errorMessage
- }
-
- def create(header: MessageChunkHeader): Message = {
- val newMessage: Message = header.typ match {
- case BUFFER_MESSAGE => new BufferMessage(header.id,
- ArrayBuffer(ByteBuffer.allocate(header.totalSize)), header.other)
- }
- newMessage.hasError = header.hasError
- newMessage.senderAddress = header.address
- newMessage
- }
-}
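The factory methods above pair requests and acks by id: a request is created with ackId 0, and the reply carries the request's id in its ackId field so the sender's MessageStatus can be matched up in handleMessage. A brief sketch of that pairing (illustrative only):

import java.nio.ByteBuffer

def buildRequestAndAck(payload: ByteBuffer): (BufferMessage, BufferMessage) = {
  val request = Message.createBufferMessage(payload)   // ackId = 0: this is a request
  val ack = Message.createBufferMessage(request.id)    // empty reply tagged with the request's id
  (request, ack)
}

// On failure the receiver answered with an error ack instead:
def buildErrorAck(request: BufferMessage, e: Exception): BufferMessage =
  Message.createErrorMessage(e, request.id)            // hasError = true, stacktrace as payload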
diff --git a/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala b/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala
deleted file mode 100644
index a4568e849f..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-private[nio]
-class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
-
- val size: Int = if (buffer == null) 0 else buffer.remaining
-
- lazy val buffers: ArrayBuffer[ByteBuffer] = {
- val ab = new ArrayBuffer[ByteBuffer]()
- ab += header.buffer
- if (buffer != null) {
- ab += buffer
- }
- ab
- }
-
- override def toString: String = {
- "" + this.getClass.getSimpleName + " (id = " + header.id + ", size = " + size + ")"
- }
-}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala b/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala
deleted file mode 100644
index 7b3da4bb9d..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.net.{InetAddress, InetSocketAddress}
-import java.nio.ByteBuffer
-
-private[nio] class MessageChunkHeader(
- val typ: Long,
- val id: Int,
- val totalSize: Int,
- val chunkSize: Int,
- val other: Int,
- val hasError: Boolean,
- val securityNeg: Int,
- val address: InetSocketAddress) {
- lazy val buffer = {
- // No need to change this, at 'use' time, we do a reverse lookup of the hostname.
- // Refer to network.Connection
- val ip = address.getAddress.getAddress()
- val port = address.getPort()
- ByteBuffer.
- allocate(MessageChunkHeader.HEADER_SIZE).
- putLong(typ).
- putInt(id).
- putInt(totalSize).
- putInt(chunkSize).
- putInt(other).
- put(if (hasError) 1.asInstanceOf[Byte] else 0.asInstanceOf[Byte]).
- putInt(securityNeg).
- putInt(ip.size).
- put(ip).
- putInt(port).
- position(MessageChunkHeader.HEADER_SIZE).
- flip.asInstanceOf[ByteBuffer]
- }
-
- override def toString: String = {
- "" + this.getClass.getSimpleName + ":" + id + " of type " + typ +
- " and sizes " + totalSize + " / " + chunkSize + " bytes, securityNeg: " + securityNeg
- }
-
-}
-
-
-private[nio] object MessageChunkHeader {
- val HEADER_SIZE = 45
-
- def create(buffer: ByteBuffer): MessageChunkHeader = {
- if (buffer.remaining != HEADER_SIZE) {
- throw new IllegalArgumentException("Cannot convert buffer data to Message")
- }
- val typ = buffer.getLong()
- val id = buffer.getInt()
- val totalSize = buffer.getInt()
- val chunkSize = buffer.getInt()
- val other = buffer.getInt()
- val hasError = buffer.get() != 0
- val securityNeg = buffer.getInt()
- val ipSize = buffer.getInt()
- val ipBytes = new Array[Byte](ipSize)
- buffer.get(ipBytes)
- val ip = InetAddress.getByAddress(ipBytes)
- val port = buffer.getInt()
- new MessageChunkHeader(typ, id, totalSize, chunkSize, other, hasError, securityNeg,
- new InetSocketAddress(ip, port))
- }
-}
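For an IPv4 sender, the fields written by the lazy `buffer` above occupy 41 bytes (8 typ + 4 id + 4 totalSize + 4 chunkSize + 4 other + 1 hasError + 4 securityNeg + 4 ip length + 4 ip + 4 port), padded out to HEADER_SIZE = 45. A hedged round-trip sketch against MessageChunkHeader.create (the values are arbitrary):

import java.net.InetSocketAddress

def headerRoundTrip(): Unit = {
  val addr = new InetSocketAddress("127.0.0.1", 7077)
  // typ, id, totalSize, chunkSize, other, hasError, securityNeg, address
  val header = new MessageChunkHeader(Message.BUFFER_MESSAGE, 1, 1024, 1024, 0, false, 0, addr)
  val decoded = MessageChunkHeader.create(header.buffer)  // buffer has exactly HEADER_SIZE bytes remaining
  assert(decoded.id == header.id && decoded.totalSize == header.totalSize)
}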
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
deleted file mode 100644
index b2aec16063..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.network._
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.shuffle.BlockFetchingListener
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-
-import scala.concurrent.Future
-
-
-/**
- * A [[BlockTransferService]] implementation based on [[ConnectionManager]], a custom
- * implementation using Java NIO.
- */
-final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
- extends BlockTransferService with Logging {
-
- private var cm: ConnectionManager = _
-
- private var blockDataManager: BlockDataManager = _
-
- /**
- * Port number the service is listening on, available only after [[init]] is invoked.
- */
- override def port: Int = {
- checkInit()
- cm.id.port
- }
-
- /**
- * Host name the service is listening on, available only after [[init]] is invoked.
- */
- override def hostName: String = {
- checkInit()
- cm.id.host
- }
-
- /**
- * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
- * local blocks or put local blocks.
- */
- override def init(blockDataManager: BlockDataManager): Unit = {
- this.blockDataManager = blockDataManager
- cm = new ConnectionManager(
- conf.getInt("spark.blockManager.port", 0),
- conf,
- securityManager,
- "Connection manager for block manager")
- cm.onReceiveMessage(onBlockMessageReceive)
- }
-
- /**
- * Tear down the transfer service.
- */
- override def close(): Unit = {
- if (cm != null) {
- cm.stop()
- }
- }
-
- override def fetchBlocks(
- host: String,
- port: Int,
- execId: String,
- blockIds: Array[String],
- listener: BlockFetchingListener): Unit = {
- checkInit()
-
- val cmId = new ConnectionManagerId(host, port)
- val blockMessageArray = new BlockMessageArray(blockIds.map { blockId =>
- BlockMessage.fromGetBlock(GetBlock(BlockId(blockId)))
- })
-
- val future = cm.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
-
- // Register the listener on success/failure future callback.
- future.onSuccess { case message =>
- val bufferMessage = message.asInstanceOf[BufferMessage]
- val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-
-      // SPARK-4064: In some cases (e.g. the remote block was removed), blockMessageArray may be empty.
- if (blockMessageArray.isEmpty) {
- blockIds.foreach { id =>
- listener.onBlockFetchFailure(id, new SparkException(s"Received empty message from $cmId"))
- }
- } else {
- for (blockMessage: BlockMessage <- blockMessageArray) {
- val msgType = blockMessage.getType
- if (msgType != BlockMessage.TYPE_GOT_BLOCK) {
- if (blockMessage.getId != null) {
- listener.onBlockFetchFailure(blockMessage.getId.toString,
- new SparkException(s"Unexpected message $msgType received from $cmId"))
- }
- } else {
- val blockId = blockMessage.getId
- val networkSize = blockMessage.getData.limit()
- listener.onBlockFetchSuccess(
- blockId.toString, new NioManagedBuffer(blockMessage.getData))
- }
- }
- }
- }(cm.futureExecContext)
-
- future.onFailure { case exception =>
- blockIds.foreach { blockId =>
- listener.onBlockFetchFailure(blockId, exception)
- }
- }(cm.futureExecContext)
- }
-
- /**
- * Upload a single block to a remote node, available only after [[init]] is invoked.
- *
- * This call blocks until the upload completes, or throws an exception upon failures.
- */
- override def uploadBlock(
- hostname: String,
- port: Int,
- execId: String,
- blockId: BlockId,
- blockData: ManagedBuffer,
- level: StorageLevel)
- : Future[Unit] = {
- checkInit()
- val msg = PutBlock(blockId, blockData.nioByteBuffer(), level)
- val blockMessageArray = new BlockMessageArray(BlockMessage.fromPutBlock(msg))
- val remoteCmId = new ConnectionManagerId(hostName, port)
- val reply = cm.sendMessageReliably(remoteCmId, blockMessageArray.toBufferMessage)
- reply.map(x => ())(cm.futureExecContext)
- }
-
- private def checkInit(): Unit = if (cm == null) {
- throw new IllegalStateException(getClass.getName + " has not been initialized")
- }
-
- private def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
- logDebug("Handling message " + msg)
- msg match {
- case bufferMessage: BufferMessage =>
- try {
- logDebug("Handling as a buffer message " + bufferMessage)
- val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
- logDebug("Parsed as a block message array")
- val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
- Some(new BlockMessageArray(responseMessages).toBufferMessage)
- } catch {
- case e: Exception =>
- logError("Exception handling buffer message", e)
- Some(Message.createErrorMessage(e, msg.id))
- }
-
- case otherMessage: Any =>
- val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
- logError(errorMsg)
- Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
- }
- }
-
- private def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
- blockMessage.getType match {
- case BlockMessage.TYPE_PUT_BLOCK =>
- val msg = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
- logDebug("Received [" + msg + "]")
- putBlock(msg.id, msg.data, msg.level)
- None
-
- case BlockMessage.TYPE_GET_BLOCK =>
- val msg = new GetBlock(blockMessage.getId)
- logDebug("Received [" + msg + "]")
- val buffer = getBlock(msg.id)
- if (buffer == null) {
- return None
- }
- Some(BlockMessage.fromGotBlock(GotBlock(msg.id, buffer)))
-
- case _ => None
- }
- }
-
- private def putBlock(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) {
- val startTimeMs = System.currentTimeMillis()
- logDebug("PutBlock " + blockId + " started from " + startTimeMs + " with data: " + bytes)
- blockDataManager.putBlockData(blockId, new NioManagedBuffer(bytes), level)
- logDebug("PutBlock " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
- + " with data size: " + bytes.limit)
- }
-
- private def getBlock(blockId: BlockId): ByteBuffer = {
- val startTimeMs = System.currentTimeMillis()
- logDebug("GetBlock " + blockId + " started from " + startTimeMs)
- val buffer = blockDataManager.getBlockData(blockId)
- logDebug("GetBlock " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
- + " and got buffer " + buffer)
- buffer.nioByteBuffer()
- }
-}
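With this file removed, NettyBlockTransferService is the only BlockTransferService; the test changes later in this diff show how it is constructed. A hedged sketch of the equivalent wiring (the host, port, executor id and block id below are placeholders; fetchBlocks has the same signature it overrides from BlockTransferService above):

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network.BlockDataManager
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.shuffle.BlockFetchingListener

def fetchWithNetty(conf: SparkConf, blockDataManager: BlockDataManager): Unit = {
  val transfer = new NettyBlockTransferService(conf, new SecurityManager(conf), numCores = 1)
  transfer.init(blockDataManager)
  transfer.fetchBlocks("remote-host", 7077, "exec-1", Array("rdd_0_0"),
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
        println(s"fetched $blockId: ${data.size} bytes")
      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
        println(s"failed $blockId: $e")
    })
}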
diff --git a/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala
deleted file mode 100644
index 232c552f98..0000000000
--- a/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.{ArrayBuffer, StringBuilder}
-
-import org.apache.spark._
-
-/**
- * SecurityMessage is a class that contains the connectionId and SASL token
- * used in SASL negotiation. It has routines for converting itself to and from
- * a BufferMessage so that it can be sent by the ConnectionManager and easily
- * consumed when received. The API was modeled after BlockMessage.
- *
- * The connectionId is the connectionId of the client side. Since message passing
- * is asynchronous and the server (receiving) side may get multiple different types
- * of messages on the same connection, the connectionId is used to know which
- * connection a security message is intended for.
- *
- * For instance, say we are node_0 and we need to send data to node_1. The node_0 side
- * acts as a client and connects to node_1. SASL negotiation has to occur between
- * node_0 and node_1 before node_1 trusts node_0, so node_0 sends a security message.
- * node_1 receives the message but, before it can process it and send a response,
- * some thread on node_1 decides it needs to send data to node_0, so it connects to node_0
- * and sends a security message of its own to authenticate as a client. Now node_0 gets
- * that message and needs to decide whether it is a response to its own earlier send
- * (where it acted as a client) or just node_1 trying to connect to it to send data. This
- * is where the connectionId field is used: node_0 can look up the connectionId to tell
- * whether the message is a response to its client-side negotiation or part of a new one.
- *
- * The format of a SecurityMessage as its sent is:
- * - Length of the ConnectionId
- * - ConnectionId
- * - Length of the token
- * - Token
- */
-private[nio] class SecurityMessage extends Logging {
-
- private var connectionId: String = null
- private var token: Array[Byte] = null
-
- def set(byteArr: Array[Byte], newconnectionId: String) {
- if (byteArr == null) {
- token = new Array[Byte](0)
- } else {
- token = byteArr
- }
- connectionId = newconnectionId
- }
-
- /**
- * Read the given buffer and set the members of this class.
- */
- def set(buffer: ByteBuffer) {
- val idLength = buffer.getInt()
- val idBuilder = new StringBuilder(idLength)
- for (i <- 1 to idLength) {
- idBuilder += buffer.getChar()
- }
- connectionId = idBuilder.toString()
-
- val tokenLength = buffer.getInt()
- token = new Array[Byte](tokenLength)
- if (tokenLength > 0) {
- buffer.get(token, 0, tokenLength)
- }
- }
-
- def set(bufferMsg: BufferMessage) {
- val buffer = bufferMsg.buffers.apply(0)
- buffer.clear()
- set(buffer)
- }
-
- def getConnectionId: String = {
- return connectionId
- }
-
- def getToken: Array[Byte] = {
- return token
- }
-
- /**
- * Create a BufferMessage that can be sent by the ConnectionManager containing
- * the security information from this class.
- * @return BufferMessage
- */
- def toBufferMessage: BufferMessage = {
- val buffers = new ArrayBuffer[ByteBuffer]()
-
- // 4 bytes for the length of the connectionId
-    // connectionId is of type char so multiply the length by 2 to get the number of bytes
- // 4 bytes for the length of token
- // token is a byte buffer so just take the length
- var buffer = ByteBuffer.allocate(4 + connectionId.length() * 2 + 4 + token.length)
- buffer.putInt(connectionId.length())
- connectionId.foreach((x: Char) => buffer.putChar(x))
- buffer.putInt(token.length)
-
- if (token.length > 0) {
- buffer.put(token)
- }
- buffer.flip()
- buffers += buffer
-
- var message = Message.createBufferMessage(buffers)
- logDebug("message total size is : " + message.size)
- message.isSecurityNeg = true
- return message
- }
-
- override def toString: String = {
- "SecurityMessage [connId= " + connectionId + ", Token = " + token + "]"
- }
-}
-
-private[nio] object SecurityMessage {
-
- /**
- * Convert the given BufferMessage to a SecurityMessage by parsing the contents
- * of the BufferMessage and populating the SecurityMessage fields.
- * @param bufferMessage is a BufferMessage that was received
- * @return new SecurityMessage
- */
- def fromBufferMessage(bufferMessage: BufferMessage): SecurityMessage = {
- val newSecurityMessage = new SecurityMessage()
- newSecurityMessage.set(bufferMessage)
- newSecurityMessage
- }
-
- /**
- * Create a SecurityMessage to send from a given saslResponse.
-   * @param response is the response to a challenge from the SaslClient or SaslServer
-   * @param connectionId the client connectionId we are negotiating authentication for
- * @return a new SecurityMessage
- */
- def fromResponse(response : Array[Byte], connectionId : String) : SecurityMessage = {
- val newSecurityMessage = new SecurityMessage()
- newSecurityMessage.set(response, connectionId)
- newSecurityMessage
- }
-}
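The wire format described in the class comment (a length-prefixed connectionId written as 2-byte chars, then a length-prefixed token) is easy to reproduce with a bare ByteBuffer. A standalone sketch of the encode/decode pair, independent of the SecurityMessage class:

import java.nio.ByteBuffer

def encode(connectionId: String, token: Array[Byte]): ByteBuffer = {
  val buf = ByteBuffer.allocate(4 + connectionId.length * 2 + 4 + token.length)
  buf.putInt(connectionId.length)
  connectionId.foreach(c => buf.putChar(c))   // 2 bytes per char
  buf.putInt(token.length)
  buf.put(token)
  buf.flip()
  buf
}

def decode(buf: ByteBuffer): (String, Array[Byte]) = {
  val connectionId = new String(Array.fill(buf.getInt())(buf.getChar()))
  val token = new Array[Byte](buf.getInt())
  buf.get(token)
  (connectionId, token)
}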
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index b977711e7d..c5195c1143 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -35,7 +35,6 @@ import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, Roaring
import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
-import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
@@ -362,9 +361,6 @@ private[serializer] object KryoSerializer {
private val toRegister: Seq[Class[_]] = Seq(
ByteBuffer.allocate(1).getClass,
classOf[StorageLevel],
- classOf[PutBlock],
- classOf[GotBlock],
- classOf[GetBlock],
classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
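With the PutBlock/GotBlock/GetBlock wire classes gone, they drop out of Kryo's registration list as well. Registering such a list amounts to the following with the Kryo API (a sketch; the exact wiring inside KryoSerializer is not shown in this hunk):

import com.esotericsoftware.kryo.Kryo

def registerAll(kryo: Kryo, toRegister: Seq[Class[_]]): Unit =
  toRegister.foreach(c => kryo.register(c))  // assigns each class a small integer id in registration order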
diff --git a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
deleted file mode 100644
index 5e364cc0ed..0000000000
--- a/core/src/test/scala/org/apache/spark/network/nio/ConnectionManagerSuite.scala
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network.nio
-
-import java.io.IOException
-import java.nio._
-
-import scala.concurrent.duration._
-import scala.concurrent.{Await, TimeoutException}
-import scala.language.postfixOps
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
-import org.apache.spark.util.Utils
-
-/**
- * Test the ConnectionManager with various security settings.
- */
-class ConnectionManagerSuite extends SparkFunSuite {
-
- test("security default off") {
- val conf = new SparkConf
- val securityManager = new SecurityManager(conf)
- val manager = new ConnectionManager(0, conf, securityManager)
- var receivedMessage = false
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- receivedMessage = true
- None
- })
-
- val size = 10 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 10 seconds)
-
- assert(receivedMessage == true)
-
- manager.stop()
- }
-
- test("security on same password") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "true")
- conf.set("spark.authenticate.secret", "good")
- conf.set("spark.app.id", "app-id")
- val securityManager = new SecurityManager(conf)
- val manager = new ConnectionManager(0, conf, securityManager)
- var numReceivedMessages = 0
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedMessages += 1
- None
- })
- val managerServer = new ConnectionManager(0, conf, securityManager)
- var numReceivedServerMessages = 0
- managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedServerMessages += 1
- None
- })
-
- val size = 10 * 1024 * 1024
- val count = 10
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- Await.result(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
- })
-
- assert(numReceivedServerMessages == 10)
- assert(numReceivedMessages == 0)
-
- manager.stop()
- managerServer.stop()
- }
-
- test("security mismatch password") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "true")
- conf.set("spark.app.id", "app-id")
- conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf)
- val manager = new ConnectionManager(0, conf, securityManager)
- var numReceivedMessages = 0
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedMessages += 1
- None
- })
-
- val badconf = conf.clone.set("spark.authenticate.secret", "bad")
- val badsecurityManager = new SecurityManager(badconf)
- val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
- var numReceivedServerMessages = 0
-
- managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedServerMessages += 1
- None
- })
-
- val size = 10 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- // Expect managerServer to close connection, which we'll report as an error:
- intercept[IOException] {
- Await.result(manager.sendMessageReliably(managerServer.id, bufferMessage), 10 seconds)
- }
-
- assert(numReceivedServerMessages == 0)
- assert(numReceivedMessages == 0)
-
- manager.stop()
- managerServer.stop()
- }
-
- test("security mismatch auth off") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "false")
- conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf)
- val manager = new ConnectionManager(0, conf, securityManager)
- var numReceivedMessages = 0
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedMessages += 1
- None
- })
-
- val badconf = new SparkConf
- badconf.set("spark.authenticate", "true")
- badconf.set("spark.authenticate.secret", "good")
- val badsecurityManager = new SecurityManager(badconf)
- val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
- var numReceivedServerMessages = 0
- managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedServerMessages += 1
- None
- })
-
- val size = 10 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- (0 until 1).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliably(managerServer.id, bufferMessage)
- }).foreach(f => {
- try {
- val g = Await.result(f, 1 second)
- assert(false)
- } catch {
- case i: IOException =>
- assert(true)
- case e: TimeoutException => {
- // we should timeout here since the client can't do the negotiation
- assert(true)
- }
- }
- })
-
- assert(numReceivedServerMessages == 0)
- assert(numReceivedMessages == 0)
- manager.stop()
- managerServer.stop()
- }
-
- test("security auth off") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "false")
- val securityManager = new SecurityManager(conf)
- val manager = new ConnectionManager(0, conf, securityManager)
- var numReceivedMessages = 0
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedMessages += 1
- None
- })
-
- val badconf = new SparkConf
- badconf.set("spark.authenticate", "false")
- val badsecurityManager = new SecurityManager(badconf)
- val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
- var numReceivedServerMessages = 0
-
- managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- numReceivedServerMessages += 1
- None
- })
-
- val size = 10 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- (0 until 10).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliably(managerServer.id, bufferMessage)
- }).foreach(f => {
- try {
- val g = Await.result(f, 1 second)
- } catch {
- case e: Exception => {
- assert(false)
- }
- }
- })
- assert(numReceivedServerMessages == 10)
- assert(numReceivedMessages == 0)
-
- manager.stop()
- managerServer.stop()
- }
-
- test("Ack error message") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "false")
- val securityManager = new SecurityManager(conf)
- val manager = new ConnectionManager(0, conf, securityManager)
- val managerServer = new ConnectionManager(0, conf, securityManager)
- managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- throw new Exception("Custom exception text")
- })
-
- val size = 10 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
- val bufferMessage = Message.createBufferMessage(buffer)
-
- val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
-
- val exception = intercept[IOException] {
- Await.result(future, 1 second)
- }
- assert(Utils.exceptionString(exception).contains("Custom exception text"))
-
- manager.stop()
- managerServer.stop()
-
- }
-
- test("sendMessageReliably timeout") {
- val clientConf = new SparkConf
- clientConf.set("spark.authenticate", "false")
- val ackTimeoutS = 30
- clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeoutS}s")
-
- val clientSecurityManager = new SecurityManager(clientConf)
- val manager = new ConnectionManager(0, clientConf, clientSecurityManager)
-
- val serverConf = new SparkConf
- serverConf.set("spark.authenticate", "false")
- val serverSecurityManager = new SecurityManager(serverConf)
- val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
- managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-    // sleep for 3x the ack timeout (90 sec > 30 sec) to simulate the server slowing down or hanging up
- Thread.sleep(ackTimeoutS * 3 * 1000)
- None
- })
-
- val size = 10 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-
- val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
-
-    // The future should fail with an IOException within 30 sec (the ack timeout).
-    // Otherwise a TimeoutException would be thrown from Await.result;
-    // we expect that TimeoutException is not thrown here.
- intercept[IOException] {
- Await.result(future, (ackTimeoutS * 2) second)
- }
-
- manager.stop()
- managerServer.stop()
- }
-
-}
-
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 0f5ba46f69..eb5af70d57 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -26,10 +26,10 @@ import org.mockito.Mockito.{mock, when}
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
+import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark._
import org.apache.spark.network.BlockTransferService
-import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -38,7 +38,7 @@ import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
- private val conf = new SparkConf(false)
+ private val conf = new SparkConf(false).set("spark.app.id", "test")
var rpcEnv: RpcEnv = null
var master: BlockManagerMaster = null
val securityMgr = new SecurityManager(conf)
@@ -59,7 +59,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- val transfer = new NioBlockTransferService(conf, securityMgr)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val store = new BlockManager(name, rpcEnv, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
store.initialize("app-id")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index e5b54d66c8..34bb4952e7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -30,10 +30,10 @@ import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
+import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.network.nio.NioBlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -44,7 +44,7 @@ import org.apache.spark.util._
class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
with PrivateMethodTester with ResetSystemProperties {
- private val conf = new SparkConf(false)
+ private val conf = new SparkConf(false).set("spark.app.id", "test")
var store: BlockManager = null
var store2: BlockManager = null
var store3: BlockManager = null
@@ -66,7 +66,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- val transfer = new NioBlockTransferService(conf, securityMgr)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val manager = new BlockManager(name, rpcEnv, master, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")
@@ -819,7 +819,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
- val transfer = new NioBlockTransferService(conf, securityMgr)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
0)
@@ -833,7 +833,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Make sure get a1 doesn't hang and returns None.
failAfter(1 second) {
- assert(store.getSingle("a1") == None, "a1 should not be in store")
+ assert(store.getSingle("a1").isEmpty, "a1 should not be in store")
}
}
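The two storage suites above now build their transfer layer the same way: a NettyBlockTransferService pinned to one core, over a conf that also carries an explicit spark.app.id. A rough sketch of just that construction step is below; it assumes a Spark 1.5-era test classpath and omits the BlockManager wiring shown in the hunks.

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.netty.NettyBlockTransferService

    object NettyTransferFixtureSketch {
      def main(args: Array[String]): Unit = {
        // Conf mirrors the suites: no system properties, explicit app id.
        val conf = new SparkConf(false).set("spark.app.id", "test")
        val securityMgr = new SecurityManager(conf)
        // Single-core Netty transfer service, matching `numCores = 1` in the diffs above.
        val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
        println("built " + transfer.getClass.getSimpleName)
      }
    }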
diff --git a/docs/configuration.md b/docs/configuration.md
index 29a36bd67f..a2cc7a37e2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -383,17 +383,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.shuffle.blockTransferService</code></td>
- <td>netty</td>
- <td>
- Implementation to use for transferring shuffle and cached blocks between executors. There
- are two implementations available: <code>netty</code> and <code>nio</code>. Netty-based
- block transfer is intended to be simpler but equally efficient and is the default option
- starting in 1.2, and <code>nio</code> block transfer is deprecated in Spark 1.5.0 and will
- be removed in Spark 1.6.0.
- </td>
-</tr>
-<tr>
<td><code>spark.shuffle.compress</code></td>
<td>true</td>
<td>
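With the nio implementation gone, the spark.shuffle.blockTransferService entry removed above no longer has anything to select, so user configurations can simply drop the key. A purely illustrative before/after sketch (not part of this patch) follows:

    import org.apache.spark.SparkConf

    object BlockTransferConfSketch {
      def main(args: Array[String]): Unit = {
        // Before: a 1.4-era conf that pinned the deprecated nio transfer explicitly.
        val before = new SparkConf(false).set("spark.shuffle.blockTransferService", "nio")
        // After: omit the key entirely; the Netty-based service is the only implementation left.
        val after = new SparkConf(false)
        println("before: " + before.get("spark.shuffle.blockTransferService", "netty")) // prints "nio"
        println("after still sets the key: " + after.contains("spark.shuffle.blockTransferService")) // false
      }
    }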
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 714ce3cd9b..3b8b6c8ffa 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -32,635 +32,638 @@ import com.typesafe.tools.mima.core.ProblemFilters._
* MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
*/
object MimaExcludes {
- def excludes(version: String) =
- version match {
- case v if v.startsWith("1.5") =>
- Seq(
- MimaBuild.excludeSparkPackage("deploy"),
- MimaBuild.excludeSparkPackage("network"),
- // These are needed if checking against the sbt build, since they are part of
- // the maven-generated artifacts in 1.3.
- excludePackage("org.spark-project.jetty"),
- MimaBuild.excludeSparkPackage("unused"),
- // JavaRDDLike is not meant to be extended by user programs
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.partitioner"),
- // Modification of private static method
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.streaming.kafka.KafkaUtils.org$apache$spark$streaming$kafka$KafkaUtils$$leadersForRanges"),
- // Mima false positive (was a private[spark] class)
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.util.collection.PairIterator"),
- // Removing a testing method from a private class
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
- // While private MiMa is still not happy about the changes,
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.ml.regression.LeastSquaresAggregator.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.ml.regression.LeastSquaresCostFun.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.ml.classification.LogisticCostFun.this"),
- // SQL execution is considered private.
- excludePackage("org.apache.spark.sql.execution"),
- // The old JSON RDD is removed in favor of streaming Jackson
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
- // local function inside a method
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.sql.UDFRegistration.org$apache$spark$sql$UDFRegistration$$builder$24")
- ) ++ Seq(
- // SPARK-8479 Add numNonzeros and numActives to Matrix.
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.numNonzeros"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.numActives")
- ) ++ Seq(
- // SPARK-8914 Remove RDDApi
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RDDApi")
- ) ++ Seq(
- // SPARK-7292 Provide operator to truncate lineage cheaply
- ProblemFilters.exclude[AbstractClassProblem](
- "org.apache.spark.rdd.RDDCheckpointData"),
- ProblemFilters.exclude[AbstractClassProblem](
- "org.apache.spark.rdd.CheckpointRDD")
- ) ++ Seq(
- // SPARK-8701 Add input metadata in the batch page.
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.streaming.scheduler.InputInfo$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.streaming.scheduler.InputInfo")
- ) ++ Seq(
- // SPARK-6797 Support YARN modes for SparkR
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.r.PairwiseRRDD.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.r.RRDD.createRWorker"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.r.RRDD.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.r.StringRRDD.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.r.BaseRRDD.this")
- ) ++ Seq(
- // SPARK-7422 add argmax for sparse vectors
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Vector.argmax")
- ) ++ Seq(
- // SPARK-8906 Move all internal data source classes into execution.datasources
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.BaseWriterContainer"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLParser"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
- // SPARK-9763 Minimize exposure of internal SQL classes
- excludePackage("org.apache.spark.sql.parquet"),
- excludePackage("org.apache.spark.sql.json"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$JDBCConversion"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$DriverWrapper"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DefaultSource"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
- ) ++ Seq(
- // SPARK-4751 Dynamic allocation for standalone mode
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.SparkContext.supportDynamicAllocation")
- ) ++ Seq(
- // SPARK-9580: Remove SQL test singletons
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.test.LocalSQLContext$SQLSession"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.test.LocalSQLContext"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.test.TestSQLContext"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.test.TestSQLContext$")
- ) ++ Seq(
- // SPARK-9704 Made ProbabilisticClassifier, Identifiable, VectorUDT public APIs
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.mllib.linalg.VectorUDT.serialize")
- )
+ def excludes(version: String) = version match {
+ case v if v.startsWith("1.6") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("network")
+ )
+ case v if v.startsWith("1.5") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("network"),
+ MimaBuild.excludeSparkPackage("deploy"),
+ // These are needed if checking against the sbt build, since they are part of
+ // the maven-generated artifacts in 1.3.
+ excludePackage("org.spark-project.jetty"),
+ MimaBuild.excludeSparkPackage("unused"),
+ // JavaRDDLike is not meant to be extended by user programs
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.partitioner"),
+ // Modification of private static method
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.streaming.kafka.KafkaUtils.org$apache$spark$streaming$kafka$KafkaUtils$$leadersForRanges"),
+ // Mima false positive (was a private[spark] class)
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.util.collection.PairIterator"),
+ // Removing a testing method from a private class
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
+ // While private, MiMa is still not happy about the changes,
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.ml.regression.LeastSquaresAggregator.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.ml.regression.LeastSquaresCostFun.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.ml.classification.LogisticCostFun.this"),
+ // SQL execution is considered private.
+ excludePackage("org.apache.spark.sql.execution"),
+ // The old JSON RDD is removed in favor of streaming Jackson
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
+ // local function inside a method
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.sql.UDFRegistration.org$apache$spark$sql$UDFRegistration$$builder$24")
+ ) ++ Seq(
+ // SPARK-8479 Add numNonzeros and numActives to Matrix.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.numNonzeros"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.numActives")
+ ) ++ Seq(
+ // SPARK-8914 Remove RDDApi
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RDDApi")
+ ) ++ Seq(
+ // SPARK-7292 Provide operator to truncate lineage cheaply
+ ProblemFilters.exclude[AbstractClassProblem](
+ "org.apache.spark.rdd.RDDCheckpointData"),
+ ProblemFilters.exclude[AbstractClassProblem](
+ "org.apache.spark.rdd.CheckpointRDD")
+ ) ++ Seq(
+ // SPARK-8701 Add input metadata in the batch page.
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.streaming.scheduler.InputInfo$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.streaming.scheduler.InputInfo")
+ ) ++ Seq(
+ // SPARK-6797 Support YARN modes for SparkR
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.PairwiseRRDD.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.RRDD.createRWorker"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.RRDD.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.StringRRDD.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.r.BaseRRDD.this")
+ ) ++ Seq(
+ // SPARK-7422 add argmax for sparse vectors
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Vector.argmax")
+ ) ++ Seq(
+ // SPARK-8906 Move all internal data source classes into execution.datasources
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.BaseWriterContainer"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLParser"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
+ // SPARK-9763 Minimize exposure of internal SQL classes
+ excludePackage("org.apache.spark.sql.parquet"),
+ excludePackage("org.apache.spark.sql.json"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$JDBCConversion"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$DriverWrapper"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DefaultSource"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
+ ) ++ Seq(
+ // SPARK-4751 Dynamic allocation for standalone mode
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.SparkContext.supportDynamicAllocation")
+ ) ++ Seq(
+ // SPARK-9580: Remove SQL test singletons
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.test.LocalSQLContext$SQLSession"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.test.LocalSQLContext"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.test.TestSQLContext"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.test.TestSQLContext$")
+ ) ++ Seq(
+ // SPARK-9704 Made ProbabilisticClassifier, Identifiable, VectorUDT public APIs
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.mllib.linalg.VectorUDT.serialize")
+ )
- case v if v.startsWith("1.4") =>
- Seq(
- MimaBuild.excludeSparkPackage("deploy"),
- MimaBuild.excludeSparkPackage("ml"),
- // SPARK-7910 Adding a method to get the partioner to JavaRDD,
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitioner"),
- // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff"),
- // These are needed if checking against the sbt build, since they are part of
- // the maven-generated artifacts in 1.3.
- excludePackage("org.spark-project.jetty"),
- MimaBuild.excludeSparkPackage("unused"),
- ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.rdd.JdbcRDD.compute"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint")
- ) ++ Seq(
- // SPARK-4655 - Making Stage an Abstract class broke binary compatility even though
- // the stage class is defined as private[spark]
- ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.scheduler.Stage")
- ) ++ Seq(
- // SPARK-6510 Add a Graph#minus method acting as Set#difference
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
- ) ++ Seq(
- // SPARK-6492 Fix deadlock in SparkContext.stop()
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
- "apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
- )++ Seq(
- // SPARK-6693 add tostring with max lines and width for matrix
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.toString")
- )++ Seq(
- // SPARK-6703 Add getOrCreate method to SparkContext
- ProblemFilters.exclude[IncompatibleResultTypeProblem]
- ("org.apache.spark.SparkContext.org$apache$spark$SparkContext$$activeContext")
- )++ Seq(
- // SPARK-7090 Introduce LDAOptimizer to LDA to further improve extensibility
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.mllib.clustering.LDA$EMOptimizer")
- ) ++ Seq(
- // SPARK-6756 add toSparse, toDense, numActives, numNonzeros, and compressed to Vector
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Vector.compressed"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Vector.toDense"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Vector.numNonzeros"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Vector.toSparse"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Vector.numActives"),
- // SPARK-7681 add SparseVector support for gemv
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.multiply"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.DenseMatrix.multiply"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.SparseMatrix.multiply")
- ) ++ Seq(
- // Execution should never be included as its always internal.
- MimaBuild.excludeSparkPackage("sql.execution"),
- // This `protected[sql]` method was removed in 1.3.1
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.sql.SQLContext.checkAnalysis"),
- // These `private[sql]` class were removed in 1.4.0:
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.execution.AddExchange"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.execution.AddExchange$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.PartitionSpec"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.PartitionSpec$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.Partition"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.Partition$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetRelation2"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetRelation2$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache"),
- // These test support classes were moved out of src/main and into src/test:
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetTestData"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetTestData$"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.TestGroupWriteSupport"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CacheManager"),
- // TODO: Remove the following rule once ParquetTest has been moved to src/test.
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.sql.parquet.ParquetTest")
- ) ++ Seq(
- // SPARK-7530 Added StreamingContext.getState()
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.StreamingContext.state_=")
- ) ++ Seq(
- // SPARK-7081 changed ShuffleWriter from a trait to an abstract class and removed some
- // unnecessary type bounds in order to fix some compiler warnings that occurred when
- // implementing this interface in Java. Note that ShuffleWriter is private[spark].
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.shuffle.ShuffleWriter")
- ) ++ Seq(
- // SPARK-6888 make jdbc driver handling user definable
- // This patch renames some classes to API friendly names.
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DriverQuirks$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DriverQuirks"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.PostgresQuirks"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.NoQuirks"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.MySQLQuirks")
- )
+ case v if v.startsWith("1.4") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("ml"),
+ // SPARK-7910 Adding a method to get the partitioner to JavaRDD,
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitioner"),
+ // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff"),
+ // These are needed if checking against the sbt build, since they are part of
+ // the maven-generated artifacts in 1.3.
+ excludePackage("org.spark-project.jetty"),
+ MimaBuild.excludeSparkPackage("unused"),
+ ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.rdd.JdbcRDD.compute"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint")
+ ) ++ Seq(
+ // SPARK-4655 - Making Stage an Abstract class broke binary compatibility even though
+ // the stage class is defined as private[spark]
+ ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.scheduler.Stage")
+ ) ++ Seq(
+ // SPARK-6510 Add a Graph#minus method acting as Set#difference
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
+ ) ++ Seq(
+ // SPARK-6492 Fix deadlock in SparkContext.stop()
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
+ "apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
+ )++ Seq(
+ // SPARK-6693 add toString with max lines and width for matrix
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.toString")
+ )++ Seq(
+ // SPARK-6703 Add getOrCreate method to SparkContext
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]
+ ("org.apache.spark.SparkContext.org$apache$spark$SparkContext$$activeContext")
+ )++ Seq(
+ // SPARK-7090 Introduce LDAOptimizer to LDA to further improve extensibility
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.mllib.clustering.LDA$EMOptimizer")
+ ) ++ Seq(
+ // SPARK-6756 add toSparse, toDense, numActives, numNonzeros, and compressed to Vector
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Vector.compressed"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Vector.toDense"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Vector.numNonzeros"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Vector.toSparse"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Vector.numActives"),
+ // SPARK-7681 add SparseVector support for gemv
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.multiply"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.DenseMatrix.multiply"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.SparseMatrix.multiply")
+ ) ++ Seq(
+ // Execution should never be included as it's always internal.
+ MimaBuild.excludeSparkPackage("sql.execution"),
+ // This `protected[sql]` method was removed in 1.3.1
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.sql.SQLContext.checkAnalysis"),
+ // These `private[sql]` classes were removed in 1.4.0:
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.execution.AddExchange"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.execution.AddExchange$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.PartitionSpec"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.PartitionSpec$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.Partition"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.Partition$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetRelation2$PartitionValues$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetRelation2"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetRelation2$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache"),
+ // These test support classes were moved out of src/main and into src/test:
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetTestData"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetTestData$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.TestGroupWriteSupport"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CacheManager"),
+ // TODO: Remove the following rule once ParquetTest has been moved to src/test.
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.parquet.ParquetTest")
+ ) ++ Seq(
+ // SPARK-7530 Added StreamingContext.getState()
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.StreamingContext.state_=")
+ ) ++ Seq(
+ // SPARK-7081 changed ShuffleWriter from a trait to an abstract class and removed some
+ // unnecessary type bounds in order to fix some compiler warnings that occurred when
+ // implementing this interface in Java. Note that ShuffleWriter is private[spark].
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.shuffle.ShuffleWriter")
+ ) ++ Seq(
+ // SPARK-6888 make jdbc driver handling user definable
+ // This patch renames some classes to API friendly names.
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DriverQuirks$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DriverQuirks"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.PostgresQuirks"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.NoQuirks"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.MySQLQuirks")
+ )
- case v if v.startsWith("1.3") =>
- Seq(
- MimaBuild.excludeSparkPackage("deploy"),
- MimaBuild.excludeSparkPackage("ml"),
- // These are needed if checking against the sbt build, since they are part of
- // the maven-generated artifacts in the 1.2 build.
- MimaBuild.excludeSparkPackage("unused"),
- ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional")
- ) ++ Seq(
- // SPARK-2321
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.SparkStageInfoImpl.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.SparkStageInfo.submissionTime")
- ) ++ Seq(
- // SPARK-4614
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrices.randn"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrices.rand")
- ) ++ Seq(
- // SPARK-5321
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.SparseMatrix.transposeMultiply"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.transpose"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.DenseMatrix.transposeMultiply"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Matrix." +
- "org$apache$spark$mllib$linalg$Matrix$_setter_$isTransposed_="),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.isTransposed"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.linalg.Matrix.foreachActive")
- ) ++ Seq(
- // SPARK-5540
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.solveLeastSquares"),
- // SPARK-5536
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateBlock")
- ) ++ Seq(
- // SPARK-3325
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.api.java.JavaDStreamLike.print"),
- // SPARK-2757
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." +
- "removeAndGetProcessor")
- ) ++ Seq(
- // SPARK-5123 (SparkSQL data type change) - alpha component only
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.ml.feature.HashingTF.outputDataType"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.ml.feature.Tokenizer.outputDataType"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.ml.feature.Tokenizer.validateInputType"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.ml.classification.LogisticRegressionModel.validateAndTransformSchema"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.ml.classification.LogisticRegression.validateAndTransformSchema")
- ) ++ Seq(
- // SPARK-4014
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.TaskContext.taskAttemptId"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.TaskContext.attemptNumber")
- ) ++ Seq(
- // SPARK-5166 Spark SQL API stabilization
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Transformer.transform"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Pipeline.fit"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.PipelineModel.transform"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Estimator.fit"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Evaluator.evaluate"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Evaluator.evaluate"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.tuning.CrossValidator.fit"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.tuning.CrossValidatorModel.transform"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.StandardScaler.fit"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.StandardScalerModel.transform"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.transform"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.LogisticRegression.fit"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate")
- ) ++ Seq(
- // SPARK-5270
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.isEmpty")
- ) ++ Seq(
- // SPARK-5430
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.treeReduce"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.treeAggregate")
- ) ++ Seq(
- // SPARK-5297 Java FileStream do not work with custom key/values
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
- ) ++ Seq(
- // SPARK-5315 Spark Streaming Java API returns Scala DStream
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
- ) ++ Seq(
- // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.graphx.Graph.getCheckpointFiles"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.graphx.Graph.isCheckpointed")
- ) ++ Seq(
- // SPARK-4789 Standardize ML Prediction APIs
- ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.linalg.VectorUDT"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.serialize"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.sqlType")
- ) ++ Seq(
- // SPARK-5814
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$wrapDoubleArray"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$fillFullMatrix"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$iterations"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeOutLinkBlock"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$computeYtY"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeLinkRDDs"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$alpha"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$randomFactor"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$dspr"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$lambda"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$implicitPrefs"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$rank")
- ) ++ Seq(
- // SPARK-4682
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
- ) ++ Seq(
- // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
- )
+ case v if v.startsWith("1.3") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("ml"),
+ // These are needed if checking against the sbt build, since they are part of
+ // the maven-generated artifacts in the 1.2 build.
+ MimaBuild.excludeSparkPackage("unused"),
+ ProblemFilters.exclude[MissingClassProblem]("com.google.common.base.Optional")
+ ) ++ Seq(
+ // SPARK-2321
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.SparkStageInfoImpl.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.SparkStageInfo.submissionTime")
+ ) ++ Seq(
+ // SPARK-4614
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrices.randn"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrices.rand")
+ ) ++ Seq(
+ // SPARK-5321
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.SparseMatrix.transposeMultiply"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.transpose"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.DenseMatrix.transposeMultiply"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Matrix." +
+ "org$apache$spark$mllib$linalg$Matrix$_setter_$isTransposed_="),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.isTransposed"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.linalg.Matrix.foreachActive")
+ ) ++ Seq(
+ // SPARK-5540
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.solveLeastSquares"),
+ // SPARK-5536
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateBlock")
+ ) ++ Seq(
+ // SPARK-3325
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.api.java.JavaDStreamLike.print"),
+ // SPARK-2757
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler." +
+ "removeAndGetProcessor")
+ ) ++ Seq(
+ // SPARK-5123 (SparkSQL data type change) - alpha component only
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.ml.feature.HashingTF.outputDataType"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.ml.feature.Tokenizer.outputDataType"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.ml.feature.Tokenizer.validateInputType"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.ml.classification.LogisticRegressionModel.validateAndTransformSchema"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.ml.classification.LogisticRegression.validateAndTransformSchema")
+ ) ++ Seq(
+ // SPARK-4014
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.TaskContext.taskAttemptId"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.TaskContext.attemptNumber")
+ ) ++ Seq(
+ // SPARK-5166 Spark SQL API stabilization
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Transformer.transform"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Estimator.fit"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Transformer.transform"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Pipeline.fit"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.PipelineModel.transform"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Estimator.fit"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.Evaluator.evaluate"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.Evaluator.evaluate"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.tuning.CrossValidator.fit"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.tuning.CrossValidatorModel.transform"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.StandardScaler.fit"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.feature.StandardScalerModel.transform"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionModel.transform"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.LogisticRegression.fit"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.evaluate")
+ ) ++ Seq(
+ // SPARK-5270
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.isEmpty")
+ ) ++ Seq(
+ // SPARK-5430
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.treeReduce"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.treeAggregate")
+ ) ++ Seq(
+ // SPARK-5297 Java FileStream does not work with custom key/values
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.api.java.JavaStreamingContext.fileStream")
+ ) ++ Seq(
+ // SPARK-5315 Spark Streaming Java API returns Scala DStream
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.api.java.JavaDStreamLike.reduceByWindow")
+ ) ++ Seq(
+ // SPARK-5461 Graph should have isCheckpointed, getCheckpointFiles methods
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.graphx.Graph.getCheckpointFiles"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.graphx.Graph.isCheckpointed")
+ ) ++ Seq(
+ // SPARK-4789 Standardize ML Prediction APIs
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.linalg.VectorUDT"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.serialize"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.sqlType")
+ ) ++ Seq(
+ // SPARK-5814
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$wrapDoubleArray"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$fillFullMatrix"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$iterations"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeOutLinkBlock"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$computeYtY"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeLinkRDDs"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$alpha"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$randomFactor"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$dspr"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$lambda"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$implicitPrefs"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$rank")
+ ) ++ Seq(
+ // SPARK-4682
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
+ ) ++ Seq(
+ // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
+ )
- case v if v.startsWith("1.2") =>
- Seq(
- MimaBuild.excludeSparkPackage("deploy"),
- MimaBuild.excludeSparkPackage("graphx")
- ) ++
- MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
- MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
- Seq(
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.scheduler.TaskLocation"),
- // Added normL1 and normL2 to trait MultivariateStatisticalSummary
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
- // MapStatus should be private[spark]
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.scheduler.MapStatus"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.network.netty.PathResolver"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.spark.network.netty.client.BlockClientListener"),
+ case v if v.startsWith("1.2") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("graphx")
+ ) ++
+ MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
+ MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
+ Seq(
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.scheduler.TaskLocation"),
+ // Added normL1 and normL2 to trait MultivariateStatisticalSummary
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
+ // MapStatus should be private[spark]
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.scheduler.MapStatus"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.network.netty.PathResolver"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.network.netty.client.BlockClientListener"),
- // TaskContext was promoted to Abstract class
- ProblemFilters.exclude[AbstractClassProblem](
- "org.apache.spark.TaskContext"),
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.util.collection.SortDataFormat")
- ) ++ Seq(
- // Adding new methods to the JavaRDDLike trait:
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.takeAsync"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.foreachPartitionAsync"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.countAsync"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.collectAsync")
- ) ++ Seq(
- // SPARK-3822
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
- ) ++ Seq(
- // SPARK-1209
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
- ProblemFilters.exclude[MissingClassProblem](
- "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
- ProblemFilters.exclude[MissingTypesProblem](
- "org.apache.spark.rdd.PairRDDFunctions")
- ) ++ Seq(
- // SPARK-4062
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.kafka.KafkaReceiver#MessageHandler.this")
- )
+ // TaskContext was promoted to Abstract class
+ ProblemFilters.exclude[AbstractClassProblem](
+ "org.apache.spark.TaskContext"),
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.util.collection.SortDataFormat")
+ ) ++ Seq(
+ // Adding new methods to the JavaRDDLike trait:
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.takeAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachPartitionAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.countAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.collectAsync")
+ ) ++ Seq(
+ // SPARK-3822
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
+ ) ++ Seq(
+ // SPARK-1209
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
+ ProblemFilters.exclude[MissingTypesProblem](
+ "org.apache.spark.rdd.PairRDDFunctions")
+ ) ++ Seq(
+ // SPARK-4062
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.kafka.KafkaReceiver#MessageHandler.this")
+ )
- case v if v.startsWith("1.1") =>
- Seq(
- MimaBuild.excludeSparkPackage("deploy"),
- MimaBuild.excludeSparkPackage("graphx")
- ) ++
- Seq(
- // Adding a new method to the JavaRDDLike trait - we should probably mark this as a developer API.
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
- // Should probably mark this as Experimental
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
- // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
- // for countApproxDistinct* functions, which does not work in Java. We later removed
- // them, and use the following to tell Mima to not care about them.
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.storage.DiskStore.getValues"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.storage.MemoryStore.Entry")
- ) ++
- Seq(
- // Serializer interface change. See SPARK-3045.
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.serializer.DeserializationStream"),
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.serializer.Serializer"),
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.serializer.SerializationStream"),
- ProblemFilters.exclude[IncompatibleTemplateDefProblem](
- "org.apache.spark.serializer.SerializerInstance")
- )++
- Seq(
- // Renamed putValues -> putArray + putIterator
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.storage.MemoryStore.putValues"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.storage.DiskStore.putValues"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.storage.TachyonStore.putValues")
- ) ++
- Seq(
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.streaming.flume.FlumeReceiver.this"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.streaming.kafka.KafkaUtils.createStream"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.streaming.kafka.KafkaReceiver.this")
- ) ++
- Seq( // Ignore some private methods in ALS.
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
- ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
- "org.apache.spark.mllib.recommendation.ALS.this"),
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures")
- ) ++
- MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
- MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
- MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
- MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
- MimaBuild.excludeSparkClass("storage.Values") ++
- MimaBuild.excludeSparkClass("storage.Entry") ++
- MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
- // Class was missing "@DeveloperApi" annotation in 1.0.
- MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
- Seq(
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.mllib.tree.impurity.Gini.calculate"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.mllib.tree.impurity.Entropy.calculate"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem](
- "org.apache.spark.mllib.tree.impurity.Variance.calculate")
- ) ++
- Seq( // Package-private classes removed in SPARK-2341
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
- ) ++
- Seq( // package-private classes removed in MLlib
- ProblemFilters.exclude[MissingMethodProblem](
- "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")
- ) ++
- Seq( // new Vector methods in MLlib (binary compatible assuming users do not implement Vector)
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.copy")
- ) ++
- Seq( // synthetic methods generated in LabeledPoint
- ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.regression.LabeledPoint$"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.regression.LabeledPoint.apply"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.regression.LabeledPoint.toString")
- ) ++
- Seq ( // Scala 2.11 compatibility fix
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.<init>$default$2")
- )
- case v if v.startsWith("1.0") =>
- Seq(
- MimaBuild.excludeSparkPackage("api.java"),
- MimaBuild.excludeSparkPackage("mllib"),
- MimaBuild.excludeSparkPackage("streaming")
- ) ++
- MimaBuild.excludeSparkClass("rdd.ClassTags") ++
- MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
- MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
- MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
- MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
- MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
- MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
- MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
- MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
- MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
- MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
- MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
- MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
- case _ => Seq()
- }
-}
+ case v if v.startsWith("1.1") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("deploy"),
+ MimaBuild.excludeSparkPackage("graphx")
+ ) ++
+ Seq(
+ // Adding a new method to the JavaRDDLike trait - we should probably mark this as a developer API.
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitions"),
+ // Should probably mark this as Experimental
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.foreachAsync"),
+ // We made a mistake earlier (ed06500d3) in the Java API to use default parameter values
+ // for countApproxDistinct* functions, which does not work in Java. We later removed
+ // them, and use the following to tell Mima to not care about them.
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaPairRDD.countApproxDistinctByKey$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaRDDLike.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.DiskStore.getValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.MemoryStore.Entry")
+ ) ++
+ Seq(
+ // Serializer interface change. See SPARK-3045.
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.serializer.DeserializationStream"),
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.serializer.Serializer"),
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.serializer.SerializationStream"),
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+ "org.apache.spark.serializer.SerializerInstance")
+ )++
+ Seq(
+ // Renamed putValues -> putArray + putIterator
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.MemoryStore.putValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.DiskStore.putValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.TachyonStore.putValues")
+ ) ++
+ Seq(
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.flume.FlumeReceiver.this"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.streaming.kafka.KafkaUtils.createStream"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.streaming.kafka.KafkaReceiver.this")
+ ) ++
+ Seq( // Ignore some private methods in ALS.
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
+ ProblemFilters.exclude[MissingMethodProblem]( // The only public constructor is the one without arguments.
+ "org.apache.spark.mllib.recommendation.ALS.this"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$<init>$default$7"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures")
+ ) ++
+ MimaBuild.excludeSparkClass("mllib.linalg.distributed.ColumnStatisticsAggregator") ++
+ MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
+ MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
+ MimaBuild.excludeSparkClass("util.SerializableHyperLogLog") ++
+ MimaBuild.excludeSparkClass("storage.Values") ++
+ MimaBuild.excludeSparkClass("storage.Entry") ++
+ MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
+ // Class was missing "@DeveloperApi" annotation in 1.0.
+ MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
+ Seq(
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.mllib.tree.impurity.Gini.calculate"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.mllib.tree.impurity.Entropy.calculate"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem](
+ "org.apache.spark.mllib.tree.impurity.Variance.calculate")
+ ) ++
+ Seq( // Package-private classes removed in SPARK-2341
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
+ ) ++
+ Seq( // package-private classes removed in MLlib
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")
+ ) ++
+ Seq( // new Vector methods in MLlib (binary compatible assuming users do not implement Vector)
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.linalg.Vector.copy")
+ ) ++
+ Seq( // synthetic methods generated in LabeledPoint
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.mllib.regression.LabeledPoint$"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.regression.LabeledPoint.apply"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.regression.LabeledPoint.toString")
+ ) ++
+ Seq ( // Scala 2.11 compatibility fix
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.<init>$default$2")
+ )
+ case v if v.startsWith("1.0") =>
+ Seq(
+ MimaBuild.excludeSparkPackage("api.java"),
+ MimaBuild.excludeSparkPackage("mllib"),
+ MimaBuild.excludeSparkPackage("streaming")
+ ) ++
+ MimaBuild.excludeSparkClass("rdd.ClassTags") ++
+ MimaBuild.excludeSparkClass("util.XORShiftRandom") ++
+ MimaBuild.excludeSparkClass("graphx.EdgeRDD") ++
+ MimaBuild.excludeSparkClass("graphx.VertexRDD") ++
+ MimaBuild.excludeSparkClass("graphx.impl.GraphImpl") ++
+ MimaBuild.excludeSparkClass("graphx.impl.RoutingTable") ++
+ MimaBuild.excludeSparkClass("graphx.util.collection.PrimitiveKeyOpenHashMap") ++
+ MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap") ++
+ MimaBuild.excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
+ MimaBuild.excludeSparkClass("mllib.optimization.SquaredGradient") ++
+ MimaBuild.excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
+ MimaBuild.excludeSparkClass("mllib.regression.LassoWithSGD") ++
+ MimaBuild.excludeSparkClass("mllib.regression.LinearRegressionWithSGD")
+ case _ => Seq()
+ }
+}
\ No newline at end of file
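Note: every entry in the MimaExcludes.scala hunks above follows the same pattern: excludes(version) pattern-matches on the release line and returns a Seq of MiMa filters, each built from a problem kind (MissingMethodProblem, MissingClassProblem, IncompatibleResultTypeProblem, ...) plus the fully qualified name of the affected member, with case _ => Seq() as the fallback. The sketch below only illustrates that shape and is not part of this commit: it assumes the standard mima-core imports, and ExampleExcludes and "org.apache.spark.example.Removed.method" are hypothetical names.

import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.core.ProblemFilters._

// Cut-down analogue of the version dispatch above: one Seq of exclusion
// filters per release line, and an empty Seq for anything unrecognized.
object ExampleExcludes {
  def excludes(version: String) = version match {
    case v if v.startsWith("1.4") =>
      Seq(
        // Tell MiMa to ignore a member that was removed on purpose;
        // the target name here is a hypothetical placeholder.
        ProblemFilters.exclude[MissingMethodProblem](
          "org.apache.spark.example.Removed.method")
      )
    case _ => Seq()
  }
}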
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 6c0c926755..13cfe29d7b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.KryoSerializer
@@ -47,7 +47,9 @@ class ReceivedBlockHandlerSuite
with Matchers
with Logging {
- val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
+ val conf = new SparkConf()
+ .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
+ .set("spark.app.id", "streaming-test")
val hadoopConf = new Configuration()
val streamId = 1
val securityMgr = new SecurityManager(conf)
@@ -184,7 +186,7 @@ class ReceivedBlockHandlerSuite
}
test("Test Block - isFullyConsumed") {
- val sparkConf = new SparkConf()
+ val sparkConf = new SparkConf().set("spark.app.id", "streaming-test")
sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
// spark.storage.unrollFraction set to 0.4 for BlockManager
sparkConf.set("spark.storage.unrollFraction", "0.4")
@@ -251,7 +253,7 @@ class ReceivedBlockHandlerSuite
maxMem: Long,
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- val transfer = new NioBlockTransferService(conf, securityMgr)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
manager.initialize("app-id")