Kafka Connect setup to send records from Aurora using AWS MSK

Date: 2023-08-21

                  Problem description


                  I have to send records from Aurora/MySQL to MSK, and from there to the Elasticsearch service:

                  Aurora --> Kafka Connect --> AWS MSK --> Kafka Connect --> Elasticsearch

                  The record in the Aurora table structure is something like this, and I think the record will go to AWS MSK in this format:

                  "o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""
                  

                  So, in order for Elasticsearch to consume the records, I need to use a proper schema, which is why I have to use Schema Registry.

                  My questions

                  Question 1

                  How should I use Schema Registry for the above type of message, and is Schema Registry even required? Do I have to create a JSON structure for this, and if yes, where do I keep it? I need more help to understand this part.

                  I have edited

                  vim /usr/local/confluent/etc/schema-registry/schema-registry.properties
                  

                  It mentions ZooKeeper, but I did not understand what kafkastore.topic=_schema is, or how to link it to a custom schema.

                  When I started it anyway, I got this error:

                  Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.

                  which I was expecting, because I did not do anything about the schema.

                  I do have the JDBC connector installed, and when I start it I get the error below:

                  Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
                  Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
                  You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
                  

                  Question 2: Can I create two connectors on one EC2 instance (the JDBC one and the Elasticsearch one)? If yes, do I have to start both in separate CLIs?

                  Question 3: When I open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties, I see only property values like the ones below:

                  name=test-source-sqlite-jdbc-autoincrement
                  connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
                  tasks.max=1
                  connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
                  mode=incrementing
                  incrementing.column.name=id
                  topic.prefix=trf-aurora-fspaudit-
                  

                  In the above properties file, where can I mention the schema name and the table name?

                  Based on the answer, I am updating my Kafka Connect JDBC configuration.

                  --------------- start JDBC Connect / Elasticsearch ---------------

                  wget http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
                  tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
                  sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent

                  wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
                  tar -xzf mysql-connector-java-5.1.48.tar.gz
                  sudo mv mysql-connector-java-5.1.48 /usr/local/confluent/share/java/kafka-connect-jdbc
                  

                  And then

                  vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
                  

                  Then I modified the properties below:

                  connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
                  mode=incrementing
                  connection.user=admin
                  connection.password=Welcome123
                  table.whitelist=PANStatementInstanceLog
                  schema.pattern=dbo
                  

                  Lastly, I modified

                  vim /usr/local/confluent/etc/kafka/connect-standalone.properties

                  and here I modified the properties below:

                  bootstrap.servers=b-3.205147-ertrtr.erer.c5.ertert.us-east-1.amazonaws.com:9092,b-6.ertert-riskaudit.ertet.c5.kafka.us-east-1.amazonaws.com:9092,b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
                  key.converter.schemas.enable=true
                  value.converter.schemas.enable=true
                  offset.storage.file.filename=/tmp/connect.offsets
                  offset.flush.interval.ms=10000
                  plugin.path=/usr/local/confluent/share/java
                  

                  When I list topics, I do not see any topic listed for the table name.

                  Stack trace for the error message

                  [2020-01-03 07:40:57,169] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
                  [2020-01-03 07:40:57,169] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
                  java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
                  Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
                  
                  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
                  Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
                  
                  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
                  You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
                          at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
                          at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
                          at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
                  Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
                  Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
                  
                  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
                  Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
                  
                  The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
                  You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
                          at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:423)
                          at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
                          at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
                  
                          curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" \
                            IPaddressOfKCnode:8083/connectors/ -d '{
                              "name": "emp-connector",
                              "config": {
                                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                                "tasks.max": "1",
                                "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd",
                                "table.whitelist": "emp",
                                "mode": "timestamp",
                                "topic.prefix": "mysql-"
                              }
                            }'
                  

                  Solution

                  > Is Schema Registry required?

                  No. You can enable schemas in JSON records, and the JDBC source can create them for you based on the table information:

                  value.converter=org.apache.kafka...JsonConverter 
                  value.converter.schemas.enable=true
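
                  For reference, with schemas.enable=true the JsonConverter (full class name org.apache.kafka.connect.json.JsonConverter) wraps every record in a schema/payload envelope. A minimal sketch of how one of the Aurora rows might look on the MSK topic, with hypothetical field names:

                  {
                    "schema": {
                      "type": "struct",
                      "name": "trf-aurora-fspaudit",
                      "optional": false,
                      "fields": [
                        {"field": "case_id", "type": "string", "optional": false},
                        {"field": "action",  "type": "string", "optional": true}
                      ]
                    },
                    "payload": {"case_id": "o36347-5d17-136a-9749-Oe46464", "action": "NEW_CASE"}
                  }

                  The Elasticsearch sink can use that embedded schema to build its index mapping, which is why Schema Registry is not strictly required here.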
                  

                  > It mentions ZooKeeper, but I did not understand what kafkastore.topic=_schema is

                  If you want to use Schema Registry, you should be using kafkastore.bootstrap.servers with the Kafka address, not ZooKeeper. So remove kafkastore.connection.url.

                  Please read the docs for explanations of all properties
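
                  As a sketch, the relevant lines of schema-registry.properties would then look something like this, using one of the MSK broker addresses from the question (listeners and the topic name are the stock defaults):

                  listeners=http://0.0.0.0:8081
                  kafkastore.bootstrap.servers=PLAINTEXT://b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
                  kafkastore.topic=_schemas
                  # kafkastore.connection.url (ZooKeeper) removed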

                  > I did not do anything about the schema

                  Doesn't matter. The schemas topic gets created when the Registry first starts.

                  > Can I create two connectors on one EC2 instance?

                  Yes (ignoring available JVM heap space). Again, this is detailed in the Kafka Connect documentation.

                  Using standalone mode, you first pass the Connect worker configuration, then up to N connector property files in one command (see the sketch after the link below).

                  Using distributed mode, you use the Kafka Connect REST API

                  https://docs.confluent.io/current/connect/managing/configuring.html
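
                  A sketch of both styles, assuming the stock Confluent launcher and the file names used earlier in the question (elasticsearch-sink.properties is a hypothetical second connector config):

                  # Standalone: one worker config followed by N connector configs, in a single command
                  /usr/local/confluent/bin/connect-standalone \
                      /usr/local/confluent/etc/kafka/connect-standalone.properties \
                      /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties \
                      elasticsearch-sink.properties

                  # Distributed: start the worker once, then submit each connector over REST
                  curl -X POST -H "Content-Type: application/json" \
                      localhost:8083/connectors -d @jdbc-source-config.json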

                  > When I open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

                  First of all, that's for SQLite, not MySQL/Postgres. You don't need to use the quickstart files; they are only there for reference.

                  Again, all properties are well documented

                  https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc
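
                  Rather than editing the SQLite quickstart, a dedicated MySQL source config could look like the sketch below, reusing the placeholder endpoint, credentials, and table from the question (all property names are from the JDBC source connector docs):

                  name=trf-aurora-source
                  connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
                  tasks.max=1
                  connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
                  connection.user=admin
                  connection.password=Welcome123
                  table.whitelist=PANStatementInstanceLog
                  mode=incrementing
                  incrementing.column.name=id
                  topic.prefix=trf-aurora-fspaudit-

                  Note that schema.pattern=dbo from the question looks like a SQL Server convention; for MySQL the database is already selected in the JDBC URL.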

                  > I do have the JDBC connector installed, and when I start it I get the error below

                  Here's more information about how you can debug that

                  https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/
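
                  In particular, "No suitable driver found" usually means the MySQL driver jar is not in the same folder that the JDBC connector's jars are loaded from. A sketch of the fix, assuming the 5.1.48 tarball layout (verify the jar name with tar -tzf first):

                  # copy just the driver jar next to kafka-connect-jdbc, then restart the worker
                  cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48.jar \
                     /usr/local/confluent/share/java/kafka-connect-jdbc/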


                  As stated before, I would personally suggest using Debezium/CDC where possible

                  Debezium Connector for RDS Aurora
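
                  For completeness, a sketch of what a Debezium MySQL source config might look like against the same placeholder Aurora endpoint (Debezium 1.x property names; the server id, logical name, and history topic are hypothetical values):

                  name=trf-aurora-debezium
                  connector.class=io.debezium.connector.mysql.MySqlConnector
                  database.hostname=fdgfgdfgrter.us-east-1.rds.amazonaws.com
                  database.port=3306
                  database.user=admin
                  database.password=Welcome123
                  database.server.id=184054
                  database.server.name=trf-aurora
                  table.include.list=trf.PANStatementInstanceLog
                  database.history.kafka.bootstrap.servers=b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
                  database.history.kafka.topic=schema-changes.trf

                  Keep in mind that Aurora MySQL needs binary logging enabled (binlog_format set to ROW in the cluster parameter group) before Debezium can stream changes.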

