From 60c0ce134d90ef18852ed2c637d2f240b7f99ab9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 21 Jul 2015 11:56:38 -0700 Subject: [SPARK-8906][SQL] Move all internal data source classes into execution.datasources. This way, the sources package contains only public facing interfaces. Author: Reynold Xin Closes #7565 from rxin/move-ds and squashes the following commits: 7661aff [Reynold Xin] Mima 9d5196a [Reynold Xin] Rearranged imports. 3dd7174 [Reynold Xin] [SPARK-8906][SQL] Move all internal data source classes into execution.datasources. --- project/MimaExcludes.scala | 47 ++ .../scala/org/apache/spark/sql/DataFrame.scala | 2 +- .../org/apache/spark/sql/DataFrameReader.scala | 4 +- .../org/apache/spark/sql/DataFrameWriter.scala | 2 +- .../scala/org/apache/spark/sql/SQLContext.scala | 9 +- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../spark/sql/execution/SqlNewHadoopRDD.scala | 263 ++++++++++ .../execution/datasources/DataSourceStrategy.scala | 394 ++++++++++++++ .../execution/datasources/LogicalRelation.scala | 58 ++ .../execution/datasources/PartitioningUtils.scala | 361 +++++++++++++ .../spark/sql/execution/datasources/commands.scala | 582 +++++++++++++++++++++ .../spark/sql/execution/datasources/ddl.scala | 492 +++++++++++++++++ .../spark/sql/execution/datasources/rules.scala | 158 ++++++ .../org/apache/spark/sql/parquet/newParquet.scala | 5 +- .../spark/sql/sources/DataSourceStrategy.scala | 395 -------------- .../apache/spark/sql/sources/LogicalRelation.scala | 57 -- .../spark/sql/sources/PartitioningUtils.scala | 360 ------------- .../apache/spark/sql/sources/SqlNewHadoopRDD.scala | 264 ---------- .../org/apache/spark/sql/sources/commands.scala | 581 -------------------- .../scala/org/apache/spark/sql/sources/ddl.scala | 493 ----------------- .../org/apache/spark/sql/sources/filters.scala | 4 + .../org/apache/spark/sql/sources/interfaces.scala | 4 +- .../scala/org/apache/spark/sql/sources/rules.scala | 158 ------ .../org/apache/spark/sql/json/JsonSuite.scala | 2 +- .../spark/sql/parquet/ParquetFilterSuite.scala | 2 +- .../parquet/ParquetPartitionDiscoverySuite.scala | 4 +- .../sql/sources/CreateTableAsSelectSuite.scala | 1 + .../sql/sources/ResolvedDataSourceSuite.scala | 1 + .../org/apache/spark/sql/hive/HiveContext.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 11 +- .../scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../org/apache/spark/sql/hive/HiveStrategies.scala | 2 +- .../apache/spark/sql/hive/execution/commands.scala | 1 + .../apache/spark/sql/hive/orc/OrcRelation.scala | 1 + .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +- .../sql/hive/execution/HiveComparisonTest.scala | 4 +- .../spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- .../org/apache/spark/sql/hive/parquetSuites.scala | 2 +- .../spark/sql/sources/hadoopFsRelationSuites.scala | 6 +- 39 files changed, 2404 insertions(+), 2342 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a2595ff6c2..fa36629c37 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -104,6 +104,53 @@ object MimaExcludes { // SPARK-7422 add argmax for sparse vectors ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.mllib.linalg.Vector.argmax") + ) ++ Seq( + // SPARK-8906 Move all internal data source classes into execution.datasources + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopPartition"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DefaultWriterContainer"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$PartitionValues"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DynamicPartitionWriterContainer"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsingAsSelect"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreInsertCastAndRename"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitioningUtils"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.LogicalRelation"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.Partition"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.BaseWriterContainer"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.RefreshTable"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsing"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTempTableUsingAsSelect"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CreateTableUsing$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.ResolvedDataSource$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PreWriteCheck$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoDataSource"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLParser"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.CaseInsensitiveMap"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DataSourceStrategy"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException") ) case v if v.startsWith("1.4") => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 830fba35bb..323ff17357 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -38,8 +38,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _} import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser} import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD} +import org.apache.spark.sql.execution.datasources.CreateTableUsingAsSelect import org.apache.spark.sql.json.JacksonGenerator -import org.apache.spark.sql.sources.CreateTableUsingAsSelect import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index f1c1ddf898..e9d782cdcd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -20,16 +20,16 @@ package org.apache.spark.sql import java.util.Properties import org.apache.hadoop.fs.Path -import org.apache.spark.{Logging, Partition} +import org.apache.spark.{Logging, Partition} import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.json.JSONRelation import org.apache.spark.sql.parquet.ParquetRelation2 -import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource} import org.apache.spark.sql.types.StructType /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 3e7b9cd797..ee0201a9d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -22,8 +22,8 @@ import java.util.Properties import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource} import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils} -import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect} /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 2dda3ad121..8b4528b5d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -39,8 +39,9 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _} -import org.apache.spark.sql.execution.{Filter, _} -import org.apache.spark.sql.sources._ +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -146,11 +147,11 @@ class SQLContext(@transient val sparkContext: SparkContext) new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = ExtractPythonUDFs :: - sources.PreInsertCastAndRename :: + PreInsertCastAndRename :: Nil override val extendedCheckRules = Seq( - sources.PreWriteCheck(catalog) + datasources.PreWriteCheck(catalog) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 240332a80a..8cef7f200d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.sql.{SQLContext, Strategy, execution} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ @@ -25,10 +26,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} +import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.parquet._ -import org.apache.spark.sql.sources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{SQLContext, Strategy, execution} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SQLContext#SparkPlanner => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala new file mode 100644 index 0000000000..e1c1a6c062 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SqlNewHadoopRDD.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.text.SimpleDateFormat +import java.util.Date + +import org.apache.spark.{Partition => SparkPartition, _} +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.DataReadMethod +import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil +import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD +import org.apache.spark.rdd.{HadoopRDD, RDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.{SerializableConfiguration, Utils} + +import scala.reflect.ClassTag + +private[spark] class SqlNewHadoopPartition( + rddId: Int, + val index: Int, + @transient rawSplit: InputSplit with Writable) + extends SparkPartition { + + val serializableHadoopSplit = new SerializableWritable(rawSplit) + + override def hashCode(): Int = 41 * (41 + rddId) + index +} + +/** + * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, + * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`). + * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions. + * 1. A shared broadcast Hadoop Configuration. + * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side + * to the shared Hadoop Configuration. + * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side + * and the executor side to the shared Hadoop Configuration. + * + * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with + * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be + * folded into core. + */ +private[sql] class SqlNewHadoopRDD[K, V]( + @transient sc : SparkContext, + broadcastedConf: Broadcast[SerializableConfiguration], + @transient initDriverSideJobFuncOpt: Option[Job => Unit], + initLocalJobFuncOpt: Option[Job => Unit], + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V]) + extends RDD[(K, V)](sc, Nil) + with SparkHadoopMapReduceUtil + with Logging { + + protected def getJob(): Job = { + val conf: Configuration = broadcastedConf.value.value + // "new Job" will make a copy of the conf. Then, it is + // safe to mutate conf properties with initLocalJobFuncOpt + // and initDriverSideJobFuncOpt. + val newJob = new Job(conf) + initLocalJobFuncOpt.map(f => f(newJob)) + newJob + } + + def getConf(isDriverSide: Boolean): Configuration = { + val job = getJob() + if (isDriverSide) { + initDriverSideJobFuncOpt.map(f => f(job)) + } + job.getConfiguration + } + + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + + @transient protected val jobId = new JobID(jobTrackerId, id) + + override def getPartitions: Array[SparkPartition] = { + val conf = getConf(isDriverSide = true) + val inputFormat = inputFormatClass.newInstance + inputFormat match { + case configurable: Configurable => + configurable.setConf(conf) + case _ => + } + val jobContext = newJobContext(conf, jobId) + val rawSplits = inputFormat.getSplits(jobContext).toArray + val result = new Array[SparkPartition](rawSplits.size) + for (i <- 0 until rawSplits.size) { + result(i) = + new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + } + result + } + + override def compute( + theSplit: SparkPartition, + context: TaskContext): InterruptibleIterator[(K, V)] = { + val iter = new Iterator[(K, V)] { + val split = theSplit.asInstanceOf[SqlNewHadoopPartition] + logInfo("Input split: " + split.serializableHadoopSplit) + val conf = getConf(isDriverSide = false) + + val inputMetrics = context.taskMetrics + .getInputMetricsForReadMethod(DataReadMethod.Hadoop) + + // Find a function that will return the FileSystem bytes read by this thread. Do this before + // creating RecordReader, because RecordReader's constructor might read some bytes + val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { + split.serializableHadoopSplit.value match { + case _: FileSplit | _: CombineFileSplit => + SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() + case _ => None + } + } + inputMetrics.setBytesReadCallback(bytesReadCallback) + + val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) + val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) + val format = inputFormatClass.newInstance + format match { + case configurable: Configurable => + configurable.setConf(conf) + case _ => + } + val reader = format.createRecordReader( + split.serializableHadoopSplit.value, hadoopAttemptContext) + reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener(context => close()) + var havePair = false + var finished = false + var recordsSinceMetricsUpdate = 0 + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = !reader.nextKeyValue + havePair = !finished + } + !finished + } + + override def next(): (K, V) = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + if (!finished) { + inputMetrics.incRecordsRead(1) + } + (reader.getCurrentKey, reader.getCurrentValue) + } + + private def close() { + try { + reader.close() + if (bytesReadCallback.isDefined) { + inputMetrics.updateBytesRead() + } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || + split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) { + // If we can't get the bytes read from the FS stats, fall back to the split size, + // which may be inaccurate. + try { + inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) + } catch { + case e: java.io.IOException => + logWarning("Unable to get input size to set InputMetrics for task", e) + } + } + } catch { + case e: Exception => { + if (!Utils.inShutdown()) { + logWarning("Exception in RecordReader.close()", e) + } + } + } + } + } + new InterruptibleIterator(context, iter) + } + + /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */ + @DeveloperApi + def mapPartitionsWithInputSplit[U: ClassTag]( + f: (InputSplit, Iterator[(K, V)]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = { + new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning) + } + + override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = { + val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value + val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match { + case Some(c) => + try { + val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]] + Some(HadoopRDD.convertSplitLocationInfo(infos)) + } catch { + case e : Exception => + logDebug("Failed to use InputSplit#getLocationInfo.", e) + None + } + case None => None + } + locs.getOrElse(split.getLocations.filter(_ != "localhost")) + } + + override def persist(storageLevel: StorageLevel): this.type = { + if (storageLevel.deserialized) { + logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" + + " behavior because Hadoop's RecordReader reuses the same Writable object for all records." + + " Use a map transformation to make copies of the records.") + } + super.persist(storageLevel) + } +} + +private[spark] object SqlNewHadoopRDD { + /** + * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to + * the given function rather than the index of the partition. + */ + private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag]( + prev: RDD[T], + f: (InputSplit, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false) + extends RDD[U](prev) { + + override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None + + override def getPartitions: Array[SparkPartition] = firstParent[T].partitions + + override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = { + val partition = split.asInstanceOf[SqlNewHadoopPartition] + val inputSplit = partition.serializableHadoopSplit.value + f(inputSplit, firstParent[T].iterator(split, context)) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala new file mode 100644 index 0000000000..2b40092617 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.{Logging, TaskContext} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD} +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.{SaveMode, Strategy, execution, sources, _} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.{SerializableConfiguration, Utils} + +/** + * A Strategy for planning scans over data sources defined using the sources API. + */ +private[sql] object DataSourceStrategy extends Strategy with Logging { + def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) => + pruneFilterProjectRaw( + l, + projects, + filters, + (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil + + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) => + pruneFilterProject( + l, + projects, + filters, + (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil + + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) => + pruneFilterProject( + l, + projects, + filters, + (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil + + // Scanning partitioned HadoopFsRelation + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) + if t.partitionSpec.partitionColumns.nonEmpty => + val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray + + logInfo { + val total = t.partitionSpec.partitions.length + val selected = selectedPartitions.length + val percentPruned = (1 - selected.toDouble / total.toDouble) * 100 + s"Selected $selected partitions out of $total, pruned $percentPruned% partitions." + } + + // Only pushes down predicates that do not reference partition columns. + val pushedFilters = { + val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet + filters.filter { f => + val referencedColumnNames = f.references.map(_.name).toSet + referencedColumnNames.intersect(partitionColumnNames).isEmpty + } + } + + buildPartitionedTableScan( + l, + projects, + pushedFilters, + t.partitionSpec.partitionColumns, + selectedPartitions) :: Nil + + // Scanning non-partitioned HadoopFsRelation + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) => + // See buildPartitionedTableScan for the reason that we need to create a shard + // broadcast HadoopConf. + val sharedHadoopConf = SparkHadoopUtil.get.conf + val confBroadcast = + t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) + pruneFilterProject( + l, + projects, + filters, + (a, f) => + toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil + + case l @ LogicalRelation(t: TableScan) => + execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil + + case i @ logical.InsertIntoTable( + l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty => + execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil + + case i @ logical.InsertIntoTable( + l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) => + val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append + execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil + + case _ => Nil + } + + private def buildPartitionedTableScan( + logicalRelation: LogicalRelation, + projections: Seq[NamedExpression], + filters: Seq[Expression], + partitionColumns: StructType, + partitions: Array[Partition]) = { + val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] + + // Because we are creating one RDD per partition, we need to have a shared HadoopConf. + // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high. + val sharedHadoopConf = SparkHadoopUtil.get.conf + val confBroadcast = + relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) + + // Builds RDD[Row]s for each selected partition. + val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => + // The table scan operator (PhysicalRDD) which retrieves required columns from data files. + // Notice that the schema of data files, represented by `relation.dataSchema`, may contain + // some partition column(s). + val scan = + pruneFilterProject( + logicalRelation, + projections, + filters, + (columns: Seq[Attribute], filters) => { + val partitionColNames = partitionColumns.fieldNames + + // Don't scan any partition columns to save I/O. Here we are being optimistic and + // assuming partition columns data stored in data files are always consistent with those + // partition values encoded in partition directory paths. + val needed = columns.filterNot(a => partitionColNames.contains(a.name)) + val dataRows = + relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast) + + // Merges data values with partition values. + mergeWithPartitionValues( + relation.schema, + columns.map(_.name).toArray, + partitionColNames, + partitionValues, + toCatalystRDD(logicalRelation, needed, dataRows)) + }) + + scan.execute() + } + + val unionedRows = + if (perPartitionRows.length == 0) { + relation.sqlContext.emptyResult + } else { + new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) + } + + execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows) + } + + private def mergeWithPartitionValues( + schema: StructType, + requiredColumns: Array[String], + partitionColumns: Array[String], + partitionValues: InternalRow, + dataRows: RDD[InternalRow]): RDD[InternalRow] = { + val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains) + + // If output columns contain any partition column(s), we need to merge scanned data + // columns and requested partition columns to form the final result. + if (!requiredColumns.sameElements(nonPartitionColumns)) { + val mergers = requiredColumns.zipWithIndex.map { case (name, index) => + // To see whether the `index`-th column is a partition column... + val i = partitionColumns.indexOf(name) + if (i != -1) { + // If yes, gets column value from partition values. + (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { + mutableRow(ordinal) = partitionValues(i) + } + } else { + // Otherwise, inherits the value from scanned data. + val i = nonPartitionColumns.indexOf(name) + (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { + mutableRow(ordinal) = dataRow(i) + } + } + } + + // Since we know for sure that this closure is serializable, we can avoid the overhead + // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally + // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718). + val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => { + val dataTypes = requiredColumns.map(schema(_).dataType) + val mutableRow = new SpecificMutableRow(dataTypes) + iterator.map { dataRow => + var i = 0 + while (i < mutableRow.length) { + mergers(i)(mutableRow, dataRow, i) + i += 1 + } + mutableRow.asInstanceOf[InternalRow] + } + } + + // This is an internal RDD whose call site the user should not be concerned with + // Since we create many of these (one per partition), the time spent on computing + // the call site may add up. + Utils.withDummyCallSite(dataRows.sparkContext) { + new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) + } + + } else { + dataRows + } + } + + protected def prunePartitions( + predicates: Seq[Expression], + partitionSpec: PartitionSpec): Seq[Partition] = { + val PartitionSpec(partitionColumns, partitions) = partitionSpec + val partitionColumnNames = partitionColumns.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + + if (partitionPruningPredicates.nonEmpty) { + val predicate = + partitionPruningPredicates + .reduceOption(expressions.And) + .getOrElse(Literal(true)) + + val boundPredicate = InterpretedPredicate.create(predicate.transform { + case a: AttributeReference => + val index = partitionColumns.indexWhere(a.name == _.name) + BoundReference(index, partitionColumns(index).dataType, nullable = true) + }) + + partitions.filter { case Partition(values, _) => boundPredicate(values) } + } else { + partitions + } + } + + // Based on Public API. + protected def pruneFilterProject( + relation: LogicalRelation, + projects: Seq[NamedExpression], + filterPredicates: Seq[Expression], + scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = { + pruneFilterProjectRaw( + relation, + projects, + filterPredicates, + (requestedColumns, pushedFilters) => { + scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray) + }) + } + + // Based on Catalyst expressions. + protected def pruneFilterProjectRaw( + relation: LogicalRelation, + projects: Seq[NamedExpression], + filterPredicates: Seq[Expression], + scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = { + + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) + val filterCondition = filterPredicates.reduceLeftOption(expressions.And) + + val pushedFilters = filterPredicates.map { _ transform { + case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes. + }} + + if (projects.map(_.toAttribute) == projects && + projectSet.size == projects.size && + filterSet.subsetOf(projectSet)) { + // When it is possible to just use column pruning to get the right projection and + // when the columns of this projection are enough to evaluate all filter conditions, + // just do a scan followed by a filter, with no extra project. + val requestedColumns = + projects.asInstanceOf[Seq[Attribute]] // Safe due to if above. + .map(relation.attributeMap) // Match original case of attributes. + + val scan = execution.PhysicalRDD(projects.map(_.toAttribute), + scanBuilder(requestedColumns, pushedFilters)) + filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) + } else { + val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq + + val scan = execution.PhysicalRDD(requestedColumns, + scanBuilder(requestedColumns, pushedFilters)) + execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) + } + } + + /** + * Convert RDD of Row into RDD of InternalRow with objects in catalyst types + */ + private[this] def toCatalystRDD( + relation: LogicalRelation, + output: Seq[Attribute], + rdd: RDD[Row]): RDD[InternalRow] = { + if (relation.relation.needConversion) { + execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType)) + } else { + rdd.map(_.asInstanceOf[InternalRow]) + } + } + + /** + * Convert RDD of Row into RDD of InternalRow with objects in catalyst types + */ + private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = { + toCatalystRDD(relation, relation.output, rdd) + } + + /** + * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s, + * and convert them. + */ + protected[sql] def selectFilters(filters: Seq[Expression]) = { + def translate(predicate: Expression): Option[Filter] = predicate match { + case expressions.EqualTo(a: Attribute, Literal(v, _)) => + Some(sources.EqualTo(a.name, v)) + case expressions.EqualTo(Literal(v, _), a: Attribute) => + Some(sources.EqualTo(a.name, v)) + + case expressions.GreaterThan(a: Attribute, Literal(v, _)) => + Some(sources.GreaterThan(a.name, v)) + case expressions.GreaterThan(Literal(v, _), a: Attribute) => + Some(sources.LessThan(a.name, v)) + + case expressions.LessThan(a: Attribute, Literal(v, _)) => + Some(sources.LessThan(a.name, v)) + case expressions.LessThan(Literal(v, _), a: Attribute) => + Some(sources.GreaterThan(a.name, v)) + + case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) => + Some(sources.GreaterThanOrEqual(a.name, v)) + case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) => + Some(sources.LessThanOrEqual(a.name, v)) + + case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) => + Some(sources.LessThanOrEqual(a.name, v)) + case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) => + Some(sources.GreaterThanOrEqual(a.name, v)) + + case expressions.InSet(a: Attribute, set) => + Some(sources.In(a.name, set.toArray)) + + case expressions.IsNull(a: Attribute) => + Some(sources.IsNull(a.name)) + case expressions.IsNotNull(a: Attribute) => + Some(sources.IsNotNull(a.name)) + + case expressions.And(left, right) => + (translate(left) ++ translate(right)).reduceOption(sources.And) + + case expressions.Or(left, right) => + for { + leftFilter <- translate(left) + rightFilter <- translate(right) + } yield sources.Or(leftFilter, rightFilter) + + case expressions.Not(child) => + translate(child).map(sources.Not) + + case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) => + Some(sources.StringStartsWith(a.name, v.toString)) + + case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) => + Some(sources.StringEndsWith(a.name, v.toString)) + + case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) => + Some(sources.StringContains(a.name, v.toString)) + + case _ => None + } + + filters.flatMap(translate) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala new file mode 100644 index 0000000000..a7123dc845 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.sources.BaseRelation + +/** + * Used to link a [[BaseRelation]] in to a logical query plan. + */ +private[sql] case class LogicalRelation(relation: BaseRelation) + extends LeafNode + with MultiInstanceRelation { + + override val output: Seq[AttributeReference] = relation.schema.toAttributes + + // Logical Relations are distinct if they have different output for the sake of transformations. + override def equals(other: Any): Boolean = other match { + case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output + case _ => false + } + + override def hashCode: Int = { + com.google.common.base.Objects.hashCode(relation, output) + } + + override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { + case LogicalRelation(otherRelation) => relation == otherRelation + case _ => false + } + + @transient override lazy val statistics: Statistics = Statistics( + sizeInBytes = BigInt(relation.sizeInBytes) + ) + + /** Used to lookup original attribute capitalization */ + val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o))) + + def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type] + + override def simpleString: String = s"Relation[${output.mkString(",")}] $relation" +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala new file mode 100644 index 0000000000..6b4a359db2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.lang.{Double => JDouble, Long => JLong} +import java.math.{BigDecimal => JBigDecimal} + +import scala.collection.mutable.ArrayBuffer +import scala.util.Try + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.util.Shell +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} +import org.apache.spark.sql.types._ + + +private[sql] case class Partition(values: InternalRow, path: String) + +private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) + +private[sql] object PartitionSpec { + val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition]) +} + +private[sql] object PartitioningUtils { + // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't + // depend on Hive. + private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + + private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { + require(columnNames.size == literals.size) + } + + /** + * Given a group of qualified paths, tries to parse them and returns a partition specification. + * For example, given: + * {{{ + * hdfs://:/path/to/partition/a=1/b=hello/c=3.14 + * hdfs://:/path/to/partition/a=2/b=world/c=6.28 + * }}} + * it returns: + * {{{ + * PartitionSpec( + * partitionColumns = StructType( + * StructField(name = "a", dataType = IntegerType, nullable = true), + * StructField(name = "b", dataType = StringType, nullable = true), + * StructField(name = "c", dataType = DoubleType, nullable = true)), + * partitions = Seq( + * Partition( + * values = Row(1, "hello", 3.14), + * path = "hdfs://:/path/to/partition/a=1/b=hello/c=3.14"), + * Partition( + * values = Row(2, "world", 6.28), + * path = "hdfs://:/path/to/partition/a=2/b=world/c=6.28"))) + * }}} + */ + private[sql] def parsePartitions( + paths: Seq[Path], + defaultPartitionName: String, + typeInference: Boolean): PartitionSpec = { + // First, we need to parse every partition's path and see if we can find partition values. + val pathsWithPartitionValues = paths.flatMap { path => + parsePartition(path, defaultPartitionName, typeInference).map(path -> _) + } + + if (pathsWithPartitionValues.isEmpty) { + // This dataset is not partitioned. + PartitionSpec.emptySpec + } else { + // This dataset is partitioned. We need to check whether all partitions have the same + // partition columns and resolve potential type conflicts. + val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) + + // Creates the StructType which represents the partition columns. + val fields = { + val PartitionValues(columnNames, literals) = resolvedPartitionValues.head + columnNames.zip(literals).map { case (name, Literal(_, dataType)) => + // We always assume partition columns are nullable since we've no idea whether null values + // will be appended in the future. + StructField(name, dataType, nullable = true) + } + } + + // Finally, we create `Partition`s based on paths and resolved partition values. + val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { + case (PartitionValues(_, literals), (path, _)) => + Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString) + } + + PartitionSpec(StructType(fields), partitions) + } + } + + /** + * Parses a single partition, returns column names and values of each partition column. For + * example, given: + * {{{ + * path = hdfs://:/path/to/partition/a=42/b=hello/c=3.14 + * }}} + * it returns: + * {{{ + * PartitionValues( + * Seq("a", "b", "c"), + * Seq( + * Literal.create(42, IntegerType), + * Literal.create("hello", StringType), + * Literal.create(3.14, FloatType))) + * }}} + */ + private[sql] def parsePartition( + path: Path, + defaultPartitionName: String, + typeInference: Boolean): Option[PartitionValues] = { + val columns = ArrayBuffer.empty[(String, Literal)] + // Old Hadoop versions don't have `Path.isRoot` + var finished = path.getParent == null + var chopped = path + + while (!finished) { + // Sometimes (e.g., when speculative task is enabled), temporary directories may be left + // uncleaned. Here we simply ignore them. + if (chopped.getName.toLowerCase == "_temporary") { + return None + } + + val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) + maybeColumn.foreach(columns += _) + chopped = chopped.getParent + finished = maybeColumn.isEmpty || chopped.getParent == null + } + + if (columns.isEmpty) { + None + } else { + val (columnNames, values) = columns.reverse.unzip + Some(PartitionValues(columnNames, values)) + } + } + + private def parsePartitionColumn( + columnSpec: String, + defaultPartitionName: String, + typeInference: Boolean): Option[(String, Literal)] = { + val equalSignIndex = columnSpec.indexOf('=') + if (equalSignIndex == -1) { + None + } else { + val columnName = columnSpec.take(equalSignIndex) + assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'") + + val rawColumnValue = columnSpec.drop(equalSignIndex + 1) + assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") + + val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference) + Some(columnName -> literal) + } + } + + /** + * Resolves possible type conflicts between partitions by up-casting "lower" types. The up- + * casting order is: + * {{{ + * NullType -> + * IntegerType -> LongType -> + * DoubleType -> DecimalType.Unlimited -> + * StringType + * }}} + */ + private[sql] def resolvePartitions( + pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = { + if (pathsWithPartitionValues.isEmpty) { + Seq.empty + } else { + val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct + assert( + distinctPartColNames.size == 1, + listConflictingPartitionColumns(pathsWithPartitionValues)) + + // Resolves possible type conflicts for each column + val values = pathsWithPartitionValues.map(_._2) + val columnCount = values.head.columnNames.size + val resolvedValues = (0 until columnCount).map { i => + resolveTypeConflicts(values.map(_.literals(i))) + } + + // Fills resolved literals back to each partition + values.zipWithIndex.map { case (d, index) => + d.copy(literals = resolvedValues.map(_(index))) + } + } + } + + private[sql] def listConflictingPartitionColumns( + pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = { + val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct + + def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = + seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) + + val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { + case (path, partValues) => partValues.columnNames -> path + }) + + val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { + case (names, index) => + s"Partition column name list #$index: $names" + } + + // Lists out those non-leaf partition directories that also contain files + val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths) + + s"Conflicting partition column names detected:\n" + + distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + + "For partitioned table directories, data files should only live in leaf directories.\n" + + "And directories at the same level should have the same partition column name.\n" + + "Please check the following directories for unexpected files or " + + "inconsistent partition column names:\n" + + suspiciousPaths.map("\t" + _).mkString("\n", "\n", "") + } + + /** + * Converts a string to a [[Literal]] with automatic type inference. Currently only supports + * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and + * [[StringType]]. + */ + private[sql] def inferPartitionColumnValue( + raw: String, + defaultPartitionName: String, + typeInference: Boolean): Literal = { + if (typeInference) { + // First tries integral types + Try(Literal.create(Integer.parseInt(raw), IntegerType)) + .orElse(Try(Literal.create(JLong.parseLong(raw), LongType))) + // Then falls back to fractional types + .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType))) + .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited))) + // Then falls back to string + .getOrElse { + if (raw == defaultPartitionName) { + Literal.create(null, NullType) + } else { + Literal.create(unescapePathName(raw), StringType) + } + } + } else { + if (raw == defaultPartitionName) { + Literal.create(null, NullType) + } else { + Literal.create(unescapePathName(raw), StringType) + } + } + } + + private val upCastingOrder: Seq[DataType] = + Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType) + + /** + * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" + * types. + */ + private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = { + val desiredType = { + val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_)) + // Falls back to string if all values of this column are null or empty string + if (topType == NullType) StringType else topType + } + + literals.map { case l @ Literal(_, dataType) => + Literal.create(Cast(l, desiredType).eval(), desiredType) + } + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils). + ////////////////////////////////////////////////////////////////////////////////////////////////// + + val charToEscape = { + val bitSet = new java.util.BitSet(128) + + /** + * ASCII 01-1F are HTTP control characters that need to be escaped. + * \u000A and \u000D are \n and \r, respectively. + */ + val clist = Array( + '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', + '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013', + '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C', + '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', + '{', '[', ']', '^') + + clist.foreach(bitSet.set(_)) + + if (Shell.WINDOWS) { + Array(' ', '<', '>', '|').foreach(bitSet.set(_)) + } + + bitSet + } + + def needsEscaping(c: Char): Boolean = { + c >= 0 && c < charToEscape.size() && charToEscape.get(c) + } + + def escapePathName(path: String): String = { + val builder = new StringBuilder() + path.foreach { c => + if (needsEscaping(c)) { + builder.append('%') + builder.append(f"${c.asInstanceOf[Int]}%02x") + } else { + builder.append(c) + } + } + + builder.toString() + } + + def unescapePathName(path: String): String = { + val sb = new StringBuilder + var i = 0 + + while (i < path.length) { + val c = path.charAt(i) + if (c == '%' && i + 2 < path.length) { + val code: Int = try { + Integer.valueOf(path.substring(i + 1, i + 3), 16) + } catch { case e: Exception => + -1: Integer + } + if (code >= 0) { + sb.append(code.asInstanceOf[Char]) + i += 3 + } else { + sb.append(c) + i += 1 + } + } else { + sb.append(c) + i += 1 + } + } + + sb.toString() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala new file mode 100644 index 0000000000..84a0441e14 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.{Date, UUID} + +import scala.collection.JavaConversions.asScalaIterator + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} +import org.apache.spark._ +import org.apache.spark.mapred.SparkHadoopMapRedUtil +import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StringType +import org.apache.spark.util.SerializableConfiguration + + +private[sql] case class InsertIntoDataSource( + logicalRelation: LogicalRelation, + query: LogicalPlan, + overwrite: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] + val data = DataFrame(sqlContext, query) + // Apply the schema of the existing table to the new data. + val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema) + relation.insert(df, overwrite) + + // Invalidate the cache. + sqlContext.cacheManager.invalidateCache(logicalRelation) + + Seq.empty[Row] + } +} + +/** + * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. + * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a + * single write job, and owns a UUID that identifies this job. Each concrete implementation of + * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for + * each task output file. This UUID is passed to executor side via a property named + * `spark.sql.sources.writeJobUUID`. + * + * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]] + * are used to write to normal tables and tables with dynamic partitions. + * + * Basic work flow of this command is: + * + * 1. Driver side setup, including output committer initialization and data source specific + * preparation work for the write job to be issued. + * 2. Issues a write job consists of one or more executor side tasks, each of which writes all + * rows within an RDD partition. + * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any + * exception is thrown during task commitment, also aborts that task. + * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is + * thrown during job commitment, also aborts the job. + */ +private[sql] case class InsertIntoHadoopFsRelation( + @transient relation: HadoopFsRelation, + @transient query: LogicalPlan, + mode: SaveMode) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + require( + relation.paths.length == 1, + s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") + + val hadoopConf = sqlContext.sparkContext.hadoopConfiguration + val outputPath = new Path(relation.paths.head) + val fs = outputPath.getFileSystem(hadoopConf) + val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + + val pathExists = fs.exists(qualifiedOutputPath) + val doInsertion = (mode, pathExists) match { + case (SaveMode.ErrorIfExists, true) => + sys.error(s"path $qualifiedOutputPath already exists.") + case (SaveMode.Overwrite, true) => + fs.delete(qualifiedOutputPath, true) + true + case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => + true + case (SaveMode.Ignore, exists) => + !exists + } + // If we are appending data to an existing dir. + val isAppend = pathExists && (mode == SaveMode.Append) + + if (doInsertion) { + val job = new Job(hadoopConf) + job.setOutputKeyClass(classOf[Void]) + job.setOutputValueClass(classOf[InternalRow]) + FileOutputFormat.setOutputPath(job, qualifiedOutputPath) + + // We create a DataFrame by applying the schema of relation to the data to make sure. + // We are writing data based on the expected schema, + val df = { + // For partitioned relation r, r.schema's column ordering can be different from the column + // ordering of data.logicalPlan (partition columns are all moved after data column). We + // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can + // safely apply the schema of r.schema to the data. + val project = Project( + relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query) + + sqlContext.internalCreateDataFrame( + DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema) + } + + val partitionColumns = relation.partitionColumns.fieldNames + if (partitionColumns.isEmpty) { + insert(new DefaultWriterContainer(relation, job, isAppend), df) + } else { + val writerContainer = new DynamicPartitionWriterContainer( + relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend) + insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) + } + } + + Seq.empty[Row] + } + + /** + * Inserts the content of the [[DataFrame]] into a table without any partitioning columns. + */ + private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = { + // Uses local vals for serialization + val needsConversion = relation.needConversion + val dataSchema = relation.dataSchema + + // This call shouldn't be put into the `try` block below because it only initializes and + // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. + writerContainer.driverSideSetup() + + try { + df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _) + writerContainer.commitJob() + relation.refresh() + } catch { case cause: Throwable => + logError("Aborting job.", cause) + writerContainer.abortJob() + throw new SparkException("Job aborted.", cause) + } + + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // If anything below fails, we should abort the task. + try { + writerContainer.executorSideSetup(taskContext) + + val converter: InternalRow => Row = if (needsConversion) { + CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] + } else { + r: InternalRow => r.asInstanceOf[Row] + } + while (iterator.hasNext) { + val internalRow = iterator.next() + writerContainer.outputWriterForRow(internalRow).write(converter(internalRow)) + } + + writerContainer.commitTask() + } catch { case cause: Throwable => + logError("Aborting task.", cause) + writerContainer.abortTask() + throw new SparkException("Task failed while writing rows.", cause) + } + } + } + + /** + * Inserts the content of the [[DataFrame]] into a table with partitioning columns. + */ + private def insertWithDynamicPartitions( + sqlContext: SQLContext, + writerContainer: BaseWriterContainer, + df: DataFrame, + partitionColumns: Array[String]): Unit = { + // Uses a local val for serialization + val needsConversion = relation.needConversion + val dataSchema = relation.dataSchema + + require( + df.schema == relation.schema, + s"""DataFrame must have the same schema as the relation to which is inserted. + |DataFrame schema: ${df.schema} + |Relation schema: ${relation.schema} + """.stripMargin) + + val partitionColumnsInSpec = relation.partitionColumns.fieldNames + require( + partitionColumnsInSpec.sameElements(partitionColumns), + s"""Partition columns mismatch. + |Expected: ${partitionColumnsInSpec.mkString(", ")} + |Actual: ${partitionColumns.mkString(", ")} + """.stripMargin) + + val output = df.queryExecution.executedPlan.output + val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name)) + val codegenEnabled = df.sqlContext.conf.codegenEnabled + + // This call shouldn't be put into the `try` block below because it only initializes and + // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. + writerContainer.driverSideSetup() + + try { + df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _) + writerContainer.commitJob() + relation.refresh() + } catch { case cause: Throwable => + logError("Aborting job.", cause) + writerContainer.abortJob() + throw new SparkException("Job aborted.", cause) + } + + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { + // If anything below fails, we should abort the task. + try { + writerContainer.executorSideSetup(taskContext) + + // Projects all partition columns and casts them to strings to build partition directories. + val partitionCasts = partitionOutput.map(Cast(_, StringType)) + val partitionProj = newProjection(codegenEnabled, partitionCasts, output) + val dataProj = newProjection(codegenEnabled, dataOutput, output) + + val dataConverter: InternalRow => Row = if (needsConversion) { + CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] + } else { + r: InternalRow => r.asInstanceOf[Row] + } + + while (iterator.hasNext) { + val internalRow = iterator.next() + val partitionPart = partitionProj(internalRow) + val dataPart = dataConverter(dataProj(internalRow)) + writerContainer.outputWriterForRow(partitionPart).write(dataPart) + } + + writerContainer.commitTask() + } catch { case cause: Throwable => + logError("Aborting task.", cause) + writerContainer.abortTask() + throw new SparkException("Task failed while writing rows.", cause) + } + } + } + + // This is copied from SparkPlan, probably should move this to a more general place. + private def newProjection( + codegenEnabled: Boolean, + expressions: Seq[Expression], + inputSchema: Seq[Attribute]): Projection = { + log.debug( + s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") + if (codegenEnabled) { + + try { + GenerateProjection.generate(expressions, inputSchema) + } catch { + case e: Exception => + if (sys.props.contains("spark.testing")) { + throw e + } else { + log.error("failed to generate projection, fallback to interpreted", e) + new InterpretedProjection(expressions, inputSchema) + } + } + } else { + new InterpretedProjection(expressions, inputSchema) + } + } +} + +private[sql] abstract class BaseWriterContainer( + @transient val relation: HadoopFsRelation, + @transient job: Job, + isAppend: Boolean) + extends SparkHadoopMapReduceUtil + with Logging + with Serializable { + + protected val serializableConf = new SerializableConfiguration(job.getConfiguration) + + // This UUID is used to avoid output file name collision between different appending write jobs. + // These jobs may belong to different SparkContext instances. Concrete data source implementations + // may use this UUID to generate unique file names (e.g., `part-r--.parquet`). + // The reason why this ID is used to identify a job rather than a single task output file is + // that, speculative tasks must generate the same output file name as the original task. + private val uniqueWriteJobId = UUID.randomUUID() + + // This is only used on driver side. + @transient private val jobContext: JobContext = job + + // The following fields are initialized and used on both driver and executor side. + @transient protected var outputCommitter: OutputCommitter = _ + @transient private var jobId: JobID = _ + @transient private var taskId: TaskID = _ + @transient private var taskAttemptId: TaskAttemptID = _ + @transient protected var taskAttemptContext: TaskAttemptContext = _ + + protected val outputPath: String = { + assert( + relation.paths.length == 1, + s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") + relation.paths.head + } + + protected val dataSchema = relation.dataSchema + + protected var outputWriterFactory: OutputWriterFactory = _ + + private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ + + def driverSideSetup(): Unit = { + setupIDs(0, 0, 0) + setupConf() + + // This UUID is sent to executor side together with the serialized `Configuration` object within + // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate + // unique task output files. + job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) + + // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor + // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, + // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. + // + // Also, the `prepareJobForWrite` call must happen before initializing output format and output + // committer, since their initialization involve the job configuration, which can be potentially + // decorated in `prepareJobForWrite`. + outputWriterFactory = relation.prepareJobForWrite(job) + taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) + + outputFormatClass = job.getOutputFormatClass + outputCommitter = newOutputCommitter(taskAttemptContext) + outputCommitter.setupJob(jobContext) + } + + def executorSideSetup(taskContext: TaskContext): Unit = { + setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) + setupConf() + taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) + outputCommitter = newOutputCommitter(taskAttemptContext) + outputCommitter.setupTask(taskAttemptContext) + initWriters() + } + + protected def getWorkPath: String = { + outputCommitter match { + // FileOutputCommitter writes to a temporary location returned by `getWorkPath`. + case f: MapReduceFileOutputCommitter => f.getWorkPath.toString + case _ => outputPath + } + } + + private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { + val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) + + if (isAppend) { + // If we are appending data to an existing dir, we will only use the output committer + // associated with the file output format since it is not safe to use a custom + // committer for appending. For example, in S3, direct parquet output committer may + // leave partial data in the destination dir when the the appending job fails. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + + "for appending.") + defaultOutputCommitter + } else { + val committerClass = context.getConfiguration.getClass( + SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) + + Option(committerClass).map { clazz => + logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") + + // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat + // has an associated output committer. To override this output committer, + // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. + // If a data source needs to override the output committer, it needs to set the + // output committer in prepareForWrite method. + if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { + // The specified output committer is a FileOutputCommitter. + // So, we will use the FileOutputCommitter-specified constructor. + val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) + ctor.newInstance(new Path(outputPath), context) + } else { + // The specified output committer is just a OutputCommitter. + // So, we will use the no-argument constructor. + val ctor = clazz.getDeclaredConstructor() + ctor.newInstance() + } + }.getOrElse { + // If output committer class is not set, we will use the one associated with the + // file output format. + logInfo( + s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") + defaultOutputCommitter + } + } + } + + private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = { + this.jobId = SparkHadoopWriter.createJobID(new Date, jobId) + this.taskId = new TaskID(this.jobId, true, splitId) + this.taskAttemptId = new TaskAttemptID(taskId, attemptId) + } + + private def setupConf(): Unit = { + serializableConf.value.set("mapred.job.id", jobId.toString) + serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString) + serializableConf.value.set("mapred.task.id", taskAttemptId.toString) + serializableConf.value.setBoolean("mapred.task.is.map", true) + serializableConf.value.setInt("mapred.task.partition", 0) + } + + // Called on executor side when writing rows + def outputWriterForRow(row: InternalRow): OutputWriter + + protected def initWriters(): Unit + + def commitTask(): Unit = { + SparkHadoopMapRedUtil.commitTask( + outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId) + } + + def abortTask(): Unit = { + if (outputCommitter != null) { + outputCommitter.abortTask(taskAttemptContext) + } + logError(s"Task attempt $taskAttemptId aborted.") + } + + def commitJob(): Unit = { + outputCommitter.commitJob(jobContext) + logInfo(s"Job $jobId committed.") + } + + def abortJob(): Unit = { + if (outputCommitter != null) { + outputCommitter.abortJob(jobContext, JobStatus.State.FAILED) + } + logError(s"Job $jobId aborted.") + } +} + +private[sql] class DefaultWriterContainer( + @transient relation: HadoopFsRelation, + @transient job: Job, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { + + @transient private var writer: OutputWriter = _ + + override protected def initWriters(): Unit = { + taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath) + writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) + } + + override def outputWriterForRow(row: InternalRow): OutputWriter = writer + + override def commitTask(): Unit = { + try { + assert(writer != null, "OutputWriter instance should have been initialized") + writer.close() + super.commitTask() + } catch { case cause: Throwable => + // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will + // cause `abortTask()` to be invoked. + throw new RuntimeException("Failed to commit task", cause) + } + } + + override def abortTask(): Unit = { + try { + // It's possible that the task fails before `writer` gets initialized + if (writer != null) { + writer.close() + } + } finally { + super.abortTask() + } + } +} + +private[sql] class DynamicPartitionWriterContainer( + @transient relation: HadoopFsRelation, + @transient job: Job, + partitionColumns: Array[String], + defaultPartitionName: String, + isAppend: Boolean) + extends BaseWriterContainer(relation, job, isAppend) { + + // All output writers are created on executor side. + @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _ + + override protected def initWriters(): Unit = { + outputWriters = new java.util.HashMap[String, OutputWriter] + } + + // The `row` argument is supposed to only contain partition column values which have been casted + // to strings. + override def outputWriterForRow(row: InternalRow): OutputWriter = { + val partitionPath = { + val partitionPathBuilder = new StringBuilder + var i = 0 + + while (i < partitionColumns.length) { + val col = partitionColumns(i) + val partitionValueString = { + val string = row.getString(i) + if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string) + } + + if (i > 0) { + partitionPathBuilder.append(Path.SEPARATOR_CHAR) + } + + partitionPathBuilder.append(s"$col=$partitionValueString") + i += 1 + } + + partitionPathBuilder.toString() + } + + val writer = outputWriters.get(partitionPath) + if (writer.eq(null)) { + val path = new Path(getWorkPath, partitionPath) + taskAttemptContext.getConfiguration.set( + "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) + val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) + outputWriters.put(partitionPath, newWriter) + newWriter + } else { + writer + } + } + + private def clearOutputWriters(): Unit = { + if (!outputWriters.isEmpty) { + asScalaIterator(outputWriters.values().iterator()).foreach(_.close()) + outputWriters.clear() + } + } + + override def commitTask(): Unit = { + try { + clearOutputWriters() + super.commitTask() + } catch { case cause: Throwable => + throw new RuntimeException("Failed to commit task", cause) + } + } + + override def abortTask(): Unit = { + try { + clearOutputWriters() + } finally { + super.abortTask() + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala new file mode 100644 index 0000000000..c8033d3c04 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import scala.language.{existentials, implicitConversions} +import scala.util.matching.Regex + +import org.apache.hadoop.fs.Path +import org.apache.spark.Logging +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode} +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, InternalRow} +import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * A parser for foreign DDL commands. + */ +private[sql] class DDLParser( + parseQuery: String => LogicalPlan) + extends AbstractSparkSQLParser with DataTypeParser with Logging { + + def parse(input: String, exceptionOnError: Boolean): LogicalPlan = { + try { + parse(input) + } catch { + case ddlException: DDLException => throw ddlException + case _ if !exceptionOnError => parseQuery(input) + case x: Throwable => throw x + } + } + + // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword` + // properties via reflection the class in runtime for constructing the SqlLexical object + protected val CREATE = Keyword("CREATE") + protected val TEMPORARY = Keyword("TEMPORARY") + protected val TABLE = Keyword("TABLE") + protected val IF = Keyword("IF") + protected val NOT = Keyword("NOT") + protected val EXISTS = Keyword("EXISTS") + protected val USING = Keyword("USING") + protected val OPTIONS = Keyword("OPTIONS") + protected val DESCRIBE = Keyword("DESCRIBE") + protected val EXTENDED = Keyword("EXTENDED") + protected val AS = Keyword("AS") + protected val COMMENT = Keyword("COMMENT") + protected val REFRESH = Keyword("REFRESH") + + protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable + + protected def start: Parser[LogicalPlan] = ddl + + /** + * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] + * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * or + * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS] + * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * or + * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] + * USING org.apache.spark.sql.avro + * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` + * AS SELECT ... + */ + protected lazy val createTable: Parser[LogicalPlan] = + // TODO: Support database.table. + (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ + tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { + case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => + if (temp.isDefined && allowExisting.isDefined) { + throw new DDLException( + "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.") + } + + val options = opts.getOrElse(Map.empty[String, String]) + if (query.isDefined) { + if (columns.isDefined) { + throw new DDLException( + "a CREATE TABLE AS SELECT statement does not allow column definitions.") + } + // When IF NOT EXISTS clause appears in the query, the save mode will be ignore. + val mode = if (allowExisting.isDefined) { + SaveMode.Ignore + } else if (temp.isDefined) { + SaveMode.Overwrite + } else { + SaveMode.ErrorIfExists + } + + val queryPlan = parseQuery(query.get) + CreateTableUsingAsSelect(tableName, + provider, + temp.isDefined, + Array.empty[String], + mode, + options, + queryPlan) + } else { + val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) + CreateTableUsing( + tableName, + userSpecifiedSchema, + provider, + temp.isDefined, + options, + allowExisting.isDefined, + managedIfNoPath = false) + } + } + + protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" + + /* + * describe [extended] table avroTable + * This will display all columns of table `avroTable` includes column_name,column_type,comment + */ + protected lazy val describeTable: Parser[LogicalPlan] = + (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ { + case e ~ db ~ tbl => + val tblIdentifier = db match { + case Some(dbName) => + Seq(dbName, tbl) + case None => + Seq(tbl) + } + DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined) + } + + protected lazy val refreshTable: Parser[LogicalPlan] = + REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ { + case maybeDatabaseName ~ tableName => + RefreshTable(maybeDatabaseName.getOrElse("default"), tableName) + } + + protected lazy val options: Parser[Map[String, String]] = + "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } + + protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")} + + override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch( + s"identifier matching regex $regex", { + case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str + case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str + } + ) + + protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ { + case name => name + } + + protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ { + case parts => parts.mkString(".") + } + + protected lazy val pair: Parser[(String, String)] = + optionName ~ stringLit ^^ { case k ~ v => (k, v) } + + protected lazy val column: Parser[StructField] = + ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm => + val meta = cm match { + case Some(comment) => + new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build() + case None => Metadata.empty + } + + StructField(columnName, typ, nullable = true, meta) + } +} + +private[sql] object ResolvedDataSource { + + private val builtinSources = Map( + "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource", + "json" -> "org.apache.spark.sql.json.DefaultSource", + "parquet" -> "org.apache.spark.sql.parquet.DefaultSource", + "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource" + ) + + /** Given a provider name, look up the data source class definition. */ + def lookupDataSource(provider: String): Class[_] = { + val loader = Utils.getContextOrSparkClassLoader + + if (builtinSources.contains(provider)) { + return loader.loadClass(builtinSources(provider)) + } + + try { + loader.loadClass(provider) + } catch { + case cnf: java.lang.ClassNotFoundException => + try { + loader.loadClass(provider + ".DefaultSource") + } catch { + case cnf: java.lang.ClassNotFoundException => + if (provider.startsWith("org.apache.spark.sql.hive.orc")) { + sys.error("The ORC data source must be used with Hive support enabled.") + } else { + sys.error(s"Failed to load class for data source: $provider") + } + } + } + } + + /** Create a [[ResolvedDataSource]] for reading data in. */ + def apply( + sqlContext: SQLContext, + userSpecifiedSchema: Option[StructType], + partitionColumns: Array[String], + provider: String, + options: Map[String, String]): ResolvedDataSource = { + val clazz: Class[_] = lookupDataSource(provider) + def className: String = clazz.getCanonicalName + val relation = userSpecifiedSchema match { + case Some(schema: StructType) => clazz.newInstance() match { + case dataSource: SchemaRelationProvider => + dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema) + case dataSource: HadoopFsRelationProvider => + val maybePartitionsSchema = if (partitionColumns.isEmpty) { + None + } else { + Some(partitionColumnsSchema(schema, partitionColumns)) + } + + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val paths = { + val patternPath = new Path(caseInsensitiveOptions("path")) + val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray + } + + val dataSchema = + StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable + + dataSource.createRelation( + sqlContext, + paths, + Some(dataSchema), + maybePartitionsSchema, + caseInsensitiveOptions) + case dataSource: org.apache.spark.sql.sources.RelationProvider => + throw new AnalysisException(s"$className does not allow user-specified schemas.") + case _ => + throw new AnalysisException(s"$className is not a RelationProvider.") + } + + case None => clazz.newInstance() match { + case dataSource: RelationProvider => + dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) + case dataSource: HadoopFsRelationProvider => + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val paths = { + val patternPath = new Path(caseInsensitiveOptions("path")) + val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray + } + dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions) + case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => + throw new AnalysisException( + s"A schema needs to be specified when using $className.") + case _ => + throw new AnalysisException( + s"$className is neither a RelationProvider nor a FSBasedRelationProvider.") + } + } + new ResolvedDataSource(clazz, relation) + } + + private def partitionColumnsSchema( + schema: StructType, + partitionColumns: Array[String]): StructType = { + StructType(partitionColumns.map { col => + schema.find(_.name == col).getOrElse { + throw new RuntimeException(s"Partition column $col not found in schema $schema") + } + }).asNullable + } + + /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */ + def apply( + sqlContext: SQLContext, + provider: String, + partitionColumns: Array[String], + mode: SaveMode, + options: Map[String, String], + data: DataFrame): ResolvedDataSource = { + if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) { + throw new AnalysisException("Cannot save interval data type into external storage.") + } + val clazz: Class[_] = lookupDataSource(provider) + val relation = clazz.newInstance() match { + case dataSource: CreatableRelationProvider => + dataSource.createRelation(sqlContext, mode, options, data) + case dataSource: HadoopFsRelationProvider => + // Don't glob path for the write path. The contracts here are: + // 1. Only one output path can be specified on the write path; + // 2. Output path must be a legal HDFS style file system path; + // 3. It's OK that the output path doesn't exist yet; + val caseInsensitiveOptions = new CaseInsensitiveMap(options) + val outputPath = { + val path = new Path(caseInsensitiveOptions("path")) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + path.makeQualified(fs.getUri, fs.getWorkingDirectory) + } + val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name))) + val r = dataSource.createRelation( + sqlContext, + Array(outputPath.toString), + Some(dataSchema.asNullable), + Some(partitionColumnsSchema(data.schema, partitionColumns)), + caseInsensitiveOptions) + + // For partitioned relation r, r.schema's column ordering can be different from the column + // ordering of data.logicalPlan (partition columns are all moved after data column). This + // will be adjusted within InsertIntoHadoopFsRelation. + sqlContext.executePlan( + InsertIntoHadoopFsRelation( + r, + data.logicalPlan, + mode)).toRdd + r + case _ => + sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") + } + new ResolvedDataSource(clazz, relation) + } +} + +private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) + +/** + * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command. + * @param table The table to be described. + * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. + * It is effective only when the table is a Hive table. + */ +private[sql] case class DescribeCommand( + table: LogicalPlan, + isExtended: Boolean) extends LogicalPlan with Command { + + override def children: Seq[LogicalPlan] = Seq.empty + override val output: Seq[Attribute] = Seq( + // Column names are based on Hive. + AttributeReference("col_name", StringType, nullable = false, + new MetadataBuilder().putString("comment", "name of the column").build())(), + AttributeReference("data_type", StringType, nullable = false, + new MetadataBuilder().putString("comment", "data type of the column").build())(), + AttributeReference("comment", StringType, nullable = false, + new MetadataBuilder().putString("comment", "comment of the column").build())()) +} + +/** + * Used to represent the operation of create table using a data source. + * @param allowExisting If it is true, we will do nothing when the table already exists. + * If it is false, an exception will be thrown + */ +private[sql] case class CreateTableUsing( + tableName: String, + userSpecifiedSchema: Option[StructType], + provider: String, + temporary: Boolean, + options: Map[String, String], + allowExisting: Boolean, + managedIfNoPath: Boolean) extends LogicalPlan with Command { + + override def output: Seq[Attribute] = Seq.empty + override def children: Seq[LogicalPlan] = Seq.empty +} + +/** + * A node used to support CTAS statements and saveAsTable for the data source API. + * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer + * can analyze the logical plan that will be used to populate the table. + * So, [[PreWriteCheck]] can detect cases that are not allowed. + */ +private[sql] case class CreateTableUsingAsSelect( + tableName: String, + provider: String, + temporary: Boolean, + partitionColumns: Array[String], + mode: SaveMode, + options: Map[String, String], + child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = Seq.empty[Attribute] + // TODO: Override resolved after we support databaseName. + // override lazy val resolved = databaseName != None && childrenResolved +} + +private[sql] case class CreateTempTableUsing( + tableName: String, + userSpecifiedSchema: Option[StructType], + provider: String, + options: Map[String, String]) extends RunnableCommand { + + def run(sqlContext: SQLContext): Seq[InternalRow] = { + val resolved = ResolvedDataSource( + sqlContext, userSpecifiedSchema, Array.empty[String], provider, options) + sqlContext.registerDataFrameAsTable( + DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + Seq.empty + } +} + +private[sql] case class CreateTempTableUsingAsSelect( + tableName: String, + provider: String, + partitionColumns: Array[String], + mode: SaveMode, + options: Map[String, String], + query: LogicalPlan) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[InternalRow] = { + val df = DataFrame(sqlContext, query) + val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df) + sqlContext.registerDataFrameAsTable( + DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) + + Seq.empty + } +} + +private[sql] case class RefreshTable(databaseName: String, tableName: String) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[InternalRow] = { + // Refresh the given table's metadata first. + sqlContext.catalog.refreshTable(databaseName, tableName) + + // If this table is cached as a InMemoryColumnarRelation, drop the original + // cached version and make the new version cached lazily. + val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName)) + // Use lookupCachedData directly since RefreshTable also takes databaseName. + val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty + if (isCached) { + // Create a data frame to represent the table. + // TODO: Use uncacheTable once it supports database name. + val df = DataFrame(sqlContext, logicalPlan) + // Uncache the logicalPlan. + sqlContext.cacheManager.tryUncacheQuery(df, blocking = true) + // Cache it again. + sqlContext.cacheManager.cacheQuery(df, Some(tableName)) + } + + Seq.empty[InternalRow] + } +} + +/** + * Builds a map in which keys are case insensitive + */ +protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String] + with Serializable { + + val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase)) + + override def get(k: String): Option[String] = baseMap.get(k.toLowerCase) + + override def + [B1 >: String](kv: (String, B1)): Map[String, B1] = + baseMap + kv.copy(_1 = kv._1.toLowerCase) + + override def iterator: Iterator[(String, String)] = baseMap.iterator + + override def -(key: String): Map[String, String] = baseMap - key.toLowerCase +} + +/** + * The exception thrown from the DDL parser. + */ +protected[sql] class DDLException(message: String) extends Exception(message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala new file mode 100644 index 0000000000..11bb49b8d8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation} + +/** + * A rule to do pre-insert data type casting and field renaming. Before we insert into + * an [[InsertableRelation]], we will use this rule to make sure that + * the columns to be inserted have the correct data type and fields have the correct names. + */ +private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Wait until children are resolved. + case p: LogicalPlan if !p.childrenResolved => p + + // We are inserting into an InsertableRelation or HadoopFsRelation. + case i @ InsertIntoTable( + l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => { + // First, make sure the data to be inserted have the same number of fields with the + // schema of the relation. + if (l.output.size != child.output.size) { + sys.error( + s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " + + s"statement generates the same number of columns as its schema.") + } + castAndRenameChildOutput(i, l.output, child) + } + } + + /** If necessary, cast data types and rename fields to the expected types and names. */ + def castAndRenameChildOutput( + insertInto: InsertIntoTable, + expectedOutput: Seq[Attribute], + child: LogicalPlan): InsertIntoTable = { + val newChildOutput = expectedOutput.zip(child.output).map { + case (expected, actual) => + val needCast = !expected.dataType.sameType(actual.dataType) + // We want to make sure the filed names in the data to be inserted exactly match + // names in the schema. + val needRename = expected.name != actual.name + (needCast, needRename) match { + case (true, _) => Alias(Cast(actual, expected.dataType), expected.name)() + case (false, true) => Alias(actual, expected.name)() + case (_, _) => actual + } + } + + if (newChildOutput == child.output) { + insertInto + } else { + insertInto.copy(child = Project(newChildOutput, child)) + } + } +} + +/** + * A rule to do various checks before inserting into or writing to a data source table. + */ +private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) { + def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } + + def apply(plan: LogicalPlan): Unit = { + plan.foreach { + case i @ logical.InsertIntoTable( + l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) => + // Right now, we do not support insert into a data source table with partition specs. + if (partition.nonEmpty) { + failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") + } else { + // Get all input data source relations of the query. + val srcRelations = query.collect { + case LogicalRelation(src: BaseRelation) => src + } + if (srcRelations.contains(t)) { + failAnalysis( + "Cannot insert overwrite into table that is also being read from.") + } else { + // OK + } + } + + case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) => + // We need to make sure the partition columns specified by users do match partition + // columns of the relation. + val existingPartitionColumns = r.partitionColumns.fieldNames.toSet + val specifiedPartitionColumns = part.keySet + if (existingPartitionColumns != specifiedPartitionColumns) { + failAnalysis(s"Specified partition columns " + + s"(${specifiedPartitionColumns.mkString(", ")}) " + + s"do not match the partition columns of the table. Please use " + + s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.") + } else { + // OK + } + + case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => + // The relation in l is not an InsertableRelation. + failAnalysis(s"$l does not allow insertion.") + + case logical.InsertIntoTable(t, _, _, _, _) => + if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) { + failAnalysis(s"Inserting into an RDD-based table is not allowed.") + } else { + // OK + } + + case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => + // When the SaveMode is Overwrite, we need to check if the table is an input table of + // the query. If so, we will throw an AnalysisException to let users know it is not allowed. + if (catalog.tableExists(Seq(tableName))) { + // Need to remove SubQuery operator. + EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match { + // Only do the check if the table is a data source table + // (the relation is a BaseRelation). + case l @ LogicalRelation(dest: BaseRelation) => + // Get all input data source relations of the query. + val srcRelations = query.collect { + case LogicalRelation(src: BaseRelation) => src + } + if (srcRelations.contains(dest)) { + failAnalysis( + s"Cannot overwrite table $tableName that is also being read from.") + } else { + // OK + } + + case _ => // OK + } + } else { + // OK + } + + case _ => // OK + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index e683eb0126..2f9f880c70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -35,15 +35,18 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.util.ContextUtil import org.apache.parquet.schema.MessageType +import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD._ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD} +import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} -import org.apache.spark.{Logging, Partition => SparkPartition, SparkException} + private[sql] class DefaultSource extends HadoopFsRelationProvider { override def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala deleted file mode 100644 index 70c9e06927..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import org.apache.spark.{Logging, TaskContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD} -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.sql.{SaveMode, Strategy, execution, sources} -import org.apache.spark.util.{SerializableConfiguration, Utils} -import org.apache.spark.unsafe.types.UTF8String - -/** - * A Strategy for planning scans over data sources defined using the sources API. - */ -private[sql] object DataSourceStrategy extends Strategy with Logging { - def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) => - pruneFilterProjectRaw( - l, - projects, - filters, - (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil - - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) => - pruneFilterProject( - l, - projects, - filters, - (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil - - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) => - pruneFilterProject( - l, - projects, - filters, - (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil - - // Scanning partitioned HadoopFsRelation - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) - if t.partitionSpec.partitionColumns.nonEmpty => - val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray - - logInfo { - val total = t.partitionSpec.partitions.length - val selected = selectedPartitions.length - val percentPruned = (1 - selected.toDouble / total.toDouble) * 100 - s"Selected $selected partitions out of $total, pruned $percentPruned% partitions." - } - - // Only pushes down predicates that do not reference partition columns. - val pushedFilters = { - val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet - filters.filter { f => - val referencedColumnNames = f.references.map(_.name).toSet - referencedColumnNames.intersect(partitionColumnNames).isEmpty - } - } - - buildPartitionedTableScan( - l, - projects, - pushedFilters, - t.partitionSpec.partitionColumns, - selectedPartitions) :: Nil - - // Scanning non-partitioned HadoopFsRelation - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) => - // See buildPartitionedTableScan for the reason that we need to create a shard - // broadcast HadoopConf. - val sharedHadoopConf = SparkHadoopUtil.get.conf - val confBroadcast = - t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) - pruneFilterProject( - l, - projects, - filters, - (a, f) => - toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil - - case l @ LogicalRelation(t: TableScan) => - execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil - - case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty => - execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil - - case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) => - val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append - execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil - - case _ => Nil - } - - private def buildPartitionedTableScan( - logicalRelation: LogicalRelation, - projections: Seq[NamedExpression], - filters: Seq[Expression], - partitionColumns: StructType, - partitions: Array[Partition]) = { - val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] - - // Because we are creating one RDD per partition, we need to have a shared HadoopConf. - // Otherwise, the cost of broadcasting HadoopConf in every RDD will be high. - val sharedHadoopConf = SparkHadoopUtil.get.conf - val confBroadcast = - relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf)) - - // Builds RDD[Row]s for each selected partition. - val perPartitionRows = partitions.map { case Partition(partitionValues, dir) => - // The table scan operator (PhysicalRDD) which retrieves required columns from data files. - // Notice that the schema of data files, represented by `relation.dataSchema`, may contain - // some partition column(s). - val scan = - pruneFilterProject( - logicalRelation, - projections, - filters, - (columns: Seq[Attribute], filters) => { - val partitionColNames = partitionColumns.fieldNames - - // Don't scan any partition columns to save I/O. Here we are being optimistic and - // assuming partition columns data stored in data files are always consistent with those - // partition values encoded in partition directory paths. - val needed = columns.filterNot(a => partitionColNames.contains(a.name)) - val dataRows = - relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast) - - // Merges data values with partition values. - mergeWithPartitionValues( - relation.schema, - columns.map(_.name).toArray, - partitionColNames, - partitionValues, - toCatalystRDD(logicalRelation, needed, dataRows)) - }) - - scan.execute() - } - - val unionedRows = - if (perPartitionRows.length == 0) { - relation.sqlContext.emptyResult - } else { - new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) - } - - execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows) - } - - private def mergeWithPartitionValues( - schema: StructType, - requiredColumns: Array[String], - partitionColumns: Array[String], - partitionValues: InternalRow, - dataRows: RDD[InternalRow]): RDD[InternalRow] = { - val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains) - - // If output columns contain any partition column(s), we need to merge scanned data - // columns and requested partition columns to form the final result. - if (!requiredColumns.sameElements(nonPartitionColumns)) { - val mergers = requiredColumns.zipWithIndex.map { case (name, index) => - // To see whether the `index`-th column is a partition column... - val i = partitionColumns.indexOf(name) - if (i != -1) { - // If yes, gets column value from partition values. - (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { - mutableRow(ordinal) = partitionValues(i) - } - } else { - // Otherwise, inherits the value from scanned data. - val i = nonPartitionColumns.indexOf(name) - (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { - mutableRow(ordinal) = dataRow(i) - } - } - } - - // Since we know for sure that this closure is serializable, we can avoid the overhead - // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally - // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718). - val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => { - val dataTypes = requiredColumns.map(schema(_).dataType) - val mutableRow = new SpecificMutableRow(dataTypes) - iterator.map { dataRow => - var i = 0 - while (i < mutableRow.length) { - mergers(i)(mutableRow, dataRow, i) - i += 1 - } - mutableRow.asInstanceOf[InternalRow] - } - } - - // This is an internal RDD whose call site the user should not be concerned with - // Since we create many of these (one per partition), the time spent on computing - // the call site may add up. - Utils.withDummyCallSite(dataRows.sparkContext) { - new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false) - } - - } else { - dataRows - } - } - - protected def prunePartitions( - predicates: Seq[Expression], - partitionSpec: PartitionSpec): Seq[Partition] = { - val PartitionSpec(partitionColumns, partitions) = partitionSpec - val partitionColumnNames = partitionColumns.map(_.name).toSet - val partitionPruningPredicates = predicates.filter { - _.references.map(_.name).toSet.subsetOf(partitionColumnNames) - } - - if (partitionPruningPredicates.nonEmpty) { - val predicate = - partitionPruningPredicates - .reduceOption(expressions.And) - .getOrElse(Literal(true)) - - val boundPredicate = InterpretedPredicate.create(predicate.transform { - case a: AttributeReference => - val index = partitionColumns.indexWhere(a.name == _.name) - BoundReference(index, partitionColumns(index).dataType, nullable = true) - }) - - partitions.filter { case Partition(values, _) => boundPredicate(values) } - } else { - partitions - } - } - - // Based on Public API. - protected def pruneFilterProject( - relation: LogicalRelation, - projects: Seq[NamedExpression], - filterPredicates: Seq[Expression], - scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = { - pruneFilterProjectRaw( - relation, - projects, - filterPredicates, - (requestedColumns, pushedFilters) => { - scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray) - }) - } - - // Based on Catalyst expressions. - protected def pruneFilterProjectRaw( - relation: LogicalRelation, - projects: Seq[NamedExpression], - filterPredicates: Seq[Expression], - scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = { - - val projectSet = AttributeSet(projects.flatMap(_.references)) - val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) - val filterCondition = filterPredicates.reduceLeftOption(expressions.And) - - val pushedFilters = filterPredicates.map { _ transform { - case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes. - }} - - if (projects.map(_.toAttribute) == projects && - projectSet.size == projects.size && - filterSet.subsetOf(projectSet)) { - // When it is possible to just use column pruning to get the right projection and - // when the columns of this projection are enough to evaluate all filter conditions, - // just do a scan followed by a filter, with no extra project. - val requestedColumns = - projects.asInstanceOf[Seq[Attribute]] // Safe due to if above. - .map(relation.attributeMap) // Match original case of attributes. - - val scan = execution.PhysicalRDD(projects.map(_.toAttribute), - scanBuilder(requestedColumns, pushedFilters)) - filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) - } else { - val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq - - val scan = execution.PhysicalRDD(requestedColumns, - scanBuilder(requestedColumns, pushedFilters)) - execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) - } - } - - /** - * Convert RDD of Row into RDD of InternalRow with objects in catalyst types - */ - private[this] def toCatalystRDD( - relation: LogicalRelation, - output: Seq[Attribute], - rdd: RDD[Row]): RDD[InternalRow] = { - if (relation.relation.needConversion) { - execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType)) - } else { - rdd.map(_.asInstanceOf[InternalRow]) - } - } - - /** - * Convert RDD of Row into RDD of InternalRow with objects in catalyst types - */ - private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = { - toCatalystRDD(relation, relation.output, rdd) - } - - /** - * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s, - * and convert them. - */ - protected[sql] def selectFilters(filters: Seq[Expression]) = { - def translate(predicate: Expression): Option[Filter] = predicate match { - case expressions.EqualTo(a: Attribute, Literal(v, _)) => - Some(sources.EqualTo(a.name, v)) - case expressions.EqualTo(Literal(v, _), a: Attribute) => - Some(sources.EqualTo(a.name, v)) - - case expressions.GreaterThan(a: Attribute, Literal(v, _)) => - Some(sources.GreaterThan(a.name, v)) - case expressions.GreaterThan(Literal(v, _), a: Attribute) => - Some(sources.LessThan(a.name, v)) - - case expressions.LessThan(a: Attribute, Literal(v, _)) => - Some(sources.LessThan(a.name, v)) - case expressions.LessThan(Literal(v, _), a: Attribute) => - Some(sources.GreaterThan(a.name, v)) - - case expressions.GreaterThanOrEqual(a: Attribute, Literal(v, _)) => - Some(sources.GreaterThanOrEqual(a.name, v)) - case expressions.GreaterThanOrEqual(Literal(v, _), a: Attribute) => - Some(sources.LessThanOrEqual(a.name, v)) - - case expressions.LessThanOrEqual(a: Attribute, Literal(v, _)) => - Some(sources.LessThanOrEqual(a.name, v)) - case expressions.LessThanOrEqual(Literal(v, _), a: Attribute) => - Some(sources.GreaterThanOrEqual(a.name, v)) - - case expressions.InSet(a: Attribute, set) => - Some(sources.In(a.name, set.toArray)) - - case expressions.IsNull(a: Attribute) => - Some(sources.IsNull(a.name)) - case expressions.IsNotNull(a: Attribute) => - Some(sources.IsNotNull(a.name)) - - case expressions.And(left, right) => - (translate(left) ++ translate(right)).reduceOption(sources.And) - - case expressions.Or(left, right) => - for { - leftFilter <- translate(left) - rightFilter <- translate(right) - } yield sources.Or(leftFilter, rightFilter) - - case expressions.Not(child) => - translate(child).map(sources.Not) - - case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) => - Some(sources.StringStartsWith(a.name, v.toString)) - - case expressions.EndsWith(a: Attribute, Literal(v: UTF8String, StringType)) => - Some(sources.StringEndsWith(a.name, v.toString)) - - case expressions.Contains(a: Attribute, Literal(v: UTF8String, StringType)) => - Some(sources.StringContains(a.name, v.toString)) - - case _ => None - } - - filters.flatMap(translate) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala deleted file mode 100644 index f374abffdd..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/LogicalRelation.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.sources - -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeMap} -import org.apache.spark.sql.catalyst.plans.logical.{Statistics, LeafNode, LogicalPlan} - -/** - * Used to link a [[BaseRelation]] in to a logical query plan. - */ -private[sql] case class LogicalRelation(relation: BaseRelation) - extends LeafNode - with MultiInstanceRelation { - - override val output: Seq[AttributeReference] = relation.schema.toAttributes - - // Logical Relations are distinct if they have different output for the sake of transformations. - override def equals(other: Any): Boolean = other match { - case l @ LogicalRelation(otherRelation) => relation == otherRelation && output == l.output - case _ => false - } - - override def hashCode: Int = { - com.google.common.base.Objects.hashCode(relation, output) - } - - override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { - case LogicalRelation(otherRelation) => relation == otherRelation - case _ => false - } - - @transient override lazy val statistics: Statistics = Statistics( - sizeInBytes = BigInt(relation.sizeInBytes) - ) - - /** Used to lookup original attribute capitalization */ - val attributeMap: AttributeMap[AttributeReference] = AttributeMap(output.map(o => (o, o))) - - def newInstance(): this.type = LogicalRelation(relation).asInstanceOf[this.type] - - override def simpleString: String = s"Relation[${output.mkString(",")}] $relation" -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala deleted file mode 100644 index 8b2a45d8e9..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import java.lang.{Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong} -import java.math.{BigDecimal => JBigDecimal} - -import scala.collection.mutable.ArrayBuffer -import scala.util.Try - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.util.Shell -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} -import org.apache.spark.sql.types._ - -private[sql] case class Partition(values: InternalRow, path: String) - -private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) - -private[sql] object PartitionSpec { - val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition]) -} - -private[sql] object PartitioningUtils { - // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't - // depend on Hive. - private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" - - private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { - require(columnNames.size == literals.size) - } - - /** - * Given a group of qualified paths, tries to parse them and returns a partition specification. - * For example, given: - * {{{ - * hdfs://:/path/to/partition/a=1/b=hello/c=3.14 - * hdfs://:/path/to/partition/a=2/b=world/c=6.28 - * }}} - * it returns: - * {{{ - * PartitionSpec( - * partitionColumns = StructType( - * StructField(name = "a", dataType = IntegerType, nullable = true), - * StructField(name = "b", dataType = StringType, nullable = true), - * StructField(name = "c", dataType = DoubleType, nullable = true)), - * partitions = Seq( - * Partition( - * values = Row(1, "hello", 3.14), - * path = "hdfs://:/path/to/partition/a=1/b=hello/c=3.14"), - * Partition( - * values = Row(2, "world", 6.28), - * path = "hdfs://:/path/to/partition/a=2/b=world/c=6.28"))) - * }}} - */ - private[sql] def parsePartitions( - paths: Seq[Path], - defaultPartitionName: String, - typeInference: Boolean): PartitionSpec = { - // First, we need to parse every partition's path and see if we can find partition values. - val pathsWithPartitionValues = paths.flatMap { path => - parsePartition(path, defaultPartitionName, typeInference).map(path -> _) - } - - if (pathsWithPartitionValues.isEmpty) { - // This dataset is not partitioned. - PartitionSpec.emptySpec - } else { - // This dataset is partitioned. We need to check whether all partitions have the same - // partition columns and resolve potential type conflicts. - val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) - - // Creates the StructType which represents the partition columns. - val fields = { - val PartitionValues(columnNames, literals) = resolvedPartitionValues.head - columnNames.zip(literals).map { case (name, Literal(_, dataType)) => - // We always assume partition columns are nullable since we've no idea whether null values - // will be appended in the future. - StructField(name, dataType, nullable = true) - } - } - - // Finally, we create `Partition`s based on paths and resolved partition values. - val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { - case (PartitionValues(_, literals), (path, _)) => - Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString) - } - - PartitionSpec(StructType(fields), partitions) - } - } - - /** - * Parses a single partition, returns column names and values of each partition column. For - * example, given: - * {{{ - * path = hdfs://:/path/to/partition/a=42/b=hello/c=3.14 - * }}} - * it returns: - * {{{ - * PartitionValues( - * Seq("a", "b", "c"), - * Seq( - * Literal.create(42, IntegerType), - * Literal.create("hello", StringType), - * Literal.create(3.14, FloatType))) - * }}} - */ - private[sql] def parsePartition( - path: Path, - defaultPartitionName: String, - typeInference: Boolean): Option[PartitionValues] = { - val columns = ArrayBuffer.empty[(String, Literal)] - // Old Hadoop versions don't have `Path.isRoot` - var finished = path.getParent == null - var chopped = path - - while (!finished) { - // Sometimes (e.g., when speculative task is enabled), temporary directories may be left - // uncleaned. Here we simply ignore them. - if (chopped.getName.toLowerCase == "_temporary") { - return None - } - - val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) - maybeColumn.foreach(columns += _) - chopped = chopped.getParent - finished = maybeColumn.isEmpty || chopped.getParent == null - } - - if (columns.isEmpty) { - None - } else { - val (columnNames, values) = columns.reverse.unzip - Some(PartitionValues(columnNames, values)) - } - } - - private def parsePartitionColumn( - columnSpec: String, - defaultPartitionName: String, - typeInference: Boolean): Option[(String, Literal)] = { - val equalSignIndex = columnSpec.indexOf('=') - if (equalSignIndex == -1) { - None - } else { - val columnName = columnSpec.take(equalSignIndex) - assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'") - - val rawColumnValue = columnSpec.drop(equalSignIndex + 1) - assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") - - val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference) - Some(columnName -> literal) - } - } - - /** - * Resolves possible type conflicts between partitions by up-casting "lower" types. The up- - * casting order is: - * {{{ - * NullType -> - * IntegerType -> LongType -> - * DoubleType -> DecimalType.Unlimited -> - * StringType - * }}} - */ - private[sql] def resolvePartitions( - pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = { - if (pathsWithPartitionValues.isEmpty) { - Seq.empty - } else { - val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct - assert( - distinctPartColNames.size == 1, - listConflictingPartitionColumns(pathsWithPartitionValues)) - - // Resolves possible type conflicts for each column - val values = pathsWithPartitionValues.map(_._2) - val columnCount = values.head.columnNames.size - val resolvedValues = (0 until columnCount).map { i => - resolveTypeConflicts(values.map(_.literals(i))) - } - - // Fills resolved literals back to each partition - values.zipWithIndex.map { case (d, index) => - d.copy(literals = resolvedValues.map(_(index))) - } - } - } - - private[sql] def listConflictingPartitionColumns( - pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = { - val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct - - def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = - seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }) - - val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { - case (path, partValues) => partValues.columnNames -> path - }) - - val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map { - case (names, index) => - s"Partition column name list #$index: $names" - } - - // Lists out those non-leaf partition directories that also contain files - val suspiciousPaths = distinctPartColNames.sortBy(_.length).flatMap(partColNamesToPaths) - - s"Conflicting partition column names detected:\n" + - distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") + - "For partitioned table directories, data files should only live in leaf directories.\n" + - "And directories at the same level should have the same partition column name.\n" + - "Please check the following directories for unexpected files or " + - "inconsistent partition column names:\n" + - suspiciousPaths.map("\t" + _).mkString("\n", "\n", "") - } - - /** - * Converts a string to a [[Literal]] with automatic type inference. Currently only supports - * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.Unlimited]], and - * [[StringType]]. - */ - private[sql] def inferPartitionColumnValue( - raw: String, - defaultPartitionName: String, - typeInference: Boolean): Literal = { - if (typeInference) { - // First tries integral types - Try(Literal.create(Integer.parseInt(raw), IntegerType)) - .orElse(Try(Literal.create(JLong.parseLong(raw), LongType))) - // Then falls back to fractional types - .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType))) - .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited))) - // Then falls back to string - .getOrElse { - if (raw == defaultPartitionName) { - Literal.create(null, NullType) - } else { - Literal.create(unescapePathName(raw), StringType) - } - } - } else { - if (raw == defaultPartitionName) { - Literal.create(null, NullType) - } else { - Literal.create(unescapePathName(raw), StringType) - } - } - } - - private val upCastingOrder: Seq[DataType] = - Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType) - - /** - * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" - * types. - */ - private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = { - val desiredType = { - val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_)) - // Falls back to string if all values of this column are null or empty string - if (topType == NullType) StringType else topType - } - - literals.map { case l @ Literal(_, dataType) => - Literal.create(Cast(l, desiredType).eval(), desiredType) - } - } - - ////////////////////////////////////////////////////////////////////////////////////////////////// - // The following string escaping code is mainly copied from Hive (o.a.h.h.common.FileUtils). - ////////////////////////////////////////////////////////////////////////////////////////////////// - - val charToEscape = { - val bitSet = new java.util.BitSet(128) - - /** - * ASCII 01-1F are HTTP control characters that need to be escaped. - * \u000A and \u000D are \n and \r, respectively. - */ - val clist = Array( - '\u0001', '\u0002', '\u0003', '\u0004', '\u0005', '\u0006', '\u0007', '\u0008', '\u0009', - '\n', '\u000B', '\u000C', '\r', '\u000E', '\u000F', '\u0010', '\u0011', '\u0012', '\u0013', - '\u0014', '\u0015', '\u0016', '\u0017', '\u0018', '\u0019', '\u001A', '\u001B', '\u001C', - '\u001D', '\u001E', '\u001F', '"', '#', '%', '\'', '*', '/', ':', '=', '?', '\\', '\u007F', - '{', '[', ']', '^') - - clist.foreach(bitSet.set(_)) - - if (Shell.WINDOWS) { - Array(' ', '<', '>', '|').foreach(bitSet.set(_)) - } - - bitSet - } - - def needsEscaping(c: Char): Boolean = { - c >= 0 && c < charToEscape.size() && charToEscape.get(c) - } - - def escapePathName(path: String): String = { - val builder = new StringBuilder() - path.foreach { c => - if (needsEscaping(c)) { - builder.append('%') - builder.append(f"${c.asInstanceOf[Int]}%02x") - } else { - builder.append(c) - } - } - - builder.toString() - } - - def unescapePathName(path: String): String = { - val sb = new StringBuilder - var i = 0 - - while (i < path.length) { - val c = path.charAt(i) - if (c == '%' && i + 2 < path.length) { - val code: Int = try { - Integer.valueOf(path.substring(i + 1, i + 3), 16) - } catch { case e: Exception => - -1: Integer - } - if (code >= 0) { - sb.append(code.asInstanceOf[Char]) - i += 3 - } else { - sb.append(c) - i += 1 - } - } else { - sb.append(c) - i += 1 - } - } - - sb.toString() - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala deleted file mode 100644 index 2bdc341021..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import java.text.SimpleDateFormat -import java.util.Date - -import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit} -import org.apache.spark.broadcast.Broadcast - -import org.apache.spark.{Partition => SparkPartition, _} -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.executor.DataReadMethod -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil -import org.apache.spark.rdd.{RDD, HadoopRDD} -import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{SerializableConfiguration, Utils} - -import scala.reflect.ClassTag - -private[spark] class SqlNewHadoopPartition( - rddId: Int, - val index: Int, - @transient rawSplit: InputSplit with Writable) - extends SparkPartition { - - val serializableHadoopSplit = new SerializableWritable(rawSplit) - - override def hashCode(): Int = 41 * (41 + rddId) + index -} - -/** - * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, - * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`). - * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions. - * 1. A shared broadcast Hadoop Configuration. - * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side - * to the shared Hadoop Configuration. - * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side - * and the executor side to the shared Hadoop Configuration. - * - * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with - * changes based on [[org.apache.spark.rdd.HadoopRDD]]. In future, this functionality will be - * folded into core. - */ -private[sql] class SqlNewHadoopRDD[K, V]( - @transient sc : SparkContext, - broadcastedConf: Broadcast[SerializableConfiguration], - @transient initDriverSideJobFuncOpt: Option[Job => Unit], - initLocalJobFuncOpt: Option[Job => Unit], - inputFormatClass: Class[_ <: InputFormat[K, V]], - keyClass: Class[K], - valueClass: Class[V]) - extends RDD[(K, V)](sc, Nil) - with SparkHadoopMapReduceUtil - with Logging { - - protected def getJob(): Job = { - val conf: Configuration = broadcastedConf.value.value - // "new Job" will make a copy of the conf. Then, it is - // safe to mutate conf properties with initLocalJobFuncOpt - // and initDriverSideJobFuncOpt. - val newJob = new Job(conf) - initLocalJobFuncOpt.map(f => f(newJob)) - newJob - } - - def getConf(isDriverSide: Boolean): Configuration = { - val job = getJob() - if (isDriverSide) { - initDriverSideJobFuncOpt.map(f => f(job)) - } - job.getConfiguration - } - - private val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - formatter.format(new Date()) - } - - @transient protected val jobId = new JobID(jobTrackerId, id) - - override def getPartitions: Array[SparkPartition] = { - val conf = getConf(isDriverSide = true) - val inputFormat = inputFormatClass.newInstance - inputFormat match { - case configurable: Configurable => - configurable.setConf(conf) - case _ => - } - val jobContext = newJobContext(conf, jobId) - val rawSplits = inputFormat.getSplits(jobContext).toArray - val result = new Array[SparkPartition](rawSplits.size) - for (i <- 0 until rawSplits.size) { - result(i) = - new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) - } - result - } - - override def compute( - theSplit: SparkPartition, - context: TaskContext): InterruptibleIterator[(K, V)] = { - val iter = new Iterator[(K, V)] { - val split = theSplit.asInstanceOf[SqlNewHadoopPartition] - logInfo("Input split: " + split.serializableHadoopSplit) - val conf = getConf(isDriverSide = false) - - val inputMetrics = context.taskMetrics - .getInputMetricsForReadMethod(DataReadMethod.Hadoop) - - // Find a function that will return the FileSystem bytes read by this thread. Do this before - // creating RecordReader, because RecordReader's constructor might read some bytes - val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { - split.serializableHadoopSplit.value match { - case _: FileSplit | _: CombineFileSplit => - SparkHadoopUtil.get.getFSBytesReadOnThreadCallback() - case _ => None - } - } - inputMetrics.setBytesReadCallback(bytesReadCallback) - - val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) - val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) - val format = inputFormatClass.newInstance - format match { - case configurable: Configurable => - configurable.setConf(conf) - case _ => - } - val reader = format.createRecordReader( - split.serializableHadoopSplit.value, hadoopAttemptContext) - reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) - - // Register an on-task-completion callback to close the input stream. - context.addTaskCompletionListener(context => close()) - var havePair = false - var finished = false - var recordsSinceMetricsUpdate = 0 - - override def hasNext: Boolean = { - if (!finished && !havePair) { - finished = !reader.nextKeyValue - havePair = !finished - } - !finished - } - - override def next(): (K, V) = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - if (!finished) { - inputMetrics.incRecordsRead(1) - } - (reader.getCurrentKey, reader.getCurrentValue) - } - - private def close() { - try { - reader.close() - if (bytesReadCallback.isDefined) { - inputMetrics.updateBytesRead() - } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] || - split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) { - // If we can't get the bytes read from the FS stats, fall back to the split size, - // which may be inaccurate. - try { - inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) - } catch { - case e: java.io.IOException => - logWarning("Unable to get input size to set InputMetrics for task", e) - } - } - } catch { - case e: Exception => { - if (!Utils.inShutdown()) { - logWarning("Exception in RecordReader.close()", e) - } - } - } - } - } - new InterruptibleIterator(context, iter) - } - - /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */ - @DeveloperApi - def mapPartitionsWithInputSplit[U: ClassTag]( - f: (InputSplit, Iterator[(K, V)]) => Iterator[U], - preservesPartitioning: Boolean = false): RDD[U] = { - new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning) - } - - override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = { - val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value - val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match { - case Some(c) => - try { - val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]] - Some(HadoopRDD.convertSplitLocationInfo(infos)) - } catch { - case e : Exception => - logDebug("Failed to use InputSplit#getLocationInfo.", e) - None - } - case None => None - } - locs.getOrElse(split.getLocations.filter(_ != "localhost")) - } - - override def persist(storageLevel: StorageLevel): this.type = { - if (storageLevel.deserialized) { - logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" + - " behavior because Hadoop's RecordReader reuses the same Writable object for all records." + - " Use a map transformation to make copies of the records.") - } - super.persist(storageLevel) - } -} - -private[spark] object SqlNewHadoopRDD { - /** - * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to - * the given function rather than the index of the partition. - */ - private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag]( - prev: RDD[T], - f: (InputSplit, Iterator[T]) => Iterator[U], - preservesPartitioning: Boolean = false) - extends RDD[U](prev) { - - override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None - - override def getPartitions: Array[SparkPartition] = firstParent[T].partitions - - override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = { - val partition = split.asInstanceOf[SqlNewHadoopPartition] - val inputSplit = partition.serializableHadoopSplit.value - f(inputSplit, firstParent[T].iterator(split, context)) - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala deleted file mode 100644 index 5c6ef2dc90..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ /dev/null @@ -1,581 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import java.util.{Date, UUID} - -import scala.collection.JavaConversions.asScalaIterator - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat} - -import org.apache.spark._ -import org.apache.spark.mapred.SparkHadoopMapRedUtil -import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.types.StringType -import org.apache.spark.util.SerializableConfiguration - -private[sql] case class InsertIntoDataSource( - logicalRelation: LogicalRelation, - query: LogicalPlan, - overwrite: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] - val data = DataFrame(sqlContext, query) - // Apply the schema of the existing table to the new data. - val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema) - relation.insert(df, overwrite) - - // Invalidate the cache. - sqlContext.cacheManager.invalidateCache(logicalRelation) - - Seq.empty[Row] - } -} - -/** - * A command for writing data to a [[HadoopFsRelation]]. Supports both overwriting and appending. - * Writing to dynamic partitions is also supported. Each [[InsertIntoHadoopFsRelation]] issues a - * single write job, and owns a UUID that identifies this job. Each concrete implementation of - * [[HadoopFsRelation]] should use this UUID together with task id to generate unique file path for - * each task output file. This UUID is passed to executor side via a property named - * `spark.sql.sources.writeJobUUID`. - * - * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]] - * are used to write to normal tables and tables with dynamic partitions. - * - * Basic work flow of this command is: - * - * 1. Driver side setup, including output committer initialization and data source specific - * preparation work for the write job to be issued. - * 2. Issues a write job consists of one or more executor side tasks, each of which writes all - * rows within an RDD partition. - * 3. If no exception is thrown in a task, commits that task, otherwise aborts that task; If any - * exception is thrown during task commitment, also aborts that task. - * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is - * thrown during job commitment, also aborts the job. - */ -private[sql] case class InsertIntoHadoopFsRelation( - @transient relation: HadoopFsRelation, - @transient query: LogicalPlan, - mode: SaveMode) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - require( - relation.paths.length == 1, - s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") - - val hadoopConf = sqlContext.sparkContext.hadoopConfiguration - val outputPath = new Path(relation.paths.head) - val fs = outputPath.getFileSystem(hadoopConf) - val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - - val pathExists = fs.exists(qualifiedOutputPath) - val doInsertion = (mode, pathExists) match { - case (SaveMode.ErrorIfExists, true) => - sys.error(s"path $qualifiedOutputPath already exists.") - case (SaveMode.Overwrite, true) => - fs.delete(qualifiedOutputPath, true) - true - case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) => - true - case (SaveMode.Ignore, exists) => - !exists - } - // If we are appending data to an existing dir. - val isAppend = pathExists && (mode == SaveMode.Append) - - if (doInsertion) { - val job = new Job(hadoopConf) - job.setOutputKeyClass(classOf[Void]) - job.setOutputValueClass(classOf[InternalRow]) - FileOutputFormat.setOutputPath(job, qualifiedOutputPath) - - // We create a DataFrame by applying the schema of relation to the data to make sure. - // We are writing data based on the expected schema, - val df = { - // For partitioned relation r, r.schema's column ordering can be different from the column - // ordering of data.logicalPlan (partition columns are all moved after data column). We - // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can - // safely apply the schema of r.schema to the data. - val project = Project( - relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query) - - sqlContext.internalCreateDataFrame( - DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema) - } - - val partitionColumns = relation.partitionColumns.fieldNames - if (partitionColumns.isEmpty) { - insert(new DefaultWriterContainer(relation, job, isAppend), df) - } else { - val writerContainer = new DynamicPartitionWriterContainer( - relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend) - insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns) - } - } - - Seq.empty[Row] - } - - /** - * Inserts the content of the [[DataFrame]] into a table without any partitioning columns. - */ - private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = { - // Uses local vals for serialization - val needsConversion = relation.needConversion - val dataSchema = relation.dataSchema - - // This call shouldn't be put into the `try` block below because it only initializes and - // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - writerContainer.driverSideSetup() - - try { - df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _) - writerContainer.commitJob() - relation.refresh() - } catch { case cause: Throwable => - logError("Aborting job.", cause) - writerContainer.abortJob() - throw new SparkException("Job aborted.", cause) - } - - def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { - // If anything below fails, we should abort the task. - try { - writerContainer.executorSideSetup(taskContext) - - val converter: InternalRow => Row = if (needsConversion) { - CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] - } else { - r: InternalRow => r.asInstanceOf[Row] - } - while (iterator.hasNext) { - val internalRow = iterator.next() - writerContainer.outputWriterForRow(internalRow).write(converter(internalRow)) - } - - writerContainer.commitTask() - } catch { case cause: Throwable => - logError("Aborting task.", cause) - writerContainer.abortTask() - throw new SparkException("Task failed while writing rows.", cause) - } - } - } - - /** - * Inserts the content of the [[DataFrame]] into a table with partitioning columns. - */ - private def insertWithDynamicPartitions( - sqlContext: SQLContext, - writerContainer: BaseWriterContainer, - df: DataFrame, - partitionColumns: Array[String]): Unit = { - // Uses a local val for serialization - val needsConversion = relation.needConversion - val dataSchema = relation.dataSchema - - require( - df.schema == relation.schema, - s"""DataFrame must have the same schema as the relation to which is inserted. - |DataFrame schema: ${df.schema} - |Relation schema: ${relation.schema} - """.stripMargin) - - val partitionColumnsInSpec = relation.partitionColumns.fieldNames - require( - partitionColumnsInSpec.sameElements(partitionColumns), - s"""Partition columns mismatch. - |Expected: ${partitionColumnsInSpec.mkString(", ")} - |Actual: ${partitionColumns.mkString(", ")} - """.stripMargin) - - val output = df.queryExecution.executedPlan.output - val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name)) - val codegenEnabled = df.sqlContext.conf.codegenEnabled - - // This call shouldn't be put into the `try` block below because it only initializes and - // prepares the job, any exception thrown from here shouldn't cause abortJob() to be called. - writerContainer.driverSideSetup() - - try { - df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _) - writerContainer.commitJob() - relation.refresh() - } catch { case cause: Throwable => - logError("Aborting job.", cause) - writerContainer.abortJob() - throw new SparkException("Job aborted.", cause) - } - - def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { - // If anything below fails, we should abort the task. - try { - writerContainer.executorSideSetup(taskContext) - - // Projects all partition columns and casts them to strings to build partition directories. - val partitionCasts = partitionOutput.map(Cast(_, StringType)) - val partitionProj = newProjection(codegenEnabled, partitionCasts, output) - val dataProj = newProjection(codegenEnabled, dataOutput, output) - - val dataConverter: InternalRow => Row = if (needsConversion) { - CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] - } else { - r: InternalRow => r.asInstanceOf[Row] - } - - while (iterator.hasNext) { - val internalRow = iterator.next() - val partitionPart = partitionProj(internalRow) - val dataPart = dataConverter(dataProj(internalRow)) - writerContainer.outputWriterForRow(partitionPart).write(dataPart) - } - - writerContainer.commitTask() - } catch { case cause: Throwable => - logError("Aborting task.", cause) - writerContainer.abortTask() - throw new SparkException("Task failed while writing rows.", cause) - } - } - } - - // This is copied from SparkPlan, probably should move this to a more general place. - private def newProjection( - codegenEnabled: Boolean, - expressions: Seq[Expression], - inputSchema: Seq[Attribute]): Projection = { - log.debug( - s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") - if (codegenEnabled) { - - try { - GenerateProjection.generate(expressions, inputSchema) - } catch { - case e: Exception => - if (sys.props.contains("spark.testing")) { - throw e - } else { - log.error("failed to generate projection, fallback to interpreted", e) - new InterpretedProjection(expressions, inputSchema) - } - } - } else { - new InterpretedProjection(expressions, inputSchema) - } - } -} - -private[sql] abstract class BaseWriterContainer( - @transient val relation: HadoopFsRelation, - @transient job: Job, - isAppend: Boolean) - extends SparkHadoopMapReduceUtil - with Logging - with Serializable { - - protected val serializableConf = new SerializableConfiguration(job.getConfiguration) - - // This UUID is used to avoid output file name collision between different appending write jobs. - // These jobs may belong to different SparkContext instances. Concrete data source implementations - // may use this UUID to generate unique file names (e.g., `part-r--.parquet`). - // The reason why this ID is used to identify a job rather than a single task output file is - // that, speculative tasks must generate the same output file name as the original task. - private val uniqueWriteJobId = UUID.randomUUID() - - // This is only used on driver side. - @transient private val jobContext: JobContext = job - - // The following fields are initialized and used on both driver and executor side. - @transient protected var outputCommitter: OutputCommitter = _ - @transient private var jobId: JobID = _ - @transient private var taskId: TaskID = _ - @transient private var taskAttemptId: TaskAttemptID = _ - @transient protected var taskAttemptContext: TaskAttemptContext = _ - - protected val outputPath: String = { - assert( - relation.paths.length == 1, - s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") - relation.paths.head - } - - protected val dataSchema = relation.dataSchema - - protected var outputWriterFactory: OutputWriterFactory = _ - - private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _ - - def driverSideSetup(): Unit = { - setupIDs(0, 0, 0) - setupConf() - - // This UUID is sent to executor side together with the serialized `Configuration` object within - // the `Job` instance. `OutputWriters` on the executor side should use this UUID to generate - // unique task output files. - job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString) - - // Order of the following two lines is important. For Hadoop 1, TaskAttemptContext constructor - // clones the Configuration object passed in. If we initialize the TaskAttemptContext first, - // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext. - // - // Also, the `prepareJobForWrite` call must happen before initializing output format and output - // committer, since their initialization involve the job configuration, which can be potentially - // decorated in `prepareJobForWrite`. - outputWriterFactory = relation.prepareJobForWrite(job) - taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) - - outputFormatClass = job.getOutputFormatClass - outputCommitter = newOutputCommitter(taskAttemptContext) - outputCommitter.setupJob(jobContext) - } - - def executorSideSetup(taskContext: TaskContext): Unit = { - setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber()) - setupConf() - taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId) - outputCommitter = newOutputCommitter(taskAttemptContext) - outputCommitter.setupTask(taskAttemptContext) - initWriters() - } - - protected def getWorkPath: String = { - outputCommitter match { - // FileOutputCommitter writes to a temporary location returned by `getWorkPath`. - case f: MapReduceFileOutputCommitter => f.getWorkPath.toString - case _ => outputPath - } - } - - private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = { - val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context) - - if (isAppend) { - // If we are appending data to an existing dir, we will only use the output committer - // associated with the file output format since it is not safe to use a custom - // committer for appending. For example, in S3, direct parquet output committer may - // leave partial data in the destination dir when the the appending job fails. - logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + - "for appending.") - defaultOutputCommitter - } else { - val committerClass = context.getConfiguration.getClass( - SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) - - Option(committerClass).map { clazz => - logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}") - - // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat - // has an associated output committer. To override this output committer, - // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS. - // If a data source needs to override the output committer, it needs to set the - // output committer in prepareForWrite method. - if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) { - // The specified output committer is a FileOutputCommitter. - // So, we will use the FileOutputCommitter-specified constructor. - val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext]) - ctor.newInstance(new Path(outputPath), context) - } else { - // The specified output committer is just a OutputCommitter. - // So, we will use the no-argument constructor. - val ctor = clazz.getDeclaredConstructor() - ctor.newInstance() - } - }.getOrElse { - // If output committer class is not set, we will use the one associated with the - // file output format. - logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}") - defaultOutputCommitter - } - } - } - - private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = { - this.jobId = SparkHadoopWriter.createJobID(new Date, jobId) - this.taskId = new TaskID(this.jobId, true, splitId) - this.taskAttemptId = new TaskAttemptID(taskId, attemptId) - } - - private def setupConf(): Unit = { - serializableConf.value.set("mapred.job.id", jobId.toString) - serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString) - serializableConf.value.set("mapred.task.id", taskAttemptId.toString) - serializableConf.value.setBoolean("mapred.task.is.map", true) - serializableConf.value.setInt("mapred.task.partition", 0) - } - - // Called on executor side when writing rows - def outputWriterForRow(row: InternalRow): OutputWriter - - protected def initWriters(): Unit - - def commitTask(): Unit = { - SparkHadoopMapRedUtil.commitTask( - outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId) - } - - def abortTask(): Unit = { - if (outputCommitter != null) { - outputCommitter.abortTask(taskAttemptContext) - } - logError(s"Task attempt $taskAttemptId aborted.") - } - - def commitJob(): Unit = { - outputCommitter.commitJob(jobContext) - logInfo(s"Job $jobId committed.") - } - - def abortJob(): Unit = { - if (outputCommitter != null) { - outputCommitter.abortJob(jobContext, JobStatus.State.FAILED) - } - logError(s"Job $jobId aborted.") - } -} - -private[sql] class DefaultWriterContainer( - @transient relation: HadoopFsRelation, - @transient job: Job, - isAppend: Boolean) - extends BaseWriterContainer(relation, job, isAppend) { - - @transient private var writer: OutputWriter = _ - - override protected def initWriters(): Unit = { - taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath) - writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext) - } - - override def outputWriterForRow(row: InternalRow): OutputWriter = writer - - override def commitTask(): Unit = { - try { - assert(writer != null, "OutputWriter instance should have been initialized") - writer.close() - super.commitTask() - } catch { case cause: Throwable => - // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will - // cause `abortTask()` to be invoked. - throw new RuntimeException("Failed to commit task", cause) - } - } - - override def abortTask(): Unit = { - try { - // It's possible that the task fails before `writer` gets initialized - if (writer != null) { - writer.close() - } - } finally { - super.abortTask() - } - } -} - -private[sql] class DynamicPartitionWriterContainer( - @transient relation: HadoopFsRelation, - @transient job: Job, - partitionColumns: Array[String], - defaultPartitionName: String, - isAppend: Boolean) - extends BaseWriterContainer(relation, job, isAppend) { - - // All output writers are created on executor side. - @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _ - - override protected def initWriters(): Unit = { - outputWriters = new java.util.HashMap[String, OutputWriter] - } - - // The `row` argument is supposed to only contain partition column values which have been casted - // to strings. - override def outputWriterForRow(row: InternalRow): OutputWriter = { - val partitionPath = { - val partitionPathBuilder = new StringBuilder - var i = 0 - - while (i < partitionColumns.length) { - val col = partitionColumns(i) - val partitionValueString = { - val string = row.getString(i) - if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string) - } - - if (i > 0) { - partitionPathBuilder.append(Path.SEPARATOR_CHAR) - } - - partitionPathBuilder.append(s"$col=$partitionValueString") - i += 1 - } - - partitionPathBuilder.toString() - } - - val writer = outputWriters.get(partitionPath) - if (writer.eq(null)) { - val path = new Path(getWorkPath, partitionPath) - taskAttemptContext.getConfiguration.set( - "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString) - val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext) - outputWriters.put(partitionPath, newWriter) - newWriter - } else { - writer - } - } - - private def clearOutputWriters(): Unit = { - if (!outputWriters.isEmpty) { - asScalaIterator(outputWriters.values().iterator()).foreach(_.close()) - outputWriters.clear() - } - } - - override def commitTask(): Unit = { - try { - clearOutputWriters() - super.commitTask() - } catch { case cause: Throwable => - throw new RuntimeException("Failed to commit task", cause) - } - } - - override def abortTask(): Unit = { - try { - clearOutputWriters() - } finally { - super.abortTask() - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala deleted file mode 100644 index 5a8c97c773..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ /dev/null @@ -1,493 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import scala.language.{existentials, implicitConversions} -import scala.util.matching.Regex - -import org.apache.hadoop.fs.Path - -import org.apache.spark.Logging -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.catalyst.AbstractSparkSQLParser -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.RunnableCommand -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode} -import org.apache.spark.util.Utils - -/** - * A parser for foreign DDL commands. - */ -private[sql] class DDLParser( - parseQuery: String => LogicalPlan) - extends AbstractSparkSQLParser with DataTypeParser with Logging { - - def parse(input: String, exceptionOnError: Boolean): LogicalPlan = { - try { - parse(input) - } catch { - case ddlException: DDLException => throw ddlException - case _ if !exceptionOnError => parseQuery(input) - case x: Throwable => throw x - } - } - - // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword` - // properties via reflection the class in runtime for constructing the SqlLexical object - protected val CREATE = Keyword("CREATE") - protected val TEMPORARY = Keyword("TEMPORARY") - protected val TABLE = Keyword("TABLE") - protected val IF = Keyword("IF") - protected val NOT = Keyword("NOT") - protected val EXISTS = Keyword("EXISTS") - protected val USING = Keyword("USING") - protected val OPTIONS = Keyword("OPTIONS") - protected val DESCRIBE = Keyword("DESCRIBE") - protected val EXTENDED = Keyword("EXTENDED") - protected val AS = Keyword("AS") - protected val COMMENT = Keyword("COMMENT") - protected val REFRESH = Keyword("REFRESH") - - protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable - - protected def start: Parser[LogicalPlan] = ddl - - /** - * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] - * USING org.apache.spark.sql.avro - * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` - * or - * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS] - * USING org.apache.spark.sql.avro - * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` - * or - * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS] - * USING org.apache.spark.sql.avro - * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")` - * AS SELECT ... - */ - protected lazy val createTable: Parser[LogicalPlan] = - // TODO: Support database.table. - (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ - tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { - case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => - if (temp.isDefined && allowExisting.isDefined) { - throw new DDLException( - "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.") - } - - val options = opts.getOrElse(Map.empty[String, String]) - if (query.isDefined) { - if (columns.isDefined) { - throw new DDLException( - "a CREATE TABLE AS SELECT statement does not allow column definitions.") - } - // When IF NOT EXISTS clause appears in the query, the save mode will be ignore. - val mode = if (allowExisting.isDefined) { - SaveMode.Ignore - } else if (temp.isDefined) { - SaveMode.Overwrite - } else { - SaveMode.ErrorIfExists - } - - val queryPlan = parseQuery(query.get) - CreateTableUsingAsSelect(tableName, - provider, - temp.isDefined, - Array.empty[String], - mode, - options, - queryPlan) - } else { - val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) - CreateTableUsing( - tableName, - userSpecifiedSchema, - provider, - temp.isDefined, - options, - allowExisting.isDefined, - managedIfNoPath = false) - } - } - - protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" - - /* - * describe [extended] table avroTable - * This will display all columns of table `avroTable` includes column_name,column_type,comment - */ - protected lazy val describeTable: Parser[LogicalPlan] = - (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ { - case e ~ db ~ tbl => - val tblIdentifier = db match { - case Some(dbName) => - Seq(dbName, tbl) - case None => - Seq(tbl) - } - DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined) - } - - protected lazy val refreshTable: Parser[LogicalPlan] = - REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ { - case maybeDatabaseName ~ tableName => - RefreshTable(maybeDatabaseName.getOrElse("default"), tableName) - } - - protected lazy val options: Parser[Map[String, String]] = - "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } - - protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")} - - override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch( - s"identifier matching regex $regex", { - case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str - case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str - } - ) - - protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ { - case name => name - } - - protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ { - case parts => parts.mkString(".") - } - - protected lazy val pair: Parser[(String, String)] = - optionName ~ stringLit ^^ { case k ~ v => (k, v) } - - protected lazy val column: Parser[StructField] = - ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm => - val meta = cm match { - case Some(comment) => - new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build() - case None => Metadata.empty - } - - StructField(columnName, typ, nullable = true, meta) - } -} - -private[sql] object ResolvedDataSource { - - private val builtinSources = Map( - "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource", - "json" -> "org.apache.spark.sql.json.DefaultSource", - "parquet" -> "org.apache.spark.sql.parquet.DefaultSource", - "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource" - ) - - /** Given a provider name, look up the data source class definition. */ - def lookupDataSource(provider: String): Class[_] = { - val loader = Utils.getContextOrSparkClassLoader - - if (builtinSources.contains(provider)) { - return loader.loadClass(builtinSources(provider)) - } - - try { - loader.loadClass(provider) - } catch { - case cnf: java.lang.ClassNotFoundException => - try { - loader.loadClass(provider + ".DefaultSource") - } catch { - case cnf: java.lang.ClassNotFoundException => - if (provider.startsWith("org.apache.spark.sql.hive.orc")) { - sys.error("The ORC data source must be used with Hive support enabled.") - } else { - sys.error(s"Failed to load class for data source: $provider") - } - } - } - } - - /** Create a [[ResolvedDataSource]] for reading data in. */ - def apply( - sqlContext: SQLContext, - userSpecifiedSchema: Option[StructType], - partitionColumns: Array[String], - provider: String, - options: Map[String, String]): ResolvedDataSource = { - val clazz: Class[_] = lookupDataSource(provider) - def className: String = clazz.getCanonicalName - val relation = userSpecifiedSchema match { - case Some(schema: StructType) => clazz.newInstance() match { - case dataSource: SchemaRelationProvider => - dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema) - case dataSource: HadoopFsRelationProvider => - val maybePartitionsSchema = if (partitionColumns.isEmpty) { - None - } else { - Some(partitionColumnsSchema(schema, partitionColumns)) - } - - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val paths = { - val patternPath = new Path(caseInsensitiveOptions("path")) - val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray - } - - val dataSchema = - StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable - - dataSource.createRelation( - sqlContext, - paths, - Some(dataSchema), - maybePartitionsSchema, - caseInsensitiveOptions) - case dataSource: org.apache.spark.sql.sources.RelationProvider => - throw new AnalysisException(s"$className does not allow user-specified schemas.") - case _ => - throw new AnalysisException(s"$className is not a RelationProvider.") - } - - case None => clazz.newInstance() match { - case dataSource: RelationProvider => - dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options)) - case dataSource: HadoopFsRelationProvider => - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val paths = { - val patternPath = new Path(caseInsensitiveOptions("path")) - val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray - } - dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions) - case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider => - throw new AnalysisException( - s"A schema needs to be specified when using $className.") - case _ => - throw new AnalysisException( - s"$className is neither a RelationProvider nor a FSBasedRelationProvider.") - } - } - new ResolvedDataSource(clazz, relation) - } - - private def partitionColumnsSchema( - schema: StructType, - partitionColumns: Array[String]): StructType = { - StructType(partitionColumns.map { col => - schema.find(_.name == col).getOrElse { - throw new RuntimeException(s"Partition column $col not found in schema $schema") - } - }).asNullable - } - - /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */ - def apply( - sqlContext: SQLContext, - provider: String, - partitionColumns: Array[String], - mode: SaveMode, - options: Map[String, String], - data: DataFrame): ResolvedDataSource = { - if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) { - throw new AnalysisException("Cannot save interval data type into external storage.") - } - val clazz: Class[_] = lookupDataSource(provider) - val relation = clazz.newInstance() match { - case dataSource: CreatableRelationProvider => - dataSource.createRelation(sqlContext, mode, options, data) - case dataSource: HadoopFsRelationProvider => - // Don't glob path for the write path. The contracts here are: - // 1. Only one output path can be specified on the write path; - // 2. Output path must be a legal HDFS style file system path; - // 3. It's OK that the output path doesn't exist yet; - val caseInsensitiveOptions = new CaseInsensitiveMap(options) - val outputPath = { - val path = new Path(caseInsensitiveOptions("path")) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - path.makeQualified(fs.getUri, fs.getWorkingDirectory) - } - val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name))) - val r = dataSource.createRelation( - sqlContext, - Array(outputPath.toString), - Some(dataSchema.asNullable), - Some(partitionColumnsSchema(data.schema, partitionColumns)), - caseInsensitiveOptions) - - // For partitioned relation r, r.schema's column ordering can be different from the column - // ordering of data.logicalPlan (partition columns are all moved after data column). This - // will be adjusted within InsertIntoHadoopFsRelation. - sqlContext.executePlan( - InsertIntoHadoopFsRelation( - r, - data.logicalPlan, - mode)).toRdd - r - case _ => - sys.error(s"${clazz.getCanonicalName} does not allow create table as select.") - } - new ResolvedDataSource(clazz, relation) - } -} - -private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation) - -/** - * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command. - * @param table The table to be described. - * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false. - * It is effective only when the table is a Hive table. - */ -private[sql] case class DescribeCommand( - table: LogicalPlan, - isExtended: Boolean) extends LogicalPlan with Command { - - override def children: Seq[LogicalPlan] = Seq.empty - override val output: Seq[Attribute] = Seq( - // Column names are based on Hive. - AttributeReference("col_name", StringType, nullable = false, - new MetadataBuilder().putString("comment", "name of the column").build())(), - AttributeReference("data_type", StringType, nullable = false, - new MetadataBuilder().putString("comment", "data type of the column").build())(), - AttributeReference("comment", StringType, nullable = false, - new MetadataBuilder().putString("comment", "comment of the column").build())()) -} - -/** - * Used to represent the operation of create table using a data source. - * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown - */ -private[sql] case class CreateTableUsing( - tableName: String, - userSpecifiedSchema: Option[StructType], - provider: String, - temporary: Boolean, - options: Map[String, String], - allowExisting: Boolean, - managedIfNoPath: Boolean) extends LogicalPlan with Command { - - override def output: Seq[Attribute] = Seq.empty - override def children: Seq[LogicalPlan] = Seq.empty -} - -/** - * A node used to support CTAS statements and saveAsTable for the data source API. - * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer - * can analyze the logical plan that will be used to populate the table. - * So, [[PreWriteCheck]] can detect cases that are not allowed. - */ -private[sql] case class CreateTableUsingAsSelect( - tableName: String, - provider: String, - temporary: Boolean, - partitionColumns: Array[String], - mode: SaveMode, - options: Map[String, String], - child: LogicalPlan) extends UnaryNode { - override def output: Seq[Attribute] = Seq.empty[Attribute] - // TODO: Override resolved after we support databaseName. - // override lazy val resolved = databaseName != None && childrenResolved -} - -private[sql] case class CreateTempTableUsing( - tableName: String, - userSpecifiedSchema: Option[StructType], - provider: String, - options: Map[String, String]) extends RunnableCommand { - - def run(sqlContext: SQLContext): Seq[InternalRow] = { - val resolved = ResolvedDataSource( - sqlContext, userSpecifiedSchema, Array.empty[String], provider, options) - sqlContext.registerDataFrameAsTable( - DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) - Seq.empty - } -} - -private[sql] case class CreateTempTableUsingAsSelect( - tableName: String, - provider: String, - partitionColumns: Array[String], - mode: SaveMode, - options: Map[String, String], - query: LogicalPlan) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[InternalRow] = { - val df = DataFrame(sqlContext, query) - val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df) - sqlContext.registerDataFrameAsTable( - DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName) - - Seq.empty - } -} - -private[sql] case class RefreshTable(databaseName: String, tableName: String) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[InternalRow] = { - // Refresh the given table's metadata first. - sqlContext.catalog.refreshTable(databaseName, tableName) - - // If this table is cached as a InMemoryColumnarRelation, drop the original - // cached version and make the new version cached lazily. - val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName)) - // Use lookupCachedData directly since RefreshTable also takes databaseName. - val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty - if (isCached) { - // Create a data frame to represent the table. - // TODO: Use uncacheTable once it supports database name. - val df = DataFrame(sqlContext, logicalPlan) - // Uncache the logicalPlan. - sqlContext.cacheManager.tryUncacheQuery(df, blocking = true) - // Cache it again. - sqlContext.cacheManager.cacheQuery(df, Some(tableName)) - } - - Seq.empty[InternalRow] - } -} - -/** - * Builds a map in which keys are case insensitive - */ -protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String] - with Serializable { - - val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase)) - - override def get(k: String): Option[String] = baseMap.get(k.toLowerCase) - - override def + [B1 >: String](kv: (String, B1)): Map[String, B1] = - baseMap + kv.copy(_1 = kv._1.toLowerCase) - - override def iterator: Iterator[(String, String)] = baseMap.iterator - - override def -(key: String): Map[String, String] = baseMap - key.toLowerCase -} - -/** - * The exception thrown from the DDL parser. - */ -protected[sql] class DDLException(message: String) extends Exception(message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala index 24e86ca415..4d942e4f92 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.sources +//////////////////////////////////////////////////////////////////////////////////////////////////// +// This file defines all the filters that we can push down to the data sources. +//////////////////////////////////////////////////////////////////////////////////////////////////// + /** * A filter predicate for data sources. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 2cd8b358d8..7cd005b959 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.sources import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import scala.util.Try import org.apache.hadoop.conf.Configuration @@ -33,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.execution.RDDConversions +import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition} import org.apache.spark.sql.types.StructType import org.apache.spark.sql._ import org.apache.spark.util.SerializableConfiguration @@ -523,7 +523,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio }) } - private[sources] final def buildScan( + private[sql] final def buildScan( requiredColumns: Array[String], filters: Array[Filter], inputPaths: Array[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala deleted file mode 100644 index 40ee048e26..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources - -import org.apache.spark.sql.{SaveMode, AnalysisException} -import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog} -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias} -import org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.types.DataType - -/** - * A rule to do pre-insert data type casting and field renaming. Before we insert into - * an [[InsertableRelation]], we will use this rule to make sure that - * the columns to be inserted have the correct data type and fields have the correct names. - */ -private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // Wait until children are resolved. - case p: LogicalPlan if !p.childrenResolved => p - - // We are inserting into an InsertableRelation or HadoopFsRelation. - case i @ InsertIntoTable( - l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => { - // First, make sure the data to be inserted have the same number of fields with the - // schema of the relation. - if (l.output.size != child.output.size) { - sys.error( - s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " + - s"statement generates the same number of columns as its schema.") - } - castAndRenameChildOutput(i, l.output, child) - } - } - - /** If necessary, cast data types and rename fields to the expected types and names. */ - def castAndRenameChildOutput( - insertInto: InsertIntoTable, - expectedOutput: Seq[Attribute], - child: LogicalPlan): InsertIntoTable = { - val newChildOutput = expectedOutput.zip(child.output).map { - case (expected, actual) => - val needCast = !expected.dataType.sameType(actual.dataType) - // We want to make sure the filed names in the data to be inserted exactly match - // names in the schema. - val needRename = expected.name != actual.name - (needCast, needRename) match { - case (true, _) => Alias(Cast(actual, expected.dataType), expected.name)() - case (false, true) => Alias(actual, expected.name)() - case (_, _) => actual - } - } - - if (newChildOutput == child.output) { - insertInto - } else { - insertInto.copy(child = Project(newChildOutput, child)) - } - } -} - -/** - * A rule to do various checks before inserting into or writing to a data source table. - */ -private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) { - def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } - - def apply(plan: LogicalPlan): Unit = { - plan.foreach { - case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) => - // Right now, we do not support insert into a data source table with partition specs. - if (partition.nonEmpty) { - failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") - } else { - // Get all input data source relations of the query. - val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation) => src - } - if (srcRelations.contains(t)) { - failAnalysis( - "Cannot insert overwrite into table that is also being read from.") - } else { - // OK - } - } - - case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) => - // We need to make sure the partition columns specified by users do match partition - // columns of the relation. - val existingPartitionColumns = r.partitionColumns.fieldNames.toSet - val specifiedPartitionColumns = part.keySet - if (existingPartitionColumns != specifiedPartitionColumns) { - failAnalysis(s"Specified partition columns " + - s"(${specifiedPartitionColumns.mkString(", ")}) " + - s"do not match the partition columns of the table. Please use " + - s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.") - } else { - // OK - } - - case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) => - // The relation in l is not an InsertableRelation. - failAnalysis(s"$l does not allow insertion.") - - case logical.InsertIntoTable(t, _, _, _, _) => - if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) { - failAnalysis(s"Inserting into an RDD-based table is not allowed.") - } else { - // OK - } - - case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => - // When the SaveMode is Overwrite, we need to check if the table is an input table of - // the query. If so, we will throw an AnalysisException to let users know it is not allowed. - if (catalog.tableExists(Seq(tableName))) { - // Need to remove SubQuery operator. - EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match { - // Only do the check if the table is a data source table - // (the relation is a BaseRelation). - case l @ LogicalRelation(dest: BaseRelation) => - // Get all input data source relations of the query. - val srcRelations = query.collect { - case LogicalRelation(src: BaseRelation) => src - } - if (srcRelations.contains(dest)) { - failAnalysis( - s"Cannot overwrite table $tableName that is also being read from.") - } else { - // OK - } - - case _ => // OK - } - } else { - // OK - } - - case _ => // OK - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 3475f9dd67..1d04513a44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -26,8 +26,8 @@ import org.scalactic.Tolerance._ import org.apache.spark.sql.{QueryTest, Row, SQLConf} import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.json.InferSchema.compatibleType -import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.util.Utils diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index a2763c78b6..23df102cd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -24,7 +24,7 @@ import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.sources.LogicalRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types._ import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala index 37b0a9fbf7..4f98776b91 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala @@ -28,11 +28,11 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.sources.PartitioningUtils._ -import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec} +import org.apache.spark.sql.execution.datasources.{LogicalRelation, PartitionSpec, Partition, PartitioningUtils} import org.apache.spark.sql.types._ import org.apache.spark.sql._ import org.apache.spark.unsafe.types.UTF8String +import PartitioningUtils._ // The data where the partitioning key exists only in the directory structure. case class ParquetData(intField: Int, stringField: String) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index a71088430b..1907e643c8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -22,6 +22,7 @@ import java.io.{File, IOException} import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.execution.datasources.DDLException import org.apache.spark.util.Utils class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 296b0d6f74..3cbf5467b2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.execution.datasources.ResolvedDataSource class ResolvedDataSourceSuite extends SparkFunSuite { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4684d48aff..cec7685bb6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -44,9 +44,9 @@ import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand} +import org.apache.spark.sql.execution.datasources.{PreWriteCheck, PreInsertCastAndRename, DataSourceStrategy} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} -import org.apache.spark.sql.sources.DataSourceStrategy import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -384,11 +384,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { catalog.PreInsertionCasts :: ExtractPythonUDFs :: ResolveHiveWindowFunction :: - sources.PreInsertCastAndRename :: + PreInsertCastAndRename :: Nil override val extendedCheckRules = Seq( - sources.PreWriteCheck(catalog) + PreWriteCheck(catalog) ) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index b15261b791..0a2121c955 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import scala.collection.JavaConversions._ + import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} @@ -28,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.Logging +import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions._ @@ -35,14 +38,12 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.execution.datasources +import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.parquet.ParquetRelation2 -import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource} import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources} -/* Implicit conversions */ -import scala.collection.JavaConversions._ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext) extends Catalog with Logging { @@ -278,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive parquetRelation.paths.toSet == pathsInMetastore.toSet && logical.schema.sameType(metastoreSchema) && parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse { - PartitionSpec(StructType(Nil), Array.empty[sources.Partition]) + PartitionSpec(StructType(Nil), Array.empty[datasources.Partition]) } if (useCached) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 7fc517b646..f5574509b0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.execution.ExplainCommand -import org.apache.spark.sql.sources.DescribeCommand +import org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.HiveShim._ import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 9638a8201e..a22c3292ef 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _} +import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand} import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.parquet.ParquetRelation -import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand} import org.apache.spark.sql.types.StringType diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 71fa3e9c33..a47f9a4feb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.RunnableCommand +import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 48d35a60a7..de63ee56dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -37,6 +37,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.datasources.PartitionSpec import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim} import org.apache.spark.sql.sources.{Filter, _} import org.apache.spark.sql.types.StructType diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index d910af22c3..e403f32efa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -28,12 +28,12 @@ import org.apache.hadoop.mapred.InvalidInputException import org.apache.spark.Logging import org.apache.spark.sql._ +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.parquet.ParquetRelation2 -import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index c9dd4c0935..efb04bf3d5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -22,11 +22,11 @@ import java.io._ import org.scalatest.{BeforeAndAfterAll, GivenWhenThen} import org.apache.spark.{Logging, SparkFunSuite} -import org.apache.spark.sql.sources.DescribeCommand -import org.apache.spark.sql.execution.{SetCommand, ExplainCommand} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.{SetCommand, ExplainCommand} +import org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.test.TestHive /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 05a1f0094e..0342826542 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -23,12 +23,12 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation} import org.apache.spark.sql.parquet.ParquetRelation2 -import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ case class Nested1(f1: Nested2) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 9d79a4b007..82a8daf8b4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -23,12 +23,12 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD} +import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan} -import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation} import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index afecf9675e..1cef83fd5e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.sources -import scala.collection.JavaConversions._ - import java.io.File +import scala.collection.JavaConversions._ + import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -31,10 +31,12 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ + abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { override lazy val sqlContext: SQLContext = TestHive -- cgit v1.2.3