From c481bdf512f09060c9b9f341a5ce9fce00427d08 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 23 Feb 2016 12:55:44 -0800 Subject: [SPARK-13329] [SQL] considering output for statistics of logical plan The current implementation of statistics of UnaryNode does not considering output (for example, Project may product much less columns than it's child), we should considering it to have a better guess. We usually only join with few columns from a parquet table, the size of projected plan could be much smaller than the original parquet files. Having a better guess of size help we choose between broadcast join or sort merge join. After this PR, I saw a few queries choose broadcast join other than sort merge join without turning spark.sql.autoBroadcastJoinThreshold for every query, ended up with about 6-8X improvements on end-to-end time. We use `defaultSize` of DataType to estimate the size of a column, currently For DecimalType/StringType/BinaryType and UDT, we are over-estimate too much (4096 Bytes), so this PR change them to some more reasonable values. Here are the new defaultSize for them: DecimalType: 8 or 16 bytes, based on the precision StringType: 20 bytes BinaryType: 100 bytes UDF: default size of SQL type These numbers are not perfect (hard to have a perfect number for them), but should be better than 4096. Author: Davies Liu Closes #11210 from davies/statics. --- python/pyspark/sql/dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'python/pyspark/sql/dataframe.py') diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 83b034fe77..bf43452e08 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -551,8 +551,8 @@ class DataFrame(object): >>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') - >>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect() - [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)] + >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect() + [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', name=u'Bob', age=5)] """ assert isinstance(alias, basestring), "alias should be a string" return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx) -- cgit v1.2.3