前言:
目前看官们对“java批量执行sql”大约比较着重,兄弟们都需要知道一些“java批量执行sql”的相关文章。那么小编同时在网络上搜集了一些关于“java批量执行sql””的相关知识,希望同学们能喜欢,同学们快快来学习一下吧!导读:基于上篇文章 Logstash Java Output 插件构建与使用 ,我们可以实现将官方提供的 logstash-output-java_output_example 项目在本地构建、测试和打包使用。本文将基于此 Demo 之上讨论如实现一个输出到 MySQL 的 output插件,本例子只是一个简单的例子,并未达到生产可用。目的是想通过该例子让各位熟悉如何用 Java 编写 Logstash 插件及分享我编写和设计插件的思路供各位参考。
// logstash-outup-java-mysql-plugin 插件 github 地址项目结构及核心文件
1、主要项目结构可划分为三块
核心处理逻辑类测试类Gradle 依赖管理
2、核心文件 JavaOutputExample.class
// 官方文档 How to write a Java output plugin
在官方文档中已详细描述 JavaOutputExample 这个类每一部分的作用和含义,大致为以下几部分
构造方法:插件必须提供一个可收 id,、Configuration、Context 的构造方法,初始化参数PluginConfigSpec:允许开发人员通过设置名称、数据类型、弃用状态、必需状态和默认值来指定该插件支持的setting设置。Output 方法:编写核心输出逻辑,们将在该方法中具体编写输出到 MySQL 的逻辑Stop and await Stop编写和设计的思路
我在编写该插件时参考了 Github 上一个采用 Ruby 语言编写的 logstash-output-jdbc 项目,该项目这个插件允许您使用 JDBC 适配器输出到 SQL 数据库(支持多种数据库)。关于如何使用该插件可以参考Logstash实时读取Nginx日志并存储到MySQL思路 。
// logstash-output-jdbc GitHub
在该项目中其使用了 Hikari 作为数据源,并引入各种数据库对应的 JDBC 驱动。通过安装该插件后我们在 logstash.conf 配置以下代码即可实现输入到 mysql 数据库中。
output { jdbc { driver_jar_path => "/usr/local/logstash/jdbc/mysql-connector-java-8.0.19.jar" driver_class => "com.mysql.jdbc.Driver" #数据库连接 connection_string => "jdbc:mysql://xxxxxxxxx.ads.aliyuncs.com:3306/数据库名?user=xxxxx&password=xxxxxx" #insert语句 statement => [ "insert into v2_ods_media_show_log (`idfa`) value(?)", "idfa"] }}
参考了该项目,我尝试使用 Java 语言编写输出到 MySQL 的插件,核心设计点如下:
可配置参数SQL 简化批处理重试机制
先附上修改后的 JavaOutputExample 类实现代码(省略部分非核心代码)
@LogstashPlugin(name = "java_output_example")public class JavaOutputExample implements Output { private final String id; private final Integer flushSize; private final Integer retryMaxCount; private final Integer retryIntervalTime; private final CountDownLatch done = new CountDownLatch(1); private volatile boolean stopped = false; private PreparedStatement pstm = null; private String statement; private List<String> fields; public static final String STANDARD_FORMAT = "yyyy-MM-dd HH:mm:ss"; /** * all plugins must provide a constructor that accepts id, Configuration, and Context */ public JavaOutputExample(final String id, final Configuration config, final Context context) { this.id = id; this.flushSize = Integer.parseInt(config.get(PluginConfigParams.FLUSH_SIZE)); this.retryMaxCount = Integer.parseInt(config.get(PluginConfigParams.RETRY_MAX_COUNT)); this.retryIntervalTime = Integer.parseInt(config.get(PluginConfigParams.RETRY_INTERVAL_TIME)); this.statement = config.get(PluginConfigParams.STATEMENT); // init HikariPool HikariPool.setUpPool(config); } @Override public void output(final Collection<Event> events) { // divide and rule if (events.size() > 0) { List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize); for (List<Event> list : lists) { retryingSubmit(list); } } } @Override public Collection<PluginConfigSpec<?>> configSchema() { // should return a list of all configuration options for this plugin return Arrays.asList( PluginConfigParams.DRIVER_CLASS, PluginConfigParams.CONNECTION_STRING, PluginConfigParams.STATEMENT, PluginConfigParams.CONNECTION_TIMEOUT, PluginConfigParams.MAX_POOL_SIZE, PluginConfigParams.FLUSH_SIZE); } /** * deal Statement * 1、Find all {value} in the statement, and extract the fields in {} and put them in the list * 2、replace {value} to ? * * @param statement * @return */ private DealedStatement dealStatement(String statement) { List<String> fields = new ArrayList<>(); Pattern pattern = Pattern.compile("\\{[\\w]*\\}"); Matcher matcher = pattern.matcher(statement); while (matcher.find()) { String e = matcher.group(); String substring = e.substring(1, e.length() - 1); fields.add(substring); statement = statement.replace(e, "?"); } DealedStatement ds = new DealedStatement(statement, fields); return ds; } private void paramReplace(PreparedStatement prepStatement, Object param, int i) throws SQLException { // PreparedStatement's index is starting with 1 if (param instanceof Integer) { int value = ((Integer) param).intValue(); prepStatement.setInt(i + 1, value); } else if (param instanceof String) { String s = (String) param; prepStatement.setString(i + 1, s); } else if (param instanceof Double) { double d = ((Double) param).doubleValue(); prepStatement.setDouble(i + 1, d); } else if (param instanceof Float) { float f = ((Float) param).floatValue(); prepStatement.setFloat(i + 1, f); } else if (param instanceof Long) { long l = ((Long) param).longValue(); prepStatement.setLong(i + 1, l); } else if (param instanceof Boolean) { boolean b = ((Boolean) param).booleanValue(); prepStatement.setBoolean(i + 1, b); } else if (param instanceof org.logstash.Timestamp) { // use joda prepStatement.setString(i + 1, ((Timestamp) param).getTime().toString(STANDARD_FORMAT)); } } /** * Retry submit */ private void retryingSubmit(List<Event> events) { Integer retryCount = 0; while (retryCount < retryMaxCount) { if (submit(events)) { break; } else { retryCount++; System.err.println("Execution exception,retry count: " + retryCount); try { TimeUnit.MILLISECONDS.sleep(retryIntervalTime); } catch (InterruptedException e) { e.printStackTrace(); break; } } } } /** * Submit */ private boolean submit(List<Event> events) { Connection conn = null; try { conn = HikariPool.getConnection(); // deal prepareStatement DealedStatement ds = dealStatement(statement); fields = ds.getFields(); pstm = conn.prepareStatement(ds.getPreparedStatement()); for (Event event : events) { // param replace for (int i = 0; i < fields.size(); i++) { paramReplace(pstm, event.getField(fields.get(i)), i); } pstm.addBatch(); } // Batch processing pstm.executeBatch(); return true; } catch (Exception e) { e.printStackTrace(); return false; } finally { // close if (pstm != null) { try { pstm.close(); } catch (Exception e) { e.printStackTrace(); } } if (conn != null) { try { conn.close(); } catch (Exception e) { e.printStackTrace(); } } } }}
1、SQL 简化设计
在 logstash-output-jdbc 项目中其设计的思路为通过 statement 参数接收一个数组,数组的第一个位置放置一条预编译 SQL,之后每一个数组位按顺序对应该 SQL 的 "?"。
statement => [ "insert into v2_ods_media_show_log (`idfa`) value(?)", "idfa"]
不过上面写法似乎有些过于繁琐,我基于 MyBatis SQL 使用 #{param} 语法灵感,因为 logstash中 # 为注释,所以我设计为使用 {param} 替换参数,之后 Statement 可简化如下。
statement => "INSERT INTO datahub_test.v2_dim_agent_id( agent_id,agent_name,platform,insert_time,update_time) value({agent_id},{agent_name},{platform},DEFAULT,DEFAULT)"
实现的逻辑则是将语句中 {param} 的参数找出来,并将它们的param值存到一个列表中,并将所有{param} 都替换为 ?,行成一条预编译语句。具体实现细节在 dealStatement() 方法中。
2、批处理
在 logstash-output-jdbc 项目中是为每条 INSERT 语句创建一个 Connect 并逐条执行,这样效率较低,所以我采用了批处理来提高效率。
// 将多条语句加入到同一个 Batchpstm.addBatch();pstm.executeBatch();
注意:
MySQL 执行 executeBatch 需要在 URL 加上 rewriteBatchedStatements=true
MySQL的JDBC连接的url中要加rewriteBatchedStatements参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入。
MySQL JDBC驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,批量插入实际上是单条插入,直接造成较低的性能。只有把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。另外这个选项对INSERT/UPDATE/DELETE都有效
这里提一下output() 方法里的切块设计,通过 Slicer.fixedGrouping 配置将传入来的数据进行切块,如传入 1W 条,可设置切块大小为 2000.则切成 5块进行批处理,也是采用了分而治之的思想。
3、重试机制
重试机制设计比较简单通过 retryMaxCount 最大重试次数和 retryIntervalTime 每次重试间隔(毫秒) 来实现,在output 方法中调用retryingSubmit,代码如下:
public void output(final Collection<Event> events) { // 切块 if (events.size() > 0) { List<List<Event>> lists = Slicer.fixedGrouping(new ArrayList<>(events), this.flushSize); for (List<Event> list : lists) { // 重试 retryingSubmit(list); } } }// 重试 private void retryingSubmit(List<Event> events) { Integer retryCount = 0; while (retryCount < retryMaxCount) { if (submit(events)) { break; } else { retryCount++; System.err.println("Execution exception,retry count: " + retryCount); try { TimeUnit.MILLISECONDS.sleep(retryIntervalTime); } catch (InterruptedException e) { e.printStackTrace(); break; } } } }
4、可配置参数
本插件只提供了部分基础的可配置参数
最后
以上就是用 Java 编写 Logstash 输出到 MySQL 插件的简单例子,本例子意在提供一个设计思路和帮助各位熟悉如何采用 Java 编写 logstash 插件。该插件并未应用于生产之上,只是作为学习的 Demo,设计上还有诸多可提升的地方,各位如果有好的建议欢迎提出讨论,共同进步。
感谢您的阅读,如果喜欢本文欢迎关注和转发,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
标签: #java批量执行sql