From 9d376ad76ca702ae3fc6ffd0567e7590d9a8daf3 Mon Sep 17 00:00:00 2001
From: Jacek Laskowski
Date: Tue, 23 Aug 2016 12:59:25 +0200
Subject: [SPARK-17199] Use CatalystConf.resolver for case-sensitivity
 comparison

## What changes were proposed in this pull request?

Use `CatalystConf.resolver` consistently for case-sensitivity comparison,
removing the duplicated resolver logic at each call site.
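To make the deduplication concrete, here is a minimal sketch of the pattern
being centralized. `Resolver`, `caseSensitiveResolution`, and
`caseInsensitiveResolution` are existing Catalyst definitions; the standalone
`pickResolver` helper is hypothetical and only mirrors what
`CatalystConf.resolver` does based on the conf's `caseSensitiveAnalysis` flag:

```scala
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution, Resolver}

object ResolverSketch extends App {
  // In Catalyst, Resolver is a type alias for (String, String) => Boolean.
  // CatalystConf.resolver selects the comparison once, driven by the
  // case-sensitivity setting, instead of re-deriving it at every call site.
  // pickResolver is a hypothetical stand-in for that logic:
  def pickResolver(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution

  val equality = pickResolver(caseSensitiveAnalysis = false)
  assert(equality("partitionCol", "PARTITIONCOL"))              // equalsIgnoreCase
  assert(!pickResolver(caseSensitiveAnalysis = true)("a", "A")) // plain ==
}
```

With the conf exposing this directly, each call site in the diff below
collapses to a one-liner such as `sparkSession.sessionState.conf.resolver`.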
## How was this patch tested?

Local build. Waiting for Jenkins to ensure a clean build and test run.

Author: Jacek Laskowski

Closes #14771 from jaceklaskowski/17199-catalystconf-resolver.
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala     |  8 +-------
 .../apache/spark/sql/execution/datasources/DataSource.scala   | 10 ++--------
 .../spark/sql/execution/datasources/DataSourceStrategy.scala  |  8 +-------
 .../apache/spark/sql/execution/streaming/FileStreamSink.scala |  6 +-----
 4 files changed, 5 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 41e0e6d65e..e559f235c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -64,13 +64,7 @@ class Analyzer(
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
-  def resolver: Resolver = {
-    if (conf.caseSensitiveAnalysis) {
-      caseSensitiveResolution
-    } else {
-      caseInsensitiveResolution
-    }
-  }
+  def resolver: Resolver = conf.resolver
 
   protected val fixedPoint = FixedPoint(maxIterations)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5ad6ae0956..b783d69974 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -394,13 +394,7 @@ case class DataSource(
         sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
 
     val dataSchema = userSpecifiedSchema.map { schema =>
-      val equality =
-        if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
-          org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-        } else {
-          org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-        }
-
+      val equality = sparkSession.sessionState.conf.resolver
       StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
     }.orElse {
       format.inferSchema(
@@ -430,7 +424,7 @@ case class DataSource(
       relation
   }
 
-  /** Writes the give [[DataFrame]] out to this [[DataSource]]. */
+  /** Writes the given [[DataFrame]] out to this [[DataSource]]. */
   def write(
       mode: SaveMode,
       data: DataFrame): BaseRelation = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5eba7df060..a6621054fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -45,13 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
-  def resolver: Resolver = {
-    if (conf.caseSensitiveAnalysis) {
-      caseSensitiveResolution
-    } else {
-      caseInsensitiveResolution
-    }
-  }
+  def resolver: Resolver = conf.resolver
 
   // Visible for testing.
   def convertStaticPartitions(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 117d6672ee..0f7d958136 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -102,11 +102,7 @@ class FileStreamSinkWriter(
   // Get the actual partition columns as attributes after matching them by name with
   // the given columns names.
   private val partitionColumns = partitionColumnNames.map { col =>
-    val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
+    val nameEquality = data.sparkSession.sessionState.conf.resolver
     data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
       throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
     }