author     Kan Zhang <kzhang@apache.org>          2014-08-14 19:03:51 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-08-14 19:03:51 -0700
commit     9422a9b084e3fd5b2b9be2752013588adfb430d0 (patch)
tree       72d21725ac720cb2b796a42e4803d547a6a4514b /core
parent     3a8b68b7353fea50245686903b308fa9eb52cb51 (diff)
[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736

This patch includes:
1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect).
2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter.
3. Fixing a classloading issue.

cc @MLnick @JoshRosen @mateiz

Author: Kan Zhang <kzhang@apache.org>

Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits:

02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes
f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className
82cc505 [Kan Zhang] [SPARK-2736] Update data sample
0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files
c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models
2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes
536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
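The converter itself lands in the examples module rather than in core, so it does not appear in the diff below; the core changes only make sure such a class can be found and loaded at runtime. As a rough illustration of what point 1 refers to, here is a minimal sketch of a converter for the Generic data model only, using illustrative class and package names (the converter that ships with the patch also handles the Specific and Reflect models and nested Avro types):

import scala.collection.JavaConverters._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroWrapper
import org.apache.spark.api.python.Converter

// Illustrative only: unwraps an AvroWrapper[GenericRecord] and copies the
// record's fields into a java.util.Map that PySpark can pickle.
class AvroGenericToJavaConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case wrapper: AvroWrapper[_] =>
      val record = wrapper.datum().asInstanceOf[GenericRecord]
      val javaMap = new java.util.HashMap[String, Any]()
      record.getSchema.getFields.asScala.foreach { field =>
        javaMap.put(field.name(), record.get(field.name()))
      }
      javaMap
    case other => other   // already a plain Java value; pass it through
  }
}

On the Python side, the example script refers to a converter like this by its fully qualified class name via the keyConverter argument of sc.newAPIHadoopFile, together with org.apache.avro.mapreduce.AvroKeyInputFormat, whose records arrive as AvroKey keys and NullWritable values.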
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala  |  3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala         | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                   |  3
3 files changed, 17 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index f3b05e1243..49dc95f349 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SerializableWritable, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
converterClass.map { cc =>
Try {
- val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+ val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
logInfo(s"Loaded converter: $cc")
c
} match {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fefe1cb6f1..9f5c5bd30f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int) = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
- val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
- val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
- val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
- val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+ val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+ val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+ val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
for {
k <- Option(keyClass)
v <- Option(valueClass)
- } yield (Class.forName(k), Class.forName(v))
+ } yield (Utils.classForName(k), Utils.classForName(v))
}
private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
- val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+ val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
}
@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
- val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+ val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8cac5da644..019f68b160 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -146,6 +146,9 @@ private[spark] object Utils extends Logging {
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
}
+ /** Preferred alternative to Class.forName(className) */
+ def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
+
/**
* Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
*/
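The helper above is the substance of the classloading fix: a bare Class.forName(className) resolves against the defining loader of the calling class, which for Spark's own classes is normally the application class loader and therefore may not see converter or Avro classes added at submit time (for example via --jars). Passing getContextOrSparkClassLoader routes the lookup through the thread's context class loader when one is set, falling back to Spark's own loader otherwise; note that the loadability check just above passes initialize = false because it only needs to know whether the class resolves. A hedged sketch of the difference, with a hypothetical converter class name:

// Hypothetical user-supplied converter, shipped with --jars and therefore
// visible to the context class loader but not necessarily to the loader
// that defined Spark's classes.
val converterName = "com.example.pythonconverters.MyAvroConverter"

// Resolves against the caller's defining loader; for a --jars class this
// can throw ClassNotFoundException.
// val c1 = Class.forName(converterName)

// What Utils.classForName now does: use the context class loader when set,
// otherwise the loader that loaded Spark, and initialize the class.
val loader = Option(Thread.currentThread().getContextClassLoader)
  .getOrElse(getClass.getClassLoader)
val c2 = Class.forName(converterName, /* initialize = */ true, loader)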