GP-5436: Move AsyncComm remains into Generic

This commit is contained in:
Dan 2025-03-25 16:44:37 +00:00
parent 397a814f5f
commit 14d0432554
32 changed files with 43 additions and 131 deletions

View file

@ -0,0 +1,158 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* A debouncer for asynchronous events
*
* <P>
* A debouncer has an input "contact" event and produces an output "settled" once sufficient time
* has passed since the last contact event. The goal is to prevent the needless frequent firing of
* asynchronous events if the next event is going to negate the current one. The idea is that a
* series of events, each negating the previous, can be fired within relative temporal proximity.
* Without a debouncer, event processing time may be wasted. By passing the events through a
* debouncer configured with a time window that contains all the events, only the final event in the
* cluster will be processed. The cost of doing this is a waiting period, so event processing may be
* less responsive, but will also be less frantic.
*
* @param <T> the value type
*/
public class AsyncDebouncer<T> {
protected final AsyncTimer timer;
protected final long windowMillis;
protected final List<Consumer<T>> listeners = new ArrayList<>();
protected CompletableFuture<T> settledPromise;
protected T lastContact;
protected CompletableFuture<Void> alarm;
/**
* Construct a new debouncer
*
* @param timer the timer to use for delay
* @param windowMillis the timing window of changes to ignore
*/
public AsyncDebouncer(AsyncTimer timer, long windowMillis) {
this.timer = timer;
this.windowMillis = windowMillis;
}
/**
* Add a listener for the settled event
*
* @param listener the listener
*/
public synchronized void addListener(Consumer<T> listener) {
listeners.add(listener);
}
/**
* Remove a listener from the settled event
*
* @param listener the listener
*/
public synchronized void removeListener(Consumer<T> listener) {
listeners.remove(listener);
}
protected void doSettled() {
List<Consumer<T>> ls;
CompletableFuture<T> promise;
synchronized (this) {
alarm = null;
// Avoid synchronization issues
ls = new ArrayList<>(this.listeners); // This seems wasteful
promise = settledPromise;
settledPromise = null;
}
for (Consumer<T> listener : ls) {
listener.accept(lastContact);
}
if (promise != null) {
promise.complete(lastContact);
}
}
/**
* Send a contact event
*
* <P>
* This sets or resets the timer for the event window. The settled event will fire with the
* given value after this waiting period, unless another contact event occurs first.
*
* @param val the new value
*/
public synchronized void contact(T val) {
lastContact = val;
if (alarm != null) {
alarm.cancel(false);
}
alarm = timer.mark().after(windowMillis).thenRun(this::doSettled);
}
/**
* Receive the next settled event
*
* <p>
* The returned future completes <em>after</em> all registered listeners have been invoked.
*
* @return a future which completes with the value of the next settled event
*/
public synchronized CompletableFuture<T> settled() {
if (settledPromise == null) {
settledPromise = new CompletableFuture<>();
}
return settledPromise;
}
/**
* Wait for the debouncer to be stable
*
* <p>
* If the debouncer has not received a contact event within the event window, it's considered
* stable, and this returns a completed future with the value of the last received contact
* event. Otherwise, the returned future completes on the next settled event, as in
* {@link #settled()}.
*
* @return a future which completes, perhaps immediately, when the debouncer is stable
*/
public synchronized CompletableFuture<T> stable() {
if (alarm == null) {
return CompletableFuture.completedFuture(lastContact);
}
return settled();
}
public static class Bypass<T> extends AsyncDebouncer<T> {
public Bypass() {
super(null, 0);
}
@Override
public synchronized void contact(T val) {
lastContact = val;
doSettled();
}
}
}

View file

@ -0,0 +1,97 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.ArrayList;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* A fence that completes when all participating futures complete
*
* <p>
* This provides an alternative shorthand for Java's
* {@link CompletableFuture#thenAcceptBoth(CompletionStage, BiConsumer)} or
* {@link CompletableFuture#allOf(CompletableFuture...)}.
*
* <p>
* Example:
*
* <pre>{@code
* public CompletableFuture<Void> processAll(List<Integer> list) {
* AsyncFence fence = new AsyncFence();
* for (int entry : list) {
* fence.include(process(entry));
* }
* return fence.ready();
* }
* }</pre>
*/
public class AsyncFence {
private final ArrayList<CompletableFuture<?>> participants = new ArrayList<>();
private CompletableFuture<Void> ready;
/**
* Include a participant with this fence
*
* The result of the participating future is ignored implicitly. If the result is needed, it
* must be consumed out of band, e.g., by using {@link CompletableFuture#thenAccept(Consumer)}:
*
* <pre>
* fence.include(process(entry).thenAccept(result::addTo));
* </pre>
*
* Calling this method after {@link #ready()} will yield undefined results.
*
* @param future the participant to add
* @return this fence
*/
public synchronized AsyncFence include(CompletableFuture<?> future) {
if (ready != null) {
throw new IllegalStateException("Fence already ready");
}
participants.add(future);
return this;
}
/**
* Obtain a future that completes when all participating futures have completed
*
* Calling this method more than once will yield undefined results.
*
* @return the "all of" future
*/
public synchronized CompletableFuture<Void> ready() {
if (ready == null) {
ready = CompletableFuture
.allOf((participants.toArray(new CompletableFuture[participants.size()])));
}
return ready;
}
/**
* Diagnostic: Get the participants which have not yet completed
*
* @return the pending participants
*/
public Set<CompletableFuture<?>> getPending() {
return participants.stream().filter(f -> !f.isDone()).collect(Collectors.toSet());
}
}

View file

@ -0,0 +1,424 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* A map of cached values computed upon the first request, asynchronously
*
* <p>
* Each key present in the cache behaves similarly to {@link AsyncLazyValue}. The cache starts
* empty. Whenever a key is requested, a computation for that key is started, but a future is
* immediately returned. If the computation succeeds, the completed future is cached indefinitely,
* and the result is recorded. Any subsequent requests for the same key return the same future, even
* if the computation for that key has not yet completed. Thus, when it completes, all requests for
* that key will be fulfilled by the result of the first request. If the computation completes
* exceptionally, the key is optionally removed from the cache. Thus, a subsequent request for a
* failed key may retry the computation.
*
* <p>
* Values can also be provided "out of band." That is, they may be provided by an alternative
* computation. This is accomplished using {@link #get(Object, Function)}, {@link #put(Object)} or
* {@link #put(Object, Object)}. The last immediately provides a value and completes any outstanding
* requests, even if there was an active computation for the key. The first claims the key and
* promises to provide the value at a later time.
*
* <p>
* At any point, an unmodifiable view of the completed, cached values may be obtained.
*
* @param <K> the type of keys
* @param <V> the type of values
*/
public class AsyncLazyMap<K, V> {
public static class KeyedFuture<K, V> extends CompletableFuture<V> {
protected final K key;
protected CompletableFuture<V> future;
public KeyedFuture(K key) {
this.key = key;
}
public KeyedFuture(K key, V value) {
this(key);
complete(value);
}
@Override
public String toString() {
return key + "=" + super.toString();
}
public K getKey() {
return key;
}
public CompletableFuture<V> getFuture() {
return future;
}
}
protected final Map<K, KeyedFuture<K, V>> futures = new HashMap<>();
protected final Map<K, V> map;
protected final Map<K, V> unmodifiable;
protected final Function<K, CompletableFuture<V>> function;
protected BiPredicate<? super K, ? super Throwable> forgetErrors = (k, t) -> true;
protected BiPredicate<? super K, ? super V> forgetValues = (k, v) -> false;
/**
* Construct a lazy map for the given function
*
* @param map the backing map. The lazy map ought to have an exclusive reference to this map.
* Mutations to the map outside of those caused by the lazy map may cause undefined
* behavior.
* @param function specifies the computation, given a key
*/
public AsyncLazyMap(Map<K, V> map, Function<K, CompletableFuture<V>> function) {
this.map = map;
this.unmodifiable = Collections.unmodifiableMap(map);
this.function = function;
}
protected void putFuture(K key, KeyedFuture<K, V> future) {
futures.put(key, future);
future.exceptionally((exc) -> {
synchronized (this) {
if (forgetErrors.test(key, exc)) {
//Msg.debug(this, "Work failed for " + key + " -> " + exc);
futures.remove(key);
}
}
return ExceptionUtils.rethrow(exc);
}).thenAccept(val -> {
synchronized (this) {
if (futures.get(key) != future) {
// The pending future was invalidated
return;
}
if (forgetValues.test(key, val)) {
futures.remove(key);
return;
}
//Msg.debug(this, "Work completed for " + key + " -> " + value);
map.put(key, val);
}
});
}
/**
* Sets a predicate to determine which errors to forget (i.e., retry)
*
*
* <p>
* A request resulting in an error that is remembered will not be retried until the cache is
* invalidated. For a forgotten error, the request is retried if re-requested later.
*
* <p>
* This will replace the behavior of any previous error-testing predicate.
*
* @param predicate the predicate
* @return this lazy map
*/
public synchronized AsyncLazyMap<K, V> forgetErrors(
BiPredicate<? super K, ? super Throwable> predicate) {
forgetErrors = predicate;
return this;
}
/**
* Sets a predicate to determine which errors to remember
*
* @see #forgetErrors(BiPredicate)
* @param predicate the predicate
* @return this lazy map
*/
public AsyncLazyMap<K, V> rememberErrors(
BiPredicate<? super K, ? super Throwable> predicate) {
return forgetErrors((k, t) -> !predicate.test(k, t));
}
/**
* Sets a predicate to determine which values to forget
*
* <p>
* The predicate is applied to a cached entry when its key is re-requested. If forgotten, the
* request will launch a fresh computation. The predicate is also applied at the time a
* computation is completed. An entry that is forgotten still completes normally; however, it
* never enters the cache, thus a subsequent request for the same key will launch a fresh
* computation.
*
* <p>
* This will replace the behavior of any previous value-testing predicate.
*
* @param predicate the rule for forgetting entries
* @return this lazy map
*/
public synchronized AsyncLazyMap<K, V> forgetValues(
BiPredicate<? super K, ? super V> predicate) {
forgetValues = predicate;
return this;
}
/**
* Sets a predicate to determine which values to remember
*
* @see #forgetValues(BiPredicate)
* @param predicate the rule for <em>not</em> forgetting entries
* @return this lazy map
*/
public synchronized AsyncLazyMap<K, V> rememberValues(
BiPredicate<? super K, ? super V> predicate) {
return forgetValues((k, v) -> !predicate.test(k, v));
}
/**
* Request the value for a given key, using an alternative computation
*
* <p>
* If this is called before any other get or put, the given function is launched for the given
* key. A {@link CompletableFuture} is returned immediately. Subsequent gets or puts on the same
* key will return the same future without starting any new computation.
*
* @param key the key
* @param func an alternative computation function, given a key
* @return a future, possibly already completed, for the key's value
*/
public synchronized KeyedFuture<K, V> get(K key, Function<K, CompletableFuture<V>> func) {
/**
* NOTE: I must populate the key's entry before invoking the function. Since the lock is
* re-entrant, it's possible (likely even) that the same thread comes back around for the
* same entry. If the key is not associated with a pending future, that will cause the
* function to be re-invoked.
*/
// Don't refactor as put, since I need to know whether or not it existed
KeyedFuture<K, V> future = futures.get(key);
if (future != null) {
if (!future.isDone() || !forgetValues.test(key, future.getNow(null))) {
return future;
}
}
final KeyedFuture<K, V> f = new KeyedFuture<>(key);
putFuture(key, f);
CompletableFuture<V> dep = func.apply(key);
f.future = dep;
dep.handle(AsyncUtils.copyTo(f));
return f;
}
/**
* Request the value for a given key
*
* <p>
* If this is called before any other get or put, the computation given at construction is
* launched for the given key. A {@link CompletableFuture} is returned immediately. Subsequent
* calls gets or puts on the same key return the same future without starting any new
* computation.
*
* @param key the key
* @return a future, possible already completed, for the key's value
*/
public synchronized KeyedFuture<K, V> get(K key) {
return get(key, function);
}
/**
* Immediately provide an out-of-band value for a given key
*
* <p>
* On occasion, the value for a key may become known outside of the specified computation. This
* method circumvents the function given during construction by providing the value for a key.
* If there is an outstanding request for the key's value -- a rare occasion -- it is completed
* immediately with the provided value. Calling this method for a key that has already completed
* has no effect.
*
* <p>
* This is equivalent to the code {@code map.put(k).complete(value)}, but atomic.
*
* @param key the key whose value to provide
* @param value the provided value
* @return true if the key was completed by this call, false if the key had already been
* completed
*/
public synchronized boolean put(K key, V value) {
//Msg.debug(this, "Inserting finished work for " + key + ": " + value);
KeyedFuture<K, V> future = futures.get(key);
if (future != null) {
return future.complete(value); // should cause map.put given in #get(K)
}
future = new KeyedFuture<>(key, value);
futures.put(key, future);
map.put(key, value);
return true;
}
/**
* Provide an out-of-band value for a given key
*
* <p>
* If this is called before {@link #get(Object)}, the computation given at construction is
* ignored for the given key. A new {@link CompletableFuture} is returned instead. The caller
* must see to this future's completion. Subsequent calls to either {@link #get(Object)} or
* {@link #put(Object)} on the same key return this same future without starting any
* computation.
*
* <p>
* Under normal circumstances, the caller cannot determine whether or not it has "claimed" the
* computation for the key. If the usual computation is already running, then the computations
* are essentially in a race. As such, it is essential that alternative computations result in
* the same value for a given key as the usual computation. In other words, the functions must
* not differ, but the means of computation can differ. Otherwise, race conditions may arise.
*
* @param key the key whose value to provide
* @return a promise that the caller must fulfill or arrange to have fulfilled
*/
public synchronized KeyedFuture<K, V> put(K key) {
KeyedFuture<K, V> future = futures.get(key);
if (future != null) {
return future;
}
future = new KeyedFuture<>(key);
putFuture(key, future);
return future;
}
/**
* Remove a key from the map, without canceling any pending computation
*
* <p>
* If the removed future has not yet completed, its value will never be added to the map of
* values. Subsequent gets or puts to the invalidated key will behave as if the key had never
* been requested.
*
* @param key the key to remove
* @return the invalidated future
*/
public synchronized CompletableFuture<V> forget(K key) {
map.remove(key);
return futures.remove(key);
}
/**
* Remove a key from the map, canceling any pending computation
*
* @param key the key to remove
* @return the previous value, if completed
*/
public V remove(K key) {
KeyedFuture<K, V> f;
V val;
synchronized (this) {
f = futures.remove(key);
val = map.remove(key);
}
if (f != null) {
f.cancel(false);
}
return val;
}
/**
* Get a view of completed keys with values
*
* <p>
* The view is unmodifiable, but the backing map may still be modified as more keys are
* completed. Thus, access to the view ought to be synchronized on this lazy map.
*
* @return a map view of keys to values
*/
public Map<K, V> getCompletedMap() {
return unmodifiable;
}
/**
* Get a copy of the keys which are requested but not completed
*
* <p>
* This should only be used for diagnostics.
*
* @return a copy of the pending key set
*/
public synchronized Set<K> getPendingKeySet() {
Set<K> result = new LinkedHashSet<>();
for (KeyedFuture<K, V> f : futures.values()) {
if (!f.isDone()) {
result.add(f.key);
}
}
return result;
}
/**
* Clear the lazy map, including pending requests
*
* <p>
* Pending requests will be cancelled
*/
public void clear() {
Set<KeyedFuture<K, V>> copy = new LinkedHashSet<>();
synchronized (this) {
copy.addAll(futures.values());
futures.clear();
map.clear();
}
for (KeyedFuture<K, V> f : copy) {
f.cancel(false);
}
}
/**
* Retain only those entries whose keys appear in the given collection
*
* <p>
* All removed entries with pending computations will be canceled
*
* @param keys the keys to retain
*/
public void retainKeys(Collection<K> keys) {
Set<KeyedFuture<K, V>> removed = new LinkedHashSet<>();
synchronized (this) {
for (Iterator<Entry<K, KeyedFuture<K, V>>> it = futures.entrySet().iterator(); it
.hasNext();) {
Entry<K, KeyedFuture<K, V>> ent = it.next();
if (!keys.contains(ent.getKey())) {
removed.add(ent.getValue());
it.remove();
}
}
map.keySet().retainAll(keys);
}
for (KeyedFuture<K, V> f : removed) {
f.cancel(false);
}
}
/**
* Check if a given key is in the map, pending or completed
*
* @param key the key to check
* @return true if present, false otherwise
*/
public synchronized boolean containsKey(K key) {
return futures.containsKey(key) || map.containsKey(key);
}
}

View file

@ -0,0 +1,152 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* A value to be completed once upon the first request, asynchronously
*
* This contains a single lazy value. It is computed only if requested. When requested, a future is
* returned and the computation is started. If the computation succeeds, the completed future is
* cached indefinitely. Any subsequent requests return the same future, even if the computation has
* not yet completed. Thus, when it completes, all requests will be fulfilled by the result of the
* first request. If the computation completes exceptionally, the result is immediately discarded.
* Thus, a subsequent request will retry the computation.
*
* @param <T> the type of the value
*/
public class AsyncLazyValue<T> {
private CompletableFuture<T> future;
private Throwable lastExc = null;
private Supplier<CompletableFuture<T>> supplier;
/**
* Construct a lazy value for the given computation
*
* @param supplier specifies the computation
*/
public AsyncLazyValue(Supplier<CompletableFuture<T>> supplier) {
this.supplier = supplier;
}
/**
* Request the value
*
* If this is called before {@link #provide()}, the computation given at construction is
* launched. The {@link CompletableFuture} it provides is returned immediately. Subsequent calls
* to either {@link #request()} or {@link #provide()} return the same future without starting
* any new computation.
*
* @return a future, possibly already completed, for the value
*/
public synchronized CompletableFuture<T> request() {
if (future == null) {
future = supplier.get();
future.exceptionally((exc) -> {
synchronized (this) {
lastExc = exc;
future = null;
}
// We return the future, not the result of exceptionally
// So no need to rethrow here
return null;
});
}
// It's possible the future completed exceptionally on this thread, so future may be null
if (future == null) {
return CompletableFuture.failedFuture(lastExc);
}
return future;
}
/**
* Provide the value out of band
*
* If this is called before {@link #request()}, the computation given at construction is
* ignored. A new {@link CompletableFuture} is returned instead. The caller must see to this
* future's completion. Subsequent calls to either {@link #request()} or {@link #provide()}
* return this same future without starting any computation.
*
* Under normal circumstances, the caller cannot determine whethor or not is has "claimed" the
* computation. If the usual computation is already running, then the computations are
* essentially in a race. As such, it is essential that alternative computations result in the
* same value as the usual computation. In other words, the functions must not differ, but the
* means of computation can differ. Otherwise, race conditions may arise.
*
* @return a promise that the caller must fulfill or arrange to have fulfilled
*/
public synchronized CompletableFuture<T> provide() {
if (future == null) {
future = new CompletableFuture<>();
future.exceptionally((exc) -> {
synchronized (this) {
future = null;
}
return ExceptionUtils.rethrow(exc);
});
}
return future;
}
/**
* Forget the value
*
* Instead of returning a completed (or even in-progress) future, the next request will cause
* the value to be re-computed.
*/
public synchronized void forget() {
future = null;
}
@Override
public synchronized String toString() {
if (future == null) {
return "(lazy)";
}
if (!future.isDone()) {
return "(lazy-req)";
}
if (future.isCompletedExceptionally()) {
return "(lazy-err)";
}
return future.getNow(null).toString();
}
/**
* Check if the value has been requested, but not yet completed
*
* <p>
* This will also return true if something is providing the value out of band.
*
* @return true if {@link #request()} or {@link #provide()} has been called, but not completed
*/
public synchronized boolean isBusy() {
return future != null && !future.isDone();
}
/**
* Check if the value is available immediately
*
* @return true if {@link #request()} or {@link #provide()} has been called and completed.
*/
public synchronized boolean isDone() {
return future != null && future.isDone();
}
}

View file

@ -0,0 +1,75 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
public class AsyncPairingQueue<T> {
private final Deque<CompletableFuture<? extends T>> givers = new LinkedList<>();
private final Deque<CompletableFuture<T>> takers = new LinkedList<>();
public void give(CompletableFuture<? extends T> giver) {
CompletableFuture<T> taker;
synchronized (givers) {
if (takers.isEmpty()) {
givers.add(giver);
return;
}
taker = takers.poll();
}
pair(giver, taker);
}
public CompletableFuture<T> give() {
CompletableFuture<T> giver = new CompletableFuture<>();
give(giver);
return giver;
}
public CompletableFuture<T> take() {
CompletableFuture<T> taker = new CompletableFuture<>();
CompletableFuture<? extends T> giver;
synchronized (givers) {
if (givers.isEmpty()) {
takers.add(taker);
return taker;
}
giver = givers.poll();
}
pair(giver, taker);
return taker;
}
private void pair(CompletableFuture<? extends T> giver, CompletableFuture<T> taker) {
giver.handle((val, exc) -> {
if (exc != null) {
taker.completeExceptionally(exc);
}
else {
taker.complete(val);
}
return null;
});
}
public boolean isEmpty() {
synchronized (givers) {
return givers.isEmpty() && takers.isEmpty();
}
}
}

View file

@ -0,0 +1,481 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.lang.ref.Cleaner.Cleanable;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import ghidra.util.Msg;
import ghidra.util.TriConsumer;
/**
* An observable reference useful for asynchronous computations
*
* <p>
* The reference supports the usual set and get operations. The set operation accepts an optional
* "cause" argument which is forwarded to some observers. The set operation may also be intercepted
* by an optional filter. The filter function is provided a copy of the current value, proposed
* value, and cause. The value it returns becomes the new value. If that value is different than the
* current value, the observers are notified. The default filter returns the new value, always.
*
* <p>
* The reference provides three types of observation callbacks. The first is to listen for all
* changes. This follows the listener pattern. When the value changes, i.e., is set to a value
* different than the current value, all change listener are invoked with a copy of the new value
* and a reference to the provided cause, if given. The second is to wait for the very next change.
* It follows the promises pattern. The returned future completes with the new value upon the very
* next change. The cause is not provided to the type of observer. The third is to wait for a given
* value. It, too, follows the promises pattern. The returned future completes as soon as the
* reference takes the given value. The cause is not provided to this type of observer.
*
* @param <T> the type of object stored by reference
* @param <C> when updated, the type of the causes of those updates (often {@link Void})
*/
public class AsyncReference<T, C> {
private T val;
private List<TriConsumer<? super T, ? super T, ? super C>> listeners = new ArrayList<>();
private CompletableFuture<T> changePromise = null;
private final Map<T, CompletableFuture<Void>> waitsFor = new HashMap<>();
private final List<WaitUntilFuture<T>> waitsUntil = new ArrayList<>();
private FilterFunction<T, ? super C> filter = (cur, set, cause) -> set;
private Throwable disposalReason;
/**
* A function to filter updates to an {@link AsyncReference}
*
* @param <T> the type of object stored by the reference
* @param <C> when updated, the type of the causes of those updates
*/
@FunctionalInterface
public interface FilterFunction<T, C> {
/**
* Filter an incoming update, i.e., call to {@link AsyncReference#set(Object, Object)}
*
* @param cur the current value of the reference
* @param set the incoming value from the update
* @param cause the cause of the update
* @return the new value to assign to the reference
*/
T filter(T cur, T set, C cause);
}
/**
* For {@link AsyncReference#waitUntil}
*
* @param <T> the type of the associated reference
*/
private static class WaitUntilFuture<T> extends CompletableFuture<T> {
private final Predicate<T> predicate;
public WaitUntilFuture(Predicate<T> predicate) {
this.predicate = predicate;
}
}
/**
* Construct a new reference initialized to {@code null}
*/
public AsyncReference() {
this(null);
}
/**
* Construct a new reference initialized to the given value
*
* @param t the initial value
*/
public AsyncReference(T t) {
this.val = t;
}
/**
* Apply a filter function to all subsequent updates
*
* <p>
* The given function replaces the current function.
*
* @param newFilter the filter
*/
public synchronized void filter(FilterFunction<T, ? super C> newFilter) {
if (newFilter == null) {
throw new NullPointerException();
}
this.filter = newFilter;
}
/**
* Get the current value of this reference
*
* @return the current value
*/
public synchronized T get() {
return val;
}
protected CompletableFuture<T> getAndClearChangePromise() {
CompletableFuture<T> promise = changePromise;
changePromise = null;
return promise;
}
protected CompletableFuture<Void> getAndRemoveWaitFor(T t) {
return waitsFor.remove(t);
}
protected List<WaitUntilFuture<T>> getAndRemoveUntils(T t) {
List<WaitUntilFuture<T>> untils = new ArrayList<>();
for (Iterator<WaitUntilFuture<T>> it = waitsUntil.iterator(); it.hasNext();) {
WaitUntilFuture<T> wuf = it.next();
if (wuf.predicate.test(t)) {
it.remove();
untils.add(wuf);
}
}
return untils;
}
protected boolean filterAndSet(T t, C cause) {
t = filter.filter(this.val, t, cause);
if (Objects.equals(this.val, t)) {
return false;
}
this.val = t;
return true;
}
protected void invokeListeners(List<TriConsumer<? super T, ? super T, ? super C>> copy,
T oldVal, T newVal, C cause) {
for (TriConsumer<? super T, ? super T, ? super C> listener : copy) {
try {
listener.accept(oldVal, newVal, cause);
}
catch (RejectedExecutionException exc) {
Msg.trace(this, "Ignoring rejection: " + exc);
}
catch (Throwable exc) {
Msg.error(this, "Ignoring exception on async reference listener: ", exc);
}
}
}
protected void invokePromise(CompletableFuture<T> promise, T t) {
if (promise != null) {
promise.complete(t);
}
}
protected void invokeWaitFor(CompletableFuture<Void> waiter) {
if (waiter != null) {
waiter.complete(null);
}
}
protected void invokeWaitUntils(List<WaitUntilFuture<T>> untils, T t) {
for (WaitUntilFuture<T> wuf : untils) {
wuf.complete(t);
}
}
/**
* Update this reference to the given value because of the given cause
*
* @param newVal the proposed value (subject to the filter)
* @param cause the cause, often {@code null}
* @return true if the value of this reference changed (post filter)
*/
public boolean set(T newVal, C cause) {
List<TriConsumer<? super T, ? super T, ? super C>> volatileListeners;
CompletableFuture<T> promise;
CompletableFuture<Void> waiter;
List<WaitUntilFuture<T>> untils = new ArrayList<>();
T oldVal;
synchronized (this) {
oldVal = this.val;
if (!filterAndSet(newVal, cause)) {
return false;
}
newVal = this.val;
// Invoke listeners without the lock
volatileListeners = listeners;
promise = getAndClearChangePromise();
waiter = getAndRemoveWaitFor(newVal);
untils = getAndRemoveUntils(newVal);
}
invokeListeners(volatileListeners, oldVal, newVal, cause);
invokePromise(promise, newVal);
invokeWaitFor(waiter);
invokeWaitUntils(untils, newVal);
return true;
}
/**
* Update this reference using the given function because of the given cause
*
* @param func the function taking the current value and returning the proposed value (subject
* to the filter)
* @param cause the cause, often {@code null}
* @return the new value of this reference (post filter)
*/
public T compute(Function<? super T, ? extends T> func, C cause) {
List<TriConsumer<? super T, ? super T, ? super C>> volatileListeners;
CompletableFuture<T> promise;
CompletableFuture<Void> waiter;
List<WaitUntilFuture<T>> untils = new ArrayList<>();
T newVal;
T oldVal;
synchronized (this) {
oldVal = this.val;
newVal = func.apply(this.val);
if (!filterAndSet(newVal, cause)) {
return this.val;
}
newVal = this.val;
// Invoke listeners without the lock
volatileListeners = listeners;
promise = getAndClearChangePromise();
waiter = getAndRemoveWaitFor(newVal);
untils = getAndRemoveUntils(newVal);
}
invokeListeners(volatileListeners, oldVal, newVal, cause);
invokePromise(promise, newVal);
invokeWaitFor(waiter);
invokeWaitUntils(untils, newVal);
return newVal;
}
/**
* Add a listener for any change to this reference's value
*
* <p>
* Updates that get "filtered out" do not cause a change listener to fire.
*
* @param listener the listener, which is passed the new value (post-filter) and cause
*/
public void addChangeListener(TriConsumer<? super T, ? super T, ? super C> listener) {
List<TriConsumer<? super T, ? super T, ? super C>> copy = new ArrayList<>(listeners);
copy.add(listener);
synchronized (this) {
listeners = copy;
}
}
/**
* Remove a change listener
*
* @param listener the listener to remove
*/
public synchronized void removeChangeListener(TriConsumer<T, T, C> listener) {
List<TriConsumer<? super T, ? super T, ? super C>> copy = new ArrayList<>(listeners);
copy.remove(listener);
synchronized (this) {
listeners = copy;
}
}
/**
* Wait for the next change and capture the new value
*
* The returned future completes with the value of the very next change, at the time of that
* change. Subsequent changes to the value of the reference do not affect the returned future.
*
* @return the future value at the next change
*/
public synchronized CompletableFuture<T> waitChanged() {
if (disposalReason != null) {
return CompletableFuture.failedFuture(disposalReason);
}
if (changePromise == null) {
changePromise = new CompletableFuture<>();
}
return changePromise;
}
/**
* Wait for this reference to accept a particular value (post-filter)
*
* <p>
* If the reference already has the given value, a completed future is returned.
*
* @param t the expected value to wait on
* @return a future that completes the next time the reference accepts the given value
*/
public synchronized CompletableFuture<Void> waitValue(T t) {
if (disposalReason != null) {
return CompletableFuture.failedFuture(disposalReason);
}
if (Objects.equals(this.val, t)) {
return AsyncUtils.nil();
}
CompletableFuture<Void> waiter = waitsFor.get(t);
if (waiter == null) {
waiter = new CompletableFuture<>();
waitsFor.put(t, waiter);
}
return waiter;
}
/**
* Wait for this reference to accept the first value meeting the given condition (post-filter)
*
* <p>
* If the current value already meets the condition, a completed future is returned.
*
* @param predicate the condition to meet
* @return a future that completes the next time the reference accepts a passing value
*/
public synchronized CompletableFuture<T> waitUntil(Predicate<T> predicate) {
if (disposalReason != null) {
return CompletableFuture.failedFuture(disposalReason);
}
if (predicate.test(val)) {
return CompletableFuture.completedFuture(val);
}
WaitUntilFuture<T> waiter = new WaitUntilFuture<>(predicate);
waitsUntil.add(waiter);
return waiter;
}
/**
* Clear out the queues of future, completing each exceptionally
*
* @param reason the reason for disposal
*/
public void dispose(Throwable reason) {
List<CompletableFuture<?>> toExcept = new ArrayList<>();
synchronized (this) {
disposalReason = reason;
toExcept.addAll(waitsFor.values());
waitsFor.clear();
toExcept.addAll(waitsUntil);
waitsUntil.clear();
if (changePromise != null) {
toExcept.add(changePromise);
changePromise = null;
}
}
DisposedException ex = new DisposedException(reason);
for (CompletableFuture<?> future : toExcept) {
future.completeExceptionally(ex);
}
}
/**
* Used for debouncing
*/
protected static class ChangeRecord<T, C> {
final T val;
final C cause;
public ChangeRecord(T val, C cause) {
this.val = val;
this.cause = cause;
}
}
protected static class DebouncedAsyncReference<T, C> extends AsyncReference<T, C> {
static class State<T, C> implements Runnable, TriConsumer<T, T, C> {
final WeakReference<DebouncedAsyncReference<T, C>> to;
final AsyncReference<T, C> from;
final AsyncDebouncer<ChangeRecord<T, C>> db;
public State(WeakReference<DebouncedAsyncReference<T, C>> to, AsyncReference<T, C> from,
AsyncTimer timer, long windowMillis) {
this.to = to;
this.from = from;
this.db = new AsyncDebouncer<>(timer, windowMillis);
from.addChangeListener(this);
db.addListener(r -> {
DebouncedAsyncReference<T, C> ref = to.get();
if (ref == null) {
return;
}
ref.doSet(r.val, r.cause);
});
}
@Override
public void accept(T oldVal, T newVal, C c) {
db.contact(new ChangeRecord<>(newVal, c));
}
@Override
public void run() {
from.removeChangeListener(this);
}
}
final State<T, C> state;
final Cleanable cleanable;
public DebouncedAsyncReference(AsyncReference<T, C> from, AsyncTimer timer,
long windowMillis) {
super(from.val);
this.state = new State<>(new WeakReference<>(this), from, timer, windowMillis);
this.cleanable = AsyncUtils.CLEANER.register(this, state);
}
@Override
public boolean set(T t, C cause) {
throw new IllegalStateException("Cannot set a debounced async reference.");
}
private boolean doSet(T t, C cause) {
return super.set(t, cause);
}
}
/**
* Obtain a new {@link AsyncReference} whose value is updated after this reference has settled
*
* <p>
* The original {@link AsyncReference} continues to behave as usual, except that is has an
* additional listener on it. When this reference is updated, the update is passed through an
* {@link AsyncDebouncer} configured with the given timer and window. When the debouncer
* settles, the debounced reference is updated.
*
* <p>
* Directly updating, i.e., calling {@link #set(Object, Object)} on, the debounced reference
* subverts the debouncing mechanism, and will result in an exception. Only the original
* reference should be updated directly.
*
* <p>
* Setting a filter on the debounced reference may have undefined behavior.
*
* <p>
* If the original reference changes value rapidly, settling on the debounced reference's
* current value, no update event is produced by the debounced reference. If the original
* reference changes value rapidly, settling on a value different from the debounced reference's
* current value, an update event is produced, using the cause of the final update, even if an
* earlier cause was associated with the same final value.
*
* @param timer a timer for measuring the window
* @param windowMillis the period of inactive time to consider this reference settled
* @return the new {@link AsyncReference}
*/
public AsyncReference<T, C> debounced(AsyncTimer timer, long windowMillis) {
return new DebouncedAsyncReference<>(this, timer, windowMillis);
}
}

View file

@ -0,0 +1,161 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.Timer;
import java.util.concurrent.*;
import java.util.function.Supplier;
/**
* A timer for asynchronous scheduled tasks
*
* <p>
* This object provides a futures which complete at specified times. This is useful for pausing amid
* a chain of callback actions, i.e., between iterations of a loop. A critical tenant of
* asynchronous reactive programming is to never block a thread, at least not for an indefinite
* period of time. If an action blocks, it may prevent completion of other tasks in its executor's
* queue, possibly resulting in deadlock. An easy and tempting way to accidentally block is to call
* {@link Object#wait()} or {@link Thread#sleep(long)} when trying to wait for a specific period of
* time. Unfortunately, this does not just block the chain, but blocks the thread. Java provides a
* {@link Timer}, but its {@link Future}s are not {@link CompletableFuture}s. The same is true of
* {@link ScheduledThreadPoolExecutor}.
*
* <p>
* A delay is achieved using {@link #mark()}, then {@link Mark#after(long)}.
*
* <pre>
* future.thenCompose(__ -> timer.mark().after(1000))
* </pre>
*
* <p>
* {@link #mark()} marks the current system time; all calls to the mark's {@link Mark#after(long)}
* schedule futures relative to this mark. Scheduling a timed sequence of actions is best
* accomplished using times relative to a single mark. For example:
*
* <pre>
* Mark mark = timer.mark();
* mark.after(1000).thenCompose(__ -> {
* doTaskAtOneSecond();
* return mark.after(2000);
* }).thenAccept(__ -> {
* doTaskAtTwoSeconds();
* });
* </pre>
*
* <p>
* This provides slightly more precise scheduling than delaying for a fixed period between tasks.
* Consider a second example:
*
* <p>
* Like {@link Timer}, each {@link AsyncTimer} is backed by a single thread which uses
* {@link Object#wait()} to implement its timing. Thus, this is not suitable for real-time
* applications. Unlike {@link Timer}, the backing thread is always a daemon. It will not prevent
* process termination. If a task is long running, the sequence should queue it on another executor,
* perhaps using {@link CompletableFuture#supplyAsync(Supplier, Executor)}. Otherwise, other
* scheduled tasks may be inordinately delayed.
*/
public class AsyncTimer {
public static final AsyncTimer DEFAULT_TIMER = new AsyncTimer();
protected ExecutorService thread = Executors.newSingleThreadExecutor();
public class Mark {
protected final long mark;
protected Mark(long mark) {
this.mark = mark;
}
/**
* Schedule a task to run when the given number of milliseconds has passed since this mark
*
* <p>
* The method returns immediately, giving a future result. The future completes "soon after"
* the requested interval since the last mark passes. There is some minimal overhead, but
* the scheduler endeavors to complete the future as close to the given time as possible.
* The actual scheduled time will not precede the requested time.
*
* @param intervalMillis the interval after which the returned future completes
* @return a future that completes soon after the given interval
*/
public CompletableFuture<Void> after(long intervalMillis) {
return atSystemTime(mark + intervalMillis);
}
/**
* Time a future out after the given interval
*
* @param <T> the type of the future
* @param future the future whose value is expected in the given interval
* @param millis the time interval in milliseconds
* @param valueIfLate a supplier for the value if the future doesn't complete in time
* @return a future which completes with the given futures value, or the late value if it
* times out.
*/
public <T> CompletableFuture<T> timeOut(CompletableFuture<T> future, long millis,
Supplier<T> valueIfLate) {
return CompletableFuture.anyOf(future, after(millis)).thenApply(v -> {
if (future.isDone()) {
return future.getNow(null);
}
return valueIfLate.get();
});
}
}
/**
* Create a new timer
*
* <p>
* Except to reduce contention among threads, most applications need only create one timer
* instance. See {@link AsyncTimer#DEFAULT_TIMER}.
*/
public AsyncTimer() {
}
/**
* Schedule a task to run when {@link System#currentTimeMillis()} has passed a given time
*
* <p>
* This method returns immediately, giving a future result. The future completes "soon after"
* the current system time passes the given time in milliseconds. There is some minimal
* overhead, but the scheduler endeavors to complete the future as close to the given time as
* possible. The actual scheduled time will not precede the requested time.
*
* @param timeMillis the time after which the returned future completes
* @return a future that completes soon after the given time
*/
public CompletableFuture<Void> atSystemTime(long timeMillis) {
if (timeMillis - System.currentTimeMillis() <= 0) {
return AsyncUtils.nil();
}
long delta = timeMillis - System.currentTimeMillis();
Executor executor =
delta <= 0 ? thread : CompletableFuture.delayedExecutor(delta, TimeUnit.MILLISECONDS);
return CompletableFuture.runAsync(() -> {
}, executor);
}
/**
* Mark the current system time
*
* @return this same timer
*/
public Mark mark() {
return new Mark(System.currentTimeMillis());
}
}

View file

@ -0,0 +1,93 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.lang.ref.Cleaner;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import org.apache.commons.lang3.exception.ExceptionUtils;
/**
* Some conveniences when dealing with Java's {@link CompletableFuture}s.
*/
public interface AsyncUtils {
Cleaner CLEANER = Cleaner.create();
ExecutorService FRAMEWORK_EXECUTOR = Executors.newWorkStealingPool();
ExecutorService SWING_EXECUTOR = SwingExecutorService.LATER;
// NB. This was a bad idea, because CFs may maintain refs to dependents.
//CompletableFuture<Void> NIL = CompletableFuture.completedFuture(null);
public static <T> CompletableFuture<T> nil() {
return CompletableFuture.completedFuture(null);
}
public interface TemperamentalRunnable {
public void run() throws Throwable;
}
public interface TemperamentalSupplier<T> {
public T get() throws Throwable;
}
/**
* Unwrap {@link CompletionException}s and {@link ExecutionException}s to get the real cause
*
* @param e the (usually wrapped) exception
* @return the nearest cause in the chain that is not a {@link CompletionException}
*/
public static Throwable unwrapThrowable(Throwable e) {
Throwable exc = e;
while (exc instanceof CompletionException || exc instanceof ExecutionException) {
exc = exc.getCause();
}
return exc;
}
/**
* Create a {@link BiFunction} that copies a result from one {@link CompletableFuture} to
* another
*
* <p>
* The returned function is suitable for use in {@link CompletableFuture#handle(BiFunction)} and
* related methods, as in:
*
* <pre>
* sourceCF().handle(AsyncUtils.copyTo(destCF));
* </pre>
*
* <p>
* This will effectively cause {@code destCF} to be completed identically to {@code sourceCF}.
* The returned future from {@code handle} will also behave identically to {@code source CF},
* except that {@code destCF} is guaranteed to complete before the returned future does.
*
* @param <T> the type of the future result
* @param dest the future to copy into
* @return a function which handles the source future
*/
public static <T> BiFunction<T, Throwable, T> copyTo(CompletableFuture<T> dest) {
return (t, ex) -> {
if (ex != null) {
dest.completeExceptionally(ex);
return ExceptionUtils.rethrow(ex);
}
dest.complete(t);
return t;
};
}
}

View file

@ -0,0 +1,22 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
public class DisposedException extends RuntimeException {
public DisposedException(Throwable reason) {
super(reason);
}
}

View file

@ -0,0 +1,74 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.List;
import java.util.concurrent.*;
import javax.swing.SwingUtilities;
import ghidra.util.Swing;
/**
* A wrapper for {@link SwingUtilities#invokeLater(Runnable)} that implements
* {@link ExecutorService}.
*/
public abstract class SwingExecutorService extends AbstractExecutorService {
public static final SwingExecutorService LATER = new SwingExecutorService() {
@Override
public void execute(Runnable command) {
SwingUtilities.invokeLater(command);
}
};
/**
* Wraps {@link Swing#runIfSwingOrRunLater(Runnable)} instead
*/
public static final SwingExecutorService MAYBE_NOW = new SwingExecutorService() {
@Override
public void execute(Runnable command) {
Swing.runIfSwingOrRunLater(command);
}
};
private SwingExecutorService() {
}
@Override
public void shutdown() {
throw new UnsupportedOperationException();
}
@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
}

View file

@ -1210,7 +1210,7 @@ public class StringUtilities {
/**
* Removes any whitespace from start or end of string, then replaces any non-printable
* character (< 32) or spaces (32) with an underscore.
* character (&lt; 32) or spaces (32) with an underscore.
* @param s the string to adjust
* @return a new trimmed string with underscores replacing any non-printable characters.
*/

View file

@ -0,0 +1,109 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections4.map.LazyMap;
import org.junit.Test;
public class AsyncLaziesTest {
@Test
public void testLazyValueAskTwice() {
AtomicInteger calls = new AtomicInteger();
CompletableFuture<String> future = new CompletableFuture<>();
AsyncLazyValue<String> lazy = new AsyncLazyValue<>(() -> {
calls.incrementAndGet();
return future;
});
CompletableFuture<String> lazyReq1 = lazy.request();
assertEquals(future, lazyReq1);
CompletableFuture<String> lazyReq2 = lazy.request();
assertEquals(future, lazyReq2);
assertEquals(1, calls.get());
}
@Test
public void testLazyMapAskTwice() {
Map<String, AtomicInteger> calls =
LazyMap.lazyMap(new HashMap<>(), () -> new AtomicInteger());
Map<String, CompletableFuture<Integer>> reqs = new HashMap<>();
AsyncLazyMap<String, Integer> lazyMap = new AsyncLazyMap<>(new HashMap<>(), (key) -> {
CompletableFuture<Integer> req = new CompletableFuture<>();
reqs.put(key, req);
calls.get(key).incrementAndGet();
return req;
});
CompletableFuture<Integer> req1a = lazyMap.get("One");
CompletableFuture<Integer> req1b = lazyMap.get("One");
CompletableFuture<Integer> req2a = lazyMap.get("Two");
CompletableFuture<Integer> req2b = lazyMap.get("Two");
assertEquals(req1a, req1b);
assertEquals(req2a, req2b);
assertEquals(1, calls.get("One").get());
assertEquals(1, calls.get("Two").get());
assertEquals(2, calls.size());
}
@Test
public void testLazyMapCompletedMap() throws Exception {
Map<String, CompletableFuture<Integer>> reqs = new HashMap<>();
AsyncLazyMap<String, Integer> lazyMap = new AsyncLazyMap<>(new HashMap<>(), (key) -> {
CompletableFuture<Integer> req = new CompletableFuture<>();
reqs.put(key, req);
return req;
});
CompletableFuture<Integer> req1 = lazyMap.get("One");
assertEquals(Map.of(), lazyMap.getCompletedMap());
reqs.get("One").complete(1);
assertEquals(1, req1.get(1000, TimeUnit.MILLISECONDS).intValue());
assertEquals(Map.of("One", 1), lazyMap.getCompletedMap());
}
@Test
public void testLazyMapPut() throws InterruptedException, ExecutionException, TimeoutException {
Map<String, CompletableFuture<Integer>> reqs = new HashMap<>();
AsyncLazyMap<String, Integer> lazyMap = new AsyncLazyMap<>(new HashMap<>(), (key) -> {
CompletableFuture<Integer> req = new CompletableFuture<>();
reqs.put(key, req);
return req;
});
CompletableFuture<Integer> req1 = lazyMap.get("One");
AtomicInteger val1 = new AtomicInteger(0);
req1.thenAccept(val1::set);
assertEquals(0, val1.get());
lazyMap.put("One", 1);
assertEquals(1, req1.get(1000, TimeUnit.MILLISECONDS).intValue());
assertEquals(1, val1.get());
}
}

View file

@ -0,0 +1,162 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import static org.junit.Assert.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import ghidra.util.Msg;
public class AsyncReferenceTest {
@Test
public void testListener() {
AsyncReference<String, Integer> str = new AsyncReference<>();
AtomicReference<String> got = new AtomicReference<>();
AtomicInteger gotCause = new AtomicInteger();
str.addChangeListener((old, val, cause) -> {
got.set(val);
gotCause.set(cause);
});
str.set("Hello", 1);
assertEquals("Hello", got.get());
assertEquals(1, gotCause.get());
str.set("World", 2);
assertEquals("World", got.get());
assertEquals(2, gotCause.get());
}
@Test
public void testWaitChanged() throws InterruptedException, ExecutionException {
AsyncReference<String, Void> str = new AsyncReference<>();
CompletableFuture<String> chg1 = str.waitChanged();
CompletableFuture<String> chg2 = str.waitChanged();
assertFalse(chg1.isDone());
assertFalse(chg2.isDone());
str.set("Hello", null);
assertTrue(chg1.isDone());
assertTrue(chg2.isDone());
assertEquals("Hello", chg1.get());
assertEquals("Hello", chg2.get());
CompletableFuture<String> chg3 = str.waitChanged();
assertFalse(chg3.isDone());
str.set("World", null);
assertTrue(chg3.isDone());
assertEquals("World", chg3.get());
}
@Test
public void testWaitValue() {
AsyncReference<String, Void> str = new AsyncReference<>();
CompletableFuture<Void> matchHello = str.waitValue("Hello");
CompletableFuture<Void> matchWorld = str.waitValue("World");
assertFalse(matchHello.isDone());
assertFalse(matchWorld.isDone());
assertEquals(matchHello, str.waitValue("Hello"));
str.set("Hello", null);
assertTrue(matchHello.isDone());
assertFalse(matchWorld.isDone());
assertTrue(str.waitValue("Hello").isDone());
str.set("World", null);
assertFalse(str.waitValue("Hello").isDone());
assertTrue(matchWorld.isDone());
}
@Test
public void testDebouncer() throws InterruptedException, ExecutionException, TimeoutException {
AsyncDebouncer<Void> debouncer = new AsyncDebouncer<>(new AsyncTimer(), 100);
long startTime = System.currentTimeMillis();
CompletableFuture<Void> settled = debouncer.settled();
debouncer.contact(null);
settled.get(300, TimeUnit.MILLISECONDS);
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
Msg.info(this, "duration: " + duration);
assertTrue(duration >= 100);
}
@Test
public void testDebouncedUnchanged() throws InterruptedException {
AsyncReference<Integer, Void> orig = new AsyncReference<>(1);
AsyncReference<Integer, Void> db = orig.debounced(new AsyncTimer(), 100);
CompletableFuture<Integer> settled = db.waitChanged();
orig.set(1, null);
Thread.sleep(200);
assertFalse(settled.isDone());
}
@Test
public void testDebouncedSingleChange()
throws InterruptedException, ExecutionException, TimeoutException {
AsyncReference<Integer, Void> orig = new AsyncReference<>(1);
AsyncReference<Integer, Void> db = orig.debounced(new AsyncTimer(), 100);
CompletableFuture<Integer> settled = db.waitChanged();
long startTime = System.currentTimeMillis();
orig.set(2, null);
int s = settled.get(300, TimeUnit.MILLISECONDS);
long endTime = System.currentTimeMillis();
assertEquals(2, s);
long duration = endTime - startTime;
Msg.info(this, "duration: " + duration);
assertTrue(duration >= 100);
}
@Test
public void testDebouncedChangedBack() throws InterruptedException {
AsyncReference<Integer, Void> orig = new AsyncReference<>(1);
AsyncReference<Integer, Void> db = orig.debounced(new AsyncTimer(), 100);
CompletableFuture<Integer> settled = db.waitChanged();
orig.set(2, null);
orig.set(1, null);
Thread.sleep(200);
assertFalse(settled.isDone());
}
@Test
public void testManyChanges()
throws InterruptedException, ExecutionException, TimeoutException {
AsyncReference<Integer, String> orig = new AsyncReference<>(1);
AsyncReference<Integer, String> db = orig.debounced(new AsyncTimer(), 100);
CompletableFuture<Integer> settledVal = new CompletableFuture<>();
CompletableFuture<String> settledCause = new CompletableFuture<>();
db.addChangeListener((old, val, cause) -> {
assertTrue(settledVal.complete(val));
assertTrue(settledCause.complete(cause));
});
long startTime = System.currentTimeMillis();
orig.set(2, "First");
Thread.sleep(50);
orig.set(4, "Second");
Thread.sleep(50);
orig.set(3, "Third");
Thread.sleep(50);
orig.set(4, "Fourth");
int s = settledVal.get(300, TimeUnit.MILLISECONDS);
long endTime = System.currentTimeMillis();
assertEquals(4, s);
long duration = endTime - startTime;
Msg.info(this, "duration: " + duration);
assertTrue(duration >= 250);
assertEquals("Fourth", settledCause.get());
}
}

View file

@ -0,0 +1,64 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import java.util.concurrent.*;
import ghidra.util.SystemUtilities;
public interface AsyncTestUtils {
static final long TIMEOUT_MS =
SystemUtilities.isInTestingBatchMode() ? 5000 : Long.MAX_VALUE;
static final long RETRY_INTERVAL_MS = 100;
default <T> T waitOnNoValidate(CompletableFuture<T> future) throws Throwable {
// Do this instead of plain ol' .get(time), to ease debugging
// When suspended in .get(time), you can't introspect much, otherwise
long started = System.currentTimeMillis();
while (true) {
try {
return future.get(100, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
if (Long.compareUnsigned(System.currentTimeMillis() - started, TIMEOUT_MS) >= 0) {
throw e;
}
}
catch (Exception e) {
throw AsyncUtils.unwrapThrowable(e);
}
}
}
default void validateCompletionThread() {
}
default <T> T waitOn(CompletableFuture<T> future) throws Throwable {
/**
* NB. CF's may issue dependent callbacks either on the thread completing the dependency, or
* on the thread chaining the dependent. If the CF completes before the chain, then the
* callback comes to me, and so currentThread will not be the model's callback thread. Thus,
* I should not validate the currentThread at callback if it is the currentThread now.
*/
Thread waitingThread = Thread.currentThread();
CompletableFuture<T> validated = future.whenComplete((t, ex) -> {
if (Thread.currentThread() != waitingThread) {
validateCompletionThread();
}
});
return waitOnNoValidate(validated);
}
}

View file

@ -0,0 +1,153 @@
/* ###
* IP: GHIDRA
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ghidra.async;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
public class AsyncTimerTest {
@Test
public void testMarkWait1000ms() throws Exception {
AsyncTimer timer = new AsyncTimer();
long start = System.currentTimeMillis();
timer.mark().after(1000).get(5000, TimeUnit.MILLISECONDS);
long diff = System.currentTimeMillis() - start;
assertTrue(diff >= 1000);
assertTrue(diff < 5000);
}
@Test
public void testMarkWait1000Then2000ms() throws Exception {
AsyncTimer timer = new AsyncTimer();
long start = System.currentTimeMillis();
long diff;
AsyncTimer.Mark mark = timer.mark();
mark.after(1000).get(5000, TimeUnit.MILLISECONDS);
diff = System.currentTimeMillis() - start;
assertTrue(diff >= 1000);
assertTrue(diff < 2000);
mark.after(2000).get(5000, TimeUnit.MILLISECONDS);
diff = System.currentTimeMillis() - start;
assertTrue(diff >= 2000);
assertTrue(diff < 3000);
}
@Test
public void testMarkWait1000ThenMarkWait1000ms() throws Exception {
AsyncTimer timer = new AsyncTimer();
long start = System.currentTimeMillis();
long diff;
timer.mark().after(1000).get(5000, TimeUnit.MILLISECONDS);
diff = System.currentTimeMillis() - start;
assertTrue(diff >= 1000);
assertTrue(diff < 2000);
timer.mark().after(1000).get(5000, TimeUnit.MILLISECONDS);
diff = System.currentTimeMillis() - start;
assertTrue(diff >= 2000);
assertTrue(diff < 3000);
}
@Test
public void testMarkWait1000And2000ms() throws Exception {
AsyncTimer.Mark mark = new AsyncTimer().mark();
long start = System.currentTimeMillis();
CompletableFuture<Long> first = new CompletableFuture<>();
CompletableFuture<Long> second = new CompletableFuture<>();
mark.after(1000).thenAccept((v) -> first.complete(System.currentTimeMillis()));
mark.after(2000).thenAccept((v) -> second.complete(System.currentTimeMillis()));
first.get();
second.get();
long diff1 = first.get() - start;
long diff2 = second.get() - start;
assertTrue(diff1 >= 1000);
assertTrue(diff1 < 2000);
assertTrue(diff2 >= 2000);
assertTrue(diff2 < 3000);
}
@Test
public void testMarkWait2000And1000ms() throws Exception {
AsyncTimer.Mark mark = new AsyncTimer().mark();
long start = System.currentTimeMillis();
CompletableFuture<Long> first = new CompletableFuture<>();
CompletableFuture<Long> second = new CompletableFuture<>();
mark.after(2000).thenAccept((v) -> first.complete(System.currentTimeMillis()));
mark.after(1000).thenAccept((v) -> second.complete(System.currentTimeMillis()));
first.get();
second.get();
long diff1 = first.get() - start;
long diff2 = second.get() - start;
assertTrue(diff1 >= 2000);
assertTrue(diff1 < 3000);
assertTrue(diff2 >= 1000);
assertTrue(diff2 < 2000);
}
@Test
public void testMarkWait2000And1000ThenCancel1000ms() throws Exception {
AsyncTimer.Mark mark = new AsyncTimer().mark();
long start = System.currentTimeMillis();
CompletableFuture<Long> first = new CompletableFuture<>();
mark.after(2000).thenAccept((v) -> first.complete(System.currentTimeMillis()));
mark.after(1000).cancel(true);
first.get();
long diff = first.get() - start;
assertTrue(diff >= 2000);
assertTrue(diff < 3000);
}
@Test
public void testMarkWait2000And2000ThenCancel2000ms() throws Exception {
// The two timed futures should be treated separately, so the first must still complete
AsyncTimer.Mark mark = new AsyncTimer().mark();
long start = System.currentTimeMillis();
CompletableFuture<Long> first = new CompletableFuture<>();
mark.after(2000).thenAccept((v) -> first.complete(System.currentTimeMillis()));
mark.after(2000).cancel(true);
first.get();
long diff = first.get() - start;
assertTrue(diff >= 2000);
assertTrue(diff < 3000);
}
@Test
public void testScheduleInPast() throws Exception {
AsyncTimer timer = new AsyncTimer();
long start = System.currentTimeMillis();
timer.atSystemTime(0).get(5000, TimeUnit.MILLISECONDS);
long diff = System.currentTimeMillis() - start;
assertTrue(diff < 100);
}
}