diff options
3 files changed, 8 insertions, 55 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 41898b9228..96f57cbcc8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -25,6 +25,14 @@ import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger import java.util.UUID +import akka.actor.Props +import akka.actor.SupervisorStrategy +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.hadoop.fs.Path + import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -33,14 +41,6 @@ import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers._ import org.apache.spark.streaming.scheduler._ -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.Path - -import akka.actor.Props -import akka.actor.SupervisorStrategy /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -322,26 +322,6 @@ class StreamingContext private ( fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } - /* - /** - * Create a input stream that returns tweets received from Twitter. - * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's default OAuth - * authorization; this uses the system properties twitter4j.oauth.consumerKey, - * .consumerSecret, .accessToken and .accessTokenSecret. - * @param filters Set of filter strings to get only those tweets that match them - * @param storageLevel Storage level to use for storing the received objects - */ - def twitterStream( - twitterAuth: Option[Authorization] = None, - filters: Seq[String] = Nil, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 - ): DStream[Status] = { - val inputStream = new TwitterInputDStream(this, twitterAuth, filters, storageLevel) - registerInputStream(inputStream) - inputStream - } - */ - /** * Create an input stream from a queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 2734393ae9..1cd0b9b0a4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -33,7 +33,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { val ssc = jobScheduler.ssc val clockClass = System.getProperty( "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock") - logInfo("Using clock class = " + clockClass) val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock] val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, longTime => generateJobs(new Time(longTime))) diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index f4d26c0be6..62187957d6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -1600,30 +1600,4 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa public void testRawSocketStream() { JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } - /* - @Test - public void testFileStream() { - JavaPairDStream<String, String> foo = ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); - } - - @Test - public void testTwitterStream() { - String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testActorStream() { - JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); - } - - @Test - public void testZeroMQStream() { - JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { - @Override - public Iterable<String> call(byte[][] b) throws Exception { - return null; - } - }); - } */ } |