diff options
author | jbencook <jbenjamincook@gmail.com> | 2014-12-23 17:46:24 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-23 17:46:24 -0800 |
commit | fd41eb9574280b5cfee9b94b4f92e4c44363fb14 (patch) | |
tree | 1637c2f5ee28a4f0990093c934ac4aa1ced81167 /sql | |
parent | 7e2deb71c4239564631b19c748e95c3d1aa1c77d (diff) | |
download | spark-fd41eb9574280b5cfee9b94b4f92e4c44363fb14.tar.gz spark-fd41eb9574280b5cfee9b94b4f92e4c44363fb14.tar.bz2 spark-fd41eb9574280b5cfee9b94b4f92e4c44363fb14.zip |
[SPARK-4860][pyspark][sql] speeding up `sample()` and `takeSample()`
This PR modifies the python `SchemaRDD` to use `sample()` and `takeSample()` from Scala instead of the slower python implementations from `rdd.py`. This is worthwhile because the `Row`'s are already serialized as Java objects.
In order to use the faster `takeSample()`, a `takeSampleToPython()` method was implemented in `SchemaRDD.scala` following the pattern of `collectToPython()`.
Author: jbencook <jbenjamincook@gmail.com>
Author: J. Benjamin Cook <jbenjamincook@gmail.com>
Closes #3764 from jbencook/master and squashes the following commits:
6fbc769 [J. Benjamin Cook] [SPARK-4860][pyspark][sql] fixing sloppy indentation for takeSampleToPython() arguments
5170da2 [J. Benjamin Cook] [SPARK-4860][pyspark][sql] fixing typo: from RDD to SchemaRDD
de22f70 [jbencook] [SPARK-4860][pyspark][sql] using sample() method from JavaSchemaRDD
b916442 [jbencook] [SPARK-4860][pyspark][sql] adding sample() to JavaSchemaRDD
020cbdf [jbencook] [SPARK-4860][pyspark][sql] using Scala implementations of `sample()` and `takeSample()`
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 15 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala | 6 |
2 files changed, 21 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 7baf8ffcef..856b10f1a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -438,6 +438,21 @@ class SchemaRDD( } /** + * Serializes the Array[Row] returned by SchemaRDD's takeSample(), using the same + * format as javaToPython and collectToPython. It is used by pyspark. + */ + private[sql] def takeSampleToPython( + withReplacement: Boolean, + num: Int, + seed: Long): JList[Array[Byte]] = { + val fieldTypes = schema.fields.map(_.dataType) + val pickle = new Pickler + new java.util.ArrayList(this.takeSample(withReplacement, num, seed).map { row => + EvaluatePython.rowToArray(row, fieldTypes) + }.grouped(100).map(batched => pickle.dumps(batched.toArray)).toIterable) + } + + /** * Creates SchemaRDD by applying own schema to derived RDD. Typically used to wrap return value * of base RDD functions that do not change schema. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala index ac4844f9b9..5b9c612487 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala @@ -218,4 +218,10 @@ class JavaSchemaRDD( */ def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD = this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD + + /** + * Return a SchemaRDD with a sampled version of the underlying dataset. + */ + def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaSchemaRDD = + this.baseSchemaRDD.sample(withReplacement, fraction, seed).toJavaSchemaRDD } |