aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorSubhobrata Dey <sbcd90@gmail.com>2016-04-17 15:18:32 -0700
committerReynold Xin <rxin@databricks.com>2016-04-17 15:18:32 -0700
commit699a4dfd89dc598e79955cfd6f66c06b6bf74be6 (patch)
treeb779c07762dbf270aad8812b4b1cd0f3150db7b5 /sql/core/src
parent8a87f7d5c85f8bfa67432737cb200f5503755a65 (diff)
downloadspark-699a4dfd89dc598e79955cfd6f66c06b6bf74be6.tar.gz
spark-699a4dfd89dc598e79955cfd6f66c06b6bf74be6.tar.bz2
spark-699a4dfd89dc598e79955cfd6f66c06b6bf74be6.zip
[SPARK-14632] randomSplit method fails on dataframes with maps in schema
## What changes were proposed in this pull request? The patch fixes the issue with the randomSplit method which is not able to split dataframes which has maps in schema. The bug was introduced in spark 1.6.1. ## How was this patch tested? Tested with unit tests. (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: Subhobrata Dey <sbcd90@gmail.com> Closes #12438 from sbcd90/randomSplitIssue.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 4edc90d9c3..fb3e184a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1502,7 +1502,9 @@ class Dataset[T] private[sql](
// constituent partitions each time a split is materialized which could result in
// overlapping splits. To prevent this, we explicitly sort each input partition to make the
// ordering deterministic.
- val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
+ // MapType cannot be sorted.
+ val sorted = Sort(logicalPlan.output.filterNot(_.dataType.isInstanceOf[MapType])
+ .map(SortOrder(_, Ascending)), global = false, logicalPlan)
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>