diff options
Diffstat (limited to 'examples/src/main/python')
5 files changed, 270 insertions, 0 deletions
diff --git a/examples/src/main/python/mllib/gaussian_mixture_example.py b/examples/src/main/python/mllib/gaussian_mixture_example.py new file mode 100644 index 0000000000..a60e799d62 --- /dev/null +++ b/examples/src/main/python/mllib/gaussian_mixture_example.py @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +# $example on$ +from numpy import array +# $example off$ + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="GaussianMixtureExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/gmm_data.txt") + parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')])) + + # Build the model (cluster the data) + gmm = GaussianMixture.train(parsedData, 2) + + # Save and load model + gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") + sameModel = GaussianMixtureModel\ + .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") + + # output parameters of model + for i in range(2): + print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu, + "sigma = ", gmm.gaussians[i].sigma.toArray()) + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/k_means_example.py b/examples/src/main/python/mllib/k_means_example.py new file mode 100644 index 0000000000..5c397e62ef --- /dev/null +++ b/examples/src/main/python/mllib/k_means_example.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +# $example on$ +from numpy import array +from math import sqrt +# $example off$ + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import KMeans, KMeansModel +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="KMeansExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/kmeans_data.txt") + parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')])) + + # Build the model (cluster the data) + clusters = KMeans.train(parsedData, 2, maxIterations=10, + runs=10, initializationMode="random") + + # Evaluate clustering by computing Within Set Sum of Squared Errors + def error(point): + center = clusters.centers[clusters.predict(point)] + return sqrt(sum([x**2 for x in (point - center)])) + + WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) + print("Within Set Sum of Squared Error = " + str(WSSSE)) + + # Save and load model + clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel") + sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel") + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py new file mode 100644 index 0000000000..2a1bef5f20 --- /dev/null +++ b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py @@ -0,0 +1,54 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import LDA, LDAModel +from pyspark.mllib.linalg import Vectors +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="LatentDirichletAllocationExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/sample_lda_data.txt") + parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')])) + # Index documents with unique IDs + corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache() + + # Cluster the documents into three topics using LDA + ldaModel = LDA.train(corpus, k=3) + + # Output topics. Each is a distribution over words (matching word count vectors) + print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize()) + + " words):") + topics = ldaModel.topicsMatrix() + for topic in range(3): + print("Topic " + str(topic) + ":") + for word in range(0, ldaModel.vocabSize()): + print(" " + str(topics[word][topic])) + + # Save and load model + ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel") + sameModel = LDAModel\ + .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel") + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/power_iteration_clustering_example.py b/examples/src/main/python/mllib/power_iteration_clustering_example.py new file mode 100644 index 0000000000..ca19c0ccb6 --- /dev/null +++ b/examples/src/main/python/mllib/power_iteration_clustering_example.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="PowerIterationClusteringExample") # SparkContext + + # $example on$ + # Load and parse the data + data = sc.textFile("data/mllib/pic_data.txt") + similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')])) + + # Cluster the data into two classes using PowerIterationClustering + model = PowerIterationClustering.train(similarities, 2, 10) + + model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster))) + + # Save and load model + model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel") + sameModel = PowerIterationClusteringModel\ + .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel") + # $example off$ + + sc.stop() diff --git a/examples/src/main/python/mllib/streaming_k_means_example.py b/examples/src/main/python/mllib/streaming_k_means_example.py new file mode 100644 index 0000000000..e82509ad3f --- /dev/null +++ b/examples/src/main/python/mllib/streaming_k_means_example.py @@ -0,0 +1,66 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from __future__ import print_function + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext +# $example on$ +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.clustering import StreamingKMeans +# $example off$ + +if __name__ == "__main__": + sc = SparkContext(appName="StreamingKMeansExample") # SparkContext + ssc = StreamingContext(sc, 1) + + # $example on$ + # we make an input stream of vectors for training, + # as well as a stream of vectors for testing + def parse(lp): + label = float(lp[lp.find('(') + 1: lp.find(')')]) + vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(',')) + + return LabeledPoint(label, vec) + + trainingData = sc.textFile("data/mllib/kmeans_data.txt")\ + .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')])) + + testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse) + + trainingQueue = [trainingData] + testingQueue = [testingData] + + trainingStream = ssc.queueStream(trainingQueue) + testingStream = ssc.queueStream(testingQueue) + + # We create a model with random clusters and specify the number of clusters to find + model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0) + + # Now register the streams for training and testing and start the job, + # printing the predicted cluster assignments on new data points as they arrive. + model.trainOn(trainingStream) + + result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features))) + result.pprint() + + ssc.start() + ssc.stop(stopSparkContext=True, stopGraceFully=True) + # $example off$ + + print("Final centers: " + str(model.latestModel().centers)) |