aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorRaymond Liu <raymond.liu@intel.com>2013-11-14 23:04:55 -0800
committerRaymond Liu <raymond.liu@intel.com>2013-11-14 23:04:55 -0800
commitf6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc (patch)
treeb35338d51edcc288537b1891a10c64f6de42d0e9 /streaming
parentd4cd32330e1e4ac83b38bc922a9d3fd85f85f606 (diff)
parentce1d2af7e4ddd043b59b3710c61ca5ada4130be6 (diff)
downloadspark-f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc.tar.gz
spark-f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc.tar.bz2
spark-f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc.zip
Merge pull request #1 from aarondav/scala210-master
Various merge corrections
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala59
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala1
5 files changed, 2 insertions, 63 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index 66fe6e7870..6e9a781978 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import scala.concurrent.duration._
import akka.dispatch._
import org.apache.spark.storage.BlockId
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index ea5c165691..80af96c060 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
-import scala.Some
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 3ba37bed4d..dfd6e27c3e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
+ implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
new JavaPairDStream[K, V](dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 16c1567355..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-import scala.reflect.ClassTag
-
-private[streaming]
-class CoGroupedDStream[K : ClassTag](
- parents: Seq[DStream[(K, _)]],
- partitioner: Partitioner
- ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideDuration).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideDuration: Duration = parents.head.slideDuration
-
- override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
- val part = partitioner
- val rdds = parents.flatMap(_.getOrCompute(validTime))
- if (rdds.size > 0) {
- val q = new CoGroupedRDD[K](rdds, part)
- Some(q)
- } else {
- None
- }
- }
-
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index ec0096c85f..526f5564c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -33,7 +33,6 @@ import org.I0Itec.zkclient._
import scala.collection.Map
import scala.reflect.ClassTag
-
/**
* Input stream that pulls messages from a Kafka Broker.
*