From a3860c59deebf996f0c32bcc0d15b2903216e732 Mon Sep 17 00:00:00 2001
From: Jakob Odersky
Date: Tue, 10 Jan 2017 14:34:33 -0800
Subject: REPL compiles and runs

---
 assembly/pom.xml                                    |  2 ++
 pom.xml                                             |  8 +++---
 project/SparkBuild.scala                            |  5 ++--
 .../scala/org/apache/spark/repl/SparkILoop.scala    | 21 +++++++++------
 .../apache/spark/sql/DataFrameNaFunctions.scala     |  2 +-
 .../spark/sql/execution/command/tables.scala        |  2 ++
 .../datasources/PartitioningAwareFileIndex.scala    | 31 ++++++++++++++++++++++
 .../sql/execution/datasources/jdbc/JdbcUtils.scala  |  5 ++--
 8 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/assembly/pom.xml b/assembly/pom.xml
index b16bcd3528..8f0ae7b1d6 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -61,11 +61,13 @@
       spark-streaming_${scala.binary.version}
       ${project.version}
+
       org.apache.spark
       spark-sql_${scala.binary.version}
diff --git a/pom.xml b/pom.xml
index c562396c64..6684dd2b3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,7 +141,7 @@
     1.6.0
     9.2.16.v20160414
     3.1.0
-    0.8.0
+    0.9.1
     2.4.0
     2.0.8
     3.1.2
@@ -2669,10 +2669,10 @@
       scala-2.12
-      2.12.0
+      2.12.1
       2.12
-
-      0.8.2-SNAPSHOT
+      2.14.1
+      jline
       2.8.4
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ef3cc787e2..167a97661b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -117,14 +117,15 @@ object SparkBuild extends PomBuild {
   lazy val MavenCompile = config("m2r") extend(Compile)
   lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")

-  lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq(
+  // TODO 2.12
+  lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq.empty /*Seq(
     libraryDependencies += compilerPlugin(
       "com.typesafe.genjavadoc" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full),
     scalacOptions ++= Seq(
       "-P:genjavadoc:out=" + (target.value / "java"),
       "-P:genjavadoc:strictVisibility=true" // hide package private types
     )
-  )
+  )*/

   lazy val scalaStyleRules = Project("scalaStyleRules", file("scalastyle"))
     .settings(
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 76a66c1bea..f2ad7adcd1 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -21,7 +21,7 @@
 import java.io.BufferedReader

 import scala.Predef.{println => _, _}
 import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
+import scala.tools.nsc.interpreter.{ILoop, JPrintWriter, replProps}
 import scala.tools.nsc.util.stringFromStream
 import scala.util.Properties.{javaVersion, javaVmName, versionString}
@@ -63,8 +63,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
       """)
     processLine("import org.apache.spark.SparkContext._")
     processLine("import spark.implicits._")
-    processLine("import spark.sql")
-    processLine("import org.apache.spark.sql.functions._")
+    //TODO 2.12 processLine("import spark.sql")
+    //TODO 2.12 processLine("import org.apache.spark.sql.functions._")
     replayCommandStack = Nil // remove above commands from session history.
   }
 }
@@ -86,11 +86,17 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
     echo("Type :help for more information.")
   }

+  private def initCommand(): Result = {
+    initializeSpark
+    Result(keepRunning = true, lineToRecord = None)
+  }
+
   /** Add repl commands that needs to be blocked. e.g. reset */
   private val blockedCommands = Set[String]()

   /** Standard commands */
   lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
+    LoopCommand.nullary("initSpark", "initialize spark context", initCommand) ::
     standardCommands.filter(cmd => !blockedCommands(cmd.name))

   /** Available commands */
@@ -101,11 +107,10 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    * sees any files, so that the Spark context is visible in those files. This is a bit of a
    * hack, but there isn't another hook available to us at this point.
    */
-  override def loadFiles(settings: Settings): Unit = {
-    initializeSpark()
-    super.loadFiles(settings)
-  }
-
+  // override def loadFiles(settings: Settings): Unit = {
+  //   initializeSpark()
+  //   super.loadFiles(settings)
+  // }
   override def resetCommand(line: String): Unit = {
     super.resetCommand(line)
     initializeSpark()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index 052d85ad33..b78285c4a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -458,7 +458,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
     val columnEquals = df.sparkSession.sessionState.analyzer.resolver

     val projections = df.schema.fields.map { f =>
-      val typeMatches = (targetType, f.dataType) match {
+      val typeMatches = ((targetType, f.dataType): @unchecked) match {
        case (NumericType, dt) => dt.isInstanceOf[NumericType]
        case (StringType, dt) => dt == StringType
      }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ebf03e1bf8..e4fa221d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -857,6 +857,8 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
       case EXTERNAL => " EXTERNAL TABLE"
       case VIEW => " VIEW"
       case MANAGED => " TABLE"
+      case CatalogTableType(other) =>
+        throw new AnalysisException(s"Unsupported table type '$other'")
     }

     builder ++= s"CREATE$tableTypeString ${table.quotedString}"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ffd7f6c750..f8d456dd6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -236,6 +236,37 @@ abstract class PartitioningAwareFileIndex(
     val name = path.getName
     !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
   }
+
+  /**
+   * List leaf files of given paths. This method will submit a Spark job to do parallel
+   * listing whenever there is a path having more files than the parallel partition discovery
+   * discovery threshold.
+   *
+   * This is publicly visible for testing.
+   */
+  def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+    val output = mutable.LinkedHashSet[FileStatus]()
+    val pathsToFetch = mutable.ArrayBuffer[Path]()
+    for (path <- paths) {
+      fileStatusCache.getLeafFiles(path) match {
+        case Some(files) =>
+          HiveCatalogMetrics.incrementFileCacheHits(files.length)
+          output ++= files
+        case None =>
+          pathsToFetch += path
+      }
+      ()
+    }
+    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+      pathsToFetch, hadoopConf, filter, sparkSession)
+    discovered.foreach { case (path, leafFiles) =>
+      HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+      fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+      output ++= leafFiles
+    }
+    output
+  }
 }

 object PartitioningAwareFileIndex {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 5fc3c2753b..49d6b2d83c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -770,8 +770,9 @@ object JdbcUtils extends Logging {
       case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
       case _ => df
     }
-    repartitionedDF.foreachPartition(iterator => savePartition(
-      getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel)
+    repartitionedDF.foreachPartition((iterator: Iterator[Row]) =>
+      savePartition(
+        getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel): Unit
     )
   }
--
cgit v1.2.3
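
Note on the JdbcUtils.scala hunk: Dataset.foreachPartition is overloaded, taking either a Scala Iterator[T] => Unit or a Java ForeachPartitionFunction[T]. Because Scala 2.12 also lets function literals satisfy single-abstract-method (SAM) parameters, both overloads now accept a bare lambda, and the compiler can no longer pick one overload to infer the parameter type from; the usual failure is a "missing parameter type" error. Spelling out (iterator: Iterator[Row]) and ascribing the body to Unit steers the call back to the Scala overload. The sketch below reproduces that situation with stand-in types; FakeDataset and the SAM trait here are invented for illustration and are not Spark's real classes.

  import java.util.{Iterator => JIterator}

  // Java-style SAM, standing in for org.apache.spark.api.java.function.ForeachPartitionFunction.
  trait ForeachPartitionFunction[T] { def call(it: JIterator[T]): Unit }

  // Stand-in for Dataset, exposing foreachPartition's two overloads.
  class FakeDataset[T] {
    def foreachPartition(f: Iterator[T] => Unit): Unit = ()          // Scala overload
    def foreachPartition(f: ForeachPartitionFunction[T]): Unit = ()  // Java overload
  }

  object OverloadSketch {
    val ds = new FakeDataset[String]

    // Under 2.12 the un-annotated lambda is rejected ("missing parameter type") because,
    // once SAM conversion is considered, both overloads accept a one-argument literal:
    // ds.foreachPartition(it => it.foreach(println))

    // Annotating the parameter, as the patch does with (iterator: Iterator[Row]), types the
    // literal as Iterator[String] => Unit, which only the Scala overload accepts:
    ds.foreachPartition((it: Iterator[String]) => it.foreach(println))
  }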
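
Note on the SparkILoop.scala hunk: with the loadFiles override commented out, nothing calls initializeSpark() before the first prompt, so spark and sc are no longer created automatically. The new LoopCommand.nullary registration exposes the initialization as a colon-command instead. Assuming the hook stays disabled, a session would start roughly like this (a sketch, output omitted):

  scala> :initSpark               // runs initializeSpark, defining spark and sc in the session
  scala> spark.range(5).count()   // the session is usable from here on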
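
Note on the DataFrameNaFunctions.scala and tables.scala hunks: both matches are deliberately partial, and the changes look like responses to the stricter pattern-match diagnostics hit while compiling for 2.12. The first ascribes the scrutinee with @unchecked, which suppresses the exhaustivity warning while keeping the existing behaviour (an unexpected value still throws MatchError); the second instead adds an explicit fallback case so an unknown CatalogTableType fails with a descriptive AnalysisException. A minimal sketch of the @unchecked idiom, using toy types rather than Spark's:

  sealed trait Fill
  case object NumericFill extends Fill
  case object TextFill extends Fill
  case object BoolFill extends Fill

  // Only two of the three cases are meant to be handled; ascribing the scrutinee with
  // @unchecked silences the "match may not be exhaustive" warning. An unhandled value
  // still fails at runtime with a MatchError.
  def describe(fill: Fill, width: Int): String = ((fill, width): @unchecked) match {
    case (NumericFill, w) => s"numeric column, width $w"
    case (TextFill, w)    => s"text column, width $w"
  }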