GP-5082 Add shutdown hook to BSim DB connection managers and use SET

SESSION instead of SET LOCAL
This commit is contained in:
James 2024-11-01 11:07:59 -04:00 committed by ghidra1
parent e8702cf698
commit 31cd80b647
3 changed files with 82 additions and 23 deletions

View file

@ -18,6 +18,7 @@ package ghidra.features.bsim.query;
import java.net.URL; import java.net.URL;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import javax.security.auth.callback.NameCallback; import javax.security.auth.callback.NameCallback;
@ -40,12 +41,14 @@ public class BSimPostgresDBConnectionManager {
private static final int CONN_POOL_MAX_IDLE = 2; private static final int CONN_POOL_MAX_IDLE = 2;
private static HashMap<BSimServerInfo, BSimPostgresDataSource> dataSourceMap = new HashMap<>(); private static HashMap<BSimServerInfo, BSimPostgresDataSource> dataSourceMap = new HashMap<>();
private static boolean shutdownHookInstalled = false;
public static synchronized BSimPostgresDataSource getDataSource( public static synchronized BSimPostgresDataSource getDataSource(
BSimServerInfo postgresServerInfo) { BSimServerInfo postgresServerInfo) {
if (postgresServerInfo.getDBType() != DBType.postgres) { if (postgresServerInfo.getDBType() != DBType.postgres) {
throw new IllegalArgumentException("expected postgres server info"); throw new IllegalArgumentException("expected postgres server info");
} }
enableShutdownHook();
return dataSourceMap.computeIfAbsent(postgresServerInfo, return dataSourceMap.computeIfAbsent(postgresServerInfo,
info -> new BSimPostgresDataSource(info)); info -> new BSimPostgresDataSource(info));
} }
@ -75,6 +78,30 @@ public class BSimPostgresDBConnectionManager {
dataSourceMap.remove(serverInfo); dataSourceMap.remove(serverInfo);
} }
private static synchronized void enableShutdownHook() {
if (shutdownHookInstalled) {
return;
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Collection<BSimPostgresDataSource> dataSources = dataSourceMap.values();
for (BSimPostgresDataSource ds : dataSources) {
int activeConnections = ds.getActiveConnections();
if (activeConnections != 0) {
Msg.error(BSimPostgresDBConnectionManager.class,
activeConnections +
" BSim active Postgres connections were not properly closed: " +
ds.serverInfo);
}
ds.close();
}
dataSourceMap.clear();
}
});
shutdownHookInstalled = true;
}
public static class BSimPostgresDataSource implements BSimJDBCDataSource { // NOTE: can be renamed public static class BSimPostgresDataSource implements BSimJDBCDataSource { // NOTE: can be renamed
private final BSimServerInfo serverInfo; private final BSimServerInfo serverInfo;

View file

@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -42,7 +42,7 @@ import ghidra.features.bsim.query.protocol.*;
* *
*/ */
public final class PostgresFunctionDatabase public final class PostgresFunctionDatabase
extends AbstractSQLFunctionDatabase<WeightedLSHCosineVectorFactory> { extends AbstractSQLFunctionDatabase<WeightedLSHCosineVectorFactory> {
// NOTE: Previously named ColumnDatabase // NOTE: Previously named ColumnDatabase
@ -101,7 +101,7 @@ public final class PostgresFunctionDatabase
} }
private void changePassword(Connection c, String username, char[] newPassword) private void changePassword(Connection c, String username, char[] newPassword)
throws SQLException { throws SQLException {
StringBuilder buffer = new StringBuilder(); StringBuilder buffer = new StringBuilder();
buffer.append("ALTER ROLE \""); buffer.append("ALTER ROLE \"");
buffer.append(username); buffer.append(username);
@ -158,7 +158,7 @@ public final class PostgresFunctionDatabase
*/ */
private void serverLoadWeights(Connection db) throws SQLException { private void serverLoadWeights(Connection db) throws SQLException {
try (Statement st = db.createStatement(); try (Statement st = db.createStatement();
ResultSet rs = st.executeQuery("SELECT lsh_load()")) { ResultSet rs = st.executeQuery("SELECT lsh_load()")) {
while (rs.next()) { while (rs.next()) {
// int val = rs.getInt(1); // int val = rs.getInt(1);
} }
@ -171,13 +171,16 @@ public final class PostgresFunctionDatabase
Connection db = initConnection(); Connection db = initConnection();
serverLoadWeights(db); serverLoadWeights(db);
if (asynchronous) { try (Statement st = db.createStatement()) {
try (Statement st = db.createStatement()) { // Tell server to do asynchronous commits. This speeds up large
// Tell server to do asynchronous commits. This speeds up large // ingests with a (slight) danger of
// ingests with a (slight) danger of // losing the most recent commits if the server crashes (NOTE:
// losing the most recent commits if the server crashes (NOTE: // database integrity should still be recoverable)
// database integrity should still be recoverable) if (asynchronous) {
st.executeUpdate("SET LOCAL synchronous_commit TO OFF"); st.executeUpdate("SET SESSION synchronous_commit TO OFF");
}
else {
st.executeUpdate("SET SESSION synchronous_commit to ON");
} }
} }
@ -223,12 +226,15 @@ public final class PostgresFunctionDatabase
serverLoadWeights(db); serverLoadWeights(db);
// Tell server to do asynchronous commits. This speeds up large
// ingests with a (slight) danger of
// losing the most recent commits if the server crashes (NOTE:
// database integrity should still be recoverable)
if (asynchronous) { if (asynchronous) {
// Tell server to do asynchronous commits. This speeds up large st.executeUpdate("SET SESSION synchronous_commit TO OFF");
// ingests with a (slight) danger of }
// losing the most recent commits if the server crashes (NOTE: else {
// database integrity should still be recoverable) st.executeUpdate("SET SESSION synchronous_commit to ON");
st.executeUpdate("SET LOCAL synchronous_commit TO OFF");
} }
} }
} }
@ -253,7 +259,7 @@ public final class PostgresFunctionDatabase
*/ */
private void rebuildIndex(Connection c) throws SQLException { private void rebuildIndex(Connection c) throws SQLException {
try (Statement st = c.createStatement(); try (Statement st = c.createStatement();
ResultSet rs = st.executeQuery("SELECT lsh_reload()")) { ResultSet rs = st.executeQuery("SELECT lsh_reload()")) {
st.execute("SET maintenance_work_mem TO '2GB'"); st.execute("SET maintenance_work_mem TO '2GB'");
st.execute( st.execute(
"CREATE INDEX vectable_vec_idx ON vectable USING gin (vec gin_lshvector_ops)"); "CREATE INDEX vectable_vec_idx ON vectable USING gin (vec gin_lshvector_ops)");
@ -275,7 +281,7 @@ public final class PostgresFunctionDatabase
* @throws SQLException if there is a problem creating or executing the query * @throws SQLException if there is a problem creating or executing the query
*/ */
private int preWarm(Connection c, int mainIndex, int secondaryIndex, int vectors) private int preWarm(Connection c, int mainIndex, int secondaryIndex, int vectors)
throws SQLException { throws SQLException {
try (Statement st = c.createStatement()) { try (Statement st = c.createStatement()) {
// Try to load the entire main index into the PostgreSQL cache // Try to load the entire main index into the PostgreSQL cache
int res = -1; int res = -1;
@ -386,8 +392,8 @@ public final class PostgresFunctionDatabase
* @throws SQLException if there is a problem creating or executing the query * @throws SQLException if there is a problem creating or executing the query
*/ */
@Override @Override
protected int queryNearestVector(List<VectorResult> resultset, LSHVector vec, protected int queryNearestVector(List<VectorResult> resultset, LSHVector vec, double simthresh,
double simthresh, double sigthresh, int max) throws SQLException { double sigthresh, int max) throws SQLException {
PreparedStatement s = PreparedStatement s =
selectNearestVectorStatement.prepareIfNeeded(() -> initConnection().prepareStatement( selectNearestVectorStatement.prepareIfNeeded(() -> initConnection().prepareStatement(
"WITH const(cvec) AS (VALUES( lshvector_in( CAST( ? AS cstring) ) ) )," + "WITH const(cvec) AS (VALUES( lshvector_in( CAST( ? AS cstring) ) ) )," +
@ -469,7 +475,7 @@ public final class PostgresFunctionDatabase
@Override @Override
protected VectorResult queryVectorId(long id) throws SQLException { protected VectorResult queryVectorId(long id) throws SQLException {
PreparedStatement s = selectVectorByRowIdStatement.prepareIfNeeded(() -> initConnection() PreparedStatement s = selectVectorByRowIdStatement.prepareIfNeeded(() -> initConnection()
.prepareStatement("SELECT id,count,vec FROM vectable WHERE id = ?")); .prepareStatement("SELECT id,count,vec FROM vectable WHERE id = ?"));
s.setLong(1, id); s.setLong(1, id);
try (ResultSet rs = s.executeQuery()) { try (ResultSet rs = s.executeQuery()) {
if (!rs.next()) { if (!rs.next()) {
@ -506,7 +512,7 @@ public final class PostgresFunctionDatabase
@Override @Override
public QueryResponseRecord doQuery(BSimQuery<?> query, Connection c) public QueryResponseRecord doQuery(BSimQuery<?> query, Connection c)
throws SQLException, LSHException, DatabaseNonFatalException { throws SQLException, LSHException, DatabaseNonFatalException {
if (query instanceof PrewarmRequest q) { if (query instanceof PrewarmRequest q) {
fdbPrewarm(q, c); fdbPrewarm(q, c);

View file

@ -40,6 +40,7 @@ public class BSimH2FileDBConnectionManager {
* Data source map keyed by absolute DB file path * Data source map keyed by absolute DB file path
*/ */
private static HashMap<BSimServerInfo, BSimH2FileDataSource> dataSourceMap = new HashMap<>(); private static HashMap<BSimServerInfo, BSimH2FileDataSource> dataSourceMap = new HashMap<>();
private static boolean shutdownHookInstalled = false;
/** /**
* Get all H2 File DB data sorces which exist in the JVM. * Get all H2 File DB data sorces which exist in the JVM.
@ -62,6 +63,7 @@ public class BSimH2FileDBConnectionManager {
if (fileServerInfo.getDBType() != DBType.file) { if (fileServerInfo.getDBType() != DBType.file) {
throw new IllegalArgumentException("expected file info"); throw new IllegalArgumentException("expected file info");
} }
enableShutdownHook();
return dataSourceMap.computeIfAbsent(fileServerInfo, return dataSourceMap.computeIfAbsent(fileServerInfo,
info -> new BSimH2FileDataSource(info)); info -> new BSimH2FileDataSource(info));
} }
@ -102,6 +104,30 @@ public class BSimH2FileDBConnectionManager {
return true; return true;
} }
private static synchronized void enableShutdownHook() {
if (shutdownHookInstalled) {
return;
}
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
Collection<BSimH2FileDataSource> dataSources = dataSourceMap.values();
for (BSimH2FileDataSource ds : dataSources) {
int activeConnections = ds.getActiveConnections();
if (activeConnections != 0) {
Msg.error(BSimH2FileDBConnectionManager.class,
activeConnections +
" BSim active H2 File connections were not properly closed: " +
ds.serverInfo);
}
ds.close();
}
dataSourceMap.clear();
}
});
shutdownHookInstalled = true;
}
/** /**
* {@link BSimH2FileDataSource} provides a pooled DB data source for a specific H2 File DB. * {@link BSimH2FileDataSource} provides a pooled DB data source for a specific H2 File DB.
*/ */