aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorAndrew Or <andrewor14@gmail.com>2014-05-05 18:32:14 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-05 18:32:14 -0700
commitea10b3126167af3f50f7c2a70e1d942e839fcb66 (patch)
treee30ca1ab5d15b3f06bab764cdb6a457ec1ab1b35 /core/src
parent8e724dcbad411c533e4f0cd053aa878e8a60694d (diff)
downloadspark-ea10b3126167af3f50f7c2a70e1d942e839fcb66.tar.gz
spark-ea10b3126167af3f50f7c2a70e1d942e839fcb66.tar.bz2
spark-ea10b3126167af3f50f7c2a70e1d942e839fcb66.zip
Expose SparkListeners and relevant classes as DeveloperApi
Hopefully this can go into 1.0, as a few people on the user list have asked for this. Author: Andrew Or <andrewor14@gmail.com> Closes #648 from andrewor14/expose-listeners and squashes the following commits: e45e1ef [Andrew Or] Add missing colons (minor) 350d643 [Andrew Or] Expose SparkListeners and relevant classes as DeveloperApi
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockId.scala24
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageLevel.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/storage/StorageUtils.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala7
11 files changed, 78 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index cffea28fbf..42ec181b00 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,14 +19,18 @@ package org.apache.spark.storage
import java.util.UUID
+import org.apache.spark.annotation.DeveloperApi
+
/**
+ * :: DeveloperApi ::
* Identifies a particular Block of data, usually associated with a single file.
* A Block can be uniquely identified by its filename, but each type of Block has a different
* set of keys which produce its unique name.
*
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
-private[spark] sealed abstract class BlockId {
+@DeveloperApi
+sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String
@@ -44,24 +48,29 @@ private[spark] sealed abstract class BlockId {
}
}
-private[spark] case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
+@DeveloperApi
+case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
def name = "rdd_" + rddId + "_" + splitIndex
}
-private[spark] case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
+@DeveloperApi
+case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)
extends BlockId {
def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
-private[spark] case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
+@DeveloperApi
+case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
-private[spark] case class TaskResultBlockId(taskId: Long) extends BlockId {
+@DeveloperApi
+case class TaskResultBlockId(taskId: Long) extends BlockId {
def name = "taskresult_" + taskId
}
-private[spark] case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
+@DeveloperApi
+case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
def name = "input-" + streamId + "-" + uniqueId
}
@@ -75,7 +84,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
def name = "test_" + id
}
-private[spark] object BlockId {
+@DeveloperApi
+object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index be537d7730..b1585bd819 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -20,17 +20,20 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.util.concurrent.ConcurrentHashMap
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils
/**
+ * :: DeveloperApi ::
* This class represent an unique identifier for a BlockManager.
- * The first 2 constructors of this class is made private to ensure that
- * BlockManagerId objects can be created only using the apply method in
- * the companion object. This allows de-duplication of ID objects.
- * Also, constructor parameters are private to ensure that parameters cannot
- * be modified from outside this class.
+ *
+ * The first 2 constructors of this class is made private to ensure that BlockManagerId objects
+ * can be created only using the apply method in the companion object. This allows de-duplication
+ * of ID objects. Also, constructor parameters are private to ensure that parameters cannot be
+ * modified from outside this class.
*/
-private[spark] class BlockManagerId private (
+@DeveloperApi
+class BlockManagerId private (
private var executorId_ : String,
private var host_ : String,
private var port_ : Int,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 63fa5d3eb6..98fa0df6ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,6 +28,7 @@ import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.ask
import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -411,7 +412,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
}
-private[spark] case class BlockStatus(
+@DeveloperApi
+case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index c9a52e0366..363de93e06 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -22,14 +22,17 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import org.apache.spark.annotation.DeveloperApi
/**
+ * :: DeveloperApi ::
* Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
* or Tachyon, whether to drop the RDD to disk if it falls out of memory or Tachyon , whether to
* keep the data in memory in a serialized format, and whether to replicate the RDD partitions on
* multiple nodes.
+ *
* The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
* for commonly useful storage levels. To create your own storage level object, use the
* factory method of the singleton object (`StorageLevel(...)`).
*/
+@DeveloperApi
class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
@@ -54,9 +57,9 @@ class StorageLevel private(
assert(replication < 40, "Replication restricted to be less than 40 for calculating hashcodes")
if (useOffHeap) {
- require(useDisk == false, "Off-heap storage level does not support using disk")
- require(useMemory == false, "Off-heap storage level does not support using heap memory")
- require(deserialized == false, "Off-heap storage level does not support deserialized storage")
+ require(!useDisk, "Off-heap storage level does not support using disk")
+ require(!useMemory, "Off-heap storage level does not support using heap memory")
+ require(!deserialized, "Off-heap storage level does not support deserialized storage")
require(replication == 1, "Off-heap storage level does not support multiple replication")
}
@@ -146,7 +149,7 @@ object StorageLevel {
/**
* :: DeveloperApi ::
- * Create a new StorageLevel object without setting useOffHeap
+ * Create a new StorageLevel object without setting useOffHeap.
*/
@DeveloperApi
def apply(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
@@ -155,7 +158,7 @@ object StorageLevel {
/**
* :: DeveloperApi ::
- * Create a new StorageLevel object
+ * Create a new StorageLevel object.
*/
@DeveloperApi
def apply(useDisk: Boolean, useMemory: Boolean,
@@ -164,7 +167,7 @@ object StorageLevel {
/**
* :: DeveloperApi ::
- * Create a new StorageLevel object from its integer representation
+ * Create a new StorageLevel object from its integer representation.
*/
@DeveloperApi
def apply(flags: Int, replication: Int): StorageLevel =
@@ -172,7 +175,7 @@ object StorageLevel {
/**
* :: DeveloperApi ::
- * Read StorageLevel object from ObjectInput stream
+ * Read StorageLevel object from ObjectInput stream.
*/
@DeveloperApi
def apply(in: ObjectInput): StorageLevel = {
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 7a17495903..a6e6627d54 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,12 +19,15 @@ package org.apache.spark.storage
import scala.collection.mutable
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
/**
- * A SparkListener that maintains executor storage status
+ * :: DeveloperApi ::
+ * A SparkListener that maintains executor storage status.
*/
-private[spark] class StorageStatusListener extends SparkListener {
+@DeveloperApi
+class StorageStatusListener extends SparkListener {
private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
def storageStatusList = executorIdToStorageStatus.values.toSeq
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 1eddd1cdc4..6f3252a2f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -21,9 +21,14 @@ import scala.collection.Map
import scala.collection.mutable
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
-/** Storage information for each BlockManager. */
-private[spark] class StorageStatus(
+/**
+ * :: DeveloperApi ::
+ * Storage information for each BlockManager.
+ */
+@DeveloperApi
+class StorageStatus(
val blockManagerId: BlockManagerId,
val maxMem: Long,
val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
index 03b46e1bd5..bbbe55ecf4 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -17,6 +17,7 @@
package org.apache.spark.ui.env
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.ui._
@@ -30,9 +31,11 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "envi
}
/**
+ * :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the EnvironmentTab
*/
-private[ui] class EnvironmentListener extends SparkListener {
+@DeveloperApi
+class EnvironmentListener extends SparkListener {
var jvmInformation = Seq[(String, String)]()
var sparkProperties = Seq[(String, String)]()
var systemProperties = Seq[(String, String)]()
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 5678bf34ac..91d37b835b 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui.exec
import scala.collection.mutable.HashMap
import org.apache.spark.ExceptionFailure
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.{SparkUI, WebUITab}
@@ -34,9 +35,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
}
/**
+ * :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the ExecutorsTab
*/
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class ExecutorsListener(storageStatusListener: StorageStatusListener)
extends SparkListener {
val executorToTasksActive = HashMap[String, Int]()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 1dfe1d4f1f..2aaf6329b7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -17,8 +17,14 @@
package org.apache.spark.ui.jobs
-/** class for reporting aggregated metrics for each executors in stageUI */
-private[ui] class ExecutorSummary {
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Class for reporting aggregated metrics for each executor in stage UI.
+ */
+@DeveloperApi
+class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0db4afa701..396cbcbc8d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -20,19 +20,22 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.spark.{ExceptionFailure, SparkConf, SparkContext, Success}
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
/**
+ * :: DeveloperApi ::
* Tracks task-level information to be displayed in the UI.
*
* All access to the data structures in this class must be synchronized on the
* class, since the UI thread and the EventBus loop may otherwise be reading and
* updating the internal data structures concurrently.
*/
-private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
+@DeveloperApi
+class JobProgressListener(conf: SparkConf) extends SparkListener {
import JobProgressListener._
@@ -246,7 +249,8 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
}
-private[ui] case class TaskUIData(
+@DeveloperApi
+case class TaskUIData(
taskInfo: TaskInfo,
taskMetrics: Option[TaskMetrics] = None,
exception: Option[ExceptionFailure] = None)
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 07ec297841..c4bb7aab50 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.storage
import scala.collection.mutable
+import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ui._
import org.apache.spark.scheduler._
import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
@@ -35,9 +36,11 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
}
/**
- * A SparkListener that prepares information to be displayed on the BlockManagerUI
+ * :: DeveloperApi ::
+ * A SparkListener that prepares information to be displayed on the BlockManagerUI.
*/
-private[ui] class StorageListener(storageStatusListener: StorageStatusListener)
+@DeveloperApi
+class StorageListener(storageStatusListener: StorageStatusListener)
extends SparkListener {
private val _rddInfoMap = mutable.Map[Int, RDDInfo]()