1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
|
/* Glazed Lists (c) 2003-2006 */
/* http://publicobject.com/glazedlists/ publicobject.com,*/
/* O'Dell Engineering Ltd.*/
package ca.odell.glazedlists.impl.gui;
import ca.odell.glazedlists.EventList;
import ca.odell.glazedlists.SortedList;
import ca.odell.glazedlists.TransformedList;
import ca.odell.glazedlists.event.ListEvent;
import ca.odell.glazedlists.event.ListEventAssembler;
import ca.odell.glazedlists.event.ListEventListener;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.RandomAccess;
/**
* An {@link EventList} that only forwards its events on a proxy thread,
* regardless of the thread of their origin.
*
* <p>While the proxy thread is not executing, any number of events may arrive.
* This means that multiple changes may have occurred before the proxy can
* notify its listeners. One problem created by this scenario is that some of
* these changes may contradict one another. For example, an inserted item may
* be later removed before either event is processed. To overcome this problem,
* this class uses the change-contradiction resolving logic of
* {@link ListEventAssembler}.
*
* <p>The flow of events is as follows:
* <ol>
* <li>Any thread makes a change to the source list and calls
* {@link ThreadProxyEventList#listChanged(ListEvent)}.
* <li>The event is enqueued and the proxy thread is notified that it has a
* task to process in the future.
* </ol>
*
* <p>At some point in the future, after one or more events have been enqueued,
* the proxy thread executes and processes its queue:
* <ol>
* <li>First it acquires a lock to prevent further concurrent changes
* <li>All enqueued changes are combined into a single change. Currently this
* implementation does a best effort on conflict resolution. It is
* limited in its handling of reordering events, as caused by changing a
* {@link SortedList} {@link Comparator}.
* <li>The single, combined event is fired.
* <li>The first listener is the {@link ThreadProxyEventList} itself. It listens
* to its own event because this event will be free of conflicts. It applies
* the changes to its own private copy of the data.
* <li>All other listeners are notified of the change.
* <li>The lock is released.
* </ol>
*
* <p>The {@link ThreadProxyEventList} keeps a private copy of the elements of the
* source {@link EventList}. This enables interested classes to read a consistent
* (albeit potentially out of date) view of the data at all times.
*
* <p><strong><font color="#FF0000">Important:</font></strong> ThreadProxyEventList
* relies heavily on its ability to pause changes to its source EventList
* while it is updating its private copy of the source data. It does this by
* acquiring the writeLock for the list pipeline. This implies that
* <stong>ALL</stong> code which accesses the pipeline must be thread-safe
* (or the acquisition of the writeLock will be meaningless). See
* {@link EventList} for an example of thread safe code.
*
* @author <a href="mailto:jesse@swank.ca">Jesse Wilson</a>
*/
public abstract class ThreadProxyEventList<E> extends TransformedList<E, E> implements RandomAccess {
/** a local cache of the source list */
private List<E> localCache = new ArrayList<E>();
/** propagates events on the proxy thread */
private UpdateRunner updateRunner = new UpdateRunner();
/** propagates events immediately. The source events might be fired later */
private final ListEventAssembler<E> cacheUpdates = new ListEventAssembler<E>(this, ListEventAssembler.createListEventPublisher());
/** whether the proxy thread has been scheduled */
private volatile boolean scheduled = false;
/**
* Create a {@link ThreadProxyEventList} which delivers changes to the
* given <code>source</code> on a particular {@link Thread}, called the
* proxy {@link Thread} of a subclasses choosing. The {@link Thread} used
* depends on the implementation of {@link #schedule(Runnable)}.
*
* @param source the {@link EventList} for which to proxy events
*/
public ThreadProxyEventList(EventList<E> source) {
super(source);
// populate the initial cache value
localCache.addAll(source);
// handle my own events to update the internal state
cacheUpdates.addListEventListener(updateRunner);
// handle changes in the source event list
source.addListEventListener(this);
}
/** {@inheritDoc} */
@Override
public final void listChanged(ListEvent<E> listChanges) {
// if we've haven't scheduled a commit, we need to begin a new event
if(!scheduled) {
updates.beginEvent(true);
cacheUpdates.beginEvent(true);
}
// add the changes for this event to our queue
updates.forwardEvent(listChanges);
cacheUpdates.forwardEvent(listChanges);
// commit the event on the appropriate thread
if(!scheduled) {
scheduled = true;
schedule(updateRunner);
}
}
/**
* Schedule the specified runnable to be executed on the proxy thread.
*
* @param runnable a unit of work to be executed on the proxy thread
*/
protected abstract void schedule(Runnable runnable);
/** {@inheritDoc} */
@Override
public final int size() {
return localCache.size();
}
/** {@inheritDoc} */
@Override
public final E get(int index) {
return localCache.get(index);
}
/** {@inheritDoc} */
@Override
protected final boolean isWritable() {
return true;
}
/**
* Apply the {@link ListEvent} to the {@link List}.
*
* @param source the EventList whose changes are being proxied to another thread
* @param listChanges the list of changes from the <code>source</code> to be applied
* @param localCache a private snapshot of the <code>source</code> which
* is now out of sync with that source list and will be repaired
* @return a new List to serve as the up-to-date local cache
*/
private List<E> applyChangeToCache(EventList<E> source, ListEvent<E> listChanges, List<E> localCache) {
List<E> result = new ArrayList<E>(source.size());
// cacheOffset is the running index delta between localCache and result
int resultIndex = 0;
int cacheOffset = 0;
while(true) {
// find the next change (or the end of the list)
int changeIndex;
int changeType;
if(listChanges.next()) {
changeIndex = listChanges.getIndex();
changeType = listChanges.getType();
} else {
changeIndex = source.size();
changeType = -1;
}
// perform all the updates before this change
for(; resultIndex < changeIndex; resultIndex++) {
result.add(resultIndex, localCache.get(resultIndex + cacheOffset));
}
// perform this change
if(changeType == ListEvent.DELETE) {
cacheOffset++;
} else if(changeType == ListEvent.UPDATE) {
result.add(resultIndex, source.get(changeIndex));
resultIndex++;
} else if(changeType == ListEvent.INSERT) {
result.add(resultIndex, source.get(changeIndex));
resultIndex++;
cacheOffset--;
} else if(changeType == -1) {
break;
}
}
return result;
}
/** {@inheritDoc} */
@Override
public void dispose() {
super.dispose();
cacheUpdates.removeListEventListener(updateRunner);
}
/**
* Updates the internal data using the proxy thread.
*
* @author <a href="mailto:jesse@swank.ca">Jesse Wilson</a>
*/
private class UpdateRunner implements Runnable, ListEventListener<E> {
/**
* When run, this combines all events thus far and forwards them.
*
* <p>If a reordering event is being forwarded, the reordering may be lost
* if it arrives simultaneously with another event. This is somewhat of a
* hack for the time being. Hopefully later we can refine this so that a
* new event is created with these changes properly.
*/
public void run() {
getReadWriteLock().writeLock().lock();
try {
// We need to apply the changes to the local cache immediately,
// before forwarding the event downstream to other listeners.
// This is necessary so that intermediate states in this list
// are visible to larger list changes (such as clearing tables,
// see bug 447)
cacheUpdates.commitEvent();
updates.commitEvent();
} finally {
scheduled = false;
getReadWriteLock().writeLock().unlock();
}
}
/**
* Update local state as a consequence of the change event.
*/
public void listChanged(ListEvent<E> listChanges) {
localCache = applyChangeToCache(source, listChanges, localCache);
}
}
}
|