executeBatch
Api
java.sql.Statement#addBatch
java.sql.PreparedStatement#addBatch
java.sql.Statement#executeBatch
java.sql.Statement#clearBatch原理
减少了 JDBC 客户端和数据库服务器之间网络传输的开销:使用 batch 功能前,每提交一个SQL,都需要一次网络IO开销,且提交后需要等待服务端返回结果后,才能提交下一个SQL;而使用 batch 功能后,客户端的多个SQL是一起提交给服务器的,只涉及到一次网络IO开销
当 batch 底层使用的是静态SQL并参数化执行时(JAVA中一般是使用类 java.sql.PreparedStatement 来参数化执行静态SQL),数据库服务器可以只做一次解析:利用对参数化机制的支持,数据库服务器仅需要对 PreparedStatement 做一次解析(sql parse),即可传入不同参数执行该 batch 中所有的 SQL
批量插入:
会改写批量中的一组sql为一条 “multi-values” 语句,并一次性提交给数据库服务器:batch Insert 会被改写为
insert into t(…) values (…), (…), (…)并一次性提交;如果不能被改写为 "multi-values", 则会改写为多个;分割的sql语句并一次性提交:
insert into t(...) values(...);insert into t(...) values(...)。
批量更新:
batchUpdate 会被改写为 update t set… where id = 1; update t set… where id = 2; update t set… where id = 3… 并一次性提交;
批量删除:
batchDelete 会被改写为 delete from t where id = 1; delete from t where id = 2; delete from t where id = 3;….并一次性提交;
注意点:
batch 功能对 statement 和 PreparedStatement 都有效,但为了避免 SQL 注入的风险,不推荐使用动态SQL,而是推荐使用静态 SQL 和绑定变量(也就是使用 PreparedStatement 和 stored procedures);
batch 功能对所有SQL 都有效, 包括 SELECT/INSERT/UPDATE/DELETE,但由于使用 batch 功能后,返回值是 int[] 数组,不太方便获取 batch 底层每个sql的执行结果,所以大家一般不会对 SELECT 语句使用 batch 功能 (毕竟select查询的目的是获得每个select语句的结果resultSet),而只会在大量 INSERT/UPDATE/DELETE 的场景下,尤其是大批量插入的场景下,使用 batch 功能,所以大家提到 batch时,常说“批量更新“;
另外需要注意的是,使用 batch 功能并不代表所有的 SQL 都在一个事务里:自动提交(autocommit=true)模式下,在创建了每个statement 后,数据库将确保结果正确存在,然后再转到下一个statement 。如果批处理的第n句引发约束异常,则不会回滚以前插入的所有行。
MySQL
设置:
&rewriteBatchedStatements=trueMySQL的 JDBC连接的url中要加 &rewriteBatchedStatements 参数,并保证5.1.13以上版本的驱动,才能实现高性能的批量插入
MySQL JDBC 驱动在默认情况下会无视 executeBatch() 语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给 MySQL 数据库,批量插入实际上是单条插入,直接造成较低的性能。
只有把 rewriteBatchedStatements 参数置为 true, 驱动才会帮你批量执行SQL
这个选项对 INSERT/UPDATE/DELETE 都有效
executeBatch
HikariProxyPreparedStatement
executeBatch()
ProxyStatement
executeBatch()
this.connection.markCommitStateDirty();
return this.delegate.executeBatch();
PreparedStatementWrapper p6spy
StatementWrapper p6spy
executeBatch()
this.delegate.executeBatch();
StatementImpl
executeBatch()
return Util.truncateAndConvertToInt(executeBatchInternal());
executeBatchInternal()
...
ClientPreparedStatement
executeBatchInternal()
int batchTimeout = getTimeoutInMillis();
// 如果加了批量连接参数 且是插入
return executeBatchedInserts(batchTimeout);
// 如果加了批量连接参数 不是插入
return executePreparedBatchAsMultiStatement(batchTimeout);
// 没加
return executeBatchSerially(batchTimeout);
=============================
executeBatchedInserts()
// 数据条数
int numBatchedArgs = this.query.getBatchedArgs().size();
long[] updateCounts = new long[numBatchedArgs]
long updateCountRunningTotal = 0;
// 有一条数据插入失败直接是0,因为服务器实际执行时是一条完整sql,要么全部成功返回记录条数,要么有出错返回0
updateCountRunningTotal += batchedStatement.executeLargeUpdate();
如果 numBatchedArgs > 1
// 要么是-2要么是0
long updCount = updateCountRunningTotal > 0 ? java.sql.Statement.SUCCESS_NO_INFO : 0;
for (int j = 0; j < numBatchedArgs; j++) {
updateCounts[j] = updCount;
}
否则
updateCounts[0] = updateCountRunningTotal;
return updateCounts;
executeLargeUpdate()
executeUpdateInternal(true, false)
executeUpdateInternal(boolean,boolean)
return executeUpdateInternal(((PreparedQuery) this.query).getQueryBindings(), isBatch);
=============================
executeBatchSerially(int batchTimeout)
int nbrCommands = this.query.getBatchedArgs().size();
updateCounts = new long[nbrCommands];
循环填充为-3
循环执行
Object arg = this.query.getBatchedArgs().get(batchCommandIndex);
QueryBindings queryBindings = (QueryBindings) arg;
updateCounts[batchCommandIndex] = executeUpdateInternal(queryBindings, true);
executeUpdateInternal(QueryBindings bindings, boolean isReallyBatch)
Message sendPacket = ((PreparedQuery) this.query).fillSendPacket(bindings);
ResultSetInternalMethods rs = executeInternal(-1, sendPacket, false, false, null, isReallyBatch);
this.updateCount = rs.getUpdateCount();
if (containsOnDuplicateKeyUpdate() && this.compensateForOnDuplicateKeyUpdate) {
if (this.updateCount == 2 || this.updateCount == 0) {
this.updateCount = 1;
}
}
return this.updateCount;
executeInternal(...)
JdbcConnection locallyScopedConnection = this.connection;
ResultSetInternalMethods rs = ((NativeSession) locallyScopedConnection.getSession()).execSQL(...);
return rs;
NativeSession
execSQL(...)
return packet == null? ((NativeProtocol) this.protocol).sendQueryString(...)
: ((NativeProtocol) this.protocol).sendQueryPacket(...);
NativeProtocol
sendQueryPacket()
NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
rs = readAllResults(...);
return rs;
sendCommand(...)
send(queryPacket, queryPacket.getPosition());
NativePacketPayload returnPacket = checkErrorMessage(command);
return returnPacket;
checkErrorMessage(int command)
NativePacketPayload resultPacket = readMessage(this.reusablePacket);
checkErrorMessage(resultPacket);
checkErrorMessage(NativePacketPayload resultPacket)
byte statusCode = (byte) resultPacket.readInteger(IntegerDataType.INT1);
if (statusCode == (byte) 0xff)
//从 Mysql报文解析错误码,封装异常
// 抛出 CJException
readAllResults()
T topLevelResultSet = read(Resultset.class, ...)
return topLevelResultSet;
read()
ProtocolEntityReader<T, NativePacketPayload> sr = isBinaryEncoded
? this.PROTOCOL_ENTITY_CLASS_TO_BINARY_READER.get(requiredClass)
: this.PROTOCOL_ENTITY_CLASS_TO_TEXT_READER.get(requiredClass);
// com.mysql.cj.protocol.a.TextResultsetReader
return sr.read(...);
readServerStatusForResultSets(...)
OkPacket ok = OkPacket.parse(rowPacket,...);
result = (T) ok;
return result;
TextResultsetReader
Resultset read()
// 封装 updateCount
OkPacket ok = this.protocol.readServerStatusForResultSets(resultPacket, false);
// com.mysql.cj.jdbc.result.ResultSetFactory
Resultset rs = resultSetFactory.createFromProtocolEntity(ok);
// com.mysql.cj.jdbc.result.ResultSetImpl
return rs;
ResultSetFactory
ResultSetImpl createFromProtocolEntity(ProtocolEntity protocolEntity)
// 是 OkPacket,updateCount 封装到 ResultSetImpl
return new ResultSetImpl((OkPacket) protocolEntity, this.conn, this.stmt);
OkPacket
OkPacket parse(NativePacketPayload buf, String errorMessageEncoding)
OkPacket ok = new OkPacket();
ok.setUpdateCount(buf.readInteger(IntegerDataType.INT_LENENC)); // affected_rows
ok.setUpdateID(buf.readInteger(IntegerDataType.INT_LENENC)); // last_insert_idPgSQL
设置:
&reWriteBatchedInserts=trueOracle
方案
多个
insert into t(...) values(...)优化成insert into t(…) values (…), (…), (…)减少网络IO,客户端的多个SQL一起提交给服务器,只涉及一次网络IO开销
关闭自动提交事务,手动提交
设置分片:
batchSize = 1000:解决 sql 过长问题,默认情况下 MySQL 可执行的最大 SQL 为 4M采用并行流:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");MySQL 插入10w 条数据耗时(ms):
总结
设置连接参数
&allowMultiQueries=true,允许客户端按照 sql1;sql2;...sqln 的格式发送到服务器,服务器会逐个执行。设置连接参数
&reWriteBatchedInserts=true,客户端的多个SQL一起提交给服务器,服务器最终执行时会将多个insert into t(...) values(...)优化成insert into t(…) values (…), (…), (…)。需要配合 PreparedStatement 的 addBatch()、executeBatch() 。Mp 的 insertBatchSomeColumn 是将多个
insert into t(...) values(...)先优化成insert into t(…) values (…), (…), (…),再发到服务器,所以加连接参数影响不是很大。