diff options
8 files changed, 59 insertions, 17 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml index b16bcd3528..8f0ae7b1d6 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -61,11 +61,13 @@ <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-graphx_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> @@ -141,7 +141,7 @@ <hive.parquet.version>1.6.0</hive.parquet.version> <jetty.version>9.2.16.v20160414</jetty.version> <javaxservlet.version>3.1.0</javaxservlet.version> - <chill.version>0.8.0</chill.version> + <chill.version>0.9.1</chill.version><!-- TODO 2.12 use 0.8.2? --> <ivy.version>2.4.0</ivy.version> <oro.version>2.0.8</oro.version> <codahale.metrics.version>3.1.2</codahale.metrics.version> @@ -2669,10 +2669,10 @@ <property><name>scala-2.12</name></property> </activation> <properties> - <scala.version>2.12.0</scala.version> + <scala.version>2.12.1</scala.version> <scala.binary.version>2.12</scala.binary.version> - <!-- This corresponds to https://github.com/twitter/chill/pull/253 --> - <chill.version>0.8.2-SNAPSHOT</chill.version> + <jline.version>2.14.1</jline.version> + <jline.groupid>jline</jline.groupid> <!-- This incorporates https://github.com/FasterXML/jackson-module-scala/pull/247 --> <fasterxml.jackson.version>2.8.4</fasterxml.jackson.version> </properties> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index ef3cc787e2..167a97661b 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -117,14 +117,15 @@ object SparkBuild extends PomBuild { lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") - lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq( + // TODO 2.12 + lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq.empty /*Seq( libraryDependencies += compilerPlugin( "com.typesafe.genjavadoc" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full), scalacOptions ++= Seq( "-P:genjavadoc:out=" + (target.value / "java"), "-P:genjavadoc:strictVisibility=true" // hide package private types ) - ) + )*/ lazy val scalaStyleRules = Project("scalaStyleRules", file("scalastyle")) .settings( diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala index 76a66c1bea..f2ad7adcd1 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -21,7 +21,7 @@ import java.io.BufferedReader import scala.Predef.{println => _, _} import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{ILoop, JPrintWriter} +import scala.tools.nsc.interpreter.{ILoop, JPrintWriter, replProps} import scala.tools.nsc.util.stringFromStream import scala.util.Properties.{javaVersion, javaVmName, versionString} @@ -63,8 +63,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) """) processLine("import org.apache.spark.SparkContext._") processLine("import spark.implicits._") - processLine("import spark.sql") - processLine("import org.apache.spark.sql.functions._") + //TODO 2.12 processLine("import spark.sql") + //TODO 2.12 processLine("import org.apache.spark.sql.functions._") replayCommandStack = Nil // remove above commands from session history. } } @@ -86,11 +86,17 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) echo("Type :help for more information.") } + private def initCommand(): Result = { + initializeSpark + Result(keepRunning = true, lineToRecord = None) + } + /** Add repl commands that needs to be blocked. e.g. reset */ private val blockedCommands = Set[String]() /** Standard commands */ lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] = + LoopCommand.nullary("initSpark", "initialize spark context", initCommand) :: standardCommands.filter(cmd => !blockedCommands(cmd.name)) /** Available commands */ @@ -101,11 +107,10 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) * sees any files, so that the Spark context is visible in those files. This is a bit of a * hack, but there isn't another hook available to us at this point. */ - override def loadFiles(settings: Settings): Unit = { - initializeSpark() - super.loadFiles(settings) - } - + // override def loadFiles(settings: Settings): Unit = { + // initializeSpark() + // super.loadFiles(settings) + // } override def resetCommand(line: String): Unit = { super.resetCommand(line) initializeSpark() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index 052d85ad33..b78285c4a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -458,7 +458,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { val columnEquals = df.sparkSession.sessionState.analyzer.resolver val projections = df.schema.fields.map { f => - val typeMatches = (targetType, f.dataType) match { + val typeMatches = ((targetType, f.dataType): @unchecked) match { case (NumericType, dt) => dt.isInstanceOf[NumericType] case (StringType, dt) => dt == StringType } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ebf03e1bf8..e4fa221d31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -857,6 +857,8 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman case EXTERNAL => " EXTERNAL TABLE" case VIEW => " VIEW" case MANAGED => " TABLE" + case CatalogTableType(other) => + throw new AnalysisException(s"Unsupported table type '$other'") } builder ++= s"CREATE$tableTypeString ${table.quotedString}" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index ffd7f6c750..f8d456dd6e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -236,6 +236,37 @@ abstract class PartitioningAwareFileIndex( val name = path.getName !((name.startsWith("_") && !name.contains("=")) || name.startsWith(".")) } + + /** + * List leaf files of given paths. This method will submit a Spark job to do parallel + * listing whenever there is a path having more files than the parallel partition discovery + * discovery threshold. + * + * This is publicly visible for testing. + */ + def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = { + val output = mutable.LinkedHashSet[FileStatus]() + val pathsToFetch = mutable.ArrayBuffer[Path]() + for (path <- paths) { + fileStatusCache.getLeafFiles(path) match { + case Some(files) => + HiveCatalogMetrics.incrementFileCacheHits(files.length) + output ++= files + case None => + pathsToFetch += path + } + () + } + val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) + val discovered = PartitioningAwareFileIndex.bulkListLeafFiles( + pathsToFetch, hadoopConf, filter, sparkSession) + discovered.foreach { case (path, leafFiles) => + HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) + fileStatusCache.putLeafFiles(path, leafFiles.toArray) + output ++= leafFiles + } + output + } } object PartitioningAwareFileIndex { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 5fc3c2753b..49d6b2d83c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -770,8 +770,9 @@ object JdbcUtils extends Logging { case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n) case _ => df } - repartitionedDF.foreachPartition(iterator => savePartition( - getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel) + repartitionedDF.foreachPartition((iterator: Iterator[Row]) => + savePartition( + getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel): Unit ) } |