Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala  16
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index a636ca2f29..b2483e69a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -68,7 +68,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
override def children: Seq[LogicalPlan] = query :: Nil
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
// Most formats don't do well with duplicate columns, so lets not allow that
if (query.schema.fieldNames.length != query.schema.fieldNames.distinct.length) {
val duplicateColumns = query.schema.fieldNames.groupBy(identity).collect {
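
The hunk above only changes the run() signature, but for reference, the duplicate-column guard it touches amounts to grouping the schema's field names and reporting any name that occurs more than once. A minimal, self-contained Scala sketch of that check; fieldNames is a stand-in for query.schema.fieldNames:

object DuplicateColumnCheck {
  def main(args: Array[String]): Unit = {
    // Stand-in for query.schema.fieldNames; the real guard reads these from the query's schema.
    val fieldNames = Seq("id", "name", "id", "ts")

    // Same grouping idea as the guard above: group names by identity and keep those
    // that occur more than once.
    val duplicateColumns = fieldNames.groupBy(identity).collect {
      case (name, occurrences) if occurrences.length > 1 => name
    }.mkString(", ")

    if (fieldNames.length != fieldNames.distinct.length) {
      println(s"Duplicate column(s): $duplicateColumns, cannot save to file.")
    }
  }
}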
@@ -78,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
s"cannot save to file.")
}
- val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val hadoopConf = new Configuration(sparkSession.sessionState.hadoopConf)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
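
This hunk only swaps where the Hadoop configuration comes from; the path qualification itself uses plain Hadoop APIs. A small sketch of that step, assuming a local placeholder path (/tmp/spark-output); the real outputPath comes from the relation being written:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifyOutputPath {
  def main(args: Array[String]): Unit = {
    // Placeholder path used only for this sketch.
    val outputPath = new Path("/tmp/spark-output")

    // A fresh Configuration, analogous to copying the session's Hadoop conf above
    // so per-write changes do not leak back into the session.
    val hadoopConf = new Configuration()

    // Resolve the filesystem that owns the path, then qualify the path against
    // that filesystem's URI and working directory (e.g. file:/tmp/spark-output).
    val fs: FileSystem = outputPath.getFileSystem(hadoopConf)
    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

    println(qualifiedOutputPath)
  }
}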
@@ -111,14 +111,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
val partitionSet = AttributeSet(partitionColumns)
val dataColumns = query.output.filterNot(partitionSet.contains)
- val queryExecution = Dataset.ofRows(sqlContext, query).queryExecution
- SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
+ val queryExecution = Dataset.ofRows(sparkSession, query).queryExecution
+ SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
val relation =
WriteRelation(
- sqlContext,
+ sparkSession,
dataColumns.toStructType,
qualifiedOutputPath.toString,
- fileFormat.prepareWrite(sqlContext, _, options, dataColumns.toStructType),
+ fileFormat.prepareWrite(sparkSession, _, options, dataColumns.toStructType),
bucketSpec)
val writerContainer = if (partitionColumns.isEmpty && bucketSpec.isEmpty) {
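
The hunk above passes sparkSession into Dataset.ofRows, SQLExecution.withNewExecutionId and prepareWrite, while the surrounding logic, splitting the query output into partition columns and data columns, is unchanged. A plain-Scala analogue of that split; the real code works on Attribute objects and an AttributeSet, not strings:

object SplitColumns {
  def main(args: Array[String]): Unit = {
    // Stand-ins for query.output and the partition column set.
    val output = Seq("id", "name", "event_date", "country")
    val partitionColumns = Set("event_date", "country")

    // Same filterNot pattern as above: everything that is not a partition column
    // is written as a data column inside each partition directory.
    val dataColumns = output.filterNot(partitionColumns.contains)

    println(s"partition columns: ${partitionColumns.mkString(", ")}")
    println(s"data columns: ${dataColumns.mkString(", ")}")
  }
}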
@@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
dataColumns = dataColumns,
inputSchema = query.output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
- sqlContext.conf.getConf(SQLConf.PARTITION_MAX_FILES),
+ sparkSession.getConf(SQLConf.PARTITION_MAX_FILES),
isAppend)
}
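
Here the per-partition file limit is now read through the session instead of sqlContext.conf. As a rough illustration of reading a writer setting through a session's runtime config, a sketch using the public SparkSession API; example.max.files.per.partition is a made-up key, the real code reads the typed SQLConf entry PARTITION_MAX_FILES:

import org.apache.spark.sql.SparkSession

object ReadWriteConf {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("read-conf-sketch")
      .getOrCreate()

    // Hypothetical key used only for this sketch, with a fallback default.
    val maxFiles = spark.conf.get("example.max.files.per.partition", "50").toInt
    println(s"max open files per write: $maxFiles")

    spark.stop()
  }
}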
@@ -140,7 +140,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
writerContainer.driverSideSetup()
try {
- sqlContext.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
+ sparkSession.sparkContext.runJob(queryExecution.toRdd, writerContainer.writeRows _)
writerContainer.commitJob()
refreshFunction()
} catch { case cause: Throwable =>
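
The final hunk (cut off above) routes the write job through sparkSession.sparkContext.runJob while keeping the existing driver-side setup and commit flow around it. A stripped-down sketch of that pattern, with JobCommitter standing in for the writer container; the abort-and-rethrow branch is an assumption, since the catch body is truncated above:

object CommitProtocolSketch {

  // Minimal stand-in for the writer container's driver-side protocol.
  trait JobCommitter {
    def driverSideSetup(): Unit
    def commitJob(): Unit
    def abortJob(): Unit
  }

  // Same shape as the code above: set up on the driver, run the write tasks,
  // commit on success, abort and rethrow on failure.
  def write(committer: JobCommitter)(runTasks: => Unit): Unit = {
    committer.driverSideSetup()
    try {
      runTasks              // corresponds to sparkSession.sparkContext.runJob(...)
      committer.commitJob()
    } catch {
      case cause: Throwable =>
        committer.abortJob()
        throw cause
    }
  }

  def main(args: Array[String]): Unit = {
    val logging = new JobCommitter {
      def driverSideSetup(): Unit = println("setup")
      def commitJob(): Unit = println("commit")
      def abortJob(): Unit = println("abort")
    }
    write(logging) { println("writing rows") }
  }
}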