path: root/python/examples
author     Sandeep <sandeep@techaddict.me>    2014-05-06 17:27:52 -0700
committer  Matei Zaharia <matei@databricks.com>    2014-05-06 17:27:52 -0700
commit     a000b5c3b0438c17e9973df4832c320210c29c27 (patch)
tree       446bbe902ecd6de05072357f7ef25aaeb0687b73 /python/examples
parent     39b8b1489ff92697e4aeec997cdc436c7079d6f8 (diff)
SPARK-1637: Clean up examples for 1.0
- [x] Move all of them into subpackages of org.apache.spark.examples (right now some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
- [x] Move Python examples into examples/src/main/python
- [x] Update docs to reflect these changes

Author: Sandeep <sandeep@techaddict.me>

This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <matei@databricks.com>

Closes #571 from techaddict/SPARK-1637 and squashes the following commits:

47ef86c [Sandeep] Changes based on Discussions on PR, removing use of RawTextHelper from examples
8ed2d3f [Sandeep] Docs Updated for changes, Change for java examples
5f96121 [Sandeep] Move Python examples into examples/src/main/python
0a8dd77 [Sandeep] Move all Scala Examples to org.apache.spark.examples (some are in org.apache.spark.streaming.examples, for instance, and others are in org.apache.spark.examples.mllib)
Diffstat (limited to 'python/examples')
-rwxr-xr-x  python/examples/als.py                         87
-rwxr-xr-x  python/examples/kmeans.py                      73
-rwxr-xr-x  python/examples/logistic_regression.py         76
-rwxr-xr-x  python/examples/mllib/kmeans.py                44
-rwxr-xr-x  python/examples/mllib/logistic_regression.py   50
-rwxr-xr-x  python/examples/pagerank.py                    70
-rwxr-xr-x  python/examples/pi.py                          37
-rwxr-xr-x  python/examples/sort.py                        36
-rwxr-xr-x  python/examples/transitive_closure.py          66
-rwxr-xr-x  python/examples/wordcount.py                   35
10 files changed, 0 insertions(+), 574 deletions(-)
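Per the commit message above, these files were moved under examples/src/main/python rather than dropped. A small hypothetical helper sketching the old-to-new path mapping (the mllib/ subdirectory is assumed to carry over unchanged under the new root; this is an illustration, not part of the patch):

    import posixpath

    OLD_ROOT = "python/examples"
    NEW_ROOT = "examples/src/main/python"

    def new_location(old_path):
        """Map an old example path to its assumed new location."""
        rel = posixpath.relpath(old_path, OLD_ROOT)
        return posixpath.join(NEW_ROOT, rel)

    if __name__ == "__main__":
        for p in ["python/examples/pi.py", "python/examples/mllib/kmeans.py"]:
            print(p, "->", new_location(p))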
diff --git a/python/examples/als.py b/python/examples/als.py
deleted file mode 100755
index a77dfb2577..0000000000
--- a/python/examples/als.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-This example requires numpy (http://www.numpy.org/)
-"""
-from os.path import realpath
-import sys
-
-import numpy as np
-from numpy.random import rand
-from numpy import matrix
-from pyspark import SparkContext
-
-LAMBDA = 0.01 # regularization
-np.random.seed(42)
-
-def rmse(R, ms, us):
- diff = R - ms * us.T
- return np.sqrt(np.sum(np.power(diff, 2)) / M * U)
-
-def update(i, vec, mat, ratings):
- uu = mat.shape[0]
- ff = mat.shape[1]
- XtX = matrix(np.zeros((ff, ff)))
- Xty = np.zeros((ff, 1))
-
- for j in range(uu):
- v = mat[j, :]
- XtX += v.T * v
- Xty += v.T * ratings[i, j]
- XtX += np.eye(ff, ff) * LAMBDA * uu
- return np.linalg.solve(XtX, Xty)
-
-if __name__ == "__main__":
- if len(sys.argv) < 2:
- print >> sys.stderr, "Usage: als <master> <M> <U> <F> <iters> <slices>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonALS", pyFiles=[realpath(__file__)])
- M = int(sys.argv[2]) if len(sys.argv) > 2 else 100
- U = int(sys.argv[3]) if len(sys.argv) > 3 else 500
- F = int(sys.argv[4]) if len(sys.argv) > 4 else 10
- ITERATIONS = int(sys.argv[5]) if len(sys.argv) > 5 else 5
- slices = int(sys.argv[6]) if len(sys.argv) > 6 else 2
-
- print "Running ALS with M=%d, U=%d, F=%d, iters=%d, slices=%d\n" % \
- (M, U, F, ITERATIONS, slices)
-
- R = matrix(rand(M, F)) * matrix(rand(U, F).T)
- ms = matrix(rand(M ,F))
- us = matrix(rand(U, F))
-
- Rb = sc.broadcast(R)
- msb = sc.broadcast(ms)
- usb = sc.broadcast(us)
-
- for i in range(ITERATIONS):
- ms = sc.parallelize(range(M), slices) \
- .map(lambda x: update(x, msb.value[x, :], usb.value, Rb.value)) \
- .collect()
- ms = matrix(np.array(ms)[:, :, 0]) # collect() returns a list, so array ends up being
- # a 3-d array, we take the first 2 dims for the matrix
- msb = sc.broadcast(ms)
-
- us = sc.parallelize(range(U), slices) \
- .map(lambda x: update(x, usb.value[x, :], msb.value, Rb.value.T)) \
- .collect()
- us = matrix(np.array(us)[:, :, 0])
- usb = sc.broadcast(us)
-
- error = rmse(R, ms, us)
- print "Iteration %d:" % i
- print "\nRMSE: %5.4f\n" % error
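For readers of the deleted als.py above: its update() function solves a ridge-regularized least-squares problem for one factor row. A minimal NumPy-only sketch of that same normal-equation solve, runnable without Spark (shapes and LAMBDA mirror the example's defaults; illustrative code, not the project's):

    import numpy as np

    LAMBDA = 0.01  # regularization, as in the deleted example
    np.random.seed(42)

    def update(row_ratings, mat):
        """Solve (X^T X + lambda * n * I) w = X^T y for one factor row."""
        n, f = mat.shape
        XtX = mat.T.dot(mat) + np.eye(f) * LAMBDA * n
        Xty = mat.T.dot(row_ratings)
        return np.linalg.solve(XtX, Xty)

    if __name__ == "__main__":
        M, U, F = 100, 500, 10                                # example defaults
        R = np.random.rand(M, F).dot(np.random.rand(U, F).T)  # synthetic ratings
        us = np.random.rand(U, F)
        # Recompute the factor for movie 0 from the current user factors.
        m0 = update(R[0, :], us)
        print("updated factor for row 0:", m0)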
diff --git a/python/examples/kmeans.py b/python/examples/kmeans.py
deleted file mode 100755
index d8387b0b18..0000000000
--- a/python/examples/kmeans.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-The K-means algorithm written from scratch against PySpark. In practice,
-one may prefer to use the KMeans algorithm in MLlib, as shown in
-python/examples/mllib/kmeans.py.
-
-This example requires NumPy (http://www.numpy.org/).
-"""
-
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-
-
-def parseVector(line):
- return np.array([float(x) for x in line.split(' ')])
-
-
-def closestPoint(p, centers):
- bestIndex = 0
- closest = float("+inf")
- for i in range(len(centers)):
- tempDist = np.sum((p - centers[i]) ** 2)
- if tempDist < closest:
- closest = tempDist
- bestIndex = i
- return bestIndex
-
-
-if __name__ == "__main__":
- if len(sys.argv) < 5:
- print >> sys.stderr, "Usage: kmeans <master> <file> <k> <convergeDist>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonKMeans")
- lines = sc.textFile(sys.argv[2])
- data = lines.map(parseVector).cache()
- K = int(sys.argv[3])
- convergeDist = float(sys.argv[4])
-
- kPoints = data.takeSample(False, K, 1)
- tempDist = 1.0
-
- while tempDist > convergeDist:
- closest = data.map(
- lambda p : (closestPoint(p, kPoints), (p, 1)))
- pointStats = closest.reduceByKey(
- lambda (x1, y1), (x2, y2): (x1 + x2, y1 + y2))
- newPoints = pointStats.map(
- lambda (x, (y, z)): (x, y / z)).collect()
-
- tempDist = sum(np.sum((kPoints[x] - y) ** 2) for (x, y) in newPoints)
-
- for (x, y) in newPoints:
- kPoints[x] = y
-
- print "Final centers: " + str(kPoints)
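The deleted kmeans.py distributes a plain Lloyd iteration over Spark. A minimal NumPy-only sketch of the same assignment and mean-update step on toy data, for checking the logic locally (illustration only):

    import numpy as np

    def closest_point(p, centers):
        """Index of the nearest center by squared Euclidean distance."""
        return int(np.argmin([np.sum((p - c) ** 2) for c in centers]))

    if __name__ == "__main__":
        points = np.array([[0.0, 0.0], [0.1, 0.2], [9.0, 9.0], [9.2, 8.8]])
        centers = [points[0], points[2]]        # initial centers (like takeSample)
        # One assignment + mean-update step of Lloyd's algorithm.
        assignments = np.array([closest_point(p, centers) for p in points])
        new_centers = [points[assignments == k].mean(axis=0)
                       for k in range(len(centers))]
        print("assignments:", assignments.tolist())
        print("new centers:", new_centers)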
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
deleted file mode 100755
index 28d52e6a40..0000000000
--- a/python/examples/logistic_regression.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-A logistic regression implementation that uses NumPy (http://www.numpy.org)
-to act on batches of input data using efficient matrix operations.
-
-In practice, one may prefer to use the LogisticRegression algorithm in
-MLlib, as shown in python/examples/mllib/logistic_regression.py.
-"""
-
-from collections import namedtuple
-from math import exp
-from os.path import realpath
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-
-
-D = 10 # Number of dimensions
-
-
-# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
-# make further computations faster.
-# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
-# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
-def readPointBatch(iterator):
- strs = list(iterator)
- matrix = np.zeros((len(strs), D + 1))
- for i in xrange(len(strs)):
- matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
- return [matrix]
-
-if __name__ == "__main__":
- if len(sys.argv) != 4:
- print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
- points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
- iterations = int(sys.argv[3])
-
- # Initialize w to a random value
- w = 2 * np.random.ranf(size=D) - 1
- print "Initial w: " + str(w)
-
- # Compute logistic regression gradient for a matrix of data points
- def gradient(matrix, w):
- Y = matrix[:,0] # point labels (first column of input file)
- X = matrix[:,1:] # point coordinates
- # For each point (x, y), compute gradient function, then sum these up
- return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
-
- def add(x, y):
- x += y
- return x
-
- for i in range(iterations):
- print "On iteration %i" % (i + 1)
- w -= points.map(lambda m: gradient(m, w)).reduce(add)
-
- print "Final w: " + str(w)
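The gradient() function above is the batched logistic-regression gradient. A minimal NumPy sketch of the same expression on a tiny in-memory batch, assuming labels in column 0 take values in {-1, +1} as the expression implies (illustration only):

    import numpy as np

    D = 2  # number of feature dimensions in this toy batch

    def gradient(matrix, w):
        """Batched logistic-regression gradient, same expression as the example."""
        Y = matrix[:, 0]    # labels, assumed in {-1, +1}
        X = matrix[:, 1:]   # features
        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)

    if __name__ == "__main__":
        np.random.seed(0)
        batch = np.array([[ 1.0,  0.5,  1.2],
                          [-1.0, -0.3, -0.8],
                          [ 1.0,  1.1,  0.9]])
        w = 2 * np.random.rand(D) - 1      # random initial weights
        for i in range(5):
            w -= gradient(batch, w)
        print("w after 5 steps:", w)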
diff --git a/python/examples/mllib/kmeans.py b/python/examples/mllib/kmeans.py
deleted file mode 100755
index dec82ff34f..0000000000
--- a/python/examples/mllib/kmeans.py
+++ /dev/null
@@ -1,44 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-A K-means clustering program using MLlib.
-
-This example requires NumPy (http://www.numpy.org/).
-"""
-
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-from pyspark.mllib.clustering import KMeans
-
-
-def parseVector(line):
- return np.array([float(x) for x in line.split(' ')])
-
-
-if __name__ == "__main__":
- if len(sys.argv) < 4:
- print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "KMeans")
- lines = sc.textFile(sys.argv[2])
- data = lines.map(parseVector)
- k = int(sys.argv[3])
- model = KMeans.train(data, k)
- print "Final centers: " + str(model.clusterCenters)
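The only non-MLlib logic in the deleted file above is parseVector; a tiny standalone check of that parsing, no Spark needed (illustration only):

    import numpy as np

    def parseVector(line):
        # Same parsing as the deleted example: space-separated floats.
        return np.array([float(x) for x in line.split(' ')])

    if __name__ == "__main__":
        print(parseVector("0.0 1.5 -2.25"))   # array of three floats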
diff --git a/python/examples/mllib/logistic_regression.py b/python/examples/mllib/logistic_regression.py
deleted file mode 100755
index 8631051d00..0000000000
--- a/python/examples/mllib/logistic_regression.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Logistic regression using MLlib.
-
-This example requires NumPy (http://www.numpy.org/).
-"""
-
-from math import exp
-import sys
-
-import numpy as np
-from pyspark import SparkContext
-from pyspark.mllib.regression import LabeledPoint
-from pyspark.mllib.classification import LogisticRegressionWithSGD
-
-
-# Parse a line of text into an MLlib LabeledPoint object
-def parsePoint(line):
- values = [float(s) for s in line.split(' ')]
- if values[0] == -1: # Convert -1 labels to 0 for MLlib
- values[0] = 0
- return LabeledPoint(values[0], values[1:])
-
-
-if __name__ == "__main__":
- if len(sys.argv) != 4:
- print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonLR")
- points = sc.textFile(sys.argv[2]).map(parsePoint)
- iterations = int(sys.argv[3])
- model = LogisticRegressionWithSGD.train(points, iterations)
- print "Final weights: " + str(model.weights)
- print "Final intercept: " + str(model.intercept)
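The parsing step above also converts -1 labels to the 0/1 labels MLlib expects. A standalone sketch of just that conversion using pyspark's LabeledPoint class (assumes pyspark is importable; no cluster needed; illustration only):

    from pyspark.mllib.regression import LabeledPoint

    def parsePoint(line):
        """Parse '<label> <x1> ... <xD>' and map a -1 label to 0 for MLlib."""
        values = [float(s) for s in line.split(' ')]
        if values[0] == -1:
            values[0] = 0
        return LabeledPoint(values[0], values[1:])

    if __name__ == "__main__":
        lp = parsePoint("-1 0.5 1.25 -3.0")
        # Prints the converted label (0.0) and the dense feature vector.
        print(lp.label, lp.features)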
diff --git a/python/examples/pagerank.py b/python/examples/pagerank.py
deleted file mode 100755
index cd774cf3a3..0000000000
--- a/python/examples/pagerank.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-#!/usr/bin/env python
-
-import re, sys
-from operator import add
-
-from pyspark import SparkContext
-
-
-def computeContribs(urls, rank):
- """Calculates URL contributions to the rank of other URLs."""
- num_urls = len(urls)
- for url in urls: yield (url, rank / num_urls)
-
-
-def parseNeighbors(urls):
- """Parses a urls pair string into urls pair."""
- parts = re.split(r'\s+', urls)
- return parts[0], parts[1]
-
-
-if __name__ == "__main__":
- if len(sys.argv) < 3:
- print >> sys.stderr, "Usage: pagerank <master> <file> <number_of_iterations>"
- exit(-1)
-
- # Initialize the spark context.
- sc = SparkContext(sys.argv[1], "PythonPageRank")
-
- # Loads in input file. It should be in format of:
- # URL neighbor URL
- # URL neighbor URL
- # URL neighbor URL
- # ...
- lines = sc.textFile(sys.argv[2], 1)
-
- # Loads all URLs from input file and initialize their neighbors.
- links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
-
- # Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- ranks = links.map(lambda (url, neighbors): (url, 1.0))
-
- # Calculates and updates URL ranks continuously using PageRank algorithm.
- for iteration in xrange(int(sys.argv[3])):
- # Calculates URL contributions to the rank of other URLs.
- contribs = links.join(ranks).flatMap(lambda (url, (urls, rank)):
- computeContribs(urls, rank))
-
- # Re-calculates URL ranks based on neighbor contributions.
- ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
-
- # Collects all URL ranks and dump them to console.
- for (link, rank) in ranks.collect():
- print "%s has rank: %s." % (link, rank)
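A pure-Python sketch of the same PageRank update used above (damping 0.85, each page's rank split evenly among its out-links) on a tiny hard-coded graph, so the rule can be checked without Spark (illustration only):

    def compute_contribs(urls, rank):
        """Split a page's rank evenly among its out-links."""
        n = len(urls)
        for url in urls:
            yield url, rank / n

    if __name__ == "__main__":
        links = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
        ranks = {url: 1.0 for url in links}
        for _ in range(10):
            contribs = {}
            for url, neighbors in links.items():
                for dest, c in compute_contribs(neighbors, ranks[url]):
                    contribs[dest] = contribs.get(dest, 0.0) + c
            ranks = {url: 0.15 + 0.85 * contribs.get(url, 0.0) for url in links}
        for url, rank in sorted(ranks.items()):
            print("%s has rank: %s." % (url, rank))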
diff --git a/python/examples/pi.py b/python/examples/pi.py
deleted file mode 100755
index ab0645fc2f..0000000000
--- a/python/examples/pi.py
+++ /dev/null
@@ -1,37 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-from random import random
-from operator import add
-
-from pyspark import SparkContext
-
-
-if __name__ == "__main__":
- if len(sys.argv) == 1:
- print >> sys.stderr, "Usage: pi <master> [<slices>]"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonPi")
- slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
- n = 100000 * slices
- def f(_):
- x = random() * 2 - 1
- y = random() * 2 - 1
- return 1 if x ** 2 + y ** 2 < 1 else 0
- count = sc.parallelize(xrange(1, n+1), slices).map(f).reduce(add)
- print "Pi is roughly %f" % (4.0 * count / n)
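The same Monte Carlo estimate of pi, collapsed to plain Python for a quick local sanity check (illustration only):

    from random import random

    def inside(_):
        """1 if a random point in the unit square falls inside the unit circle."""
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    if __name__ == "__main__":
        n = 100000
        count = sum(inside(i) for i in range(n))
        print("Pi is roughly %f" % (4.0 * count / n))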
diff --git a/python/examples/sort.py b/python/examples/sort.py
deleted file mode 100755
index 5de20a6d98..0000000000
--- a/python/examples/sort.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-
-from pyspark import SparkContext
-
-
-if __name__ == "__main__":
- if len(sys.argv) < 3:
- print >> sys.stderr, "Usage: sort <master> <file>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonSort")
- lines = sc.textFile(sys.argv[2], 1)
- sortedCount = lines.flatMap(lambda x: x.split(' ')) \
- .map(lambda x: (int(x), 1)) \
- .sortByKey(lambda x: x)
- # This is just a demo on how to bring all the sorted data back to a single node.
- # In reality, we wouldn't want to collect all the data to the driver node.
- output = sortedCount.collect()
- for (num, unitcount) in output:
- print num
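Locally, the same tokenize-parse-sort pipeline reduces to a few lines of plain Python; a sketch on an in-memory sample (illustration only):

    if __name__ == "__main__":
        lines = ["5 3", "10", "1 7 2"]
        numbers = [int(tok) for line in lines for tok in line.split(' ')]
        for num in sorted(numbers):
            print(num)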
diff --git a/python/examples/transitive_closure.py b/python/examples/transitive_closure.py
deleted file mode 100755
index 744cce6651..0000000000
--- a/python/examples/transitive_closure.py
+++ /dev/null
@@ -1,66 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-from random import Random
-
-from pyspark import SparkContext
-
-numEdges = 200
-numVertices = 100
-rand = Random(42)
-
-
-def generateGraph():
- edges = set()
- while len(edges) < numEdges:
- src = rand.randrange(0, numEdges)
- dst = rand.randrange(0, numEdges)
- if src != dst:
- edges.add((src, dst))
- return edges
-
-
-if __name__ == "__main__":
- if len(sys.argv) == 1:
- print >> sys.stderr, "Usage: transitive_closure <master> [<slices>]"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonTransitiveClosure")
- slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
- tc = sc.parallelize(generateGraph(), slices).cache()
-
- # Linear transitive closure: each round grows paths by one edge,
- # by joining the graph's edges with the already-discovered paths.
- # e.g. join the path (y, z) from the TC with the edge (x, y) from
- # the graph to obtain the path (x, z).
-
- # Because join() joins on keys, the edges are stored in reversed order.
- edges = tc.map(lambda (x, y): (y, x))
-
- oldCount = 0L
- nextCount = tc.count()
- while True:
- oldCount = nextCount
- # Perform the join, obtaining an RDD of (y, (z, x)) pairs,
- # then project the result to obtain the new (x, z) paths.
- new_edges = tc.join(edges).map(lambda (_, (a, b)): (b, a))
- tc = tc.union(new_edges).distinct().cache()
- nextCount = tc.count()
- if nextCount == oldCount:
- break
-
- print "TC has %i edges" % tc.count()
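A pure-Python version of the same join-based fixpoint: each round composes edges (x, y) with known paths (y, z) to grow new paths (x, z), stopping when the set stops growing (illustration only):

    def transitive_closure(edges):
        """Grow the path set by composing edges with paths until a fixpoint."""
        tc = set(edges)
        while True:
            # If (x, y) is an edge and (y, z) is a known path, add the path (x, z).
            new_paths = {(x, z) for (x, y) in edges for (y2, z) in tc if y == y2}
            grown = tc | new_paths
            if len(grown) == len(tc):
                return tc
            tc = grown

    if __name__ == "__main__":
        edges = {(1, 2), (2, 3), (3, 4)}
        tc = transitive_closure(edges)
        print("TC has %i edges" % len(tc))   # 6 paths for this chain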
diff --git a/python/examples/wordcount.py b/python/examples/wordcount.py
deleted file mode 100755
index b9139b9d76..0000000000
--- a/python/examples/wordcount.py
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import sys
-from operator import add
-
-from pyspark import SparkContext
-
-
-if __name__ == "__main__":
- if len(sys.argv) < 3:
- print >> sys.stderr, "Usage: wordcount <master> <file>"
- exit(-1)
- sc = SparkContext(sys.argv[1], "PythonWordCount")
- lines = sc.textFile(sys.argv[2], 1)
- counts = lines.flatMap(lambda x: x.split(' ')) \
- .map(lambda x: (x, 1)) \
- .reduceByKey(add)
- output = counts.collect()
- for (word, count) in output:
- print "%s: %i" % (word, count)
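The same flatMap/reduceByKey word count, collapsed to plain Python with collections.Counter for a quick local check (illustration only):

    from collections import Counter

    if __name__ == "__main__":
        lines = ["to be or not to be", "that is the question"]
        counts = Counter(word for line in lines for word in line.split(' '))
        for word, count in counts.items():
            print("%s: %i" % (word, count))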