我正在尝试使用批量加载选项将流文件加载到 MySQL 数据库中.下面是我在 UpdateAttribute 处理器中使用的查询,并在更新参数以执行批量加载后将该查询传递给 PutSQL.
LOAD DATA INFILE '${absolute.path}${filename}' INTO TABLE ${dest.database}.${db.table.name} FIELDS TERMINATED BY ', LINES TERMINATED BY '\n'当我运行流程时,它没有说文件未找到异常.
<预><代码>.总共有1个FlowFiles失败,0个成功,0个没有执行,将被路由重试;:java.sql.BatchUpdateException:无法为LOAD DATA INFILE"命令打开文件data.csv".由于底层IOException:`** 开始嵌套异常 **java.io.FileNotFoundException消息:data.csv(没有那个文件或目录)java.io.FileNotFoundException: data.csv(没有这样的文件或目录).这里 MySQL 服务器和 Nifi 在不同的节点上,所以我不能使用 LOAD DATA LOCAL INFILE 查询.
即使我在 SQL 查询中提到了流文件的完整绝对路径,我也不知道为什么会出现文件未找到异常.
当我使用带有硬编码文件名的查询并在 nifi 节点中提供文件的绝对路径时,它按预期工作.
工作:
LOAD DATA LOCAL INFILE '/path/in/nifi/node/to/file/data.csv' INTO TABLE ${dest.database}.${db.table.name} FIELDS TERMINATED BY ',' 以 '\n'} 结尾的行问题是如何获取流文件的绝对路径并将相同的流文件加载到mysql中.
流程:
PutSQL 处理器并让流文件排队.success关系UpdateAttribute 和 PutSQL 之间并选择 List Queue.Attributes 选项卡并查看如果属性absolute.path 和flowfilename 存在并且如果absolute.path 应该有值 /path/in/nifi/node/to/file 和 flowfilename 应该有值 <代码>/data.csv问题:您是否使用 UpdateAttribute 自己设置这些属性,原因是,NiFi 不会生成名为 flowfilename 的属性,而是生成名为 <代码>文件名代码>.
还有一点,请确保 absolute.path 的值以 / 结尾或 flowfilename 的值开始带有 /.如果没有,它们将被附加,结果将是 /path/in/nifi/node/to/filedata.csv.您可以尝试@Mahendra 建议的 append 函数,否则您可以简单地使用 ${absolute.path}/${flowfilename}.
更新
我刚刚意识到absolute.path 是一个核心属性,如filename、filesize、mime.type等.有些处理器使用所有核心属性,而有些处理器使用很少的必要属性.GenerateTableFetch 写入 absolute.path 但没有为它设置任何东西.这就是为什么它有 ./ 这是默认值.
所以我对您的工作方法的建议是,您可以使用 UpdateAttribute 手动设置/覆盖 absolute.path 属性(就像您覆盖了 filename) 并设置所需的值,即 /path/in/nifi/node/to/file
I'm trying to load the flow files into MySQL database using bulk load option. Below is the query I'm using as part of the UpdateAttribute processor and passing that query to PutSQL after updating the parameters to do bulk load.
LOAD DATA INFILE '${absolute.path}${filename}' INTO TABLE ${dest.database}.${db.table.name} FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
When I ran the flow it's failing saying file not found exception.
. There were a total of 1 FlowFiles that failed, 0 that succeeded, and 0 that were not execute and will be routed to retry; : java.sql.BatchUpdateException: Unable to open file 'data.csv'for 'LOAD DATA INFILE command.Due to underlying IOException:`
** BEGIN NESTED EXCEPTION **
java.io.FileNotFoundException
MESSAGE: data.csv (No such file or directory)
java.io.FileNotFoundException: data.csv (No such file or directory).
Here MySQL Server and Nifi are on different nodes so I can't use LOAD DATA LOCAL INFILE query.
I'm not sure why I'm getting file not found exception even though I mentioned the complete absolute path of the flow file in the SQL Query.
When I use query with hard coded file name and providing the absolute path of the file in nifi node, it's working as expected.
Working:
LOAD DATA LOCAL INFILE '/path/in/nifi/node/to/file/data.csv' INTO TABLE ${dest.database}.${db.table.name} FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'}
Question is how to get the absolute path of the flow file and load the same flow file into mysql.
Flow:
PutSQL processor and let the flowfiles queue up.success relationshipUpdateAttribute and PutSQL and select List Queue.Attributes tab and see
if the attributes absolute.path and flowfilename exists and ifabsolute.path should have the value /path/in/nifi/node/to/file and flowfilename should have the value /data.csvQuestion for you: Are you setting these attributes yourself using UpdateAttribute, reason is, NiFi doesn't generate an attribute named flowfilename, it generates one with the name filename.
One more thing, make sure either the value for absolute.path ends with a / in the end or the value of flowfilename begins with a /. If not, they will be appended and the result will be /path/in/nifi/node/to/filedata.csv. You can try the append function that @Mahendra suggested, else you can simply use ${absolute.path}/${flowfilename}.
Update
I just realized that absolute.path is a core attribute like filename, filesize, mime.type, etc. Some processors use all the core attributes while some use very few which are needed. GenerateTableFetch writes absolute.path but doesn't set anything for it. That's why it has ./ which is the default value.
So my suggestion for your approach to work is, you can manually set/overwrite absolute.path attribute using UpdateAttribute (just like you have overwritten filename) and set the desired value which is /path/in/nifi/node/to/file
这篇关于Flowfile 绝对路径 Nifi的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持html5模板网!
如何有效地使用窗口函数根据 N 个先前值来决定How to use windowing functions efficiently to decide next N number of rows based on N number of previous values(如何有效地使用窗口函数根据
在“GROUP BY"中重用选择表达式的结果;条款reuse the result of a select expression in the quot;GROUP BYquot; clause?(在“GROUP BY中重用选择表达式的结果;条款?)
Pyspark DataFrameWriter jdbc 函数的 ignore 选项是忽略整Does ignore option of Pyspark DataFrameWriter jdbc function ignore entire transaction or just offending rows?(Pyspark DataFrameWriter jdbc 函数的 ig
使用 INSERT INTO table ON DUPLICATE KEY 时出错,使用 Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array(使用 INSERT INTO table ON DUPLICATE KEY 时出错,使用 for 循环数组
pyspark mysql jdbc load 调用 o23.load 时发生错误 没有合pyspark mysql jdbc load An error occurred while calling o23.load No suitable driver(pyspark mysql jdbc load 调用 o23.load 时发生错误 没有合适的
如何将 Apache Spark 与 MySQL 集成以将数据库表作为How to integrate Apache Spark with MySQL for reading database tables as a spark dataframe?(如何将 Apache Spark 与 MySQL 集成以将数据库表作为