Assist maintenance task when executor is exhausted (fixes #90)
The write buffer is used to allow writers to update the eviction policy
in a non-blocking manner. The maintenance work is delegated to an async
task when possible to minimize request latencies. Previous iterations
(Guava, CLHM) amortized this work on the calling thread because they had
no system-wide executor to take advantage of.
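
For background, here is a minimal sketch of that write-buffer pattern. The
names are illustrative rather than Caffeine's actual internals, and the real
implementation guards the drain with a lock and a drain-status state machine:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

final class WriteBufferSketch {
  // Bounded buffer of pending eviction-policy updates appended by writers
  private final ArrayBlockingQueue<Runnable> writeBuffer = new ArrayBlockingQueue<>(128);
  private final Executor executor = ForkJoinPool.commonPool();

  // Called after a hash-table write; the writer does not block here
  void afterWrite(Runnable policyUpdate) {
    if (writeBuffer.offer(policyUpdate)) {
      executor.execute(this::drain); // delegate the maintenance asynchronously
    }
    // A full buffer is the interesting case; handling it is what this
    // commit changes.
  }

  // Replays the buffered updates against the eviction policy
  private synchronized void drain() {
    for (Runnable update; (update = writeBuffer.poll()) != null; ) {
      update.run();
    }
  }
}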

Previously, when the write buffer was full the writing threads would
spin, yield, and wait for the maintenance task to catch up. This was
under the assumption that the task was running but starved by a
synthetic load test, e.g. running `cache.put` with more threads than
cores. The belief was that the write buffer would not fill up under
normal usage because the maintenance task would be scheduled promptly.

This assumption fails for workloads where every worker in the executor
is updating the cache. This can happen in a synthetic refresh test, but
also with an AsyncLoadingCache when futures complete. In that case the
maintenance task is scheduled but unable to run, and all of the worker
threads spin endlessly trying to append to the write buffer.
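
A hypothetical repro of that scenario (not part of this commit) might look
like the following, where the cache's maintenance shares a small pool with
the writers:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public final class ExhaustedExecutorDemo {
  public static void main(String[] args) {
    int threads = 4;
    ExecutorService executor = Executors.newFixedThreadPool(threads);

    // The cache submits its maintenance task to the same pool the writers use
    LoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
        .executor(executor)
        .maximumSize(1_000)
        .build(key -> key);

    // Saturate every worker with writes; the queued maintenance task never
    // gets a thread, and before this fix the writers would spin forever once
    // the write buffer filled
    for (int i = 0; i < threads; i++) {
      executor.execute(() -> {
        for (int key = 0; ; key++) {
          cache.put(key & 0xFFFF, key);
        }
      });
    }
  }
}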

In that case we degrade to amortizing the maintenance work on the caller.
This allows progress to be made, avoids wasteful busy waiting, and should
not increase the response penalty in most cases: the writers would have
had to wait anyway, and this typically happens only on asynchronous,
non-user-facing tasks (completers, refresh). This also removes the ugly
Thread.yield() hack, which always looked unnatural. The new control flow
is sketched below; the authoritative change is in the afterWrite diff
that follows.
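
Distilled, the fix replaces the afterWrite method of the earlier sketch with
a bounded retry loop and a caller-assist fallback (hypothetical names; see
the diff below for the real change):

// Caller-assist fallback, extending the earlier sketch
void afterWrite(Runnable policyUpdate) {
  for (int i = 0; i < 100; i++) {    // bounded retries instead of for (;;)
    if (writeBuffer.offer(policyUpdate)) {
      executor.execute(this::drain); // async fast path
      return;
    }
    executor.execute(this::drain);   // buffer full; request a drain
  }
  // The executor may have no free threads, so assist on the calling thread:
  // run the new update, then drain the backlog (mirrors performCleanUp(task))
  policyUpdate.run();
  drain();
}

With this change, the writers make progress themselves, so a program like the
repro above no longer wedges.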

Thanks go to @DougLea for identifying the oversight that the executor
may exhaust its threads, causing this problem.
ben-manes committed Jun 13, 2016
1 parent 28118c5 commit 419cfe7
Showing 5 changed files with 83 additions and 41 deletions.
BoundedLocalCache.java
@@ -899,23 +899,23 @@ void afterWrite(@Nullable Node<K, V> node, Runnable task, long now) {
       node.setWriteTime(now);
     }
     if (buffersWrites()) {
-      boolean submitted = false;
-      for (;;) {
-        for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
-          submitted = writeBuffer().offer(task);
-          if (submitted) {
-            break;
-          }
-          scheduleDrainBuffers();
-        }
-        if (submitted) {
-          break;
-        } else {
-          Thread.yield();
-        }
-      }
+      for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
+        if (writeBuffer().offer(task)) {
+          scheduleAfterWrite();
+          return;
+        }
+        scheduleDrainBuffers();
+      }
+
+      // The maintenance task may be scheduled but not running due to all of the executor's threads
+      // being busy. If all of the threads are writing into the cache then no progress can be made
+      // without assistance.
+      try {
+        performCleanUp(task);
+      } catch (RuntimeException e) {
+        logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
+      }
     }
     scheduleAfterWrite();
   }
 
   /**
@@ -965,7 +965,7 @@ void scheduleDrainBuffers() {
         executor().execute(drainBuffersTask);
       } catch (Throwable t) {
         logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
-        performCleanUp();
+        performCleanUp(/* ignored */ null);
       } finally {
         evictionLock.unlock();
       }
@@ -975,7 +975,7 @@
   @Override
   public void cleanUp() {
     try {
-      performCleanUp();
+      performCleanUp(/* ignored */ null);
     } catch (RuntimeException e) {
       logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
     }
@@ -985,11 +985,16 @@ public void cleanUp() {
    * Performs the maintenance work, blocking until the lock is acquired, and sets the state flags
    * to avoid excess scheduling attempts. Any exception thrown, such as by
    * {@link CacheWriter#delete()}, is propagated to the caller.
+   *
+   * @param task an additional pending task to run, or {@code null} if not present
    */
-  void performCleanUp() {
+  void performCleanUp(@Nullable Runnable task) {
     evictionLock.lock();
     try {
       lazySetDrainStatus(PROCESSING_TO_IDLE);
+      if (task != null) {
+        task.run();
+      }
       maintenance();
     } finally {
       if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
@@ -2827,7 +2832,7 @@ public boolean exec() {
 
   @Override
   public void run() {
-    performCleanUp();
+    performCleanUp(/* ignored */ null);
   }
 
   /**
BoundedLocalCacheTest.java
@@ -28,6 +28,8 @@
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.List;
 import java.util.Map;
@@ -370,6 +372,23 @@ public void fastpath(Cache<Integer, Integer> cache, CacheContext context) {
     assertThat(localCache.readBuffer.reads(), is(1));
   }
 
+  @Test(dataProvider = "caches")
+  @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
+      population = Population.FULL, maximumSize = Maximum.FULL)
+  public void afterWrite_drainFullWriteBuffer(Cache<Integer, Integer> cache, CacheContext context) {
+    BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
+    Runnable task = Mockito.mock(Runnable.class);
+    localCache.drainStatus = PROCESSING_TO_IDLE;
+    int expectedCount = 1;
+
+    while (localCache.writeBuffer().offer(task)) {
+      expectedCount++;
+    }
+
+    localCache.afterWrite(null, task, 0L);
+    verify(task, times(expectedCount)).run();
+  }
+
   @Test(dataProvider = "caches")
   @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
       population = Population.FULL, maximumSize = Maximum.FULL)
Stresser.java
@@ -32,27 +32,40 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * A stress test to observe if the cache has a memory leak by not being able to drain the buffers
- * fast enough.
+ * A stress test to observe if the cache is able to drain the buffers fast enough under a
+ * synthetic load.
  *
  * @author ben.manes@gmail.com (Ben Manes)
  */
 public final class Stresser {
   private static final String[] STATUS =
       { "Idle", "Required", "Processing -> Idle", "Processing -> Required" };
-  private static final int THREADS = 2 * Runtime.getRuntime().availableProcessors();
+  private static final int MAX_THREADS = 2 * Runtime.getRuntime().availableProcessors();
   private static final int WRITE_MAX_SIZE = (1 << 12);
   private static final int TOTAL_KEYS = (1 << 20);
   private static final int MASK = TOTAL_KEYS - 1;
   private static final int STATUS_INTERVAL = 5;
 
   private final BoundedLocalCache<Integer, Integer> local;
-  private final Cache<Integer, Integer> cache;
+  private final LoadingCache<Integer, Integer> cache;
+  private final Stopwatch stopwatch;
   private final Integer[] ints;
 
-  private final int maximum;
-  private final Stopwatch stopwatch;
-  private final boolean reads = false;
+  private enum Operation {
+    READ(MAX_THREADS, TOTAL_KEYS),
+    WRITE(MAX_THREADS, WRITE_MAX_SIZE),
+    REFRESH(1, WRITE_MAX_SIZE);
+
+    private final int maxThreads;
+    private final int maxEntries;
+
+    private Operation(int maxThreads, int maxEntries) {
+      this.maxThreads = maxThreads;
+      this.maxEntries = maxEntries;
+    }
+  }
+
+  private static final Operation operation = Operation.REFRESH;
 
   public Stresser() {
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
@@ -61,11 +74,10 @@ public Stresser() {
         .build();
     Executors.newSingleThreadScheduledExecutor(threadFactory)
         .scheduleAtFixedRate(this::status, STATUS_INTERVAL, STATUS_INTERVAL, SECONDS);
-    maximum = reads ? TOTAL_KEYS : WRITE_MAX_SIZE;
     cache = Caffeine.newBuilder()
-        .maximumSize(maximum)
+        .maximumSize(operation.maxEntries)
         .recordStats()
-        .build();
+        .build(key -> key);
     local = (BoundedLocalCache<Integer, Integer>) cache.asMap();
     ints = new Integer[TOTAL_KEYS];
     Arrays.setAll(ints, key -> {
@@ -78,15 +90,20 @@
   }
 
   public void run() throws InterruptedException {
-    ConcurrentTestHarness.timeTasks(THREADS, () -> {
+    ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> {
       int index = ThreadLocalRandom.current().nextInt();
       for (;;) {
         Integer key = ints[index++ & MASK];
-        if (reads) {
-          cache.getIfPresent(key);
-        } else {
-          cache.put(key, key);
-          //Thread.yield();
+        switch (operation) {
+          case READ:
+            cache.getIfPresent(key);
+            break;
+          case WRITE:
+            cache.put(key, key);
+            break;
+          case REFRESH:
+            cache.refresh(key);
+            break;
         }
       }
     });
@@ -95,14 +112,15 @@
   private void status() {
     local.evictionLock.lock();
     int pendingWrites = local.writeBuffer().size();
+    int drainStatus = local.drainStatus();
     local.evictionLock.unlock();
 
     LocalTime elapsedTime = LocalTime.ofSecondOfDay(stopwatch.elapsed(TimeUnit.SECONDS));
     System.out.printf("---------- %s ----------%n", elapsedTime);
     System.out.printf("Pending reads: %,d; writes: %,d%n", local.readBuffer.size(), pendingWrites);
-    System.out.printf("Drain status = %s%n", STATUS[local.drainStatus]);
+    System.out.printf("Drain status = %s (%s)%n", STATUS[drainStatus], drainStatus);
     System.out.printf("Evictions = %,d%n", cache.stats().evictionCount());
-    System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), maximum);
+    System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), operation.maxEntries);
     System.out.printf("Lock = [%s%n", StringUtils.substringAfter(
         local.evictionLock.toString(), "["));
     System.out.printf("Pending tasks = %,d%n",
@@ -121,4 +139,4 @@
   public static void main(String[] args) throws Exception {
     new Stresser().run();
   }
-}
\ No newline at end of file
+}
6 changes: 3 additions & 3 deletions gradle/dependencies.gradle
@@ -36,7 +36,7 @@ ext {
     jcache: '1.0.0',
     jsr305: '3.0.1',
     stream: '2.9.2',
-    univocity_parsers: '2.1.1',
+    univocity_parsers: '2.1.2',
     ycsb: '0.9.0',
     xz: '1.5',
   ]
@@ -47,7 +47,7 @@ ext {
     jcache_tck: '1.0.1',
     jctools: '1.2',
     junit: '4.12',
-    mockito: '2.0.54-beta',
+    mockito: '2.0.55-beta',
     pax_exam: '4.9.1',
     testng: '6.9.11',
     truth: '0.24',
@@ -59,7 +59,7 @@ ext {
     ehcache3: '3.0.2',
     elastic_search: '5.0.0-alpha3',
     infinispan: '9.0.0.Alpha2',
-    jackrabbit: '1.5.2',
+    jackrabbit: '1.5.3',
     jamm: '0.3.1',
     java_object_layout: '0.5',
     koloboke: '0.6.8',
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-rc-2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-2.14-rc-6-bin.zip
