diff options
author | Subhobrata Dey <sbcd90@gmail.com> | 2016-04-17 15:18:32 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-04-17 15:18:32 -0700 |
commit | 699a4dfd89dc598e79955cfd6f66c06b6bf74be6 (patch) | |
tree | b779c07762dbf270aad8812b4b1cd0f3150db7b5 /sql | |
parent | 8a87f7d5c85f8bfa67432737cb200f5503755a65 (diff) | |
download | spark-699a4dfd89dc598e79955cfd6f66c06b6bf74be6.tar.gz spark-699a4dfd89dc598e79955cfd6f66c06b6bf74be6.tar.bz2 spark-699a4dfd89dc598e79955cfd6f66c06b6bf74be6.zip |
[SPARK-14632] randomSplit method fails on dataframes with maps in schema
## What changes were proposed in this pull request?
The patch fixes the issue with the randomSplit method which is not able to split dataframes which has maps in schema. The bug was introduced in spark 1.6.1.
## How was this patch tested?
Tested with unit tests.
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Subhobrata Dey <sbcd90@gmail.com>
Closes #12438 from sbcd90/randomSplitIssue.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 4edc90d9c3..fb3e184a64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1502,7 +1502,9 @@ class Dataset[T] private[sql]( // constituent partitions each time a split is materialized which could result in // overlapping splits. To prevent this, we explicitly sort each input partition to make the // ordering deterministic. - val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan) + // MapType cannot be sorted. + val sorted = Sort(logicalPlan.output.filterNot(_.dataType.isInstanceOf[MapType]) + .map(SortOrder(_, Ascending)), global = false, logicalPlan) val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x => |