-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 487
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 10
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala | 15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala | 44
-rw-r--r--  python/epydoc.conf | 2
-rw-r--r--  python/pyspark/context.py | 1
-rw-r--r--  python/pyspark/mllib/_common.py | 562
-rw-r--r--  python/pyspark/mllib/classification.py | 61
-rw-r--r--  python/pyspark/mllib/clustering.py | 38
-rw-r--r--  python/pyspark/mllib/linalg.py | 256
-rw-r--r--  python/pyspark/mllib/random.py | 54
-rw-r--r--  python/pyspark/mllib/recommendation.py | 69
-rw-r--r--  python/pyspark/mllib/regression.py | 105
-rw-r--r--  python/pyspark/mllib/stat.py | 63
-rw-r--r--  python/pyspark/mllib/tests.py | 99
-rw-r--r--  python/pyspark/mllib/tree.py | 167
-rw-r--r--  python/pyspark/mllib/util.py | 43
-rw-r--r--  python/pyspark/rdd.py | 10
-rw-r--r--  python/pyspark/serializers.py | 36
-rwxr-xr-x  python/run-tests | 1
22 files changed, 891 insertions, 1267 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 12b345a8fa..f9ff4ea6ca 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -775,17 +775,36 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}
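+  /**
+   * Pickles the elements of an iterator in batches, doubling or halving the batch size so
+   * that each serialized chunk stays roughly between 1MB and 10MB.
+   */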
+ private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+ private val pickle = new Pickler()
+ private var batch = 1
+ private val buffer = new mutable.ArrayBuffer[Any]
+
+ override def hasNext(): Boolean = iter.hasNext
+
+ override def next(): Array[Byte] = {
+ while (iter.hasNext && buffer.length < batch) {
+ buffer += iter.next()
+ }
+ val bytes = pickle.dumps(buffer.toArray)
+ val size = bytes.length
+      // keep the size of each pickled batch roughly between 1MB and 10MB
+ if (size < 1024 * 1024) {
+ batch *= 2
+ } else if (size > 1024 * 1024 * 10 && batch > 1) {
+ batch /= 2
+ }
+ buffer.clear()
+ bytes
+ }
+ }
+
/**
* Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
* PySpark.
*/
def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
- jRDD.rdd.mapPartitions { iter =>
- val pickle = new Pickler
- iter.map { row =>
- pickle.dumps(row)
- }
- }
+ jRDD.rdd.mapPartitions { iter => new AutoBatchedPickler(iter) }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 6668797f5f..7903457b17 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -68,8 +68,8 @@ private[python] object SerDeUtil extends Logging {
construct(args ++ Array(""))
} else if (args.length == 2 && args(1).isInstanceOf[String]) {
val typecode = args(0).asInstanceOf[String].charAt(0)
- val data: String = args(1).asInstanceOf[String]
- construct(typecode, machineCodes(typecode), data.getBytes("ISO-8859-1"))
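+        // the array data is passed from Python as an ISO-8859-1 string; recover the raw
+        // bytes before constructing the array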
+ val data: Array[Byte] = args(1).asInstanceOf[String].getBytes("ISO-8859-1")
+ construct(typecode, machineCodes(typecode), data)
} else {
super.construct(args)
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index fa0fa69f38..9164c294ac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -17,16 +17,20 @@
package org.apache.spark.mllib.api.python
-import java.nio.{ByteBuffer, ByteOrder}
+import java.io.OutputStream
import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import net.razorvine.pickle._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.optimization._
-import org.apache.spark.mllib.linalg.{Matrix, SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.random.{RandomRDDs => RG}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
@@ -40,11 +44,10 @@ import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
+
/**
* :: DeveloperApi ::
* The Java stubs necessary for the Python mllib bindings.
- *
- * See python/pyspark/mllib/_common.py for the mutually agreed upon data format.
*/
@DeveloperApi
class PythonMLLibAPI extends Serializable {
@@ -60,18 +63,17 @@ class PythonMLLibAPI extends Serializable {
def loadLabeledPoints(
jsc: JavaSparkContext,
path: String,
- minPartitions: Int): JavaRDD[Array[Byte]] =
- MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(SerDe.serializeLabeledPoint)
+ minPartitions: Int): JavaRDD[LabeledPoint] =
+ MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
- val initialWeights = SerDe.deserializeDoubleVector(initialWeightsBA)
- val model = trainFunc(data, initialWeights)
+ val initialWeights = SerDe.loads(initialWeightsBA).asInstanceOf[Vector]
+ val model = trainFunc(data.rdd, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(SerDe.serializeDoubleVector(model.weights))
+ ret.add(SerDe.dumps(model.weights))
ret.add(model.intercept: java.lang.Double)
ret
}
@@ -80,7 +82,7 @@ class PythonMLLibAPI extends Serializable {
* Java stub for Python mllib LinearRegressionWithSGD.train()
*/
def trainLinearRegressionModelWithSGD(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
@@ -106,7 +108,7 @@ class PythonMLLibAPI extends Serializable {
trainRegressionModel(
(data, initialWeights) =>
lrAlg.run(data, initialWeights),
- dataBytesJRDD,
+ data,
initialWeightsBA)
}
@@ -114,7 +116,7 @@ class PythonMLLibAPI extends Serializable {
* Java stub for Python mllib LassoWithSGD.train()
*/
def trainLassoModelWithSGD(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -129,7 +131,7 @@ class PythonMLLibAPI extends Serializable {
regParam,
miniBatchFraction,
initialWeights),
- dataBytesJRDD,
+ data,
initialWeightsBA)
}
@@ -137,7 +139,7 @@ class PythonMLLibAPI extends Serializable {
* Java stub for Python mllib RidgeRegressionWithSGD.train()
*/
def trainRidgeModelWithSGD(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -152,7 +154,7 @@ class PythonMLLibAPI extends Serializable {
regParam,
miniBatchFraction,
initialWeights),
- dataBytesJRDD,
+ data,
initialWeightsBA)
}
@@ -160,7 +162,7 @@ class PythonMLLibAPI extends Serializable {
* Java stub for Python mllib SVMWithSGD.train()
*/
def trainSVMModelWithSGD(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
regParam: Double,
@@ -186,7 +188,7 @@ class PythonMLLibAPI extends Serializable {
trainRegressionModel(
(data, initialWeights) =>
SVMAlg.run(data, initialWeights),
- dataBytesJRDD,
+ data,
initialWeightsBA)
}
@@ -194,7 +196,7 @@ class PythonMLLibAPI extends Serializable {
* Java stub for Python mllib LogisticRegressionWithSGD.train()
*/
def trainLogisticRegressionModelWithSGD(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
numIterations: Int,
stepSize: Double,
miniBatchFraction: Double,
@@ -220,7 +222,7 @@ class PythonMLLibAPI extends Serializable {
trainRegressionModel(
(data, initialWeights) =>
LogRegAlg.run(data, initialWeights),
- dataBytesJRDD,
+ data,
initialWeightsBA)
}
@@ -228,14 +230,13 @@ class PythonMLLibAPI extends Serializable {
* Java stub for NaiveBayes.train()
*/
def trainNaiveBayes(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
lambda: Double): java.util.List[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
- val model = NaiveBayes.train(data, lambda)
+ val model = NaiveBayes.train(data.rdd, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(SerDe.serializeDoubleVector(Vectors.dense(model.labels)))
- ret.add(SerDe.serializeDoubleVector(Vectors.dense(model.pi)))
- ret.add(SerDe.serializeDoubleMatrix(model.theta))
+ ret.add(Vectors.dense(model.labels))
+ ret.add(Vectors.dense(model.pi))
+ ret.add(model.theta)
ret
}
@@ -243,16 +244,12 @@ class PythonMLLibAPI extends Serializable {
* Java stub for Python mllib KMeans.train()
*/
def trainKMeansModel(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[Vector],
k: Int,
maxIterations: Int,
runs: Int,
- initializationMode: String): java.util.List[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(bytes => SerDe.deserializeDoubleVector(bytes))
- val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
- val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(SerDe.serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
- ret
+ initializationMode: String): KMeansModel = {
+ KMeans.train(data.rdd, k, maxIterations, runs, initializationMode)
}
/**
@@ -262,13 +259,12 @@ class PythonMLLibAPI extends Serializable {
* the Py4J documentation.
*/
def trainALSModel(
- ratingsBytesJRDD: JavaRDD[Array[Byte]],
+ ratings: JavaRDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int): MatrixFactorizationModel = {
- val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
- ALS.train(ratings, rank, iterations, lambda, blocks)
+ ALS.train(ratings.rdd, rank, iterations, lambda, blocks)
}
/**
@@ -278,14 +274,13 @@ class PythonMLLibAPI extends Serializable {
* exit; see the Py4J documentation.
*/
def trainImplicitALSModel(
- ratingsBytesJRDD: JavaRDD[Array[Byte]],
+ ratingsJRDD: JavaRDD[Rating],
rank: Int,
iterations: Int,
lambda: Double,
blocks: Int,
alpha: Double): MatrixFactorizationModel = {
- val ratings = ratingsBytesJRDD.rdd.map(SerDe.unpackRating)
- ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+ ALS.trainImplicit(ratingsJRDD.rdd, rank, iterations, lambda, blocks, alpha)
}
/**
@@ -293,11 +288,11 @@ class PythonMLLibAPI extends Serializable {
* This stub returns a handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on exit;
* see the Py4J documentation.
- * @param dataBytesJRDD Training data
+ * @param data Training data
* @param categoricalFeaturesInfoJMap Categorical features info, as Java map
*/
def trainDecisionTreeModel(
- dataBytesJRDD: JavaRDD[Array[Byte]],
+ data: JavaRDD[LabeledPoint],
algoStr: String,
numClasses: Int,
categoricalFeaturesInfoJMap: java.util.Map[Int, Int],
@@ -307,8 +302,6 @@ class PythonMLLibAPI extends Serializable {
minInstancesPerNode: Int,
minInfoGain: Double): DecisionTreeModel = {
- val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
-
val algo = Algo.fromString(algoStr)
val impurity = Impurities.fromString(impurityStr)
@@ -322,44 +315,15 @@ class PythonMLLibAPI extends Serializable {
minInstancesPerNode = minInstancesPerNode,
minInfoGain = minInfoGain)
- DecisionTree.train(data, strategy)
- }
-
- /**
- * Predict the label of the given data point.
- * This is a Java stub for python DecisionTreeModel.predict()
- *
- * @param featuresBytes Serialized feature vector for data point
- * @return predicted label
- */
- def predictDecisionTreeModel(
- model: DecisionTreeModel,
- featuresBytes: Array[Byte]): Double = {
- val features: Vector = SerDe.deserializeDoubleVector(featuresBytes)
- model.predict(features)
- }
-
- /**
- * Predict the labels of the given data points.
- * This is a Java stub for python DecisionTreeModel.predict()
- *
- * @param dataJRDD A JavaRDD with serialized feature vectors
- * @return JavaRDD of serialized predictions
- */
- def predictDecisionTreeModel(
- model: DecisionTreeModel,
- dataJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
- val data = dataJRDD.rdd.map(xBytes => SerDe.deserializeDoubleVector(xBytes))
- model.predict(data).map(SerDe.serializeDouble)
+ DecisionTree.train(data.rdd, strategy)
}
/**
* Java stub for mllib Statistics.colStats(X: RDD[Vector]).
* TODO figure out return type.
*/
- def colStats(X: JavaRDD[Array[Byte]]): MultivariateStatisticalSummarySerialized = {
- val cStats = Statistics.colStats(X.rdd.map(SerDe.deserializeDoubleVector(_)))
- new MultivariateStatisticalSummarySerialized(cStats)
+ def colStats(rdd: JavaRDD[Vector]): MultivariateStatisticalSummary = {
+ Statistics.colStats(rdd.rdd)
}
/**
@@ -367,19 +331,15 @@ class PythonMLLibAPI extends Serializable {
* Returns the correlation matrix serialized into a byte array understood by deserializers in
* pyspark.
*/
- def corr(X: JavaRDD[Array[Byte]], method: String): Array[Byte] = {
- val inputMatrix = X.rdd.map(SerDe.deserializeDoubleVector(_))
- val result = Statistics.corr(inputMatrix, getCorrNameOrDefault(method))
- SerDe.serializeDoubleMatrix(SerDe.to2dArray(result))
+ def corr(x: JavaRDD[Vector], method: String): Matrix = {
+ Statistics.corr(x.rdd, getCorrNameOrDefault(method))
}
/**
* Java stub for mllib Statistics.corr(x: RDD[Double], y: RDD[Double], method: String).
*/
- def corr(x: JavaRDD[Array[Byte]], y: JavaRDD[Array[Byte]], method: String): Double = {
- val xDeser = x.rdd.map(SerDe.deserializeDouble(_))
- val yDeser = y.rdd.map(SerDe.deserializeDouble(_))
- Statistics.corr(xDeser, yDeser, getCorrNameOrDefault(method))
+ def corr(x: JavaRDD[Double], y: JavaRDD[Double], method: String): Double = {
+ Statistics.corr(x.rdd, y.rdd, getCorrNameOrDefault(method))
}
// used by the corr methods to retrieve the name of the correlation method passed in via pyspark
@@ -411,10 +371,10 @@ class PythonMLLibAPI extends Serializable {
def uniformRDD(jsc: JavaSparkContext,
size: Long,
numPartitions: java.lang.Integer,
- seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ seed: java.lang.Long): JavaRDD[Double] = {
val parts = getNumPartitionsOrDefault(numPartitions, jsc)
val s = getSeedOrDefault(seed)
- RG.uniformRDD(jsc.sc, size, parts, s).map(SerDe.serializeDouble)
+ RG.uniformRDD(jsc.sc, size, parts, s)
}
/**
@@ -423,10 +383,10 @@ class PythonMLLibAPI extends Serializable {
def normalRDD(jsc: JavaSparkContext,
size: Long,
numPartitions: java.lang.Integer,
- seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ seed: java.lang.Long): JavaRDD[Double] = {
val parts = getNumPartitionsOrDefault(numPartitions, jsc)
val s = getSeedOrDefault(seed)
- RG.normalRDD(jsc.sc, size, parts, s).map(SerDe.serializeDouble)
+ RG.normalRDD(jsc.sc, size, parts, s)
}
/**
@@ -436,10 +396,10 @@ class PythonMLLibAPI extends Serializable {
mean: Double,
size: Long,
numPartitions: java.lang.Integer,
- seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ seed: java.lang.Long): JavaRDD[Double] = {
val parts = getNumPartitionsOrDefault(numPartitions, jsc)
val s = getSeedOrDefault(seed)
- RG.poissonRDD(jsc.sc, mean, size, parts, s).map(SerDe.serializeDouble)
+ RG.poissonRDD(jsc.sc, mean, size, parts, s)
}
/**
@@ -449,10 +409,10 @@ class PythonMLLibAPI extends Serializable {
numRows: Long,
numCols: Int,
numPartitions: java.lang.Integer,
- seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ seed: java.lang.Long): JavaRDD[Vector] = {
val parts = getNumPartitionsOrDefault(numPartitions, jsc)
val s = getSeedOrDefault(seed)
- RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
+ RG.uniformVectorRDD(jsc.sc, numRows, numCols, parts, s)
}
/**
@@ -462,10 +422,10 @@ class PythonMLLibAPI extends Serializable {
numRows: Long,
numCols: Int,
numPartitions: java.lang.Integer,
- seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ seed: java.lang.Long): JavaRDD[Vector] = {
val parts = getNumPartitionsOrDefault(numPartitions, jsc)
val s = getSeedOrDefault(seed)
- RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
+ RG.normalVectorRDD(jsc.sc, numRows, numCols, parts, s)
}
/**
@@ -476,259 +436,168 @@ class PythonMLLibAPI extends Serializable {
numRows: Long,
numCols: Int,
numPartitions: java.lang.Integer,
- seed: java.lang.Long): JavaRDD[Array[Byte]] = {
+ seed: java.lang.Long): JavaRDD[Vector] = {
val parts = getNumPartitionsOrDefault(numPartitions, jsc)
val s = getSeedOrDefault(seed)
- RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
+ RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
}
}
/**
- * :: DeveloperApi ::
- * MultivariateStatisticalSummary with Vector fields serialized.
+ * SerDe utility functions for PythonMLLibAPI.
*/
-@DeveloperApi
-class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
- extends Serializable {
+private[spark] object SerDe extends Serializable {
- def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
+ val PYSPARK_PACKAGE = "pyspark.mllib"
- def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
+ /**
+   * Base class for picklers that convert MLlib objects to and from their Python representation.
+ */
+ private[python] abstract class BasePickler[T: ClassTag]
+ extends IObjectPickler with IObjectConstructor {
+
+ private val cls = implicitly[ClassTag[T]].runtimeClass
+ private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
+ private val name = cls.getSimpleName
+
+    // register this pickler with the Pickler and its constructor with the Unpickler
+ def register(): Unit = {
+ Pickler.registerCustomPickler(this.getClass, this)
+ Pickler.registerCustomPickler(cls, this)
+ Unpickler.registerConstructor(module, name, this)
+ }
- def count: Long = summary.count
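+    // Either emits a GLOBAL opcode naming the matching Python class (when the pickler object
+    // itself is saved) or saves the class reference plus the object's state, followed by a
+    // REDUCE opcode.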
+ def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
+ if (obj == this) {
+ out.write(Opcodes.GLOBAL)
+ out.write((module + "\n" + name + "\n").getBytes())
+ } else {
+        pickler.save(this) // the class reference is memoized by the Pickler
+ saveState(obj, out, pickler)
+ out.write(Opcodes.REDUCE)
+ }
+ }
+
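+    // Pickles the given fields as a Python tuple; zero or more than three fields require an
+    // explicit MARK opcode before the generic TUPLE opcode.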
+ private[python] def saveObjects(out: OutputStream, pickler: Pickler, objects: Any*) = {
+ if (objects.length == 0 || objects.length > 3) {
+ out.write(Opcodes.MARK)
+ }
+ objects.foreach(pickler.save(_))
+ val code = objects.length match {
+ case 1 => Opcodes.TUPLE1
+ case 2 => Opcodes.TUPLE2
+ case 3 => Opcodes.TUPLE3
+ case _ => Opcodes.TUPLE
+ }
+ out.write(code)
+ }
- def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
+ private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
+ }
- def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
+ // Pickler for DenseVector
+ private[python] class DenseVectorPickler extends BasePickler[DenseVector] {
- def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
-}
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+ val vector: DenseVector = obj.asInstanceOf[DenseVector]
+ saveObjects(out, pickler, vector.toArray)
+ }
-/**
- * SerDe utility functions for PythonMLLibAPI.
- */
-private[spark] object SerDe extends Serializable {
- private val DENSE_VECTOR_MAGIC: Byte = 1
- private val SPARSE_VECTOR_MAGIC: Byte = 2
- private val DENSE_MATRIX_MAGIC: Byte = 3
- private val LABELED_POINT_MAGIC: Byte = 4
-
- private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
- require(bytes.length - offset >= 5, "Byte array too short")
- val magic = bytes(offset)
- if (magic == DENSE_VECTOR_MAGIC) {
- deserializeDenseVector(bytes, offset)
- } else if (magic == SPARSE_VECTOR_MAGIC) {
- deserializeSparseVector(bytes, offset)
- } else {
- throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 1) {
+ throw new PickleException("should be 1")
+ }
+ new DenseVector(args(0).asInstanceOf[Array[Double]])
}
}
- private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
- require(bytes.length - offset == 8, "Wrong size byte array for Double")
- val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
- bb.order(ByteOrder.nativeOrder())
- bb.getDouble
- }
-
- private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
- val packetLength = bytes.length - offset
- require(packetLength >= 5, "Byte array too short")
- val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
- bb.order(ByteOrder.nativeOrder())
- val magic = bb.get()
- require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
- val length = bb.getInt()
- require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
- val db = bb.asDoubleBuffer()
- val ans = new Array[Double](length.toInt)
- db.get(ans)
- Vectors.dense(ans)
- }
-
- private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
- val packetLength = bytes.length - offset
- require(packetLength >= 9, "Byte array too short")
- val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
- bb.order(ByteOrder.nativeOrder())
- val magic = bb.get()
- require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
- val size = bb.getInt()
- val nonZeros = bb.getInt()
- require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
- val ib = bb.asIntBuffer()
- val indices = new Array[Int](nonZeros)
- ib.get(indices)
- bb.position(bb.position() + 4 * nonZeros)
- val db = bb.asDoubleBuffer()
- val values = new Array[Double](nonZeros)
- db.get(values)
- Vectors.sparse(size, indices, values)
- }
+ // Pickler for DenseMatrix
+ private[python] class DenseMatrixPickler extends BasePickler[DenseMatrix] {
- /**
- * Returns an 8-byte array for the input Double.
- *
- * Note: we currently do not use a magic byte for double for storage efficiency.
- * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
- * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
- * serialization scheme changes.
- */
- private[python] def serializeDouble(double: Double): Array[Byte] = {
- val bytes = new Array[Byte](8)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.putDouble(double)
- bytes
- }
-
- private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
- val len = doubles.length
- val bytes = new Array[Byte](5 + 8 * len)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.put(DENSE_VECTOR_MAGIC)
- bb.putInt(len)
- val db = bb.asDoubleBuffer()
- db.put(doubles)
- bytes
- }
-
- private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
- val nonZeros = vector.indices.length
- val bytes = new Array[Byte](9 + 12 * nonZeros)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.put(SPARSE_VECTOR_MAGIC)
- bb.putInt(vector.size)
- bb.putInt(nonZeros)
- val ib = bb.asIntBuffer()
- ib.put(vector.indices)
- bb.position(bb.position() + 4 * nonZeros)
- val db = bb.asDoubleBuffer()
- db.put(vector.values)
- bytes
- }
-
- private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
- case s: SparseVector =>
- serializeSparseVector(s)
- case _ =>
- serializeDenseVector(vector.toArray)
- }
-
- private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
- val packetLength = bytes.length
- if (packetLength < 9) {
- throw new IllegalArgumentException("Byte array too short.")
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+ val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
+ saveObjects(out, pickler, m.numRows, m.numCols, m.values)
}
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- val magic = bb.get()
- if (magic != DENSE_MATRIX_MAGIC) {
- throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 3) {
+ throw new PickleException("should be 3")
+ }
+ new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
+ args(2).asInstanceOf[Array[Double]])
}
- val rows = bb.getInt()
- val cols = bb.getInt()
- if (packetLength != 9 + 8 * rows * cols) {
- throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
+ }
+
+ // Pickler for SparseVector
+ private[python] class SparseVectorPickler extends BasePickler[SparseVector] {
+
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+ val v: SparseVector = obj.asInstanceOf[SparseVector]
+ saveObjects(out, pickler, v.size, v.indices, v.values)
}
- val db = bb.asDoubleBuffer()
- val ans = new Array[Array[Double]](rows.toInt)
- for (i <- 0 until rows.toInt) {
- ans(i) = new Array[Double](cols.toInt)
- db.get(ans(i))
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 3) {
+ throw new PickleException("should be 3")
+ }
+ new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
+ args(2).asInstanceOf[Array[Double]])
}
- ans
}
- private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
- val rows = doubles.length
- var cols = 0
- if (rows > 0) {
- cols = doubles(0).length
+ // Pickler for LabeledPoint
+ private[python] class LabeledPointPickler extends BasePickler[LabeledPoint] {
+
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+ val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
+ saveObjects(out, pickler, point.label, point.features)
}
- val bytes = new Array[Byte](9 + 8 * rows * cols)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.put(DENSE_MATRIX_MAGIC)
- bb.putInt(rows)
- bb.putInt(cols)
- val db = bb.asDoubleBuffer()
- for (i <- 0 until rows) {
- db.put(doubles(i))
+
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 2) {
+ throw new PickleException("should be 2")
+ }
+ new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
}
- bytes
}
- private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
- val fb = serializeDoubleVector(p.features)
- val bytes = new Array[Byte](1 + 8 + fb.length)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.put(LABELED_POINT_MAGIC)
- bb.putDouble(p.label)
- bb.put(fb)
- bytes
- }
+ // Pickler for Rating
+ private[python] class RatingPickler extends BasePickler[Rating] {
- private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
- require(bytes.length >= 9, "Byte array too short")
- val magic = bytes(0)
- if (magic != LABELED_POINT_MAGIC) {
- throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
+ val rating: Rating = obj.asInstanceOf[Rating]
+ saveObjects(out, pickler, rating.user, rating.product, rating.rating)
}
- val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
- labelBytes.order(ByteOrder.nativeOrder())
- val label = labelBytes.asDoubleBuffer().get(0)
- LabeledPoint(label, deserializeDoubleVector(bytes, 9))
- }
- // Reformat a Matrix into Array[Array[Double]] for serialization
- private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
- val values = matrix.toArray
- Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
+ def construct(args: Array[Object]): Object = {
+ if (args.length != 3) {
+ throw new PickleException("should be 3")
+ }
+ new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
+ args(2).asInstanceOf[Double])
+ }
}
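+  // Called from Python (SparkContext._jvm.SerDe.initialize() in context.py) to register the
+  // custom picklers and constructors above with Pyrolite.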
+ def initialize(): Unit = {
+ new DenseVectorPickler().register()
+ new DenseMatrixPickler().register()
+ new SparseVectorPickler().register()
+ new LabeledPointPickler().register()
+ new RatingPickler().register()
+ }
- /** Unpack a Rating object from an array of bytes */
- private[python] def unpackRating(ratingBytes: Array[Byte]): Rating = {
- val bb = ByteBuffer.wrap(ratingBytes)
- bb.order(ByteOrder.nativeOrder())
- val user = bb.getInt()
- val product = bb.getInt()
- val rating = bb.getDouble()
- new Rating(user, product, rating)
+ def dumps(obj: AnyRef): Array[Byte] = {
+ new Pickler().dumps(obj)
}
- /** Unpack a tuple of Ints from an array of bytes */
- def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
- val bb = ByteBuffer.wrap(tupleBytes)
- bb.order(ByteOrder.nativeOrder())
- val v1 = bb.getInt()
- val v2 = bb.getInt()
- (v1, v2)
+ def loads(bytes: Array[Byte]): AnyRef = {
+ new Unpickler().loads(bytes)
}
- /**
- * Serialize a Rating object into an array of bytes.
- * It can be deserialized using RatingDeserializer().
- *
- * @param rate the Rating object to serialize
- * @return
- */
- def serializeRating(rate: Rating): Array[Byte] = {
- val len = 3
- val bytes = new Array[Byte](4 + 8 * len)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.putInt(len)
- val db = bb.asDoubleBuffer()
- db.put(rate.user.toDouble)
- db.put(rate.product.toDouble)
- db.put(rate.rating)
- bytes
+  /* Convert an RDD of Array[Any] pairs into an RDD of (Int, Int) tuples. */
+ def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
+ rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 5711532abc..4e87fe088e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -17,12 +17,12 @@
package org.apache.spark.mllib.linalg
+import java.util.Arrays
+
import breeze.linalg.{Matrix => BM, DenseMatrix => BDM, CSCMatrix => BSM}
import org.apache.spark.util.random.XORShiftRandom
-import java.util.Arrays
-
/**
* Trait for a local matrix.
*/
@@ -106,6 +106,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
override def toArray: Array[Double] = values
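+  // Structural equality, mainly so that matrices round-tripped through the Python pickler
+  // can be compared directly in tests.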
+ override def equals(o: Any) = o match {
+ case m: DenseMatrix =>
+ m.numRows == numRows && m.numCols == numCols && Arrays.equals(toArray, m.toArray)
+ case _ => false
+ }
+
private[mllib] def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
private[mllib] def apply(i: Int): Double = values(i)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 478c648505..66b58ba770 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -106,19 +106,4 @@ class MatrixFactorizationModel private[mllib] (
}
scored.top(num)(Ordering.by(_._2))
}
-
- /**
- * :: DeveloperApi ::
- * Predict the rating of many users for many products.
- * This is a Java stub for python predictAll()
- *
- * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
- * @return JavaRDD of serialized Rating objects.
- */
- @DeveloperApi
- def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
- val usersProducts = usersProductsJRDD.rdd.map(xBytes => SerDe.unpackTuple(xBytes))
- predict(usersProducts).map(rate => SerDe.serializeRating(rate))
- }
-
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
index 092d67bbc5..db8ed62fa4 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.mllib.api.python
import org.scalatest.FunSuite
-import org.apache.spark.mllib.linalg.{Matrices, Vectors}
+import org.apache.spark.mllib.linalg.{DenseMatrix, Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.recommendation.Rating
class PythonMLLibAPISuite extends FunSuite {
- test("vector serialization") {
+ SerDe.initialize()
+
+ test("pickle vector") {
val vectors = Seq(
Vectors.dense(Array.empty[Double]),
Vectors.dense(0.0),
@@ -33,14 +36,13 @@ class PythonMLLibAPISuite extends FunSuite {
Vectors.sparse(1, Array.empty[Int], Array.empty[Double]),
Vectors.sparse(2, Array(1), Array(-2.0)))
vectors.foreach { v =>
- val bytes = SerDe.serializeDoubleVector(v)
- val u = SerDe.deserializeDoubleVector(bytes)
+ val u = SerDe.loads(SerDe.dumps(v))
assert(u.getClass === v.getClass)
assert(u === v)
}
}
- test("labeled point serialization") {
+ test("pickle labeled point") {
val points = Seq(
LabeledPoint(0.0, Vectors.dense(Array.empty[Double])),
LabeledPoint(1.0, Vectors.dense(0.0)),
@@ -49,34 +51,44 @@ class PythonMLLibAPISuite extends FunSuite {
LabeledPoint(1.0, Vectors.sparse(1, Array.empty[Int], Array.empty[Double])),
LabeledPoint(-0.5, Vectors.sparse(2, Array(1), Array(-2.0))))
points.foreach { p =>
- val bytes = SerDe.serializeLabeledPoint(p)
- val q = SerDe.deserializeLabeledPoint(bytes)
+ val q = SerDe.loads(SerDe.dumps(p)).asInstanceOf[LabeledPoint]
assert(q.label === p.label)
assert(q.features.getClass === p.features.getClass)
assert(q.features === p.features)
}
}
- test("double serialization") {
+ test("pickle double") {
for (x <- List(123.0, -10.0, 0.0, Double.MaxValue, Double.MinValue, Double.NaN)) {
- val bytes = SerDe.serializeDouble(x)
- val deser = SerDe.deserializeDouble(bytes)
+ val deser = SerDe.loads(SerDe.dumps(x.asInstanceOf[AnyRef])).asInstanceOf[Double]
// We use `equals` here for comparison because we cannot use `==` for NaN
assert(x.equals(deser))
}
}
- test("matrix to 2D array") {
+ test("pickle matrix") {
val values = Array[Double](0, 1.2, 3, 4.56, 7, 8)
val matrix = Matrices.dense(2, 3, values)
- val arr = SerDe.to2dArray(matrix)
- val expected = Array(Array[Double](0, 3, 7), Array[Double](1.2, 4.56, 8))
- assert(arr === expected)
+ val nm = SerDe.loads(SerDe.dumps(matrix)).asInstanceOf[DenseMatrix]
+ assert(matrix === nm)
// Test conversion for empty matrix
val empty = Array[Double]()
val emptyMatrix = Matrices.dense(0, 0, empty)
- val empty2D = SerDe.to2dArray(emptyMatrix)
- assert(empty2D === Array[Array[Double]]())
+ val ne = SerDe.loads(SerDe.dumps(emptyMatrix)).asInstanceOf[DenseMatrix]
+ assert(emptyMatrix == ne)
+ }
+
+ test("pickle rating") {
+ val rat = new Rating(1, 2, 3.0)
+ val rat2 = SerDe.loads(SerDe.dumps(rat)).asInstanceOf[Rating]
+ assert(rat == rat2)
+
+    // Check that the class name "Rating" is pickled only once; the remaining objects reuse
+    // the memoized reference.
+    val rats = (1 to 10).map(x => new Rating(x, x + 1, x + 3.0)).toArray
+    val bytes = SerDe.dumps(rats)
+    assert(new String(bytes).split("Rating").length == 2)
+    assert(bytes.length / 10 < 25) // on average, fewer than 25 bytes per rating
+
}
}
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 51c0faf359..8593e08ded 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -34,5 +34,5 @@ private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join
pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
- pyspark.rddsampler pyspark.daemon pyspark.mllib._common
+ pyspark.rddsampler pyspark.daemon
pyspark.mllib.tests pyspark.shuffle
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a17f2c1203..064a24bff5 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -211,6 +211,7 @@ class SparkContext(object):
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDeUtil.initialize()
+ SparkContext._jvm.SerDe.initialize()
if instance:
if (SparkContext._active_spark_context and
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
deleted file mode 100644
index 68f6033616..0000000000
--- a/python/pyspark/mllib/_common.py
+++ /dev/null
@@ -1,562 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import struct
-import sys
-import numpy
-from numpy import ndarray, float64, int64, int32, array_equal, array
-from pyspark import SparkContext, RDD
-from pyspark.mllib.linalg import SparseVector
-from pyspark.serializers import FramedSerializer
-
-
-"""
-Common utilities shared throughout MLlib, primarily for dealing with
-different data types. These include:
-- Serialization utilities to / from byte arrays that Java can handle
-- Serializers for other data types, like ALS Rating objects
-- Common methods for linear models
-- Methods to deal with the different vector types we support, such as
- SparseVector and scipy.sparse matrices.
-"""
-
-
-# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
-# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
-
-_have_scipy = False
-_scipy_issparse = None
-try:
- import scipy.sparse
- _have_scipy = True
- _scipy_issparse = scipy.sparse.issparse
-except:
- # No SciPy in environment, but that's okay
- pass
-
-
-# Serialization functions to and from Scala. These use the following formats, understood
-# by the PythonMLLibAPI class in Scala:
-#
-# Dense double vector format:
-#
-# [1-byte 1] [4-byte length] [length*8 bytes of data]
-#
-# Sparse double vector format:
-#
-# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \
-# [nonzeros*8 bytes of values]
-#
-# Double matrix format:
-#
-# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
-#
-# LabeledPoint format:
-#
-# [1-byte 4] [8-byte label] [dense or sparse vector]
-#
-# This is all in machine-endian. That means that the Java interpreter and the
-# Python interpreter must agree on what endian the machine is.
-
-
-DENSE_VECTOR_MAGIC = 1
-SPARSE_VECTOR_MAGIC = 2
-DENSE_MATRIX_MAGIC = 3
-LABELED_POINT_MAGIC = 4
-
-
-# Workaround for SPARK-2954: before Python 2.7, struct.unpack couldn't unpack bytearray()s.
-if sys.version_info[:2] <= (2, 6):
- def _unpack(fmt, string):
- return struct.unpack(fmt, buffer(string))
-else:
- _unpack = struct.unpack
-
-
-def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
- """
- Deserialize a numpy array of the given type from an offset in
- bytearray ba, assigning it the given shape.
-
- >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
- >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
- True
- >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
- >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
- True
- >>> x = array([1, 2, 3], dtype=int32)
- >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
- True
- """
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
- return ar.copy()
-
-
-def _serialize_double(d):
- """
- Serialize a double (float or numpy.float64) into a mutually understood format.
- """
- if type(d) == float or type(d) == float64 or type(d) == int or type(d) == long:
- d = float64(d)
- ba = bytearray(8)
- _copyto(d, buffer=ba, offset=0, shape=[1], dtype=float64)
- return ba
- else:
- raise TypeError("_serialize_double called on non-float input")
-
-
-def _serialize_double_vector(v):
- """
- Serialize a double vector into a mutually understood format.
-
- Note: we currently do not use a magic byte for double for storage
- efficiency. This should be reconsidered when we add Ser/De for other
- 8-byte types (e.g. Long), for safety. The corresponding deserializer,
- _deserialize_double, needs to be modified as well if the serialization
- scheme changes.
-
- >>> x = array([1,2,3])
- >>> y = _deserialize_double_vector(_serialize_double_vector(x))
- >>> array_equal(y, array([1.0, 2.0, 3.0]))
- True
- """
- v = _convert_vector(v)
- if type(v) == ndarray:
- return _serialize_dense_vector(v)
- elif type(v) == SparseVector:
- return _serialize_sparse_vector(v)
- else:
- raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray or SparseVector" % type(v))
-
-
-def _serialize_dense_vector(v):
- """Serialize a dense vector given as a NumPy array."""
- if v.ndim != 1:
- raise TypeError("_serialize_double_vector called on a %ddarray; "
- "wanted a 1darray" % v.ndim)
- if v.dtype != float64:
- if numpy.issubdtype(v.dtype, numpy.complex):
- raise TypeError("_serialize_double_vector called on an ndarray of %s; "
- "wanted ndarray of float64" % v.dtype)
- v = v.astype(float64)
- length = v.shape[0]
- ba = bytearray(5 + 8 * length)
- ba[0] = DENSE_VECTOR_MAGIC
- length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
- length_bytes[0] = length
- _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
- return ba
-
-
-def _serialize_sparse_vector(v):
- """Serialize a pyspark.mllib.linalg.SparseVector."""
- nonzeros = len(v.indices)
- ba = bytearray(9 + 12 * nonzeros)
- ba[0] = SPARSE_VECTOR_MAGIC
- header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
- header[0] = v.size
- header[1] = nonzeros
- _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
- values_offset = 9 + 4 * nonzeros
- _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
- return ba
-
-
-def _deserialize_double(ba, offset=0):
- """Deserialize a double from a mutually understood format.
-
- >>> import sys
- >>> _deserialize_double(_serialize_double(123.0)) == 123.0
- True
- >>> _deserialize_double(_serialize_double(float64(0.0))) == 0.0
- True
- >>> _deserialize_double(_serialize_double(1)) == 1.0
- True
- >>> _deserialize_double(_serialize_double(1L)) == 1.0
- True
- >>> x = sys.float_info.max
- >>> _deserialize_double(_serialize_double(sys.float_info.max)) == x
- True
- >>> y = float64(sys.float_info.max)
- >>> _deserialize_double(_serialize_double(sys.float_info.max)) == y
- True
- """
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double called on a %s; wanted bytearray" % type(ba))
- if len(ba) - offset != 8:
- raise TypeError("_deserialize_double called on a %d-byte array; wanted 8 bytes." % nb)
- return _unpack("d", ba[offset:])[0]
-
-
-def _deserialize_double_vector(ba, offset=0):
- """Deserialize a double vector from a mutually understood format.
-
- >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
- >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
- True
- >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
- >>> s == _deserialize_double_vector(_serialize_double_vector(s))
- True
- """
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double_vector called on a %s; "
- "wanted bytearray" % type(ba))
- nb = len(ba) - offset
- if nb < 5:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is too short" % nb)
- if ba[offset] == DENSE_VECTOR_MAGIC:
- return _deserialize_dense_vector(ba, offset)
- elif ba[offset] == SPARSE_VECTOR_MAGIC:
- return _deserialize_sparse_vector(ba, offset)
- else:
- raise TypeError("_deserialize_double_vector called on bytearray "
- "with wrong magic")
-
-
-def _deserialize_dense_vector(ba, offset=0):
- """Deserialize a dense vector into a numpy array."""
- nb = len(ba) - offset
- if nb < 5:
- raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
- "which is too short" % nb)
- length = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=int32)[0]
- if nb < 8 * length + 5:
- raise TypeError("_deserialize_dense_vector called on bytearray "
- "with wrong length")
- return _deserialize_numpy_array([length], ba, offset + 5)
-
-
-def _deserialize_sparse_vector(ba, offset=0):
- """Deserialize a sparse vector into a MLlib SparseVector object."""
- nb = len(ba) - offset
- if nb < 9:
- raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
- "which is too short" % nb)
- header = ndarray(shape=[2], buffer=ba, offset=offset + 1, dtype=int32)
- size = header[0]
- nonzeros = header[1]
- if nb < 9 + 12 * nonzeros:
- raise TypeError("_deserialize_sparse_vector called on bytearray "
- "with wrong length")
- indices = _deserialize_numpy_array([nonzeros], ba, offset + 9, dtype=int32)
- values = _deserialize_numpy_array([nonzeros], ba, offset + 9 + 4 * nonzeros, dtype=float64)
- return SparseVector(int(size), indices, values)
-
-
-def _serialize_double_matrix(m):
- """Serialize a double matrix into a mutually understood format."""
- if (type(m) == ndarray and m.ndim == 2):
- if m.dtype != float64:
- if numpy.issubdtype(m.dtype, numpy.complex):
- raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
- "wanted ndarray of float64" % m.dtype)
- m = m.astype(float64)
- rows = m.shape[0]
- cols = m.shape[1]
- ba = bytearray(9 + 8 * rows * cols)
- ba[0] = DENSE_MATRIX_MAGIC
- lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
- lengths[0] = rows
- lengths[1] = cols
- _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
- return ba
- else:
- raise TypeError("_serialize_double_matrix called on a "
- "non-double-matrix")
-
-
-def _deserialize_double_matrix(ba):
- """Deserialize a double matrix from a mutually understood format."""
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double_matrix called on a %s; "
- "wanted bytearray" % type(ba))
- if len(ba) < 9:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is too short" % len(ba))
- if ba[0] != DENSE_MATRIX_MAGIC:
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong magic")
- lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
- rows = lengths[0]
- cols = lengths[1]
- if (len(ba) != 8 * rows * cols + 9):
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong length")
- return _deserialize_numpy_array([rows, cols], ba, 9)
-
-
-def _serialize_labeled_point(p):
- """
- Serialize a LabeledPoint with a features vector of any type.
-
- >>> from pyspark.mllib.regression import LabeledPoint
- >>> dp0 = LabeledPoint(0.5, array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0]))
- >>> dp1 = _deserialize_labeled_point(_serialize_labeled_point(dp0))
- >>> dp1.label == dp0.label
- True
- >>> array_equal(dp1.features, dp0.features)
- True
- >>> sp0 = LabeledPoint(0.0, SparseVector(4, [1, 3], [3.0, 5.5]))
- >>> sp1 = _deserialize_labeled_point(_serialize_labeled_point(sp0))
- >>> sp1.label == sp1.label
- True
- >>> sp1.features == sp0.features
- True
- """
- from pyspark.mllib.regression import LabeledPoint
- serialized_features = _serialize_double_vector(p.features)
- header = bytearray(9)
- header[0] = LABELED_POINT_MAGIC
- header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
- header_float[0] = p.label
- return header + serialized_features
-
-
-def _deserialize_labeled_point(ba, offset=0):
- """Deserialize a LabeledPoint from a mutually understood format."""
- from pyspark.mllib.regression import LabeledPoint
- if type(ba) != bytearray:
- raise TypeError("Expecting a bytearray but got %s" % type(ba))
- if ba[offset] != LABELED_POINT_MAGIC:
- raise TypeError("Expecting magic number %d but got %d" % (LABELED_POINT_MAGIC, ba[0]))
- label = ndarray(shape=[1], buffer=ba, offset=offset + 1, dtype=float64)[0]
- features = _deserialize_double_vector(ba, offset + 9)
- return LabeledPoint(label, features)
-
-
-def _copyto(array, buffer, offset, shape, dtype):
- """
- Copy the contents of a vector to a destination bytearray at the
- given offset.
-
- TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
- we should benchmark that to see whether it provides a benefit.
- """
- temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
- temp_array[...] = array
-
-
-def _get_unmangled_rdd(data, serializer, cache=True):
- """
- :param cache: If True, the serialized RDD is cached. (default = True)
- WARNING: Users should unpersist() this later!
- """
- dataBytes = data.map(serializer)
- dataBytes._bypass_serializer = True
- if cache:
- dataBytes.cache()
- return dataBytes
-
-
-def _get_unmangled_double_vector_rdd(data, cache=True):
- """
- Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
- _serialized_double_vectors.
- :param cache: If True, the serialized RDD is cached. (default = True)
- WARNING: Users should unpersist() this later!
- """
- return _get_unmangled_rdd(data, _serialize_double_vector, cache)
-
-
-def _get_unmangled_labeled_point_rdd(data, cache=True):
- """
- Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points.
- :param cache: If True, the serialized RDD is cached. (default = True)
- WARNING: Users should unpersist() this later!
- """
- return _get_unmangled_rdd(data, _serialize_labeled_point, cache)
-
-
-# Common functions for dealing with and training linear models
-
-def _linear_predictor_typecheck(x, coeffs):
- """
- Check that x is a one-dimensional vector of the right shape.
- This is a temporary hackaround until we actually implement bulk predict.
- """
- x = _convert_vector(x)
- if type(x) == ndarray:
- if x.ndim == 1:
- if x.shape != coeffs.shape:
- raise RuntimeError("Got array of %d elements; wanted %d" % (
- numpy.shape(x)[0], coeffs.shape[0]))
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif type(x) == SparseVector:
- if x.size != coeffs.shape[0]:
- raise RuntimeError("Got sparse vector of size %d; wanted %d" % (
- x.size, coeffs.shape[0]))
- elif isinstance(x, RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
-
-
-# If we weren't given initial weights, take a zero vector of the appropriate
-# length.
-def _get_initial_weights(initial_weights, data):
- if initial_weights is None:
- initial_weights = _convert_vector(data.first().features)
- if type(initial_weights) == ndarray:
- if initial_weights.ndim != 1:
- raise TypeError("At least one data element has "
- + initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = numpy.zeros([initial_weights.shape[0]])
- elif type(initial_weights) == SparseVector:
- initial_weights = numpy.zeros([initial_weights.size])
- return initial_weights
-
-
-# train_func should take two parameters, namely data and initial_weights, and
-# return the result of a call to the appropriate JVM stub.
-# _regression_train_wrapper is responsible for setup and error checking.
-def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
- initial_weights = _get_initial_weights(initial_weights, data)
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
- if len(ans) != 2:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]).__name__ + " which is not bytearray")
- elif type(ans[1]) != float:
- raise RuntimeError("JVM call result had second element of type "
- + type(ans[0]).__name__ + " which is not float")
- return klass(_deserialize_double_vector(ans[0]), ans[1])
-
-
-# Functions for serializing ALS Rating objects and tuples
-
-def _serialize_rating(r):
- ba = bytearray(16)
- intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
- doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
- intpart[0], intpart[1], doublepart[0] = r
- return ba
-
-
-class RatingDeserializer(FramedSerializer):
-
- def loads(self, string):
- res = ndarray(shape=(3, ), buffer=string, dtype=float64, offset=4)
- return int(res[0]), int(res[1]), res[2]
-
- def load_stream(self, stream):
- while True:
- try:
- yield self._read_with_length(stream)
- except struct.error:
- return
- except EOFError:
- return
-
-
-def _serialize_tuple(t):
- ba = bytearray(8)
- intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
- intpart[0], intpart[1] = t
- return ba
-
-
-# Vector math functions that support all of our vector types
-
-def _convert_vector(vec):
- """
- Convert a vector to a format we support internally. This does
- the following:
-
- * For dense NumPy vectors (ndarray), returns them as is
- * For our SparseVector class, returns that as is
- * For Python lists, converts them to NumPy vectors
- * For scipy.sparse.*_matrix column vectors, converts them to
- our own SparseVector type.
-
- This should be called before passing any data to our algorithms
- or attempting to serialize it to Java.
- """
- if type(vec) == ndarray or type(vec) == SparseVector:
- return vec
- elif type(vec) == list:
- return array(vec, dtype=float64)
- elif _have_scipy:
- if _scipy_issparse(vec):
- assert vec.shape[1] == 1, "Expected column vector"
- csc = vec.tocsc()
- return SparseVector(vec.shape[0], csc.indices, csc.data)
- raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
-
-
-def _squared_distance(v1, v2):
- """
- Squared distance of two NumPy or sparse vectors.
-
- >>> dense1 = array([1., 2.])
- >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
- >>> dense2 = array([2., 1.])
- >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
- >>> _squared_distance(dense1, dense2)
- 2.0
- >>> _squared_distance(dense1, sparse2)
- 2.0
- >>> _squared_distance(sparse1, dense2)
- 2.0
- >>> _squared_distance(sparse1, sparse2)
- 2.0
- """
- v1 = _convert_vector(v1)
- v2 = _convert_vector(v2)
- if type(v1) == ndarray and type(v2) == ndarray:
- diff = v1 - v2
- return numpy.dot(diff, diff)
- elif type(v1) == ndarray:
- return v2.squared_distance(v1)
- else:
- return v1.squared_distance(v2)
-
-
-def _dot(vec, target):
- """
- Compute the dot product of a vector of the types we support
- (Numpy array, list, SparseVector, or SciPy sparse) and a target
- NumPy array that is either 1- or 2-dimensional. Equivalent to
- calling numpy.dot of the two vectors, but for SciPy ones, we
- have to transpose them because they're column vectors.
- """
- if type(vec) == ndarray:
- return numpy.dot(vec, target)
- elif type(vec) == SparseVector:
- return vec.dot(target)
- elif type(vec) == list:
- return numpy.dot(_convert_vector(vec), target)
- else:
- return vec.transpose().dot(target)[0]
-
-
-def _test():
- import doctest
- globs = globals().copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
- globs['sc'].stop()
- if failure_count:
- exit(-1)
-
-
-if __name__ == "__main__":
- _test()
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 71ab46b61d..ac142fb49a 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -15,19 +15,14 @@
# limitations under the License.
#
+from math import exp
+
import numpy
+from numpy import array
-from numpy import array, shape
-from pyspark import SparkContext
-from pyspark.mllib._common import \
- _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
-from pyspark.mllib.linalg import SparseVector
-from pyspark.mllib.regression import LabeledPoint, LinearModel
-from math import exp, log
+from pyspark import SparkContext, PickleSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
__all__ = ['LogisticRegressionModel', 'LogisticRegressionWithSGD', 'SVMModel',
@@ -67,8 +62,7 @@ class LogisticRegressionModel(LinearModel):
"""
def predict(self, x):
- _linear_predictor_typecheck(x, self._coeff)
- margin = _dot(x, self._coeff) + self._intercept
+ margin = self.weights.dot(x) + self._intercept
if margin > 0:
prob = 1 / (1 + exp(-margin))
else:
@@ -81,7 +75,7 @@ class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
- initialWeights=None, regParam=1.0, regType=None, intercept=False):
+ initialWeights=None, regParam=1.0, regType="none", intercept=False):
"""
Train a logistic regression model on the given data.
@@ -106,11 +100,12 @@ class LogisticRegressionWithSGD(object):
are activated or not).
"""
sc = data.context
- if regType is None:
- regType = "none"
- train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
- d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
- return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
+
+ def train(jdata, i):
+ return sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
+ jdata, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
+
+ return _regression_train_wrapper(sc, train, LogisticRegressionModel, data,
initialWeights)
@@ -141,8 +136,7 @@ class SVMModel(LinearModel):
"""
def predict(self, x):
- _linear_predictor_typecheck(x, self._coeff)
- margin = _dot(x, self._coeff) + self._intercept
+ margin = self.weights.dot(x) + self.intercept
return 1 if margin >= 0 else 0
@@ -150,7 +144,7 @@ class SVMWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
- miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
+ miniBatchFraction=1.0, initialWeights=None, regType="none", intercept=False):
"""
Train a support vector machine on the given data.
@@ -175,11 +169,12 @@ class SVMWithSGD(object):
are activated or not).
"""
sc = data.context
- if regType is None:
- regType = "none"
- train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
- d._jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
- return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
+ jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
+
+ return _regression_train_wrapper(sc, train, SVMModel, data, initialWeights)
class NaiveBayesModel(object):
@@ -220,7 +215,8 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]
+ x = _convert_to_vector(x)
+ return self.labels[numpy.argmax(self.pi + x.dot(self.theta.transpose()))]
class NaiveBayes(object):
@@ -242,12 +238,9 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
- return NaiveBayesModel(
- _deserialize_double_vector(ans[0]),
- _deserialize_double_vector(ans[1]),
- _deserialize_double_matrix(ans[2]))
+ jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+ labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
+ return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
def _test():
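
A hedged usage sketch of the reworked classification entry point, assuming a running SparkContext `sc` as in the doctests; the "l2" regType value is assumed from MLlib's regularization options and is not shown in this hunk:

    from pyspark.mllib.classification import LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    data = sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
                           LabeledPoint(1.0, [1.0, 0.0])])
    # regType now defaults to "none"; "l2" here is assumed to select an L2 penalty
    model = LogisticRegressionWithSGD.train(data, iterations=10, regType="l2", regParam=0.1)
    print model.predict([1.0, 0.0])   # 0 or 1
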
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index f3e952a1d8..12c5602271 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -15,15 +15,9 @@
# limitations under the License.
#
-from numpy import array, dot
-from math import sqrt
from pyspark import SparkContext
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
__all__ = ['KMeansModel', 'KMeans']
@@ -32,6 +26,7 @@ class KMeansModel(object):
"""A clustering model derived from the k-means method.
+ >>> from numpy import array
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
@@ -71,8 +66,9 @@ class KMeansModel(object):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = float("inf")
- for i in range(0, len(self.centers)):
- distance = _squared_distance(x, self.centers[i])
+ x = _convert_to_vector(x)
+ for i in xrange(len(self.centers)):
+ distance = x.squared_distance(self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
@@ -82,19 +78,17 @@ class KMeansModel(object):
class KMeans(object):
@classmethod
- def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
+ def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"):
"""Train a k-means clustering model."""
- sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
- ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
- dataBytes._jrdd, k, maxIterations, runs, initializationMode)
- if len(ans) != 1:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray")
- matrix = _deserialize_double_matrix(ans[0])
- return KMeansModel([row for row in matrix])
+ sc = rdd.context
+ ser = PickleSerializer()
+ # cache serialized data to avoid object overhead in the JVM
+ cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
+ model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
+ cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+ bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
+ centers = ser.loads(str(bytes))
+ return KMeansModel([c.toArray() for c in centers])
def _test():
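
The new KMeans path pickles vectors through AutoBatchedSerializer before handing them to the JVM; the public API is unchanged. A sketch assuming a running SparkContext `sc`:

    from numpy import array
    from pyspark.mllib.clustering import KMeans

    points = sc.parallelize(array([0.0, 0.0, 1.0, 1.0, 9.0, 8.0, 8.0, 9.0]).reshape(4, 2))
    model = KMeans.train(points, 2, maxIterations=10, runs=5, initializationMode="random")
    print model.predict(array([0.5, 0.5]))   # cluster index, 0 or 1
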
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index e69051c104..0a5dcaac55 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -23,14 +23,148 @@ object from MLlib or pass SciPy C{scipy.sparse} column vectors if
SciPy is available in their environment.
"""
-import numpy
-from numpy import array, array_equal, ndarray, float64, int32
+import sys
+import array
+import copy_reg
+import numpy as np
-__all__ = ['SparseVector', 'Vectors']
+__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
-class SparseVector(object):
+if sys.version_info[:2] == (2, 7):
+ # speed up pickling array in Python 2.7
+ def fast_pickle_array(ar):
+ return array.array, (ar.typecode, ar.tostring())
+ copy_reg.pickle(array.array, fast_pickle_array)
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _convert_to_vector and the dot/squared_distance methods on Vector, also accept
+# scipy.sparse column matrices.
+
+try:
+ import scipy.sparse
+ _have_scipy = True
+except ImportError:
+ # No SciPy in environment, but that's okay
+ _have_scipy = False
+
+
+def _convert_to_vector(l):
+ if isinstance(l, Vector):
+ return l
+ elif type(l) in (array.array, np.ndarray, list, tuple):
+ return DenseVector(l)
+ elif _have_scipy and scipy.sparse.issparse(l):
+ assert l.shape[1] == 1, "Expected column vector"
+ csc = l.tocsc()
+ return SparseVector(l.shape[0], csc.indices, csc.data)
+ else:
+ raise TypeError("Cannot convert type %s into Vector" % type(l))
+
+
+class Vector(object):
+ """
+ Abstract class for DenseVector and SparseVector
+ """
+ def toArray(self):
+ """
+ Convert the vector into a numpy.ndarray
+ :return: numpy.ndarray
+ """
+ raise NotImplementedError
+
+
+class DenseVector(Vector):
+ def __init__(self, ar):
+ if not isinstance(ar, array.array):
+ ar = array.array('d', ar)
+ self.array = ar
+
+ def __reduce__(self):
+ return DenseVector, (self.array,)
+
+ def dot(self, other):
+ """
+ Compute the dot product of this vector with another Vector or
+ vector-like object (NumPy array, list, SparseVector, or SciPy
+ sparse column matrix), or with a 1- or 2-dimensional NumPy array.
+ Equivalent to calling numpy.dot on the two vectors.
+
+ >>> dense = DenseVector(array.array('d', [1., 2.]))
+ >>> dense.dot(dense)
+ 5.0
+ >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
+ 4.0
+ >>> dense.dot(range(1, 3))
+ 5.0
+ >>> dense.dot(np.array(range(1, 3)))
+ 5.0
+ """
+ if isinstance(other, SparseVector):
+ return other.dot(self)
+ elif _have_scipy and scipy.sparse.issparse(other):
+ return other.transpose().dot(self.toArray())[0]
+ elif isinstance(other, Vector):
+ return np.dot(self.toArray(), other.toArray())
+ else:
+ return np.dot(self.toArray(), other)
+
+ def squared_distance(self, other):
+ """
+ Squared distance of two Vectors.
+
+ >>> dense1 = DenseVector(array.array('d', [1., 2.]))
+ >>> dense1.squared_distance(dense1)
+ 0.0
+ >>> dense2 = np.array([2., 1.])
+ >>> dense1.squared_distance(dense2)
+ 2.0
+ >>> dense3 = [2., 1.]
+ >>> dense1.squared_distance(dense3)
+ 2.0
+ >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
+ >>> dense1.squared_distance(sparse1)
+ 2.0
+ """
+ if isinstance(other, SparseVector):
+ return other.squared_distance(self)
+ elif _have_scipy and scipy.sparse.issparse(other):
+ return _convert_to_vector(other).squared_distance(self)
+
+ if isinstance(other, Vector):
+ other = other.toArray()
+ elif not isinstance(other, np.ndarray):
+ other = np.array(other)
+ diff = self.toArray() - other
+ return np.dot(diff, diff)
+
+ def toArray(self):
+ return np.array(self.array)
+
+ def __getitem__(self, item):
+ return self.array[item]
+
+ def __len__(self):
+ return len(self.array)
+
+ def __str__(self):
+ return "[" + ",".join([str(v) for v in self.array]) + "]"
+
+ def __repr__(self):
+ return "DenseVector(%r)" % self.array
+
+ def __eq__(self, other):
+ return isinstance(other, DenseVector) and self.array == other.array
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __getattr__(self, item):
+ return getattr(self.array, item)
+
+
+class SparseVector(Vector):
"""
A simple sparse vector class for passing data to MLlib. Users may
@@ -61,16 +195,19 @@ class SparseVector(object):
if type(pairs) == dict:
pairs = pairs.items()
pairs = sorted(pairs)
- self.indices = array([p[0] for p in pairs], dtype=int32)
- self.values = array([p[1] for p in pairs], dtype=float64)
+ self.indices = array.array('i', [p[0] for p in pairs])
+ self.values = array.array('d', [p[1] for p in pairs])
else:
assert len(args[0]) == len(args[1]), "index and value arrays not same length"
- self.indices = array(args[0], dtype=int32)
- self.values = array(args[1], dtype=float64)
+ self.indices = array.array('i', args[0])
+ self.values = array.array('d', args[1])
for i in xrange(len(self.indices) - 1):
if self.indices[i] >= self.indices[i + 1]:
raise TypeError("indices array must be sorted")
+ def __reduce__(self):
+ return (SparseVector, (self.size, self.indices, self.values))
+
def dot(self, other):
"""
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
@@ -78,15 +215,15 @@ class SparseVector(object):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
- >>> a.dot(array([1., 2., 3., 4.]))
+ >>> a.dot(array.array('d', [1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
- >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+ >>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
"""
- if type(other) == ndarray:
+ if type(other) == np.ndarray:
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
@@ -94,10 +231,17 @@ class SparseVector(object):
return result
elif other.ndim == 2:
results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
- return array(results)
+ return np.array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
- else:
+
+ elif type(other) in (array.array, DenseVector):
+ result = 0.0
+ for i in xrange(len(self.indices)):
+ result += self.values[i] * other[self.indices[i]]
+ return result
+
+ elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
@@ -110,6 +254,8 @@ class SparseVector(object):
else:
j += 1
return result
+ else:
+ return self.dot(_convert_to_vector(other))
def squared_distance(self, other):
"""
@@ -118,7 +264,9 @@ class SparseVector(object):
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
- >>> a.squared_distance(array([1., 2., 3., 4.]))
+ >>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
+ 11.0
+ >>> a.squared_distance(np.array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
@@ -126,22 +274,22 @@ class SparseVector(object):
>>> b.squared_distance(a)
30.0
"""
- if type(other) == ndarray:
- if other.ndim == 1:
- result = 0.0
- j = 0 # index into our own array
- for i in xrange(other.shape[0]):
- if j < len(self.indices) and self.indices[j] == i:
- diff = self.values[j] - other[i]
- result += diff * diff
- j += 1
- else:
- result += other[i] * other[i]
- return result
- else:
+ if type(other) in (list, array.array, DenseVector, np.ndarray):
+ if isinstance(other, np.ndarray) and other.ndim != 1:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
- else:
+ result = 0.0
+ j = 0 # index into our own array
+ for i in xrange(len(other)):
+ if j < len(self.indices) and self.indices[j] == i:
+ diff = self.values[j] - other[i]
+ result += diff * diff
+ j += 1
+ else:
+ result += other[i] * other[i]
+ return result
+
+ elif type(other) is SparseVector:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
@@ -163,16 +311,21 @@ class SparseVector(object):
result += other.values[j] * other.values[j]
j += 1
return result
+ else:
+ return self.squared_distance(_convert_to_vector(other))
def toArray(self):
"""
Returns a copy of this SparseVector as a 1-dimensional NumPy array.
"""
- arr = numpy.zeros(self.size)
+ arr = np.zeros((self.size,), dtype=np.float64)
for i in xrange(self.indices.size):
arr[self.indices[i]] = self.values[i]
return arr
+ def __len__(self):
+ return self.size
+
def __str__(self):
inds = "[" + ",".join([str(i) for i in self.indices]) + "]"
vals = "[" + ",".join([str(v) for v in self.values]) + "]"
@@ -198,8 +351,8 @@ class SparseVector(object):
return (isinstance(other, self.__class__)
and other.size == self.size
- and array_equal(other.indices, self.indices)
- and array_equal(other.values, self.values))
+ and other.indices == self.indices
+ and other.values == self.values)
def __ne__(self, other):
return not self.__eq__(other)
@@ -242,9 +395,9 @@ class Vectors(object):
returns a DenseVector.
>>> Vectors.dense([1, 2, 3])
- array([ 1., 2., 3.])
+ DenseVector(array('d', [1.0, 2.0, 3.0]))
"""
- return array(elements, dtype=float64)
+ return DenseVector(elements)
@staticmethod
def stringify(vector):
@@ -257,10 +410,39 @@ class Vectors(object):
>>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
'[0.0,1.0]'
"""
- if type(vector) == SparseVector:
- return str(vector)
- else:
- return "[" + ",".join([str(v) for v in vector]) + "]"
+ return str(vector)
+
+
+class Matrix(object):
+ """ the Matrix """
+ def __init__(self, nRow, nCol):
+ self.nRow = nRow
+ self.nCol = nCol
+
+ def toArray(self):
+ raise NotImplementedError
+
+
+class DenseMatrix(Matrix):
+ def __init__(self, nRow, nCol, values):
+ Matrix.__init__(self, nRow, nCol)
+ assert len(values) == nRow * nCol
+ self.values = values
+
+ def __reduce__(self):
+ return DenseMatrix, (self.nRow, self.nCol, self.values)
+
+ def toArray(self):
+ """
+ Return a numpy.ndarray
+
+ >>> arr = array.array('d', [float(i) for i in range(4)])
+ >>> m = DenseMatrix(2, 2, arr)
+ >>> m.toArray()
+ array([[ 0., 1.],
+ [ 2., 3.]])
+ """
+ return np.ndarray((self.nRow, self.nCol), np.float64, buffer=self.values.tostring())
def _test():
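
The new DenseVector and SparseVector classes define __reduce__, so they round-trip through plain pickle; this is what the Pyrolite-based SerDe on the Scala side relies on. A local sketch, no JVM involved:

    import pickle
    from pyspark.mllib.linalg import Vectors, SparseVector

    dv = Vectors.dense([1.0, 2.0, 3.0])        # now a DenseVector, not a NumPy array
    sv = SparseVector(4, {1: 1.0, 3: 5.5})
    assert pickle.loads(pickle.dumps(dv)) == dv
    assert pickle.loads(pickle.dumps(sv)) == sv
    print dv.dot(dv)                           # 14.0
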
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index d53c95fd59..a787e4dea2 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -19,15 +19,32 @@
Python package for random data generation.
"""
+from functools import wraps
from pyspark.rdd import RDD
-from pyspark.mllib._common import _deserialize_double, _deserialize_double_vector
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
__all__ = ['RandomRDDs', ]
+def serialize(f):
+ @wraps(f)
+ def func(sc, *a, **kw):
+ jrdd = f(sc, *a, **kw)
+ return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+ BatchedSerializer(PickleSerializer(), 1024))
+ return func
+
+
+def toArray(f):
+ @wraps(f)
+ def func(sc, *a, **kw):
+ rdd = f(sc, *a, **kw)
+ return rdd.map(lambda vec: vec.toArray())
+ return func
+
+
class RandomRDDs(object):
"""
Generator methods for creating RDDs comprised of i.i.d samples from
@@ -35,6 +52,7 @@ class RandomRDDs(object):
"""
@staticmethod
+ @serialize
def uniformRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the
@@ -56,11 +74,10 @@ class RandomRDDs(object):
>>> parts == sc.defaultParallelism
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
- return uniform.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().uniformRDD(sc._jsc, size, numPartitions, seed)
@staticmethod
+ @serialize
def normalRDD(sc, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the standard normal
@@ -80,11 +97,10 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - 1.0) < 0.1
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
- return normal.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().normalRDD(sc._jsc, size, numPartitions, seed)
@staticmethod
+ @serialize
def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
"""
Generates an RDD comprised of i.i.d. samples from the Poisson
@@ -101,11 +117,11 @@ class RandomRDDs(object):
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
"""
- jrdd = sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
- return poisson.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ return sc._jvm.PythonMLLibAPI().poissonRDD(sc._jsc, mean, size, numPartitions, seed)
@staticmethod
+ @toArray
+ @serialize
def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -120,12 +136,12 @@ class RandomRDDs(object):
>>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
4
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.uniformVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- uniform = RDD(jrdd, sc, NoOpSerializer())
- return uniform.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
+ @toArray
+ @serialize
def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -140,12 +156,12 @@ class RandomRDDs(object):
>>> abs(mat.std() - 1.0) < 0.1
True
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.normalVectorRDD(sc._jsc, numRows, numCols, numPartitions, seed)
- normal = RDD(jrdd, sc, NoOpSerializer())
- return normal.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
@staticmethod
+ @toArray
+ @serialize
def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
"""
Generates an RDD comprised of vectors containing i.i.d. samples drawn
@@ -163,10 +179,8 @@ class RandomRDDs(object):
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
"""
- jrdd = sc._jvm.PythonMLLibAPI() \
+ return sc._jvm.PythonMLLibAPI() \
.poissonVectorRDD(sc._jsc, mean, numRows, numCols, numPartitions, seed)
- poisson = RDD(jrdd, sc, NoOpSerializer())
- return poisson.map(lambda bytes: _deserialize_double_vector(bytearray(bytes)))
def _test():
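
The @serialize and @toArray decorators above wrap every generator: @serialize turns the returned JavaRDD into a Python RDD of pickled values, and @toArray additionally converts each vector to a NumPy array. A sketch assuming a running SparkContext `sc`:

    from pyspark.mllib.random import RandomRDDs

    scalars = RandomRDDs.uniformRDD(sc, 100, seed=1)         # @serialize: RDD of floats
    vectors = RandomRDDs.normalVectorRDD(sc, 10, 4, seed=1)  # @toArray on top: rows are ndarrays
    print scalars.count()                                    # 100
    print vectors.first().shape                              # (4,)
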
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 2df23394da..59c1c5ff0c 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -16,17 +16,25 @@
#
from pyspark import SparkContext
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
- _serialize_double_matrix, _deserialize_double_matrix, \
- _serialize_double_vector, _deserialize_double_vector, \
- _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _serialize_tuple, RatingDeserializer
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
__all__ = ['MatrixFactorizationModel', 'ALS']
+class Rating(object):
+ def __init__(self, user, product, rating):
+ self.user = int(user)
+ self.product = int(product)
+ self.rating = float(rating)
+
+ def __reduce__(self):
+ return Rating, (self.user, self.product, self.rating)
+
+ def __repr__(self):
+ return "Rating(%d, %d, %d)" % (self.user, self.product, self.rating)
+
+
class MatrixFactorizationModel(object):
"""A matrix factorisation model trained by regularized alternating
@@ -39,7 +47,9 @@ class MatrixFactorizationModel(object):
>>> model = ALS.trainImplicit(ratings, 1)
>>> model.predict(2,2) is not None
True
+
>>> testset = sc.parallelize([(1, 2), (1, 1)])
+ >>> model = ALS.train(ratings, 1)
>>> model.predictAll(testset).count() == 2
True
"""
@@ -54,34 +64,61 @@ class MatrixFactorizationModel(object):
def predict(self, user, product):
return self._java_model.predict(user, product)
- def predictAll(self, usersProducts):
- usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
- return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
- self._context, RatingDeserializer())
+ def predictAll(self, user_product):
+ assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
+ first = user_product.first()
+ if isinstance(first, list):
+ user_product = user_product.map(tuple)
+ first = tuple(first)
+ assert type(first) is tuple and len(first) == 2, \
+ "user_product should be RDD of (user, product)"
+ if any(isinstance(x, str) for x in first):
+ user_product = user_product.map(lambda (u, p): (int(u), int(p)))
+ first = tuple(map(int, first))
+ assert all(type(x) is int for x in first), "user and product in user_product should be int"
+ sc = self._context
+ tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+ jresult = self._java_model.predict(tuplerdd).toJavaRDD()
+ return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+ AutoBatchedSerializer(PickleSerializer()))
class ALS(object):
@classmethod
+ def _prepare(cls, ratings):
+ assert isinstance(ratings, RDD), "ratings should be RDD"
+ first = ratings.first()
+ if not isinstance(first, Rating):
+ if isinstance(first, (tuple, list)):
+ ratings = ratings.map(lambda x: Rating(*x))
+ else:
+ raise ValueError("rating should be RDD of Rating or tuple/list")
+ # serialize them with AutoBatchedSerializer before caching to reduce the
+ # object overhead in the JVM
+ cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
+ return cached._to_java_object_rdd()
+
+ @classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
sc = ratings.context
- ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
- mod = sc._jvm.PythonMLLibAPI().trainALSModel(
- ratingBytes._jrdd, rank, iterations, lambda_, blocks)
+ jrating = cls._prepare(ratings)
+ mod = sc._jvm.PythonMLLibAPI().trainALSModel(jrating, rank, iterations, lambda_, blocks)
return MatrixFactorizationModel(sc, mod)
@classmethod
def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
sc = ratings.context
- ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+ jrating = cls._prepare(ratings)
mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(
- ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
+ jrating, rank, iterations, lambda_, blocks, alpha)
return MatrixFactorizationModel(sc, mod)
def _test():
import doctest
- globs = globals().copy()
+ import pyspark.mllib.recommendation
+ globs = pyspark.mllib.recommendation.__dict__.copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
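
A hedged usage sketch of the new Rating-based ALS API, assuming a running SparkContext `sc`:

    from pyspark.mllib.recommendation import ALS, Rating

    ratings = sc.parallelize([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
    model = ALS.train(ratings, rank=1, iterations=5)
    print model.predict(2, 2)                                          # single (user, product) score
    print model.predictAll(sc.parallelize([(1, 2), (2, 2)])).count()   # 2 predicted ratings
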
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index f572dcfb84..cbdbc09858 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,12 +15,12 @@
# limitations under the License.
#
-from numpy import array, ndarray
-from pyspark import SparkContext
-from pyspark.mllib._common import _dot, _regression_train_wrapper, \
- _linear_predictor_typecheck, _have_scipy, _scipy_issparse
-from pyspark.mllib.linalg import SparseVector, Vectors
+import numpy as np
+from numpy import array
+from pyspark import SparkContext
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -38,16 +38,16 @@ class LabeledPoint(object):
def __init__(self, label, features):
self.label = label
- if (type(features) == ndarray or type(features) == SparseVector
- or (_have_scipy and _scipy_issparse(features))):
- self.features = features
- elif type(features) == list:
- self.features = array(features)
- else:
- raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+ self.features = _convert_to_vector(features)
+
+ def __reduce__(self):
+ return (LabeledPoint, (self.label, self.features))
def __str__(self):
- return "(" + ",".join((str(self.label), Vectors.stringify(self.features))) + ")"
+ return "(" + ",".join((str(self.label), str(self.features))) + ")"
+
+ def __repr__(self):
+ return "LabeledPoint(" + ",".join((repr(self.label), repr(self.features))) + ")"
class LinearModel(object):
@@ -55,7 +55,7 @@ class LinearModel(object):
"""A linear model that has a vector of coefficients and an intercept."""
def __init__(self, weights, intercept):
- self._coeff = weights
+ self._coeff = _convert_to_vector(weights)
self._intercept = intercept
@property
@@ -71,18 +71,19 @@ class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
- >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
- >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+ >>> lrmb = LinearRegressionModelBase(np.array([1.0, 2.0]), 0.1)
+ >>> abs(lrmb.predict(np.array([-1.03, 7.777])) - 14.624) < 1e-6
True
>>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
True
"""
def predict(self, x):
- """Predict the value of the dependent variable given a vector x"""
- """containing values for the independent variables."""
- _linear_predictor_typecheck(x, self._coeff)
- return _dot(x, self._coeff) + self._intercept
+ """
+ Predict the value of the dependent variable given a vector x
+ containing values for the independent variables.
+ """
+ return self.weights.dot(x) + self.intercept
class LinearRegressionModel(LinearRegressionModelBase):
@@ -96,10 +97,10 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
- >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -117,11 +118,27 @@ class LinearRegressionModel(LinearRegressionModelBase):
"""
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights):
+ if initial_weights is None:
+ initial_weights = [0.0] * len(data.first().features)
+ ser = PickleSerializer()
+ initial_bytes = bytearray(ser.dumps(_convert_to_vector(initial_weights)))
+ # use AutoBatchedSerializer before caching to reduce the memory
+ # overhead in the JVM
+ cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
+ ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+ assert len(ans) == 2, "JVM call result had unexpected length"
+ weights = ser.loads(str(ans[0]))
+ return modelClass(weights, ans[1])
+
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
- initialWeights=None, regParam=1.0, regType=None, intercept=False):
+ initialWeights=None, regParam=1.0, regType="none", intercept=False):
"""
Train a linear regression model on the given data.
@@ -146,11 +163,12 @@ class LinearRegressionWithSGD(object):
are activated or not).
"""
sc = data.context
- if regType is None:
- regType = "none"
- train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
- d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
- return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
+ jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
+
+ return _regression_train_wrapper(sc, train, LinearRegressionModel, data, initialWeights)
class LassoModel(LinearRegressionModelBase):
@@ -166,9 +184,9 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
- >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -179,7 +197,7 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -193,9 +211,11 @@ class LassoWithSGD(object):
miniBatchFraction=1.0, initialWeights=None):
"""Train a Lasso regression model on the given data."""
sc = data.context
- train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
- d._jrdd, iterations, step, regParam, miniBatchFraction, i)
- return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
+ jrdd, iterations, step, regParam, miniBatchFraction, i)
+ return _regression_train_wrapper(sc, train, LassoModel, data, initialWeights)
class RidgeRegressionModel(LinearRegressionModelBase):
@@ -211,9 +231,9 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
- >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ >>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -224,7 +244,7 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
- >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ >>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
@@ -238,9 +258,12 @@ class RidgeRegressionWithSGD(object):
miniBatchFraction=1.0, initialWeights=None):
"""Train a ridge regression model on the given data."""
sc = data.context
- train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
- d._jrdd, iterations, step, regParam, miniBatchFraction, i)
- return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
+
+ def train(jrdd, i):
+ return sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
+ jrdd, iterations, step, regParam, miniBatchFraction, i)
+
+ return _regression_train_wrapper(sc, train, RidgeRegressionModel, data, initialWeights)
def _test():
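
LabeledPoint now always passes its features through _convert_to_vector and pickles via __reduce__, so it can be built and serialized without a SparkContext. A quick local sketch:

    import pickle
    from pyspark.mllib.regression import LabeledPoint

    p = LabeledPoint(1.0, [1.0, 0.0, 3.0])        # list -> DenseVector
    q = pickle.loads(pickle.dumps(p))             # round-trips through plain pickle
    print p                                       # (1.0,[1.0,0.0,3.0])
    print q.features.dot([1.0, 1.0, 1.0])         # 4.0
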
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index 8c726f171c..b9de0909a6 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -19,14 +19,26 @@
Python package for statistical functions in MLlib.
"""
-from pyspark.mllib._common import \
- _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \
- _serialize_double, _deserialize_double_matrix, _deserialize_double_vector
+from functools import wraps
+
+from pyspark import PickleSerializer
__all__ = ['MultivariateStatisticalSummary', 'Statistics']
+def serialize(f):
+ ser = PickleSerializer()
+
+ @wraps(f)
+ def func(self):
+ jvec = f(self)
+ bytes = self._sc._jvm.SerDe.dumps(jvec)
+ return ser.loads(str(bytes)).toArray()
+
+ return func
+
+
class MultivariateStatisticalSummary(object):
"""
@@ -44,33 +56,38 @@ class MultivariateStatisticalSummary(object):
def __del__(self):
self._sc._gateway.detach(self._java_summary)
+ @serialize
def mean(self):
- return _deserialize_double_vector(self._java_summary.mean())
+ return self._java_summary.mean()
+ @serialize
def variance(self):
- return _deserialize_double_vector(self._java_summary.variance())
+ return self._java_summary.variance()
def count(self):
return self._java_summary.count()
+ @serialize
def numNonzeros(self):
- return _deserialize_double_vector(self._java_summary.numNonzeros())
+ return self._java_summary.numNonzeros()
+ @serialize
def max(self):
- return _deserialize_double_vector(self._java_summary.max())
+ return self._java_summary.max()
+ @serialize
def min(self):
- return _deserialize_double_vector(self._java_summary.min())
+ return self._java_summary.min()
class Statistics(object):
@staticmethod
- def colStats(X):
+ def colStats(rdd):
"""
Computes column-wise summary statistics for the input RDD[Vector].
- >>> from linalg import Vectors
+ >>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8])])
@@ -88,9 +105,9 @@ class Statistics(object):
>>> cStats.min()
array([ 2., 0., 0., -2.])
"""
- sc = X.ctx
- Xser = _get_unmangled_double_vector_rdd(X)
- cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
+ sc = rdd.ctx
+ jrdd = rdd._to_java_object_rdd()
+ cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)
@staticmethod
@@ -117,7 +134,7 @@ class Statistics(object):
>>> from math import isnan
>>> isnan(Statistics.corr(x, zeros))
True
- >>> from linalg import Vectors
+ >>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
>>> pearsonCorr = Statistics.corr(rdd)
@@ -144,18 +161,16 @@ class Statistics(object):
# check if y is used to specify the method name instead.
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")
+
+ jx = x._to_java_object_rdd()
if not y:
- try:
- Xser = _get_unmangled_double_vector_rdd(x)
- except TypeError:
- raise TypeError("corr called on a single RDD not consisted of Vectors.")
- resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method)
- return _deserialize_double_matrix(resultMat)
+ resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
+ bytes = sc._jvm.SerDe.dumps(resultMat)
+ ser = PickleSerializer()
+ return ser.loads(str(bytes)).toArray()
else:
- xSer = _get_unmangled_rdd(x, _serialize_double)
- ySer = _get_unmangled_rdd(y, _serialize_double)
- result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method)
- return result
+ jy = y._to_java_object_rdd()
+ return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
def _test():
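
The summary statistics now come back as pickled MLlib vectors that the @serialize decorator converts to NumPy arrays. A sketch assuming a running SparkContext `sc`:

    from pyspark.mllib.stat import Statistics
    from pyspark.mllib.linalg import Vectors

    rdd = sc.parallelize([Vectors.dense([2.0, 0.0]), Vectors.dense([4.0, 6.0])])
    summary = Statistics.colStats(rdd)
    print summary.mean()          # NumPy array of column means
    print Statistics.corr(rdd)    # Pearson correlation matrix as a NumPy array
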
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8a851bd35c..f72e88ba6e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -20,6 +20,8 @@ Fuller unit tests for Python MLlib.
"""
import sys
+import array as pyarray
+
from numpy import array, array_equal
if sys.version_info[:2] <= (2, 6):
@@ -27,9 +29,8 @@ if sys.version_info[:2] <= (2, 6):
else:
import unittest
-from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
- _deserialize_double_vector, _dot, _squared_distance
-from pyspark.mllib.linalg import SparseVector
+from pyspark.serializers import PickleSerializer
+from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
from pyspark.tests import PySparkTestCase
@@ -42,39 +43,52 @@ except:
# No SciPy, but that's okay, we'll skip those tests
pass
+ser = PickleSerializer()
+
+
+def _squared_distance(a, b):
+ if isinstance(a, Vector):
+ return a.squared_distance(b)
+ else:
+ return b.squared_distance(a)
-class VectorTests(unittest.TestCase):
+
+class VectorTests(PySparkTestCase):
+
+ def _test_serialize(self, v):
+ jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
+ nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
+ self.assertEqual(v, nv)
+ vs = [v] * 100
+ jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
+ nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
+ self.assertEqual(vs, nvs)
def test_serialize(self):
- sv = SparseVector(4, {1: 1, 3: 2})
- dv = array([1., 2., 3., 4.])
- lst = [1, 2, 3, 4]
- self.assertTrue(sv is _convert_vector(sv))
- self.assertTrue(dv is _convert_vector(dv))
- self.assertTrue(array_equal(dv, _convert_vector(lst)))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(sv)))
- self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(dv))))
- self.assertTrue(array_equal(dv, _deserialize_double_vector(_serialize_double_vector(lst))))
+ self._test_serialize(DenseVector(range(10)))
+ self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
+ self._test_serialize(DenseVector(pyarray.array('d', range(10))))
+ self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
def test_dot(self):
sv = SparseVector(4, {1: 1, 3: 2})
- dv = array([1., 2., 3., 4.])
- lst = [1, 2, 3, 4]
+ dv = DenseVector(array([1., 2., 3., 4.]))
+ lst = DenseVector([1, 2, 3, 4])
mat = array([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]])
- self.assertEquals(10.0, _dot(sv, dv))
- self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
- self.assertEquals(30.0, _dot(dv, dv))
- self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
- self.assertEquals(30.0, _dot(lst, dv))
- self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+ self.assertEquals(10.0, sv.dot(dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), sv.dot(mat)))
+ self.assertEquals(30.0, dv.dot(dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), dv.dot(mat)))
+ self.assertEquals(30.0, lst.dot(dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), lst.dot(mat)))
def test_squared_distance(self):
sv = SparseVector(4, {1: 1, 3: 2})
- dv = array([1., 2., 3., 4.])
- lst = [4, 3, 2, 1]
+ dv = DenseVector(array([1., 2., 3., 4.]))
+ lst = DenseVector([4, 3, 2, 1])
self.assertEquals(15.0, _squared_distance(sv, dv))
self.assertEquals(25.0, _squared_distance(sv, lst))
self.assertEquals(20.0, _squared_distance(dv, lst))
@@ -198,41 +212,36 @@ class SciPyTests(PySparkTestCase):
lil[1, 0] = 1
lil[3, 0] = 2
sv = SparseVector(4, {1: 1, 3: 2})
- self.assertEquals(sv, _convert_vector(lil))
- self.assertEquals(sv, _convert_vector(lil.tocsc()))
- self.assertEquals(sv, _convert_vector(lil.tocoo()))
- self.assertEquals(sv, _convert_vector(lil.tocsr()))
- self.assertEquals(sv, _convert_vector(lil.todok()))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
- self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+ self.assertEquals(sv, _convert_to_vector(lil))
+ self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
+ self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
+ self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
+ self.assertEquals(sv, _convert_to_vector(lil.todok()))
+
+ def serialize(l):
+ return ser.loads(ser.dumps(_convert_to_vector(l)))
+ self.assertEquals(sv, serialize(lil))
+ self.assertEquals(sv, serialize(lil.tocsc()))
+ self.assertEquals(sv, serialize(lil.tocsr()))
+ self.assertEquals(sv, serialize(lil.todok()))
def test_dot(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
- dv = array([1., 2., 3., 4.])
- sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
- mat = array([[1., 2., 3., 4.],
- [1., 2., 3., 4.],
- [1., 2., 3., 4.],
- [1., 2., 3., 4.]])
- self.assertEquals(10.0, _dot(lil, dv))
- self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+ dv = DenseVector(array([1., 2., 3., 4.]))
+ self.assertEquals(10.0, dv.dot(lil))
def test_squared_distance(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 3
lil[3, 0] = 2
- dv = array([1., 2., 3., 4.])
+ dv = DenseVector(array([1., 2., 3., 4.]))
sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
- self.assertEquals(15.0, _squared_distance(lil, dv))
- self.assertEquals(15.0, _squared_distance(lil, sv))
- self.assertEquals(15.0, _squared_distance(dv, lil))
- self.assertEquals(15.0, _squared_distance(sv, lil))
+ self.assertEquals(15.0, dv.squared_distance(lil))
+ self.assertEquals(15.0, sv.squared_distance(lil))
def scipy_matrix(self, size, values):
"""Create a column SciPy matrix from a dictionary of values"""
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5b13ab682b..f59a818a6e 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -18,13 +18,9 @@
from py4j.java_collections import MapConverter
from pyspark import SparkContext, RDD
-from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _serialize_double_vector, \
- _deserialize_labeled_point, _get_unmangled_labeled_point_rdd, \
- _deserialize_double
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
-from pyspark.serializers import NoOpSerializer
-
__all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -55,21 +51,24 @@ class DecisionTreeModel(object):
:param x: Data point (feature vector),
or an RDD of data points (feature vectors).
"""
- pythonAPI = self._sc._jvm.PythonMLLibAPI()
+ SerDe = self._sc._jvm.SerDe
+ ser = PickleSerializer()
if isinstance(x, RDD):
# Bulk prediction
- if x.count() == 0:
+ first = x.take(1)
+ if not first:
return self._sc.parallelize([])
- dataBytes = _get_unmangled_double_vector_rdd(x, cache=False)
- jSerializedPreds = \
- pythonAPI.predictDecisionTreeModel(self._java_model,
- dataBytes._jrdd)
- serializedPreds = RDD(jSerializedPreds, self._sc, NoOpSerializer())
- return serializedPreds.map(lambda bytes: _deserialize_double(bytearray(bytes)))
+ if not isinstance(first[0], Vector):
+ x = x.map(_convert_to_vector)
+ jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
+ jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+ return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
+
else:
# Assume x is a single data point.
- x_ = _serialize_double_vector(x)
- return pythonAPI.predictDecisionTreeModel(self._java_model, x_)
+ bytes = bytearray(ser.dumps(_convert_to_vector(x)))
+ vec = self._sc._jvm.SerDe.loads(bytes)
+ return self._java_model.predict(vec)
def numNodes(self):
return self._java_model.numNodes()
@@ -77,7 +76,7 @@ class DecisionTreeModel(object):
def depth(self):
return self._java_model.depth()
- def __str__(self):
+ def __repr__(self):
return self._java_model.toString()
@@ -90,53 +89,24 @@ class DecisionTree(object):
EXPERIMENTAL: This is an experimental API.
It will probably be modified for Spark v1.2.
- Example usage:
-
- >>> from numpy import array
- >>> import sys
- >>> from pyspark.mllib.regression import LabeledPoint
- >>> from pyspark.mllib.tree import DecisionTree
- >>> from pyspark.mllib.linalg import SparseVector
- >>>
- >>> data = [
- ... LabeledPoint(0.0, [0.0]),
- ... LabeledPoint(1.0, [1.0]),
- ... LabeledPoint(1.0, [2.0]),
- ... LabeledPoint(1.0, [3.0])
- ... ]
- >>> categoricalFeaturesInfo = {} # no categorical features
- >>> model = DecisionTree.trainClassifier(sc.parallelize(data), numClasses=2,
- ... categoricalFeaturesInfo=categoricalFeaturesInfo)
- >>> sys.stdout.write(model)
- DecisionTreeModel classifier
- If (feature 0 <= 0.5)
- Predict: 0.0
- Else (feature 0 > 0.5)
- Predict: 1.0
- >>> model.predict(array([1.0])) > 0
- True
- >>> model.predict(array([0.0])) == 0
- True
- >>> sparse_data = [
- ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
- ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
- ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
- ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
- ... ]
- >>>
- >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data),
- ... categoricalFeaturesInfo=categoricalFeaturesInfo)
- >>> model.predict(array([0.0, 1.0])) == 1
- True
- >>> model.predict(array([0.0, 0.0])) == 0
- True
- >>> model.predict(SparseVector(2, {1: 1.0})) == 1
- True
- >>> model.predict(SparseVector(2, {1: 0.0})) == 0
- True
"""
@staticmethod
+ def _train(data, algo, numClasses, categoricalFeaturesInfo,
+ impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
+ minInfoGain=0.0):
+ first = data.first()
+ assert isinstance(first, LabeledPoint), "the data should be an RDD of LabeledPoint"
+ sc = data.context
+ jrdd = data._to_java_object_rdd()
+ cfiMap = MapConverter().convert(categoricalFeaturesInfo,
+ sc._gateway._gateway_client)
+ model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
+ jrdd, algo, numClasses, cfiMap,
+ impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
+ return DecisionTreeModel(sc, model)
+
+ @staticmethod
def trainClassifier(data, numClasses, categoricalFeaturesInfo,
impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
minInfoGain=0.0):
@@ -159,18 +129,34 @@ class DecisionTree(object):
the parent split
:param minInfoGain: Min info gain required to create a split
:return: DecisionTreeModel
+
+ Example usage:
+
+ >>> from numpy import array
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> from pyspark.mllib.tree import DecisionTree
+ >>> from pyspark.mllib.linalg import SparseVector
+ >>>
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
+ >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
+ >>> print model, # it already has newline
+ DecisionTreeModel classifier
+ If (feature 0 <= 0.5)
+ Predict: 0.0
+ Else (feature 0 > 0.5)
+ Predict: 1.0
+ >>> model.predict(array([1.0])) > 0
+ True
+ >>> model.predict(array([0.0])) == 0
+ True
"""
- sc = data.context
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- categoricalFeaturesInfoJMap = \
- MapConverter().convert(categoricalFeaturesInfo,
- sc._gateway._gateway_client)
- model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
- dataBytes._jrdd, "classification",
- numClasses, categoricalFeaturesInfoJMap,
- impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
- dataBytes.unpersist()
- return DecisionTreeModel(sc, model)
+ return DecisionTree._train(data, "classification", numClasses, categoricalFeaturesInfo,
+ impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
@staticmethod
def trainRegressor(data, categoricalFeaturesInfo,
@@ -194,18 +180,33 @@ class DecisionTree(object):
the parent split
:param minInfoGain: Min info gain required to create a split
:return: DecisionTreeModel
+
+ Example usage:
+
+ >>> from numpy import array
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> from pyspark.mllib.tree import DecisionTree
+ >>> from pyspark.mllib.linalg import SparseVector
+ >>>
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>>
+ >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
+ >>> model.predict(array([0.0, 1.0])) == 1
+ True
+ >>> model.predict(array([0.0, 0.0])) == 0
+ True
+ >>> model.predict(SparseVector(2, {1: 1.0})) == 1
+ True
+ >>> model.predict(SparseVector(2, {1: 0.0})) == 0
+ True
"""
- sc = data.context
- dataBytes = _get_unmangled_labeled_point_rdd(data)
- categoricalFeaturesInfoJMap = \
- MapConverter().convert(categoricalFeaturesInfo,
- sc._gateway._gateway_client)
- model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
- dataBytes._jrdd, "regression",
- 0, categoricalFeaturesInfoJMap,
- impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
- dataBytes.unpersist()
- return DecisionTreeModel(sc, model)
+ return DecisionTree._train(data, "regression", 0, categoricalFeaturesInfo,
+ impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
def _test():
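
DecisionTreeModel.predict now accepts either a single vector-like point or an RDD of them; the RDD path converts elements with _convert_to_vector before shipping them to the JVM. A sketch assuming a running SparkContext `sc`:

    from pyspark.mllib.tree import DecisionTree
    from pyspark.mllib.regression import LabeledPoint

    data = sc.parallelize([LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
                           LabeledPoint(1.0, [2.0]), LabeledPoint(1.0, [3.0])])
    model = DecisionTree.trainClassifier(data, 2, {})
    print model.predict([1.5])                                      # single point -> 1.0
    print model.predict(sc.parallelize([[0.0], [2.0]])).collect()   # bulk -> [0.0, 1.0]
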
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1c7b8c809a..8233d4e81f 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -18,11 +18,10 @@
import numpy as np
import warnings
-from pyspark.mllib.linalg import Vectors, SparseVector
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
from pyspark.rdd import RDD
-from pyspark.serializers import NoOpSerializer
+from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
+from pyspark.mllib.regression import LabeledPoint
class MLUtils(object):
@@ -32,15 +31,12 @@ class MLUtils(object):
"""
@staticmethod
- def _parse_libsvm_line(line, multiclass):
- warnings.warn("deprecated", DeprecationWarning)
- return _parse_libsvm_line(line)
-
- @staticmethod
- def _parse_libsvm_line(line):
+ def _parse_libsvm_line(line, multiclass=None):
"""
Parses a line in LIBSVM format into (label, indices, values).
"""
+ if multiclass is not None:
+ warnings.warn("deprecated", DeprecationWarning)
items = line.split(None)
label = float(items[0])
nnz = len(items) - 1
@@ -55,27 +51,20 @@ class MLUtils(object):
@staticmethod
def _convert_labeled_point_to_libsvm(p):
"""Converts a LabeledPoint to a string in LIBSVM format."""
+ assert isinstance(p, LabeledPoint)
items = [str(p.label)]
- v = _convert_vector(p.features)
- if type(v) == np.ndarray:
- for i in xrange(len(v)):
- items.append(str(i + 1) + ":" + str(v[i]))
- elif type(v) == SparseVector:
+ v = _convert_to_vector(p.features)
+ if isinstance(v, SparseVector):
nnz = len(v.indices)
for i in xrange(nnz):
items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
else:
- raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector"
- " but got " % type(v))
+ for i in xrange(len(v)):
+ items.append(str(i + 1) + ":" + str(v[i]))
return " ".join(items)
@staticmethod
- def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
- warnings.warn("deprecated", DeprecationWarning)
- return loadLibSVMFile(sc, path, numFeatures, minPartitions)
-
- @staticmethod
- def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None):
+ def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None, multiclass=None):
"""
Loads labeled data in the LIBSVM format into an RDD of
LabeledPoint. The LIBSVM format is a text-based format used by
@@ -122,6 +111,8 @@ class MLUtils(object):
>>> print examples[2]
(-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
"""
+ if multiclass is not None:
+ warnings.warn("deprecated", DeprecationWarning)
lines = sc.textFile(path, minPartitions)
parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l))
@@ -182,9 +173,9 @@ class MLUtils(object):
(0.0,[1.01,2.02,3.03])
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
- jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
- serialized = RDD(jSerialized, sc, NoOpSerializer())
- return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))
+ jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
+ jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
+ return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
def _test():
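
The LIBSVM helpers now go through _convert_to_vector as well; _convert_labeled_point_to_libsvm is a private helper, called here only to illustrate the output format. A local sketch:

    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import SparseVector
    from pyspark.mllib.util import MLUtils

    p = LabeledPoint(1.0, SparseVector(3, {0: 1.0, 2: 3.0}))
    print MLUtils._convert_labeled_point_to_libsvm(p)   # "1.0 1:1.0 3:3.0"
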
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index b43606b730..8ef233bc80 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -34,7 +34,7 @@ from math import sqrt, log, isinf, isnan
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
- PickleSerializer, pack_long, CompressedSerializer
+ PickleSerializer, pack_long, AutoBatchedSerializer
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -1927,10 +1927,10 @@ class RDD(object):
It will convert each Python object into a Java object via Pyrolite, whether the
RDD is batch-serialized or not.
"""
- if not self._is_pickled():
- self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
- batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
- return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+ rdd = self._reserialize(AutoBatchedSerializer(PickleSerializer())) \
+ if not self._is_pickled() else self
+ is_batch = isinstance(rdd._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(rdd._jrdd, is_batch)
def countApprox(self, timeout, confidence=0.95):
"""
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 44ac564283..2672da36c1 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -68,6 +68,7 @@ import sys
import types
import collections
import zlib
+import itertools
from pyspark import cloudpickle
@@ -214,6 +215,41 @@ class BatchedSerializer(Serializer):
return "BatchedSerializer<%s>" % str(self.serializer)
+class AutoBatchedSerializer(BatchedSerializer):
+ """
+ Choose the batch size automatically based on the size of the serialized objects
+ """
+
+ def __init__(self, serializer, bestSize=1 << 20):
+ BatchedSerializer.__init__(self, serializer, -1)
+ self.bestSize = bestSize
+
+ def dump_stream(self, iterator, stream):
+ batch, best = 1, self.bestSize
+ iterator = iter(iterator)
+ while True:
+ vs = list(itertools.islice(iterator, batch))
+ if not vs:
+ break
+
+ bytes = self.serializer.dumps(vs)
+ write_int(len(bytes), stream)
+ stream.write(bytes)
+
+ size = len(bytes)
+ if size < best:
+ batch *= 2
+ elif size > best * 10 and batch > 1:
+ batch /= 2
+
+ def __eq__(self, other):
+ return (isinstance(other, AutoBatchedSerializer) and
+ other.serializer == self.serializer)
+
+ def __str__(self):
+ return "BatchedSerializer<%s>" % str(self.serializer)
+
+
class CartesianDeserializer(FramedSerializer):
"""
diff --git a/python/run-tests b/python/run-tests
index a67e5a99fb..a7ec270c7d 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -73,7 +73,6 @@ run_test "pyspark/serializers.py"
unset PYSPARK_DOC_TEST
run_test "pyspark/shuffle.py"
run_test "pyspark/tests.py"
-run_test "pyspark/mllib/_common.py"
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
run_test "pyspark/mllib/linalg.py"