Skip to content

Commit

Permalink
:fix: acceptor initialization lifecycle
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese committed Nov 18, 2024
1 parent 84eced9 commit 13aaa9b
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*******************************************************************************
* Copyright (c) 2022 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Eurotech
*******************************************************************************/
package org.eclipse.kapua.broker.artemis.plugin.security;

import org.eclipse.kapua.broker.artemis.plugin.security.connector.AcceptorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Class to intercept Broker lifecycle events
*/
public class ActivateCallback implements org.apache.activemq.artemis.core.server.ActivateCallback {

private final Logger logger = LoggerFactory.getLogger(AcceptorHandler.class);

private final AcceptorHandler acceptorHandler;

public ActivateCallback(AcceptorHandler acceptorHandler) {
this.acceptorHandler = acceptorHandler;
}

@Override
/**
* Use this callback to start Acceptors for now (but could be used to do whatever needs to have a fully running broker
*/
public void activationComplete() {
logger.info("Broker activation completed!");
org.apache.activemq.artemis.core.server.ActivateCallback.super.activationComplete();
logger.info("Creating acceptors...");
try {
acceptorHandler.syncAcceptors();
} catch (Exception e) {
logger.error("Creating acceptors... ERROR: {}", e.getMessage(), e);
//TODO throw runtime? the broker doesn't work properly if not all the acceptors are created
}
logger.info("Creating acceptors... DONE");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ public String getAsUrl() {
private final BrokerSetting brokerSetting;
private final PluginUtility pluginUtility;

private ActivateCallback activateCallback;

protected BrokerEventHandler brokerEventHandler;
protected AcceptorHandler acceptorHandler;
protected String version;
Expand Down Expand Up @@ -137,8 +139,8 @@ public void registered(ActiveMQServer server) {
serverContext.init(server);
acceptorHandler = new AcceptorHandler(server,
brokerSetting.getMap(String.class, BrokerSettingKey.ACCEPTORS));
//init acceptors
acceptorHandler.syncAcceptors();
//init activateCallback to handle acceptor initialization instead of calling it from here
activateCallback = new ActivateCallback(acceptorHandler);

deviceConnectionEventListenerService.addReceiver(serviceEvent -> processDeviceConnectionEvent(serviceEvent));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,32 +83,29 @@ public String removeAcceptor(String name) throws Exception {
* @throws Exception
*/
public void syncAcceptors() throws Exception {
logger.info("Init acceptors... server started: {} - {}", server.isStarted(), server.getState());
if (server.isStarted()) {
List<String> acceptorToRemove = new ArrayList<>();
server.getConfiguration().getAcceptorConfigurations().forEach(tc -> {
String acceptorName = tc.getName();
logger.info("Checking acceptor {}", acceptorName);
if (definedAcceptors.get(acceptorName) == null) {
acceptorToRemove.add(acceptorName);
logger.info("Adding acceptor {} to the remove list", acceptorName);
} else {
logger.info("Leaving acceptor {} running", acceptorName);
}
});
acceptorToRemove.forEach(acceptorName -> {
logger.info("Stopping acceptor {}...", acceptorName);
try {
server.getRemotingService().getAcceptor(acceptorName).stop();
server.getRemotingService().destroyAcceptor(acceptorName);
TransportConfiguration tc = getByName(acceptorName);
server.getConfiguration().getAcceptorConfigurations().remove(tc);
} catch (Exception e) {
logger.error("Error stopping acceptor {}... Error: {}", acceptorName, e.getMessage(), e);
}
logger.info("Stopping acceptor {}... DONE", acceptorName);
});
}
List<String> acceptorToRemove = new ArrayList<>();
server.getConfiguration().getAcceptorConfigurations().forEach(tc -> {
String acceptorName = tc.getName();
logger.info("Checking acceptor {}", acceptorName);
if (definedAcceptors.get(acceptorName) == null) {
acceptorToRemove.add(acceptorName);
logger.info("Adding acceptor {} to the remove list", acceptorName);
} else {
logger.info("Leaving acceptor {} running", acceptorName);
}
});
acceptorToRemove.forEach(acceptorName -> {
logger.info("Stopping acceptor {}...", acceptorName);
try {
server.getRemotingService().getAcceptor(acceptorName).stop();
server.getRemotingService().destroyAcceptor(acceptorName);
TransportConfiguration tc = getByName(acceptorName);
server.getConfiguration().getAcceptorConfigurations().remove(tc);
} catch (Exception e) {
logger.error("Error stopping acceptor {}... Error: {}", acceptorName, e.getMessage(), e);
}
logger.info("Stopping acceptor {}... DONE", acceptorName);
});
// server.getConfiguration().clearAcceptorConfigurations();

definedAcceptors.forEach((name, uri) -> {
Expand Down

0 comments on commit 13aaa9b

Please sign in to comment.