aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorAnand Avati <avati@redhat.com>2014-10-10 00:46:56 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-10-10 00:46:56 -0700
commit411cf29fff011561f0093bb6101af87842828369 (patch)
tree3f931b9a0c929ff45c8e3a6329422780f10b6130 /streaming
parent6f98902a3d7749e543bc493a8c62b1e3a7b924cc (diff)
downloadspark-411cf29fff011561f0093bb6101af87842828369.tar.gz
spark-411cf29fff011561f0093bb6101af87842828369.tar.bz2
spark-411cf29fff011561f0093bb6101af87842828369.zip
[SPARK-2805] Upgrade Akka to 2.3.4
This is a second rev of the Akka upgrade (earlier merged, but reverted). I made a slight modification which is that I also upgrade Hive to deal with a compatibility issue related to the protocol buffers library. Author: Anand Avati <avati@redhat.com> Author: Patrick Wendell <pwendell@gmail.com> Closes #2752 from pwendell/akka-upgrade and squashes the following commits: 4c7ca3f [Patrick Wendell] Upgrading to new hive->protobuf version 57a2315 [Anand Avati] SPARK-1812: streaming - remove tests which depend on akka.actor.IO 2a551d3 [Anand Avati] SPARK-1812: core - upgrade to akka 2.3.4
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala71
1 files changed, 0 insertions, 71 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a44a45a3e9..fa04fa326e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -18,8 +18,6 @@
package org.apache.spark.streaming
import akka.actor.Actor
-import akka.actor.IO
-import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
@@ -143,59 +141,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
}
- // TODO: This test works in IntelliJ but not through SBT
- ignore("actor input stream") {
- // Start the server
- val testServer = new TestServer()
- val port = testServer.port
- testServer.start()
-
- // Set up the streaming context and input streams
- val ssc = new StreamingContext(conf, batchDuration)
- val networkStream = ssc.actorStream[String](Props(new TestActor(port)), "TestActor",
- // Had to pass the local value of port to prevent from closing over entire scope
- StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(networkStream, outputBuffer)
- def output = outputBuffer.flatMap(x => x)
- outputStream.register()
- ssc.start()
-
- // Feed data to the server to send to the network receiver
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- val input = 1 to 9
- val expectedOutput = input.map(x => x.toString)
- Thread.sleep(1000)
- for (i <- 0 until input.size) {
- testServer.send(input(i).toString)
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(1000)
- logInfo("Stopping server")
- testServer.stop()
- logInfo("Stopping context")
- ssc.stop()
-
- // Verify whether data received was as expected
- logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
- expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("--------------------------------")
-
- // Verify whether all the elements received are as expected
- // (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i) === expectedOutput(i))
- }
- }
-
-
test("multi-thread receiver") {
// set up the test receiver
val numThreads = 10
@@ -377,22 +322,6 @@ class TestServer(portToBind: Int = 0) extends Logging {
def port = serverSocket.getLocalPort
}
-/** This is an actor for testing actor input stream */
-class TestActor(port: Int) extends Actor with ActorHelper {
-
- def bytesToString(byteString: ByteString) = byteString.utf8String
-
- override def preStart(): Unit = {
- @deprecated("suppress compile time deprecation warning", "1.0.0")
- val unit = IOManager(context.system).connect(new InetSocketAddress(port))
- }
-
- def receive = {
- case IO.Read(socket, bytes) =>
- store(bytesToString(bytes))
- }
-}
-
/** This is a receiver to test multiple threads inserting data using block generator */
class MultiThreadTestReceiver(numThreads: Int, numRecordsPerThread: Int)
extends Receiver[Int](StorageLevel.MEMORY_ONLY_SER) with Logging {