diff options
Diffstat (limited to 'core/src/main/scala/spark/SparkContext.scala')
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 337 |
1 files changed, 262 insertions, 75 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 1d5131ad13..0d37075ef3 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -2,13 +2,15 @@ package spark import java.io._ import java.util.concurrent.atomic.AtomicInteger +import java.net.{URI, URLClassLoader} + +import scala.collection.Map +import scala.collection.generic.Growable +import scala.collection.mutable.{ArrayBuffer, HashMap} import akka.actor.Actor import akka.actor.Actor._ - -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileUtil, Path} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.InputFormat import org.apache.hadoop.mapred.SequenceFileInputFormat @@ -25,34 +27,59 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.TextInputFormat - import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} - import org.apache.mesos.{Scheduler, MesosNativeLibrary} import spark.broadcast._ - +import spark.deploy.LocalSparkCluster import spark.partial.ApproximateEvaluator import spark.partial.PartialResult - +import spark.rdd.HadoopRDD +import spark.rdd.NewHadoopRDD +import spark.rdd.UnionRDD import spark.scheduler.ShuffleMapTask import spark.scheduler.DAGScheduler import spark.scheduler.TaskScheduler import spark.scheduler.local.LocalScheduler import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} -import spark.storage.BlockManagerMaster +/** + * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark + * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. + * + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param jobName A name for your job, to display on the cluster web UI. + * @param sparkHome Location where Spark is installed on cluster nodes. + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes. + */ class SparkContext( val master: String, - val frameworkName: String, + val jobName: String, val sparkHome: String, - val jars: Seq[String]) + val jars: Seq[String], + environment: Map[String, String]) extends Logging { - - def this(master: String, frameworkName: String) = this(master, frameworkName, null, Nil) + + /** + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param jobName A name for your job, to display on the cluster web UI + * @param sparkHome Location where Spark is installed on cluster nodes. + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + */ + def this(master: String, jobName: String, sparkHome: String, jars: Seq[String]) = + this(master, jobName, sparkHome, jars, Map()) + + /** + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param jobName A name for your job, to display on the cluster web UI + */ + def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map()) // Ensure logging is initialized before we spawn any threads initLogging() @@ -68,46 +95,89 @@ class SparkContext( private val isLocal = (master == "local" || master.startsWith("local[")) // Create the Spark execution environment (cache, map output tracker, etc) - val env = SparkEnv.createFromSystemProperties( + private[spark] val env = SparkEnv.createFromSystemProperties( System.getProperty("spark.master.host"), System.getProperty("spark.master.port").toInt, true, isLocal) SparkEnv.set(env) + // Used to store a URL for each static file/jar together with the file's local timestamp + private[spark] val addedFiles = HashMap[String, Long]() + private[spark] val addedJars = HashMap[String, Long]() + + // Add each JAR given through the constructor + jars.foreach { addJar(_) } + + // Environment variables to pass to our executors + private[spark] val executorEnvs = HashMap[String, String]() + for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", + "SPARK_TESTING")) { + val value = System.getenv(key) + if (value != null) { + executorEnvs(key) = value + } + } + executorEnvs ++= environment + // Create and start the scheduler private var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks - val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r // Regular expression for connecting to Spark deploy clusters val SPARK_REGEX = """(spark://.*)""".r master match { - case "local" => - new LocalScheduler(1, 0) + case "local" => + new LocalScheduler(1, 0, this) - case LOCAL_N_REGEX(threads) => - new LocalScheduler(threads.toInt, 0) + case LOCAL_N_REGEX(threads) => + new LocalScheduler(threads.toInt, 0, this) case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => - new LocalScheduler(threads.toInt, maxFailures.toInt) + new LocalScheduler(threads.toInt, maxFailures.toInt, this) case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName) scheduler.initialize(backend) scheduler + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang. + val memoryPerSlaveInt = memoryPerSlave.toInt + val sparkMemEnv = System.getenv("SPARK_MEM") + val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512 + if (sparkMemEnvInt > memoryPerSlaveInt) { + throw new SparkException( + "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format( + memoryPerSlaveInt, sparkMemEnvInt)) + } + + val scheduler = new ClusterScheduler(this) + val localCluster = new LocalSparkCluster( + numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) + val sparkUrl = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, jobName) + scheduler.initialize(backend) + backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { + localCluster.stop() + } + scheduler + case _ => MesosNativeLibrary.load() val scheduler = new ClusterScheduler(this) val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean + val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos:// val backend = if (coarseGrained) { - new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName) + new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName) } else { - new MesosSchedulerBackend(scheduler, this, master, frameworkName) + new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, jobName) } scheduler.initialize(backend) scheduler @@ -119,14 +189,20 @@ class SparkContext( // Methods for creating RDDs - def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { + /** Distribute a local Scala collection to form an RDD. */ + def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { new ParallelCollection[T](this, seq, numSlices) } - - def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism ): RDD[T] = { + + /** Distribute a local Scala collection to form an RDD. */ + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { parallelize(seq, numSlices) } + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) .map(pair => pair._2.toString) @@ -163,19 +239,31 @@ class SparkContext( } /** - * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, - * values and the InputFormat so that users don't need to pass them directly. + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits) + * }}} */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) : RDD[(K, V)] = { hadoopFile(path, - fm.erasure.asInstanceOf[Class[F]], + fm.erasure.asInstanceOf[Class[F]], km.erasure.asInstanceOf[Class[K]], vm.erasure.asInstanceOf[Class[V]], minSplits) } + /** + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path) + * }}} + */ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = hadoopFile[K, V, F](path, defaultMinSplits) @@ -191,7 +279,7 @@ class SparkContext( new Configuration) } - /** + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. */ @@ -207,7 +295,7 @@ class SparkContext( new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) } - /** + /** * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat * and extra configuration options to pass to the input format. */ @@ -219,7 +307,7 @@ class SparkContext( new NewHadoopRDD(this, fClass, kClass, vClass, conf) } - /** Get an RDD for a Hadoop SequenceFile with given key and value types */ + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -229,18 +317,23 @@ class SparkContext( hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) } + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = sequenceFile(path, keyClass, valueClass, defaultMinSplits) /** - * Version of sequenceFile() for types implicitly convertible to Writables through a - * WritableConverter. + * Version of sequenceFile() for types implicitly convertible to Writables through a + * WritableConverter. For example, to access a SequenceFile where the keys are Text and the + * values are IntWritable, you could simply write + * {{{ + * sparkContext.sequenceFile[String, Int](path, ...) + * }}} * * WritableConverters are provided in a somewhat strange way (by an implicit function) to support - * both subclasses of Writable and types for which we define a converter (e.g. Int to + * both subclasses of Writable and types for which we define a converter (e.g. Int to * IntWritable). The most natural thing would've been to have implicit objects for the * converters, but then we couldn't have an object for every subclass of Writable (you can't - * have a parameterized singleton object). We use functions instead to create a new converter + * have a parameterized singleton object). We use functions instead to create a new converter * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to * allow it to figure out the Writable class to use in the subclass case. */ @@ -265,7 +358,7 @@ class SparkContext( * that there's very little effort required to save arbitrary objects. */ def objectFile[T: ClassManifest]( - path: String, + path: String, minSplits: Int = defaultMinSplits ): RDD[T] = { sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits) @@ -275,43 +368,128 @@ class SparkContext( /** Build the union of a list of RDDs. */ def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) - /** Build the union of a list of RDDs. */ + /** Build the union of a list of RDDs passed as variable-length arguments. */ def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] = new UnionRDD(this, Seq(first) ++ rest) // Methods for creating shared variables + /** + * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values + * to using the `+=` method. Only the master can access the accumulator's `value`. + */ def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) = new Accumulator(initialValue, param) /** - * Create an accumulable shared variable, with a `+=` method + * Create an [[spark.Accumulable]] shared variable, with a `+=` method * @tparam T accumulator type * @tparam R type that can be added to the accumulator */ def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) = new Accumulable(initialValue, param) + /** + * Create an accumulator from a "mutable collection" type. + * + * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by + * standard mutable collections. So you can use this with mutable Map, Set, etc. + */ + def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = { + val param = new GrowableAccumulableParam[R,T] + new Accumulable(initialValue, param) + } + + /** + * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for + * reading it in distributed functions. The variable will be sent to each cluster only once. + */ + def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T] (value, isLocal) + + /** + * Add a file to be downloaded into the working directory of this Spark job on every node. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ + def addFile(path: String) { + val uri = new URI(path) + val key = uri.getScheme match { + case null | "file" => env.httpFileServer.addFile(new File(uri.getPath)) + case _ => path + } + addedFiles(key) = System.currentTimeMillis - // Keep around a weak hash map of values to Cached versions? - def broadcast[T](value: T) = SparkEnv.get.broadcastManager.newBroadcast[T] (value, isLocal) + // Fetch the file locally in case the task is executed locally + val filename = new File(path.split("/").last) + Utils.fetchFile(path, new File(".")) - // Stop the SparkContext + logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) + } + + /** + * Return a map from the slave to the max memory available for caching and the remaining + * memory available for caching. + */ + def getSlavesMemoryStatus: Map[String, (Long, Long)] = { + env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => + (blockManagerId.ip + ":" + blockManagerId.port, mem) + } + } + + /** + * Clear the job's list of files added by `addFile` so that they do not get donwloaded to + * any new nodes. + */ + def clearFiles() { + addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } + addedFiles.clear() + } + + /** + * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ + def addJar(path: String) { + val uri = new URI(path) + val key = uri.getScheme match { + case null | "file" => env.httpFileServer.addJar(new File(uri.getPath)) + case _ => path + } + addedJars(key) = System.currentTimeMillis + logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) + } + + /** + * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to + * any new nodes. + */ + def clearJars() { + addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() } + addedJars.clear() + } + + /** Shut down the SparkContext. */ def stop() { dagScheduler.stop() dagScheduler = null taskScheduler = null // TODO: Cache.stop()? env.stop() + // Clean up locally linked files + clearFiles() + clearJars() SparkEnv.set(null) ShuffleMapTask.clearCache() logInfo("Successfully stopped SparkContext") } - // Get Spark's home location from either a value set through the constructor, - // or the spark.home Java property, or the SPARK_HOME environment variable - // (in that order of preference). If neither of these is set, return None. - def getSparkHome(): Option[String] = { + /** + * Get Spark's home location from either a value set through the constructor, + * or the spark.home Java property, or the SPARK_HOME environment variable + * (in that order of preference). If neither of these is set, return None. + */ + private[spark] def getSparkHome(): Option[String] = { if (sparkHome != null) { Some(sparkHome) } else if (System.getProperty("spark.home") != null) { @@ -326,7 +504,7 @@ class SparkContext( /** * Run a function on a given set of partitions in an RDD and return the results. This is the main * entry point to the scheduler, by which all actions get launched. The allowLocal flag specifies - * whether the scheduler can run the computation on the master rather than shipping it out to the + * whether the scheduler can run the computation on the master rather than shipping it out to the * cluster, for short actions like first(). */ def runJob[T, U: ClassManifest]( @@ -335,22 +513,27 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { - logInfo("Starting job...") + val callSite = Utils.getSparkCallSite + logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runJob(rdd, func, partitions, allowLocal) - logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal) + logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") result } + /** + * Run a job on a given set of partitions of an RDD, but take a function of type + * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. + */ def runJob[T, U: ClassManifest]( rdd: RDD[T], - func: Iterator[T] => U, + func: Iterator[T] => U, partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) } - + /** * Run a job on all partitions in an RDD and return the results in an array. */ @@ -358,6 +541,9 @@ class SparkContext( runJob(rdd, func, 0 until rdd.splits.size, false) } + /** + * Run a job on all partitions in an RDD and return the results in an array. + */ def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.splits.size, false) } @@ -371,38 +557,37 @@ class SparkContext( evaluator: ApproximateEvaluator[U, R], timeout: Long ): PartialResult[R] = { - logInfo("Starting job...") + val callSite = Utils.getSparkCallSite + logInfo("Starting job: " + callSite) val start = System.nanoTime - val result = dagScheduler.runApproximateJob(rdd, func, evaluator, timeout) - logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") + val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout) + logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") result } - // Clean a closure to make it ready to serialized and send to tasks - // (removes unreferenced variables in $outer's, updates REPL variables) + /** + * Clean a closure to make it ready to serialized and send to tasks + * (removes unreferenced variables in $outer's, updates REPL variables) + */ private[spark] def clean[F <: AnyRef](f: F): F = { ClosureCleaner.clean(f) return f } - // Default level of parallelism to use when not given by user (e.g. for reduce tasks) + /** Default level of parallelism to use when not given by user (e.g. for reduce tasks) */ def defaultParallelism: Int = taskScheduler.defaultParallelism - // Default min number of splits for Hadoop RDDs when not given by user + /** Default min number of splits for Hadoop RDDs when not given by user */ def defaultMinSplits: Int = math.min(defaultParallelism, 2) private var nextShuffleId = new AtomicInteger(0) - private[spark] def newShuffleId(): Int = { - nextShuffleId.getAndIncrement() - } - + private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() + private var nextRddId = new AtomicInteger(0) - // Register a new RDD, returning its RDD ID - private[spark] def newRddId(): Int = { - nextRddId.getAndIncrement() - } + /** Register a new RDD, returning its RDD ID */ + private[spark] def newRddId(): Int = nextRddId.getAndIncrement() } /** @@ -429,7 +614,7 @@ object SparkContext { implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = new PairRDDFunctions(rdd) - + implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest]( rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) @@ -450,7 +635,7 @@ object SparkContext { implicit def longToLongWritable(l: Long) = new LongWritable(l) implicit def floatToFloatWritable(f: Float) = new FloatWritable(f) - + implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d) implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b) @@ -461,7 +646,7 @@ object SparkContext { private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u - + new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], arr.map(x => anyToWritable(x)).toArray) } @@ -489,8 +674,10 @@ object SparkContext { implicit def writableWritableConverter[T <: Writable]() = new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T]) - // Find the JAR from which a given class was loaded, to make it easy for users to pass - // their JARs to SparkContext + /** + * Find the JAR from which a given class was loaded, to make it easy for users to pass + * their JARs to SparkContext + */ def jarOfClass(cls: Class[_]): Seq[String] = { val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class") if (uri != null) { @@ -505,8 +692,8 @@ object SparkContext { Nil } } - - // Find the JAR that contains the class of a particular object + + /** Find the JAR that contains the class of a particular object */ def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass) } @@ -518,7 +705,7 @@ object SparkContext { * that doesn't know the type of T when it is created. This sounds strange but is necessary to * support converting subclasses of Writable to themselves (writableWritableConverter). */ -class WritableConverter[T]( +private[spark] class WritableConverter[T]( val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable |