author    Reynold Xin <rxin@databricks.com>    2015-06-03 13:57:57 -0700
committer Reynold Xin <rxin@databricks.com>    2015-06-03 13:57:57 -0700
commit    939e4f3d8def16dfe03f0196be8e1c218a9daa32 (patch)
tree      6f396292937f65b199717207385123fa38d9b998 /sql
parent    708c63bbbe9580eb774fe47e23ef61338103afda (diff)
[SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures.
Author: Reynold Xin <rxin@databricks.com>

Closes #6608 from rxin/parquet-analysis and squashes the following commits:

b5dc8e2 [Reynold Xin] Code review feedback.
5617cf6 [Reynold Xin] [SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala   | 14
2 files changed, 17 insertions(+), 17 deletions(-)
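
The pattern throughout this change is mechanical: call sites that used sys.error, which throws a bare RuntimeException, now throw AnalysisException, Spark SQL's user-facing error type for schema and analysis problems. A minimal sketch of what this buys calling code (the helper name, df, and path are placeholders, not part of this patch):

import org.apache.spark.sql.{AnalysisException, DataFrame}

// Sketch only: after this patch, an unsupported data type or an invalid
// column name fails Parquet setup with AnalysisException, which callers can
// catch specifically instead of the generic RuntimeException from sys.error.
def writeOrReport(df: DataFrame, path: String): Unit = {
  try {
    df.write.parquet(path)
  } catch {
    case e: AnalysisException =>
      // Setup-time schema problem: unsupported type, bad column name, etc.
      println(s"Cannot write Parquet: ${e.getMessage}")
  }
}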
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 6698b19c74..f8a5d84549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.parquet
import java.io.IOException
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
import scala.util.Try
import org.apache.hadoop.conf.Configuration
@@ -33,12 +33,11 @@ import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeNa
import parquet.schema.Type.Repetition
import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
+import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._
-import org.apache.spark.{Logging, SparkException}
-// Implicits
-import scala.collection.JavaConversions._
/** A class representing Parquet info fields we care about, for passing back to Parquet */
private[parquet] case class ParquetTypeInfo(
@@ -73,13 +72,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
case ParquetPrimitiveTypeName.INT96 =>
// TODO: add BigInteger type? TODO(andre) use DecimalType instead????
- sys.error("Potential loss of precision: cannot convert INT96")
+ throw new AnalysisException("Potential loss of precision: cannot convert INT96")
case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) =>
// TODO: for now, our reader only supports decimals that fit in a Long
DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
- case _ => sys.error(
- s"Unsupported parquet datatype $parquetType")
+ case _ => throw new AnalysisException(s"Unsupported parquet datatype $parquetType")
}
}
@@ -371,7 +369,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
parquetKeyType,
parquetValueType)
}
- case _ => sys.error(s"Unsupported datatype $ctype")
+ case _ => throw new AnalysisException(s"Unsupported datatype $ctype")
}
}
}
@@ -403,7 +401,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
def convertFromString(string: String): Seq[Attribute] = {
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
case s: StructType => s.toAttributes
- case other => sys.error(s"Can convert $string to row")
+ case other => throw new AnalysisException(s"Can convert $string to row")
}
}
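
The convertFromString hunk above expects the serialized schema to parse back into a StructType, raising AnalysisException otherwise. The round-trip it relies on can be exercised with the public DataType API; the fields below are arbitrary examples:

import org.apache.spark.sql.types._

// A StructType serializes to JSON and parses back via DataType.fromJson;
// convertFromString rejects any parse result that is not a StructType.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType)))
assert(DataType.fromJson(schema.json) == schema)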
@@ -411,8 +409,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
// ,;{}()\n\t= and space character are special characters in Parquet schema
schema.map(_.name).foreach { name =>
if (name.matches(".*[ ,;{}()\n\t=].*")) {
- sys.error(
- s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=".
+ throw new AnalysisException(
+ s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
|Please use alias to rename it.
""".stripMargin.split("\n").mkString(" "))
}
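
The column-name check in the last hunk tells the user to alias offending columns. One hedged way to do that wholesale before a write (the helper name and underscore-replacement policy are illustrative, not part of this patch):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Illustrative helper: Parquet field names may not contain " ,;{}()\n\t=",
// so alias such columns with the special characters replaced.
def sanitizeForParquet(df: DataFrame): DataFrame =
  df.select(df.columns.map { name =>
    col(name).as(name.replaceAll("[ ,;{}()\\n\\t=]", "_"))
  }: _*)

// e.g. sanitizeForParquet(df).write.parquet(path)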
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 824ae36968..bf55e2383a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -39,6 +39,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD._
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
@@ -83,7 +84,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
case partFilePattern(id) => id.toInt
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
- case name => sys.error(
+ case name => throw new AnalysisException(
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
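
For context, the hunk above sits in logic that scans existing output files to resume part-file numbering; it has roughly this shape (the regex and helper name are assumptions inferred from the visible match arms):

// Map each existing file name to a part id, treat metadata/hidden files as
// id 0, reject anything else, and resume numbering after the maximum found.
val partFilePattern = """part-r-(\d+).*""".r

def maxExistingPartId(fileNames: Seq[String]): Int =
  fileNames.map {
    case partFilePattern(id)          => id.toInt
    case name if name.startsWith("_") => 0  // e.g. _SUCCESS, _metadata
    case name if name.startsWith(".") => 0  // hidden files
    case name =>                            // an AnalysisException after this patch
      sys.error(s"Found items with illegal name '$name'")
  }.reduceOption(_ max _).getOrElse(0)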
@@ -380,11 +381,12 @@ private[sql] class ParquetRelation2(
// time-consuming.
if (dataSchema == null) {
dataSchema = {
- val dataSchema0 =
- maybeDataSchema
- .orElse(readSchema())
- .orElse(maybeMetastoreSchema)
- .getOrElse(sys.error("Failed to get the schema."))
+ val dataSchema0 = maybeDataSchema
+ .orElse(readSchema())
+ .orElse(maybeMetastoreSchema)
+ .getOrElse(throw new AnalysisException(
+ s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
+ paths.mkString("\n\t")))
// If this Parquet relation is converted from a Hive Metastore table, must reconcile case
// case insensitivity issue and possible schema mismatch (probably caused by schema