about summary refs log tree commit diff
path: root/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala | 12
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 05e13e66d1..3a5ea19b8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -55,7 +55,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
unresolvedVEncoder.resolve(dataAttributes, OuterScopes.outerScopes)
private def logicalPlan = queryExecution.analyzed
- private def sqlContext = queryExecution.sqlContext
+ private def sparkSession = queryExecution.sparkSession
/**
* Returns a new [[KeyValueGroupedDataset]] where the type of the key has been mapped to the
@@ -79,7 +79,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
def keys: Dataset[K] = {
Dataset[K](
- sqlContext,
+ sparkSession,
Distinct(
Project(groupingAttributes, logicalPlan)))
}
@@ -104,7 +104,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = {
Dataset[U](
- sqlContext,
+ sparkSession,
MapGroups(
f,
groupingAttributes,
@@ -217,10 +217,10 @@ class KeyValueGroupedDataset[K, V] private[sql](
Alias(CreateStruct(groupingAttributes), "key")()
}
val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, logicalPlan)
- val execution = new QueryExecution(sqlContext, aggregate)
+ val execution = new QueryExecution(sparkSession, aggregate)
new Dataset(
- sqlContext,
+ sparkSession,
execution,
ExpressionEncoder.tuple(unresolvedKEncoder +: encoders))
}
@@ -289,7 +289,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = {
implicit val uEncoder = other.unresolvedVEncoder
Dataset[R](
- sqlContext,
+ sparkSession,
CoGroup(
f,
this.groupingAttributes,