-rw-r--r--  docs/mllib-classification-regression.md                                        45
-rw-r--r--  docs/mllib-clustering.md                                                        11
-rw-r--r--  docs/mllib-guide.md                                                             27
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala   156
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala                 8
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala           18
-rw-r--r--  python/epydoc.conf                                                               3
-rwxr-xr-x  python/examples/kmeans.py                                                       11
-rwxr-xr-x  python/examples/logistic_regression.py                                           8
-rwxr-xr-x  python/examples/mllib/kmeans.py                                                 44
-rwxr-xr-x  python/examples/mllib/logistic_regression.py                                    50
-rw-r--r--  python/pyspark/mllib/_common.py                                                396
-rw-r--r--  python/pyspark/mllib/classification.py                                          75
-rw-r--r--  python/pyspark/mllib/clustering.py                                              51
-rw-r--r--  python/pyspark/mllib/linalg.py                                                 245
-rw-r--r--  python/pyspark/mllib/regression.py                                             128
-rw-r--r--  python/pyspark/mllib/tests.py                                                  302
-rwxr-xr-x  python/run-tests                                                                 4
18 files changed, 1368 insertions, 214 deletions
diff --git a/docs/mllib-classification-regression.md b/docs/mllib-classification-regression.md
index cc8acf15ac..2c42f60c2e 100644
--- a/docs/mllib-classification-regression.md
+++ b/docs/mllib-classification-regression.md
@@ -356,16 +356,17 @@ error.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data file
val data = sc.textFile("mllib/data/sample_svm_data.txt")
val parsedData = data.map { line =>
- val parts = line.split(' ')
- LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
+ val parts = line.split(' ').map(_.toDouble)
+ LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
// Run training algorithm to build the model
-val numIterations = 20
+val numIterations = 100
val model = SVMWithSGD.train(parsedData, numIterations)
// Evaluate model on training examples and compute training error
@@ -401,21 +402,22 @@ val modelL1 = svmAlg.run(parsedData)
The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint.
The example then uses LinearRegressionWithSGD to build a simple linear model to predict label
values. We compute the Mean Squared Error at the end to evaluate
-[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)
+[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
{% highlight scala %}
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
- LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
+ LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}
// Building the model
-val numIterations = 20
+val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)
// Evaluate model on training examples and compute training error
@@ -423,7 +425,7 @@ val valuesAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
-val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
+val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.reduce(_ + _) / valuesAndPreds.count
println("training Mean Squared Error = " + MSE)
{% endhighlight %}
@@ -518,18 +520,22 @@ and make predictions with the resulting model to compute the training error.
{% highlight python %}
from pyspark.mllib.classification import LogisticRegressionWithSGD
+from pyspark.mllib.regression import LabeledPoint
from numpy import array
# Load and parse the data
+def parsePoint(line):
+ values = [float(x) for x in line.split(' ')]
+ return LabeledPoint(values[0], values[1:])
+
data = sc.textFile("mllib/data/sample_svm_data.txt")
-parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
-model = LogisticRegressionWithSGD.train(parsedData)
+parsedData = data.map(parsePoint)
# Build the model
-labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
- model.predict(point.take(range(1, point.size)))))
+model = LogisticRegressionWithSGD.train(parsedData)
# Evaluating the model on training data
+labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
{% endhighlight %}
@@ -538,22 +544,25 @@ print("Training Error = " + str(trainErr))
The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint.
The example then uses LinearRegressionWithSGD to build a simple linear model to predict label
values. We compute the Mean Squared Error at the end to evaluate
-[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)
+[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
{% highlight python %}
-from pyspark.mllib.regression import LinearRegressionWithSGD
+from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array
# Load and parse the data
+def parsePoint(line):
+ values = [float(x) for x in line.replace(',', ' ').split(' ')]
+ return LabeledPoint(values[0], values[1:])
+
data = sc.textFile("mllib/data/ridge-data/lpsa.data")
-parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
+parsedData = data.map(parsePoint)
# Build the model
model = LinearRegressionWithSGD.train(parsedData)
# Evaluate the model on training data
-valuesAndPreds = parsedData.map(lambda point: (point.item(0),
- model.predict(point.take(range(1, point.size)))))
-MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
+valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
+MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
-{% endhighlight %}
\ No newline at end of file
+{% endhighlight %}
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 65ed75b82e..50a8671560 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -48,14 +48,15 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
{% highlight scala %}
import org.apache.spark.mllib.clustering.KMeans
+import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
-val data = sc.textFile("kmeans_data.txt")
-val parsedData = data.map( _.split(' ').map(_.toDouble))
+val data = sc.textFile("data/kmeans_data.txt")
+val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
// Cluster the data into two classes using KMeans
-val numIterations = 20
val numClusters = 2
+val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
@@ -85,12 +86,12 @@ from numpy import array
from math import sqrt
# Load and parse the data
-data = sc.textFile("kmeans_data.txt")
+data = sc.textFile("data/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
- runs=30, initialization_mode="random")
+ runs=10, initialization_mode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 1ac5cc13db..4236b0c8b6 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -7,8 +7,9 @@ title: Machine Learning Library (MLlib)
MLlib is a Spark implementation of some common machine learning (ML)
functionality, as well as associated tests and data generators. MLlib
currently supports four common types of machine learning problem settings,
-namely, binary classification, regression, clustering and collaborative
-filtering, as well as an underlying gradient descent optimization primitive.
+namely classification, regression, clustering and collaborative filtering,
+as well as an underlying gradient descent optimization primitive and several
+linear algebra methods.
# Available Methods
The following links provide a detailed explanation of the methods and usage examples for each of them:
@@ -32,6 +33,28 @@ The following links provide a detailed explanation of the methods and usage exam
* Singular Value Decomposition
* Principal Component Analysis
+# Data Types
+
+Most MLlib algorithms operate on RDDs containing vectors. In Java and Scala, the
+[Vector](api/mllib/index.html#org.apache.spark.mllib.linalg.Vector) class is used to
+represent vectors. You can create either dense or sparse vectors using the
+[Vectors](api/mllib/index.html#org.apache.spark.mllib.linalg.Vectors$) factory.
+
+In Python, MLlib can take the following vector types:
+
+* [NumPy](http://www.numpy.org) arrays
+* Standard Python lists (e.g. `[1, 2, 3]`)
+* The MLlib [SparseVector](api/pyspark/pyspark.mllib.linalg.SparseVector-class.html) class
+* [SciPy sparse matrices](http://docs.scipy.org/doc/scipy/reference/sparse.html)
+
+For efficiency, we recommend using NumPy arrays over lists, and using the
+[CSC format](http://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csc_matrix.html#scipy.sparse.csc_matrix)
+for SciPy matrices, or MLlib's own SparseVector class.
+
+Several other simple data types are used throughout the library, e.g. the LabeledPoint
+class ([Java/Scala](api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint),
+[Python](api/pyspark/pyspark.mllib.regression.LabeledPoint-class.html)) for labeled data.
+
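As a quick illustration of the Python vector types listed above, the minimal sketch below (an editor's aside, not part of this patch; it assumes NumPy, SciPy, and an importable `pyspark.mllib`) builds the same three-element vector in each supported representation and wraps one of them in a LabeledPoint:

{% highlight python %}
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Dense representations: a NumPy array or a plain Python list
dense = np.array([1.0, 0.0, 3.0])
dense_list = [1.0, 0.0, 3.0]

# Sparse representations: MLlib's SparseVector(size, indices, values),
# or a SciPy CSC column vector with the same nonzero pattern
sparse = SparseVector(3, [0, 2], [1.0, 3.0])
sparse_scipy = sps.csc_matrix((np.array([1.0, 3.0]),   # nonzero values
                               np.array([0, 2]),       # row indices
                               np.array([0, 2])),      # column pointers
                              shape=(3, 1))

# Any of these forms can serve as the features of a labeled example
point = LabeledPoint(1.0, sparse)
{% endhighlight %}

Whichever form is used, it is normalized internally (see `_convert_vector` in `python/pyspark/mllib/_common.py` in this patch) before being serialized and shipped to the JVM.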
# Dependencies
MLlib uses the [jblas](https://github.com/mikiobraun/jblas) linear algebra library, which itself
depends on native Fortran routines. You may need to install the
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index a6c049e517..7c65b0d475 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD
@@ -31,56 +31,112 @@ import org.apache.spark.rdd.RDD
/**
* :: DeveloperApi ::
* The Java stubs necessary for the Python mllib bindings.
+ *
+ * See python/pyspark/mllib/_common.py for the mutually agreed upon data format.
*/
@DeveloperApi
class PythonMLLibAPI extends Serializable {
- private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
- val packetLength = bytes.length
- if (packetLength < 16) {
- throw new IllegalArgumentException("Byte array too short.")
- }
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- val magic = bb.getLong()
- if (magic != 1) {
+ private val DENSE_VECTOR_MAGIC: Byte = 1
+ private val SPARSE_VECTOR_MAGIC: Byte = 2
+ private val DENSE_MATRIX_MAGIC: Byte = 3
+ private val LABELED_POINT_MAGIC: Byte = 4
+
+ private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ require(bytes.length - offset >= 5, "Byte array too short")
+ val magic = bytes(offset)
+ if (magic == DENSE_VECTOR_MAGIC) {
+ deserializeDenseVector(bytes, offset)
+ } else if (magic == SPARSE_VECTOR_MAGIC) {
+ deserializeSparseVector(bytes, offset)
+ } else {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
- val length = bb.getLong()
- if (packetLength != 16 + 8 * length) {
- throw new IllegalArgumentException("Length " + length + " is wrong.")
- }
+ }
+
+ private def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ val packetLength = bytes.length - offset
+ require(packetLength >= 5, "Byte array too short")
+ val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.get()
+ require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
+ val length = bb.getInt()
+ require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
val db = bb.asDoubleBuffer()
val ans = new Array[Double](length.toInt)
db.get(ans)
- ans
+ Vectors.dense(ans)
}
- private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+ private def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
+ val packetLength = bytes.length - offset
+ require(packetLength >= 9, "Byte array too short")
+ val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.get()
+ require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
+ val size = bb.getInt()
+ val nonZeros = bb.getInt()
+ require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
+ val ib = bb.asIntBuffer()
+ val indices = new Array[Int](nonZeros)
+ ib.get(indices)
+ bb.position(bb.position() + 4 * nonZeros)
+ val db = bb.asDoubleBuffer()
+ val values = new Array[Double](nonZeros)
+ db.get(values)
+ Vectors.sparse(size, indices, values)
+ }
+
+ private def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
val len = doubles.length
- val bytes = new Array[Byte](16 + 8 * len)
+ val bytes = new Array[Byte](5 + 8 * len)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
- bb.putLong(1)
- bb.putLong(len)
+ bb.put(DENSE_VECTOR_MAGIC)
+ bb.putInt(len)
val db = bb.asDoubleBuffer()
db.put(doubles)
bytes
}
+ private def serializeSparseVector(vector: SparseVector): Array[Byte] = {
+ val nonZeros = vector.indices.length
+ val bytes = new Array[Byte](9 + 12 * nonZeros)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.put(SPARSE_VECTOR_MAGIC)
+ bb.putInt(vector.size)
+ bb.putInt(nonZeros)
+ val ib = bb.asIntBuffer()
+ ib.put(vector.indices)
+ bb.position(bb.position() + 4 * nonZeros)
+ val db = bb.asDoubleBuffer()
+ db.put(vector.values)
+ bytes
+ }
+
+ private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
+ case s: SparseVector =>
+ serializeSparseVector(s)
+ case _ =>
+ serializeDenseVector(vector.toArray)
+ }
+
private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
val packetLength = bytes.length
- if (packetLength < 24) {
+ if (packetLength < 9) {
throw new IllegalArgumentException("Byte array too short.")
}
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
- val magic = bb.getLong()
- if (magic != 2) {
+ val magic = bb.get()
+ if (magic != DENSE_MATRIX_MAGIC) {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
- val rows = bb.getLong()
- val cols = bb.getLong()
- if (packetLength != 24 + 8 * rows * cols) {
+ val rows = bb.getInt()
+ val cols = bb.getInt()
+ if (packetLength != 9 + 8 * rows * cols) {
throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
}
val db = bb.asDoubleBuffer()
@@ -98,12 +154,12 @@ class PythonMLLibAPI extends Serializable {
if (rows > 0) {
cols = doubles(0).length
}
- val bytes = new Array[Byte](24 + 8 * rows * cols)
+ val bytes = new Array[Byte](9 + 8 * rows * cols)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
- bb.putLong(2)
- bb.putLong(rows)
- bb.putLong(cols)
+ bb.put(DENSE_MATRIX_MAGIC)
+ bb.putInt(rows)
+ bb.putInt(cols)
val db = bb.asDoubleBuffer()
for (i <- 0 until rows) {
db.put(doubles(i))
@@ -111,18 +167,27 @@ class PythonMLLibAPI extends Serializable {
bytes
}
+ private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
+ require(bytes.length >= 9, "Byte array too short")
+ val magic = bytes(0)
+ if (magic != LABELED_POINT_MAGIC) {
+ throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ }
+ val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
+ labelBytes.order(ByteOrder.nativeOrder())
+ val label = labelBytes.asDoubleBuffer().get(0)
+ LabeledPoint(label, deserializeDoubleVector(bytes, 9))
+ }
+
private def trainRegressionModel(
- trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+ trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]],
initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(xBytes => {
- val x = deserializeDoubleVector(xBytes)
- LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
- })
+ val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(serializeDoubleVector(model.weights.toArray))
+ ret.add(serializeDoubleVector(model.weights))
ret.add(model.intercept: java.lang.Double)
ret
}
@@ -143,7 +208,7 @@ class PythonMLLibAPI extends Serializable {
numIterations,
stepSize,
miniBatchFraction,
- Vectors.dense(initialWeights)),
+ initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -166,7 +231,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
- Vectors.dense(initialWeights)),
+ initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -189,7 +254,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
- Vectors.dense(initialWeights)),
+ initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -212,7 +277,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
- Vectors.dense(initialWeights)),
+ initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -233,7 +298,7 @@ class PythonMLLibAPI extends Serializable {
numIterations,
stepSize,
miniBatchFraction,
- Vectors.dense(initialWeights)),
+ initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@@ -244,14 +309,11 @@ class PythonMLLibAPI extends Serializable {
def trainNaiveBayes(
dataBytesJRDD: JavaRDD[Array[Byte]],
lambda: Double): java.util.List[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(xBytes => {
- val x = deserializeDoubleVector(xBytes)
- LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
- })
+ val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(serializeDoubleVector(model.labels))
- ret.add(serializeDoubleVector(model.pi))
+ ret.add(serializeDoubleVector(Vectors.dense(model.labels)))
+ ret.add(serializeDoubleVector(Vectors.dense(model.pi)))
ret.add(serializeDoubleMatrix(model.theta))
ret
}
@@ -265,7 +327,7 @@ class PythonMLLibAPI extends Serializable {
maxIterations: Int,
runs: Int,
initializationMode: String): java.util.List[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
+ val data = dataBytesJRDD.rdd.map(bytes => deserializeDoubleVector(bytes))
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 99a849f1c6..7cdf6bd56a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -130,9 +130,11 @@ object Vectors {
private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
breezeVector match {
case v: BDV[Double] =>
- require(v.offset == 0, s"Do not support non-zero offset ${v.offset}.")
- require(v.stride == 1, s"Do not support stride other than 1, but got ${v.stride}.")
- new DenseVector(v.data)
+ if (v.offset == 0 && v.stride == 1) {
+ new DenseVector(v.data)
+ } else {
+ new DenseVector(v.toArray) // Can't use underlying array directly, so make a new one
+ }
case v: BSV[Double] =>
new SparseVector(v.length, v.index, v.data)
case v: BV[_] =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index 8a200310e0..cfe8a27fcb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -82,4 +82,22 @@ class VectorsSuite extends FunSuite {
assert(v.## != another.##)
}
}
+
+ test("indexing dense vectors") {
+ val vec = Vectors.dense(1.0, 2.0, 3.0, 4.0)
+ assert(vec(0) === 1.0)
+ assert(vec(3) === 4.0)
+ }
+
+ test("indexing sparse vectors") {
+ val vec = Vectors.sparse(7, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0))
+ assert(vec(0) === 1.0)
+ assert(vec(1) === 0.0)
+ assert(vec(2) === 2.0)
+ assert(vec(3) === 0.0)
+ assert(vec(6) === 4.0)
+ val vec2 = Vectors.sparse(8, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0))
+ assert(vec2(6) === 4.0)
+ assert(vec2(7) === 0.0)
+ }
}
diff --git a/python/epydoc.conf b/python/epydoc.conf
index 95a6af0974..081ed215ae 100644
--- a/python/epydoc.conf
+++ b/python/epydoc.conf
@@ -33,5 +33,6 @@ target: docs/
private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join
- pyspark.java_gateway pyspark.examples pyspark.shell pyspark.test
+ pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
pyspark.rddsampler pyspark.daemon pyspark.mllib._common
+ pyspark.mllib.tests
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
index ba31af92fc..d8387b0b18 100755
--- a/python/examples/kmeans.py
+++ b/python/examples/kmeans.py
@@ -16,8 +16,13 @@
#
"""
-This example requires numpy (http://www.numpy.org/)
+The K-means algorithm written from scratch against PySpark. In practice,
+one may prefer to use the KMeans algorithm in MLlib, as shown in
+python/examples/mllib/kmeans.py.
+
+This example requires NumPy (http://www.numpy.org/).
"""
+
import sys
import numpy as np
@@ -49,9 +54,7 @@ if __name__ == "__main__":
K = int(sys.argv[3])
convergeDist = float(sys.argv[4])
- # TODO: change this after we port takeSample()
- #kPoints = data.takeSample(False, K, 34)
- kPoints = data.take(K)
+ kPoints = data.takeSample(False, K, 1)
tempDist = 1.0
while tempDist > convergeDist:
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
index 1117dea538..28d52e6a40 100755
--- a/python/examples/logistic_regression.py
+++ b/python/examples/logistic_regression.py
@@ -16,9 +16,13 @@
#
"""
-A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches
-of input data using efficient matrix operations.
+A logistic regression implementation that uses NumPy (http://www.numpy.org)
+to act on batches of input data using efficient matrix operations.
+
+In practice, one may prefer to use the LogisticRegression algorithm in
+MLlib, as shown in python/examples/mllib/logistic_regression.py.
"""
+
from collections import namedtuple
from math import exp
from os.path import realpath
diff --git a/python/examples/mllib/kmeans.py b/python/examples/mllib/kmeans.py
new file mode 100755
index 0000000000..dec82ff34f
--- /dev/null
+++ b/python/examples/mllib/kmeans.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A K-means clustering program using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+from pyspark.mllib.clustering import KMeans
+
+
+def parseVector(line):
+ return np.array([float(x) for x in line.split(' ')])
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 4:
+ print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
+ exit(-1)
+ sc = SparkContext(sys.argv[1], "KMeans")
+ lines = sc.textFile(sys.argv[2])
+ data = lines.map(parseVector)
+ k = int(sys.argv[3])
+ model = KMeans.train(data, k)
+ print "Final centers: " + str(model.clusterCenters)
diff --git a/python/examples/mllib/logistic_regression.py b/python/examples/mllib/logistic_regression.py
new file mode 100755
index 0000000000..8631051d00
--- /dev/null
+++ b/python/examples/mllib/logistic_regression.py
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Logistic regression using MLlib.
+
+This example requires NumPy (http://www.numpy.org/).
+"""
+
+from math import exp
+import sys
+
+import numpy as np
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.classification import LogisticRegressionWithSGD
+
+
+# Parse a line of text into an MLlib LabeledPoint object
+def parsePoint(line):
+ values = [float(s) for s in line.split(' ')]
+ if values[0] == -1: # Convert -1 labels to 0 for MLlib
+ values[0] = 0
+ return LabeledPoint(values[0], values[1:])
+
+
+if __name__ == "__main__":
+ if len(sys.argv) != 4:
+ print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
+ exit(-1)
+ sc = SparkContext(sys.argv[1], "PythonLR")
+ points = sc.textFile(sys.argv[2]).map(parsePoint)
+ iterations = int(sys.argv[3])
+ model = LogisticRegressionWithSGD.train(points, iterations)
+ print "Final weights: " + str(model.weights)
+ print "Final intercept: " + str(model.intercept)
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e19f5d2aaa..e6f0953810 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -15,38 +15,86 @@
# limitations under the License.
#
-from numpy import ndarray, float64, int64, int32, ones, array_equal, array, dot, shape, complex, issubdtype
+import struct
+import numpy
+from numpy import ndarray, float64, int64, int32, array_equal, array
from pyspark import SparkContext, RDD
-import numpy as np
-
+from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer
-import struct
-# Double vector format:
+"""
+Common utilities shared throughout MLlib, primarily for dealing with
+different data types. These include:
+- Serialization utilities to / from byte arrays that Java can handle
+- Serializers for other data types, like ALS Rating objects
+- Common methods for linear models
+- Methods to deal with the different vector types we support, such as
+ SparseVector and scipy.sparse matrices.
+"""
+
+
+# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
+# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
+
+_have_scipy = False
+_scipy_issparse = None
+try:
+ import scipy.sparse
+ _have_scipy = True
+ _scipy_issparse = scipy.sparse.issparse
+except:
+ # No SciPy in environment, but that's okay
+ pass
+
+
+# Serialization functions to and from Scala. These use the following formats, understood
+# by the PythonMLLibAPI class in Scala:
+#
+# Dense double vector format:
+#
+# [1-byte 1] [4-byte length] [length*8 bytes of data]
#
-# [8-byte 1] [8-byte length] [length*8 bytes of data]
+# Sparse double vector format:
+#
+# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
#
# Double matrix format:
#
-# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
+#
+# LabeledPoint format:
+#
+# [1-byte 4] [8-byte label] [dense or sparse vector]
#
# This is all in machine-endian. That means that the Java interpreter and the
# Python interpreter must agree on what endian the machine is.
-def _deserialize_byte_array(shape, ba, offset):
- """Wrapper around ndarray aliasing hack.
+
+DENSE_VECTOR_MAGIC = 1
+SPARSE_VECTOR_MAGIC = 2
+DENSE_MATRIX_MAGIC = 3
+LABELED_POINT_MAGIC = 4
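To make the byte layouts above concrete, here is a small sketch (an editor's illustration rather than part of the patch) that packs a dense vector, a sparse vector, and a labeled point by hand with `struct` and compares the results against the serializers defined later in this file. Native byte order with no padding (`"="`) is assumed, matching the machine-endian note above.

{% highlight python %}
import struct

import numpy as np
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _serialize_double_vector, _serialize_labeled_point

# Dense vector [1.0, 2.0]: [1-byte magic 1] [4-byte int32 length] [length * 8-byte doubles]
dense_bytes = bytearray(struct.pack("=Bi2d", 1, 2, 1.0, 2.0))
assert _serialize_double_vector(np.array([1.0, 2.0])) == dense_bytes

# Sparse vector of size 4 with entries {1: 9.0, 3: 5.5}:
# [1-byte magic 2] [4-byte size] [4-byte nonzeros] [int32 indices] [float64 values]
sparse_bytes = bytearray(struct.pack("=Bii2i2d", 2, 4, 2, 1, 3, 9.0, 5.5))
assert _serialize_double_vector(SparseVector(4, [1, 3], [9.0, 5.5])) == sparse_bytes

# LabeledPoint: [1-byte magic 4] [8-byte double label] [serialized feature vector]
point_bytes = bytearray(struct.pack("=Bd", 4, 1.0)) + dense_bytes
assert _serialize_labeled_point(LabeledPoint(1.0, np.array([1.0, 2.0]))) == point_bytes
{% endhighlight %}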
+
+
+def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
+ """
+ Deserialize a numpy array of the given type from an offset in
+ bytearray ba, assigning it the given shape.
>>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
True
>>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
+ True
+ >>> x = array([1, 2, 3], dtype=int32)
+ >>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
True
"""
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
- order='C')
+ ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
return ar.copy()
+
def _serialize_double_vector(v):
"""Serialize a double vector into a mutually understood format.
@@ -55,160 +103,231 @@ def _serialize_double_vector(v):
>>> array_equal(y, array([1.0, 2.0, 3.0]))
True
"""
- if type(v) != ndarray:
- raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- """complex is only datatype that can't be converted to float64"""
- if issubdtype(v.dtype, complex):
+ v = _convert_vector(v)
+ if type(v) == ndarray:
+ return _serialize_dense_vector(v)
+ elif type(v) == SparseVector:
+ return _serialize_sparse_vector(v)
+ else:
raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- if v.dtype != float64:
- v = v.astype(float64)
+ "wanted ndarray or SparseVector" % type(v))
+
+
+def _serialize_dense_vector(v):
+ """Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
+ if v.dtype != float64:
+ if numpy.issubdtype(v.dtype, numpy.complex):
+ raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+ "wanted ndarray of float64" % v.dtype)
+ v = v.astype(float64)
length = v.shape[0]
- ba = bytearray(16 + 8*length)
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- header[0] = 1
- header[1] = length
- arr_mid = ndarray(shape=[length], buffer=ba, offset=16, dtype="float64")
- arr_mid[...] = v
+ ba = bytearray(5 + 8 * length)
+ ba[0] = DENSE_VECTOR_MAGIC
+ length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
+ length_bytes[0] = length
+ _copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
+ return ba
+
+
+def _serialize_sparse_vector(v):
+ """Serialize a pyspark.mllib.linalg.SparseVector."""
+ nonzeros = len(v.indices)
+ ba = bytearray(9 + 12 * nonzeros)
+ ba[0] = SPARSE_VECTOR_MAGIC
+ header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ header[0] = v.size
+ header[1] = nonzeros
+ _copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
+ values_offset = 9 + 4 * nonzeros
+ _copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
return ba
+
def _deserialize_double_vector(ba):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
>>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
True
+ >>> s = SparseVector(4, [1, 3], [3.0, 5.5])
+ >>> s == _deserialize_double_vector(_serialize_double_vector(s))
+ True
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 16:
+ if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- if header[0] != 1:
+ if ba[0] == DENSE_VECTOR_MAGIC:
+ return _deserialize_dense_vector(ba)
+ elif ba[0] == SPARSE_VECTOR_MAGIC:
+ return _deserialize_sparse_vector(ba)
+ else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")
- length = header[1]
- if len(ba) != 8*length + 16:
- raise TypeError("_deserialize_double_vector called on bytearray "
+
+
+def _deserialize_dense_vector(ba):
+ """Deserialize a dense vector into a numpy array."""
+ if len(ba) < 5:
+ raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
+ if len(ba) != 8 * length + 5:
+ raise TypeError("_deserialize_dense_vector called on bytearray "
+ "with wrong length")
+ return _deserialize_numpy_array([length], ba, 5)
+
+
+def _deserialize_sparse_vector(ba):
+ """Deserialize a sparse vector into a MLlib SparseVector object."""
+ if len(ba) < 9:
+ raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ size = header[0]
+ nonzeros = header[1]
+ if len(ba) != 9 + 12 * nonzeros:
+ raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
- return _deserialize_byte_array([length], ba, 16)
+ indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
+ values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
+ return SparseVector(int(size), indices, values)
+
def _serialize_double_matrix(m):
"""Serialize a double matrix into a mutually understood format."""
- if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+ if (type(m) == ndarray and m.ndim == 2):
+ if m.dtype != float64:
+ if numpy.issubdtype(m.dtype, numpy.complex):
+ raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
+ "wanted ndarray of float64" % m.dtype)
+ m = m.astype(float64)
rows = m.shape[0]
cols = m.shape[1]
- ba = bytearray(24 + 8 * rows * cols)
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- header[0] = 2
- header[1] = rows
- header[2] = cols
- arr_mid = ndarray(shape=[rows, cols], buffer=ba, offset=24,
- dtype="float64", order='C')
- arr_mid[...] = m
+ ba = bytearray(9 + 8 * rows * cols)
+ ba[0] = DENSE_MATRIX_MAGIC
+ lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
+ lengths[0] = rows
+ lengths[1] = cols
+ _copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
return ba
else:
raise TypeError("_serialize_double_matrix called on a "
"non-double-matrix")
+
def _deserialize_double_matrix(ba):
"""Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
- if len(ba) < 24:
+ if len(ba) < 9:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- if (header[0] != 2):
+ if ba[0] != DENSE_MATRIX_MAGIC:
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong magic")
- rows = header[1]
- cols = header[2]
- if (len(ba) != 8*rows*cols + 24):
+ lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
+ rows = lengths[0]
+ cols = lengths[1]
+ if (len(ba) != 8 * rows * cols + 9):
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong length")
- return _deserialize_byte_array([rows, cols], ba, 24)
+ return _deserialize_numpy_array([rows, cols], ba, 9)
+
+
+def _serialize_labeled_point(p):
+ """Serialize a LabeledPoint with a features vector of any type."""
+ from pyspark.mllib.regression import LabeledPoint
+ serialized_features = _serialize_double_vector(p.features)
+ header = bytearray(9)
+ header[0] = LABELED_POINT_MAGIC
+ header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
+ header_float[0] = p.label
+ return header + serialized_features
+
+
+def _copyto(array, buffer, offset, shape, dtype):
+ """
+ Copy the contents of a vector to a destination bytearray at the
+ given offset.
+
+ TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
+ we should benchmark that to see whether it provides a benefit.
+ """
+ temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
+ temp_array[...] = array
-def _linear_predictor_typecheck(x, coeffs):
- """Check that x is a one-dimensional vector of the right shape.
- This is a temporary hackaround until I actually implement bulk predict."""
- if type(x) == ndarray:
- if x.ndim == 1:
- if x.shape == coeffs.shape:
- pass
- else:
- raise RuntimeError("Got array of %d elements; wanted %d"
- % (shape(x)[0], shape(coeffs)[0]))
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif (type(x) == RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
- dataBytes.cache()
+ dataBytes.cache() # TODO: users should unpersist() this later!
return dataBytes
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+
+# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
# _serialized_double_vectors
def _get_unmangled_double_vector_rdd(data):
return _get_unmangled_rdd(data, _serialize_double_vector)
-class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
- self._intercept = intercept
-class LinearRegressionModelBase(LinearModel):
- """A linear regression model.
+# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points
+def _get_unmangled_labeled_point_rdd(data):
+ return _get_unmangled_rdd(data, _serialize_labeled_point)
- >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
- >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
- True
+
+# Common functions for dealing with and training linear models
+
+def _linear_predictor_typecheck(x, coeffs):
"""
- def predict(self, x):
- """Predict the value of the dependent variable given a vector x"""
- """containing values for the independent variables."""
- _linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ Check that x is a one-dimensional vector of the right shape.
+ This is a temporary hackaround until we actually implement bulk predict.
+ """
+ x = _convert_vector(x)
+ if type(x) == ndarray:
+ if x.ndim == 1:
+ if x.shape != coeffs.shape:
+ raise RuntimeError("Got array of %d elements; wanted %d"
+ % (numpy.shape(x)[0], coeffs.shape[0]))
+ else:
+ raise RuntimeError("Bulk predict not yet supported.")
+ elif type(x) == SparseVector:
+ if x.size != coeffs.shape[0]:
+ raise RuntimeError("Got sparse vector of size %d; wanted %d"
+ % (x.size, coeffs.shape[0]))
+ elif (type(x) == RDD):
+ raise RuntimeError("Bulk predict not yet supported.")
+ else:
+ raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
+
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def _get_initial_weights(initial_weights, data):
if initial_weights is None:
- initial_weights = data.first()
- if type(initial_weights) != ndarray:
- raise TypeError("At least one data element has type "
- + type(initial_weights).__name__ + " which is not ndarray")
- if initial_weights.ndim != 1:
- raise TypeError("At least one data element has "
- + initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = ones([initial_weights.shape[0] - 1])
+ initial_weights = _convert_vector(data.first().features)
+ if type(initial_weights) == ndarray:
+ if initial_weights.ndim != 1:
+ raise TypeError("At least one data element has "
+ + initial_weights.ndim + " dimensions, which is not 1")
+ initial_weights = numpy.zeros([initial_weights.shape[0]])
+ elif type(initial_weights) == SparseVector:
+ initial_weights = numpy.zeros([initial_weights.size])
return initial_weights
+
# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
initial_weights = _get_initial_weights(initial_weights, data)
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
if len(ans) != 2:
raise RuntimeError("JVM call result had unexpected length")
@@ -220,6 +339,9 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])
+
+# Functions for serializing ALS Rating objects and tuples
+
def _serialize_rating(r):
ba = bytearray(16)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
@@ -227,11 +349,12 @@ def _serialize_rating(r):
intpart[0], intpart[1], doublepart[0] = r
return ba
+
class RatingDeserializer(Serializer):
def loads(self, stream):
length = struct.unpack("!i", stream.read(4))[0]
ba = stream.read(length)
- res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4)
+ res = ndarray(shape=(3, ), buffer=ba, dtype=float64, offset=4)
return int(res[0]), int(res[1]), res[2]
def load_stream(self, stream):
@@ -243,12 +366,86 @@ class RatingDeserializer(Serializer):
except EOFError:
return
+
def _serialize_tuple(t):
ba = bytearray(8)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
intpart[0], intpart[1] = t
return ba
+
+# Vector math functions that support all of our vector types
+
+def _convert_vector(vec):
+ """
+ Convert a vector to a format we support internally. This does
+ the following:
+
+ * For dense NumPy vectors (ndarray), returns them as is
+ * For our SparseVector class, returns that as is
+ * For Python lists, converts them to NumPy vectors
+ * For scipy.sparse.*_matrix column vectors, converts them to
+ our own SparseVector type.
+
+ This should be called before passing any data to our algorithms
+ or attempting to serialize it to Java.
+ """
+ if type(vec) == ndarray or type(vec) == SparseVector:
+ return vec
+ elif type(vec) == list:
+ return array(vec, dtype=float64)
+ elif _have_scipy:
+ if _scipy_issparse(vec):
+ assert vec.shape[1] == 1, "Expected column vector"
+ csc = vec.tocsc()
+ return SparseVector(vec.shape[0], csc.indices, csc.data)
+ raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
+
+
+def _squared_distance(v1, v2):
+ """
+ Squared distance of two NumPy or sparse vectors.
+
+ >>> dense1 = array([1., 2.])
+ >>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
+ >>> dense2 = array([2., 1.])
+ >>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
+ >>> _squared_distance(dense1, dense2)
+ 2.0
+ >>> _squared_distance(dense1, sparse2)
+ 2.0
+ >>> _squared_distance(sparse1, dense2)
+ 2.0
+ >>> _squared_distance(sparse1, sparse2)
+ 2.0
+ """
+ v1 = _convert_vector(v1)
+ v2 = _convert_vector(v2)
+ if type(v1) == ndarray and type(v2) == ndarray:
+ diff = v1 - v2
+ return diff.dot(diff)
+ elif type(v1) == ndarray:
+ return v2.squared_distance(v1)
+ else:
+ return v1.squared_distance(v2)
+
+
+def _dot(vec, target):
+ """
+ Compute the dot product of a vector of the types we support
+ (Numpy array, list, SparseVector, or SciPy sparse) and a target
+ NumPy array that is either 1- or 2-dimensional. Equivalent to
+ calling numpy.dot of the two vectors, but for SciPy ones, we
+ have to transpose them because they're column vectors.
+ """
+ if type(vec) == ndarray or type(vec) == SparseVector:
+ return vec.dot(target)
+ elif type(vec) == list:
+ return _convert_vector(vec).dot(target)
+ else:
+ return vec.transpose().dot(target)[0]
+
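As an illustrative check of the behaviour described above (again an editor's sketch, not part of the patch), `_dot` should return the same value for each supported representation of the vector [0.0, 4.0, 5.0], including a SciPy CSC column vector when SciPy is available:

{% highlight python %}
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib._common import _dot

target = np.array([1.0, 2.0, 3.0])

# Each input below represents [0.0, 4.0, 5.0]; the dot product is 4*2 + 5*3 = 23.
print _dot(np.array([0.0, 4.0, 5.0]), target)                        # NumPy array
print _dot([0.0, 4.0, 5.0], target)                                  # Python list
print _dot(SparseVector(3, [1, 2], [4.0, 5.0]), target)              # MLlib SparseVector
print _dot(sps.csc_matrix(np.array([[0.0], [4.0], [5.0]])), target)  # SciPy column vector
{% endhighlight %}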
+
def _test():
import doctest
globs = globals().copy()
@@ -259,5 +456,6 @@ def _test():
if failure_count:
exit(-1)
+
if __name__ == "__main__":
_test()
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index d2f9cdb3f4..3a23e0801f 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -17,30 +17,55 @@
import numpy
-from numpy import array, dot, shape
+from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- LinearModel, _linear_predictor_typecheck
+ _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict(array([1.0])) > 0
True
+ >>> lrm.predict(array([0.0])) <= 0
+ True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm.predict(array([0.0, 1.0])) > 0
+ True
+ >>> lrm.predict(array([0.0, 0.0])) <= 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
+
class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
"""A support vector machine.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(1.0, [2.0]),
+ ... LabeledPoint(1.0, [3.0])
+ ... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
+ ... ]
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
+ True
+ >>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
+ True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
- margin = dot(x, self._coeff) + self._intercept
+ margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0
class SVMWithSGD(object):
@@ -84,12 +125,26 @@ class NaiveBayesModel(object):
- pi: vector of logs of class priors (dimension C)
- theta: matrix of logs of class conditional probabilities (CxD)
- >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0, 0.0]),
+ ... LabeledPoint(0.0, [0.0, 1.0]),
+ ... LabeledPoint(1.0, [1.0, 0.0]),
+ ... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
+ >>> sparse_data = [
+ ... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
+ ... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
+ ... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
+ ... ]
+ >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
+ >>> model.predict(SparseVector(2, {1: 1.0}))
+ 0.0
+ >>> model.predict(SparseVector(2, {0: 1.0}))
+ 1.0
"""
def __init__(self, labels, pi, theta):
@@ -99,7 +154,7 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
- return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
+ return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@@ -119,7 +174,7 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
- dataBytes = _get_unmangled_double_vector_rdd(data)
+ dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 30862918c3..f65088c917 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -19,37 +19,61 @@ from numpy import array, dot
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
+from pyspark.mllib.linalg import SparseVector
+
class KMeansModel(object):
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
- >>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
- >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+ >>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
+ >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
+ True
+ >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
+ True
+ >>> model = KMeans.train(sc.parallelize(data), 2)
+ >>> sparse_data = [
+ ... SparseVector(3, {1: 1.0}),
+ ... SparseVector(3, {1: 1.1}),
+ ... SparseVector(3, {2: 1.0}),
+ ... SparseVector(3, {2: 1.1})
+ ... ]
+ >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
+ >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
+ True
+ >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
+ True
+ >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
True
- >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+ >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
- >>> clusters = KMeans.train(sc.parallelize(data), 2)
+ >>> type(model.clusterCenters)
+ <type 'list'>
"""
- def __init__(self, centers_):
- self.centers = centers_
+ def __init__(self, centers):
+ self.centers = centers
+
+ @property
+ def clusterCenters(self):
+ """Get the cluster centers, represented as a list of NumPy arrays."""
+ return self.centers
def predict(self, x):
"""Find the cluster to which x belongs in this model."""
best = 0
- best_distance = 1e75
- for i in range(0, self.centers.shape[0]):
- diff = x - self.centers[i]
- distance = sqrt(dot(diff, diff))
+ best_distance = float("inf")
+ for i in range(0, len(self.centers)):
+ distance = _squared_distance(x, self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
return best
+
class KMeans(object):
@classmethod
def train(cls, data, k, maxIterations=100, runs=1,
@@ -64,7 +88,9 @@ class KMeans(object):
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]) + " which is not bytearray")
- return KMeansModel(_deserialize_double_matrix(ans[0]))
+ matrix = _deserialize_double_matrix(ans[0])
+ return KMeansModel([row for row in matrix])
+
def _test():
import doctest
@@ -76,5 +102,6 @@ def _test():
if failure_count:
exit(-1)
+
if __name__ == "__main__":
_test()
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
new file mode 100644
index 0000000000..0aa3a51de7
--- /dev/null
+++ b/python/pyspark/mllib/linalg.py
@@ -0,0 +1,245 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+MLlib utilities for linear algebra. For dense vectors, MLlib
+uses the NumPy C{array} type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a L{SparseVector}
+object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+SciPy is available in their environment.
+"""
+
+from numpy import array, array_equal, ndarray, float64, int32
+
+
+class SparseVector(object):
+ """
+ A simple sparse vector class for passing data to MLlib. Users may
+ alternatively pass SciPy's {scipy.sparse} data types.
+ """
+
+ def __init__(self, size, *args):
+ """
+ Create a sparse vector, using either a dictionary, a list of
+ (index, value) pairs, or two separate arrays of indices and
+ values (sorted by index).
+
+ @param size: Size of the vector.
+ @param args: Non-zero entries, as a dictionary, list of tuples,
+ or two sorted lists containing indices and values.
+
+ >>> print SparseVector(4, {1: 1.0, 3: 5.5})
+ [1: 1.0, 3: 5.5]
+ >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
+ [1: 1.0, 3: 5.5]
+ >>> print SparseVector(4, [1, 3], [1.0, 5.5])
+ [1: 1.0, 3: 5.5]
+ """
+ assert type(size) == int, "first argument must be an int"
+ self.size = size
+ assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
+ if len(args) == 1:
+ pairs = args[0]
+ if type(pairs) == dict:
+ pairs = pairs.items()
+ pairs = sorted(pairs)
+ self.indices = array([p[0] for p in pairs], dtype=int32)
+ self.values = array([p[1] for p in pairs], dtype=float64)
+ else:
+ assert len(args[0]) == len(args[1]), "index and value arrays not same length"
+ self.indices = array(args[0], dtype=int32)
+ self.values = array(args[1], dtype=float64)
+ for i in xrange(len(self.indices) - 1):
+ if self.indices[i] >= self.indices[i + 1]:
+ raise TypeError("indices array must be sorted")
+
+ def dot(self, other):
+ """
+ Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
+
+ >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+ >>> a.dot(a)
+ 25.0
+ >>> a.dot(array([1., 2., 3., 4.]))
+ 22.0
+ >>> b = SparseVector(4, [0, 2], [1.0, 2.0])
+ >>> a.dot(b)
+ 0.0
+ >>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
+ array([ 22., 22.])
+ """
+ if type(other) == ndarray:
+ if other.ndim == 1:
+ result = 0.0
+ for i in xrange(len(self.indices)):
+ result += self.values[i] * other[self.indices[i]]
+ return result
+ elif other.ndim == 2:
+ results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
+ return array(results)
+ else:
+ raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
+ else:
+ result = 0.0
+ i, j = 0, 0
+ while i < len(self.indices) and j < len(other.indices):
+ if self.indices[i] == other.indices[j]:
+ result += self.values[i] * other.values[j]
+ i += 1
+ j += 1
+ elif self.indices[i] < other.indices[j]:
+ i += 1
+ else:
+ j += 1
+ return result
+
+ def squared_distance(self, other):
+ """
+ Squared distance from a SparseVector or 1-dimensional NumPy array.
+
+ >>> a = SparseVector(4, [1, 3], [3.0, 4.0])
+ >>> a.squared_distance(a)
+ 0.0
+ >>> a.squared_distance(array([1., 2., 3., 4.]))
+ 11.0
+ >>> b = SparseVector(4, [0, 2], [1.0, 2.0])
+ >>> a.squared_distance(b)
+ 30.0
+ >>> b.squared_distance(a)
+ 30.0
+ """
+ if type(other) == ndarray:
+ if other.ndim == 1:
+ result = 0.0
+ j = 0 # index into our own array
+ for i in xrange(other.shape[0]):
+ if j < len(self.indices) and self.indices[j] == i:
+ diff = self.values[j] - other[i]
+ result += diff * diff
+ j += 1
+ else:
+ result += other[i] * other[i]
+ return result
+ else:
+ raise Exception("Cannot call squared_distance with %d-dimensional array" %
+ other.ndim)
+ else:
+ result = 0.0
+ i, j = 0, 0
+ while i < len(self.indices) and j < len(other.indices):
+ if self.indices[i] == other.indices[j]:
+ diff = self.values[i] - other.values[j]
+ result += diff * diff
+ i += 1
+ j += 1
+ elif self.indices[i] < other.indices[j]:
+ result += self.values[i] * self.values[i]
+ i += 1
+ else:
+ result += other.values[j] * other.values[j]
+ j += 1
+ while i < len(self.indices):
+ result += self.values[i] * self.values[i]
+ i += 1
+ while j < len(other.indices):
+ result += other.values[j] * other.values[j]
+ j += 1
+ return result
+
+ def __str__(self):
+ inds = self.indices
+ vals = self.values
+ entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ return "[" + entries + "]"
+
+ def __repr__(self):
+ inds = self.indices
+ vals = self.values
+ entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
+ return "SparseVector({0}, {{{1}}})".format(self.size, entries)
+
+ def __eq__(self, other):
+ """
+ Test SparseVectors for equality.
+
+ >>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+ >>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
+ >>> v1 == v2
+ True
+ >>> v1 != v2
+ False
+ """
+
+ return (isinstance(other, self.__class__)
+ and other.size == self.size
+ and array_equal(other.indices, self.indices)
+ and array_equal(other.values, self.values))
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+
+
+class Vectors(object):
+ """
+ Factory methods for working with vectors. Note that dense vectors
+ are simply represented as NumPy array objects, so there is no need
+ to convert them for use in MLlib. For sparse vectors, the factory
+ methods in this class create an MLlib-compatible type, or users
+ can pass in SciPy's C{scipy.sparse} column vectors.
+ """
+
+ @staticmethod
+ def sparse(size, *args):
+ """
+ Create a sparse vector, using either a dictionary, a list of
+ (index, value) pairs, or two separate arrays of indices and
+ values (sorted by index).
+
+ @param size: Size of the vector.
+ @param args: Non-zero entries, as a dictionary, list of tuples,
+ or two sorted lists containing indices and values.
+
+ >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
+ [1: 1.0, 3: 5.5]
+ >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+ [1: 1.0, 3: 5.5]
+ >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
+ [1: 1.0, 3: 5.5]
+ """
+ return SparseVector(size, *args)
+
+ @staticmethod
+ def dense(elements):
+ """
+ Create a dense vector of 64-bit floats from a Python list. Always
+ returns a NumPy array.
+
+ >>> Vectors.dense([1, 2, 3])
+ array([ 1., 2., 3.])
+ """
+ return array(elements, dtype=float64)
+
+
+def _test():
+ import doctest
+ (failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
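
As a quick sanity check of the new linalg module, the following hedged sketch (values chosen only for illustration) exercises the Vectors factory methods and the SparseVector operations defined above; it runs without a SparkContext:

from pyspark.mllib.linalg import SparseVector, Vectors

dense = Vectors.dense([1.0, 0.0, 3.0])        # a plain NumPy array
sparse = Vectors.sparse(3, {0: 1.0, 2: 3.0})  # SparseVector(3, {0: 1.0, 2: 3.0})

print sparse.dot(dense)                # 1*1 + 3*3 = 10.0
print sparse.squared_distance(dense)   # 0.0: both represent the same vector
print sparse == SparseVector(3, [(0, 1.0), (2, 3.0)])   # True
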
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 7656db07f6..266b31d3fa 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -15,41 +15,98 @@
# limitations under the License.
#
-from numpy import array, dot
+from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
- _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
- _linear_predictor_typecheck
+ _linear_predictor_typecheck, _have_scipy, _scipy_issparse
+from pyspark.mllib.linalg import SparseVector
+
+
+class LabeledPoint(object):
+ """
+ The features and labels of a data point.
+
+ @param label: Label for this data point.
+ @param features: Vector of features for this point (NumPy array, list,
+ pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
+ """
+ def __init__(self, label, features):
+ self.label = label
+ if (type(features) == ndarray or type(features) == SparseVector
+ or (_have_scipy and _scipy_issparse(features))):
+ self.features = features
+ elif type(features) == list:
+ self.features = array(features)
+ else:
+ raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
+
class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
+ """A linear model that has a vector of coefficients and an intercept."""
+ def __init__(self, weights, intercept):
+ self._coeff = weights
self._intercept = intercept
+ @property
+ def weights(self):
+ return self._coeff
+
+ @property
+ def intercept(self):
+ return self._intercept
+
+
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
+ >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
+ True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
+ return _dot(x, self._coeff) + self._intercept
+
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
d._jrdd, iterations, step, miniBatchFraction, i),
LinearRegressionModel, data, initialWeights)
+
class LassoModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
LassoModel, data, initialWeights)
+
class RidgeRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> from pyspark.mllib.regression import LabeledPoint
+ >>> data = [
+ ... LabeledPoint(0.0, [0.0]),
+ ... LabeledPoint(1.0, [1.0]),
+ ... LabeledPoint(3.0, [2.0]),
+ ... LabeledPoint(2.0, [3.0])
+ ... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
+ >>> data = [
+ ... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
+ ... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
+ ... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
+ ... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
+ ... ]
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
+ True
+ >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
+ True
"""
+
class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
RidgeRegressionModel, data, initialWeights)
+
def _test():
import doctest
globs = globals().copy()
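
The doctests above reduce to the following usage pattern. This is only a sketch with toy data; it assumes a live SparkContext bound to `sc`:

from numpy import array
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

# Labels roughly follow the single feature; plain lists become NumPy arrays.
points = [
    LabeledPoint(0.0, [0.0]),
    LabeledPoint(1.0, [1.0]),
    LabeledPoint(3.0, [2.0]),
    LabeledPoint(2.0, [3.0]),
]
model = LinearRegressionWithSGD.train(sc.parallelize(points),
                                      initialWeights=array([1.0]))
print model.weights, model.intercept
# predict accepts dense NumPy arrays as well as SparseVectors
print model.predict(array([1.5]))
print model.predict(SparseVector(1, {0: 1.5}))
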
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
new file mode 100644
index 0000000000..d4771d779f
--- /dev/null
+++ b/python/pyspark/mllib/tests.py
@@ -0,0 +1,302 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fuller unit tests for Python MLlib.
+"""
+
+from numpy import array, array_equal
+import unittest
+
+from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
+ _deserialize_double_vector, _dot, _squared_distance
+from pyspark.mllib.linalg import SparseVector
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.tests import PySparkTestCase
+
+
+_have_scipy = False
+try:
+ import scipy.sparse
+ _have_scipy = True
+except ImportError:
+ # No SciPy, but that's okay, we'll skip those tests
+ pass
+
+
+class VectorTests(unittest.TestCase):
+ def test_serialize(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [1, 2, 3, 4]
+ self.assertTrue(sv is _convert_vector(sv))
+ self.assertTrue(dv is _convert_vector(dv))
+ self.assertTrue(array_equal(dv, _convert_vector(lst)))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(sv)))
+ self.assertTrue(array_equal(dv,
+ _deserialize_double_vector(_serialize_double_vector(dv))))
+ self.assertTrue(array_equal(dv,
+ _deserialize_double_vector(_serialize_double_vector(lst))))
+
+ def test_dot(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [1, 2, 3, 4]
+ mat = array([[1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.]])
+ self.assertEquals(10.0, _dot(sv, dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
+ self.assertEquals(30.0, _dot(dv, dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
+ self.assertEquals(30.0, _dot(lst, dv))
+ self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
+
+ def test_squared_distance(self):
+ sv = SparseVector(4, {1: 1, 3: 2})
+ dv = array([1., 2., 3., 4.])
+ lst = [4, 3, 2, 1]
+ self.assertEquals(15.0, _squared_distance(sv, dv))
+ self.assertEquals(25.0, _squared_distance(sv, lst))
+ self.assertEquals(20.0, _squared_distance(dv, lst))
+ self.assertEquals(15.0, _squared_distance(dv, sv))
+ self.assertEquals(25.0, _squared_distance(lst, sv))
+ self.assertEquals(20.0, _squared_distance(lst, dv))
+ self.assertEquals(0.0, _squared_distance(sv, sv))
+ self.assertEquals(0.0, _squared_distance(dv, dv))
+ self.assertEquals(0.0, _squared_distance(lst, lst))
+
+
+class ListTests(PySparkTestCase):
+ """
+ Test MLlib algorithms on plain lists, to make sure they're passed through
+ as NumPy arrays.
+ """
+
+ def test_clustering(self):
+ from pyspark.mllib.clustering import KMeans
+ data = [
+ [0, 1.1],
+ [0, 1.2],
+ [1.1, 0],
+ [1.2, 0],
+ ]
+ clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+ self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+ self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+ def test_classification(self):
+ from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+ data = [
+ LabeledPoint(0.0, [1, 0]),
+ LabeledPoint(1.0, [0, 1]),
+ LabeledPoint(0.0, [2, 0]),
+ LabeledPoint(1.0, [0, 2])
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features.tolist() for p in data]
+
+ lr_model = LogisticRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ svm_model = SVMWithSGD.train(rdd)
+ self.assertTrue(svm_model.predict(features[0]) <= 0)
+ self.assertTrue(svm_model.predict(features[1]) > 0)
+ self.assertTrue(svm_model.predict(features[2]) <= 0)
+ self.assertTrue(svm_model.predict(features[3]) > 0)
+
+ nb_model = NaiveBayes.train(rdd)
+ self.assertTrue(nb_model.predict(features[0]) <= 0)
+ self.assertTrue(nb_model.predict(features[1]) > 0)
+ self.assertTrue(nb_model.predict(features[2]) <= 0)
+ self.assertTrue(nb_model.predict(features[3]) > 0)
+
+ def test_regression(self):
+ from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+ RidgeRegressionWithSGD
+ data = [
+ LabeledPoint(-1.0, [0, -1]),
+ LabeledPoint(1.0, [0, 1]),
+ LabeledPoint(-1.0, [0, -2]),
+ LabeledPoint(1.0, [0, 2])
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features.tolist() for p in data]
+
+ lr_model = LinearRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ lasso_model = LassoWithSGD.train(rdd)
+ self.assertTrue(lasso_model.predict(features[0]) <= 0)
+ self.assertTrue(lasso_model.predict(features[1]) > 0)
+ self.assertTrue(lasso_model.predict(features[2]) <= 0)
+ self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+ rr_model = RidgeRegressionWithSGD.train(rdd)
+ self.assertTrue(rr_model.predict(features[0]) <= 0)
+ self.assertTrue(rr_model.predict(features[1]) > 0)
+ self.assertTrue(rr_model.predict(features[2]) <= 0)
+ self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+@unittest.skipIf(not _have_scipy, "SciPy not installed")
+class SciPyTests(PySparkTestCase):
+ """
+ Test both vector operations and MLlib algorithms with SciPy sparse matrices,
+ if SciPy is available.
+ """
+
+ def test_serialize(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 1
+ lil[3, 0] = 2
+ sv = SparseVector(4, {1: 1, 3: 2})
+ self.assertEquals(sv, _convert_vector(lil))
+ self.assertEquals(sv, _convert_vector(lil.tocsc()))
+ self.assertEquals(sv, _convert_vector(lil.tocoo()))
+ self.assertEquals(sv, _convert_vector(lil.tocsr()))
+ self.assertEquals(sv, _convert_vector(lil.todok()))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil)))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
+ self.assertEquals(sv,
+ _deserialize_double_vector(_serialize_double_vector(lil.todok())))
+
+ def test_dot(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 1
+ lil[3, 0] = 2
+ dv = array([1., 2., 3., 4.])
+ sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+ mat = array([[1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.],
+ [1., 2., 3., 4.]])
+ self.assertEquals(10.0, _dot(lil, dv))
+ self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
+
+ def test_squared_distance(self):
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((4, 1))
+ lil[1, 0] = 3
+ lil[3, 0] = 2
+ dv = array([1., 2., 3., 4.])
+ sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
+ self.assertEquals(15.0, _squared_distance(lil, dv))
+ self.assertEquals(15.0, _squared_distance(lil, sv))
+ self.assertEquals(15.0, _squared_distance(dv, lil))
+ self.assertEquals(15.0, _squared_distance(sv, lil))
+
+ def scipy_matrix(self, size, values):
+ """Create a column SciPy matrix from a dictionary of values"""
+ from scipy.sparse import lil_matrix
+ lil = lil_matrix((size, 1))
+ for key, value in values.items():
+ lil[key, 0] = value
+ return lil
+
+ def test_clustering(self):
+ from pyspark.mllib.clustering import KMeans
+ data = [
+ self.scipy_matrix(3, {1: 1.0}),
+ self.scipy_matrix(3, {1: 1.1}),
+ self.scipy_matrix(3, {2: 1.0}),
+ self.scipy_matrix(3, {2: 1.1})
+ ]
+ clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
+ self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
+ self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
+
+ def test_classification(self):
+ from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
+ data = [
+ LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+ LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features for p in data]
+
+ lr_model = LogisticRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ svm_model = SVMWithSGD.train(rdd)
+ self.assertTrue(svm_model.predict(features[0]) <= 0)
+ self.assertTrue(svm_model.predict(features[1]) > 0)
+ self.assertTrue(svm_model.predict(features[2]) <= 0)
+ self.assertTrue(svm_model.predict(features[3]) > 0)
+
+ nb_model = NaiveBayes.train(rdd)
+ self.assertTrue(nb_model.predict(features[0]) <= 0)
+ self.assertTrue(nb_model.predict(features[1]) > 0)
+ self.assertTrue(nb_model.predict(features[2]) <= 0)
+ self.assertTrue(nb_model.predict(features[3]) > 0)
+
+ def test_regression(self):
+ from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
+ RidgeRegressionWithSGD
+ data = [
+ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
+ LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
+ LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
+ ]
+ rdd = self.sc.parallelize(data)
+ features = [p.features for p in data]
+
+ lr_model = LinearRegressionWithSGD.train(rdd)
+ self.assertTrue(lr_model.predict(features[0]) <= 0)
+ self.assertTrue(lr_model.predict(features[1]) > 0)
+ self.assertTrue(lr_model.predict(features[2]) <= 0)
+ self.assertTrue(lr_model.predict(features[3]) > 0)
+
+ lasso_model = LassoWithSGD.train(rdd)
+ self.assertTrue(lasso_model.predict(features[0]) <= 0)
+ self.assertTrue(lasso_model.predict(features[1]) > 0)
+ self.assertTrue(lasso_model.predict(features[2]) <= 0)
+ self.assertTrue(lasso_model.predict(features[3]) > 0)
+
+ rr_model = RidgeRegressionWithSGD.train(rdd)
+ self.assertTrue(rr_model.predict(features[0]) <= 0)
+ self.assertTrue(rr_model.predict(features[1]) > 0)
+ self.assertTrue(rr_model.predict(features[2]) <= 0)
+ self.assertTrue(rr_model.predict(features[3]) > 0)
+
+
+if __name__ == "__main__":
+ if not _have_scipy:
+ print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+ unittest.main()
+ if not _have_scipy:
+ print "NOTE: SciPy tests were skipped as it does not seem to be installed"
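
For reference, the SciPy interoperability that SciPyTests exercises can also be tried by hand. A sketch under the assumption that SciPy is installed; it touches the private _common helpers only because the tests above already do:

from numpy import array
from scipy.sparse import lil_matrix
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib._common import _convert_vector, _dot, _squared_distance

col = lil_matrix((4, 1))   # a 4x1 SciPy column vector
col[1, 0] = 1.0
col[3, 0] = 2.0

print _convert_vector(col)                        # [1: 1.0, 3: 2.0]
print _dot(col, array([1., 2., 3., 4.]))          # 1*2 + 2*4 = 10.0
print _squared_distance(col, SparseVector(4, {1: 1.0, 3: 2.0}))   # 0.0
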
diff --git a/python/run-tests b/python/run-tests
index dabb714da9..7bbf10d05a 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -34,7 +34,7 @@ rm -rf metastore warehouse
function run_test() {
SPARK_TESTING=0 $FWDIR/bin/pyspark $1 2>&1 | tee -a > unit-tests.log
FAILED=$((PIPESTATUS[0]||$FAILED))
-
+
# Fail and exit on the first test failure.
if [[ $FAILED != 0 ]]; then
cat unit-tests.log | grep -v "^[0-9][0-9]*" # filter all lines starting with a number.
@@ -57,8 +57,10 @@ run_test "pyspark/tests.py"
run_test "pyspark/mllib/_common.py"
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
+run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
+run_test "pyspark/mllib/tests.py"
if [[ $FAILED == 0 ]]; then
echo -en "\033[32m" # Green