-
Notifications
You must be signed in to change notification settings - Fork 90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support Websocket Both Client and Broker. #206
Comments
I had implemented it by adding WebSocket Client handshake before the pipeline. class file : io.vertx.mqtt.impl.MqttClientImpl key code and total file private URI websocketURI; if(options.isWebsocket()){
try {
websocketURI = new URI("ws"+(options.isSsl()?"s":"")+"://"+host+":"+port+"/mqtt");
} catch (URISyntaxException e) {
soi.close();
connectPromise.fail(e.getMessage());
disconnectPromise.complete();
return;
}
}
initChannel(soi).onSuccess(vvoid -> { if (this.options.isWebsocket()) {
// The websocket mark
HttpHeaders httpHeaders = new DefaultHttpHeaders();
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, "mqtt, mqttv3.1, mqttv3.1.1", true, httpHeaders);
pipeline.addBefore("mqttEncoder", "httpCClientCodec", new HttpClientCodec());
pipeline.addAfter("httpCClientCodec", "aggregator", new HttpObjectAggregator(65536));
pipeline.addAfter("aggregator", "webSocketHandler", new WebSocketClientProtocolHandler(handshaker));
pipeline.addAfter("webSocketHandler", "ws-success-checker", new WebScoketSucccessHandler(handshakePromise));
pipeline.addAfter("webSocketHandler", "bytebuf2wsEncoder", new MqttServerImpl.WebSocketFrameToByteBufDecoder());
pipeline.addAfter("bytebuf2wsEncoder", "ws2bytebufDecoder", new MqttServerImpl.ByteBufToWebSocketFrameEncoder());
System.out.println(pipeline.toMap().keySet());
} else {
handshakePromise.complete();
}
return handshakePromise.future();
} private class WebScoketSucccessHandler extends SimpleChannelInboundHandler<io.vertx.core.http.WebSocketFrame> {
public static final String WS_CONNECT_SUCCESS_EVENT = "WS_CONNECT_SUCCESS_EVENT";
PromiseInternal<Void> handshakePromise;
public WebScoketSucccessHandler(PromiseInternal<Void> handshakePromise) {
this.handshakePromise = handshakePromise;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, io.vertx.core.http.WebSocketFrame msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().fireUserEventTriggered(WS_CONNECT_SUCCESS_EVENT);
handshakePromise.complete();
ctx.pipeline().remove(this);
} else if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) {
handshakePromise.fail(new VertxException("Websocket handshake timeout !"));
} else {
super.userEventTriggered(ctx, evt);
}
}
} /*
* Copyright 2016 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.mqtt.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import static io.vertx.mqtt.MqttServerOptions.MQTT_SUBPROTOCOL_CSV_LIST;
/**
* MQTT client implementation
*/
public class MqttClientImpl implements MqttClient {
private enum Status {CLOSED, CONNECTING, CONNECTED, CLOSING}
// patterns for topics validation
private static final Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$");
private static final Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$");
private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class);
private static final int MAX_MESSAGE_ID = 65535;
private static final int MAX_TOPIC_LEN = 65535;
private static final int MIN_TOPIC_LEN = 1;
private static final String PROTOCOL_NAME = "MQTT";
private static final int PROTOCOL_VERSION = 4;
private static final int DEFAULT_IDLE_TIMEOUT = 0;
private final VertxInternal vertx;
private final MqttClientOptions options;
private NetSocketInternal connection;
private ContextInternal ctx;
// handler to call when a publish is complete
private Handler<Integer> publishCompletionHandler;
// handler to call when a publish has expired
private Handler<Integer> publishCompletionExpirationHandler;
// handler to call when a PUBACK is received for an unknown packetId
private Handler<Integer> publishCompletionPhantomHandler;
// handler to call when a unsubscribe request is completed
private Handler<Integer> unsubscribeCompletionHandler;
// handler to call when a publish message comes in
private Handler<MqttPublishMessage> publishHandler;
// handler to call when a subscribe request is completed
private Handler<MqttSubAckMessage> subscribeCompletionHandler;
// handler to call when a connection request is completed
private Promise<MqttConnAckMessage> connectPromise;
// handler to call when a connection disconnects
private Promise<Void> disconnectPromise;
// handler to call when a pingresp is received
private Handler<Void> pingrespHandler;
// handler to call when a problem at protocol level happens
private Handler<Throwable> exceptionHandler;
//handler to call when the remote MQTT server closes the connection
private Handler<Void> closeHandler;
// storage of PUBLISH QoS=1 messages which was not responded with PUBACK
private HashMap<Integer, ExpiringPacket> qos1outbound = new HashMap<>();
// storage of PUBLISH QoS=2 messages which was not responded with PUBREC
// and PUBREL messages which was not responded with PUBCOMP
private HashMap<Integer, ExpiringPacket> qos2outbound = new HashMap<>();
// storage of PUBLISH messages which was responded with PUBREC
private HashMap<Integer, MqttMessage> qos2inbound = new HashMap<>();
// counter for the message identifier
private int messageIdCounter;
// Keep alive management
private final long keepAliveTimeout;
private Deque<Ping> pings = new ArrayDeque<>();
// total number of unacknowledged packets
private int countInflightQueue;
private NetClient client;
private Status status = Status.CLOSED;
private URI websocketURI;
/**
* Constructor
*
* @param vertx Vert.x instance
* @param options MQTT client options
*/
public MqttClientImpl(Vertx vertx, MqttClientOptions options) {
// copy given options
NetClientOptions netClientOptions = new NetClientOptions(options);
netClientOptions.setIdleTimeout(DEFAULT_IDLE_TIMEOUT);
this.vertx = (VertxInternal) vertx;
this.options = new MqttClientOptions(options);
this.keepAliveTimeout = ((options.getKeepAliveInterval() * 1000) * 3) / 2;
}
int getInFlightMessagesCount() {
synchronized (this) {
return countInflightQueue;
}
}
@Override
public Future<MqttConnAckMessage> connect(int port, String host) {
return this.doConnect(port, host, null);
}
/**
* See {@link MqttClient#connect(int, String, Handler)} for more details
*/
@Override
public MqttClient connect(int port, String host, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
Future<MqttConnAckMessage> fut = connect(port, host);
if (connectHandler != null) {
fut.onComplete(connectHandler);
}
return this;
}
@Override
public Future<MqttConnAckMessage> connect(int port, String host, String serverName) {
return this.doConnect(port, host, serverName);
}
/**
* See {@link MqttClient#connect(int, String, String, Handler)} for more details
*/
@Override
public MqttClient connect(int port, String host, String serverName, Handler<AsyncResult<MqttConnAckMessage>> connectHandler) {
Future<MqttConnAckMessage> fut = this.doConnect(port, host, serverName);
if (connectHandler != null) {
fut.onComplete(connectHandler);
}
return this;
}
private Future<MqttConnAckMessage> doConnect(int port, String host, String serverName) {
ContextInternal ctx = vertx.getOrCreateContext();
NetClient client = vertx.createNetClient(options, new CloseFuture());
PromiseInternal<MqttConnAckMessage> connectPromise = ctx.promise();
PromiseInternal<Void> disconnectPromise = ctx.promise();
synchronized (this) {
if (this.status != Status.CLOSED) {
return ctx.failedFuture(new IllegalStateException("Client " + this.status.name().toLowerCase()));
}
this.status = Status.CONNECTING;
this.ctx = ctx;
this.connectPromise = connectPromise;
this.disconnectPromise = disconnectPromise;
this.client = client;
}
ctx.runOnContext(v -> {
log.debug(String.format("Trying to connect with %s:%d", host, port));
client.connect(port, host, serverName, done -> {
// the TCP connection fails
if (done.failed()) {
log.error(String.format("Can't connect to %s:%d", host, port), done.cause());
synchronized (this) {
this.status = Status.CLOSED;
this.connectPromise = null;
this.disconnectPromise = null;
this.ctx = null;
this.client = null;
}
client.close();
connectPromise.fail(done.cause());
disconnectPromise.complete();
} else {
log.info(String.format("Connection with %s:%d established successfully", host, port));
boolean closing;
synchronized (MqttClientImpl.this) {
if (closing = (status == Status.CLOSING)) {
this.status = Status.CLOSED;
this.client = null;
this.connectPromise = null;
this.disconnectPromise = null;
}
}
NetSocketInternal soi = (NetSocketInternal) done.result();
if (closing) {
soi.close();
connectPromise.fail("Disconnected");
disconnectPromise.complete();
return;
}
if (options.isAutoGeneratedClientId() && (options.getClientId() == null || options.getClientId().isEmpty())) {
options.setClientId(generateRandomClientId());
}
if(options.isWebsocket()){
try {
websocketURI = new URI("ws"+(options.isSsl()?"s":"")+"://"+host+":"+port+"/mqtt");
} catch (URISyntaxException e) {
soi.close();
connectPromise.fail(e.getMessage());
disconnectPromise.complete();
return;
}
}
initChannel(soi).onSuccess(vvoid -> {
synchronized (MqttClientImpl.this) {
this.connection = soi;
}
soi.messageHandler(msg -> this.handleMessage(soi.channelHandlerContext(), msg));
soi.closeHandler(v2 -> {
client.close();
synchronized (MqttClientImpl.this) {
this.connection = null;
this.status = Status.CLOSED;
this.connectPromise = null;
this.disconnectPromise = null;
}
connectPromise.fail("Closed");
disconnectPromise.complete();
});
// an exception at connection level
soi.exceptionHandler(this::handleException);
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT,
false,
AT_MOST_ONCE,
false,
0);
MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
PROTOCOL_NAME,
PROTOCOL_VERSION,
options.hasUsername(),
options.hasPassword(),
options.isWillRetain(),
options.getWillQoS(),
options.isWillFlag(),
options.isCleanSession(),
options.getKeepAliveInterval()
);
MqttConnectPayload payload = new MqttConnectPayload(
options.getClientId() == null ? "" : options.getClientId(),
options.getWillTopic(),
options.getWillMessage() != null ? options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
options.hasUsername() ? options.getUsername() : null,
options.hasPassword() ? options.getPassword().getBytes() : null
);
io.netty.handler.codec.mqtt.MqttMessage connect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
this.write(connect);
}).onFailure(cause -> {
soi.close();
connectPromise.fail(cause.getMessage());
disconnectPromise.complete();
});
}
});
});
return connectPromise.future();
}
/**
* See {@link MqttClient#disconnect()} for more details
*/
@Override
public Future<Void> disconnect() {
NetSocketInternal connection;
Status status;
Future<Void> fut;
synchronized (this) {
status = this.status;
switch (this.status) {
case CLOSED:
return vertx.getOrCreateContext().succeededFuture();
case CONNECTED:
this.status = Status.CLOSING;
connection = this.connection;
break;
case CONNECTING:
this.status = Status.CLOSING;
connection = this.connection;
break;
case CLOSING:
connection = null;
break;
default:
throw new AssertionError();
}
fut = this.disconnectPromise.future();
}
if (connection != null) {
if (status == Status.CONNECTED) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.DISCONNECT,
false,
AT_MOST_ONCE,
false,
0
);
io.netty.handler.codec.mqtt.MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, null, null);
connection.writeMessage(disconnect);
}
connection.close();
}
return fut;
}
/**
* See {@link MqttClient#disconnect(Handler)} for more details
*/
@Override
public MqttClient disconnect(Handler<AsyncResult<Void>> disconnectHandler) {
Future<Void> fut = disconnect();
if (disconnectHandler != null) {
fut.onComplete(disconnectHandler);
}
return this;
}
/**
* See {@link MqttClient#publish(String, Buffer, MqttQoS, boolean, boolean)} for more details
*/
@Override
public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
if (MqttQoS.FAILURE == qosLevel) {
throw new IllegalArgumentException("QoS level must be one of AT_MOST_ONCE, AT_LEAST_ONCE or EXACTLY_ONCE");
}
io.netty.handler.codec.mqtt.MqttMessage publish;
MqttPublishVariableHeader variableHeader;
synchronized (this) {
if (countInflightQueue >= options.getMaxInflightQueue()) {
String msg = String.format("Attempt to exceed the limit of %d inflight messages", options.getMaxInflightQueue());
log.error(msg);
MqttException exception = new MqttException(MqttException.MQTT_INFLIGHT_QUEUE_FULL, msg);
return ctx.failedFuture(exception);
}
if (!isValidTopicName(topic)) {
String msg = String.format("Invalid Topic Name - %s. It mustn't contains wildcards: # and +. Also it can't contains U+0000(NULL) chars", topic);
log.error(msg);
MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_NAME, msg);
return ctx.failedFuture(exception);
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH,
isDup,
qosLevel,
isRetain,
0
);
ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes());
variableHeader = new MqttPublishVariableHeader(topic, nextMessageId());
publish = MqttMessageFactory.newMessage(fixedHeader, variableHeader, buf);
switch (qosLevel) {
case AT_LEAST_ONCE:
qos1outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubackTimeout, variableHeader.packetId()));
countInflightQueue++;
break;
case EXACTLY_ONCE:
qos2outbound.put(variableHeader.packetId(), new ExpiringPacket(this::handlePubrecTimeout, variableHeader.packetId()));
countInflightQueue++;
break;
default:
// nothing to do for AT_MOST_ONCE
break;
}
}
return this.write(publish).map(variableHeader.packetId());
}
/**
* See {@link MqttClient#publish(String, Buffer, MqttQoS, boolean, boolean, Handler)} for more details
*/
@Override
public MqttClient publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, Handler<AsyncResult<Integer>> publishSentHandler) {
Future<Integer> fut = publish(topic, payload, qosLevel, isDup, isRetain);
if (publishSentHandler != null) {
fut.onComplete(publishSentHandler);
}
return this;
}
/**
* See {@link MqttClient#publishCompletionHandler(Handler)} for more details
*/
@Override
public MqttClient publishCompletionHandler(Handler<Integer> publishCompletionHandler) {
this.publishCompletionHandler = publishCompletionHandler;
return this;
}
private synchronized Handler<Integer> publishCompletionHandler() {
return this.publishCompletionHandler;
}
/**
* {@inheritDoc}
*/
@Override
public MqttClient publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler) {
this.publishCompletionExpirationHandler = publishCompletionExpirationHandler;
return this;
}
private synchronized Handler<Integer> publishCompletionExpirationHandler() {
return this.publishCompletionExpirationHandler;
}
/**
* {@inheritDoc}
*/
@Override
public MqttClient publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionPhantomHandler) {
this.publishCompletionPhantomHandler = publishCompletionPhantomHandler;
return this;
}
private synchronized Handler<Integer> publishCompletionUnknownPacketIdHandler() {
return this.publishCompletionPhantomHandler;
}
/**
* See {@link MqttClient#publishHandler(Handler)} for more details
*/
@Override
public MqttClient publishHandler(Handler<MqttPublishMessage> publishHandler) {
this.publishHandler = publishHandler;
return this;
}
private synchronized Handler<MqttPublishMessage> publishHandler() {
return this.publishHandler;
}
/**
* See {@link MqttClient#subscribeCompletionHandler(Handler)} for more details
*/
@Override
public MqttClient subscribeCompletionHandler(Handler<MqttSubAckMessage> subscribeCompletionHandler) {
this.subscribeCompletionHandler = subscribeCompletionHandler;
return this;
}
private synchronized Handler<MqttSubAckMessage> subscribeCompletionHandler() {
return this.subscribeCompletionHandler;
}
/**
* See {@link MqttClient#subscribe(String, int)} for more details
*/
@Override
public Future<Integer> subscribe(String topic, int qos) {
return subscribe(Collections.singletonMap(topic, qos));
}
/**
* See {@link MqttClient#subscribe(String, int, Handler)} for more details
*/
@Override
public MqttClient subscribe(String topic, int qos, Handler<AsyncResult<Integer>> subscribeSentHandler) {
return subscribe(Collections.singletonMap(topic, qos), subscribeSentHandler);
}
/**
* See {@link MqttClient#subscribe(Map)} for more details
*/
@Override
public Future<Integer> subscribe(Map<String, Integer> topics) {
Map<String, Integer> invalidTopics = topics.entrySet()
.stream()
.filter(e -> !isValidTopicFilter(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (invalidTopics.size() > 0) {
String msg = String.format("Invalid Topic Filters: %s", invalidTopics);
log.error(msg);
MqttException exception = new MqttException(MqttException.MQTT_INVALID_TOPIC_FILTER, msg);
return ctx.failedFuture(exception);
}
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);
MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);
List<MqttTopicSubscription> subscriptions = topics.entrySet()
.stream()
.map(e -> new MqttTopicSubscription(e.getKey(), valueOf(e.getValue())))
.collect(Collectors.toList());
MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
io.netty.handler.codec.mqtt.MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
return this.write(subscribe).map(variableHeader.messageId());
}
/**
* See {@link MqttClient#subscribe(Map, Handler)} for more details
*/
@Override
public MqttClient subscribe(Map<String, Integer> topics, Handler<AsyncResult<Integer>> subscribeSentHandler) {
Future<Integer> fut = subscribe(topics);
if (subscribeSentHandler != null) {
fut.onComplete(subscribeSentHandler);
}
return this;
}
/**
* See {@link MqttClient#unsubscribeCompletionHandler(Handler)} for more details
*/
@Override
public MqttClient unsubscribeCompletionHandler(Handler<Integer> unsubscribeCompletionHandler) {
this.unsubscribeCompletionHandler = unsubscribeCompletionHandler;
return this;
}
private synchronized Handler<Integer> unsubscribeCompletionHandler() {
return this.unsubscribeCompletionHandler;
}
/**
* See {@link MqttClient#unsubscribe(String, Handler)} )} for more details
*/
@Override
public MqttClient unsubscribe(String topic, Handler<AsyncResult<Integer>> unsubscribeSentHandler) {
Future<Integer> fut = unsubscribe(topic);
if (unsubscribeSentHandler != null) {
fut.onComplete(unsubscribeSentHandler);
}
return this;
}
/**
* See {@link MqttClient#unsubscribe(String)} )} for more details
*/
@Override
public Future<Integer> unsubscribe(String topic) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBSCRIBE,
false,
AT_LEAST_ONCE,
false,
0);
MqttMessageIdVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(nextMessageId(), MqttProperties.NO_PROPERTIES);
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(Stream.of(topic).collect(Collectors.toList()));
io.netty.handler.codec.mqtt.MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
this.write(unsubscribe);
return ctx.succeededFuture(variableHeader.messageId());
}
/**
* See {@link MqttClient#pingResponseHandler(Handler)} for more details
*/
@Override
public synchronized MqttClient pingResponseHandler(Handler<Void> pingResponseHandler) {
this.pingrespHandler = pingResponseHandler;
return this;
}
private synchronized Handler<Void> pingResponseHandler() {
return this.pingrespHandler;
}
/**
* See {@link MqttClient#exceptionHandler(Handler)} for more details
*/
@Override
public synchronized MqttClient exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
private synchronized Handler<Throwable> exceptionHandler() {
return this.exceptionHandler;
}
/**
* See {@link MqttClient#closeHandler(Handler)} for more details
*/
@Override
public synchronized MqttClient closeHandler(Handler<Void> closeHandler) {
this.closeHandler = closeHandler;
return this;
}
private synchronized Handler<Void> closeHandler() {
return this.closeHandler;
}
private class Ping {
final long id;
private Ping(long id) {
this.id = id;
}
void ack() {
vertx.cancelTimer(id);
}
void cancel() {
vertx.cancelTimer(id);
}
}
/**
* See {@link MqttClient#ping()} for more details
*/
@Override
public MqttClient ping() {
ctx.execute(() -> {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
io.netty.handler.codec.mqtt.MqttMessage pingreq = MqttMessageFactory.newMessage(fixedHeader, null, null);
long id = vertx.setTimer(keepAliveTimeout, _id -> {
disconnect();
});
pings.add(new Ping(id));
this.write(pingreq);
});
return this;
}
@Override
public synchronized String clientId() {
return this.options.getClientId();
}
@Override
public synchronized boolean isConnected() {
return this.status == Status.CONNECTED;
}
/**
* Sends PUBACK packet to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishAcknowledge(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBACK, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(puback);
}
/**
* Sends PUBREC packet to server
*
* @param publishMessage a PUBLISH message to acknowledge
*/
private void publishReceived(MqttPublishMessage publishMessage) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREC, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessage.messageId());
io.netty.handler.codec.mqtt.MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2inbound.put(publishMessage.messageId(), publishMessage);
}
this.write(pubrec);
}
/**
* Sends PUBCOMP packet to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishComplete(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
this.write(pubcomp);
}
/**
* Sends the PUBREL message to server
*
* @param publishMessageId identifier of the PUBLISH message to acknowledge
*/
private void publishRelease(int publishMessageId) {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(publishMessageId);
io.netty.handler.codec.mqtt.MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
synchronized (this) {
qos2outbound.put(publishMessageId, new ExpiringPacket(this::handlePubcompTimeout, publishMessageId));
}
this.write(pubrel);
}
private Future<Void> initChannel(NetSocketInternal sock) {
PromiseInternal<Void> handshakePromise = ctx.promise();
ChannelPipeline pipeline = sock.channelHandlerContext().pipeline();
// add into pipeline netty's (en/de)coder
pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
if (this.options.getMaxMessageSize() > 0) {
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
} else {
// max message size not set, so the default from Netty MQTT codec is used
pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
}
if (this.options.isAutoKeepAlive() &&
this.options.getKeepAliveInterval() != 0) {
int keepAliveInterval = this.options.getKeepAliveInterval();
// handler for sending PINGREQ (keepAlive) if reader- or writer-channel become idle
pipeline.addBefore("handler", "idle",
new IdleStateHandler(0, keepAliveInterval, 0) {
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
if (evt.state() == IdleState.WRITER_IDLE) {
// verify that server is still connected (e.g. when using QoS-0)
ping();
}
}
});
}
if (this.options.isWebsocket()) {
// The websocket mark
HttpHeaders httpHeaders = new DefaultHttpHeaders();
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, "mqtt, mqttv3.1, mqttv3.1.1", true, httpHeaders);
pipeline.addBefore("mqttEncoder", "httpCClientCodec", new HttpClientCodec());
pipeline.addAfter("httpCClientCodec", "aggregator", new HttpObjectAggregator(65536));
pipeline.addAfter("aggregator", "webSocketHandler", new WebSocketClientProtocolHandler(handshaker));
pipeline.addAfter("webSocketHandler", "ws-success-checker", new WebScoketSucccessHandler(handshakePromise));
pipeline.addAfter("webSocketHandler", "bytebuf2wsEncoder", new MqttServerImpl.WebSocketFrameToByteBufDecoder());
pipeline.addAfter("bytebuf2wsEncoder", "ws2bytebufDecoder", new MqttServerImpl.ByteBufToWebSocketFrameEncoder());
System.out.println(pipeline.toMap().keySet());
} else {
handshakePromise.complete();
}
return handshakePromise.future();
}
/**
* Update and return the next message identifier
*
* @return message identifier
*/
private synchronized int nextMessageId() {
// if 0 or MAX_MESSAGE_ID, it becomes 1 (first valid messageId)
this.messageIdCounter = ((this.messageIdCounter % MAX_MESSAGE_ID) != 0) ? this.messageIdCounter + 1 : 1;
return this.messageIdCounter;
}
private synchronized NetSocketInternal connection() {
return connection;
}
private Future<Void> write(io.netty.handler.codec.mqtt.MqttMessage mqttMessage) {
log.debug(String.format("Sending packet %s", mqttMessage));
return this.connection().writeMessage(mqttMessage);
}
/**
* Used for calling the close handler when the remote MQTT server closes the connection
*/
private void handleClosed() {
Promise<MqttConnAckMessage> connectPromise;
Promise<Void> disconnectPromise;
NetClient client;
Deque<Ping> pings;
synchronized (this) {
client = this.client;
connectPromise = this.connectPromise;
disconnectPromise = this.disconnectPromise;
pings = this.pings;
this.disconnectPromise = null;
this.status = Status.CLOSED;
this.connection = null;
this.ctx = null;
this.client = null;
this.pings = new ArrayDeque<>();
}
// Cleanup pending pings
pings.forEach(ping -> {
ping.cancel();
});
Handler<Void> handler = closeHandler();
if (handler != null) {
handler.handle(null);
}
disconnectPromise.complete();
if (connectPromise != null) {
connectPromise.fail("Closed");
}
client.close();
}
/**
* Handle the MQTT message received from the remote MQTT server
*
* @param msg Incoming Packet
*/
private void handleMessage(ChannelHandlerContext chctx, Object msg) {
// handling directly native Netty MQTT messages, some of them are translated
// to the related Vert.x ones for polyglotization
if (msg instanceof io.netty.handler.codec.mqtt.MqttMessage) {
io.netty.handler.codec.mqtt.MqttMessage mqttMessage = (io.netty.handler.codec.mqtt.MqttMessage) msg;
DecoderResult result = mqttMessage.decoderResult();
if (result.isFailure()) {
chctx.pipeline().fireExceptionCaught(result.cause());
return;
}
if (!result.isFinished()) {
chctx.pipeline().fireExceptionCaught(new Exception("Unfinished message"));
return;
}
log.debug(String.format("Incoming packet %s", msg));
switch (mqttMessage.fixedHeader().messageType()) {
case CONNACK:
io.netty.handler.codec.mqtt.MqttConnAckMessage connack = (io.netty.handler.codec.mqtt.MqttConnAckMessage) mqttMessage;
MqttConnAckMessage mqttConnAckMessage = MqttConnAckMessage.create(
connack.variableHeader().connectReturnCode(),
connack.variableHeader().isSessionPresent());
handleConnack(mqttConnAckMessage);
break;
case PUBLISH:
io.netty.handler.codec.mqtt.MqttPublishMessage publish = (io.netty.handler.codec.mqtt.MqttPublishMessage) mqttMessage;
ByteBuf newBuf = VertxHandler.safeBuffer(publish.payload());
MqttPublishMessage mqttPublishMessage = MqttPublishMessage.create(
publish.variableHeader().packetId(),
publish.fixedHeader().qosLevel(),
publish.fixedHeader().isDup(),
publish.fixedHeader().isRetain(),
publish.variableHeader().topicName(),
newBuf);
handlePublish(mqttPublishMessage);
break;
case PUBACK:
handlePuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PUBREC:
handlePubrec(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PUBREL:
handlePubrel(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PUBCOMP:
handlePubcomp(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case SUBACK:
io.netty.handler.codec.mqtt.MqttSubAckMessage unsuback = (io.netty.handler.codec.mqtt.MqttSubAckMessage) mqttMessage;
MqttSubAckMessage mqttSubAckMessage = MqttSubAckMessage.create(
unsuback.variableHeader().messageId(),
unsuback.payload().grantedQoSLevels());
handleSuback(mqttSubAckMessage);
break;
case UNSUBACK:
handleUnsuback(((MqttMessageIdVariableHeader) mqttMessage.variableHeader()).messageId());
break;
case PINGRESP:
handlePingresp();
break;
default:
chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type " + msg.getClass().getName()));
break;
}
} else {
chctx.pipeline().fireExceptionCaught(new Exception("Wrong message type"));
}
}
/**
* Used for calling the pingresp handler when the server replies to the ping
*/
private void handlePingresp() {
Ping ping = pings.poll();
if (ping != null) {
ping.ack();
}
Handler<Void> handler = pingResponseHandler();
if (handler != null) {
handler.handle(null);
}
}
/**
* Used for calling the unsuback handler when the server acks an unsubscribe
*
* @param unsubackMessageId identifier of the subscribe acknowledged by the server
*/
private void handleUnsuback(int unsubackMessageId) {
Handler<Integer> handler = unsubscribeCompletionHandler();
if (handler != null) {
handler.handle(unsubackMessageId);
}
}
/**
* Used for calling the puback handler when the server acknowledge a QoS 1 message with puback
*
* @param pubackMessageId identifier of the message acknowledged by the server
*/
private void handlePuback(int pubackMessageId) {
synchronized (this) {
ExpiringPacket removedPacket = qos1outbound.remove(pubackMessageId);
if (removedPacket == null) {
log.debug("Received PUBACK packet without having related PUBLISH packet in storage");
// PUBACK has been received after timer has already fired
Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
if (handler != null) {
handler.handle(pubackMessageId);
}
return;
}
removedPacket.cancelTimer();
countInflightQueue--;
}
Handler<Integer> handler = publishCompletionHandler();
if (handler != null) {
handler.handle(pubackMessageId);
}
}
private void handlePubackTimeout(int packetId) {
ExpiringPacket expiredMessage;
synchronized (this) {
expiredMessage = qos1outbound.remove(packetId);
if (expiredMessage == null) {
// the message has already been ACKed
log.debug("PUBLISH expiration timer fired but QoS 1 message has already been PUBACKed by server");
return;
}
}
countInflightQueue--;
Handler<Integer> handler = publishCompletionExpirationHandler();
if (handler != null) {
handler.handle(expiredMessage.packetId);
}
}
/**
* Used for calling the pubcomp handler when the server client acknowledge a QoS 2 message with pubcomp
*
* @param pubcompMessageId identifier of the message acknowledged by the server
*/
private void handlePubcomp(int pubcompMessageId) {
synchronized (this) {
ExpiringPacket removedPacket = qos2outbound.remove(pubcompMessageId);
if (removedPacket == null) {
log.debug("Received PUBCOMP packet without having related PUBREL packet in storage");
Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
if (handler != null) {
handler.handle(pubcompMessageId);
}
return;
}
removedPacket.cancelTimer();
countInflightQueue--;
}
Handler<Integer> handler = publishCompletionHandler();
if (handler != null) {
handler.handle(pubcompMessageId);
}
}
private void handlePubcompTimeout(int packetId) {
ExpiringPacket expiredMessage;
synchronized (this) {
expiredMessage = qos2outbound.remove(packetId);
if (expiredMessage == null) {
log.debug("PUBCOMP expiration timer fired but QoS 2 message has already been PUBCOMPed by server");
return;
}
}
countInflightQueue--;
Handler<Integer> handler = publishCompletionExpirationHandler();
if (handler != null) {
handler.handle(expiredMessage.packetId);
}
}
/**
* Used for sending the pubrel when a pubrec is received from the server
*
* @param pubrecMessageId identifier of the message acknowledged by server
*/
private void handlePubrec(int pubrecMessageId) {
synchronized (this) {
ExpiringPacket removedPacket = qos2outbound.remove(pubrecMessageId);
if (removedPacket == null) {
log.debug("Received PUBREC packet without having related PUBLISH packet in storage");
Handler<Integer> handler = publishCompletionUnknownPacketIdHandler();
if (handler != null) {
handler.handle(pubrecMessageId);
}
return;
}
removedPacket.cancelTimer();
}
this.publishRelease(pubrecMessageId);
}
private void handlePubrecTimeout(int packetId) {
ExpiringPacket expiredMessage;
synchronized (this) {
expiredMessage = qos2outbound.remove(packetId);
if (expiredMessage == null) {
log.debug("PUBREC expiration timer fired but QoS 2 message has already been PUBRECed by server");
return;
}
}
countInflightQueue--;
Handler<Integer> handler = publishCompletionExpirationHandler();
if (handler != null) {
handler.handle(expiredMessage.packetId);
}
}
/**
* Used for calling the suback handler when the server acknowledges subscribe to topics
*
* @param msg message with suback information
*/
private void handleSuback(MqttSubAckMessage msg) {
Handler<MqttSubAckMessage> handler = subscribeCompletionHandler();
if (handler != null) {
handler.handle(msg);
}
}
/**
* Used for calling the publish handler when the server publishes a message
*
* @param msg published message
*/
private void handlePublish(MqttPublishMessage msg) {
Handler<MqttPublishMessage> handler;
switch (msg.qosLevel()) {
case AT_MOST_ONCE:
handler = this.publishHandler();
if (handler != null) {
handler.handle(msg);
}
break;
case AT_LEAST_ONCE:
this.publishAcknowledge(msg.messageId());
handler = this.publishHandler();
if (handler != null) {
handler.handle(msg);
}
break;
case EXACTLY_ONCE:
this.publishReceived(msg);
// we will handle the PUBLISH when a PUBREL comes
break;
}
}
/**
* Used for calling the pubrel handler when the server acknowledge a QoS 2 message with pubrel
*
* @param pubrelMessageId identifier of the message acknowledged by the server
*/
private void handlePubrel(int pubrelMessageId) {
MqttMessage message;
synchronized (this) {
message = qos2inbound.remove(pubrelMessageId);
if (message == null) {
log.warn("Received PUBREL packet without having related PUBREC packet in storage");
return;
}
this.publishComplete(pubrelMessageId);
}
Handler<MqttPublishMessage> handler = this.publishHandler();
if (handler != null) {
handler.handle((MqttPublishMessage) message);
}
}
/**
* Used for calling the connect handler when the server replies to the request
*
* @param msg connection response message
*/
private void handleConnack(MqttConnAckMessage msg) {
Status status = msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED ? Status.CONNECTED : Status.CLOSING;
if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
NetSocketInternal connection;
Promise<MqttConnAckMessage> connectPromise;
synchronized (this) {
connection = this.connection;
connectPromise = this.connectPromise;
this.connectPromise = null;
this.status = Status.CONNECTED;
}
connection.closeHandler(v -> handleClosed());
connectPromise.complete(msg);
} else {
Promise<MqttConnAckMessage> connectPromise;
Promise<Void> disconnectPromise;
NetSocketInternal connection;
NetClient client;
synchronized (this) {
connectPromise = this.connectPromise;
disconnectPromise = this.disconnectPromise;
connection = this.connection;
client = this.client;
this.connectPromise = null;
this.disconnectPromise = null;
this.status = Status.CLOSED;
this.connection = null;
this.client = null;
}
connection.closeHandler(null);
MqttConnectionException exception = new MqttConnectionException(msg.code());
log.error(String.format("Connection refused by the server - code: %s", msg.code()));
connectPromise.fail(exception);
disconnectPromise.complete();
client.close();
}
}
/**
* Used for calling the exception handler when an error at connection level
*
* @param t exception raised
*/
private void handleException(Throwable t) {
Handler<Throwable> handler = exceptionHandler();
if (handler != null) {
handler.handle(t);
}
}
/**
* @return Randomly-generated ClientId
*/
private String generateRandomClientId() {
return UUID.randomUUID().toString();
}
/**
* Check either given Topic Name valid of not
*
* @param topicName given Topic Name
* @return true - valid, otherwise - false
*/
private boolean isValidTopicName(String topicName) {
if (!isValidStringSizeInUTF8(topicName)) {
return false;
}
Matcher matcher = validTopicNamePattern.matcher(topicName);
return matcher.find();
}
/**
* Check either given Topic Filter valid of not
*
* @param topicFilter given Topic Filter
* @return true - valid, otherwise - false
*/
private boolean isValidTopicFilter(String topicFilter) {
if (!isValidStringSizeInUTF8(topicFilter)) {
return false;
}
Matcher matcher = validTopicFilterPattern.matcher(topicFilter);
return matcher.find();
}
/**
* Check either given string has size more then 65535 bytes in UTF-8 Encoding
*
* @param string given string
* @return true - size is lower or equal than 65535, otherwise - false
*/
private boolean isValidStringSizeInUTF8(String string) {
try {
int length = string.getBytes("UTF-8").length;
return length >= MIN_TOPIC_LEN && length <= MAX_TOPIC_LEN;
} catch (UnsupportedEncodingException e) {
log.error("UTF-8 charset is not supported", e);
}
return false;
}
/**
* A wrapper around a packet ID for which the client will wait a limited time
* for the server's ACK to arrive.
*/
private class ExpiringPacket {
private final int packetId;
private final long timerId;
/**
* Creates a new expiring packet.
*
* @param timeoutHandler The handler to invoke once the client stops waiting for the server's ACK.
* @param packetId The packet ID.
*/
ExpiringPacket(Handler<Integer> timeoutHandler, final int packetId) {
this.packetId = packetId;
if (options.getAckTimeout() > -1) {
this.timerId = vertx.setTimer(options.getAckTimeout() * 1000L, tid -> timeoutHandler.handle(packetId));
} else {
// default MQTT client behavior,
// don't start a timer for expiring the publish
this.timerId = -1;
}
}
/**
* Cancels the timer created for expiring the ACK.
* <p>
* This method should be invoked once the server's ACK for the packet ID has arrived
* in order to prevent the client from timing out while waiting for an ACK.
*
* @return {@code true} if the timer has been canceled.
*/
boolean cancelTimer() {
return vertx.cancelTimer(timerId);
}
}
private class WebScoketSucccessHandler extends SimpleChannelInboundHandler<io.vertx.core.http.WebSocketFrame> {
public static final String WS_CONNECT_SUCCESS_EVENT = "WS_CONNECT_SUCCESS_EVENT";
PromiseInternal<Void> handshakePromise;
public WebScoketSucccessHandler(PromiseInternal<Void> handshakePromise) {
this.handshakePromise = handshakePromise;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, io.vertx.core.http.WebSocketFrame msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().fireUserEventTriggered(WS_CONNECT_SUCCESS_EVENT);
handshakePromise.complete();
ctx.pipeline().remove(this);
} else if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) {
handshakePromise.fail(new VertxException("Websocket handshake timeout !"));
} else {
super.userEventTriggered(ctx, evt);
}
}
}
} |
can you provide a pull request ? |
Currently, the broker can either work with native clients, or websocket clients, but not both. Can you please confirm? In that case, I think it makes more sense to have the broker support both native and websocket clients at the same time. Most MQTT brokers support this. |
Describe the feature
The broker support the websocket in recent versions, I suggest to make Client support too.
Use cases
In most cases, The service will be deploy on an k8s instance or behind nginx with vitural host, To make a tcp port bind for a container is not allowed in most companies, At past we use an tcp proxy which handshake with websocket, It feels not good.
Contribution
The text was updated successfully, but these errors were encountered: