Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [paho-dev] maxInflight window size in ClientState

Hi,Marc

I tried this technique as well, I basically run a while(true) loop to determine currently how many pending messages are in the flight.

However, the number never drops down below,for example, 10 which is maxInflight value.


To be clear, I am wandering do I need to clear the pending message queue manually?If so, how do I clear or remove the message.

Another issue was, when I used waitforCompletion(), I never got the callback….which is really odd.





below it the code for my project, I took over this project from my colleague, so not sure if we have done the the implementation in the right way.


Please have a look if you have time. Much appreciated!!!

Thanks in advance!


public class Mqtt implements MqttCallback {


public final static String TAG = "Mqtt";


public final static int KEEP_ALIVE_INTERVAL = 30;
public final static int CONNECTION_TIMEOUT = 15;
public final static int CLIENT_ID_MAX = 23;


private static Mqtt _instance;


private String mServer = null;
private String mClientId = null;
private MqttClient _mqttClient = null;
private MqttWorkerBase _worker = null;
private Set<String> _topics = null
private Packet _lastWillAndTestament = null;

public static synchronized Mqtt getInstance() {
if(_instance == null)
_instance = new Mqtt();
return _instance;
}


private Mqtt() {
_topics = new HashSet<String>(2);
}


public synchronized void setWorker(MqttWorkerBase worker) {
_worker = worker;
}


public synchronized void setLastWillAndTestament(Packet lastWillAndTestament) {
_lastWillAndTestament = lastWillAndTestament;
}


public synchronized boolean connect(String server, String clientId, ArrayList<String> topics) {


disconnect_();


try {
mServer = server;
mClientId = clientId;
if( mClientId.length() > Mqtt.CLIENT_ID_MAX ) {
mClientId = mClientId.substring(0, Mqtt.CLIENT_ID_MAX);
}
String topicsString = Arrays.toString(topics.toArray());
_worker.log(TAG, "MqttConnection(" + mServer + ", " + mClientId + "," + topicsString + ") connecting...");
MemoryPersistence persistence = new MemoryPersistence();
//if( _mqttClient == null ) 
_mqttClient = new MqttClient(mServer, mClientId, persistence);


MqttConnectOptions options = new MqttConnectOptions();
options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
options.setConnectionTimeout(CONNECTION_TIMEOUT);



if( _lastWillAndTestament != null ) {
MqttTopic topic = _mqttClient.getTopic(_lastWillAndTestament.getTopicName());
String msg = _lastWillAndTestament.getMessage();
options.setWill(topic, msg.getBytes(), 2, false);
msg = Packet.logify(msg, _lastWillAndTestament._type, _lastWillAndTestament.getToken());
_worker.log(TAG, "<will topic='" + topic.getName() + "'>" + msg + "</will>");
}




_mqttClient.connect(options);
_mqttClient.setCallback(this);


if(_mqttClient == null || _mqttClient.isConnected()==false) {
_worker.log(TAG, "Error: mqtt connect(" + mServer + ", " + mClientId + ") failed.");
return false;
}




if( topics != null ) {
for( String topic: topics ) 
subscribe_(topic);
}


_worker.log(TAG, "MqttConnection(" + mServer + ", " + mClientId + "," + topicsString + ") ok.");


} catch (MqttException e) {
e.printStackTrace();
_worker.log(TAG, "Error: mqtt connect failed.");
return false;
}


return true;
}


public synchronized boolean isConnected() {


return _mqttClient != null && _mqttClient.isConnected();
}


private synchronized void disconnect_() {


try {
if(_mqttClient == null || _mqttClient.isConnected()==false
return;
_mqttClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
_worker.log(TAG, "Error: mqtt disconnect failed.");
}
}


public synchronized void disconnect() {
_topics.clear();
disconnect_();
}


private synchronized boolean subscribe_(String topicFilter) {
try {
_mqttClient.subscribe(topicFilter);
return true;
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}


return false;
}


private synchronized boolean unsubscribe_(String topicFilter) {
try {
_mqttClient.unsubscribe(topicFilter);
return true;
} catch (MqttSecurityException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}


return false;
}


public synchronized boolean subscribe(String topicFilter) {


boolean ok = subscribe_(topicFilter);
if(ok) {
_topics.add(topicFilter);
}


return ok;
}


public synchronized boolean unsubscribe(String topicFilter) {


boolean ok = unsubscribe_(topicFilter);
if(ok) {
_topics.remove(topicFilter);
}


return ok;
}


public synchronized void send(Packet packet) {


if(_mqttClient == null || _mqttClient.isConnected() == false) {
_worker.log(TAG, "Error: mqtt client offline.");
return;
}


try {
MqttTopic topic = _mqttClient.getTopic(packet.getTopicName());
String msg = packet.getMessage();
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(2);


msg = Packet.logify(msg, packet._type, packet.getToken());
_worker.log(TAG, "<mqtt-snd topic='" + topic.getName() + "'>" + msg + "</mqtt-snd>");


// _mqttClient.setTimeToWait(-1);
//
// _mqttClient.publish(topic.getName(), message);



// while(true){
// IMqttDeliveryToken[] tokens = _mqttClient.getPendingDeliveryTokens();
// System.out.println("pending token number is : " + tokens.length);
// if(tokens.length < 5){
// break;
// }
// }




MqttDeliveryToken token = topic.publish(message);
// token.waitForCompletion();


/*MqttDeliveryToken resp = */
/*resp.waitForCompletion(10000);*/
}
catch (MqttException exception) {
_worker.log(TAG, exception.getMessage());
_worker.log(TAG, "Error: mqtt send packet failed: < " + packet.getMessage() + " >");



} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void connectionLost(Throwable cause) {


_worker.log(TAG, "Mqtt connectionLost(" + cause.getMessage() + ")");


if( _lastWillAndTestament != null ) {
MqttTopic topic = _mqttClient.getTopic(_lastWillAndTestament.getTopicName());
String msg = _lastWillAndTestament.getMessage();
msg = Packet.logify(msg, _lastWillAndTestament._type, _lastWillAndTestament.getToken());
_worker.log(TAG, "<mqtt-snd topic='" + topic.getName() + "'>" + msg + "</mqtt-snd> (ASSUMED - LastWillAndTestament)");
}
_worker.connectionLost();
/*
connect(mServer, mClientId);
if(_topics.isEmpty()) {
_worker.log(TAG, "Error: _topics empty");
}
for(String topic: _topics) {
subscribe_(topic);
}
*/
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// TODO Auto-generated method stub


}

@Override
public void messageArrived(String topic, MqttMessage message)
throws Exception {
 try{
Packet packet = new Packet(topic, message);
String msg = Packet.logify(new String(message.getPayload()), packet._type, packet.getToken());
_worker.log(TAG, "<mqtt-rcv topic='" + topic + "'>" + msg + "</mqtt-rcv>");
if(packet.ok() == false) {
if(packet.isRequest()) {
Packet reply = new Packet(Packet.Destination.AGENT, Packet.Reply.TYPE, packet);
reply.setString(Packet.Reply.RETURN, packet._error);
send(reply);
}
return;
}


if(_worker != null)
_worker.handle(packet);
}
catch (Exception e) {
e.printStackTrace();
}
}
}





On 28/11/2013, at 2:11 AM, Marc L Cohen <mlcohen@xxxxxxxxxx> wrote:

Another technique I have used, rather than keeping track of how many are in flight. There is a method to request the list of identifiers of inflight messages. I do that and count, and wait for the number to drop below what I need to continue.

Marc L Cohen

Sent from my iPad

On Nov 27, 2013, at 3:48 AM, "Dave Locke" <locke@xxxxxxxxxx> wrote:

There is no external API to change the max inflight window today, this was discussed on a recent thread along with the idea of a buffered client.  A buffered client would enable messages to be sent when there is no connectivity up to a maximum configurable amount.  This is a reasonable amount of work to implement.   In the meantime there is no reason not to make the max inflight window configurable - would be handy if you can raise a bug on the Paho bugzilla so that it can be tracked.  The inflight window effects how messages can be delivered in parallel from the client to the server.  The bigger the window the more state needs to be stored in the client.  It will also be useful for devices that are more constrained and only wish to allow 1 or a small amount of messages to be inflight.

Even without changes there are approaches to manage the delivery of messages. For instance:
  • Blocking mode: Send a message and wait for it to be delivered - typical approach for this is to use the waitForCompletion() method on the token returned from the publish call
  • Keep a count of messages in flight and block when at the limit and release when a delivery completion is notified.

All the best
Dave

 



From:        Jianing Ren <jianingr@xxxxxxxxxxxx>
To:        "paho-dev@xxxxxxxxxxx" <paho-dev@xxxxxxxxxxx>
Date:        27/11/2013 02:04
Subject:        [paho-dev] maxInflight window size in ClientState
Sent by:        paho-dev-bounces@xxxxxxxxxxx




Hi,Paho  people,


I am wandering if I can set this maxInflight  value in ClientState to a great number, lets say, 10000? If I do so, does it have any side effect?

Because I have an issue when a large amount of messages publishes at the same time, I will fail to send out from the 11th one, as the default maxInflight value was 10.


Thanks in advance,guys.

Much appreciated.


Regards,


Johnny Ren.






This email has been processed by SmoothZap -
www.smoothwall.net
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev


Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598.
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev
_______________________________________________
paho-dev mailing list
paho-dev@xxxxxxxxxxx
https://dev.eclipse.org/mailman/listinfo/paho-dev



This email has been processed by SmoothZap - www.smoothwall.net


Back to the top