diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java index 7d06a08998..25e65e0f95 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncRequester.java @@ -66,7 +66,8 @@ DefaultConnectingIOReactor createIOReactor( LoggingExceptionCallback.INSTANCE, LoggingIOSessionListener.INSTANCE, LoggingReactorMetricsListener.INSTANCE, - sessionShutdownCallback); + sessionShutdownCallback, + null); } private InetSocketAddress toSocketAddress(final HttpHost host) { diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java index 3149de2ebd..b334827041 100644 --- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java +++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/AsyncServer.java @@ -61,7 +61,8 @@ DefaultListeningIOReactor createIOReactor( LoggingExceptionCallback.INSTANCE, LoggingIOSessionListener.INSTANCE, LoggingReactorMetricsListener.INSTANCE, - sessionShutdownCallback); + sessionShutdownCallback, + null); } public Future listen(final InetSocketAddress address) { diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java index 399ca923cf..b158bedb20 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncRequester.java @@ -75,8 +75,7 @@ public AsyncRequester( final IOSessionListener sessionListener, final Callback sessionShutdownCallback, final Resolver addressResolver, - final IOReactorMetricsListener threadPoolListener - ) { + final IOReactorMetricsListener threadPoolListener) { this.ioReactor = new DefaultConnectingIOReactor( eventHandlerFactory, ioReactorConfig, @@ -85,7 +84,8 @@ public AsyncRequester( exceptionCallback, sessionListener, threadPoolListener, - sessionShutdownCallback); + sessionShutdownCallback, + null); this.addressResolver = addressResolver != null ? addressResolver : DefaultAddressResolver.INSTANCE; } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java index f4c34b45f6..9b462055d1 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/http/impl/bootstrap/AsyncServer.java @@ -76,7 +76,8 @@ public AsyncServer( exceptionCallback, sessionListener, threadPoolListener, - sessionShutdownCallback); + sessionShutdownCallback, + null); } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java index 706b6a3c70..e3914f80e8 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/AbstractIOReactorBase.java @@ -50,13 +50,17 @@ public final Future connect( throw new IOReactorShutdownException("I/O reactor has been shut down"); } try { - return getWorkerSelector().next().connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback); + final SingleCoreIOReactor dispatcher = selectWorker(); + if (dispatcher.getStatus() == IOReactorStatus.SHUT_DOWN) { + throw new IOReactorShutdownException("I/O reactor has been shut down"); + } + return dispatcher.connect(remoteEndpoint, remoteAddress, localAddress, timeout, attachment, callback); } catch (final IOReactorShutdownException ex) { initiateShutdown(); throw ex; } } - abstract IOWorkers.Selector getWorkerSelector(); + abstract SingleCoreIOReactor selectWorker(); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java index 4d89dc200b..f8a3728923 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultConnectingIOReactor.java @@ -29,7 +29,9 @@ import java.io.IOException; import java.util.concurrent.ThreadFactory; +import java.util.function.Function; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Decorator; @@ -48,16 +50,16 @@ */ public class DefaultConnectingIOReactor extends AbstractIOReactorBase { - private final int workerCount; private final SingleCoreIOReactor[] workers; private final MultiCoreIOReactor ioReactor; - private final IOWorkers.Selector workerSelector; + private final Function workerSelector; private final static ThreadFactory THREAD_FACTORY = new DefaultThreadFactory("I/O client dispatch", true); /** * @since 5.4 */ + @Internal public DefaultConnectingIOReactor( final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig ioReactorConfig, @@ -66,9 +68,10 @@ public DefaultConnectingIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final IOReactorMetricsListener threadPoolListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final Function workerSelector) { Args.notNull(eventHandlerFactory, "Event handler factory"); - this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); + final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); this.workers = new SingleCoreIOReactor[workerCount]; final Thread[] threads = new Thread[workerCount]; for (int i = 0; i < this.workers.length; i++) { @@ -84,7 +87,7 @@ public DefaultConnectingIOReactor( threads[i] = (threadFactory != null ? threadFactory : THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher)); } this.ioReactor = new MultiCoreIOReactor(this.workers, threads); - this.workerSelector = IOWorkers.newSelector(workers); + this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount); } public DefaultConnectingIOReactor( @@ -95,7 +98,8 @@ public DefaultConnectingIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final Callback sessionShutdownCallback) { - this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback); + this(eventHandlerFactory,ioReactorConfig, threadFactory,ioSessionDecorator, exceptionCallback, sessionListener, + null, sessionShutdownCallback, null); } public DefaultConnectingIOReactor( @@ -125,8 +129,8 @@ public IOReactorStatus getStatus() { } @Override - IOWorkers.Selector getWorkerSelector() { - return workerSelector; + SingleCoreIOReactor selectWorker() { + return workers[workerSelector.apply(workers)]; } @Override diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java index feb6b662a7..2e71d649e6 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/DefaultListeningIOReactor.java @@ -32,7 +32,9 @@ import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; +import java.util.function.Function; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.DefaultThreadFactory; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.function.Callback; @@ -55,37 +57,15 @@ public class DefaultListeningIOReactor extends AbstractIOReactorBase implements private final static ThreadFactory DISPATCH_THREAD_FACTORY = new DefaultThreadFactory("I/O server dispatch", true); private final static ThreadFactory LISTENER_THREAD_FACTORY = new DefaultThreadFactory("I/O listener", true); - private final int workerCount; private final SingleCoreIOReactor[] workers; private final SingleCoreListeningIOReactor listener; private final MultiCoreIOReactor ioReactor; - private final IOWorkers.Selector workerSelector; - - /** - * Creates an instance of DefaultListeningIOReactor with the given configuration. - * - * @param eventHandlerFactory the factory to create I/O event handlers. - * @param ioReactorConfig I/O reactor configuration. - * @param listenerThreadFactory the factory to create listener thread. - * Can be {@code null}. - * - * @since 5.0 - */ - public DefaultListeningIOReactor( - final IOEventHandlerFactory eventHandlerFactory, - final IOReactorConfig ioReactorConfig, - final ThreadFactory dispatchThreadFactory, - final ThreadFactory listenerThreadFactory, - final Decorator ioSessionDecorator, - final Callback exceptionCallback, - final IOSessionListener sessionListener, - final Callback sessionShutdownCallback) { - this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator, exceptionCallback, sessionListener, null, sessionShutdownCallback); - } + private final Function workerSelector; /** * @since 5.4 */ + @Internal public DefaultListeningIOReactor( final IOEventHandlerFactory eventHandlerFactory, final IOReactorConfig ioReactorConfig, @@ -95,9 +75,10 @@ public DefaultListeningIOReactor( final Callback exceptionCallback, final IOSessionListener sessionListener, final IOReactorMetricsListener threadPoolListener, - final Callback sessionShutdownCallback) { + final Callback sessionShutdownCallback, + final Function workerSelector) { Args.notNull(eventHandlerFactory, "Event handler factory"); - this.workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); + final int workerCount = ioReactorConfig != null ? ioReactorConfig.getIoThreadCount() : IOReactorConfig.DEFAULT.getIoThreadCount(); this.workers = new SingleCoreIOReactor[workerCount]; final Thread[] threads = new Thread[workerCount + 1]; for (int i = 0; i < this.workers.length; i++) { @@ -112,15 +93,36 @@ public DefaultListeningIOReactor( this.workers[i] = dispatcher; threads[i + 1] = (dispatchThreadFactory != null ? dispatchThreadFactory : DISPATCH_THREAD_FACTORY).newThread(new IOReactorWorker(dispatcher)); } - final IOReactor[] ioReactors = new IOReactor[this.workerCount + 1]; - System.arraycopy(this.workers, 0, ioReactors, 1, this.workerCount); + final IOReactor[] ioReactors = new IOReactor[workerCount + 1]; + System.arraycopy(this.workers, 0, ioReactors, 1, workerCount); this.listener = new SingleCoreListeningIOReactor(exceptionCallback, ioReactorConfig, this::enqueueChannel); ioReactors[0] = this.listener; threads[0] = (listenerThreadFactory != null ? listenerThreadFactory : LISTENER_THREAD_FACTORY).newThread(new IOReactorWorker(listener)); - this.ioReactor = new MultiCoreIOReactor(ioReactors, threads); + this.workerSelector = workerSelector != null ? workerSelector : IOWorkerSelectors.newSelector(workerCount); + } - workerSelector = IOWorkers.newSelector(workers); + /** + * Creates an instance of DefaultListeningIOReactor with the given configuration. + * + * @param eventHandlerFactory the factory to create I/O event handlers. + * @param ioReactorConfig I/O reactor configuration. + * @param listenerThreadFactory the factory to create listener thread. + * Can be {@code null}. + * + * @since 5.0 + */ + public DefaultListeningIOReactor( + final IOEventHandlerFactory eventHandlerFactory, + final IOReactorConfig ioReactorConfig, + final ThreadFactory dispatchThreadFactory, + final ThreadFactory listenerThreadFactory, + final Decorator ioSessionDecorator, + final Callback exceptionCallback, + final IOSessionListener sessionListener, + final Callback sessionShutdownCallback) { + this(eventHandlerFactory, ioReactorConfig, dispatchThreadFactory, listenerThreadFactory, ioSessionDecorator, + exceptionCallback, sessionListener, null, sessionShutdownCallback, null); } /** @@ -191,19 +193,18 @@ public IOReactorStatus getStatus() { } @Override - IOWorkers.Selector getWorkerSelector() { - return workerSelector; + SingleCoreIOReactor selectWorker() { + return workers[workerSelector.apply(workers)]; } private void enqueueChannel(final ChannelEntry entry) { try { - workerSelector.next().enqueueChannel(entry); + selectWorker().enqueueChannel(entry); } catch (final IOReactorShutdownException ex) { initiateShutdown(); } } - @Override public void initiateShutdown() { ioReactor.initiateShutdown(); diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java new file mode 100644 index 0000000000..56d06df025 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerSelectors.java @@ -0,0 +1,76 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactor; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +final class IOWorkerSelectors { + + static Function newSelector(final int workerCount, final int start) { + return isPowerOfTwo(workerCount) ? new PowerOfTwoSelector(start) : new GenericSelector(start); + } + + static Function newSelector(final int workerCount) { + return newSelector(workerCount, 0); + } + + static boolean isPowerOfTwo(final int n) { + return (n & -n) == n; + } + + static final class PowerOfTwoSelector implements Function { + + private final AtomicInteger idx; + + PowerOfTwoSelector(final int n) { + this.idx = new AtomicInteger(n); + } + + @Override + public Integer apply(final IOWorkerStats[] dispatchers) { + return idx.getAndIncrement() & (dispatchers.length - 1); + } + + } + + static final class GenericSelector implements Function { + + private final AtomicInteger idx; + + GenericSelector(final int n) { + this.idx = new AtomicInteger(n); + } + + @Override + public Integer apply(final IOWorkerStats[] dispatchers) { + return (idx.getAndIncrement() & Integer.MAX_VALUE) % dispatchers.length; + } + + } + +} diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java similarity index 69% rename from httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java rename to httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java index c087af4320..e5c1a75737 100644 --- a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkersTest.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkerStats.java @@ -24,21 +24,16 @@ * . * */ + package org.apache.hc.core5.reactor; -import static org.mockito.Mockito.mock; +import org.apache.hc.core5.annotation.Internal; -import org.junit.jupiter.api.Test; +@Internal +public interface IOWorkerStats { -class IOWorkersTest { + int totalChannelCount(); - @Test - void testIndexOverflow() { - final SingleCoreIOReactor reactor = new SingleCoreIOReactor(null, mock(IOEventHandlerFactory.class), IOReactorConfig.DEFAULT, null, null, null, null); - final IOWorkers.Selector selector = IOWorkers.newSelector(new SingleCoreIOReactor[]{reactor, reactor, reactor}); - for (long i = Integer.MAX_VALUE - 10; i < (long) Integer.MAX_VALUE + 10; i++) { - selector.next(); - } - } + int pendingChannelCount(); } diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java deleted file mode 100644 index c7cadac330..0000000000 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOWorkers.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * . - * - */ -package org.apache.hc.core5.reactor; - -import java.util.concurrent.atomic.AtomicInteger; - -final class IOWorkers { - - interface Selector { - - SingleCoreIOReactor next(); - - } - - static Selector newSelector(final SingleCoreIOReactor[] dispatchers) { - return isPowerOfTwo(dispatchers.length) - ? new PowerOfTwoSelector(dispatchers) - : new GenericSelector(dispatchers); - } - - private static boolean isPowerOfTwo(final int val) { - return (val & -val) == val; - } - - private static void validate(final SingleCoreIOReactor dispatcher) { - if (dispatcher.getStatus() == IOReactorStatus.SHUT_DOWN) { - throw new IOReactorShutdownException("I/O reactor has been shut down"); - } - } - - private static final class PowerOfTwoSelector implements Selector { - - private final AtomicInteger idx = new AtomicInteger(0); - private final SingleCoreIOReactor[] dispatchers; - - PowerOfTwoSelector(final SingleCoreIOReactor[] dispatchers) { - this.dispatchers = dispatchers; - } - - @Override - public SingleCoreIOReactor next() { - final SingleCoreIOReactor dispatcher = dispatchers[idx.getAndIncrement() & (dispatchers.length - 1)]; - validate(dispatcher); - return dispatcher; - } - } - - private static final class GenericSelector implements Selector { - - private final AtomicInteger idx = new AtomicInteger(0); - private final SingleCoreIOReactor[] dispatchers; - - GenericSelector(final SingleCoreIOReactor[] dispatchers) { - this.dispatchers = dispatchers; - } - - @Override - public SingleCoreIOReactor next() { - final SingleCoreIOReactor dispatcher = dispatchers[(idx.getAndIncrement() & Integer.MAX_VALUE) % dispatchers.length]; - validate(dispatcher); - return dispatcher; - } - } - -} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java index 2d4f4f9035..e334f2a10f 100644 --- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java +++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java @@ -55,7 +55,7 @@ import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Timeout; -class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator { +class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements ConnectionInitiator, IOWorkerStats { private static final int MAX_CHANNEL_REQUESTS = 10000; @@ -465,4 +465,14 @@ private void reportStatusToThreadPoolListener() { } } + @Override + public int totalChannelCount() { + return selector.keys().size(); + } + + @Override + public int pendingChannelCount() { + return channelQueue.size() + requestQueue.size(); + } + } diff --git a/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java b/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java new file mode 100644 index 0000000000..34ade84ba8 --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/reactor/IOWorkerSelectorsTest.java @@ -0,0 +1,53 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.reactor; + +import java.util.function.Function; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.Mockito; + +class IOWorkerSelectorsTest { + + @ParameterizedTest(name = "worker count = {0}") + @ValueSource(ints = {1, 2, 3, 4, 5, 10, 15, 16, 32}) + void testIndexOverflow(final int workerCount) { + final long start = (long) Integer.MAX_VALUE - 10; + final long end = (long) Integer.MAX_VALUE + 10; + final IOWorkerStats[] workers = new IOWorkerStats[workerCount]; + for (int i = 0; i < workerCount; i++) { + workers[i] = Mockito.mock(IOWorkerStats.class); + } + final Function selector = IOWorkerSelectors.newSelector(workerCount, (int) start); + for (long i = start; i < end; i++) { + Assertions.assertTrue(selector.apply(workers) < workerCount); + } + } + +}