aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-12-31 00:38:19 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-12-31 00:38:19 -0800
commit3ab297adaae153cd76c9893ab62e0a815ec21fa4 (patch)
treea97aee8c96abf1f51c1b5d2c23f31cd222f66eb2 /streaming
parent97630849ff2cdaa2ff8a115c3e8e6ca8dba7477d (diff)
downloadspark-3ab297adaae153cd76c9893ab62e0a815ec21fa4.tar.gz
spark-3ab297adaae153cd76c9893ab62e0a815ec21fa4.tar.bz2
spark-3ab297adaae153cd76c9893ab62e0a815ec21fa4.zip
Removed unnecessary comments.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala36
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala1
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java26
3 files changed, 8 insertions, 55 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 41898b9228..96f57cbcc8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -25,6 +25,14 @@ import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
import java.util.UUID
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.fs.Path
+
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -33,14 +41,6 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
-
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -322,26 +322,6 @@ class StreamingContext private (
fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
- /*
- /**
- * Create a input stream that returns tweets received from Twitter.
- * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth
- * authorization; this uses the system properties twitter4j.oauth.consumerKey,
- * .consumerSecret, .accessToken and .accessTokenSecret.
- * @param filters Set of filter strings to get only those tweets that match them
- * @param storageLevel Storage level to use for storing the received objects
- */
- def twitterStream(
- twitterAuth: Option[Authorization] = None,
- filters: Seq[String] = Nil,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- ): DStream[Status] = {
- val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel)
- registerInputStream(inputStream)
- inputStream
- }
- */
-
/**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 2734393ae9..1cd0b9b0a4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -33,7 +33,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val ssc = jobScheduler.ssc
val clockClass = System.getProperty(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
- logInfo("Using clock class = " + clockClass)
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => generateJobs(new Time(longTime)))
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index f4d26c0be6..62187957d6 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1600,30 +1600,4 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
public void testRawSocketStream() {
JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
- /*
- @Test
- public void testFileStream() {
- JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
- }
-
- @Test
- public void testTwitterStream() {
- String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
- }
-
- @Test
- public void testActorStream() {
- JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
- }
-
- @Test
- public void testZeroMQStream() {
- JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
- @Override
- public Iterable<String> call(byte[][] b) throws Exception {
- return null;
- }
- });
- } */
}