GP-4351: More thorough synchronization, esp., with streams.

This commit is contained in:
Dan 2024-02-22 14:36:17 -05:00
parent ddf4d15327
commit 5a0b262df4
5 changed files with 290 additions and 59 deletions

View file

@ -422,7 +422,8 @@ public class DBTraceObjectManager implements TraceObjectManager, DBTraceManager
public Stream<DBTraceObjectValue> getAllValues() { public Stream<DBTraceObjectValue> getAllValues() {
return Stream.concat( return Stream.concat(
valueMap.values().stream().map(v -> v.getWrapper()), valueMap.values().stream().map(v -> v.getWrapper()),
valueWbCache.streamAllValues().map(v -> v.getWrapper())); StreamUtils.lock(lock.readLock(),
valueWbCache.streamAllValues().map(v -> v.getWrapper())));
} }
protected Stream<DBTraceObjectValueData> streamValuesIntersectingData(Lifespan span, protected Stream<DBTraceObjectValueData> streamValuesIntersectingData(Lifespan span,
@ -672,7 +673,7 @@ public class DBTraceObjectManager implements TraceObjectManager, DBTraceManager
return new UnionAddressSetView( return new UnionAddressSetView(
valueMap.getAddressSetView(Lifespan.at(snap), valueMap.getAddressSetView(Lifespan.at(snap),
v -> acceptValue(v.getWrapper(), key, ifaceCls, predicate)), v -> acceptValue(v.getWrapper(), key, ifaceCls, predicate)),
valueWbCache.getObjectsAddresSet(snap, key, ifaceCls, predicate)); valueWbCache.getObjectsAddressSet(snap, key, ifaceCls, predicate));
} }
public <I extends TraceObjectInterface> I getSuccessor(TraceObject seed, public <I extends TraceObjectInterface> I getSuccessor(TraceObject seed,

View file

@ -176,10 +176,11 @@ class DBTraceObjectValueWriteBehindCache {
} }
public Stream<DBTraceObjectValueBehind> streamAllValues() { public Stream<DBTraceObjectValueBehind> streamAllValues() {
return doStreamAllValues(); return StreamUtils.sync(cachedValues, doStreamAllValues());
} }
public DBTraceObjectValueBehind get(DBTraceObject parent, String key, long snap) { public DBTraceObjectValueBehind get(DBTraceObject parent, String key, long snap) {
synchronized (cachedValues) {
var keys = cachedValues.get(parent); var keys = cachedValues.get(parent);
if (keys == null) { if (keys == null) {
return null; return null;
@ -199,6 +200,7 @@ class DBTraceObjectValueWriteBehindCache {
} }
return floor.getValue(); return floor.getValue();
} }
}
public Stream<DBTraceObjectValueBehind> streamParents(DBTraceObject child, Lifespan lifespan) { public Stream<DBTraceObjectValueBehind> streamParents(DBTraceObject child, Lifespan lifespan) {
// TODO: Optimize/index this? // TODO: Optimize/index this?
@ -236,16 +238,19 @@ class DBTraceObjectValueWriteBehindCache {
} }
public Stream<DBTraceObjectValueBehind> streamValues(DBTraceObject parent, Lifespan lifespan) { public Stream<DBTraceObjectValueBehind> streamValues(DBTraceObject parent, Lifespan lifespan) {
// TODO: Better indexing? synchronized (cachedValues) {
var keys = cachedValues.get(parent); var keys = cachedValues.get(parent);
if (keys == null) { if (keys == null) {
return Stream.of(); return Stream.of();
} }
return keys.values().stream().flatMap(v -> streamSub(v, lifespan, true)); return StreamUtils.sync(cachedValues,
keys.values().stream().flatMap(v -> streamSub(v, lifespan, true)));
}
} }
public Stream<DBTraceObjectValueBehind> streamValues(DBTraceObject parent, String key, public Stream<DBTraceObjectValueBehind> streamValues(DBTraceObject parent, String key,
Lifespan lifespan, boolean forward) { Lifespan lifespan, boolean forward) {
synchronized (cachedValues) {
var keys = cachedValues.get(parent); var keys = cachedValues.get(parent);
if (keys == null) { if (keys == null) {
return Stream.of(); return Stream.of();
@ -254,7 +259,8 @@ class DBTraceObjectValueWriteBehindCache {
if (values == null) { if (values == null) {
return Stream.of(); return Stream.of();
} }
return streamSub(values, lifespan, forward); return StreamUtils.sync(cachedValues, streamSub(values, lifespan, forward));
}
} }
static boolean intersectsRange(Object value, AddressRange range) { static boolean intersectsRange(Object value, AddressRange range) {
@ -265,6 +271,7 @@ class DBTraceObjectValueWriteBehindCache {
private Stream<DBTraceObjectValueBehind> streamValuesIntersectingLifespan(Lifespan lifespan, private Stream<DBTraceObjectValueBehind> streamValuesIntersectingLifespan(Lifespan lifespan,
String entryKey) { String entryKey) {
// TODO: In-memory spatial index? // TODO: In-memory spatial index?
synchronized (cachedValues) {
var top = cachedValues.values().stream(); var top = cachedValues.values().stream();
var keys = entryKey == null var keys = entryKey == null
? top.flatMap(v -> v.values().stream()) ? top.flatMap(v -> v.values().stream())
@ -272,7 +279,8 @@ class DBTraceObjectValueWriteBehindCache {
.stream() .stream()
.filter(e -> entryKey.equals(e.getKey())) .filter(e -> entryKey.equals(e.getKey()))
.map(e -> e.getValue())); .map(e -> e.getValue()));
return keys.flatMap(v -> streamSub(v, lifespan, true)); return StreamUtils.sync(cachedValues, keys.flatMap(v -> streamSub(v, lifespan, true)));
}
} }
public Stream<DBTraceObjectValueBehind> streamValuesIntersecting(Lifespan lifespan, public Stream<DBTraceObjectValueBehind> streamValuesIntersecting(Lifespan lifespan,
@ -302,11 +310,13 @@ class DBTraceObjectValueWriteBehindCache {
return null; return null;
} }
public <I extends TraceObjectInterface> AddressSetView getObjectsAddresSet(long snap, public <I extends TraceObjectInterface> AddressSetView getObjectsAddressSet(long snap,
String key, Class<I> ifaceCls, Predicate<? super I> predicate) { String key, Class<I> ifaceCls, Predicate<? super I> predicate) {
return new AbstractAddressSetView() { return new AbstractAddressSetView() {
AddressSet collectRanges() { AddressSet collectRanges() {
AddressSet result = new AddressSet(); AddressSet result = new AddressSet();
try (LockHold hold = LockHold.lock(manager.lock.readLock())) {
synchronized (cachedValues) {
for (DBTraceObjectValueBehind v : StreamUtils for (DBTraceObjectValueBehind v : StreamUtils
.iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) { .iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) {
AddressRange range = getIfRangeOrAddress(v.getValue()); AddressRange range = getIfRangeOrAddress(v.getValue());
@ -319,11 +329,15 @@ class DBTraceObjectValueWriteBehindCache {
} }
result.add(range); result.add(range);
} }
}
}
return result; return result;
} }
@Override @Override
public boolean contains(Address addr) { public boolean contains(Address addr) {
try (LockHold hold = LockHold.lock(manager.lock.readLock())) {
synchronized (cachedValues) {
for (DBTraceObjectValueBehind v : StreamUtils for (DBTraceObjectValueBehind v : StreamUtils
.iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) { .iter(streamValuesIntersectingLifespan(Lifespan.at(snap), key))) {
if (!addr.equals(v.getValue())) { if (!addr.equals(v.getValue())) {
@ -335,6 +349,8 @@ class DBTraceObjectValueWriteBehindCache {
} }
return true; return true;
} }
}
}
return false; return false;
} }

View file

@ -19,10 +19,25 @@ import java.util.*;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
import ghidra.util.database.DBSynchronizedSpliterator;
import ghidra.util.database.SynchronizedSpliterator;
/**
* Some utilities for streams
*/
public class StreamUtils { public class StreamUtils {
private StreamUtils() { private StreamUtils() {
} }
/**
* Union two sorted streams into a single sorted stream
*
* @param <T> the type of elements
* @param streams the streams to be merged
* @param comparator the comparator that orders each stream and that will order the resulting
* stream
* @return the sorted stream
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> Stream<T> merge(Collection<? extends Stream<? extends T>> streams, public static <T> Stream<T> merge(Collection<? extends Stream<? extends T>> streams,
Comparator<? super T> comparator) { Comparator<? super T> comparator) {
@ -33,8 +48,53 @@ public class StreamUtils {
streams.stream().map(s -> s.spliterator()).toList(), comparator), false); streams.stream().map(s -> s.spliterator()).toList(), comparator), false);
} }
/**
* Adapt a stream into an iterable
*
* @param <T> the type of elements
* @param stream the stream
* @return an iterable over the same elements in the stream in the same order
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> Iterable<T> iter(Stream<? extends T> stream) { public static <T> Iterable<T> iter(Stream<? extends T> stream) {
return () -> (Iterator<T>) stream.iterator(); return () -> (Iterator<T>) stream.iterator();
} }
/**
* Wrap the given stream into a synchronized stream on the given object's intrinsic lock
*
* <p>
* <b>NOTE:</b> This makes no guarantees regarding the consistency or visit order if the
* underlying resource is modified between elements being visited. It merely prevents the stream
* client from accessing the underlying resource concurrently. For such guarantees, the client
* may need to acquire the lock for its whole use of the stream.
*
* @param <T> the type of elements
* @param lock the object on which to synchronize
* @param stream the (un)synchronized stream
* @return the synchronized stream
*/
public static <T> Stream<T> sync(Object lock, Stream<T> stream) {
var wrapped = new SynchronizedSpliterator<T>(stream.spliterator(), lock);
return StreamSupport.stream(wrapped, stream.isParallel());
}
/**
* Wrap the given stream into a synchronized stream on the given lock
*
* <p>
* <b>NOTE:</b> This makes no guarantees regarding the consistency or visit order if the
* underlying resource is modified between elements being visited. It merely prevents the stream
* client from accessing the underlying resource concurrently. For such guarantees, the client
* may need to acquire the lock for its whole use of the stream.
*
* @param <T> the type of elements
* @param lock the lock
* @param stream the (un)synchronized stream
* @return the synchronized stream
*/
public static <T> Stream<T> lock(java.util.concurrent.locks.Lock lock, Stream<T> stream) {
var wrapped = new DBSynchronizedSpliterator<T>(stream.spliterator(), lock);
return StreamSupport.stream(wrapped, stream.isParallel());
}
} }

View file

@ -0,0 +1,78 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.util.database;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import ghidra.util.LockHold;
/**
* Wraps an unsynchronized spliterator in one that synchronizes on a given {@link Lock}
*
* @param <T> the type of elements
*/
public class DBSynchronizedSpliterator<T> implements Spliterator<T> {
private final Spliterator<T> spliterator;
private final Lock lock;
public DBSynchronizedSpliterator(Spliterator<T> spliterator, Lock lock) {
this.spliterator = spliterator;
this.lock = lock;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
AtomicReference<T> ref = new AtomicReference<>();
boolean result;
try (LockHold hold = LockHold.lock(lock)) {
result = spliterator.tryAdvance(ref::set);
}
if (!result) {
return false;
}
action.accept(ref.get());
return true;
}
@Override
public Spliterator<T> trySplit() {
Spliterator<T> newSplit;
try (LockHold hold = LockHold.lock(lock)) {
newSplit = spliterator.trySplit();
}
if (newSplit == null) {
return null;
}
return new DBSynchronizedSpliterator<>(newSplit, lock);
}
@Override
public long estimateSize() {
try (LockHold hold = LockHold.lock(lock)) {
return spliterator.estimateSize();
}
}
@Override
public int characteristics() {
try (LockHold hold = LockHold.lock(lock)) {
return spliterator.characteristics();
}
}
}

View file

@ -0,0 +1,76 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.util.database;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
/**
* Wraps an unsynchronized spliterator in one that synchronizes on a given object's intrinsic lock,
* often the collection that provided the stream or spliterator.
*
* @param <T> the type of elements
*/
public class SynchronizedSpliterator<T> implements Spliterator<T> {
private final Spliterator<T> spliterator;
private final Object lock;
public SynchronizedSpliterator(Spliterator<T> spliterator, Object lock) {
this.spliterator = spliterator;
this.lock = lock;
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
AtomicReference<T> ref = new AtomicReference<>();
boolean result;
synchronized (lock) {
result = spliterator.tryAdvance(ref::set);
}
if (!result) {
return false;
}
action.accept(ref.get());
return true;
}
@Override
public Spliterator<T> trySplit() {
Spliterator<T> newSplit;
synchronized (lock) {
newSplit = spliterator.trySplit();
}
if (newSplit == null) {
return null;
}
return new SynchronizedSpliterator<>(newSplit, lock);
}
@Override
public long estimateSize() {
synchronized (lock) {
return spliterator.estimateSize();
}
}
@Override
public int characteristics() {
synchronized (lock) {
return spliterator.characteristics();
}
}
}