author     Jakob Odersky <jakob@odersky.com>   2017-01-10 14:34:33 -0800
committer  Jakob Odersky <jakob@odersky.com>   2017-04-24 14:09:49 -0700
commit     a3860c59deebf996f0c32bcc0d15b2903216e732 (patch)
tree       91f13ce2c756631387950a8f58a44a7dda2d1566
parent     3609c837f3aa989f0ae7cbf1fd177bb9f3cba7a2 (diff)
REPL compiles and runs
-rw-r--r--  assembly/pom.xml                                                                                     |  2
-rw-r--r--  pom.xml                                                                                              |  8
-rw-r--r--  project/SparkBuild.scala                                                                             |  5
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala                                | 21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala                              |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                          |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala  | 31
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala              |  5
8 files changed, 59 insertions, 17 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml
index b16bcd3528..8f0ae7b1d6 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -61,11 +61,13 @@
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/pom.xml b/pom.xml
index c562396c64..6684dd2b3f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,7 +141,7 @@
<hive.parquet.version>1.6.0</hive.parquet.version>
<jetty.version>9.2.16.v20160414</jetty.version>
<javaxservlet.version>3.1.0</javaxservlet.version>
- <chill.version>0.8.0</chill.version>
+ <chill.version>0.9.1</chill.version><!-- TODO 2.12 use 0.8.2? -->
<ivy.version>2.4.0</ivy.version>
<oro.version>2.0.8</oro.version>
<codahale.metrics.version>3.1.2</codahale.metrics.version>
@@ -2669,10 +2669,10 @@
<property><name>scala-2.12</name></property>
</activation>
<properties>
- <scala.version>2.12.0</scala.version>
+ <scala.version>2.12.1</scala.version>
<scala.binary.version>2.12</scala.binary.version>
- <!-- This corresponds to https://github.com/twitter/chill/pull/253 -->
- <chill.version>0.8.2-SNAPSHOT</chill.version>
+ <jline.version>2.14.1</jline.version>
+ <jline.groupid>jline</jline.groupid>
<!-- This incorporates https://github.com/FasterXML/jackson-module-scala/pull/247 -->
<fasterxml.jackson.version>2.8.4</fasterxml.jackson.version>
</properties>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ef3cc787e2..167a97661b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -117,14 +117,15 @@ object SparkBuild extends PomBuild {
lazy val MavenCompile = config("m2r") extend(Compile)
lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")
- lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq(
+ // TODO 2.12
+ lazy val sparkGenjavadocSettings: Seq[sbt.Def.Setting[_]] = Seq.empty /*Seq(
libraryDependencies += compilerPlugin(
"com.typesafe.genjavadoc" %% "genjavadoc-plugin" % unidocGenjavadocVersion.value cross CrossVersion.full),
scalacOptions ++= Seq(
"-P:genjavadoc:out=" + (target.value / "java"),
"-P:genjavadoc:strictVisibility=true" // hide package private types
)
- )
+ )*/
lazy val scalaStyleRules = Project("scalaStyleRules", file("scalastyle"))
.settings(
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 76a66c1bea..f2ad7adcd1 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -21,7 +21,7 @@ import java.io.BufferedReader
import scala.Predef.{println => _, _}
import scala.tools.nsc.Settings
-import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}
+import scala.tools.nsc.interpreter.{ILoop, JPrintWriter, replProps}
import scala.tools.nsc.util.stringFromStream
import scala.util.Properties.{javaVersion, javaVmName, versionString}
@@ -63,8 +63,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
""")
processLine("import org.apache.spark.SparkContext._")
processLine("import spark.implicits._")
- processLine("import spark.sql")
- processLine("import org.apache.spark.sql.functions._")
+ //TODO 2.12 processLine("import spark.sql")
+ //TODO 2.12 processLine("import org.apache.spark.sql.functions._")
replayCommandStack = Nil // remove above commands from session history.
}
}
@@ -86,11 +86,17 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
echo("Type :help for more information.")
}
+ private def initCommand(): Result = {
+ initializeSpark
+ Result(keepRunning = true, lineToRecord = None)
+ }
+
/** Add repl commands that need to be blocked, e.g. reset */
private val blockedCommands = Set[String]()
/** Standard commands */
lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
+ LoopCommand.nullary("initSpark", "initialize spark context", initCommand) ::
standardCommands.filter(cmd => !blockedCommands(cmd.name))
/** Available commands */
@@ -101,11 +107,10 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
* sees any files, so that the Spark context is visible in those files. This is a bit of a
* hack, but there isn't another hook available to us at this point.
*/
- override def loadFiles(settings: Settings): Unit = {
- initializeSpark()
- super.loadFiles(settings)
- }
-
+ // override def loadFiles(settings: Settings): Unit = {
+ // initializeSpark()
+ // super.loadFiles(settings)
+ // }
override def resetCommand(line: String): Unit = {
super.resetCommand(line)
initializeSpark()
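
In the SparkILoop changes above, the loadFiles override that previously triggered Spark initialization is commented out and an explicit :initSpark command is registered through LoopCommand.nullary instead. As a rough, standalone illustration of that mechanism (not part of the patch; GreetingLoop and :hello are made-up names), a minimal ILoop subclass registering a nullary command could look like this:

// Sketch only: register a custom nullary command on scala.tools.nsc's ILoop,
// mirroring how :initSpark is added to sparkStandardCommands above.
import java.io.BufferedReader
import scala.tools.nsc.interpreter.{ILoop, JPrintWriter}

class GreetingLoop(in0: Option[BufferedReader], out: JPrintWriter) extends ILoop(in0, out) {

  // A command handler returns a Result telling the loop whether to keep
  // running and whether to record the line in the session history.
  private def helloCommand(): Result = {
    echo("hello from a custom REPL command")
    Result(keepRunning = true, lineToRecord = None)
  }

  // Prepend the custom command to the standard command set.
  override def commands: List[LoopCommand] =
    LoopCommand.nullary("hello", "print a greeting", helloCommand) :: standardCommands
}

At the prompt the command would then be invoked as :hello, just as a user (or startup script) would now run :initSpark since initialization no longer happens in loadFiles.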
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index 052d85ad33..b78285c4a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -458,7 +458,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
val columnEquals = df.sparkSession.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
- val typeMatches = (targetType, f.dataType) match {
+ val typeMatches = ((targetType, f.dataType): @unchecked) match {
case (NumericType, dt) => dt.isInstanceOf[NumericType]
case (StringType, dt) => dt == StringType
}
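
The `: @unchecked` ascription added above keeps the deliberately partial match on `(targetType, f.dataType)` from tripping the compiler's exhaustivity warning (only NumericType and StringType targets are handled). A self-contained sketch of the same mechanism, using toy types that are not Spark's:

object UncheckedSketch {
  sealed trait Shape
  case object Circle extends Shape
  case object Square extends Shape

  // Square is intentionally unhandled; without the @unchecked ascription the
  // compiler warns that the match may not be exhaustive.
  def describe(s: Shape): String = (s: @unchecked) match {
    case Circle => "round"
  }
}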
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ebf03e1bf8..e4fa221d31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -857,6 +857,8 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
case EXTERNAL => " EXTERNAL TABLE"
case VIEW => " VIEW"
case MANAGED => " TABLE"
+ case CatalogTableType(other) =>
+ throw new AnalysisException(s"Unsupported table type '$other'")
}
builder ++= s"CREATE$tableTypeString ${table.quotedString}"
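
The ShowCreateTableCommand change takes the other route to the same warning: instead of suppressing the exhaustivity check, it adds a catch-all extractor case that fails fast with an AnalysisException. A standalone sketch of that pattern, using a stand-in TableType rather than Spark's CatalogTableType:

object CatchAllSketch {
  // Stand-in type for illustration only; Spark's CatalogTableType differs.
  case class TableType(name: String)
  val External = TableType("EXTERNAL")
  val Managed  = TableType("MANAGED")
  val View     = TableType("VIEW")

  def render(t: TableType): String = t match {
    case External => " EXTERNAL TABLE"
    case View     => " VIEW"
    case Managed  => " TABLE"
    case TableType(other) => // catch-all keeps the match total
      throw new IllegalArgumentException(s"Unsupported table type '$other'")
  }
}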
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ffd7f6c750..f8d456dd6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -236,6 +236,37 @@ abstract class PartitioningAwareFileIndex(
val name = path.getName
!((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
}
+
+ /**
+ * List leaf files of given paths. This method will submit a Spark job to do parallel
+ * listing whenever there is a path having more files than the parallel partition
+ * discovery threshold.
+ *
+ * This is publicly visible for testing.
+ */
+ def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
+ val output = mutable.LinkedHashSet[FileStatus]()
+ val pathsToFetch = mutable.ArrayBuffer[Path]()
+ for (path <- paths) {
+ fileStatusCache.getLeafFiles(path) match {
+ case Some(files) =>
+ HiveCatalogMetrics.incrementFileCacheHits(files.length)
+ output ++= files
+ case None =>
+ pathsToFetch += path
+ }
+ ()
+ }
+ val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+ val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+ pathsToFetch, hadoopConf, filter, sparkSession)
+ discovered.foreach { case (path, leafFiles) =>
+ HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
+ fileStatusCache.putLeafFiles(path, leafFiles.toArray)
+ output ++= leafFiles
+ }
+ output
+ }
}
object PartitioningAwareFileIndex {
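
The listLeafFiles method added above follows a cache-or-list pattern: serve each path from the FileStatusCache when possible, collect the misses, and hand those to bulkListLeafFiles (which may run the listing as a Spark job once the parallel partition discovery threshold is exceeded). A generic, self-contained sketch of that pattern, with a plain mutable.Map standing in for the cache and no metrics:

import scala.collection.mutable

object ListingSketch {
  // Generic sketch of the cache-or-list pattern used by listLeafFiles above.
  def listWithCache[P, F](
      paths: Seq[P],
      cache: mutable.Map[P, Seq[F]],
      bulkList: Seq[P] => Seq[(P, Seq[F])]): mutable.LinkedHashSet[F] = {
    val output = mutable.LinkedHashSet[F]()
    val misses = mutable.ArrayBuffer[P]()
    for (p <- paths) cache.get(p) match {
      case Some(files) => output ++= files // cache hit: reuse the previous listing
      case None        => misses += p      // cache miss: defer to the bulk lister
    }
    for ((p, files) <- bulkList(misses)) {
      cache(p) = files                     // remember the listing for next time
      output ++= files
    }
    output
  }
}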
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 5fc3c2753b..49d6b2d83c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -770,8 +770,9 @@ object JdbcUtils extends Logging {
case Some(n) if n < df.rdd.getNumPartitions => df.coalesce(n)
case _ => df
}
- repartitionedDF.foreachPartition(iterator => savePartition(
- getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel)
+ repartitionedDF.foreachPartition((iterator: Iterator[Row]) =>
+ savePartition(
+ getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel): Unit
)
}