| author | Sandeep <sandeep@techaddict.me> | 2014-05-06 17:27:52 -0700 |
| --- | --- | --- |
| committer | Matei Zaharia <matei@databricks.com> | 2014-05-06 17:27:52 -0700 |
| commit | a000b5c3b0438c17e9973df4832c320210c29c27 (patch) | |
| tree | 446bbe902ecd6de05072357f7ef25aaeb0687b73 /python | |
| parent | 39b8b1489ff92697e4aeec997cdc436c7079d6f8 (diff) | |
| download | spark-a000b5c3b0438c17e9973df4832c320210c29c27.tar.gz, spark-a000b5c3b0438c17e9973df4832c320210c29c27.tar.bz2, spark-a000b5c3b0438c17e9973df4832c320210c29c27.zip | |
SPARK-1637: Clean up examples for 1.0
- [x] Move all of them into subpackages of org.apache.spark.examples (right now some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
- [x] Move Python examples into examples/src/main/python
- [x] Update docs to reflect these changes
Author: Sandeep <sandeep@techaddict.me>
This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <matei@databricks.com>
Closes #571 from techaddict/SPARK-1637 and squashes the following commits:
47ef86c [Sandeep] Changes based on Discussions on PR, removing use of RawTextHelper from examples
8ed2d3f [Sandeep] Docs Updated for changes, Change for java examples
5f96121 [Sandeep] Move Python examples into examples/src/main/python
0a8dd77 [Sandeep] Move all Scala Examples to org.apache.spark.examples (some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
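Note that the diff below is filtered to the old python/ tree, so it shows only deletions; the same scripts reappear under examples/src/main/python in the unfiltered commit. In practice, an invocation along the lines of `./bin/pyspark python/examples/pi.py local[2]` becomes `./bin/pyspark examples/src/main/python/pi.py local[2]` (both commands and the `local[2]` master URL are illustrative, not taken from this commit).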
Diffstat (limited to 'python')
| mode | file | deletions |
| --- | --- | --- |
| -rwxr-xr-x | python/examples/als.py | 87 |
| -rwxr-xr-x | python/examples/kmeans.py | 73 |
| -rwxr-xr-x | python/examples/logistic_regression.py | 76 |
| -rwxr-xr-x | python/examples/mllib/kmeans.py | 44 |
| -rwxr-xr-x | python/examples/mllib/logistic_regression.py | 50 |
| -rwxr-xr-x | python/examples/pagerank.py | 70 |
| -rwxr-xr-x | python/examples/pi.py | 37 |
| -rwxr-xr-x | python/examples/sort.py | 36 |
| -rwxr-xr-x | python/examples/transitive_closure.py | 66 |
| -rwxr-xr-x | python/examples/wordcount.py | 35 |
10 files changed, 0 insertions, 574 deletions
diff --git a/python/examples/als.py b/python/examples/als.py
deleted file mode 100755
index a77dfb2577..0000000000
--- a/python/examples/als.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-This example requires numpy (http://www.numpy.org/)
-"""
-from os.path import realpath
-import sys
-
-import numpy as np
-from numpy.random import rand
-from numpy import matrix
-from pyspark import SparkContext
-
-LAMBDA = 0.01  # regularization
-np.random.seed(42)
-
-def rmse(R, ms, us):
-    diff = R - ms * us.T
-    return np.sqrt(np.sum(np.power(diff, 2)) / M * U)
-
-def update(i, vec, mat, ratings):
-    uu = mat.shape[0]
-    ff = mat.shape[1]
-    XtX = matrix(np.zeros((ff, ff)))
-    Xty = np.zeros((ff, 1))
-
-    for j in range(uu):
-        v = mat[j, :]
-        XtX += v.T * v
-        Xty += v.T * ratings[i, j]
-    XtX += np.eye(ff, ff) * LAMBDA * uu
-    return np.linalg.solve(XtX, Xty)
-
-if __name__ == "__main__":
-    if len(sys.argv) < 2:
-        print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
-    M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
-    U = int(sys.argv[3]) if len(sys.argv) > 3 else 500
-    F = int(sys.argv[4]) if len(sys.argv) > 4 else 10
-    ITERATIONS = int(sys.argv[5]) if len(sys.argv) > 5 else 5
-    slices = int(sys.argv[6]) if len(sys.argv) > 6 else 2
-
-    print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
-        (M, U, F, ITERATIONS, slices)
-
-    R = matrix(rand(M, F)) * matrix(rand(U, F).T)
-    ms = matrix(rand(M ,F))
-    us = matrix(rand(U, F))
-
-    Rb = sc.broadcast(R)
-    msb = sc.broadcast(ms)
-    usb = sc.broadcast(us)
-
-    for i in range(ITERATIONS):
-        ms = sc.parallelize(range(M), slices) \
-               .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
-               .collect()
-        ms = matrix(np.array(ms)[:, :, 0])  # collect() returns a list, so array ends up being
-                                            # a 3-d array, we take the first 2 dims for the matrix
-        msb = sc.broadcast(ms)
-
-        us = sc.parallelize(range(U), slices) \
-               .map(lambda x: update(x, usb.value[x, :], msb.value, Rb.value.T)) \
-               .collect()
-        us = matrix(np.array(us)[:, :, 0])
-        usb = sc.broadcast(us)
-
-        error = rmse(R, ms, us)
-        print "Iteration %d:" % i
-        print "\nRMSE: %5.4f\n" % error
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
deleted file mode 100755
index d8387b0b18..0000000000
--- a/python/examples/kmeans.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-The K-means algorithm written from scratch against PySpark. In practice,
-one may prefer to use the KMeans algorithm in MLlib, as shown in
-python/examples/mllib/kmeans.py.
-
-This example requires NumPy (http://www.numpy.org/).
-"""
-
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-
-
-def parseVector(line):
-    return np.array([float(x) for x in line.split(' ')])
-
-
-def closestPoint(p, centers):
-    bestIndex = 0
-    closest = float("+inf")
-    for i in range(len(centers)):
-        tempDist = np.sum((p - centers[i]) ** 2)
-        if tempDist < closest:
-            closest = tempDist
-            bestIndex = i
-    return bestIndex
-
-
-if __name__ == "__main__":
-    if len(sys.argv) < 5:
-        print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonKMeans")
-    lines = sc.textFile(sys.argv[2])
-    data = lines.map(parseVector).cache()
-    K = int(sys.argv[3])
-    convergeDist = float(sys.argv[4])
-
-    kPoints = data.takeSample(False, K, 1)
-    tempDist = 1.0
-
-    while tempDist > convergeDist:
-        closest = data.map(
-            lambda p : (closestPoint(p, kPoints), (p, 1)))
-        pointStats = closest.reduceByKey(
-            lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
-        newPoints = pointStats.map(
-            lambda (x, (y, z)): (x, y / z)).collect()
-
-        tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
-
-        for (x, y) in newPoints:
-            kPoints[x] = y
-
-    print "Final centers: " + str(kPoints)
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
deleted file mode 100755
index 28d52e6a40..0000000000
--- a/python/examples/logistic_regression.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-A logistic regression implementation that uses NumPy (http://www.numpy.org)
-to act on batches of input data using efficient matrix operations.
-
-In practice, one may prefer to use the LogisticRegression algorithm in
-MLlib, as shown in python/examples/mllib/logistic_regression.py.
-"""
-
-from collections import namedtuple
-from math import exp
-from os.path import realpath
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-
-
-D = 10  # Number of dimensions
-
-
-# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
-# make further computations faster.
-# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
-# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
-def readPointBatch(iterator):
-    strs = list(iterator)
-    matrix = np.zeros((len(strs), D + 1))
-    for i in xrange(len(strs)):
-        matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
-    return [matrix]
-
-if __name__ == "__main__":
-    if len(sys.argv) != 4:
-        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
-    points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
-    iterations = int(sys.argv[3])
-
-    # Initialize w to a random value
-    w = 2 * np.random.ranf(size=D) - 1
-    print "Initial w: " + str(w)
-
-    # Compute logistic regression gradient for a matrix of data points
-    def gradient(matrix, w):
-        Y = matrix[:,0]   # point labels (first column of input file)
-        X = matrix[:,1:]  # point coordinates
-        # For each point (x, y), compute gradient function, then sum these up
-        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
-
-    def add(x, y):
-        x += y
-        return x
-
-    for i in range(iterations):
-        print "On iteration %i" % (i + 1)
-        w -= points.map(lambda m: gradient(m, w)).reduce(add)
-
-    print "Final w: " + str(w)
diff --git a/python/examples/mllib/kmeans.py b/python/examples/mllib/kmeans.py
deleted file mode 100755
index dec82ff34f..0000000000
--- a/python/examples/mllib/kmeans.py
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-A K-means clustering program using MLlib.
-
-This example requires NumPy (http://www.numpy.org/).
-"""
-
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-from pyspark.mllib.clustering import KMeans
-
-
-def parseVector(line):
-    return np.array([float(x) for x in line.split(' ')])
-
-
-if __name__ == "__main__":
-    if len(sys.argv) < 4:
-        print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "KMeans")
-    lines = sc.textFile(sys.argv[2])
-    data = lines.map(parseVector)
-    k = int(sys.argv[3])
-    model = KMeans.train(data, k)
-    print "Final centers: " + str(model.clusterCenters)
diff --git a/python/examples/mllib/logistic_regression.py b/python/examples/mllib/logistic_regression.py
deleted file mode 100755
index 8631051d00..0000000000
--- a/python/examples/mllib/logistic_regression.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Logistic regression using MLlib.
-
-This example requires NumPy (http://www.numpy.org/).
-"""
-
-from math import exp
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib.classification import LogisticRegressionWithSGD
-
-
-# Parse a line of text into an MLlib LabeledPoint object
-def parsePoint(line):
-    values = [float(s) for s in line.split(' ')]
-    if values[0] == -1:  # Convert -1 labels to 0 for MLlib
-        values[0] = 0
-    return LabeledPoint(values[0], values[1:])
-
-
-if __name__ == "__main__":
-    if len(sys.argv) != 4:
-        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonLR")
-    points = sc.textFile(sys.argv[2]).map(parsePoint)
-    iterations = int(sys.argv[3])
-    model = LogisticRegressionWithSGD.train(points, iterations)
-    print "Final weights: " + str(model.weights)
-    print "Final intercept: " + str(model.intercept)
diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py
deleted file mode 100755
index cd774cf3a3..0000000000
--- a/python/examples/pagerank.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#!/usr/bin/env python
-
-import re, sys
-from operator import add
-
-from pyspark import SparkContext
-
-
-def computeContribs(urls, rank):
-    """Calculates URL contributions to the rank of other URLs."""
-    num_urls = len(urls)
-    for url in urls: yield (url, rank / num_urls)
-
-
-def parseNeighbors(urls):
-    """Parses a urls pair string into urls pair."""
-    parts = re.split(r'\s+', urls)
-    return parts[0], parts[1]
-
-
-if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print >> sys.stderr, "Usage: pagerank <master> <file> <number_of_iterations>"
-        exit(-1)
-
-    # Initialize the spark context.
-    sc = SparkContext(sys.argv[1], "PythonPageRank")
-
-    # Loads in input file. It should be in format of:
-    #     URL    neighbor URL
-    #     URL    neighbor URL
-    #     URL    neighbor URL
-    #     ...
-    lines = sc.textFile(sys.argv[2], 1)
-
-    # Loads all URLs from input file and initialize their neighbors.
-    links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
-
-    # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-    ranks = links.map(lambda (url, neighbors): (url, 1.0))
-
-    # Calculates and updates URL ranks continuously using PageRank algorithm.
-    for iteration in xrange(int(sys.argv[3])):
-        # Calculates URL contributions to the rank of other URLs.
-        contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
-            computeContribs(urls, rank))
-
-        # Re-calculates URL ranks based on neighbor contributions.
-        ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
-
-    # Collects all URL ranks and dump them to console.
-    for (link, rank) in ranks.collect():
-        print "%s has rank: %s." % (link, rank)
diff --git a/python/examples/pi.py b/python/examples/pi.py
deleted file mode 100755
index ab0645fc2f..0000000000
--- a/python/examples/pi.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-from random import random
-from operator import add
-
-from pyspark import SparkContext
-
-
-if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, "Usage: pi <master> [<slices>]"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonPi")
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
-    n = 100000 * slices
-    def f(_):
-        x = random() * 2 - 1
-        y = random() * 2 - 1
-        return 1 if x ** 2 + y ** 2 < 1 else 0
-    count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
-    print "Pi is roughly %f" % (4.0 * count / n)
diff --git a/python/examples/sort.py b/python/examples/sort.py
deleted file mode 100755
index 5de20a6d98..0000000000
--- a/python/examples/sort.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-
-from pyspark import SparkContext
-
-
-if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print >> sys.stderr, "Usage: sort <master> <file>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonSort")
-    lines = sc.textFile(sys.argv[2], 1)
-    sortedCount = lines.flatMap(lambda x: x.split(' ')) \
-        .map(lambda x: (int(x), 1)) \
-        .sortByKey(lambda x: x)
-    # This is just a demo on how to bring all the sorted data back to a single node.
-    # In reality, we wouldn't want to collect all the data to the driver node.
-    output = sortedCount.collect()
-    for (num, unitcount) in output:
-        print num
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
deleted file mode 100755
index 744cce6651..0000000000
--- a/python/examples/transitive_closure.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-from random import Random
-
-from pyspark import SparkContext
-
-numEdges = 200
-numVertices = 100
-rand = Random(42)
-
-
-def generateGraph():
-    edges = set()
-    while len(edges) < numEdges:
-        src = rand.randrange(0, numEdges)
-        dst = rand.randrange(0, numEdges)
-        if src != dst:
-            edges.add((src, dst))
-    return edges
-
-
-if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
-    tc = sc.parallelize(generateGraph(), slices).cache()
-
-    # Linear transitive closure: each round grows paths by one edge,
-    # by joining the graph's edges with the already-discovered paths.
-    # e.g. join the path (y, z) from the TC with the edge (x, y) from
-    # the graph to obtain the path (x, z).
-
-    # Because join() joins on keys, the edges are stored in reversed order.
-    edges = tc.map(lambda (x, y): (y, x))
-
-    oldCount = 0L
-    nextCount = tc.count()
-    while True:
-        oldCount = nextCount
-        # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
-        # then project the result to obtain the new (x, z) paths.
-        new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
-        tc = tc.union(new_edges).distinct().cache()
-        nextCount = tc.count()
-        if nextCount == oldCount:
-            break
-
-    print "TC has %i edges" % tc.count()
diff --git a/python/examples/wordcount.py b/python/examples/wordcount.py
deleted file mode 100755
index b9139b9d76..0000000000
--- a/python/examples/wordcount.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-from operator import add
-
-from pyspark import SparkContext
-
-
-if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print >> sys.stderr, "Usage: wordcount <master> <file>"
-        exit(-1)
-    sc = SparkContext(sys.argv[1], "PythonWordCount")
-    lines = sc.textFile(sys.argv[2], 1)
-    counts = lines.flatMap(lambda x: x.split(' ')) \
-        .map(lambda x: (x, 1)) \
-        .reduceByKey(add)
-    output = counts.collect()
-    for (word, count) in output:
-        print "%s: %i" % (word, count)
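All ten deleted scripts are written for Python 2 and the pre-spark-submit launch style: `print` statements, `xrange`, `long` literals such as `0L`, tuple-unpacking lambdas like `lambda (x, y): (y, x)` (removed in Python 3), and a `<master>` argument parsed from `argv[1]`. As a hedged sketch only — not part of this commit — here is how the wordcount example might read once ported to Python 3 under the later convention where the launcher supplies the master:

```python
import sys
from operator import add

from pyspark import SparkContext

if __name__ == "__main__":
    if len(sys.argv) != 2:
        # print is a function in Python 3; the old script used `print >> sys.stderr, ...`
        print("Usage: wordcount <file>", file=sys.stderr)
        sys.exit(-1)

    # The master URL comes from the launcher (e.g. spark-submit --master),
    # so only the application name is set here.
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    for (word, count) in counts.collect():
        print("%s: %i" % (word, count))

    sc.stop()
```

Under that later convention it would be launched with something like `./bin/spark-submit examples/src/main/python/wordcount.py <file>`; both the port and the command are illustrative, not part of this diff.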