class ConcurrentMergeScheduler {
  /**
   * Pulls every pending merge from the given writer and hands each one to
   * its own merge thread.
   *
   * <p>For each merge returned by {@code writer.getNextMerge()} this method
   * calls {@code writer.mergeInit} on the calling thread, then (while holding
   * this scheduler's monitor) waits until {@code mergeThreadCount()} drops
   * below {@code maxThreadCount}, obtains a thread from
   * {@code getMergeThread}, records it in {@code mergeThreads} and starts it.
   * The method returns once {@code getNextMerge()} yields {@code null}.
   *
   * <p>If anything goes wrong between {@code mergeInit} and the successful
   * start of the thread, {@code writer.mergeFinish} is invoked for that merge
   * before the failure propagates.
   *
   * @param writer the index writer whose pending merges are to be run; it is
   *               also stored in this scheduler's {@code writer} field
   * @throws CorruptIndexException declared for the writer calls made here
   * @throws IOException declared for the writer calls made here
   * @throws RuntimeException wrapping an {@link InterruptedException} if the
   *         calling thread is interrupted while stalled; the thread's
   *         interrupt status is restored before the exception is thrown
   */
  public void merge(IndexWriter writer)
    throws CorruptIndexException, IOException {

    // TODO: enable this once we are on JRE 1.5
    // assert !Thread.holdsLock(writer);

    this.writer = writer;
    initMergeThreadPriority();
    dir = writer.getDirectory();

    // Walk the newly proposed merges and queue every one that is
    // orthogonal (ie does not involve segments already pending to be
    // merged).  When merging is far behind, many of the proposed merges
    // will likely be registered already.

    if (verbose()) {
      message("now merge");
      message(" index: " + writer.segString());
    }

    // Drain the IndexWriter's queue of pending merges; the loop ends when
    // getNextMerge() has nothing left to hand out.
    //
    // TODO: we could be careful about which merges to do in the BG (eg
    // maybe the "biggest" ones) vs FG, which merges to do first (the
    // easiest ones?), etc.
    for (MergePolicy.OneMerge pending = writer.getNextMerge();
         pending != null;
         pending = writer.getNextMerge()) {

      // Initialised on the primary (calling) thread so that segment names
      // are assigned deterministically.
      writer.mergeInit(pending);

      // Flipped only after the merge thread has actually been started;
      // the finally clause uses it to decide whether to undo mergeInit.
      boolean launched = false;
      try {
        synchronized(this) {
          // Stall while the thread budget is exhausted.  wait() releases
          // this monitor until another thread notifies it.
          while (mergeThreadCount() >= maxThreadCount) {
            if (verbose()) {
              message(" too many merge threads running; stalling...");
            }
            try {
              wait();
            } catch (InterruptedException ie) {
              // In 3.0 we will change this to throw
              // InterruptedException instead.  Until then, keep the
              // interrupt flag set and surface it unchecked.
              Thread.currentThread().interrupt();
              throw new RuntimeException(ie);
            }
          }

          if (verbose()) {
            message(" consider merge " + pending.segString(dir));
          }

          assert mergeThreadCount() < maxThreadCount;

          // There is room for another merge thread: create it, register
          // it, and only then start it.
          final MergeThread worker = getMergeThread(writer, pending);
          mergeThreads.add(worker);
          if (verbose()) {
            message(" launch new thread [" + worker.getName() + "]");
          }
          worker.start();
          launched = true;
        }
      } finally {
        if (!launched) {
          // The thread never started, so release the merge ourselves.
          writer.mergeFinish(pending);
        }
      }
    }

    if (verbose()) {
      message(" no more merges pending; now return");
    }
  }
}