diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-12 22:35:14 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-12 22:35:14 -0800 |
commit | e6ed13f255d70de422711b979447690cdab7423b (patch) | |
tree | ac4774bf41c478fce612477dff77933845022802 /streaming | |
parent | 0b96d85c2063bd2864b5753496551c6cf2f9047a (diff) | |
parent | 0bb33076e2c12947edc91ff61f98e4b72d881ec3 (diff) | |
download | spark-e6ed13f255d70de422711b979447690cdab7423b.tar.gz spark-e6ed13f255d70de422711b979447690cdab7423b.tar.bz2 spark-e6ed13f255d70de422711b979447690cdab7423b.zip |
Merge pull request #397 from pwendell/host-port
Remove now-unneeded hostPort option
I noticed this was logging some scary error messages in various places. After I looked into it, this option is no longer really used. I removed the option and rewrote the one remaining use case (it was unnecessary there anyway).
Diffstat (limited to 'streaming')
5 files changed, 1 insertion, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 44e396e1cd..5046a1d53f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -46,7 +46,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) // These should be unset when a checkpoint is deserialized, // otherwise the SparkContext won't initialize correctly. - sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") + sparkConf.remove("spark.driver.host").remove("spark.driver.port") def validate() { assert(master != null, "Checkpoint.master is null") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 162b19d7f0..592e84791b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -186,7 +186,6 @@ object MasterFailureTest extends Logging { // Setup the streaming computation with the given operation System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil, Map()) ssc.checkpoint(checkpointDir.toString) val inputStream = ssc.textFileStream(testDir.toString) @@ -233,7 +232,6 @@ object MasterFailureTest extends Logging { // (iii) Its not timed out yet System.clearProperty("spark.streaming.clock") System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") ssc.start() val startTime = System.currentTimeMillis() while (!killed && !isLastOutputGenerated && !isTimedOut) { diff --git a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java 
b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java index 34bee56885..849bbf1299 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java +++ b/streaming/src/test/java/org/apache/spark/streaming/LocalJavaStreamingContext.java @@ -28,7 +28,6 @@ public abstract class LocalJavaStreamingContext { @Before public void setUp() { System.clearProperty("spark.driver.port"); - System.clearProperty("spark.hostPort"); System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock"); ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); ssc.checkpoint("checkpoint"); @@ -41,6 +40,5 @@ public abstract class LocalJavaStreamingContext { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port"); - System.clearProperty("spark.hostPort"); } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 9590bca989..67ce5bc566 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -358,7 +358,6 @@ class CheckpointSuite extends TestSuiteBase { ) ssc = new StreamingContext(checkpointDir) System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") ssc.start() val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches) // the first element will be re-processed data of the last batch before restart diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 63a07cfbdf..9b2bb57e77 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -156,7 +156,6 
@@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { def afterFunction() { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") - System.clearProperty("spark.hostPort") } before(beforeFunction) |