From 9422a9b084e3fd5b2b9be2752013588adfb430d0 Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Thu, 14 Aug 2014 19:03:51 -0700 Subject: [SPARK-2736] PySpark converter and example script for reading Avro files JIRA: https://issues.apache.org/jira/browse/SPARK-2736 This patch includes: 1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect). 2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter. 3. Fixing a classloading issue. cc @MLnick @JoshRosen @mateiz Author: Kan Zhang Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits: 02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className 82cc505 [Kan Zhang] [SPARK-2736] Update data sample 0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models 2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes 536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter --- .../apache/spark/api/python/PythonHadoopUtil.scala | 3 ++- .../org/apache/spark/api/python/PythonRDD.scala | 24 +++++++++++----------- .../main/scala/org/apache/spark/util/Utils.scala | 3 +++ 3 files changed, 17 insertions(+), 13 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index f3b05e1243..49dc95f349 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -19,6 +19,7 @@ package org.apache.spark.api.python import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD +import org.apache.spark.util.Utils import org.apache.spark.{Logging, SerializableWritable, SparkException} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io._ @@ -42,7 +43,7 @@ private[python] object Converter extends Logging { defaultConverter: Converter[Any, Any]): Converter[Any, Any] = { converterClass.map { cc => Try { - val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]] + val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]] logInfo(s"Loaded converter: $cc") c } match { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fefe1cb6f1..9f5c5bd30f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging { batchSize: Int) = { val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text") val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text") - val kc = Class.forName(keyClass).asInstanceOf[Class[K]] - val vc = Class.forName(valueClass).asInstanceOf[Class[V]] + val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]] + val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]] val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits) val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration())) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, @@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging { keyClass: String, valueClass: String, conf: Configuration) = { - val kc = Class.forName(keyClass).asInstanceOf[Class[K]] - val vc = Class.forName(valueClass).asInstanceOf[Class[V]] - val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]] + val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]] + val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]] + val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]] if (path.isDefined) { sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf) } else { @@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging { keyClass: String, valueClass: String, conf: Configuration) = { - val kc = Class.forName(keyClass).asInstanceOf[Class[K]] - val vc = Class.forName(valueClass).asInstanceOf[Class[V]] - val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]] + val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]] + val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]] + val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]] if (path.isDefined) { sc.sc.hadoopFile(path.get, fc, kc, vc) } else { @@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging { for { k <- Option(keyClass) v <- Option(valueClass) - } yield (Class.forName(k), Class.forName(v)) + } yield (Utils.classForName(k), Utils.classForName(v)) } private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String, @@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging { val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse( inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass)) val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration) - val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]]) + val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]]) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, new JavaToWritableConverter) - val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]] + val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]] converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec) } @@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging { val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration) val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, new JavaToWritableConverter) - val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]] + val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]] converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf) } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8cac5da644..019f68b160 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -146,6 +146,9 @@ private[spark] object Utils extends Logging { Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess } + /** Preferred alternative to Class.forName(className) */ + def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader) + /** * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}. */ -- cgit v1.2.3