Skip to content

Commit

Permalink
Fixed #1
Browse files Browse the repository at this point in the history
1. Use executor.shutdownNow() to interrupt running task threads.
2. Don't spawn new thread pool, if the previous one can not shutdown.
  • Loading branch information
myzhan committed Dec 26, 2017
1 parent ab950a3 commit 24f73e2
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
14 changes: 13 additions & 1 deletion src/main/java/com/github/myzhan/locust4j/AbstractTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@ public abstract class AbstractTask implements Runnable {

@Override
public void run() {

Runner runner = Runner.getInstance();

while (true) {

if (runner.getState().equals(State.Stopped)) {
runner.getLatch().countDown();
return;
}

try {
if (Locust.getInstance().isMaxRPSEnabled()) {
long token = Locust.getInstance().getMaxRPSThreshold().decrementAndGet();
Expand All @@ -24,7 +33,10 @@ public void run() {
} else {
this.execute();
}
} catch (Exception ex) {
} catch (InterruptedException ex) {
runner.getLatch().countDown();
return;
} catch(Exception ex) {
Locust.getInstance().recordFailure("unknown", "error", 0, ex.getMessage());
}
}
Expand Down
43 changes: 30 additions & 13 deletions src/main/java/com/github/myzhan/locust4j/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -78,6 +79,11 @@ public class Runner {
*/
private AtomicInteger threadNumber = new AtomicInteger();

/**
* Wait for all threads terminated when shutting down thread pool.
*/
private CountDownLatch latch;

private Runner() {
this.nodeID = Utils.getNodeID();
}
Expand All @@ -86,6 +92,14 @@ public static Runner getInstance() {
return RunnerInstanceHolder.RUNNER;
}

protected State getState() {
return this.state;
}

protected CountDownLatch getLatch() {
return this.latch;
}

protected void setTasks(List<AbstractTask> tasks) {
this.tasks = tasks;
}
Expand All @@ -95,6 +109,8 @@ private void spawnWorkers(int spawnCount) {
Log.debug(
String.format("Hatching and swarming %d clients at the rate %d clients/s...", spawnCount, this.hatchRate));

this.latch = new CountDownLatch(spawnCount);

float weightSum = 0;
for (AbstractTask task : this.tasks) {
weightSum += task.getWeight();
Expand Down Expand Up @@ -139,7 +155,7 @@ protected void startHatching(int spawnCount, int hatchRate) {
this.numClients = spawnCount;
}
if (this.state == State.Running) {
this.executor.shutdown();
this.shutdownThreadPool();
}
this.state = State.Hatching;
this.hatchRate = hatchRate;
Expand Down Expand Up @@ -169,29 +185,30 @@ protected void quit() {
Queues.MESSAGE_TO_MASTER.add(new Message("quit", null, this.nodeID));
}

private void shutdownThreadPool() {
this.executor.shutdownNow();
this.state = State.Stopped;
try {
this.executor.awaitTermination(1, TimeUnit.SECONDS);
this.latch.await();
} catch (InterruptedException ex) {
Log.error(ex.getMessage());
}
this.executor = null;
}

protected void stop() {
if (this.state == State.Running) {
this.executor.shutdown();
try {
this.executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
Log.error(ex.getMessage());
}
this.state = State.Stopped;
this.executor = null;
this.shutdownThreadPool();
Log.debug("Recv stop message from master, all the workers are stopped");
}
}

public void getReady() {
this.state = State.Ready;

Locust.getInstance().submitToCoreThreadPool(new Receiver(this));

Queues.MESSAGE_TO_MASTER.add(new Message("client_ready", null, this.nodeID));

Locust.getInstance().submitToCoreThreadPool(new Sender(this));

}

private static class RunnerInstanceHolder {
Expand Down

0 comments on commit 24f73e2

Please sign in to comment.