author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 18:19:21 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 18:19:21 -0700
commit     b4067cbad4a46cda0799a891ded152531ca83b62 (patch)
tree       e1a66e613f0a2558ec4f15d45e5e3b84eef4ab14
parent     8d7b77bcb545e7e1167cf6e4a010809d5bd76c5a (diff)
More doc updates, and moved Serializer to a subpackage.
-rw-r--r--  bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala  11
-rw-r--r--  core/src/main/scala/spark/JavaSerializer.scala  4
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala  11
-rw-r--r--  core/src/main/scala/spark/ParallelCollection.scala  2
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala  11
-rw-r--r--  core/src/main/scala/spark/RDD.scala  4
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala  1
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala  2
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala  10
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala  2
-rw-r--r--  core/src/main/scala/spark/scheduler/Task.scala  8
-rw-r--r--  core/src/main/scala/spark/serializer/Serializer.scala (renamed from core/src/main/scala/spark/Serializer.scala)  18
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala  3
13 files changed, 57 insertions, 30 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 8ced0f9c73..06cc8c748b 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -1,6 +1,7 @@
package spark.bagel.examples
import spark._
+import serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import spark.SparkContext._
import spark.bagel._
@@ -33,10 +34,10 @@ object WikipediaPageRankStandalone {
val partitioner = new HashPartitioner(sc.defaultParallelism)
val links =
if (usePartitioner)
- input.map(parseArticle _).partitionBy(partitioner).cache
+ input.map(parseArticle _).partitionBy(partitioner).cache()
else
- input.map(parseArticle _).cache
- val n = links.count
+ input.map(parseArticle _).cache()
+ val n = links.count()
val defaultRank = 1.0 / n
val a = 0.15
@@ -51,7 +52,7 @@ object WikipediaPageRankStandalone {
(ranks
.filter { case (id, rank) => rank >= threshold }
.map { case (id, rank) => "%s\t%s\n".format(id, rank) }
- .collect.mkString)
+ .collect().mkString)
println(top)
val time = (System.currentTimeMillis - startTime) / 1000.0
@@ -113,7 +114,7 @@ object WikipediaPageRankStandalone {
}
}
-class WPRSerializer extends spark.Serializer {
+class WPRSerializer extends spark.serializer.Serializer {
def newInstance(): SerializerInstance = new WPRSerializerInstance()
}
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index 863d00eeb5..b04a27d073 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -3,6 +3,7 @@ package spark
import java.io._
import java.nio.ByteBuffer
+import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
import spark.util.ByteBufferInputStream
private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
@@ -57,6 +58,9 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
}
}
+/**
+ * A Spark serializer that uses Java's built-in serialization.
+ */
class JavaSerializer extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
}
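
For context, a sketch of how a job opts into this serializer in the Spark of this era. The spark.serializer system property and its spark.JavaSerializer default are assumed from the contemporary configuration docs, not from this patch; the rest is the ordinary RDD API that appears elsewhere in this diff.

  import spark.SparkContext
  import spark.SparkContext._

  object JavaSerializerExample {
    def main(args: Array[String]) {
      // Assumed 0.6-era config: the serializer class is read from a system property,
      // and spark.JavaSerializer is the default, so this line is only for emphasis.
      System.setProperty("spark.serializer", "spark.JavaSerializer")
      val sc = new SparkContext("local", "JavaSerializerExample")
      // Any Serializable values can be shuffled without extra registration.
      val counts = sc.parallelize(1 to 1000).map(i => (i % 10, 1)).reduceByKey(_ + _)
      println(counts.collect().mkString(", "))
      sc.stop()
    }
  }
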
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index b8aa3a86c5..44b630e478 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -13,6 +13,7 @@ import com.esotericsoftware.kryo.serialize.ClassSerializer
import com.esotericsoftware.kryo.serialize.SerializableSerializer
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
+import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
import spark.broadcast._
import spark.storage._
@@ -158,12 +159,18 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
}
}
-// Used by clients to register their own classes
+/**
+ * Interface implemented by clients to register their classes with Kryo when using Kryo
+ * serialization.
+ */
trait KryoRegistrator {
def registerClasses(kryo: Kryo): Unit
}
-class KryoSerializer extends Serializer with Logging {
+/**
+ * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
+ */
+class KryoSerializer extends spark.serializer.Serializer with Logging {
// Make this lazy so that it only gets called once we receive our first task on each executor,
// so we can pull out any custom Kryo registrator from the user's JARs.
lazy val kryo = createKryo()
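
To make the KryoRegistrator doc above concrete, here is a minimal user-side sketch. The MyVector class is purely illustrative, and the spark.serializer / spark.kryo.registrator property names are assumed from the tuning guide of this period rather than taken from this patch.

  import com.esotericsoftware.kryo.Kryo
  import spark.KryoRegistrator

  // Illustrative application class that will travel through the serializer.
  class MyVector(val data: Array[Double]) extends Serializable

  // Registering classes up front lets Kryo write compact class IDs instead of full class names.
  class MyRegistrator extends KryoRegistrator {
    def registerClasses(kryo: Kryo) {
      kryo.register(classOf[MyVector])
    }
  }

  object KryoSetup {
    // Call before creating the SparkContext (assumed property names).
    def configure() {
      System.setProperty("spark.serializer", "spark.KryoSerializer")
      System.setProperty("spark.kryo.registrator", "MyRegistrator")
    }
  }
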
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 321f5264b8..9b57ae3b4f 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -21,7 +21,7 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
override val index: Int = slice
}
-class ParallelCollection[T: ClassManifest](
+private[spark] class ParallelCollection[T: ClassManifest](
sc: SparkContext,
@transient data: Seq[T],
numSlices: Int)
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 20c31714ae..b71021a082 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -1,10 +1,17 @@
package spark
+/**
+ * An object that defines how the elements in a key-value pair RDD are partitioned by key.
+ * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
+ */
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
+/**
+ * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ */
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
@@ -29,6 +36,10 @@ class HashPartitioner(partitions: Int) extends Partitioner {
}
}
+/**
+ * A [[spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
+ * Determines the ranges by sampling the RDD passed in.
+ */
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
partitions: Int,
@transient rdd: RDD[(K,V)],
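
A small sketch of the partitioner behavior documented above, using the same partitionBy call that the PageRank example in this patch relies on; the data and partition counts are illustrative.

  import spark.{SparkContext, HashPartitioner}
  import spark.SparkContext._

  object PartitionerExample {
    def main(args: Array[String]) {
      val sc = new SparkContext("local[2]", "PartitionerExample")
      val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

      // getPartition maps each key to an ID in [0, numPartitions) via its hashCode.
      val part = new HashPartitioner(4)
      println(part.getPartition("a"))

      // partitionBy co-locates all values for a key, so later per-key operations
      // that reuse the same partitioner (groupByKey, join, ...) avoid another shuffle.
      val partitioned = pairs.partitionBy(part).cache()
      println(partitioned.groupByKey().collect().mkString(", "))
      sc.stop()
    }
  }
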
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fdda8f29a6..ddb420efff 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -121,10 +121,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
this
}
- /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
- /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): RDD[T] = persist()
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
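
Since the comments above only name the default level, a quick sketch of the relationship between cache() and persist(); StorageLevel.MEMORY_ONLY is the constant referenced in the persist() signature in this file.

  import spark.SparkContext
  import spark.storage.StorageLevel

  object PersistExample {
    def main(args: Array[String]) {
      val sc = new SparkContext("local", "PersistExample")
      // cache() is shorthand for persist(), which defaults to MEMORY_ONLY...
      val cached = sc.parallelize(1 to 100000).map(_ * 2).cache()
      // ...so this is equivalent to the line above.
      val explicit = sc.parallelize(1 to 100000).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
      println(cached.count() + " " + explicit.count())
      sc.stop()
    }
  }
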
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 6a006e0697..4c6ec6cc6e 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -4,6 +4,7 @@ import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import akka.remote.RemoteActorRefProvider
+import serializer.Serializer
import spark.broadcast.BroadcastManager
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 4ad006c4a7..843e1bd18b 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -22,7 +22,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
import JavaDoubleRDD.fromRDD
- /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
/**
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index cfaa26ff52..5c2be534ff 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -34,7 +34,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
// Common RDD functions
- /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
/**
@@ -431,8 +431,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
rdd.saveAsHadoopDataset(conf)
}
-
- // Ordered RDD Functions
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements in
+ * ascending order. Calling `collect` or `save` on the resulting RDD will return or output an
+ * ordered list of records (in the `save` case, they will be written to multiple `part-X` files
+ * in the filesystem, in order of the keys).
+ */
def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
/**
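
A sketch of the behavior the new sortByKey doc comment describes, written against the Scala API that this Java wrapper delegates to; the data is illustrative.

  import spark.SparkContext
  import spark.SparkContext._

  object SortByKeyExample {
    def main(args: Array[String]) {
      val sc = new SparkContext("local", "SortByKeyExample")
      val pairs = sc.parallelize(Seq(("banana", 2), ("apple", 5), ("cherry", 1)))
      // Ascending by default; each partition holds a contiguous key range,
      // so collect() returns a globally ordered list.
      val sorted = pairs.sortByKey()
      println(sorted.collect().mkString(", "))   // (apple,5), (banana,2), (cherry,1)
      sc.stop()
    }
  }
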
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 50be5a6cbe..ac31350ec3 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -11,7 +11,7 @@ JavaRDDLike[T, JavaRDD[T]] {
// Common RDD functions
- /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
/**
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
index d449ac67d6..ef987fdeb6 100644
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ b/core/src/main/scala/spark/scheduler/Task.scala
@@ -1,8 +1,8 @@
package spark.scheduler
-import scala.collection.mutable.{HashMap}
-import spark.{SerializerInstance, Serializer, Utils}
-import java.io.{DataInputStream, DataOutputStream, File}
+import scala.collection.mutable.HashMap
+import spark.serializer.{SerializerInstance, Serializer}
+import java.io.{DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.util.ByteBufferInputStream
@@ -92,4 +92,4 @@ private[spark] object Task {
val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task
(taskFiles, taskJars, subBuffer)
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala
index d8bcf6326a..50b086125a 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/serializer/Serializer.scala
@@ -1,25 +1,23 @@
-package spark
+package spark.serializer
-import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer
-import java.nio.channels.Channels
-
+import java.io.{EOFException, InputStream, OutputStream}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
import spark.util.ByteBufferInputStream
/**
* A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create SerializerInstances that do the actual serialization.
+ * create [[spark.serializer.SerializerInstance]] objects that do the actual serialization and are
+ * guaranteed to only be called from one thread at a time.
*/
trait Serializer {
def newInstance(): SerializerInstance
}
/**
- * An instance of the serializer, for use by one thread at a time.
+ * An instance of a serializer, for use by one thread at a time.
*/
-private[spark] trait SerializerInstance {
+trait SerializerInstance {
def serialize[T](t: T): ByteBuffer
def deserialize[T](bytes: ByteBuffer): T
@@ -50,7 +48,7 @@ private[spark] trait SerializerInstance {
/**
* A stream for writing serialized objects.
*/
-private[spark] trait SerializationStream {
+trait SerializationStream {
def writeObject[T](t: T): SerializationStream
def flush(): Unit
def close(): Unit
@@ -66,7 +64,7 @@ private[spark] trait SerializationStream {
/**
* A stream for reading serialized objects.
*/
-private[spark] trait DeserializationStream {
+trait DeserializationStream {
def readObject[T](): T
def close(): Unit
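
With SerializerInstance and the two stream traits made public here, a serializer can now live entirely outside the spark package. Below is a minimal sketch of one backed by plain Java serialization, roughly mirroring what JavaSerializer in this patch does. The trait members not visible in this hunk (the deserialize overload taking a ClassLoader and the serializeStream/deserializeStream factories) are assumptions based on the surrounding code, and serializeMany/deserializeMany are taken to have default implementations and are omitted.

  import java.io._
  import java.nio.ByteBuffer
  import spark.serializer.{Serializer, SerializerInstance, SerializationStream, DeserializationStream}

  // Streams that delegate to plain Java object serialization.
  class MySerializationStream(out: OutputStream) extends SerializationStream {
    val objOut = new ObjectOutputStream(out)
    def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
    def flush() { objOut.flush() }
    def close() { objOut.close() }
  }

  class MyDeserializationStream(in: InputStream) extends DeserializationStream {
    val objIn = new ObjectInputStream(in)
    def readObject[T](): T = objIn.readObject().asInstanceOf[T]
    def close() { objIn.close() }
  }

  class MySerializerInstance extends SerializerInstance {
    def serialize[T](t: T): ByteBuffer = {
      val bos = new ByteArrayOutputStream()
      val out = serializeStream(bos)
      out.writeObject(t)
      out.close()
      ByteBuffer.wrap(bos.toByteArray)
    }

    def deserialize[T](bytes: ByteBuffer): T = {
      // Simplification: assumes a heap-backed buffer; a robust version would copy
      // the bytes out of direct buffers first.
      val bis = new ByteArrayInputStream(
        bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
      val in = deserializeStream(bis)
      in.readObject[T]()
    }

    // Simplification: ignores the class loader; a full implementation would resolve
    // classes through it.
    def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = deserialize(bytes)

    def serializeStream(s: OutputStream): SerializationStream = new MySerializationStream(s)
    def deserializeStream(s: InputStream): DeserializationStream = new MyDeserializationStream(s)
  }

  // The only method a Serializer must provide: a factory for per-thread instances.
  class MySerializer extends Serializer {
    def newInstance(): SerializerInstance = new MySerializerInstance
  }
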
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 8a111f44c9..bd9155ef29 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -12,8 +12,9 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
-import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
import spark.network._
+import spark.serializer.Serializer
import spark.util.ByteBufferInputStream
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import sun.nio.ch.DirectBuffer