So I'm doing something that should be simple, but apparently it's not in Spark SQL.
If I run the following query in MySQL, the query finishes in a fraction of a second:
SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;
However, running the same query in a HiveContext under Spark 1.5.1 takes more than 13 seconds. Adding more joins makes the query run for a very long time (over 10 minutes). I'm not sure what I'm doing wrong here or how I can speed things up.
The tables are MySQL tables that are loaded into the HiveContext as temporary tables. This is running on a single instance, with the database on a remote machine.
The tables have foreign key fields, but no explicit FK relationships are defined in the database. I'm using InnoDB.
The execution plan in Spark:
== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc,{user=, password=, url=jdbc:mysql://, dbtable=user})[address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address,[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]
First of all, the type of query you perform is extremely inefficient. As of now (Spark 1.5.0*), to perform a join like this, both tables have to be shuffled / hash-partitioned each time the query is executed. That shouldn't be a problem for the user table, where the user_id = 123 predicate is most likely pushed down, but it still requires a full shuffle of user_address.
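To make that cost concrete, here is a toy, single-process model of a shuffle join in plain Scala. It is not Spark code: the `User` / `UserAddress` case classes, the partition count and the use of `java.lang.Long.hashCode` as the hash function are all assumptions for illustration. Both inputs are hash-partitioned on the join key before anything is matched, so filtering `user` first shrinks that side while `user_address` is still moved in full.

```scala
object ShuffleJoinModel {
  // Hypothetical row types mirroring the two tables in the question
  final case class User(userId: Long, userAddressId: Long)
  final case class UserAddress(addressId: Long)

  // Simple stand-in for hashpartitioning(key): non-negative hash modulo n
  def partitionOf(key: Long, numPartitions: Int): Int =
    Math.floorMod(java.lang.Long.hashCode(key), numPartitions)

  // Returns the joined address ids and how many rows each side had to move
  def shuffleJoin(users: Seq[User], addresses: Seq[UserAddress],
                  numPartitions: Int): (Seq[Long], Int, Int) = {
    // "Exchange": every row of BOTH inputs is routed to the partition owning its key
    val userParts = users.groupBy(u => partitionOf(u.userAddressId, numPartitions))
    val addrParts = addresses.groupBy(a => partitionOf(a.addressId, numPartitions))

    // Rows can only be matched after both sides have been redistributed;
    // a nested loop per partition keeps the sketch short (Spark sorts and merges)
    val joined = for {
      p <- 0 until numPartitions
      u <- userParts.getOrElse(p, Seq.empty)
      a <- addrParts.getOrElse(p, Seq.empty)
      if u.userAddressId == a.addressId
    } yield a.addressId

    val movedUsers = userParts.values.map(_.size).sum
    val movedAddresses = addrParts.values.map(_.size).sum
    (joined, movedUsers, movedAddresses)
  }
}
```

With a single filtered user and ten addresses, one user row and all ten address rows are repartitioned even though only one address matches.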
Moreover, if the tables are only registered and not cached, then every execution of this query will fetch the whole user_address table from MySQL into Spark.
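A toy model of that difference, again in plain Scala with hypothetical names (`RemoteTable`, `UncachedView`, `CachedView` are illustrations, not Spark classes): a registered-only temp table behaves like a recipe that re-reads the source on every query, while a cached one is materialized on first use and then reused. In Spark itself you would get the second behavior by calling `cache()` on the DataFrame, or `cacheTable("user_address")` on the context for the registered table, before running the queries.

```scala
object CachingModel {
  // Hypothetical stand-in for the remote MySQL table; counts full-table reads
  final class RemoteTable(rows: Seq[Long]) {
    var fullReads = 0
    def readAll(): Seq[Long] = { fullReads += 1; rows }
  }

  // Like a registered-but-uncached temp table: every query re-reads the source
  final class UncachedView(source: RemoteTable) {
    def rows: Seq[Long] = source.readAll()
  }

  // Like cache(): materialized lazily on first use, then served from memory
  final class CachedView(source: RemoteTable) {
    lazy val rows: Seq[Long] = source.readAll()
  }

  // Runs the same lookup `queries` times and returns the number of full reads
  def readsFor(useCache: Boolean, queries: Int): Int = {
    val source = new RemoteTable((1L to 1000L).toVector)
    val lookup: () => Seq[Long] =
      if (useCache) { val v = new CachedView(source); () => v.rows }
      else { val v = new UncachedView(source); () => v.rows }
    (1 to queries).foreach(_ => lookup().find(_ == 7L))
    source.fullReads
  }
}
```

Five queries against the uncached view cost five full reads of the table; against the cached view they cost one.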
I'm not sure what I'm doing wrong here and how I can speed things up.
It is not exactly clear why you want to use Spark for this application, but the single-machine setup, the small data and the type of queries suggest that Spark is not a good fit here.
Generally speaking, if the application logic requires single-record access, Spark SQL won't perform well. It is designed for analytical queries, not as a replacement for an OLTP database.
If one table / DataFrame is much smaller than the other, you could try broadcasting it:
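import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

val user: DataFrame = ???          // e.g. the JDBC-backed user table
val user_address: DataFrame = ???  // e.g. the JDBC-backed user_address table
// Filter the small side first, so only the matching user rows get broadcast
val userFiltered = user.where(user("user_id") === 123)
// broadcast() ships userFiltered to every executor instead of shuffling user_address;
// qualifying columns via df("col") avoids ambiguity if both frames have address_id
user_address.join(
  broadcast(userFiltered), user_address("address_id") === userFiltered("user_address_id"))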
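Conceptually, a broadcast join turns the small side into an in-memory hash table that every task receives, so the large side is scanned where it already is and never repartitioned. A plain-Scala toy model of that idea (not Spark's implementation; the case classes and the pre-split `largePartitions` input are hypothetical):

```scala
object BroadcastJoinModel {
  // Hypothetical row types mirroring the two tables in the question
  final case class User(userId: Long, userAddressId: Long)
  final case class UserAddress(addressId: Long)

  // The large side arrives already split into partitions; they are never reshuffled
  def broadcastJoin(smallSide: Seq[User],
                    largePartitions: Seq[Seq[UserAddress]]): Seq[(Long, Long)] = {
    // Build side: hash table keyed on the join column, conceptually sent to every task
    val byKey: Map[Long, Seq[User]] = smallSide.groupBy(_.userAddressId)

    // Probe side: each partition is streamed locally against that hash table
    for {
      partition <- largePartitions
      a <- partition
      u <- byKey.getOrElse(a.addressId, Seq.empty)
    } yield (u.userId, a.addressId)
  }
}
```

Compared with a shuffle join, no user_address row changes partition; the trade-off is that the small side has to fit in memory on every executor.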
* This should change in Spark 1.6.0, where SPARK-11410 should enable persistent table partitioning.