龙空技术网

用Java编写Logstash输出到MySQL的插件

SapphireCoder 38

前言:

目前看官们对“java批量执行sql”大约比较着重,兄弟们都需要知道一些“java批量执行sql”的相关文章。那么小编同时在网络上搜集了一些关于“java批量执行sql””的相关知识,希望同学们能喜欢,同学们快快来学习一下吧!

导读:基于上篇文章 Logstash Java Output 插件构建与使用 ,我们可以实现将官方提供的 logstash-output-java_output_example 项目在本地构建、测试和打包使用。本文将基于此 Demo 之上讨论如实现一个输出到 MySQL 的 output插件,本例子只是一个简单的例子,并未达到生产可用。目的是想通过该例子让各位熟悉如何用 Java 编写 Logstash 插件及分享我编写和设计插件的思路供各位参考。

// logstash-outup-java-mysql-plugin 插件 github 地址
项目结构及核心文件

1、主要项目结构可划分为三块

核心处理逻辑类测试类Gradle 依赖管理

2、核心文件 JavaOutputExample.class

// 官方文档 How to write a Java output plugin

在官方文档中已详细描述 JavaOutputExample 这个类每一部分的作用和含义,大致为以下几部分

构造方法:插件必须提供一个可收 id,、Configuration、Context 的构造方法,初始化参数PluginConfigSpec:允许开发人员通过设置名称、数据类型、弃用状态、必需状态和默认值来指定该插件支持的setting设置。Output 方法:编写核心输出逻辑,们将在该方法中具体编写输出到 MySQL 的逻辑Stop and await Stop编写和设计的思路

我在编写该插件时参考了 Github 上一个采用 Ruby 语言编写的 logstash-output-jdbc 项目,该项目这个插件允许您使用 JDBC 适配器输出到 SQL 数据库(支持多种数据库)。关于如何使用该插件可以参考Logstash实时读取Nginx日志并存储到MySQL思路 。

// logstash-output-jdbc GitHub

在该项目中其使用了 Hikari 作为数据源,并引入各种数据库对应的 JDBC 驱动。通过安装该插件后我们在 logstash.conf 配置以下代码即可实现输入到 mysql 数据库中。

output {   jdbc {      driver_jar_path => "/usr/local/logstash/jdbc/mysql-connector-java-8.0.19.jar"      driver_class => "com.mysql.jdbc.Driver"      #数据库连接      connection_string => "jdbc:mysql://xxxxxxxxx.ads.aliyuncs.com:3306/数据库名?user=xxxxx&password=xxxxxx"      #insert语句      statement => [ "insert into v2_ods_media_show_log (`idfa`) value(?)",             "idfa"]   }}

参考了该项目,我尝试使用 Java 语言编写输出到 MySQL 的插件,核心设计点如下:

可配置参数SQL 简化批处理重试机制

先附上修改后的 JavaOutputExample 类实现代码(省略部分非核心代码)

@LogstashPlugin(name = "java_output_example")public class JavaOutputExample implements Output {    private final String id;    private final Integer flushSize;    private final Integer retryMaxCount;    private final Integer retryIntervalTime;    private final CountDownLatch done = new CountDownLatch(1);    private volatile boolean stopped = false;    private PreparedStatement pstm = null;    private String statement;    private List<String> fields;    public static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss";    /**     * all plugins must provide a constructor that accepts id, Configuration, and Context     */    public JavaOutputExample(final String id, final Configuration config, final Context context) {        this.id = id;        this.flushSize = Integer.parseInt(config.get(PluginConfigParams.FLUSH_SIZE));        this.retryMaxCount = Integer.parseInt(config.get(PluginConfigParams.RETRY_MAX_COUNT));        this.retryIntervalTime = Integer.parseInt(config.get(PluginConfigParams.RETRY_INTERVAL_TIME));        this.statement = config.get(PluginConfigParams.STATEMENT);        // init HikariPool        HikariPool.setUpPool(config);    }    @Override    public void output(final Collection<Event> events) {        // divide and rule        if (events.size() > 0) {            List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize);            for (List<Event> list : lists) {                retryingSubmit(list);            }        }    }    @Override    public Collection<PluginConfigSpec<?>> configSchema() {        // should return a list of all configuration options for this plugin        return Arrays.asList(                PluginConfigParams.DRIVER_CLASS,                PluginConfigParams.CONNECTION_STRING,                PluginConfigParams.STATEMENT,                PluginConfigParams.CONNECTION_TIMEOUT,                PluginConfigParams.MAX_POOL_SIZE,                PluginConfigParams.FLUSH_SIZE);    }    /**     * deal Statement     * 1、Find all {value} in the statement, and extract the fields in {} and put them in the list     * 2、replace {value} to ?     *     * @param statement     * @return     */    private DealedStatement dealStatement(String statement) {        List<String> fields = new ArrayList<>();        Pattern pattern = Pattern.compile("\\{[\\w]*\\}");        Matcher matcher = pattern.matcher(statement);        while (matcher.find()) {            String e = matcher.group();            String substring = e.substring(1, e.length() - 1);            fields.add(substring);            statement = statement.replace(e, "?");        }        DealedStatement ds = new DealedStatement(statement, fields);        return ds;    }    private void paramReplace(PreparedStatement prepStatement, Object param, int i) throws SQLException {        // PreparedStatement's index is starting with 1        if (param instanceof Integer) {            int value = ((Integer) param).intValue();            prepStatement.setInt(i + 1, value);        } else if (param instanceof String) {            String s = (String) param;            prepStatement.setString(i + 1, s);        } else if (param instanceof Double) {            double d = ((Double) param).doubleValue();            prepStatement.setDouble(i + 1, d);        } else if (param instanceof Float) {            float f = ((Float) param).floatValue();            prepStatement.setFloat(i + 1, f);        } else if (param instanceof Long) {            long l = ((Long) param).longValue();            prepStatement.setLong(i + 1, l);        } else if (param instanceof Boolean) {            boolean b = ((Boolean) param).booleanValue();            prepStatement.setBoolean(i + 1, b);        } else if (param instanceof org.logstash.Timestamp) {            // use joda            prepStatement.setString(i + 1, ((Timestamp) param).getTime().toString(STANDARD_FORMAT));        }    }    /**     * Retry submit     */    private void retryingSubmit(List<Event> events) {        Integer retryCount = 0;        while (retryCount < retryMaxCount) {            if (submit(events)) {                break;            } else {                retryCount++;                System.err.println("Execution exception,retry count: " + retryCount);                try {                    TimeUnit.MILLISECONDS.sleep(retryIntervalTime);                } catch (InterruptedException e) {                    e.printStackTrace();                    break;                }            }        }    }    /**     * Submit     */    private boolean submit(List<Event> events) {        Connection conn = null;        try {            conn = HikariPool.getConnection();            // deal prepareStatement            DealedStatement ds = dealStatement(statement);            fields = ds.getFields();            pstm = conn.prepareStatement(ds.getPreparedStatement());            for (Event event : events) {                // param replace                for (int i = 0; i < fields.size(); i++) {                    paramReplace(pstm, event.getField(fields.get(i)), i);                }                pstm.addBatch();            }            // Batch processing            pstm.executeBatch();            return true;        } catch (Exception e) {            e.printStackTrace();            return false;        } finally {            // close            if (pstm != null) {                try {                    pstm.close();                } catch (Exception e) {                    e.printStackTrace();                }            }            if (conn != null) {                try {                    conn.close();                } catch (Exception e) {                    e.printStackTrace();                }            }        }    }}

1、SQL 简化设计

在 logstash-output-jdbc 项目中其设计的思路为通过 statement 参数接收一个数组,数组的第一个位置放置一条预编译 SQL,之后每一个数组位按顺序对应该 SQL 的 "?"。

statement => [ "insert into v2_ods_media_show_log (`idfa`) value(?)",             "idfa"]

不过上面写法似乎有些过于繁琐,我基于 MyBatis SQL 使用 #{param} 语法灵感,因为 logstash中 # 为注释,所以我设计为使用 {param} 替换参数,之后 Statement 可简化如下。

statement => "INSERT INTO datahub_test.v2_dim_agent_id(                agent_id,agent_name,platform,insert_time,update_time)                value({agent_id},{agent_name},{platform},DEFAULT,DEFAULT)"

实现的逻辑则是将语句中 {param} 的参数找出来,并将它们的param值存到一个列表中,并将所有{param} 都替换为 ?,行成一条预编译语句。具体实现细节在 dealStatement() 方法中。

2、批处理

在 logstash-output-jdbc 项目中是为每条 INSERT 语句创建一个 Connect 并逐条执行,这样效率较低,所以我采用了批处理来提高效率。

// 将多条语句加入到同一个 Batchpstm.addBatch();pstm.executeBatch();

注意:

MySQL 执行 executeBatch 需要在 URL 加上 rewriteBatchedStatements=true

MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。

MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。另外这个选项对INSERT/UPDATE/DELETE都有效

这里提一下output() 方法里的切块设计,通过 Slicer.fixedGrouping 配置将传入来的数据进行切块,如传入 1W 条,可设置切块大小为 2000.则切成 5块进行批处理,也是采用了分而治之的思想。

3、重试机制

重试机制设计比较简单通过 retryMaxCount 最大重试次数和 retryIntervalTime 每次重试间隔(毫秒) 来实现,在output 方法中调用retryingSubmit,代码如下:

public void output(final Collection<Event> events) {        // 切块        if (events.size() > 0) {            List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize);            for (List<Event> list : lists) {                // 重试                retryingSubmit(list);            }        }    }// 重试 private void retryingSubmit(List<Event> events) {        Integer retryCount = 0;        while (retryCount < retryMaxCount) {            if (submit(events)) {                break;            } else {                retryCount++;                System.err.println("Execution exception,retry count: " + retryCount);                try {                    TimeUnit.MILLISECONDS.sleep(retryIntervalTime);                } catch (InterruptedException e) {                    e.printStackTrace();                    break;                }            }        }    }

4、可配置参数

本插件只提供了部分基础的可配置参数

最后

以上就是用 Java 编写 Logstash 输出到 MySQL 插件的简单例子,本例子意在提供一个设计思路和帮助各位熟悉如何采用 Java 编写 logstash 插件。该插件并未应用于生产之上,只是作为学习的 Demo,设计上还有诸多可提升的地方,各位如果有好的建议欢迎提出讨论,共同进步。

感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

标签: #java批量执行sql