12 files changed, 55 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index 9658e9a696..a5de10fe89 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.api.r
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.HashMap
+import scala.language.existentials
 
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 658e8c8b89..130b58882d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -94,13 +94,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   }
 
   override def getDependencies: Seq[Dependency[_]] = {
-    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
+    rdds.map { rdd: RDD[_] =>
       if (rdd.partitioner == Some(part)) {
         logDebug("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
         logDebug("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
+        new ShuffleDependency[K, Any, CoGroupCombiner](
+          rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
       }
     }
   }
@@ -133,7 +134,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
     // A list of (rdd iterator, dependency number) pairs
     val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
     for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
-      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
+      case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
         val dependencyPartition = split.narrowDeps(depNum).get.split
         // Read them from the parent
         val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
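The @unchecked annotation added in CoGroupedRDD above is needed because the JVM erases type arguments: at runtime the match can only verify that the value is an OneToOneDependency, so the compiler flags the unverifiable Product2[K, Any] parameter with an unchecked warning, which the build change in this commit turns fatal. A minimal standalone sketch of the same idiom, with List[Int] as a hypothetical stand-in for the dependency type:

    def sumIfIntList(value: Any): Int = value match {
      case xs: List[Int] @unchecked => xs.sum  // runtime check covers only "is a List"
      case _ => 0
    }

The annotation does not make the cast safe; it records that the surrounding code guarantees the element type.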
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a078f14af5..c600319d9d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -94,6 +94,8 @@ private[spark] object JsonProtocol {
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         executorMetricsUpdateToJson(metricsUpdate)
+      case blockUpdated: SparkListenerBlockUpdated =>
+        throw new MatchError(blockUpdated)  // TODO(ekl) implement this
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
index 30bcf1d2f2..3354a92327 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableConfiguration.scala
@@ -20,8 +20,6 @@
 import java.io.{ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.util.Utils
-
 private[spark]
 class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
index afbcc6efc8..cadae472b3 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableJobConf.scala
@@ -21,8 +21,6 @@
 import java.io.{ObjectInputStream, ObjectOutputStream}
 
 import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.util.Utils
-
 private[spark]
 class SerializableJobConf(@transient var value: JobConf) extends Serializable {
   private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
index d89b0059d8..2b3ed6df48 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat.test
 import scala.annotation.varargs
 
 import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}
-import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}
 
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
@@ -187,7 +187,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
   }
 
   private def evalOneSampleP(ksStat: Double, n: Long): KolmogorovSmirnovTestResult = {
-    val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
+    val pval = 1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n.toInt)
     new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
   }
 }
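The import rename above resolves a name clash: mllib's own KolmogorovSmirnovTest object would otherwise shadow the Commons Math class of the same name inside this file. A standalone sketch of the aliasing idiom; the object body here is illustrative rather than mllib's actual API:

    import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => CommonMathKolmogorovSmirnovTest}

    object KolmogorovSmirnovTest {
      // The alias keeps the third-party class reachable despite the clash.
      def oneSamplePValue(ksStat: Double, n: Int): Double =
        1 - new CommonMathKolmogorovSmirnovTest().cdf(ksStat, n)
    }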
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 12828547d7..61a05d375d 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -154,7 +154,38 @@ object SparkBuild extends PomBuild {
       if (major.toInt >= 1 && minor.toInt >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing")
       else Seq.empty
     },
-    javacOptions in Compile ++= Seq("-encoding", "UTF-8")
+    javacOptions in Compile ++= Seq("-encoding", "UTF-8"),
+
+    // Implements -Xfatal-warnings, ignoring deprecation warnings.
+    // Code snippet taken from https://issues.scala-lang.org/browse/SI-8410.
+    compile in Compile := {
+      val analysis = (compile in Compile).value
+      val s = streams.value
+
+      def logProblem(l: (=> String) => Unit, f: File, p: xsbti.Problem) = {
+        l(f.toString + ":" + p.position.line.fold("")(_ + ":") + " " + p.message)
+        l(p.position.lineContent)
+        l("")
+      }
+
+      var failed = 0
+      analysis.infos.allInfos.foreach { case (k, i) =>
+        i.reportedProblems foreach { p =>
+          val deprecation = p.message.contains("is deprecated")
+
+          if (!deprecation) {
+            failed = failed + 1
+          }
+
+          logProblem(if (deprecation) s.log.warn else s.log.error, k, p)
+        }
+      }
+
+      if (failed > 0) {
+        sys.error(s"$failed fatal warnings")
+      }
+      analysis
+    }
   )
 
   def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 8f63d2120a..ae0ab2f4c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -24,6 +24,7 @@ import java.util.{Map => JavaMap}
 import javax.annotation.Nullable
 
 import scala.collection.mutable.HashMap
+import scala.language.existentials
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
@@ -401,7 +402,7 @@ object CatalystTypeConverters {
     case seq: Seq[Any] => seq.map(convertToCatalyst)
     case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
     case arr: Array[Any] => arr.toSeq.map(convertToCatalyst).toArray
-    case m: Map[Any, Any] =>
+    case m: Map[_, _] =>
       m.map { case (k, v) => (convertToCatalyst(k), convertToCatalyst(v)) }.toMap
     case other => other
   }
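Rewriting Map[Any, Any] as Map[_, _] in CatalystTypeConverters above removes another erasure-related unchecked warning: since no type argument survives to runtime, a pattern may claim no more than "is a Map". A self-contained sketch of the pattern, with a hypothetical normalize in place of convertToCatalyst:

    def normalize(value: Any): Any = value match {
      case m: Map[_, _] => m.map { case (k, v) => (normalize(k), normalize(v)) }
      case s: Seq[_] => s.map(normalize)
      case other => other
    }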
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index ee0201a9d4..05da05d7b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -197,6 +197,9 @@ final class DataFrameWriter private[sql](df: DataFrame) {
           // the table. But, insertInto with Overwrite requires the schema of data be the same
           // the schema of the table.
           insertInto(tableName)
+
+        case SaveMode.Overwrite =>
+          throw new UnsupportedOperationException("overwrite mode unsupported.")
       }
     } else {
       val cmd =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
index 84a0441e14..cd2aa7f743 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/commands.scala
@@ -100,7 +100,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     val pathExists = fs.exists(qualifiedOutputPath)
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
-        sys.error(s"path $qualifiedOutputPath already exists.")
+        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
       case (SaveMode.Overwrite, true) =>
         fs.delete(qualifiedOutputPath, true)
         true
@@ -108,6 +108,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
         true
       case (SaveMode.Ignore, exists) =>
         !exists
+      case (s, exists) =>
+        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
     }
     // If we are appending data to an existing dir.
     val isAppend = pathExists && (mode == SaveMode.Append)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 250e73a4db..ddd5d24717 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -41,10 +41,10 @@ private[orc] object OrcFilters extends Logging {
   private def buildSearchArgument(expression: Filter, builder: Builder): Option[Builder] = {
     def newBuilder = SearchArgument.FACTORY.newBuilder()
 
-    def isSearchableLiteral(value: Any) = value match {
+    def isSearchableLiteral(value: Any): Boolean = value match {
       // These are types recognized by the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
-      case _: String | _: Long | _: Double | _: DateWritable | _: HiveDecimal | _: HiveChar |
-           _: HiveVarchar | _: Byte | _: Short | _: Integer | _: Float => true
+      case _: String | _: Long | _: Double | _: Byte | _: Short | _: Integer | _: Float => true
+      case _: DateWritable | _: HiveDecimal | _: HiveChar | _: HiveVarchar => true
       case _ => false
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 1cef83fd5e..2a8748d913 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -134,7 +134,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
 
   test("save()/load() - non-partitioned table - ErrorIfExists") {
     withTempDir { file =>
-      intercept[RuntimeException] {
+      intercept[AnalysisException] {
         testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
       }
     }
@@ -233,7 +233,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
 
   test("save()/load() - partitioned table - ErrorIfExists") {
     withTempDir { file =>
-      intercept[RuntimeException] {
+      intercept[AnalysisException] {
         partitionedTestDF.write
           .format(dataSourceName)
           .mode(SaveMode.ErrorIfExists)
@@ -696,7 +696,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
     // This should only complain that the destination directory already exists, rather than file
     // "empty" is not a Parquet file.
     assert {
-      intercept[RuntimeException] {
+      intercept[AnalysisException] {
         df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
       }.getMessage.contains("already exists")
     }
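Replacing sys.error with AnalysisException in InsertIntoHadoopFsRelation is what drives the test updates above: sys.error throws a bare RuntimeException, so the suites previously had to intercept that generic type, whereas a dedicated exception can be intercepted precisely. A standalone sketch of the difference; PathExistsException is a hypothetical stand-in for AnalysisException:

    class PathExistsException(msg: String) extends RuntimeException(msg)

    object ExceptionSketch {
      def prepareWrite(path: String, pathExists: Boolean): Unit =
        if (pathExists) throw new PathExistsException(s"path $path already exists.")

      def main(args: Array[String]): Unit =
        // Catch the specific failure type rather than a catch-all RuntimeException.
        try prepareWrite("/tmp/out", pathExists = true)
        catch { case e: PathExistsException => println("caught: " + e.getMessage) }
    }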