        A way to read table data from MySQL into Pig

        Date: 2023-08-22

                  Question

                  Everyone knows that Pig supports DBStorage, but it only supports storing results from Pig into MySQL, like this:

                  STORE data INTO 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'INSERT ...');
                  

                  But please show me how to read a table from MySQL, something like this:

                  data = LOAD 'my_table' USING DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://host/db', 'SELECT * FROM my_table');
                  

                  This is my code:

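                  import java.io.IOException;
                  import java.sql.Connection;
                  import java.sql.DriverManager;
                  import java.sql.ResultSet;
                  import java.sql.ResultSetMetaData;
                  import java.sql.SQLException;
                  import java.util.ArrayList;
                  import java.util.List;

                  import org.apache.commons.logging.Log;
                  import org.apache.commons.logging.LogFactory;
                  import org.apache.hadoop.io.NullWritable;
                  import org.apache.hadoop.mapreduce.InputFormat;
                  import org.apache.hadoop.mapreduce.InputSplit;
                  import org.apache.hadoop.mapreduce.Job;
                  import org.apache.hadoop.mapreduce.JobContext;
                  import org.apache.hadoop.mapreduce.RecordReader;
                  import org.apache.hadoop.mapreduce.TaskAttemptContext;
                  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
                  import org.apache.pig.LoadFunc;
                  import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
                  import org.apache.pig.data.DataByteArray;
                  import org.apache.pig.data.DataType;
                  import org.apache.pig.data.Tuple;
                  import org.apache.pig.data.TupleFactory;

                  public class DBLoader extends LoadFunc {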
                      private final Log log = LogFactory.getLog(getClass());
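                      // Reusable buffer for the current row. It must be initialized,
                      // otherwise getNext() fails with a NullPointerException.
                      private ArrayList<Object> mProtoTuple = new ArrayList<Object>();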
                      private Connection con;
                      private String jdbcURL;
                      private String user;
                      private String pass;
                      private int batchSize;
                      private int count = 0;
                      private String query;
                      ResultSet result;
                      protected TupleFactory mTupleFactory = TupleFactory.getInstance();
                  
                      public DBLoader() {
                      }
                  
                      public DBLoader(String driver, String jdbcURL, String user, String pass,
                              String query) {
                  
                          try {
                              Class.forName(driver);
                          } catch (ClassNotFoundException e) {
                              log.error("can't load DB driver:" + driver, e);
                              throw new RuntimeException("Can't load DB Driver", e);
                          }
                          this.jdbcURL = jdbcURL;
                          this.user = user;
                          this.pass = pass;
                          this.query = query;
                  
                      }
                  
                      @Override
                      public InputFormat getInputFormat() throws IOException {
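                          // NOTE: TextInputFormat expects file input paths, and setLocation()
                          // never sets any, so a database-backed InputFormat is needed here.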
                          return new TextInputFormat();
                      }
                  
                      @Override
                      public Tuple getNext() throws IOException {
                          // TODO Auto-generated method stub
                          boolean next = false;
                  
                          try {
                              next = result.next();
                          } catch (SQLException e) {
                              // Surface JDBC failures to Pig instead of silently ending the input.
                              throw new IOException("Failed to advance the JDBC result set", e);
                          }
                  
                          if (!next)
                              return null;
                          int numColumns = 0;
                          // Get result set meta data
                          ResultSetMetaData rsmd;
                          try {
                              rsmd = result.getMetaData();
                              numColumns = rsmd.getColumnCount();
                          } catch (SQLException e) {
                              // Without metadata the row cannot be read, so fail loudly.
                              throw new IOException("Failed to read result set metadata", e);
                          }
                  
                          // JDBC column indexes are 1-based, so iterate from 1 to numColumns inclusive.
                          for (int i = 1; i <= numColumns; i++) {
                  
                              try {
                                  Object field = result.getObject(i);
                  
                                  switch (DataType.findType(field)) {
                                  case DataType.NULL:
                  
                                      mProtoTuple.add(null);
                  
                                      break;
                  
                                  case DataType.BOOLEAN:
                                      mProtoTuple.add((Boolean) field);
                  
                                      break;
                  
                                  case DataType.INTEGER:
                                      mProtoTuple.add((Integer) field);
                  
                                      break;
                  
                                  case DataType.LONG:
                                      mProtoTuple.add((Long) field);
                  
                                      break;
                  
                                  case DataType.FLOAT:
                                      mProtoTuple.add((Float) field);
                  
                                      break;
                  
                                  case DataType.DOUBLE:
                                      mProtoTuple.add((Double) field);
                  
                                      break;
                  
                                  case DataType.BYTEARRAY:
                                      // Pig tuples hold binary data as DataByteArray, not raw byte[].
                                      mProtoTuple.add((DataByteArray) field);
                  
                                      break;
                                  case DataType.CHARARRAY:
                                      mProtoTuple.add((String) field);
                  
                                      break;
                                  case DataType.BYTE:
                                      mProtoTuple.add((Byte) field);
                  
                                      break;
                  
                                  case DataType.MAP:
                                  case DataType.TUPLE:
                                  case DataType.BAG:
                                      throw new RuntimeException("Cannot load a non-flat value "
                                              + "using DBLoader");
                  
                                  default:
                                      throw new RuntimeException("Unknown datatype "
                                              + DataType.findType(field));
                  
                                  }
                  
                              } catch (Exception ee) {
                                  throw new RuntimeException(ee);
                              }
                          }
                  
                          Tuple t = mTupleFactory.newTuple(mProtoTuple);
                          mProtoTuple.clear();
                          return t;
                  
                      }
                  
                      @Override
                      public void prepareToRead(RecordReader arg0, PigSplit arg1)
                              throws IOException {
                  
                          con = null;
                          if (query == null) {
                              throw new IOException("SQL query not specified");
                          }
                          try {
                              if (user == null || pass == null) {
                                  con = DriverManager.getConnection(jdbcURL);
                              } else {
                                  con = DriverManager.getConnection(jdbcURL, user, pass);
                              }
                              con.setAutoCommit(false);
                              result = con.createStatement().executeQuery(query);
                          } catch (SQLException e) {
                              log.error("Unable to connect to JDBC @" + jdbcURL);
                              throw new IOException("JDBC Error", e);
                          }
                          count = 0;
                      }
                  
                      @Override
                      public void setLocation(String location, Job job) throws IOException {
                          // TODO Auto-generated method stub
                  
                          //TextInputFormat.setInputPaths(job, location);
                  
                      }
                  
                      class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{
                  
                          @Override
                          public RecordReader<NullWritable, NullWritable> createRecordReader(
                                  InputSplit arg0, TaskAttemptContext arg1) throws IOException,
                                  InterruptedException {
                              // TODO Auto-generated method stub
                              return null;
                          }
                  
                          @Override
                          public List<InputSplit> getSplits(JobContext arg0) throws IOException,
                                  InterruptedException {
                              // TODO Auto-generated method stub
                              return null;
                          }
                  
                      }
                  
                  }
                  

                  I have tried many times to write a UDF, but without success.

                  Recommended answer

                  As you say, DBStorage only supports saving results to a database.

                  To load data from MySQL, you could look into a project called Sqoop, which copies data from a database to HDFS, or you could perform a MySQL dump and then copy the file into HDFS. Both approaches require some work outside Pig and cannot be used directly from inside a Pig script.

                  A third option would be to write a Pig LoadFunc (you say you tried to write a UDF). It shouldn't be too difficult: you'll need to pass much the same options as DBStorage (driver, connection credentials, and the SQL query to execute), and you can probably inspect the result set metadata to generate a schema automatically.
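
The schema-generation idea can be sketched with nothing but the JDK. The example below is a minimal illustration, not part of the original answer: the class name `PigSchemaSketch`, its methods, and the particular JDBC-to-Pig type mapping are all assumptions made for this sketch. In a real loader the column names and type codes would come from `ResultSetMetaData.getColumnLabel(i)` and `ResultSetMetaData.getColumnType(i)`, with `i` running from 1 to `getColumnCount()`.

```java
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

/** Sketch: derive a Pig schema string from JDBC column names and type codes. */
final class PigSchemaSketch {

    /**
     * Maps a java.sql.Types code to a Pig type name.
     * The mapping is an illustrative assumption; adjust it to your data.
     */
    static String pigType(int jdbcType) {
        switch (jdbcType) {
            case Types.BIT:
            case Types.BOOLEAN:
                return "boolean";
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.INTEGER:
                return "int";
            case Types.BIGINT:
                return "long";
            case Types.REAL:
                return "float";
            case Types.FLOAT:   // JDBC FLOAT is double precision
            case Types.DOUBLE:
                return "double";
            case Types.BINARY:
            case Types.VARBINARY:
            case Types.LONGVARBINARY:
            case Types.BLOB:
                return "bytearray";
            default:
                // Strings, dates, decimals and anything unknown fall back to text.
                return "chararray";
        }
    }

    /** Builds a schema such as "id:long, name:chararray" from parallel arrays. */
    static String schemaFor(String[] names, int[] jdbcTypes) {
        if (names.length != jdbcTypes.length) {
            throw new IllegalArgumentException("names and types differ in length");
        }
        List<String> fields = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            fields.add(names[i] + ":" + pigType(jdbcTypes[i]));
        }
        return String.join(", ", fields);
    }

    public static void main(String[] args) {
        String[] names = {"id", "name", "score"};
        int[] types = {Types.BIGINT, Types.VARCHAR, Types.DOUBLE};
        // Prints: id:long, name:chararray, score:double
        System.out.println(schemaFor(names, types));
    }
}
```

A loader could hand a string like this to Pig through its schema-reporting hook (the `LoadMetadata` interface), or you could simply paste it into the `AS (...)` clause of the `LOAD` statement. Falling back to `chararray` for unrecognized types is a deliberately conservative choice for this sketch.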
