diff options
author | Raymond Liu <raymond.liu@intel.com> | 2013-11-14 23:04:55 -0800 |
---|---|---|
committer | Raymond Liu <raymond.liu@intel.com> | 2013-11-14 23:04:55 -0800 |
commit | f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc (patch) | |
tree | b35338d51edcc288537b1891a10c64f6de42d0e9 /streaming | |
parent | d4cd32330e1e4ac83b38bc922a9d3fd85f85f606 (diff) | |
parent | ce1d2af7e4ddd043b59b3710c61ca5ada4130be6 (diff) | |
download | spark-f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc.tar.gz spark-f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc.tar.bz2 spark-f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc.zip |
Merge pull request #1 from aarondav/scala210-master
Various merge corrections
Diffstat (limited to 'streaming')
5 files changed, 2 insertions, 63 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala index 66fe6e7870..6e9a781978 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala @@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.Queue +import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import scala.concurrent.duration._ import akka.dispatch._ import org.apache.spark.storage.BlockId diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala index ea5c165691..80af96c060 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala @@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.conf.Configuration -import scala.Some class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) extends Serializable { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index 3ba37bed4d..dfd6e27c3e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } object JavaPairDStream { - implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = { + implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = { new JavaPairDStream[K, V](dstream) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala deleted file mode 100644 index 16c1567355..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.dstream - -import org.apache.spark.Partitioner -import org.apache.spark.rdd.RDD -import org.apache.spark.rdd.CoGroupedRDD -import org.apache.spark.streaming.{Time, DStream, Duration} -import scala.reflect.ClassTag - -private[streaming] -class CoGroupedDStream[K : ClassTag]( - parents: Seq[DStream[(K, _)]], - partitioner: Partitioner - ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) { - - if (parents.length == 0) { - throw new IllegalArgumentException("Empty array of parents") - } - - if (parents.map(_.ssc).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different StreamingContexts") - } - - if (parents.map(_.slideDuration).distinct.size > 1) { - throw new IllegalArgumentException("Array of parents have different slide times") - } - - override def dependencies = parents.toList - - override def slideDuration: Duration = parents.head.slideDuration - - override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = { - val part = partitioner - val rdds = parents.flatMap(_.getOrCompute(validTime)) - if (rdds.size > 0) { - val q = new CoGroupedRDD[K](rdds, part) - Some(q) - } else { - None - } - } - -} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala index ec0096c85f..526f5564c7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala @@ -33,7 +33,6 @@ import org.I0Itec.zkclient._ import scala.collection.Map import scala.reflect.ClassTag - /** * Input stream that pulls messages from a Kafka Broker. * |