aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-11-14 22:13:09 -0800
committerAaron Davidson <aaron@databricks.com>2013-11-14 22:13:09 -0800
commitf629ba95b6a1a3508463bfdcb03efcfaa3327cb5 (patch)
tree74a4cbcd839471d8c4248c9615f5803fad28787d
parentd4cd32330e1e4ac83b38bc922a9d3fd85f85f606 (diff)
downloadspark-f629ba95b6a1a3508463bfdcb03efcfaa3327cb5.tar.gz
spark-f629ba95b6a1a3508463bfdcb03efcfaa3327cb5.tar.bz2
spark-f629ba95b6a1a3508463bfdcb03efcfaa3327cb5.zip
Various merge corrections
I've diff'd this patch against my own -- since they were both created independently, this means that two sets of eyes have gone over all the merge conflicts that were created, so I'm feeling significantly more confident in the resulting PR. @rxin has looked at the changes to the repl and is resoundingly confident that they are correct.
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/function/Function.java2
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/function/Function2.java2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala1
-rw-r--r--docs/hadoop-third-party-distributions.md4
-rw-r--r--project/SparkBuild.scala11
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala14
-rw-r--r--repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala59
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala1
25 files changed, 33 insertions, 135 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.java b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
index 49e661a376..537439ef53 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function.java
@@ -29,8 +29,6 @@ import java.io.Serializable;
* when mapping RDDs of other types.
*/
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
- public abstract R call(T t) throws Exception;
-
public ClassTag<R> returnType() {
return ClassTag$.MODULE$.apply(Object.class);
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
index cf77bb6b73..a2d1214fb4 100644
--- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function2.java
@@ -28,8 +28,6 @@ import java.io.Serializable;
public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
implements Serializable {
- public abstract R call(T1 t1, T2 t2) throws Exception;
-
public ClassTag<R> returnType() {
return (ClassTag<R>) ClassTag$.MODULE$.apply(Object.class);
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 3953a3e178..572fc347df 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -145,11 +145,11 @@ private[spark] class Client(
markDisconnected()
case DisassociatedEvent(_, address, _) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
+ logWarning("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
+ logWarning("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
case StopClient =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index a74d7be4c9..67e6c5d66a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object ApplicationState
- extends Enumeration {
+private[spark] object ApplicationState extends Enumeration {
type ApplicationState = Value
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0545ad185f..7db5097c2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
-import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor._
import akka.pattern.ask
@@ -41,16 +41,6 @@ import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
import org.apache.spark.deploy.DeployMessages.KillExecutor
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import scala.Some
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisteredApplication
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
-import org.apache.spark.deploy.DeployMessages.MasterStateResponse
-import org.apache.spark.deploy.DeployMessages.ExecutorAdded
-import org.apache.spark.deploy.DeployMessages.RegisterApplication
-import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
import akka.actor.Terminated
import akka.serialization.SerializationExtension
import java.util.concurrent.TimeUnit
@@ -571,7 +561,7 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
- val timeoutDuration : FiniteDuration = Duration.create(
+ val timeoutDuration: FiniteDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
implicit val timeout = Timeout(timeoutDuration)
val respFuture = actor ? RequestWebUIPort // ask pattern
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 98c57ca0b0..07189ac850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.Logging
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -34,19 +34,6 @@ import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import scala.Some
-import akka.remote.DisassociatedEvent
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
-import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse
-import org.apache.spark.deploy.DeployMessages.MasterChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import akka.actor.Terminated
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -248,7 +235,7 @@ private[spark] class Worker(
}
}
- case DisassociatedEvent(_, _, _) =>
+ case DisassociatedEvent(_, address, _) if address == master.path.address =>
masterDisconnected()
case RequestWorkerState => {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 73fa7d6b6a..50302fcca4 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -107,7 +107,6 @@ private[spark] object CoarseGrainedExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
-
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index de4540493a..0b0a60ee60 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -118,7 +118,11 @@ private[spark] class Executor(
}
}
- private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+ // Akka's message frame size. If task result is bigger than this, we use the block manager
+ // to send the result back.
+ private val akkaFrameSize = {
+ env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
+ }
// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 44c5078621..d1c74a5063 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -21,9 +21,9 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
-import scala.reflect.ClassTag
/**
* A set of asynchronous RDD actions available through an implicit conversion.
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 63b9fe1478..424354ae16 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,9 +17,10 @@
package org.apache.spark.rdd
+import scala.reflect.ClassTag
+
import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
import org.apache.spark.storage.{BlockId, BlockManager}
-import scala.reflect.ClassTag
private[spark] class BlockRDDPartition(val blockId: BlockId, idx: Int) extends Partition {
val index = idx
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 47e958b5e6..53f77a38f5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -52,7 +52,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* sources in HBase, or S3).
*
* @param sc The SparkContext to associate the RDD with.
- * @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7b4fc6b9be..fdea3f6f88 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import scala.concurrent.duration._
-import scala.concurrent.ExecutionContext.Implicits.global
-import akka.actor._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
import scala.reflect.ClassTag
+import akka.actor._
+
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 2d8a0a62c9..9975ec1ab6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -25,8 +25,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
-import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 8de9b72b2f..84fe3094cc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend(
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+ totalCoresAcquired += cpusToUse
val taskId = newMesosTaskId()
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7557ddab19..02adcb41c6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -22,14 +22,11 @@ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
-
-import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.collection.Map
+import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
-import scala.Some
-
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
index 45849b3380..c26f23d500 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util.collection
import scala.reflect.ClassTag
-
/**
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
* but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
index f706625fe9..b33af2cf24 100644
--- a/docs/hadoop-third-party-distributions.md
+++ b/docs/hadoop-third-party-distributions.md
@@ -25,8 +25,8 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
<h3>CDH Releases</h3>
<table class="table" style="width:350px; margin-right: 20px;">
<tr><th>Release</th><th>Version code</th></tr>
- <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-chd4.X.X</td></tr>
- <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-chd4.X.X</td></tr>
+ <tr><td>CDH 4.X.X (YARN mode)</td><td>2.0.0-cdh4.X.X</td></tr>
+ <tr><td>CDH 4.X.X</td><td>2.0.0-mr1-cdh4.X.X</td></tr>
<tr><td>CDH 3u6</td><td>0.20.2-cdh3u6</td></tr>
<tr><td>CDH 3u5</td><td>0.20.2-cdh3u5</td></tr>
<tr><td>CDH 3u4</td><td>0.20.2-cdh3u4</td></tr>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 26e6a8326c..476e7c5800 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -105,12 +105,6 @@ object SparkBuild extends Build {
// also check the local Maven repository ~/.m2
resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
- // Shared between both core and streaming.
- resolvers ++= Seq("Akka Repository" at "http://repo.akka.io/releases/"),
-
- // Shared between both examples and streaming.
- resolvers ++= Seq("Mqtt Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/"),
-
// For Sonatype publishing
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -292,11 +286,10 @@ object SparkBuild extends Build {
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
- exclude("net.sf.jopt-simple", "jopt-simple")
- excludeAll(excludeNetty),
+ exclude("net.sf.jopt-simple", "jopt-simple"),
"org.eclipse.paho" % "mqtt-client" % "0.4.0",
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 43e504c290..523fd1222d 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -940,17 +940,9 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
if (prop != null) prop else "local"
}
}
- val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
- .getOrElse(new Array[String](0))
- .map(new java.io.File(_).getAbsolutePath)
- try {
- sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
- } catch {
- case e: Exception =>
- e.printStackTrace()
- echo("Failed to create SparkContext, exiting...")
- sys.exit(1)
- }
+ val jars = SparkILoop.getAddedJars.map(new java.io.File(_).getAbsolutePath)
+ sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+ echo("Created spark context..")
sparkContext
}
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 418c31e24b..c230a03298 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -78,7 +78,7 @@ class ReplSuite extends FunSuite {
System.clearProperty("spark.hostPort")
}
- test ("simple foreach with accumulator") {
+ test("simple foreach with accumulator") {
val output = runInterpreter("local", """
|val accum = sc.accumulator(0)
|sc.parallelize(1 to 10).foreach(x => accum += x)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
index 66fe6e7870..6e9a781978 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/NetworkInputTracker.scala
@@ -25,10 +25,10 @@ import org.apache.spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import scala.concurrent.duration._
import akka.dispatch._
import org.apache.spark.storage.BlockId
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index ea5c165691..80af96c060 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
-import scala.Some
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 3ba37bed4d..dfd6e27c3e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -728,7 +728,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
object JavaPairDStream {
- implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) : JavaPairDStream[K, V] = {
+ implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)]) = {
new JavaPairDStream[K, V](dstream)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
deleted file mode 100644
index 16c1567355..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/CoGroupedDStream.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.dstream
-
-import org.apache.spark.Partitioner
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.CoGroupedRDD
-import org.apache.spark.streaming.{Time, DStream, Duration}
-import scala.reflect.ClassTag
-
-private[streaming]
-class CoGroupedDStream[K : ClassTag](
- parents: Seq[DStream[(K, _)]],
- partitioner: Partitioner
- ) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {
-
- if (parents.length == 0) {
- throw new IllegalArgumentException("Empty array of parents")
- }
-
- if (parents.map(_.ssc).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different StreamingContexts")
- }
-
- if (parents.map(_.slideDuration).distinct.size > 1) {
- throw new IllegalArgumentException("Array of parents have different slide times")
- }
-
- override def dependencies = parents.toList
-
- override def slideDuration: Duration = parents.head.slideDuration
-
- override def compute(validTime: Time): Option[RDD[(K, Seq[Seq[_]])]] = {
- val part = partitioner
- val rdds = parents.flatMap(_.getOrCompute(validTime))
- if (rdds.size > 0) {
- val q = new CoGroupedRDD[K](rdds, part)
- Some(q)
- } else {
- None
- }
- }
-
-}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index ec0096c85f..526f5564c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -33,7 +33,6 @@ import org.I0Itec.zkclient._
import scala.collection.Map
import scala.reflect.ClassTag
-
/**
* Input stream that pulls messages from a Kafka Broker.
*