I want to run a custom function on all tables in a SQLite database. The function is more or less the same, but depends on the schema of the individual table. Also, the tables and their schemata are only known at runtime (the program is called with an argument that specifies the path of the database).
This is what I have so far:
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// somehow bind sqlContext to DB
val allTables = sqlContext.tableNames
for (t <- allTables) {
  val df = sqlContext.table(t)
  val schema = df.columns
  sqlContext.sql("SELECT * FROM " + t + "...").map(x => myFunc(x, schema))
}
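As far as I understand, sqlContext.tableNames only lists tables registered in the SQLContext catalog, so the "somehow bind sqlContext to DB" step would mean registering each JDBC DataFrame as a temporary table first. A minimal Spark 1.x sketch (the table names below are hypothetical placeholders; the whole point is that they are not known in advance):

val url = "jdbc:sqlite:/path/to/file.db"
// hypothetical names; in my scenario they would have to be discovered at runtime
val knownTables = Seq("table_a", "table_b")
for (t <- knownTables) {
  val df = sqlContext.read.format("jdbc")
    .options(Map("url" -> url, "dbtable" -> t))
    .load()
  df.registerTempTable(t)  // after this, sqlContext.table(t) and tableNames can see it
}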
The only hint I have found so far requires knowing the table in advance, which is not the case in my scenario:
val tableData =
  sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/path/to/file.db", "dbtable" -> t))
    .load()
I am using the xerial sqlite jdbc driver. So how can I connect solely to a database, not to a table?
Using Beryllium's answer as a start I updated my code to this:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val metaData = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> "(SELECT * FROM sqlite_master) AS t"))
  .load()
val myTableNames = metaData.select("tbl_name").distinct()

for (t <- myTableNames) {
  println(t.toString)
  val tableData = sqlContext.table(t.toString)
  for (record <- tableData.select("*")) {
    println(record)
  }
}
At least I can read the table names at runtime, which is a huge step forward for me. But I can't read the tables. I tried both
val tableData = sqlContext.table(t.toString)
and
val tableData = sqlContext.read.format("jdbc")
  .options(Map(
    "url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> t.toString))
  .load()
in the loop, but in both cases I get a NullPointerException. Although I can print the table names, it seems I cannot connect to them.
Last but not least, I always get an SQLITE_ERROR: Connection is closed error. It appears to be the same issue described in this question: SQLITE_ERROR: Connection is closed when connecting from Spark via JDBC to SQLite database
There are two options you can try.
You can specify a query as the value for the dbtable argument. Syntactically, this query must "look" like a table, so it must be wrapped in a subquery.
In that query, fetch the metadata from the database:
val df = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:xxx",
    "user" -> "x",
    "password" -> "x",
    "dbtable" -> "(select * from pg_tables) as t")).load()
This example works with PostgreSQL; you have to adapt it for SQLite.
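For SQLite the same idea should work against the sqlite_master catalog table instead of pg_tables. A sketch (the file path is a placeholder):

val metaData = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:sqlite:/path/to/file.db",
    "dbtable" -> "(SELECT tbl_name FROM sqlite_master WHERE type = 'table') AS t")).load()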
Update
It seems that the JDBC driver only supports iterating over one result set.
Anyway, once you materialize the list of table names using collect(), the following snippet should work:
val myTableNames = metaData.select("tbl_name").map(_.getString(0)).collect()

for (t <- myTableNames) {
  println(t.toString)
  val tableData = sqlContext.read.format("jdbc")
    .options(Map(
      "url" -> "jdbc:sqlite:/x.db",
      "dbtable" -> t))
    .load()
  tableData.show()
}
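To tie this back to the original goal of running a custom function on every table, you could combine the collected names with each table's runtime schema roughly as follows. This is only a sketch: myFunc is the function from the question, assumed here to take a Row and the array of column names:

val myTableNames = metaData.select("tbl_name").map(_.getString(0)).collect()

for (t <- myTableNames) {
  val tableData = sqlContext.read.format("jdbc")
    .options(Map("url" -> "jdbc:sqlite:/x.db", "dbtable" -> t))
    .load()
  val schema = tableData.columns                          // column names, only known at runtime
  val result = tableData.map(row => myFunc(row, schema))  // myFunc comes from the question
  result.collect().foreach(println)                       // or save the result, depending on myFunc
}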