-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala | 2
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala | 4
-rw-r--r--  project/SparkBuild.scala | 33
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 6
12 files changed, 55 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 9658e9a696..a5de10fe89 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.api.r
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.HashMap
+import scala.language.existentials
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
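
Note: the only change here is the scala.language.existentials import, which silences scalac's -feature warning about inferred existential types somewhere in this handler. A minimal, self-contained sketch of the kind of definition that trips that check (hypothetical Container class, not the handler's real code):

    import scala.language.existentials

    object ExistentialExample {
      class Container[T](val value: T)

      // Without the language import, scalac -feature warns that the inferred result
      // type (Container[T], T) forSome { type T } cannot be expressed with wildcards;
      // with warnings now fatal, that single warning would fail the build.
      def contents(c: Container[_]) = (c, c.value)
    }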
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 658e8c8b89..130b58882d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -94,13 +94,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}
override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
+ rdds.map { rdd: RDD[_] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
- new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
+ new ShuffleDependency[K, Any, CoGroupCombiner](
+ rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
@@ -133,7 +134,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
- case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
+ case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
val dependencyPartition = split.narrowDeps(depNum).get.split
// Read them from the parent
val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
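
Note: both edits here satisfy the compiler under fatal warnings without changing behavior. The dependency's type argument is erased at runtime, so matching on OneToOneDependency[Product2[K, Any]] produces an unchecked warning; @unchecked acknowledges that the argument cannot actually be tested, and the ShuffleDependency argument is cast explicitly instead. A stand-alone sketch of the @unchecked idiom with made-up types:

    object UncheckedExample {
      // Matching against a parameterized type can only check the erased class at
      // runtime, so scalac emits an "unchecked" warning unless the pattern is
      // annotated with @unchecked to acknowledge that.
      def describe(value: Any): String = value match {
        case xs: List[String] @unchecked => xs.mkString(",")
        case other                       => other.toString
      }
    }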
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a078f14af5..c600319d9d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -94,6 +94,8 @@ private[spark] object JsonProtocol {
logStartToJson(logStart)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
executorMetricsUpdateToJson(metricsUpdate)
+ case blockUpdated: SparkListenerBlockUpdated =>
+ throw new MatchError(blockUpdated) // TODO(ekl) implement this
}
}
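
Note: the event match gains an explicit case for SparkListenerBlockUpdated, presumably to keep the match exhaustive under the fatal-warnings build; throwing MatchError preserves the previous runtime behavior until the TODO is implemented. A sketch of the same idiom with placeholder event types:

    object EventJson {
      sealed trait Event
      case class Started(id: Int) extends Event
      case class BlockUpdated(id: Int) extends Event // newly added subtype

      // Listing the unsupported subtype explicitly keeps the match exhaustive for
      // the compiler while behaving exactly like the old implicit MatchError.
      def toJsonName(event: Event): String = event match {
        case Started(id)     => s"started-$id"
        case b: BlockUpdated => throw new MatchError(b) // not implemented yet
      }
    }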
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
index 30bcf1d2f2..3354a92327 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
@@ -20,8 +20,6 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.util.Utils
-
private[spark]
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
index afbcc6efc8..cadae472b3 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
@@ -21,8 +21,6 @@ import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.util.Utils
-
private[spark]
class SerializableJobConf(@transient var value: JobConf) extends Serializable {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
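
Note: this class and SerializableConfiguration above both live in org.apache.spark.util, the same package that defines Utils, so the deleted imports were redundant; same-package members are already in scope. A tiny illustration with a hypothetical package and names:

    package com.example.util

    object Helpers {
      def now(): Long = System.currentTimeMillis()
    }

    // No "import com.example.util.Helpers" is needed: Timer lives in the same
    // package, so Helpers is already visible, just as Utils is for these classes.
    class Timer {
      def stamp(): Long = Helpers.now()
    }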
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index d89b0059d8..2b3ed6df48 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat.test
import scala.annotation.varargs
import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
@@ -187,7 +187,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
}
private def evalOneSampleP(ksStat: Double, n: Long): KolmogorovSmirnovTestResult = {
- val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
+ val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
}
}
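
Note: Spark's own KolmogorovSmirnovTest object shares its simple name with the commons-math3 class, so the import is renamed and the single use site updated; nothing else changes. A small sketch of Scala's renaming-import syntax with made-up names:

    import java.util.{List => JList}

    object RenameExample {
      class List // a local type that would otherwise clash with java.util.List

      val javaSide: JList[String] = new java.util.ArrayList[String]()
      val scalaSide: List          = new List
    }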
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 12828547d7..61a05d375d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,7 +154,38 @@ object SparkBuild extends PomBuild {
if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
},
- javacOptions in Compile ++= Seq("-encoding", "UTF-8")
+ javacOptions in Compile ++= Seq("-encoding", "UTF-8"),
+
+ // Implements -Xfatal-warnings, ignoring deprecation warnings.
+ // Code snippet taken from https://issues.scala-lang.org/browse/SI-8410.
+ compile in Compile := {
+ val analysis = (compile in Compile).value
+ val s = streams.value
+
+ def logProblem(l: (=> String) => Unit, f: File, p: xsbti.Problem) = {
+ l(f.toString + ":" + p.position.line.fold("")(_ + ":") + " " + p.message)
+ l(p.position.lineContent)
+ l("")
+ }
+
+ var failed = 0
+ analysis.infos.allInfos.foreach { case (k, i) =>
+ i.reportedProblems foreach { p =>
+ val deprecation = p.message.contains("is deprecated")
+
+ if (!deprecation) {
+ failed = failed + 1
+ }
+
+ logProblem(if (deprecation) s.log.warn else s.log.error, k, p)
+ }
+ }
+
+ if (failed > 0) {
+ sys.error(s"$failed fatal warnings")
+ }
+ analysis
+ }
)
def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
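
Note: for comparison, the blunt alternative would be handing scalac -Xfatal-warnings directly, but that makes deprecation warnings fatal as well, which is exactly what the post-compile analysis above avoids. A hedged sketch of that simpler, rejected option in build.sbt-style syntax (sbt 0.13 era, matching SparkBuild.scala):

    // Every warning, including deprecations, would fail the build with this setting.
    scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-Xfatal-warnings")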
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 8f63d2120a..ae0ab2f4c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -24,6 +24,7 @@ import java.util.{Map => JavaMap}
import javax.annotation.Nullable
import scala.collection.mutable.HashMap
+import scala.language.existentials
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
@@ -401,7 +402,7 @@ object CatalystTypeConverters {
case seq: Seq[Any] => seq.map(convertToCatalyst)
case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
case arr: Array[Any] => arr.toSeq.map(convertToCatalyst).toArray
- case m: Map[Any, Any] =>
+ case m: Map[_, _] =>
m.map { case (k, v) => (convertToCatalyst(k), convertToCatalyst(v)) }.toMap
case other => other
}
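
Note: the same pair of fixes as elsewhere in this commit. The language.existentials import covers an inferred existential type, and the map pattern uses wildcards so nothing unverifiable is claimed about the key and value types after erasure. A stand-alone sketch of the wildcard pattern (hypothetical normalize function):

    object WildcardPatternExample {
      // Wildcards in the pattern claim nothing about the element types, so there is
      // no unchecked warning to suppress, unlike a pattern such as Map[Any, Any].
      def normalize(value: Any): Any = value match {
        case m: Map[_, _] => m.map { case (k, v) => (k.toString, v) }
        case other        => other
      }
    }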
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ee0201a9d4..05da05d7b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -197,6 +197,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
// the table. But, insertInto with Overwrite requires the schema of the data to be the same
// as the schema of the table.
insertInto(tableName)
+
+ case SaveMode.Overwrite =>
+ throw new UnsupportedOperationException("overwrite mode unsupported.")
}
} else {
val cmd =
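
Note: the added case makes the remaining SaveMode value explicit instead of letting it fall through to a MatchError. A self-contained sketch of the idiom, assuming spark-sql on the classpath (the strings are illustrative, not Spark's):

    import org.apache.spark.sql.SaveMode

    object SaveModeExample {
      // Covering the remaining enum value explicitly turns a would-be MatchError
      // into a readable, intentional failure.
      def describe(mode: SaveMode): String = mode match {
        case SaveMode.Append        => "append to existing data"
        case SaveMode.Ignore        => "no-op if data exists"
        case SaveMode.ErrorIfExists => "fail if data exists"
        case SaveMode.Overwrite =>
          throw new UnsupportedOperationException("overwrite mode unsupported.")
      }
    }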
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
index 84a0441e14..cd2aa7f743 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
@@ -100,7 +100,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
val pathExists = fs.exists(qualifiedOutputPath)
val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
- sys.error(s"path $qualifiedOutputPath already exists.")
+ throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
fs.delete(qualifiedOutputPath, true)
true
@@ -108,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
true
case (SaveMode.Ignore, exists) =>
!exists
+ case (s, exists) =>
+ throw new IllegalStateException(s"unsupported save mode $s ($exists)")
}
// If we are appending data to an existing dir.
val isAppend = pathExists && (mode == SaveMode.Append)
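
Note: two changes here. ErrorIfExists now raises a typed AnalysisException rather than the bare RuntimeException from sys.error (the tests below are updated to match), and a final catch-all turns any unexpected (mode, exists) combination into an explicit IllegalStateException. A sketch of the same decision table using hypothetical, self-contained types in place of Spark's:

    object WriteDecision {
      sealed trait Mode
      case object Append extends Mode
      case object Overwrite extends Mode
      case object ErrorIfExists extends Mode
      case object Ignore extends Mode

      // A user-facing, catchable error for the one expected failure case.
      class DestinationExistsException(msg: String) extends Exception(msg)

      // Mirrors the (mode, pathExists) decision table in the diff above.
      def shouldWrite(mode: Mode, pathExists: Boolean): Boolean = (mode, pathExists) match {
        case (ErrorIfExists, true)  => throw new DestinationExistsException("path already exists.")
        case (ErrorIfExists, false) => true
        case (Overwrite, _)         => true // the caller deletes an existing path first
        case (Append, _)            => true
        case (Ignore, exists)       => !exists
      }
    }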
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 250e73a4db..ddd5d24717 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -41,10 +41,10 @@ private[orc] object OrcFilters extends Logging {
private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
def newBuilder = SearchArgument.FACTORY.newBuilder()
- def isSearchableLiteral(value: Any) = value match {
+ def isSearchableLiteral(value: Any): Boolean = value match {
// These are types recognized by the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
- case _: String | _: Long | _: Double | _: DateWritable | _: HiveDecimal | _: HiveChar |
- _: HiveVarchar | _: Byte | _: Short | _: Integer | _: Float => true
+ case _: String | _: Long | _: Double | _: Byte | _: Short | _: Integer | _: Float => true
+ case _: DateWritable | _: HiveDecimal | _: HiveChar | _: HiveVarchar => true
case _ => false
}
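
Note: behaviorally nothing changes here; the same ten types still return true. The long alternation is split in two and the result type made explicit, keeping the pattern easy to read under the new fatal-warnings build. A trivial sketch of the or-pattern type test (hypothetical helper, not the ORC code):

    object LiteralCheckExample {
      // Type tests joined with | only inspect the erased runtime class, so splitting
      // a long alternation across two cases changes nothing at runtime.
      def isPrimitiveLike(value: Any): Boolean = value match {
        case _: String | _: Long | _: Double | _: Byte | _: Short | _: Float => true
        case _: Integer => true
        case _ => false
      }
    }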
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 1cef83fd5e..2a8748d913 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -134,7 +134,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
test("save()/load() - non-partitioned table - ErrorIfExists") {
withTempDir { file =>
- intercept[RuntimeException] {
+ intercept[AnalysisException] {
testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
}
}
@@ -233,7 +233,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
test("save()/load() - partitioned table - ErrorIfExists") {
withTempDir { file =>
- intercept[RuntimeException] {
+ intercept[AnalysisException] {
partitionedTestDF.write
.format(dataSourceName)
.mode(SaveMode.ErrorIfExists)
@@ -696,7 +696,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
// This should only complain that the destination directory already exists, rather than that file
// "empty" is not a Parquet file.
assert {
- intercept[RuntimeException] {
+ intercept[AnalysisException] {
df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
}.getMessage.contains("already exists")
}
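
Note: these test updates follow from the commands.scala change above: the error surfaced to callers is now an AnalysisException, so the suites intercept that type instead of RuntimeException. A minimal ScalaTest sketch of the intercept idiom (a standard-library exception is used so the sketch stays self-contained):

    import org.scalatest.FunSuite

    // intercept[...] asserts both that something is thrown and that it has the
    // expected type; after this commit the expected type in these suites is
    // AnalysisException rather than the RuntimeException produced by sys.error.
    class InterceptExampleSuite extends FunSuite {
      test("intercept checks the exception type") {
        val e = intercept[IllegalArgumentException] {
          require(false, "path /tmp/out already exists.")
        }
        assert(e.getMessage.contains("already exists"))
      }
    }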