I tried this technique as well: I basically run a while(true) loop to determine how many pending messages are currently in flight.
However, the number never drops below, for example, 10, which is the maxInflight value.
To be clear, I am wondering: do I need to clear the pending message queue manually? If so, how do I clear or remove the messages?
Another issue was that when I used waitForCompletion(), I never got the callback, which is really odd.
Below is the code for my project. I took over this project from my colleague, so I am not sure if we have done the implementation in the right way.
Please have a look if you have time. Much appreciated!!!
/**
 * Singleton wrapper around a Paho {@code MqttClient}.
 *
 * <p>The class owns one client at a time, publishes {@link Packet}s at QoS 2,
 * forwards received packets to the registered {@link MqttWorkerBase} and
 * remembers the topic filters added through {@link #subscribe(String)}.
 *
 * <p>All state-changing entry points are {@code synchronized} on the
 * singleton. The {@link MqttCallback} methods are invoked by the MQTT client
 * itself and are not synchronized.
 */
public class Mqtt implements MqttCallback {
    public final static String TAG = "Mqtt";
    public final static int KEEP_ALIVE_INTERVAL = 30;
    public final static int CONNECTION_TIMEOUT = 15;
    /** Client ids longer than this are truncated in {@link #connect}. */
    public final static int CLIENT_ID_MAX = 23;

    private static Mqtt _instance;

    private String mServer = null;
    private String mClientId = null;
    private MqttClient _mqttClient = null;
    private MqttWorkerBase _worker = null;
    /** Topic filters added via {@link #subscribe(String)}; cleared by {@link #disconnect()}. */
    private final Set<String> _topics = new HashSet<String>(2);
    private Packet _lastWillAndTestament = null;

    /**
     * Returns the process-wide instance, creating it on first use.
     *
     * @return the shared {@code Mqtt} instance
     */
    public static synchronized Mqtt getInstance() {
        if (_instance == null) {
            _instance = new Mqtt();
        }
        return _instance;
    }

    private Mqtt() {
    }

    /**
     * Registers the worker that receives log lines, incoming packets and
     * connection-lost notifications.
     *
     * @param worker the worker to use; may be {@code null}, in which case
     *               logging and packet handling are skipped
     */
    public synchronized void setWorker(MqttWorkerBase worker) {
        _worker = worker;
    }

    /**
     * Sets the packet registered as the MQTT "will" on the next
     * {@link #connect}.
     *
     * @param lastWillAndTestament the will packet, or {@code null} for none
     */
    public synchronized void setLastWillAndTestament(Packet lastWillAndTestament) {
        _lastWillAndTestament = lastWillAndTestament;
    }

    /**
     * Disconnects and releases any previous client, then connects a new one
     * and subscribes to the given topic filters.
     *
     * @param server   broker URI handed to the {@code MqttClient} constructor
     * @param clientId client id; truncated to {@link #CLIENT_ID_MAX} characters
     * @param topics   topic filters to subscribe to after connecting; may be
     *                 {@code null}. These are not recorded in the tracked
     *                 topic set.
     * @return {@code true} if the client ended up connected, {@code false} if
     *         connecting failed. A failed subscription is logged but does not
     *         change the result.
     */
    public synchronized boolean connect(String server, String clientId, ArrayList<String> topics) {
        disconnect_();
        // Release the previous client instead of just dropping the reference.
        close_();
        try {
            mServer = server;
            mClientId = clientId;
            if (mClientId.length() > Mqtt.CLIENT_ID_MAX) {
                mClientId = mClientId.substring(0, Mqtt.CLIENT_ID_MAX);
            }
            // topics is optional, so it must not be dereferenced unguarded.
            String topicsString = (topics == null) ? "null" : Arrays.toString(topics.toArray());
            log_("MqttConnection(" + mServer + ", " + mClientId + "," + topicsString + ") connecting...");

            MemoryPersistence persistence = new MemoryPersistence();
            _mqttClient = new MqttClient(mServer, mClientId, persistence);
            // Install the callback before connecting so that events occurring
            // right after the connection is established are not missed.
            _mqttClient.setCallback(this);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
            options.setConnectionTimeout(CONNECTION_TIMEOUT);
            if (_lastWillAndTestament != null) {
                MqttTopic topic = _mqttClient.getTopic(_lastWillAndTestament.getTopicName());
                String msg = _lastWillAndTestament.getMessage();
                // NOTE(review): getBytes() uses the platform default charset.
                options.setWill(topic, msg.getBytes(), 2, false);
                msg = Packet.logify(msg, _lastWillAndTestament._type, _lastWillAndTestament.getToken());
                log_("<will topic='" + topic.getName() + "'>" + msg + "</will>");
            }

            _mqttClient.connect(options);
            if (!_mqttClient.isConnected()) {
                log_("Error: mqtt connect(" + mServer + ", " + mClientId + ") failed.");
                return false;
            }
            if (topics != null) {
                for (String topic : topics) {
                    // subscribe_ logs its own failures.
                    subscribe_(topic);
                }
            }
            log_("MqttConnection(" + mServer + ", " + mClientId + "," + topicsString + ") ok.");
        } catch (MqttException e) {
            logFailure_("Error: mqtt connect failed.", e);
            return false;
        }
        return true;
    }

    /**
     * @return {@code true} if a client exists and reports itself connected
     */
    public synchronized boolean isConnected() {
        return _mqttClient != null && _mqttClient.isConnected();
    }

    /** Disconnects the current client if it is connected; failures are logged. */
    private synchronized void disconnect_() {
        if (_mqttClient == null || !_mqttClient.isConnected()) {
            return;
        }
        try {
            _mqttClient.disconnect();
        } catch (MqttException e) {
            logFailure_("Error: mqtt disconnect failed.", e);
        }
    }

    /** Closes and forgets the current client, if any; failures are logged. */
    private synchronized void close_() {
        if (_mqttClient == null) {
            return;
        }
        try {
            _mqttClient.close();
        } catch (MqttException e) {
            logFailure_("Error: mqtt close failed.", e);
        }
        _mqttClient = null;
    }

    /** Forgets all tracked topic filters and disconnects the client. */
    public synchronized void disconnect() {
        _topics.clear();
        disconnect_();
    }

    /**
     * Subscribes on the current client without touching the tracked topic set.
     *
     * @return {@code true} on success, {@code false} if there is no client or
     *         the client reported an error
     */
    private synchronized boolean subscribe_(String topicFilter) {
        if (_mqttClient == null) {
            log_("Error: mqtt subscribe(" + topicFilter + ") failed: no client.");
            return false;
        }
        try {
            _mqttClient.subscribe(topicFilter);
            return true;
        } catch (MqttException e) {
            // Also covers MqttSecurityException, which is a subclass.
            logFailure_("Error: mqtt subscribe(" + topicFilter + ") failed.", e);
        }
        return false;
    }

    /**
     * Unsubscribes on the current client without touching the tracked topic set.
     *
     * @return {@code true} on success, {@code false} if there is no client or
     *         the client reported an error
     */
    private synchronized boolean unsubscribe_(String topicFilter) {
        if (_mqttClient == null) {
            log_("Error: mqtt unsubscribe(" + topicFilter + ") failed: no client.");
            return false;
        }
        try {
            _mqttClient.unsubscribe(topicFilter);
            return true;
        } catch (MqttException e) {
            // Also covers MqttSecurityException, which is a subclass.
            logFailure_("Error: mqtt unsubscribe(" + topicFilter + ") failed.", e);
        }
        return false;
    }

    /**
     * Subscribes to a topic filter and, on success, records it in the tracked
     * topic set.
     *
     * @param topicFilter the filter to subscribe to
     * @return {@code true} if the subscription succeeded
     */
    public synchronized boolean subscribe(String topicFilter) {
        boolean ok = subscribe_(topicFilter);
        if (ok) {
            _topics.add(topicFilter);
        }
        return ok;
    }

    /**
     * Unsubscribes from a topic filter and, on success, removes it from the
     * tracked topic set.
     *
     * @param topicFilter the filter to unsubscribe from
     * @return {@code true} if the unsubscription succeeded
     */
    public synchronized boolean unsubscribe(String topicFilter) {
        boolean ok = unsubscribe_(topicFilter);
        if (ok) {
            _topics.remove(topicFilter);
        }
        return ok;
    }

    /**
     * Publishes a packet at QoS 2 on the packet's own topic. Does nothing
     * except log when the client is offline. Failures are logged, never
     * thrown.
     *
     * <p>NOTE(review): this method is also called from
     * {@link #messageArrived}. Blocking here on the delivery token would
     * presumably stall the client's callback processing; confirm against the
     * Paho documentation before adding a wait.
     *
     * @param packet the packet to publish
     */
    public synchronized void send(Packet packet) {
        if (_mqttClient == null || !_mqttClient.isConnected()) {
            log_("Error: mqtt client offline.");
            return;
        }
        try {
            MqttTopic topic = _mqttClient.getTopic(packet.getTopicName());
            String msg = packet.getMessage();
            // NOTE(review): getBytes() uses the platform default charset.
            MqttMessage message = new MqttMessage(msg.getBytes());
            message.setQos(2);
            msg = Packet.logify(msg, packet._type, packet.getToken());
            log_("<mqtt-snd topic='" + topic.getName() + "'>" + msg + "</mqtt-snd>");
            // The returned delivery token is deliberately not waited on.
            topic.publish(message);
        } catch (MqttException exception) {
            log_(exception.getMessage());
            log_("Error: mqtt send packet failed: < " + packet.getMessage() + " >");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Logs the loss, logs the will that is assumed to have been published,
     * and notifies the worker. No reconnect or resubscribe is attempted here.
     *
     * @param cause the reason reported by the client; may be {@code null}
     */
    @Override
    public void connectionLost(Throwable cause) {
        String reason = (cause == null) ? null : cause.getMessage();
        log_("Mqtt connectionLost(" + reason + ")");

        // Use one snapshot so a concurrent setter cannot null the field mid-way.
        Packet will = _lastWillAndTestament;
        if (will != null) {
            String msg = Packet.logify(will.getMessage(), will._type, will.getToken());
            log_("<mqtt-snd topic='" + will.getTopicName() + "'>" + msg + "</mqtt-snd> (ASSUMED - LastWillAndTestament)");
        }

        MqttWorkerBase worker = _worker;
        if (worker != null) {
            worker.connectionLost();
        }
    }

    /** Intentionally empty: delivery completion is not tracked. */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
    }

    /**
     * Wraps the incoming message in a {@link Packet}, logs it and hands it to
     * the worker. A packet that fails {@code ok()} is not forwarded; if it is
     * a request, a reply carrying the packet's error is sent instead.
     * Exceptions are caught and printed so they never reach the MQTT client.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage message)
            throws Exception {
        try {
            Packet packet = new Packet(topic, message);
            // NOTE(review): new String(byte[]) uses the platform default charset.
            String msg = Packet.logify(new String(message.getPayload()), packet._type, packet.getToken());
            log_("<mqtt-rcv topic='" + topic + "'>" + msg + "</mqtt-rcv>");
            if (!packet.ok()) {
                if (packet.isRequest()) {
                    Packet reply = new Packet(Packet.Destination.AGENT, Packet.Reply.TYPE, packet);
                    reply.setString(Packet.Reply.RETURN, packet._error);
                    send(reply);
                }
                return;
            }
            MqttWorkerBase worker = _worker;
            if (worker != null) {
                worker.handle(packet);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Sends a line to the worker's log under {@link #TAG}; no-op without a worker. */
    private void log_(String message) {
        MqttWorkerBase worker = _worker;
        if (worker != null) {
            worker.log(TAG, message);
        }
    }

    /** Prints the stack trace of {@code e} and logs {@code message}. */
    private void logFailure_(String message, Exception e) {
        e.printStackTrace();
        log_(message);
    }
}