aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/pom.xml2
-rw-r--r--core/pom.xml6
-rw-r--r--core/src/main/scala/spark/BlockStoreShuffleFetcher.scala2
-rw-r--r--core/src/main/scala/spark/executor/TaskMetrics.scala4
-rw-r--r--core/src/main/scala/spark/rdd/HadoopRDD.scala33
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListener.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala10
-rw-r--r--core/src/main/scala/spark/serializer/Serializer.scala32
-rw-r--r--core/src/main/scala/spark/storage/BlockFetchTracker.scala2
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala6
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMessages.scala8
-rw-r--r--core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala2
-rw-r--r--core/src/main/scala/spark/util/NextIterator.scala71
-rw-r--r--core/src/test/scala/spark/scheduler/SparkListenerSuite.scala86
-rw-r--r--core/src/test/scala/spark/util/NextIteratorSuite.scala68
-rw-r--r--docs/quick-start.md4
-rw-r--r--docs/running-on-yarn.md2
-rw-r--r--examples/pom.xml2
-rw-r--r--pom.xml9
-rw-r--r--repl-bin/pom.xml2
-rw-r--r--repl/pom.xml2
-rw-r--r--streaming/pom.xml2
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala44
23 files changed, 291 insertions, 112 deletions
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 667d28c1a2..510cff4669 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/core/pom.xml b/core/pom.xml
index 9d46d94c1c..fe9c803728 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -87,6 +87,10 @@
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
<dependency>
<groupId>org.scalatest</groupId>
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 53b0389c3a..c27ed36406 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -55,7 +55,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleReadMillis = itr.getNetMillis
shuffleMetrics.remoteFetchTime = itr.remoteFetchTime
- shuffleMetrics.remoteFetchWaitTime = itr.remoteFetchWaitTime
+ shuffleMetrics.fetchWaitTime = itr.fetchWaitTime
shuffleMetrics.remoteBytesRead = itr.remoteBytesRead
shuffleMetrics.totalBlocksFetched = itr.totalBlocks
shuffleMetrics.localBlocksFetched = itr.numLocalBlocks
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index b9c07830f5..93bbb6b458 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -54,9 +54,9 @@ class ShuffleReadMetrics extends Serializable {
var shuffleReadMillis: Long = _
/**
- * Total time that is spent blocked waiting for shuffle to fetch remote data
+ * Total time that is spent blocked waiting for shuffle to fetch data
*/
- var remoteFetchWaitTime: Long = _
+ var fetchWaitTime: Long = _
/**
* The total amount of time for all the shuffle fetches. This adds up time from overlapping
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 78097502bc..a6322dc58d 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -16,6 +16,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import spark.util.NextIterator
/**
@@ -62,7 +63,7 @@ class HadoopRDD[K, V](
.asInstanceOf[InputFormat[K, V]]
}
- override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
+ override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
var reader: RecordReader[K, V] = null
@@ -71,38 +72,22 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback{ () => close() }
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
- var gotNext = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!gotNext) {
- try {
- finished = !reader.next(key, value)
- } catch {
- case eof: EOFException =>
- finished = true
- }
- gotNext = true
- }
- !finished
- }
- override def next: (K, V) = {
- if (!gotNext) {
+ override def getNext() = {
+ try {
finished = !reader.next(key, value)
+ } catch {
+ case eof: EOFException =>
+ finished = true
}
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- gotNext = false
(key, value)
}
- private def close() {
+ override def close() {
try {
reader.close()
} catch {
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 21185227ab..a65140b145 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -31,7 +31,7 @@ class StatsReportListener extends SparkListener with Logging {
showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
//fetch & io
- showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.remoteFetchWaitTime})
+ showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
@@ -137,7 +137,7 @@ case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], othe
object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
- val fetchTime = metrics.shuffleReadMetrics.map{_.remoteFetchWaitTime}
+ val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
val fetch = fetchTime.map{_ / denom}
val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d))
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index a76253ea14..9e1bde3fbe 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -67,8 +67,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
logInfo("Size of task " + idInJob + " is " + bytes.limit + " bytes")
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
updateDependencies(taskFiles, taskJars) // Download any files added with addFile
+ val deserStart = System.currentTimeMillis()
val deserializedTask = ser.deserialize[Task[_]](
taskBytes, Thread.currentThread.getContextClassLoader)
+ val deserTime = System.currentTimeMillis() - deserStart
// Run it
val result: Any = deserializedTask.run(attemptId)
@@ -77,15 +79,19 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
// executor does. This is useful to catch serialization errors early
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
- val resultToReturn = ser.deserialize[Any](ser.serialize(result))
+ val serResult = ser.serialize(result)
+ deserializedTask.metrics.get.resultSize = serResult.limit()
+ val resultToReturn = ser.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
logInfo("Finished " + task)
info.markSuccessful()
+ deserializedTask.metrics.get.executorRunTime = info.duration.toInt //close enough
+ deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
// If the threadpool has not already been shutdown, notify DAGScheduler
if (!Thread.currentThread().isInterrupted)
- listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, null)
+ listener.taskEnded(task, Success, resultToReturn, accumUpdates, info, deserializedTask.metrics.getOrElse(null))
} catch {
case t: Throwable => {
logError("Exception in task " + idInJob, t)
diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala
index 50b086125a..aca86ab6f0 100644
--- a/core/src/main/scala/spark/serializer/Serializer.scala
+++ b/core/src/main/scala/spark/serializer/Serializer.scala
@@ -72,40 +72,18 @@ trait DeserializationStream {
* Read the elements of this stream through an iterator. This can only be called once, as
* reading each element will consume data from the input source.
*/
- def asIterator: Iterator[Any] = new Iterator[Any] {
- var gotNext = false
- var finished = false
- var nextValue: Any = null
-
- private def getNext() {
+ def asIterator: Iterator[Any] = new spark.util.NextIterator[Any] {
+ override protected def getNext() = {
try {
- nextValue = readObject[Any]()
+ readObject[Any]()
} catch {
case eof: EOFException =>
finished = true
}
- gotNext = true
}
- override def hasNext: Boolean = {
- if (!gotNext) {
- getNext()
- }
- if (finished) {
- close()
- }
- !finished
- }
-
- override def next(): Any = {
- if (!gotNext) {
- getNext()
- }
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- gotNext = false
- nextValue
+ override protected def close() {
+ DeserializationStream.this.close()
}
}
}
diff --git a/core/src/main/scala/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/spark/storage/BlockFetchTracker.scala
index ababb04305..993aece1f7 100644
--- a/core/src/main/scala/spark/storage/BlockFetchTracker.scala
+++ b/core/src/main/scala/spark/storage/BlockFetchTracker.scala
@@ -5,6 +5,6 @@ private[spark] trait BlockFetchTracker {
def numLocalBlocks: Int
def numRemoteBlocks: Int
def remoteFetchTime : Long
- def remoteFetchWaitTime: Long
+ def fetchWaitTime: Long
def remoteBytesRead : Long
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3118d3d412..210061e972 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -903,7 +903,7 @@ class BlockFetcherIterator(
private var _remoteBytesRead = 0l
private var _remoteFetchTime = 0l
- private var _remoteFetchWaitTime = 0l
+ private var _fetchWaitTime = 0l
if (blocksByAddress == null) {
throw new IllegalArgumentException("BlocksByAddress is null")
@@ -1046,7 +1046,7 @@ class BlockFetcherIterator(
val startFetchWait = System.currentTimeMillis()
val result = results.take()
val stopFetchWait = System.currentTimeMillis()
- _remoteFetchWaitTime += (stopFetchWait - startFetchWait)
+ _fetchWaitTime += (stopFetchWait - startFetchWait)
bytesInFlight -= result.size
while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
@@ -1061,7 +1061,7 @@ class BlockFetcherIterator(
def numRemoteBlocks = remoteBlockIds.size
def remoteFetchTime = _remoteFetchTime
- def remoteFetchWaitTime = _remoteFetchWaitTime
+ def fetchWaitTime = _fetchWaitTime
def remoteBytesRead = _remoteBytesRead
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 1494f90103..cff48d9909 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -49,16 +49,16 @@ class UpdateBlockInfo(
blockManagerId.writeExternal(out)
out.writeUTF(blockId)
storageLevel.writeExternal(out)
- out.writeInt(memSize.toInt)
- out.writeInt(diskSize.toInt)
+ out.writeLong(memSize)
+ out.writeLong(diskSize)
}
override def readExternal(in: ObjectInput) {
blockManagerId = BlockManagerId(in)
blockId = in.readUTF()
storageLevel = StorageLevel(in)
- memSize = in.readInt()
- diskSize = in.readInt()
+ memSize = in.readLong()
+ diskSize = in.readLong()
}
}
diff --git a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
index 5c491877ba..f6c28dce52 100644
--- a/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
+++ b/core/src/main/scala/spark/storage/DelegateBlockFetchTracker.scala
@@ -7,6 +7,6 @@ private[spark] trait DelegateBlockFetchTracker extends BlockFetchTracker {
def numLocalBlocks = delegate.numLocalBlocks
def numRemoteBlocks = delegate.numRemoteBlocks
def remoteFetchTime = delegate.remoteFetchTime
- def remoteFetchWaitTime = delegate.remoteFetchWaitTime
+ def fetchWaitTime = delegate.fetchWaitTime
def remoteBytesRead = delegate.remoteBytesRead
}
diff --git a/core/src/main/scala/spark/util/NextIterator.scala b/core/src/main/scala/spark/util/NextIterator.scala
new file mode 100644
index 0000000000..48b5018ddd
--- /dev/null
+++ b/core/src/main/scala/spark/util/NextIterator.scala
@@ -0,0 +1,71 @@
+package spark.util
+
+/** Provides a basic/boilerplate Iterator implementation. */
+private[spark] abstract class NextIterator[U] extends Iterator[U] {
+
+ private var gotNext = false
+ private var nextValue: U = _
+ private var closed = false
+ protected var finished = false
+
+ /**
+ * Method for subclasses to implement to provide the next element.
+ *
+ * If no next element is available, the subclass should set `finished`
+ * to `true` and may return any value (it will be ignored).
+ *
+ * This convention is required because `null` may be a valid value,
+ * and using `Option` seems like it might create unnecessary Some/None
+ * instances, given some iterators might be called in a tight loop.
+ *
+ * @return U, or set 'finished' when done
+ */
+ protected def getNext(): U
+
+ /**
+ * Method for subclasses to implement when all elements have been successfully
+ * iterated, and the iteration is done.
+ *
+ * <b>Note:</b> `NextIterator` cannot guarantee that `close` will be
+ * called because it has no control over what happens when an exception
+ * happens in the user code that is calling hasNext/next.
+ *
+ * Ideally you should have another try/catch, as in HadoopRDD, that
+ * ensures any resources are closed should iteration fail.
+ */
+ protected def close()
+
+ /**
+ * Calls the subclass-defined close method, but only once.
+ *
+ * Usually calling `close` multiple times should be fine, but historically
+ * there have been issues with some InputFormats throwing exceptions.
+ */
+ def closeIfNeeded() {
+ if (!closed) {
+ close()
+ closed = true
+ }
+ }
+
+ override def hasNext: Boolean = {
+ if (!finished) {
+ if (!gotNext) {
+ nextValue = getNext()
+ if (finished) {
+ closeIfNeeded()
+ }
+ gotNext = true
+ }
+ }
+ !finished
+ }
+
+ override def next(): U = {
+ if (!hasNext) {
+ throw new NoSuchElementException("End of stream")
+ }
+ gotNext = false
+ nextValue
+ }
+} \ No newline at end of file
diff --git a/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
new file mode 100644
index 0000000000..2f5af10e69
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/SparkListenerSuite.scala
@@ -0,0 +1,86 @@
+package spark.scheduler
+
+import org.scalatest.FunSuite
+import spark.{SparkContext, LocalSparkContext}
+import scala.collection.mutable
+import org.scalatest.matchers.ShouldMatchers
+import spark.SparkContext._
+
+/**
+ *
+ */
+
+class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
+
+ test("local metrics") {
+ sc = new SparkContext("local[4]", "test")
+ val listener = new SaveStageInfo
+ sc.addSparkListener(listener)
+ sc.addSparkListener(new StatsReportListener)
+ //just to make sure some of the tasks take a noticeable amount of time
+ val w = {i:Int =>
+ if (i == 0)
+ Thread.sleep(100)
+ i
+ }
+
+ val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
+ d.count
+ listener.stageInfos.size should be (1)
+
+ val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
+
+ val d3 = d.map{i => w(i) -> (0 to (i % 5))}.setName("shuffle input 2")
+
+ val d4 = d2.cogroup(d3, 64).map{case(k,(v1,v2)) => w(k) -> (v1.size, v2.size)}
+ d4.setName("A Cogroup")
+
+ d4.collectAsMap
+
+ listener.stageInfos.size should be (4)
+ listener.stageInfos.foreach {stageInfo =>
+ //small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
+ checkNonZeroAvg(stageInfo.taskInfos.map{_._1.duration}, stageInfo + " duration")
+ checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorRunTime.toLong}, stageInfo + " executorRunTime")
+ checkNonZeroAvg(stageInfo.taskInfos.map{_._2.executorDeserializeTime.toLong}, stageInfo + " executorDeserializeTime")
+ if (stageInfo.stage.rdd.name == d4.name) {
+ checkNonZeroAvg(stageInfo.taskInfos.map{_._2.shuffleReadMetrics.get.fetchWaitTime}, stageInfo + " fetchWaitTime")
+ }
+
+ stageInfo.taskInfos.foreach{case (taskInfo, taskMetrics) =>
+ taskMetrics.resultSize should be > (0l)
+ if (isStage(stageInfo, Set(d2.name, d3.name), Set(d4.name))) {
+ taskMetrics.shuffleWriteMetrics should be ('defined)
+ taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
+ }
+ if (stageInfo.stage.rdd.name == d4.name) {
+ taskMetrics.shuffleReadMetrics should be ('defined)
+ val sm = taskMetrics.shuffleReadMetrics.get
+ sm.totalBlocksFetched should be > (0)
+ sm.shuffleReadMillis should be > (0l)
+ sm.localBlocksFetched should be > (0)
+ sm.remoteBlocksFetched should be (0)
+ sm.remoteBytesRead should be (0l)
+ sm.remoteFetchTime should be (0l)
+ }
+ }
+ }
+ }
+
+ def checkNonZeroAvg(m: Traversable[Long], msg: String) {
+ assert(m.sum / m.size.toDouble > 0.0, msg)
+ }
+
+ def isStage(stageInfo: StageInfo, rddNames: Set[String], excludedNames: Set[String]) = {
+ val names = Set(stageInfo.stage.rdd.name) ++ stageInfo.stage.rdd.dependencies.map{_.rdd.name}
+ !names.intersect(rddNames).isEmpty && names.intersect(excludedNames).isEmpty
+ }
+
+ class SaveStageInfo extends SparkListener {
+ val stageInfos = mutable.Buffer[StageInfo]()
+ def onStageCompleted(stage: StageCompleted) {
+ stageInfos += stage.stageInfo
+ }
+ }
+
+}
diff --git a/core/src/test/scala/spark/util/NextIteratorSuite.scala b/core/src/test/scala/spark/util/NextIteratorSuite.scala
new file mode 100644
index 0000000000..ed5b36da73
--- /dev/null
+++ b/core/src/test/scala/spark/util/NextIteratorSuite.scala
@@ -0,0 +1,68 @@
+package spark.util
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable.Buffer
+import java.util.NoSuchElementException
+
+class NextIteratorSuite extends FunSuite with ShouldMatchers {
+ test("one iteration") {
+ val i = new StubIterator(Buffer(1))
+ i.hasNext should be === true
+ i.next should be === 1
+ i.hasNext should be === false
+ intercept[NoSuchElementException] { i.next() }
+ }
+
+ test("two iterations") {
+ val i = new StubIterator(Buffer(1, 2))
+ i.hasNext should be === true
+ i.next should be === 1
+ i.hasNext should be === true
+ i.next should be === 2
+ i.hasNext should be === false
+ intercept[NoSuchElementException] { i.next() }
+ }
+
+ test("empty iteration") {
+ val i = new StubIterator(Buffer())
+ i.hasNext should be === false
+ intercept[NoSuchElementException] { i.next() }
+ }
+
+ test("close is called once for empty iterations") {
+ val i = new StubIterator(Buffer())
+ i.hasNext should be === false
+ i.hasNext should be === false
+ i.closeCalled should be === 1
+ }
+
+ test("close is called once for non-empty iterations") {
+ val i = new StubIterator(Buffer(1, 2))
+ i.next should be === 1
+ i.next should be === 2
+ // close isn't called until we check for the next element
+ i.closeCalled should be === 0
+ i.hasNext should be === false
+ i.closeCalled should be === 1
+ i.hasNext should be === false
+ i.closeCalled should be === 1
+ }
+
+ class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
+ var closeCalled = 0
+
+ override def getNext() = {
+ if (ints.size == 0) {
+ finished = true
+ 0
+ } else {
+ ints.remove(0)
+ }
+ }
+
+ override def close() {
+ closeCalled += 1
+ }
+ }
+}
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 36d024f13a..216f7c9cc5 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -189,7 +189,7 @@ public class SimpleJob {
}
{% endhighlight %}
-This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Note that like in the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide]("java-programming-guide") describes these differences in more detail.
+This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file. Note that like in the Scala example, we initialize a SparkContext, though we use the special `JavaSparkContext` class to get a Java-friendly one. We also create RDDs (represented by `JavaRDD`) and run transformations on them. Finally, we pass functions to Spark by creating classes that extend `spark.api.java.function.Function`. The [Java programming guide](java-programming-guide.html) describes these differences in more detail.
To build the job, we also write a Maven `pom.xml` file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.
@@ -265,7 +265,7 @@ print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
This job simply counts the number of lines containing 'a' and the number containing 'b' in a system log file.
Like in the Scala and Java examples, we use a SparkContext to create RDDs.
We can pass Python functions to Spark, which are automatically serialized along with any variables that they reference.
-For jobs that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide).
+For jobs that use custom classes or third-party libraries, we can add those code dependencies to SparkContext to ensure that they will be available on remote machines; this is described in more detail in the [Python programming guide](python-programming-guide.html).
`SimpleJob` is simple enough that we do not need to specify any code dependencies.
We can run this job using the `pyspark` script:
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6fb81b6004..c2957e6cb4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -4,7 +4,7 @@ title: Launching Spark on YARN
---
Experimental support for running over a [YARN (Hadoop
-NextGen)](http://hadoop.apache.org/docs/r2.0.1-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html)
+NextGen)](http://hadoop.apache.org/docs/r2.0.2-alpha/hadoop-yarn/hadoop-yarn-site/YARN.html)
cluster was added to Spark in version 0.6.0. Because YARN depends on version
2.0 of the Hadoop libraries, this currently requires checking out a separate
branch of Spark, called `yarn`, which you can do as follows:
diff --git a/examples/pom.xml b/examples/pom.xml
index 2adeec8786..39cc47c709 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/pom.xml b/pom.xml
index 09ad903e6e..08d1fc12e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2,7 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
@@ -58,6 +58,7 @@
<spray.json.version>1.1.1</spray.json.version>
<slf4j.version>1.6.1</slf4j.version>
<cdh.version>4.1.2</cdh.version>
+ <log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
@@ -268,6 +269,12 @@
</dependency>
<dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
<version>1.8</version>
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index a60028bb53..dd720e2291 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/repl/pom.xml b/repl/pom.xml
index a1b3ccece8..a3e4606edc 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index d1a766aeac..ec077e8089 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
- <artifactId>parent</artifactId>
+ <artifactId>spark-parent</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 4af839ad7f..1408af0afa 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -2,6 +2,7 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
import spark.storage.StorageLevel
+import spark.util.NextIterator
import java.io._
import java.net.Socket
@@ -59,45 +60,18 @@ object SocketReceiver {
*/
def bytesToLines(inputStream: InputStream): Iterator[String] = {
val dataInputStream = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"))
-
- val iterator = new Iterator[String] {
- var gotNext = false
- var finished = false
- var nextValue: String = null
-
- private def getNext() {
- try {
- nextValue = dataInputStream.readLine()
- if (nextValue == null) {
- finished = true
- }
- }
- gotNext = true
- }
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (!gotNext) {
- getNext()
- if (finished) {
- dataInputStream.close()
- }
- }
+ new NextIterator[String] {
+ protected override def getNext() = {
+ val nextValue = dataInputStream.readLine()
+ if (nextValue == null) {
+ finished = true
}
- !finished
+ nextValue
}
- override def next(): String = {
- if (finished) {
- throw new NoSuchElementException("End of stream")
- }
- if (!gotNext) {
- getNext()
- }
- gotNext = false
- nextValue
+ protected override def close() {
+ dataInputStream.close()
}
}
- iterator
}
}