From af3c9d50424602f3e5af1055e83e9badef0a1632 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 16 Jul 2013 17:21:33 -0700 Subject: Add Apache license headers and LICENSE and NOTICE files --- python/pyspark/rdd.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a9fec17a9d..c6a6b24c5a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1,3 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + from base64 import standard_b64encode as b64enc import copy from collections import defaultdict -- cgit v1.2.3 From b9d6783f36d527f5082bf13a4ee6fd108e97795c Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 28 Jul 2013 23:28:42 -0400 Subject: Optimize Python take() to not compute entire first partition --- .../main/scala/spark/api/python/PythonRDD.scala | 64 ++++++++++++---------- python/pyspark/rdd.py | 15 +++-- 2 files changed, 45 insertions(+), 34 deletions(-) (limited to 'python/pyspark/rdd.py') diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index af10822dbd..2dd79f7100 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -63,34 +63,42 @@ private[spark] class PythonRDD[T: ClassManifest]( // Start a thread to feed the process input from our parent's iterator new Thread("stdin writer for " + pythonExec) { override def run() { - SparkEnv.set(env) - val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) - val dataOut = new DataOutputStream(stream) - val printOut = new PrintWriter(stream) - // Partition index - dataOut.writeInt(split.index) - // sparkFilesDir - PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut) - // Broadcast variables - dataOut.writeInt(broadcastVars.length) - for (broadcast <- broadcastVars) { - dataOut.writeLong(broadcast.id) - dataOut.writeInt(broadcast.value.length) - dataOut.write(broadcast.value) - } - dataOut.flush() - // Serialized user code - for (elem <- command) { - printOut.println(elem) - } - printOut.flush() - // Data values - for (elem <- parent.iterator(split, context)) { - PythonRDD.writeAsPickle(elem, dataOut) + try { + SparkEnv.set(env) + val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) + val dataOut = new DataOutputStream(stream) + val printOut = new PrintWriter(stream) + // Partition index + dataOut.writeInt(split.index) + // sparkFilesDir + PythonRDD.writeAsPickle(SparkFiles.getRootDirectory, dataOut) + // Broadcast variables + dataOut.writeInt(broadcastVars.length) + for (broadcast <- broadcastVars) { + dataOut.writeLong(broadcast.id) + dataOut.writeInt(broadcast.value.length) + 
dataOut.write(broadcast.value) + } + dataOut.flush() + // Serialized user code + for (elem <- command) { + printOut.println(elem) + } + printOut.flush() + // Data values + for (elem <- parent.iterator(split, context)) { + PythonRDD.writeAsPickle(elem, dataOut) + } + dataOut.flush() + printOut.flush() + worker.shutdownOutput() + } catch { + case e: IOException => + // This can happen for legitimate reasons if the Python code stops returning data before we are done + // passing elements through, e.g., for take(). Just log a message to say it happened. + logInfo("stdin writer to Python finished early") + logDebug("stdin writer to Python finished early", e) } - dataOut.flush() - printOut.flush() - worker.shutdownOutput() } }.start() @@ -297,7 +305,7 @@ class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int) Utils.checkHost(serverHost, "Expected hostname") val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt - + override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]]) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c6a6b24c5a..6efa61aa66 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -386,13 +386,16 @@ class RDD(object): >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) [2, 3, 4, 5, 6] """ + def takeUpToNum(iterator): + taken = 0 + while taken < num: + yield next(iterator) + taken += 1 + # Take only up to num elements from each partition we try + mapped = self.mapPartitions(takeUpToNum) items = [] - for partition in range(self._jrdd.splits().size()): - iterator = self.ctx._takePartition(self._jrdd.rdd(), partition) - # Each item in the iterator is a string, Python object, batch of - # Python objects. 
Regardless, it is sufficient to take `num` - # of these objects in order to collect `num` Python objects: - iterator = iterator.take(num) + for partition in range(mapped._jrdd.splits().size()): + iterator = self.ctx._takePartition(mapped._jrdd.rdd(), partition) items.extend(self._collect_iterator_through_file(iterator)) if len(items) >= num: break -- cgit v1.2.3 From b5ec35562210c8e7ca4fea07a0d46cb255988c0d Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 28 Jul 2013 23:38:56 -0400 Subject: Optimize Python foreach() to not return as many objects --- python/pyspark/rdd.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6efa61aa66..4aafe35d13 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -267,7 +267,11 @@ class RDD(object): >>> def f(x): print x >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) """ - self.map(f).collect() # Force evaluation + def processPartition(iterator): + for x in iterator: + f(x) + yield None + self.mapPartitions(processPartition).collect() # Force evaluation def collect(self): """ -- cgit v1.2.3 From d75c3086951f603ec30b2527c24559e053ed7f25 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 28 Jul 2013 23:50:38 -0400 Subject: Use None instead of empty string as it's slightly smaller/faster --- python/pyspark/rdd.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 4aafe35d13..8734cacb0b 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -160,7 +160,7 @@ class RDD(object): >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) [1, 2, 3] """ - return self.map(lambda x: (x, "")) \ + return self.map(lambda x: (x, None)) \ .reduceByKey(lambda x, _: x) \ .map(lambda (x, _): x) -- cgit v1.2.3 From b95732632b5b06d494ebd9e539af136ab3b8490e Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 28 Jul 2013 19:24:23 -0700 Subject: Do not inherit master's PYTHONPATH on workers. This fixes SPARK-832, an issue where PySpark would not work when the master and workers used different SPARK_HOME paths. This change may potentially break code that relied on the master's PYTHONPATH being used on workers. To have custom PYTHONPATH additions used on the workers, users should set a custom PYTHONPATH in spark-env.sh rather than setting it in the shell. 
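As an aside on the workaround described in this commit message: dependencies can also be shipped to the workers explicitly through the PySpark API instead of relying on the driver shell's PYTHONPATH. A minimal, hypothetical sketch (the master URL, file path, and module name are placeholders; `userlibrary`/`UserClass` follow the test fixtures used elsewhere in this series):

from pyspark import SparkContext

# Assumed local master and app name; ship a local module to every worker
# explicitly rather than expecting the driver's PYTHONPATH to be inherited.
sc = SparkContext("local[2]", "PythonPathExample")
sc.addPyFile("/path/to/userlibrary.py")  # placeholder path

def use_dependency(x):
    # Imported on the worker, where the shipped file is on sys.path
    from userlibrary import UserClass
    return UserClass().hello()

print sc.parallelize([1, 2, 3]).map(use_dependency).collect()
sc.stop()
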
--- core/src/main/scala/spark/api/python/PythonWorkerFactory.scala | 2 ++ python/pyspark/rdd.py | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) (limited to 'python/pyspark/rdd.py') diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala index 078ad45ce8..84673470db 100644 --- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala @@ -67,6 +67,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py")) val workerEnv = pb.environment() workerEnv.putAll(envVars) + val pythonPath = sparkHome + "/python/:" + workerEnv.get("PYTHONPATH") + workerEnv.put("PYTHONPATH", pythonPath) daemon = pb.start() // Redirect the stderr to ours diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8734cacb0b..51c2cb9806 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -756,9 +756,8 @@ class PipelinedRDD(RDD): self.ctx._gateway._gateway_client) self.ctx._pickled_broadcast_vars.clear() class_manifest = self._prev_jrdd.classManifest() - env = copy.copy(self.ctx.environment) - env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "") - env = MapConverter().convert(env, self.ctx._gateway._gateway_client) + env = MapConverter().convert(self.ctx.environment, + self.ctx._gateway._gateway_client) python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, class_manifest) -- cgit v1.2.3 From c7e348faec45ad1d996d16639015c4bc4fc3bc92 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Thu, 15 Aug 2013 16:01:19 -0700 Subject: Implementing SPARK-878 for PySpark: adding zip and egg files to context and passing it down to workers which add these to their sys.path --- core/src/main/scala/spark/api/python/PythonRDD.scala | 9 ++++++++- python/pyspark/context.py | 14 +++++++++++--- python/pyspark/rdd.py | 4 +++- python/pyspark/tests.py | 11 +++++++++++ python/pyspark/worker.py | 13 ++++++++++++- python/test_support/userlib-0.1-py2.7.egg | Bin 0 -> 1945 bytes 6 files changed, 45 insertions(+), 6 deletions(-) create mode 100644 python/test_support/userlib-0.1-py2.7.egg (limited to 'python/pyspark/rdd.py') diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala index 2dd79f7100..49671437d0 100644 --- a/core/src/main/scala/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/spark/api/python/PythonRDD.scala @@ -33,6 +33,7 @@ private[spark] class PythonRDD[T: ClassManifest]( parent: RDD[T], command: Seq[String], envVars: JMap[String, String], + pythonIncludes: JList[String], preservePartitoning: Boolean, pythonExec: String, broadcastVars: JList[Broadcast[Array[Byte]]], @@ -44,10 +45,11 @@ private[spark] class PythonRDD[T: ClassManifest]( // Similar to Runtime.exec(), if we are given a single string, split it into words // using a standard StringTokenizer (i.e. 
by spaces) def this(parent: RDD[T], command: String, envVars: JMap[String, String], + pythonIncludes: JList[String], preservePartitoning: Boolean, pythonExec: String, broadcastVars: JList[Broadcast[Array[Byte]]], accumulator: Accumulator[JList[Array[Byte]]]) = - this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec, + this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator) override def getPartitions = parent.partitions @@ -79,6 +81,11 @@ private[spark] class PythonRDD[T: ClassManifest]( dataOut.writeInt(broadcast.value.length) dataOut.write(broadcast.value) } + // Python includes (*.zip and *.egg files) + dataOut.writeInt(pythonIncludes.length) + for (f <- pythonIncludes) { + PythonRDD.writeAsPickle(f, dataOut) + } dataOut.flush() // Serialized user code for (elem <- command) { diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c2b49ff37a..2803ce90f3 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -46,6 +46,7 @@ class SparkContext(object): _next_accum_id = 0 _active_spark_context = None _lock = Lock() + _python_includes = None # zip and egg files that need to be added to PYTHONPATH def __init__(self, master, jobName, sparkHome=None, pyFiles=None, environment=None, batchSize=1024): @@ -103,11 +104,14 @@ class SparkContext(object): # send. self._pickled_broadcast_vars = set() + SparkFiles._sc = self + root_dir = SparkFiles.getRootDirectory() + sys.path.append(root_dir) + # Deploy any code dependencies specified in the constructor + self._python_includes = list() for path in (pyFiles or []): self.addPyFile(path) - SparkFiles._sc = self - sys.path.append(SparkFiles.getRootDirectory()) # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.spark.Utils.getLocalDir() @@ -257,7 +261,11 @@ class SparkContext(object): HTTP, HTTPS or FTP URI. 
""" self.addFile(path) - filename = path.split("/")[-1] + (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + + if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'): + self._python_includes.append(filename) + sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode def setCheckpointDir(self, dirName, useExisting=False): """ diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 51c2cb9806..99f5967a8e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -758,8 +758,10 @@ class PipelinedRDD(RDD): class_manifest = self._prev_jrdd.classManifest() env = MapConverter().convert(self.ctx.environment, self.ctx._gateway._gateway_client) + includes = ListConverter().convert(self.ctx._python_includes, + self.ctx._gateway._gateway_client) python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), - pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec, + pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, class_manifest) self._jrdd_val = python_rdd.asJavaRDD() return self._jrdd_val diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index f75215a781..29d6a128f6 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -125,6 +125,17 @@ class TestAddFile(PySparkTestCase): from userlibrary import UserClass self.assertEqual("Hello World!", UserClass().hello()) + def test_add_egg_file_locally(self): + # To ensure that we're actually testing addPyFile's effects, check that + # this fails due to `userlibrary` not being on the Python path: + def func(): + from userlib import UserClass + self.assertRaises(ImportError, func) + path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg") + self.sc.addPyFile(path) + from userlib import UserClass + self.assertEqual("Hello World from inside a package!", UserClass().hello()) + class TestIO(PySparkTestCase): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 75d692beeb..695f6dfb84 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -49,15 +49,26 @@ def main(infile, outfile): split_index = read_int(infile) if split_index == -1: # for unit tests return + + # fetch name of workdir spark_files_dir = load_pickle(read_with_length(infile)) SparkFiles._root_directory = spark_files_dir SparkFiles._is_running_on_worker = True - sys.path.append(spark_files_dir) + + # fetch names and values of broadcast variables num_broadcast_variables = read_int(infile) for _ in range(num_broadcast_variables): bid = read_long(infile) value = read_with_length(infile) _broadcastRegistry[bid] = Broadcast(bid, load_pickle(value)) + + # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH + sys.path.append(spark_files_dir) # *.py files that were added will be copied here + num_python_includes = read_int(infile) + for _ in range(num_python_includes): + sys.path.append(os.path.join(spark_files_dir, load_pickle(read_with_length(infile)))) + + # now load function func = load_obj(infile) bypassSerializer = load_obj(infile) if bypassSerializer: diff --git a/python/test_support/userlib-0.1-py2.7.egg b/python/test_support/userlib-0.1-py2.7.egg new file mode 100644 index 0000000000..1674c9cb22 Binary files /dev/null and b/python/test_support/userlib-0.1-py2.7.egg differ -- cgit v1.2.3 From 76077bf9f4b726699ba9e59cdfa9c4361df4ea92 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Tue, 20 Aug 
2013 13:22:06 -0700 Subject: Implementing SPARK-838: Add DoubleRDDFunctions methods to PySpark --- python/pyspark/rdd.py | 60 ++++++++++++++++++++++- python/pyspark/statcounter.py | 109 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 1 deletion(-) create mode 100644 python/pyspark/statcounter.py (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 99f5967a8e..1e9b3bb5c0 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -31,6 +31,7 @@ from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \ read_from_pickle_file from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup +from pyspark.statcounter import StatCounter from py4j.java_collections import ListConverter, MapConverter @@ -357,6 +358,63 @@ class RDD(object): 3 """ return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() + + def stats(self): + """ + Return a L{StatCounter} object that captures the mean, variance + and count of the RDD's elements in one operation. + """ + def redFunc(left_counter, right_counter): + return left_counter.mergeStats(right_counter) + + return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) + + def mean(self): + """ + Compute the mean of this RDD's elements. + + >>> sc.parallelize([1, 2, 3]).mean() + 2.0 + """ + return self.stats().mean() + + def variance(self): + """ + Compute the variance of this RDD's elements. + + >>> sc.parallelize([1, 2, 3]).variance() + 0.666... + """ + return self.stats().variance() + + def stdev(self): + """ + Compute the standard deviation of this RDD's elements. + + >>> sc.parallelize([1, 2, 3]).stdev() + 0.816... + """ + return self.stats().stdev() + + def sampleStdev(self): + """ + Compute the sample standard deviation of this RDD's elements (which corrects for bias in + estimating the standard deviation by dividing by N-1 instead of N). + + >>> sc.parallelize([1, 2, 3]).sampleStdev() + 1.0 + """ + return self.stats().sampleStdev() + + def sampleVariance(self): + """ + Compute the sample variance of this RDD's elements (which corrects for bias in + estimating the variance by dividing by N-1 instead of N). + + >>> sc.parallelize([1, 2, 3]).sampleVariance() + 1.0 + """ + return self.stats().sampleVariance() def countByValue(self): """ @@ -777,7 +835,7 @@ def _test(): # The small batch size here ensures that we see multiple batches, # even in these small test examples: globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) - (failure_count, test_count) = doctest.testmod(globs=globs) + (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1) diff --git a/python/pyspark/statcounter.py b/python/pyspark/statcounter.py new file mode 100644 index 0000000000..8e1cbd4ad9 --- /dev/null +++ b/python/pyspark/statcounter.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file is ported from spark/util/StatCounter.scala + +import copy +import math + +class StatCounter(object): + + def __init__(self, values=[]): + self.n = 0L # Running count of our values + self.mu = 0.0 # Running mean of our values + self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) + + for v in values: + self.merge(v) + + # Add a value into this StatCounter, updating the internal statistics. + def merge(self, value): + delta = value - self.mu + self.n += 1 + self.mu += delta / self.n + self.m2 += delta * (value - self.mu) + return self + + # Merge another StatCounter into this one, adding up the internal statistics. + def mergeStats(self, other): + if not isinstance(other, StatCounter): + raise Exception("Can only merge Statcounters!") + + if other is self: # reference equality holds + self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order + else: + if self.n == 0: + self.mu = other.mu + self.m2 = other.m2 + self.n = other.n + elif other.n != 0: + delta = other.mu - self.mu + if other.n * 10 < self.n: + self.mu = self.mu + (delta * other.n) / (self.n + other.n) + elif self.n * 10 < other.n: + self.mu = other.mu - (delta * self.n) / (self.n + other.n) + else: + self.mu = (self.mu * self.n + other.mu * other.n) / (self.n + other.n) + + self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) + self.n += other.n + return self + + # Clone this StatCounter + def copy(self): + return copy.deepcopy(self) + + def count(self): + return self.n + + def mean(self): + return self.mu + + def sum(self): + return self.n * self.mu + + # Return the variance of the values. + def variance(self): + if self.n == 0: + return float('nan') + else: + return self.m2 / self.n + + # + # Return the sample variance, which corrects for bias in estimating the variance by dividing + # by N-1 instead of N. + # + def sampleVariance(self): + if self.n <= 1: + return float('nan') + else: + return self.m2 / (self.n - 1) + + # Return the standard deviation of the values. + def stdev(self): + return math.sqrt(self.variance()) + + # + # Return the sample standard deviation of the values, which corrects for bias in estimating the + # variance by dividing by N-1 instead of N. 
+ # + def sampleStdev(self): + return math.sqrt(self.sampleVariance()) + + def __repr__(self): + return "(count: %s, mean: %s, stdev: %s)" % (self.count(), self.mean(), self.stdev()) + -- cgit v1.2.3 From 457bcd33436c91a6ef07591837ea048bb4bbcede Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Wed, 28 Aug 2013 16:10:38 -0700 Subject: PySpark: implementing subtractByKey(), subtract() and keyBy() --- python/pyspark/rdd.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1e9b3bb5c0..dfc518a7b0 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -754,6 +754,43 @@ class RDD(object): """ return python_cogroup(self, other, numPartitions) + def subtractByKey(self, other, numPartitions=None): + """ + Return each (key, value) pair in C{self} that has no pair with matching key + in C{other}. + + >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) + >>> y = sc.parallelize([("a", 3), ("c", None)]) + >>> sorted(x.subtractByKey(y).collect()) + [('b', 4), ('b', 5)] + """ + filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0 + map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]] + return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) + + def subtract(self, other, numPartitions=None): + """ + Return each value in C{self} that is not contained in C{other}. + + >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) + >>> y = sc.parallelize([("a", 3), ("c", None)]) + >>> sorted(x.subtract(y).collect()) + [('a', 1), ('b', 4), ('b', 5)] + """ + rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder + return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder + + def keyBy(self, f): + """ + Creates tuples of the elements in this RDD by applying C{f}. + + >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) + >>> y = sc.parallelize(zip(range(0,5), range(0,5))) + >>> sorted(x.cogroup(y).collect()) + [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] + """ + return self.map(lambda x: (f(x), x)) + # TODO: `lookup` is disabled because we can't make direct comparisons based # on the key; we need to compare the hash of the key to the hash of the # keys in the pairs. 
This could be an expensive operation, since those -- cgit v1.2.3 From a511c5379ee156f08624e380b240af7d961a60f7 Mon Sep 17 00:00:00 2001 From: Andre Schumacher Date: Fri, 23 Aug 2013 11:16:44 -0700 Subject: RDD sample() and takeSample() prototypes for PySpark --- python/pyspark/rdd.py | 62 +++++++++++++++++++++--- python/pyspark/rddsampler.py | 112 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 7 deletions(-) create mode 100644 python/pyspark/rddsampler.py (limited to 'python/pyspark/rdd.py') diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1e9b3bb5c0..8394fe6a31 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -21,6 +21,7 @@ from collections import defaultdict from itertools import chain, ifilter, imap, product import operator import os +import sys import shlex from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile @@ -32,6 +33,7 @@ from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \ from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter +from pyspark.rddsampler import RDDSampler from py4j.java_collections import ListConverter, MapConverter @@ -165,14 +167,60 @@ class RDD(object): .reduceByKey(lambda x, _: x) \ .map(lambda (x, _): x) - # TODO: sampling needs to be re-implemented due to Batch - #def sample(self, withReplacement, fraction, seed): - # jrdd = self._jrdd.sample(withReplacement, fraction, seed) - # return RDD(jrdd, self.ctx) + def sample(self, withReplacement, fraction, seed): + """ + Return a sampled subset of this RDD (relies on numpy and falls back + on default random generator if numpy is unavailable). + + >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP + [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] + """ + return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True) + + # this is ported from scala/spark/RDD.scala + def takeSample(self, withReplacement, num, seed): + """ + Return a fixed-size sampled subset of this RDD (currently requires numpy). + + >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP + [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] + """ - #def takeSample(self, withReplacement, num, seed): - # vals = self._jrdd.takeSample(withReplacement, num, seed) - # return [load_pickle(bytes(x)) for x in vals] + fraction = 0.0 + total = 0 + multiplier = 3.0 + initialCount = self.count() + maxSelected = 0 + + if (num < 0): + raise ValueError + + if initialCount > sys.maxint - 1: + maxSelected = sys.maxint - 1 + else: + maxSelected = initialCount + + if num > initialCount and not withReplacement: + total = maxSelected + fraction = multiplier * (maxSelected + 1) / initialCount + else: + fraction = multiplier * (num + 1) / initialCount + total = num + + samples = self.sample(withReplacement, fraction, seed).collect() + + # If the first sample didn't turn out large enough, keep trying to take samples; + # this shouldn't happen often because we use a big multiplier for their initial size. 
+ # See: scala/spark/RDD.scala + while len(samples) < total: + if seed > sys.maxint - 2: + seed = -1 + seed += 1 + samples = self.sample(withReplacement, fraction, seed).collect() + + sampler = RDDSampler(withReplacement, fraction, seed+1) + sampler.shuffle(samples) + return samples[0:total] def union(self, other): """ diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py new file mode 100644 index 0000000000..aca2ef3b51 --- /dev/null +++ b/python/pyspark/rddsampler.py @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +import random + +class RDDSampler(object): + def __init__(self, withReplacement, fraction, seed): + try: + import numpy + self._use_numpy = True + except ImportError: + print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling." + self._use_numpy = False + + self._seed = seed + self._withReplacement = withReplacement + self._fraction = fraction + self._random = None + self._split = None + self._rand_initialized = False + + def initRandomGenerator(self, split): + if self._use_numpy: + import numpy + self._random = numpy.random.RandomState(self._seed) + for _ in range(0, split): + # discard the next few values in the sequence to have a + # different seed for the different splits + self._random.randint(sys.maxint) + else: + import random + random.seed(self._seed) + for _ in range(0, split): + # discard the next few values in the sequence to have a + # different seed for the different splits + random.randint(0, sys.maxint) + self._split = split + self._rand_initialized = True + + def getUniformSample(self, split): + if not self._rand_initialized or split != self._split: + self.initRandomGenerator(split) + + if self._use_numpy: + return self._random.random_sample() + else: + return random.uniform(0.0, 1.0) + + def getPoissonSample(self, split, mean): + if not self._rand_initialized or split != self._split: + self.initRandomGenerator(split) + + if self._use_numpy: + return self._random.poisson(mean) + else: + # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by + # drawing a sequence of numbers delta_j ~ Exp(mean) + num_arrivals = 1 + cur_time = 0.0 + + cur_time += random.expovariate(mean) + + if cur_time > 1.0: + return 0 + + while(cur_time <= 1.0): + cur_time += random.expovariate(mean) + num_arrivals += 1 + + return (num_arrivals - 1) + + def shuffle(self, vals): + if self._random == None or split != self._split: + self.initRandomGenerator(0) # this should only ever called on the master so + # the split does not matter + + if self._use_numpy: + self._random.shuffle(vals) + else: + random.shuffle(vals, self._random) + + def func(self, split, iterator): + if self._withReplacement: + for obj in iterator: + # For large datasets, 
the expected number of occurrences of each element in a sample with + # replacement is Poisson(frac). We use that to get a count for each element. + count = self.getPoissonSample(split, mean = self._fraction) + for _ in range(0, count): + yield obj + else: + for obj in iterator: + if self.getUniformSample(split) <= self._fraction: + yield obj + + + + -- cgit v1.2.3
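
To close out the series, a small, hedged usage sketch that exercises the PySpark methods added above (stats()/mean()/stdev(), subtract(), keyBy(), sample(), takeSample()). The master URL and app name are assumptions, the input data mirrors the doctests in the patches, and the sampled output varies with the seed and with whether NumPy is installed:

from pyspark import SparkContext

sc = SparkContext("local[2]", "PatchSeriesExample")

# Statistics methods from the SPARK-838 commit
nums = sc.parallelize([1, 2, 3, 4, 5])
print nums.stats()               # e.g. (count: 5, mean: 3.0, stdev: 1.41...)
print nums.mean(), nums.stdev()

# subtract() and keyBy() from the subtractByKey()/subtract()/keyBy() commit
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print sorted(x.subtract(y).collect())                               # [('a', 1), ('b', 4), ('b', 5)]
print sc.parallelize(range(0, 3)).keyBy(lambda v: v * v).collect()  # [(0, 0), (1, 1), (4, 2)]

# Sampling prototypes: sample() returns an RDD, takeSample() returns a list
print sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect()
print sc.parallelize(range(0, 10)).takeSample(True, 10, 1)

sc.stop()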