diff --git a/org.eclipse.paho.client.mqttv3.internal.traceformat/pom.xml b/org.eclipse.paho.client.mqttv3.internal.traceformat/pom.xml
new file mode 100644
index 0000000..4b25058
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3.internal.traceformat/pom.xml
@@ -0,0 +1,26 @@
+ A listener is registered on an MqttToken and a token is associated
+ * with an action like connect or publish. When used with tokens on the MqttAsyncClient
+ * the listener will be called back on the MQTT clients thread. The listener will be informed
+ * if the action succeeds or fails. It is important that the listener returns control quickly
+ * otherwise the operation of the MQTT client will be stalled.
+ *
+ * It provides applications a simple programming interface to all features of the MQTT version 3.1
+ * specification including:
+ *
+ *
+ *
+ * There are two styles of MQTT client, this one and {@link IMqttClient}. + *
+ * An application is not restricted to using one style if an IMqttAsyncClient based client is used, + * as both blocking and non-blocking methods can be used in the same application. If an IMqttClient + * based client is used then only blocking methods are available to the application. + * For more details on the blocking client see {@link IMqttClient}.
+ * + *There are two forms of non-blocking method: + *
+ * IMqttToken token = asyncClient.method(parms)
+ *
+ * In this form the method returns a token that can be used to track the
+ * progress of the action (method). The token provides a waitForCompletion()
+ * method that once invoked will block until the action completes. Once
+ * completed there are methods on the token that can be used to check if the
+ * action completed successfully or not. For example
+ * to wait until a connect completes:
+ *
+ * </p>
+ *
+ * IMqttToken conToken;
+ * conToken = asyncClient.connect();
+ * ... do some work...
+ * conToken.waitForCompletion();
+ *
To turn a method into a blocking invocation the following form can be used:
+ *
+
+ *
+ * IMqttToken token;
+ * token = asyncClient.method(parms).waitForCompletion();
+ *
+ * IMqttToken token method(parms, Object userContext, IMqttActionListener callback)
+ *
+ * In this form a callback is registered with the method. The callback will be
+ * notified when the action succeeds or fails. The callback is invoked on the thread
+ * managed by the MQTT client so it is important that processing is minimised in the
+ * callback. If not the operation of the MQTT client will be inhibited. For example
+ * to be notified (called back) when a connect completes:
+ *
+ *
+ * IMqttToken conToken;
+ * conToken = asyncClient.connect("some context",new IMqttActionListener() {
+ * public void onSuccess(IMqttToken asyncActionToken) {
+ * log("Connected");
+ * }
+ *
+ * public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+ * log ("connect failed" +exception);
+ * }
+ * });
+ * An optional context object can be passed into the method which will then be made
+ * available in the callback. The context is stored (by the MQTT client) in the token
+ * which is then returned to the invoker. The token is provided to the callback methods
+ * where the context can then be accessed.
+ *
To understand when the delivery of a message is complete either of the two methods above + * can be used to either wait on or be notified when the publish completes. An alternative is to + * use the {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} method which will + * also be notified when a message has been delivered to the requested quality of service.
+ * + */ +public interface IMqttAsyncClient { + /** + * Connects to an MQTT server using the default options. + *The default options are specified in {@link MqttConnectOptions} class. + *
+ * + * @throws MqttSecurityException for security related problems + * @throws MqttException for non security related problems + * @return token used to track and wait for the connect to complete. The token + * will be passed to the callback methods if a callback is set. + * @see #connect(MqttConnectOptions, Object, IMqttActionListener) + */ + public IMqttToken connect() throws MqttException, MqttSecurityException; + + /** + * Connects to an MQTT server using the provided connect options. + *The connection will be established using the options specified in the + * {@link MqttConnectOptions} parameter. + *
+ * + * @param options a set of connection parameters that override the defaults. + * @throws MqttSecurityException for security related problems + * @throws MqttException for non security related problems + * @return token used to track and wait for the connect to complete. The token + * will be passed to any callback that has been set. + * @see #connect(MqttConnectOptions, Object, IMqttActionListener) + */ + public IMqttToken connect(MqttConnectOptions options) throws MqttException, MqttSecurityException ; + /** + * Connects to an MQTT server using the default options. + *The default options are specified in {@link MqttConnectOptions} class. + *
+ * + * @param userContext optional object used to pass context to the callback. Use + * null if not required. + * @param callback optional listener that will be notified when the connect completes. Use + * null if not required. + * @throws MqttSecurityException for security related problems + * @throws MqttException for non security related problems + * @return token used to track and wait for the connect to complete. The token + * will be passed to any callback that has been set. + * @see #connect(MqttConnectOptions, Object, IMqttActionListener) + */ + public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException; + + + /** + * Connects to an MQTT server using the specified options. + *The server to connect to is specified on the constructor. + * It is recommended to call {@link #setCallback(MqttCallback)} prior to + * connecting in order that messages destined for the client can be accepted + * as soon as the client is connected. + *
+ *The method returns control before the connect completes. Completion can + * be tracked by: + *
An attempt is made to quiesce the client allowing outstanding + * work to complete before disconnecting. It will wait + * for a maximum of 30 seconds for work to quiesce before disconnecting. + * This method must not be called from inside {@link MqttCallback} methods. + *
+ * + * @return token used to track and wait for disconnect to complete. The token + * will be passed to any callback that has been set. + * @throws MqttException for problems encountered while disconnecting + * @see #disconnect(long, Object, IMqttActionListener) + */ + public IMqttToken disconnect( ) throws MqttException; + + /** + * Disconnects from the server. + *An attempt is made to quiesce the client allowing outstanding + * work to complete before disconnecting. It will wait + * for a maximum of the specified quiesce time for work to complete before disconnecting. + * This method must not be called from inside {@link MqttCallback} methods. + *
+ * @param quiesceTimeout the amount of time in milliseconds to allow for + * existing work to finish before disconnecting. A value of zero or less + * means the client will not quiesce. + * @return token used to track and wait for disconnect to complete. The token + * will be passed to the callback methods if a callback is set. + * @throws MqttException for problems encountered while disconnecting + * @see #disconnect(long, Object, IMqttActionListener) + */ + public IMqttToken disconnect(long quiesceTimeout) throws MqttException; + + /** + * Disconnects from the server. + *An attempt is made to quiesce the client allowing outstanding + * work to complete before disconnecting. It will wait + * for a maximum of 30 seconds for work to quiesce before disconnecting. + * This method must not be called from inside {@link MqttCallback} methods. + *
+ * + * @param userContext optional object used to pass context to the callback. Use + * null if not required. + * @param callback optional listener that will be notified when the disconnect completes. Use + * null if not required. + * @return token used to track and wait for the disconnect to complete. The token + * will be passed to any callback that has been set. + * @throws MqttException for problems encountered while disconnecting + * @see #disconnect(long, Object, IMqttActionListener) + */ + public IMqttToken disconnect( Object userContext, IMqttActionListener callback) throws MqttException; + + /** + * Disconnects from the server. + *+ * The client will wait for {@link MqttCallback} methods to + * complete. It will then wait for up to the quiesce timeout to allow for + * work which has already been initiated to complete. For instance when a QOS 2 + * message has started flowing to the server but the QOS 2 flow has not completed.It + * prevents new messages being accepted and does not send any messages that have + * been accepted but not yet started delivery across the network to the server. When + * work has completed or after the quiesce timeout, the client will disconnect from + * the server. If the cleansession flag was set to false and is set to false the + * next time a connection is made QoS 1 and 2 messages that + * were not previously delivered will be delivered.
+ *This method must not be called from inside {@link MqttCallback} methods.
+ *The method returns control before the disconnect completes. Completion can + * be tracked by: + *
true
if connected, false
otherwise.
+ */
+ public boolean isConnected();
+
+ /**
+ * Returns the client ID used by this client.
+ * All clients connected to the + * same server or server farm must have a unique ID. + *
+ * + * @return the client ID used by this client. + */ + public String getClientId(); + + /** + * Returns the address of the server used by this client. + *The format of the returned String is the same as that used on the constructor. + *
+ * + * @return the server's address, as a URI String. + * @see MqttAsyncClient#MqttAsyncClient(String, String) + */ + public String getServerURI(); + + /** + * Publishes a message to a topic on the server + *A convenience method, which will + * create a new {@link MqttMessage} object with a byte array payload and the + * specified QoS, and then publish it. + *
+ * + * @param topic to deliver the message to, for example "finance/stock/ibm". + * @param payload the byte array to use as the payload + * @param qos the Quality of Service to deliver the message at. Valid values are 0, 1 or 2. + * @param retained whether or not this message should be retained by the server. + * @return token used to track and wait for the publish to complete. The token + * will be passed to any callback that has been set. + * @throws MqttPersistenceException when a problem occurs storing the message + * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2. + * @throws MqttException for other errors encountered while publishing the message. + * For instance if too many messages are being processed. + * @see #publish(String, MqttMessage, Object, IMqttActionListener) + * @see MqttMessage#setQos(int) + * @see MqttMessage#setRetained(boolean) + */ + public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, + boolean retained ) throws MqttException, MqttPersistenceException; + + /** + * Publishes a message to a topic on the server + *A convenience method, which will + * create a new {@link MqttMessage} object with a byte array payload and the + * specified QoS, and then publish it. + *
+ * + * @param topic to deliver the message to, for example "finance/stock/ibm". + * @param payload the byte array to use as the payload + * @param qos the Quality of Service to deliver the message at. Valid values are 0, 1 or 2. + * @param retained whether or not this message should be retained by the server. + * @param userContext optional object used to pass context to the callback. Use + * null if not required. + * @param callback optional listener that will be notified when message delivery + * hsa completed to the requested quality of service + * @return token used to track and wait for the publish to complete. The token + * will be passed to any callback that has been set. + * @throws MqttPersistenceException when a problem occurs storing the message + * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2. + * @throws MqttException for other errors encountered while publishing the message. + * For instance client not connected. + * @see #publish(String, MqttMessage, Object, IMqttActionListener) + * @see MqttMessage#setQos(int) + * @see MqttMessage#setRetained(boolean) + */ + public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, + boolean retained, Object userContext, IMqttActionListener callback ) throws MqttException, MqttPersistenceException; + + /** + * Publishes a message to a topic on the server + * Takes an {@link MqttMessage} message and delivers it to the server at the + * requested quality of service. + * + * @param topic to deliver the message to, for example "finance/stock/ibm". + * @param message to deliver to the server + * @return token used to track and wait for the publish to complete. The token + * will be passed to any callback that has been set. + * @throws MqttPersistenceException when a problem occurs storing the message + * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2. + * @throws MqttException for other errors encountered while publishing the message. 
+ * For instance client not connected. + * @see #publish(String, MqttMessage, Object, IMqttActionListener) + */ + public IMqttDeliveryToken publish(String topic, MqttMessage message ) throws MqttException, MqttPersistenceException; + + /** + * Publishes a message to a topic on the server. + *+ * Once this method has returned cleanly, the message has been accepted for publication by the + * client and will be delivered on a background thread. + * In the event the connection fails or the client stops. Messages will be delivered to the + * requested quality of service once the connection is re-established to the server on condition that: + *
When building an application, + * the design of the topic tree should take into account the following principles + * of topic name syntax and semantics:
+ * + *The following principles apply to the construction and content of a topic + * tree:
+ * + *The method returns control before the publish completes. Completion can + * be tracked by: + *
Provides an optimised way to subscribe to multiple topics compared to + * subscribing to each one individually.
+ * + * @see #subscribe(String[], int[], Object, IMqttActionListener) + * + * @param topicFilters one or more topics to subscribe to, which can include wildcards + * @param qos the maximum quality of service at which to subscribe. Messages + * published at a lower quality of service will be received at the published + * QOS. Messages published at a higher quality of service will be received using + * the QOS specified on the subscribe. + * @return token used to track and wait for the subscribe to complete. The token + * will be passed to callback methods if set. + * @throws MqttException if there was an error registering the subscription. + */ + public IMqttToken subscribe(String[] topicFilters, int[] qos) throws MqttException; + + /** + * Subscribes to multiple topics, each of which may include wildcards. + *Provides an optimised way to subscribe to multiple topics compared to + * subscribing to each one individually.
+ *The {@link #setCallback(MqttCallback)} method + * should be called before this method, otherwise any received messages + * will be discarded. + *
+ *+ * If {@link MqttConnectOptions#setCleanSession(boolean)} was set to true + * when connecting to the server then the subscription remains in place + * until either: + *
+ * If {@link MqttConnectOptions#setCleanSession(boolean)} was set to false + * when connecting to the server then the subscription remains in place + * until either: + *
The "topic filter" string used when subscribing + * may contain special characters, which allow you to subscribe to multiple topics + * at once.
+ *The topic level separator is used to introduce structure into the topic, and + * can therefore be specified within the topic for that purpose. The multi-level + * wildcard and single-level wildcard can be used for subscriptions, but they + * cannot be used within a topic by the publisher of a message. + *
The number sign (#) is a wildcard character that matches + * any number of levels within a topic. For example, if you subscribe to + * finance/stock/ibm/#, you receive + * messages on these topics: + *
finance/stock/ibm+ * + *
finance/stock/ibm/closingprice
finance/stock/ibm/currentprice
The multi-level wildcard + * can represent zero or more levels. Therefore, finance/# can also match + * the singular finance, where # represents zero levels. The topic + * level separator is meaningless in this context, because there are no levels + * to separate.
+ * + *The multi-level wildcard can + * be specified only on its own or next to the topic level separator character. + * Therefore, # and finance/# are both valid, but finance# is + * not valid. The multi-level wildcard must be the last character + * used within the topic tree. For example, finance/# is valid but + * finance/#/closingprice is not valid.
The plus sign (+) is a wildcard character that matches only one topic + * level. For example, finance/stock/+ matches + * finance/stock/ibm and finance/stock/xyz, + * but not finance/stock/ibm/closingprice. Also, because the single-level + * wildcard matches only a single level, finance/+ does not match finance.
+ * + *Use + * the single-level wildcard at any level in the topic tree, and in conjunction + * with the multilevel wildcard. Specify the single-level wildcard next to the + * topic level separator, except when it is specified on its own. Therefore, + * + and finance/+ are both valid, but finance+ is + * not valid. The single-level wildcard can be used at the end of the + * topic tree or within the topic tree. + * For example, finance/+ and finance/+/ibm are both valid.
+ *The method returns control before the subscribe completes. Completion can + * be tracked by: + *
+ * Unsubscribing is the opposite of subscribing. When the server receives the + * unsubscribe request it looks to see if it can find a matching subscription for the + * client and then removes it. After this point the server will send no more + * messages to the client for this subscription. + *
+ *The topic(s) specified on the unsubscribe must match the topic(s) + * specified in the original subscribe request for the unsubscribe to succeed + *
+ *The method returns control before the unsubscribe completes. Completion can + * be tracked by: + *
There are a number of events that the listener will be notified about. + * These include: + *
Other events that track the progress of an individual operation such + * as connect and subscribe can be tracked using the {@link IMqttToken} returned from + * each non-blocking method or by setting an {@link IMqttActionListener} on the + * non-blocking method.
+ * @see MqttCallback + * @param callback which will be invoked for certain asynchronous events + */ + public void setCallback(MqttCallback callback); + + /** + * Returns the delivery tokens for any outstanding publish operations. + *
If a client has been restarted and there are messages that were in the + * process of being delivered when the client stopped this method + * returns a token for each in-flight message enabling the delivery to be tracked + * Alternately the {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} + * callback can be used to track the delivery of outstanding messages. + *
+ *If a client connects with cleansession true then there will be no + * delivery tokens as the cleansession option deletes all earlier state. + * For state to be remembered the client must connect with cleansession + * set to false
+ * @return zero or more delivery tokens + */ + public IMqttDeliveryToken[] getPendingDeliveryTokens(); + + /** + * Close the client + * Releases all resources associated with the client. After the client has + * been closed it cannot be reused. For instance attempts to connect will fail. + * @throws MqttException if the client is not disconnected. + */ + public void close() throws MqttException; +} \ No newline at end of file diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttClient.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttClient.java new file mode 100644 index 0000000..8f12ac3 --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttClient.java @@ -0,0 +1,462 @@ +package org.eclipse.paho.client.mqttv3; + +/** + * Enables an application to communicate with an MQTT server using blocking methods. + *+ * This interface allows applications to utilise all features of the MQTT version 3.1 + * specification including: + *
+ * There are two styles of MQTT client, this one and {@link IMqttAsyncClient}. + *
+ * The non-blocking client can also be used in a blocking form by turning a non-blocking
+ * method into a blocking invocation using the following pattern:
+ *
+ * Using the non-blocking client allows an application to use a mixture of blocking and
+ * non-blocking styles. Using the blocking client only allows an application to use one
+ * style. The blocking client provides compatibility with earlier versions
+ * of the MQTT client.
+ * IMqttToken token;
+ * token = asyncClient.method(parms).waitForCompletion();
+ *
The default options are specified in {@link MqttConnectOptions} class. + *
+ * + * @throws MqttSecurityException when the server rejects the connect for security + * reasons + * @throws MqttException for non security related problems + * @see #connect(MqttConnectOptions) + */ + public void connect() throws MqttSecurityException, MqttException; + + /** + * Connects to an MQTT server using the specified options. + *The server to connect to is specified on the constructor. + * It is recommended to call {@link #setCallback(MqttCallback)} prior to + * connecting in order that messages destined for the client can be accepted + * as soon as the client is connected. + *
+ *This is a blocking method that returns once connect completes
+ * + * @param options a set of connection parameters that override the defaults. + * @throws MqttSecurityException when the server rejects the connect for security + * reasons + * @throws MqttException for non security related problems including communication errors + */ + public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException; + + /** + * Disconnects from the server. + *An attempt is made to quiesce the client allowing outstanding + * work to complete before disconnecting. It will wait + * for a maximum of 30 seconds for work to quiesce before disconnecting. + * This method must not be called from inside {@link MqttCallback} methods. + *
+ * + * @see #disconnect(long) + */ + public void disconnect() throws MqttException; + + /** + * Disconnects from the server. + *+ * The client will wait for all {@link MqttCallback} methods to + * complete. It will then wait for up to the quiesce timeout to allow for + * work which has already been initiated to complete - for example, it will + * wait for the QoS 2 flows from earlier publications to complete. When work has + * completed or after the quiesce timeout, the client will disconnect from + * the server. If the cleansession flag was set to false and is set to false the + * next time a connection is made QoS 1 and 2 messages that + * were not previously delivered will be delivered.
+ * + *This is a blocking method that returns once disconnect completes
+ * + * @param quiesceTimeout the amount of time in milliseconds to allow for + * existing work to finish before disconnecting. A value of zero or less + * means the client will not quiesce. + * @throws MqttException if a problem is encountered while disconnecting + */ + public void disconnect(long quiesceTimeout) throws MqttException; + + /** + * Subscribe to a topic, which may include wildcards using a QOS of 1. + * + * @see #subscribe(String[], int[]) + * + * @param topicFilter the topic to subscribe to, which can include wildcards. + * @throws MqttException if there was an error registering the subscription. + */ + public void subscribe(String topicFilter) throws MqttException, MqttSecurityException; + + /** + * Subscribes to a one or more topics, which may include wildcards using a QOS of 1. + * + * @see #subscribe(String[], int[]) + * + * @param topicFilters the topic to subscribe to, which can include wildcards. + * @throws MqttException if there was an error registering the subscription. + */ + public void subscribe(String[] topicFilters) throws MqttException; + + /** + * Subscribe to a topic, which may include wildcards. + * + * @see #subscribe(String[], int[]) + * + * @param topicFilter the topic to subscribe to, which can include wildcards. + * @param qos the maximum quality of service at which to subscribe. Messages + * published at a lower quality of service will be received at the published + * QOS. Messages published at a higher quality of service will be received using + * the QOS specified on the subscribe. + * @throws MqttException if there was an error registering the subscription. + */ + public void subscribe(String topicFilter, int qos) throws MqttException; + + /** + * Subscribes to multiple topics, each of which may include wildcards. + *The {@link #setCallback(MqttCallback)} method + * should be called before this method, otherwise any received messages + * will be discarded. + *
+ *+ * If {@link MqttConnectOptions#setCleanSession(boolean)} was set to true + * when connecting to the server then the subscription remains in place + * until either: + *
+ * If {@link MqttConnectOptions#setCleanSession(boolean)} was set to false + * when connecting to the server then the subscription remains in place + * until either: + *
The "topic filter" string used when subscribing + * may contain special characters, which allow you to subscribe to multiple topics + * at once.
+ *The topic level separator is used to introduce structure into the topic, and + * can therefore be specified within the topic for that purpose. The multi-level + * wildcard and single-level wildcard can be used for subscriptions, but they + * cannot be used within a topic by the publisher of a message. + *
The number sign (#) is a wildcard character that matches + * any number of levels within a topic. For example, if you subscribe to + * finance/stock/ibm/#, you receive + * messages on these topics: + *
finance/stock/ibm+ * + *
finance/stock/ibm/closingprice
finance/stock/ibm/currentprice
The multi-level wildcard + * can represent zero or more levels. Therefore, finance/# can also match + * the singular finance, where # represents zero levels. The topic + * level separator is meaningless in this context, because there are no levels + * to separate.
+ * + *The multi-level wildcard can + * be specified only on its own or next to the topic level separator character. + * Therefore, # and finance/# are both valid, but finance# is + * not valid. The multi-level wildcard must be the last character + * used within the topic tree. For example, finance/# is valid but + * finance/#/closingprice is not valid.
The plus sign (+) is a wildcard character that matches only one topic + * level. For example, finance/stock/+ matches + * finance/stock/ibm and finance/stock/xyz, + * but not finance/stock/ibm/closingprice. Also, because the single-level + * wildcard matches only a single level, finance/+ does not match finance.
+ * + *Use + * the single-level wildcard at any level in the topic tree, and in conjunction + * with the multilevel wildcard. Specify the single-level wildcard next to the + * topic level separator, except when it is specified on its own. Therefore, + * + and finance/+ are both valid, but finance+ is + * not valid. The single-level wildcard can be used at the end of the + * topic tree or within the topic tree. + * For example, finance/+ and finance/+/ibm are both valid.
+ *This is a blocking method that returns once subscribe completes
+ * + * @param topicFilters one or more topics to subscribe to, which can include wildcards. + * @param qos the maximum quality of service to subscribe each topic at.Messages + * published at a lower quality of service will be received at the published + * QOS. Messages published at a higher quality of service will be received using + * the QOS specified on the subscribe. + * @throws MqttException if there was an error registering the subscription. + * @throws IllegalArgumentException if the two supplied arrays are not the same size. + */ + public void subscribe(String[] topicFilters, int[] qos) throws MqttException; + + /** + * Requests the server unsubscribe the client from a topic. + * + * @see #unsubscribe(String[]) + * @param topicFilter the topic to unsubscribe from. It must match a topicFilter + * specified on the subscribe. + * @throws MqttException if there was an error unregistering the subscription. + */ + public void unsubscribe(String topicFilter) throws MqttException; + + /** + * Requests the server unsubscribe the client from one or more topics + *+ * Unsubcribing is the opposite of subscribing. When the server receives the + * unsubscribe request it looks to see if it can find a subscription for the + * client and then removes it. After this point the server will send no more + * messages to the client for this subscription. + *
+ *The topic(s) specified on the unsubscribe must match the topic(s) + * specified in the original subscribe request for the unsubscribe to succeed + *
+ * + *This is a blocking method that returns once unsubscribe completes
+ * + * @param topicFilters one or more topics to unsubscribe from. Each topicFilter + * must match one specified on a subscribe + * @throws MqttException if there was an error unregistering the subscription. + */ + public void unsubscribe(String[] topicFilters) throws MqttException; + + + /** + * Publishes a message to a topic on the server and return once it is delivered + *This is a convenience method, which will + * create a new {@link MqttMessage} object with a byte array payload and the + * specified QoS, and then publish it. All other values in the + * message will be set to the defaults. + *
+ * + * @param topic to deliver the message to, for example "finance/stock/ibm". + * @param payload the byte array to use as the payload + * @param qos the Quality of Service to deliver the message at. Valid values are 0, 1 or 2. + * @param retained whether or not this message should be retained by the server. + * @throws MqttPersistenceException when a problem with storing the message + * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2. + * @throws MqttException for other errors encountered while publishing the message. + * For instance client not connected. + * @see #publish(String, MqttMessage) + * @see MqttMessage#setQos(int) + * @see MqttMessage#setRetained(boolean) + */ + public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException; + + /** + * Publishes a message to a topic on the server. + *+ * Delivers a message to the server at the requested quality of service and returns control + * once the message has been delivered. In the event the connection fails or the client + * stops, any messages that are in the process of being delivered will be delivered once + * a connection is re-established to the server on condition that: + *
In the event that the connection breaks or the client stops it is still possible to determine + * when the delivery of the message completes. Prior to re-establishing the connection to the server: + *
When building an application, + * the design of the topic tree should take into account the following principles + * of topic name syntax and semantics:
+ * + *The following principles apply to the construction and content of a topic + * tree:
+ * + *This is a blocking method that returns once publish completes
* + * + * @param topic to deliver the message to, for example "finance/stock/ibm". + * @param message to deliver to the server + * @throws MqttPersistenceException when a problem occurs storing the message + * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2. + * @throws MqttException for other errors encountered while publishing the message. + * For instance, if the client is not connected. + */ + public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException; + + /** + * Sets the callback listener to use for events that happen asynchronously. + *There are a number of events that the listener will be notified about. These include + *
Other events that track the progress of an individual operation such + * as connect and subscribe can be tracked using the {@link MqttToken} passed to the + * operation
+ * @see MqttCallback + * @param callback the class to call back for events related to the client + */ + public void setCallback(MqttCallback callback); + + /** + * Get a topic object which can be used to publish messages. + *
An alternative method that should be used in preference to this one when publishing a message is: + *
When building an application, + * the design of the topic tree should take into account the following principles + * of topic name syntax and semantics:
+ * + *The following principles apply to the construction and content of a topic + * tree:
+ * + *true
if connected, false
otherwise.
+ */
+ public boolean isConnected();
+
+ /**
+ * Returns the client ID used by this client.
+ * All clients connected to the + * same server or server farm must have a unique ID. + *
+ * + * @return the client ID used by this client. + */ + public String getClientId(); + + /** + * Returns the address of the server used by this client, as a URI. + *The format is the same as specified on the constructor. + *
+ * + * @return the server's address, as a URI String. + * @see MqttAsyncClient#MqttAsyncClient(String, String) + */ + public String getServerURI(); + + /** + * Returns the delivery tokens for any outstanding publish operations. + *If a client has been restarted and there are messages that were in the + * process of being delivered when the client stopped, this method will + * return a token for each message enabling the delivery to be tracked. + * Alternatively, the {@link MqttCallback#deliveryComplete(IMqttDeliveryToken)} + * callback can be used to track the delivery of outstanding messages. + *
+ *If a client connects with cleansession true, then there will be no + * delivery tokens as the cleansession option deletes all earlier state. + * For state to be remembered the client must connect with cleansession + * set to false.
+ * @return zero or more delivery tokens + */ + public IMqttDeliveryToken[] getPendingDeliveryTokens(); + + /** + * Close the client. + * Releases all resources associated with the client. After the client has + * been closed it cannot be reused. For instance attempts to connect will fail. + * @throws MqttException if the client is not disconnected. + */ + public void close() throws MqttException; +} \ No newline at end of file diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttDeliveryToken.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttDeliveryToken.java new file mode 100644 index 0000000..48a42e4 --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttDeliveryToken.java @@ -0,0 +1,41 @@ +package org.eclipse.paho.client.mqttv3; +/** + * Provides a mechanism for tracking the delivery of a message + * + *A subclass of IMqttToken that allows the delivery of a message to be tracked. + * Unlike instances of IMqttToken delivery tokens can be used across connection + * and client restarts. This enables the delivery of a message to be tracked + * after failures. There are two approaches + *
+ * An action is in progress until either: + *
Until the message has been delivered, the message being delivered will
+ * be returned. Once the message has been delivered null
will be
+ * returned.
+ * @return the message associated with this token or null if already delivered.
+ * @throws MqttException if there was a problem retrieving the message
+ */
+ public MqttMessage getMessage() throws MqttException;
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttToken.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttToken.java
new file mode 100644
index 0000000..8aaabc6
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/IMqttToken.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3;
+
+
+/**
+ * Provides a mechanism for tracking the completion of an asynchronous task.
+ *
+ *
When using the asynchronous/non-blocking MQTT programming interface all + * methods/operations that take any time, and in particular those that involve + * any network operation, return control to the caller immediately. The operation + * then proceeds to run in the background so as not to block the invoking thread. + * An IMqttToken is used to track the state of the operation. An application can use the + * token to wait for an operation to complete. A token is passed to callbacks + * once the operation completes and provides context linking it to the original + * request. A token is associated with a single operation.
+ *
+ * An action is in progress until either: + *
The timeout specifies the maximum time it will block for. If the action + * completes before the timeout then control returns immediately, if not + * it will block until the timeout expires.
+ *If the action being tracked fails or the timeout expires an exception will + * be thrown. In the event of a timeout the action may complete after timeout. + *
+ * + * @param timeout the maximum amount of time to wait for, in milliseconds. + * @throws MqttException if there was a problem with the action associated with the token. + */ + public void waitForCompletion(long timeout) throws MqttException; + + /** + * Returns whether or not the action has finished. + *True will be returned both in the case where the action finished successfully + * and in the case where it failed. If the action failed {@link #getException()} will + * be non null. + *
+ */ + public boolean isComplete(); + + /** + * Returns an exception providing more detail if an operation failed + *While an action is in progress and when an action completes successfully + * null will be returned. Certain errors like timeout or shutting down will not + * set the exception as the action has not failed or completed at that time + *
+ * @return exception may return an exception if the operation failed. Null will be + * returned while action is in progress and if action completes successfully. + */ + public MqttException getException(); + + /** + * Register a listener to be notified when an action completes. + *Once a listener is registered it will be invoked when the action the token + * is associated with either succeeds or fails. + *
+ * @param listener to be invoked once the action completes + */ + public void setActionCallback(IMqttActionListener listener); + + /** + * Return the async listener for this token. + * @return listener that is set on the token or null if a listener is not registered. + */ + public IMqttActionListener getActionCallback(); + + /** + * Returns the MQTT client that is responsible for processing the asynchronous + * action + */ + public IMqttAsyncClient getClient(); + + /** + * Returns the topic string(s) for the action being tracked by this + * token. If the action has not been initiated or the action has no + * topic associated with it such as connect then null will be returned. + * + * @return the topic string(s) for the action being tracked by this token or null + */ + public String[] getTopics(); + + /** + * Store some context associated with an action. + *Allows the caller of an action to store some context that can be + * accessed from within the ActionListener associated with the action. This + * can be useful when the same ActionListener is associated with multiple + * actions
+ * @param userContext to associate with an action + */ + public void setUserContext(Object userContext); + + /** + * Retrieve the context associated with an action. + *Allows the ActionListener associated with an action to retrieve any context + * that was associated with the action when the action was invoked. If no + * context was provided null is returned.
+ + * @return Object context associated with an action or null if there is none. + */ + public Object getUserContext(); + + /** + * Returns the message ID of the message that is associated with the token. + * A message id of zero will be returned for tokens associated with + * connect, disconnect and ping operations as there can only ever + * be one of these outstanding at a time. For other operations + * the MQTT message id flowed over the network. + */ + public int getMessageId(); + +} diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java new file mode 100644 index 0000000..2a63346 --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java @@ -0,0 +1,747 @@ +/* + * Copyright (c) 2009, 2012 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Dave Locke - initial API and implementation and/or initial documentation + */ +package org.eclipse.paho.client.mqttv3; + +import java.util.Hashtable; +import java.util.Properties; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; + +import org.eclipse.paho.client.mqttv3.internal.ClientComms; +import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper; +import org.eclipse.paho.client.mqttv3.internal.LocalNetworkModule; +import org.eclipse.paho.client.mqttv3.internal.NetworkModule; +import org.eclipse.paho.client.mqttv3.internal.SSLNetworkModule; +import org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule; +import org.eclipse.paho.client.mqttv3.internal.security.SSLSocketFactoryFactory; +import 
org.eclipse.paho.client.mqttv3.internal.wire.MqttDisconnect; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttSubscribe; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttUnsubscribe; +import org.eclipse.paho.client.mqttv3.logging.Logger; +import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.eclipse.paho.client.mqttv3.util.Debug; + +/** + * Lightweight client for talking to an MQTT server using non-blocking methods + * that allow an operation to run in the background. + * + *This class implements the non-blocking {@link IMqttAsyncClient} client interface + * allowing applications to initiate MQTT actions and then carry working while the + * MQTT action completes on a background thread. + * This implementation is compatible with all Java SE runtimes from 1.4.2 and up. + *
+ *An application can connect to an MQTT server using: + *
To enable messages to be delivered even across network and client restarts + * messages need to be safely stored until the message has been delivered at the requested + * quality of service. A pluggable persistence mechanism is provided to store the messages. + *
+ *By default {@link MqttDefaultFilePersistence} is used to store messages to a file. + * If persistence is set to null then messages are stored in memory and hence can be lost + * if the client, Java runtime or device shuts down. + *
+ *If connecting with {@link MqttConnectOptions#setCleanSession(boolean)} set to true it + * is safe to use memory persistence as all state is cleared when a client disconnects. If + * connecting with cleansession set to false, to provide reliable message delivery + * then a persistent message store should be used such as the default one. + *
+ *The message store interface is pluggable. Different stores can be used by implementing + * the {@link MqttClientPersistence} interface and passing it to the clients constructor. + *
+ * + * @see IMqttAsyncClient + */ +public class MqttAsyncClient implements IMqttAsyncClient { // DestinationProvider { + + private static final int URI_TYPE_TCP = 0; + private static final int URI_TYPE_SSL = 1; + private static final int URI_TYPE_LOCAL = 2; + + private String clientId; + private String serverURI; + private int serverURIType; + protected ClientComms comms; + private Hashtable topics; + private MqttClientPersistence persistence; + + final static String className = MqttAsyncClient.class.getName(); + public Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,className); + + /** + * Create an MqttAsyncClient that can be used to communicate with an MQTT server. + *+ * The address of the server should be a URI, using a scheme of either + * "tcp://" for a TCP connection or "ssl://" for a TCP connection secured by SSL/TLS. + * For example: + *
tcp://localhost:1883
ssl://localhost:8883
+ * The client identifier used to connect to an MQTT server + * must be unique across all clients connecting to the same + * server. A convenience method is provided to generate a random client id that + * should satisfy this criterion - {@link #generateClientId()}. As the client identifier + * is used by the server to identify a client when it reconnects, the client must use the + * same identifier between connections if durable subscriptions are used and reliable + * delivery of messages is required. + *
+ *+ * In Java SE, SSL can be configured in one of several ways, which the + * client will use in the following order: + *
+ *SSLSocketFactory
- applications can
+ * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
+ * a factory with the appropriate SSL settings.In Java ME, the platform settings are used for SSL connections.
+ * + *A default instance of {@link MqttDefaultFilePersistence} is used by + * the client. To specify a different persistence implementation, or to turn + * off persistence, use the {@link #MqttAsyncClient(String, String, MqttClientPersistence)} constructor. + * + * @param serverURI the address of the server to connect to, specified as a URI + * @param clientId a client identifier that is unique on the server being connected to + * @throws IllegalArgumentException if the URI does not start with + * "tcp://", "ssl://" or "local://". + * @throws IllegalArgumentException if the clientId is null or is greater than 23 characters in length + * @throws MqttException if any other problem was encountered + */ + public MqttAsyncClient(String serverURI, String clientId) throws MqttException { + this(serverURI,clientId, new MqttDefaultFilePersistence()); + } + + /** + * Create an MqttAsyncClient that can be used to communicate with an MQTT server. + *
+ * The address of the server should be a URI, using a scheme of either + * "tcp://" for a TCP connection or "ssl://" for a TCP connection secured by SSL/TLS. + * For example: + *
tcp://localhost:1883
ssl://localhost:8883
+ * The client identifier used to connect to an MQTT server + * must be unique across all clients connecting to the same + * server. A convenience method is provided to generate a random client id that + * should satisfy this criterion - {@link #generateClientId()}. As the client identifier + * is used by the server to identify a client when it reconnects, the client must use the + * same identifier between connections if durable subscriptions are used and reliable + * delivery of messages is required. + *
+ *+ * In Java SE, SSL can be configured in one of several ways, which the + * client will use in the following order: + *
+ *SSLSocketFactory
- applications can
+ * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
+ * a factory with the appropriate SSL settings.In Java ME, the platform settings are used for SSL connections.
+ *+ * The persistence mechanism is used to enable reliable messaging. + * For qualities of service (QoS) 1 or 2 to work, messages must be persisted + * to disk by both the client and the server. If this is not done, then + * a failure in the client or server can result in lost messages. A pluggable + * persistence mechanism is supported via the {@link MqttClientPersistence} + * interface. An implementer of this interface that safely stores messages + * must be specified in order for delivery of messages to be reliable. In + * addition {@link MqttConnectOptions#setCleanSession(boolean)} must be set + * to false. In the event that only QoS 0 messages are sent or received or + * cleansession is set to true then a safe store is not needed. + *
+ *An implementation of file-based persistence is provided in
+ * class {@link MqttDefaultFilePersistence} which will work in all Java SE based
+ * systems. If no persistence is needed, the persistence parameter
+ * can be explicitly set to <code>null</code>.
There are two alternative methods that should be used in preference to this one when publishing a message: + *
When you build an application, + * the design of the topic tree should take into account the following principles + * of topic name syntax and semantics:
+ * + *The following principles apply to the construction and content of a topic + * tree:
+ * + *When cleanSession is set to false, an application must ensure it uses the + * same client identifier when it reconnects to the server to resume state and maintain + * assured message delivery.
+ * @return a generated client identifier + * @see MqttConnectOptions#setCleanSession(boolean) + */ + public static String generateClientId() { + return (System.getProperty("user.name") + "." + System.currentTimeMillis()); + } + + /* (non-Javadoc) + * @see IMqttAsyncClient#getPendingDeliveryTokens() + */ + public IMqttDeliveryToken[] getPendingDeliveryTokens() { + return comms.getPendingDeliveryTokens(); + } + + /* (non-Javadoc) + * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, byte[], int, boolean, java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener) + */ + public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, + boolean retained, Object userContext, IMqttActionListener callback) throws MqttException, + MqttPersistenceException { + MqttMessage message = new MqttMessage(payload); + message.setQos(qos); + message.setRetained(retained); + return this.publish(topic, message, userContext, callback); + } + /* (non-Javadoc) + * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, byte[], int, boolean) + */ + public IMqttDeliveryToken publish(String topic, byte[] payload, int qos, + boolean retained) throws MqttException, MqttPersistenceException { + return this.publish(topic, payload, qos, retained, null, null); + } + + /* (non-Javadoc) + * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage) + */ + public IMqttDeliveryToken publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException { + return this.publish(topic, message, null, null); + } + + /* (non-Javadoc) + * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#publish(java.lang.String, org.eclipse.paho.client.mqttv3.MqttMessage, java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener) + */ + public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) 
throws MqttException, + MqttPersistenceException { + final String methodName = "publish"; + //@TRACE 111=< topic={0} message={1}userContext={1} callback={2} + log.fine(className,methodName,"111", new Object[] {topic, userContext, callback}); + + validateTopic(topic); + + MqttDeliveryToken token = new MqttDeliveryToken(getClientId()); + token.setActionCallback(callback); + token.setUserContext(userContext); + token.setMessage(message); + token.internalTok.setTopics(new String[] {topic}); + + MqttPublish pubMsg = new MqttPublish(topic, message); + comms.sendNoWait(pubMsg, token); + + //@TRACE 112=< + log.fine(className,methodName,"112"); + + return token; + } + + /* (non-Javadoc) + * @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#close() + */ + public void close() throws MqttException { + final String methodName = "close"; + //@TRACE 113=< + log.fine(className,methodName,"113"); + comms.close(); + //@TRACE 114=> + log.fine(className,methodName,"114"); + + } + + /** + * Return a debug object that can be used to help solve problems. + */ + public Debug getDebug() { + return new Debug(clientId,comms); + } + + /** + * Checks a topic is valid when publishing a message + *Checks the topic does not contain a wild card character.
+ * @param topic to validate + * @throws IllegalArgumentException if the topic is not valid + */ + static public void validateTopic(String topic) { + if ((topic.indexOf('#') == -1) && (topic.indexOf('+') == -1)) { + return; + } + // The topic string does not comply with topic string rules. + throw new IllegalArgumentException(); + } +} diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttCallback.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttCallback.java new file mode 100644 index 0000000..5972803 --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttCallback.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2009, 2012 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Dave Locke - initial API and implementation and/or initial documentation + */ +package org.eclipse.paho.client.mqttv3; + + +/** + * Enables an application to be notified when asynchronous + * events related to the client occur. + * Classes implementing this interface + * can be registered on both types of client: {@link IMqttClient#setCallback(MqttCallback)} + * and {@link IMqttAsyncClient#setCallback(MqttCallback)} + */ +public interface MqttCallback { + /** + * This method is called when the connection to the server is lost. + * + * @param cause the reason behind the loss of connection. + */ + public void connectionLost(Throwable cause); + + /** + * This method is called when a message arrives from the server. + * + *+ * This method is invoked synchronously by the MQTT client. An + * acknowledgement is not sent back to the server until this + * method returns cleanly.
+ *
+ * If an implementation of this method throws an <code>Exception</code>, then the
+ * client will be shut down. When the client is next re-connected, any QoS
+ * 1 or 2 messages will be redelivered by the server.
+ * Any additional messages which arrive while an + * implementation of this method is running, will build up in memory, and + * will then back up on the network.
+ *+ * If an application needs to persist data, then it + * should ensure the data is persisted prior to returning from this method, as + * after returning from this method, the message is considered to have been + * delivered, and will not be reproducible.
+ *+ * It is possible to send a new message within an implementation of this callback + * (for example, a response to this message), but the implementation must not + * disconnect the client, as it will be impossible to send an acknowledgement for + * the message being processed, and a deadlock will occur.
+ * + * @param topic name of the topic on the message was published to + * @param message the actual message. + * @throws Exception if a terminal error has occurred, and the client should be + * shut down. + */ + public void messageArrived(String topic, MqttMessage message) throws Exception; + + /** + * Called when delivery for a message has been completed, and all + * acknowledgements have been received. For QOS 0 messages it is + * called once the message has been handed to the network for + * delivery. For QOS 1 it is called when PUBACK is received and + * for QOS 2 when PUBCOMP is received. The token will be the same + * token as that returned when the message was published. + * + * @param token the delivery token associated with the message. + */ + public void deliveryComplete(IMqttDeliveryToken token); + +} diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java new file mode 100644 index 0000000..f38a50e --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClient.java @@ -0,0 +1,376 @@ +/* + * Copyright (c) 2009, 2012 IBM Corp. + * + * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Dave Locke - initial API and implementation and/or initial documentation + */ +package org.eclipse.paho.client.mqttv3; + +import java.util.Properties; +import javax.net.SocketFactory; + +import org.eclipse.paho.client.mqttv3.logging.Logger; +import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; +import org.eclipse.paho.client.mqttv3.util.Debug; + +/** + * Lightweight client for talking to an MQTT server using methods that block + * until an operation completes. + * + *This class implements the blocking {@link IMqttClient} client interface where all + * actions block until they have completed (or timed out). + * This implementation is compatible with all Java SE runtimes from 1.4.2 and up. + *
+ *An application can connect to an MQTT server using: + *
To enable messages to be delivered even across network and client restarts + * messages need to be safely stored until the message has been delivered at the requested + * quality of service. A pluggable persistence mechanism is provided to store the messages. + *
+ *By default {@link MqttDefaultFilePersistence} is used to store messages to a file. + * If persistence is set to null then messages are stored in memory and hence can be lost + * if the client, Java runtime or device shuts down. + *
+ *If connecting with {@link MqttConnectOptions#setCleanSession(boolean)} set to true it + * is safe to use memory persistence as all state is cleared when a client disconnects. If + * connecting with cleansession set to false, to provide reliable message delivery + * then a persistent message store should be used such as the default one.
+ *The message store interface is pluggable. Different stores can be used by implementing + * the {@link MqttClientPersistence} interface and passing it to the clients constructor. + *
+ * + * @see IMqttClient + */ +public class MqttClient implements IMqttClient { //), DestinationProvider { + + protected MqttAsyncClient aClient = null; // Delegate implementation to MqttAshyncClient + protected long timeToWait = -1; // How long each method should wait for action to complete + + final static String className = MqttClient.class.getName(); + public Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,className); + + + /** + * Create an MqttClient that can be used to communicate with an MQTT server. + *+ * The address of the server should be a URI, using a scheme of either + * "tcp://" for a TCP connection or "ssl://" for a TCP connection secured by SSL/TLS. + * For example: + *
tcp://localhost:1883
ssl://localhost:8883
+ * The client identifier used to connect to an MQTT server + * must be unique across all clients connecting to the same + * server. A convenience method is provided to generate a random client id that + * should satisfy this criterion - {@link #generateClientId()}. As the client identifier + * is used by the server to identify a client when it reconnects, the client must use the + * same identifier between connections if durable subscriptions are used and reliable + * delivery of messages is required. + *
+ *+ * In Java SE, SSL can be configured in one of several ways, which the + * client will use in the following order: + *
+ *SSLSocketFactory
- applications can
+ * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
+ * a factory with the appropriate SSL settings.In Java ME, the platform settings are used for SSL connections.
+ * + *A default instance of {@link MqttDefaultFilePersistence} is used by + * the client. To specify a different persistence implementation, or to turn + * off persistence, use the {@link #MqttClient(String, String, MqttClientPersistence)} constructor. + * + * @param serverURI the address of the server to connect to, specified as a URI + * @param clientId a client identifier that is unique on the server being connected to + * @throws IllegalArgumentException if the URI does not start with + * "tcp://", "ssl://" or "local://". + * @throws IllegalArgumentException if the clientId is null or is greater than 23 characters in length + * @throws MqttException if any other problem was encountered + */ + public MqttClient(String serverURI, String clientId) throws MqttException { + this(serverURI,clientId, new MqttDefaultFilePersistence()); + } + + /** + * Create an MqttClient that can be used to communicate with an MQTT server. + *
+ * The address of the server should be a URI, using a scheme of either + * "tcp://" for a TCP connection or "ssl://" for a TCP connection secured by SSL/TLS. + * For example: + *
tcp://localhost:1883
ssl://localhost:8883
+ * The client identifier used to connect to an MQTT server + * must be unique across all clients connecting to the same + * server. A convenience method is provided to generate a random client id that + * should satisfy this criterion - {@link #generateClientId()}. As the client identifier + * is used by the server to identify a client when it reconnects, the client must use the + * same identifier between connections if durable subscriptions are used and reliable + * delivery of messages is required. + *
+ *+ * In Java SE, SSL can be configured in one of several ways, which the + * client will use in the following order: + *
+ *SSLSocketFactory
- applications can
+ * use {@link MqttConnectOptions#setSocketFactory(SocketFactory)} to supply
+ * a factory with the appropriate SSL settings.In Java ME, the platform settings are used for SSL connections.
+ *+ * The persistence mechanism is used to enable reliable messaging. + * For qualities of service (QoS) 1 or 2 to work, messages must be persisted + * to disk by both the client and the server. If this is not done, then + * a failure in the client or server can result in lost messages. A pluggable + * persistence mechanism is supported via the {@link MqttClientPersistence} + * interface. An implementer of this interface that safely stores messages + * must be specified in order for delivery of messages to be reliable. In + * addition {@link MqttConnectOptions#setCleanSession(boolean)} must be set + * to false. In the event that only QoS 0 messages are sent or received or + * cleansession is set to true then a safe store is not needed. + *
+ *An implementation of file-based persistence is provided in
+ * class {@link MqttDefaultFilePersistence} which will work in all Java SE based
+ * systems. If no persistence is needed, the persistence parameter
+ * can be explicitly set to <code>null</code>.
When cleanSession is set to false, an application must ensure it uses the + * same client identifier when it reconnects to the server to resume state and maintain + * assured message delivery.
+ * @return a generated client identifier + * @see MqttConnectOptions#setCleanSession(boolean) + */ + public static String generateClientId() { + return MqttAsyncClient.generateClientId(); + } + + /** + * Return a debug object that can be used to help solve problems. + */ + public Debug getDebug() { + return (aClient.getDebug()); + } +} diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClientPersistence.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClientPersistence.java new file mode 100644 index 0000000..6240b19 --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttClientPersistence.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2009, 2012 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Dave Locke - initial API and implementation and/or initial documentation + */ +package org.eclipse.paho.client.mqttv3; + +import java.util.Enumeration; + +/** + * Represents a persistent data store, used to store outbound and inbound messages while they + * are in flight, enabling delivery to the QOS specified. You can specify an implementation + * of this interface using {@link MqttClient#MqttClient(String, String, MqttClientPersistence)}, + * which the {@link MqttClient} will use to persist QoS 1 and 2 messages. + *+ * If the methods defined throw the MqttPersistenceException then the state of the data persisted + * should remain as prior to the method being called. For example, if {@link #put(String, MqttPersistable)} + * throws an exception at any point then the data will be assumed to not be in the persistent store. 
+ * Similarly if {@link #remove(String)} throws an exception then the data will be + * assumed to still be held in the persistent store.
+ *+ * It is up to the persistence interface to log any exceptions or error information + * which may be required when diagnosing a persistence failure.
+ */ +public interface MqttClientPersistence { + /** + * Initialise the persistent store. + * If a persistent store exists for this client ID then open it, otherwise + * create a new one. If the persistent store is already open then just return. + * An application may use the same client ID to connect to many different + * servers, so the client ID in conjunction with the + * connection will uniquely identify the persistence store required. + * + * @param clientId The client for which the persistent store should be opened. + * @param serverURI The connection string as specified when the MQTT client instance was created. + * @throws MqttPersistenceException if there was a problem opening the persistent store. + */ + public void open(String clientId, String serverURI) throws MqttPersistenceException; + + /** + * Close the persistent store that was previously opened. + * This will be called when a client application disconnects from the broker. + * @throws MqttPersistenceException + */ + public void close() throws MqttPersistenceException; + + /** + * Puts the specified data into the persistent store. + * @param key the key for the data, which will be used later to retrieve it. + * @param persistable the data to persist + * @throws MqttPersistenceException if there was a problem putting the data + * into the persistent store. + */ + public void put(String key, MqttPersistable persistable) throws MqttPersistenceException; + + /** + * Gets the specified data out of the persistent store. + * @param key the key for the data, which was used when originally saving it. + * @return the un-persisted data + * @throws MqttPersistenceException if there was a problem getting the data + * from the persistent store. + */ + public MqttPersistable get(String key) throws MqttPersistenceException; + + /** + * Remove the data for the specified key. 
+ */ + public void remove(String key) throws MqttPersistenceException; + + /** + * Returns an Enumeration over the keys in this persistent data store. + * @return an enumeration of {@link String} objects. + */ + public Enumeration keys() throws MqttPersistenceException; + + /** + * Clears persistence, so that it no longer contains any persisted data. + */ + public void clear() throws MqttPersistenceException; + + /** + * Returns whether or not data is persisted using the specified key. + * @param key the key for data, which was used when originally saving it. + */ + public boolean containsKey(String key) throws MqttPersistenceException; +} diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java new file mode 100644 index 0000000..72e28b5 --- /dev/null +++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttConnectOptions.java @@ -0,0 +1,366 @@ +/* + * Copyright (c) 2009, 2012 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Dave Locke - initial API and implementation and/or initial documentation + */ +package org.eclipse.paho.client.mqttv3; + +import java.util.Properties; + +import javax.net.SocketFactory; + +import org.eclipse.paho.client.mqttv3.util.Debug; + +/** + * Holds the set of options that control how the client connects to a server. 
+ */ +public class MqttConnectOptions { + /** + * The default keep alive interval in seconds if one is not specified + */ + public static final int KEEP_ALIVE_INTERVAL_DEFAULT = 60; + /** + * The default connection timeout in seconds if one is not specified + */ + public static final int CONNECTION_TIMEOUT_DEFAULT = 30; + /** + * The default clean session setting if one is not specified + */ + public static final boolean CLEAN_SESSION_DEFAULT = true; + + private int keepAliveInterval = KEEP_ALIVE_INTERVAL_DEFAULT; + private String willDestination = null; + private MqttMessage willMessage = null; + private String userName; + private char[] password; + private SocketFactory socketFactory; + private Properties sslClientProps = null; + private boolean cleanSession = CLEAN_SESSION_DEFAULT; + private int connectionTimeout = CONNECTION_TIMEOUT_DEFAULT; + + /** + * Constructs a newMqttConnectOptions
object using the
+ * default values.
+ *
+ * The defaults are:
+ * The default value is 60 seconds
+ * + * @param keepAliveInterval the interval, measured in seconds, must be >= 0. + */ + public void setKeepAliveInterval(int keepAliveInterval)throws IllegalArgumentException { + if (keepAliveInterval <0 ) { + throw new IllegalArgumentException(); + } + this.keepAliveInterval = keepAliveInterval; + } + + /** + * Returns the connection timeout value. + * @see #setConnectionTimeout(int) + * @return the connection timeout value. + */ + public int getConnectionTimeout() { + return connectionTimeout; + } + + /** + * Sets the connection timeout value. + * This value, measured in seconds, defines the maximum time interval + * the client will wait for the network connection to the MQTT server to be established. + * The default timeout is 30 seconds. + * @param connectionTimeout the timeout value, measured in seconds. + */ + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + /** + * Returns the socket factory that will be used when connecting, or + *null
if one has not been set.
+ */
+ public SocketFactory getSocketFactory() {
+ // Plain accessor: hands back the stored factory reference unchanged.
+ return socketFactory;
+ }
+
+ /**
+ * Sets the {@code SocketFactory} to use. This allows an application
+ * to apply its own policies around the creation of network sockets. If
+ * using an SSL connection, an {@code SSLSocketFactory} can be used
+ * to supply application-specific security settings.
+ * @param socketFactory the factory to use.
+ */
+ public void setSocketFactory(SocketFactory socketFactory) {
+ this.socketFactory = socketFactory;
+ }
+
+ /**
+ * Returns the topic to be used for last will and testament (LWT).
+ * @return the LWT topic as a {@code String}, or {@code null} if LWT is not set.
+ * @see #setWill(MqttTopic, byte[], int, boolean)
+ */
+ public String getWillDestination() {
+ return willDestination;
+ }
+
+ /**
+ * Returns the message to be sent as last will and testament (LWT).
+ * The returned object is "read only". Calling any "setter" methods on
+ * the returned object will result in an
+ * {@code IllegalStateException} being thrown.
+ * @return the message to use, or {@code null} if LWT is not set.
+ */
+ public MqttMessage getWillMessage() {
+ return willMessage;
+ }
+
+ /**
+ * Returns the SSL properties for the connection.
+ * @return the properties for the SSL connection, which may be {@code null}
+ * since the backing field is declared with a {@code null} initial value.
+ */
+ public Properties getSSLProperties() {
+ return sslClientProps;
+ }
+
+ /**
+ * Sets the SSL properties for the connection. Note that these
+ * properties are only valid if an implementation of the Java
+ * Secure Socket Extensions (JSSE) is available. These properties are
+ * not used if a SocketFactory has been set using
+ * {@link #setSocketFactory(SocketFactory)}.
+ * The following properties can be used:
+ * com.ibm.micro.security.Password.obfuscate(char[] password)
.
+ * This obfuscates the password using a simple and insecure XOR and Base64
+ * encoding mechanism. Note that this is only a simple scrambler to
+ * obfuscate clear-text passwords.com.ibm.micro.security.Password.obfuscate(char[] password)
.
+ * This obfuscates the password using a simple and insecure XOR and Base64
+ * encoding mechanism. Note that this is only a simple scrambler to
+ * obfuscate clear-text passwords.+ * Used to track the the delivery progress of a message when a publish is + * executed in a non-blocking manner (run in the background)
+ * + * @see MqttToken + */ +public class MqttDeliveryToken extends MqttToken implements IMqttDeliveryToken { + + + public MqttDeliveryToken() { + super(); + } + + public MqttDeliveryToken(String logContext) { + super(logContext); + } + + /** + * Returns the message associated with this token. + *Until the message has been delivered, the message being delivered will
+ * be returned. Once the message has been delivered null
will be
+ * returned.
+ * @return the message associated with this token or null if already delivered.
+ * @throws MqttException if there was a problem completing retrieving the message
+ */
+ public MqttMessage getMessage() throws MqttException {
+ // Delegates to the internal token implementation.
+ return internalTok.getMessage();
+ }
+
+ /**
+ * Associates a message with this token by handing it to the internal token.
+ * @param msg the message to record on the internal token.
+ */
+ protected void setMessage(MqttMessage msg) {
+ internalTok.setMessage(msg);
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java
new file mode 100644
index 0000000..677b2d9
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttException.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3;
+
+import org.eclipse.paho.client.mqttv3.internal.MessageCatalog;
+
+/**
+ * Thrown if an error occurs communicating with the server.
+ * <p>
+ * Every instance carries an integer reason code and, optionally, the
+ * {@code Throwable} that triggered it. The detail message is resolved from
+ * the reason code through the message catalog.
+ */
+public class MqttException extends Exception {
+    private static final long serialVersionUID = 300L;
+
+    /**
+     * Client encountered an exception. Use the {@link #getCause()}
+     * method to get the underlying reason.
+     */
+    public static final short REASON_CODE_CLIENT_EXCEPTION = 0x00;
+
+    // CONNACK return codes
+    /** The protocol version requested is not supported by the server. */
+    public static final short REASON_CODE_INVALID_PROTOCOL_VERSION = 0x01;
+    /** The server has rejected the supplied client ID. */
+    public static final short REASON_CODE_INVALID_CLIENT_ID = 0x02;
+    /** The broker was not available to handle the request. */
+    public static final short REASON_CODE_BROKER_UNAVAILABLE = 0x03;
+    /** Authentication with the server has failed, due to a bad user name or password. */
+    public static final short REASON_CODE_FAILED_AUTHENTICATION = 0x04;
+    /** Not authorized to perform the requested operation. */
+    public static final short REASON_CODE_NOT_AUTHORIZED = 0x05;
+
+    /** An unexpected error has occurred. */
+    public static final short REASON_CODE_UNEXPECTED_ERROR = 0x06;
+
+    /**
+     * Client timed out while waiting for a response from the server.
+     * The server is no longer responding to keep-alive messages.
+     */
+    public static final short REASON_CODE_CLIENT_TIMEOUT = 32000;
+    /**
+     * Internal error, caused by no new message IDs being available.
+     */
+    public static final short REASON_CODE_NO_MESSAGE_IDS_AVAILABLE = 32001;
+
+    /**
+     * The client is already connected.
+     */
+    public static final short REASON_CODE_CLIENT_CONNECTED = 32100;
+    /**
+     * The client is already disconnected.
+     */
+    public static final short REASON_CODE_CLIENT_ALREADY_DISCONNECTED = 32101;
+    /**
+     * The client is currently disconnecting and cannot accept any new work.
+     * This can occur when waiting on a token, and then disconnecting the client.
+     * If the message delivery does not complete within the quiesce timeout
+     * period, then the waiting token will be notified with an exception.
+     */
+    public static final short REASON_CODE_CLIENT_DISCONNECTING = 32102;
+
+    /** Unable to connect to server. */
+    public static final short REASON_CODE_SERVER_CONNECT_ERROR = 32103;
+
+    /**
+     * The client is not connected to the server. The {@link MqttClient#connect()}
+     * or {@link MqttClient#connect(MqttConnectOptions)} method must be called
+     * first. It is also possible that the connection was lost - see
+     * {@link MqttClient#setCallback(MqttCallback)} for a way to track lost
+     * connections.
+     */
+    public static final short REASON_CODE_CLIENT_NOT_CONNECTED = 32104;
+
+    /**
+     * Server URI and supplied {@code SocketFactory} do not match.
+     * URIs beginning {@code tcp://} must use a {@code javax.net.SocketFactory},
+     * and URIs beginning {@code ssl://} must use a {@code javax.net.ssl.SSLSocketFactory}.
+     */
+    public static final short REASON_CODE_SOCKET_FACTORY_MISMATCH = 32105;
+
+    /**
+     * SSL configuration error.
+     */
+    public static final short REASON_CODE_SSL_CONFIG_ERROR = 32106;
+
+    /**
+     * Thrown when an attempt to call {@link MqttClient#disconnect()} has been
+     * made from within a method on {@link MqttCallback}. These methods are invoked
+     * by the client's thread, and must not be used to control disconnection.
+     *
+     * @see MqttCallback#messageArrived(String, MqttMessage)
+     */
+    public static final short REASON_CODE_CLIENT_DISCONNECT_PROHIBITED = 32107;
+
+    /**
+     * Protocol error: the message was not recognized as a valid MQTT packet.
+     * Possible reasons for this include connecting to a non-MQTT server, or
+     * connecting to an SSL server port when the client isn't using SSL.
+     */
+    public static final short REASON_CODE_INVALID_MESSAGE = 32108;
+
+    /**
+     * The client has been unexpectedly disconnected from the server. The {@link #getCause() cause}
+     * will provide more details.
+     */
+    public static final short REASON_CODE_CONNECTION_LOST = 32109;
+
+    /**
+     * A connect operation is already in progress; only one connect can happen
+     * at a time.
+     */
+    public static final short REASON_CODE_CONNECT_IN_PROGRESS = 32110;
+
+    /**
+     * The client is closed - no operations are permitted on the client in this
+     * state. Create a new client to continue.
+     */
+    public static final short REASON_CODE_CLIENT_CLOSED = 32111;
+
+    /**
+     * A request has been made to use a token that is already associated with
+     * another action. If the action is complete then reset() can be called on the
+     * token to allow it to be reused.
+     */
+    public static final short REASON_CODE_TOKEN_INUSE = 32201;
+
+    /**
+     * A request has been made to send a message but the maximum number of inflight
+     * messages has already been reached. Once one or more messages have been moved
+     * then new messages can be sent.
+     */
+    public static final short REASON_CODE_MAX_INFLIGHT = 32202;
+
+    private int reasonCode;
+    private Throwable cause;
+
+    /**
+     * Constructs a new {@code MqttException} with the specified code
+     * as the underlying reason. No cause is recorded.
+     * @param reasonCode the reason code for the exception.
+     */
+    public MqttException(int reasonCode) {
+        this(reasonCode, null);
+    }
+
+    /**
+     * Constructs a new {@code MqttException} with the specified
+     * {@code Throwable} as the underlying reason. The reason code is set to
+     * {@link #REASON_CODE_CLIENT_EXCEPTION}.
+     * @param cause the underlying cause of the exception.
+     */
+    public MqttException(Throwable cause) {
+        this(REASON_CODE_CLIENT_EXCEPTION, cause);
+    }
+
+    /**
+     * Constructs a new {@code MqttException} with the specified
+     * reason code and {@code Throwable} as the underlying reason.
+     * @param reason the reason code for the exception.
+     * @param cause the underlying cause of the exception.
+     */
+    public MqttException(int reason, Throwable cause) {
+        super();
+        this.reasonCode = reason;
+        this.cause = cause;
+    }
+
+
+    /**
+     * Returns the reason code for this exception.
+     * @return the code representing the reason for this exception.
+     */
+    public int getReasonCode() {
+        return this.reasonCode;
+    }
+
+    /**
+     * Returns the underlying cause of this exception, if available.
+     * @return the Throwable that was the root cause of this exception,
+     * which may be {@code null}.
+     */
+    public Throwable getCause() {
+        return this.cause;
+    }
+
+    /**
+     * Returns the detail message for this exception, looked up in the
+     * message catalog by reason code.
+     * @return the detail message, which may be {@code null}.
+     */
+    public String getMessage() {
+        return MessageCatalog.getMessage(this.reasonCode);
+    }
+
+    /**
+     * Returns a {@code String} representation of this exception: the detail
+     * message, the reason code in parentheses and, when a cause is present,
+     * the cause's own string form after a {@code " - "} separator.
+     * @return a {@code String} representation of this exception.
+     */
+    public String toString() {
+        String summary = getMessage() + " (" + this.reasonCode + ")";
+        return (this.cause == null) ? summary : summary + " - " + this.cause.toString();
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttMessage.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttMessage.java
new file mode 100644
index 0000000..4c5f88f
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttMessage.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3;
+
+/**
+ * An MQTT message holds the application payload and options
+ * specifying how the message is to be delivered.
+ * The message includes a "payload" (the body of the message)
+ * represented as a byte[].
+ */
+public class MqttMessage {
+
+ private boolean mutable = true;
+ private byte[] payload;
+ private int qos = 1;
+ private boolean retained = false;
+ private boolean dup = false;
+
+ /**
+ * Utility method to validate the supplied QoS value.
+ * @param qos the quality of service value to check.
+ * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2; the
+ * exception message reports the rejected value.
+ */
+ public static void validateQos(int qos) {
+     if ((qos < 0) || (qos > 2)) {
+         // Name the offending value so the caller can diagnose the failure.
+         throw new IllegalArgumentException("Invalid QoS value: " + qos + " (must be 0, 1 or 2)");
+     }
+ }
+
+ /**
+ * Constructs a message with an empty payload, and all other values
+ * set to defaults.
+ *
+ * The defaults are:
+ *
true
if the message should be, or was, retained by
+ * the server.
+ * @see #setRetained(boolean)
+ */
+ public boolean isRetained() {
+ // Plain accessor for the retained flag.
+ return retained;
+ }
+
+ /**
+ * Whether or not the publish message should be retained by the messaging engine.
+ * Sending a message with the retained set to {@code false} will clear the
+ * retained message from the server. The default value is {@code false}.
+ *
+ * @param retained whether or not the messaging engine should retain the message.
+ * @throws IllegalStateException if this message cannot be edited
+ */
+ public void setRetained(boolean retained) {
+ // Rejects the change when the message has been marked as not mutable.
+ checkMutable();
+ this.retained = retained;
+ }
+
+ /**
+ * Returns the quality of service for this message.
+ * The backing field is declared with an initial value of 1.
+ * @return the quality of service to use, either 0, 1, or 2.
+ * @see #setQos(int)
+ */
+ public int getQos() {
+ return qos;
+ }
+
+ /**
+ * Sets the quality of service for this message.
+ * MqttConnectOptions
.
+ * If a persistence mechanism is not specified, the message will not be
+ * delivered in the event of a client failure.
+ * The message will be acknowledged across the network.
+ * This is the default QoS.MqttConnectOptions
.
+ * If a persistence mechanism is not specified, the message will not be
+ * delivered in the event of a client failure.true
if the values can be changed,
+ * false
to prevent them from being changed.
+ */
+ protected void setMutable(boolean mutable) {
+ // This flag is what checkMutable() consults before allowing an edit.
+ this.mutable = mutable;
+ }
+
+ /**
+ * Guards the setters: verifies that this message may still be edited.
+ * @throws IllegalStateException if the message has been marked as not
+ * mutable via {@link #setMutable(boolean)}.
+ */
+ protected void checkMutable() throws IllegalStateException {
+     if (!mutable) {
+         // Give the caller a reason instead of an empty exception.
+         throw new IllegalStateException("Message is read-only and cannot be modified");
+     }
+ }
+
+ /**
+ * Records whether this message might be a duplicate. Unlike the public
+ * setters, this does not call {@code checkMutable()}.
+ * @param dup the value to store in the duplicate flag.
+ */
+ protected void setDuplicate(boolean dup) {
+ this.dup = dup;
+ }
+
+ /**
+ * Returns whether or not this message might be a duplicate of one which has
+ * already been received. This will only be set on messages received from
+ * the server.
+ * @return {@code true} if the message might be a duplicate.
+ */
+ public boolean isDuplicate() {
+ return this.dup;
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttPersistable.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttPersistable.java
new file mode 100644
index 0000000..5508c52
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttPersistable.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3;
+
+/**
+ * Represents an object used to pass data to be persisted across the
+ * {@link org.eclipse.paho.client.mqttv3.MqttClientPersistence MqttClientPersistence}
+ * interface.
+ * + * When data is passed across the interface the header and payload are + * separated, so that unnecessary message copies may be avoided. + * For example, if a 10 MB payload was published it would be inefficient + * to create a byte array a few bytes larger than 10 MB and copy the + * MQTT message header and payload into a contiguous byte array.
+ *+ * When the request to persist data is made a separate byte array and offset + * is passed for the header and payload. Only the data between + * offset and length need be persisted. + * So for example, a message to be persisted consists of a header byte + * array starting at offset 1 and length 4, plus a payload byte array + * starting at offset 30 and length 40000. There are three ways in which + * the persistence implementation may return data to the client on + * recovery: + *
MqttPersistenceException
+ */
+ public MqttPersistenceException() {
+ // No specific reason supplied: report the generic client-exception code.
+ super(REASON_CODE_CLIENT_EXCEPTION);
+ }
+
+ /**
+ * Constructs a new {@code MqttPersistenceException} with the specified code
+ * as the underlying reason.
+ * @param reasonCode the reason code for the exception.
+ */
+ public MqttPersistenceException(int reasonCode) {
+ super(reasonCode);
+ }
+ /**
+ * Constructs a new {@code MqttPersistenceException} with the specified
+ * {@code Throwable} as the underlying reason.
+ * @param cause the underlying cause of the exception.
+ */
+ public MqttPersistenceException(Throwable cause) {
+ super(cause);
+ }
+ /**
+ * Constructs a new {@code MqttPersistenceException} with the specified
+ * reason code and {@code Throwable} as the underlying reason.
+ * @param reason the reason code for the exception.
+ * @param cause the underlying cause of the exception.
+ */
+ public MqttPersistenceException(int reason, Throwable cause) {
+ super(reason, cause);
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttSecurityException.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttSecurityException.java
new file mode 100644
index 0000000..9069e41
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttSecurityException.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3;
+
+/**
+ * Thrown when a client is not authorized to perform an operation, or
+ * if there is a problem with the security configuration.
+ */
+public class MqttSecurityException extends MqttException {
+ private static final long serialVersionUID = 300L;
+
+ /**
+ * Constructs a new {@code MqttSecurityException} with the specified code
+ * as the underlying reason.
+ * @param reasonCode the reason code for the exception.
+ */
+ public MqttSecurityException(int reasonCode) {
+ super(reasonCode);
+ }
+
+ /**
+ * Constructs a new {@code MqttSecurityException} with the specified
+ * {@code Throwable} as the underlying reason.
+ * @param cause the underlying cause of the exception.
+ */
+ public MqttSecurityException(Throwable cause) {
+ super(cause);
+ }
+ /**
+ * Constructs a new {@code MqttSecurityException} with the specified
+ * code and {@code Throwable} as the underlying reason.
+ * @param reasonCode the reason code for the exception.
+ * @param cause the underlying cause of the exception.
+ */
+ public MqttSecurityException(int reasonCode, Throwable cause) {
+ super(reasonCode, cause);
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttToken.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttToken.java
new file mode 100644
index 0000000..1fc826b
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttToken.java
@@ -0,0 +1,71 @@
+package org.eclipse.paho.client.mqttv3;
+
+import org.eclipse.paho.client.mqttv3.internal.Token;
+
+/**
+ * Provides a mechanism for tracking the completion of an asynchronous action.
+ * <p>
+ * A token that implements the IMqttToken interface is returned from all non-blocking
+ * method with the exception of publish.
+ * </p>
+ *
+ * @see IMqttToken
+ */
+
+public class MqttToken implements IMqttToken {
+ /**
+ * A reference to the class that provides most of the implementation of the
+ * MqttToken. MQTT application programs must not use the internal class.
+ */
+ public Token internalTok = null;
+
+ /**
+ * Creates a token without an internal Token. <code>internalTok</code> stays
+ * null until it is assigned, and every delegating method of this class
+ * dereferences it, so it must be set before the token is used.
+ */
+ public MqttToken() {
+ }
+
+ /**
+ * Creates a token backed by a new internal Token.
+ * @param logContext passed unchanged to the internal Token constructor.
+ */
+ public MqttToken(String logContext) {
+ internalTok = new Token(logContext);
+ }
+
+ /**
+ * Returns the exception reported by the internal token.
+ */
+ public MqttException getException() {
+ return internalTok.getException();
+ }
+
+ /**
+ * Returns the completion state reported by the internal token.
+ */
+ public boolean isComplete() {
+ return internalTok.isComplete();
+ }
+
+ /**
+ * Registers the listener on the internal token.
+ * @param listener the action listener to hand to the internal token.
+ */
+ public void setActionCallback(IMqttActionListener listener) {
+ internalTok.setActionCallback(listener);
+
+ }
+ /**
+ * Returns the listener held by the internal token.
+ */
+ public IMqttActionListener getActionCallback() {
+ return internalTok.getActionCallback();
+ }
+
+ /**
+ * Waits for completion by delegating to the internal token with a timeout of -1.
+ * NOTE(review): -1 presumably means "wait without a time limit" - confirm against Token.
+ */
+ public void waitForCompletion() throws MqttException {
+ internalTok.waitForCompletion(-1);
+ }
+
+ /**
+ * Waits for completion by delegating to the internal token.
+ * @param timeout passed unchanged to the internal token.
+ */
+ public void waitForCompletion(long timeout) throws MqttException {
+ internalTok.waitForCompletion(timeout);
+ }
+
+ /**
+ * Returns the client associated with the internal token.
+ */
+ public IMqttAsyncClient getClient() {
+ return internalTok.getClient();
+ }
+
+ /**
+ * Returns the topics held by the internal token.
+ */
+ public String[] getTopics() {
+ return internalTok.getTopics();
+ }
+
+ /**
+ * Returns the user context held by the internal token.
+ */
+ public Object getUserContext() {
+ return internalTok.getUserContext();
+ }
+
+ /**
+ * Stores the user context on the internal token.
+ * @param userContext the object to store.
+ */
+ public void setUserContext(Object userContext) {
+ internalTok.setUserContext(userContext); }
+
+ /**
+ * Returns the message ID reported by the internal token.
+ */
+ public int getMessageId() {
+ return internalTok.getMessageID();
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttTopic.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttTopic.java
new file mode 100644
index 0000000..a486f7b
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttTopic.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. 
This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3;
+
+import org.eclipse.paho.client.mqttv3.internal.ClientComms;
+import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish;
+
+/**
+ * Represents a topic destination, used for publish/subscribe messaging.
+ */
+public class MqttTopic {
+
+ // Communications object used to send PUBLISH messages for this topic.
+ private ClientComms comms;
+ // Topic name; placed in every PUBLISH created by this object.
+ private String name;
+
+ /**
+ * Creates a topic bound to the given communications object.
+ * @param name the topic name.
+ * @param comms the communications object used by the publish methods.
+ */
+ public MqttTopic(String name, ClientComms comms) {
+ this.comms = comms;
+ this.name = name;
+ }
+
+ /**
+ * Publishes a message on the topic. This is a convenience method, which will
+ * create a new {@link MqttMessage} object with a byte array payload and the
+ * specified QoS, and then publish it. All other values in the
+ * message will be set to the defaults.
+ *
+ * @param payload the byte array to use as the payload
+ * @param qos the Quality of Service. Valid values are 0, 1 or 2.
+ * @param retained whether or not this message should be retained by the server.
+ * @throws IllegalArgumentException if value of QoS is not 0, 1 or 2.
+ * @see #publish(MqttMessage)
+ * @see MqttMessage#setQos(int)
+ * @see MqttMessage#setRetained(boolean)
+ */
+ public MqttDeliveryToken publish(byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException {
+ MqttMessage message = new MqttMessage(payload);
+ message.setQos(qos);
+ message.setRetained(retained);
+ return this.publish(message);
+ }
+
+ /**
+ * Publishes the specified message to this topic, but does not wait for delivery
+ * of the message to complete. The returned {@link MqttDeliveryToken token} can be used
+ * to track the delivery status of the message. Once this method has
+ * returned cleanly, the message has been accepted for publication by the
+ * client. Message delivery will be completed in the background when a connection
+ * is available.
+ * <p>
+ * The method calls <code>waitUntilSent()</code> on the token before returning.
+ * NOTE(review): presumably this blocks until the message has been handed to the
+ * network layer rather than until it is acknowledged - confirm against Token.
+ *
+ * @param message the message to publish
+ * @return an MqttDeliveryToken for tracking the delivery of the message
+ */
+ public MqttDeliveryToken publish(MqttMessage message) throws MqttException, MqttPersistenceException {
+ MqttDeliveryToken token = new MqttDeliveryToken(comms.getClient().getClientId());
+ token.setMessage(message);
+ comms.sendNoWait(createPublish(message), token);
+ token.internalTok.waitUntilSent();
+ return token;
+ }
+
+ /**
+ * Returns the name of the queue or topic.
+ *
+ * @return the name of this destination.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Create a PUBLISH packet from the specified message.
+ */
+ private MqttPublish createPublish(MqttMessage message) {
+ return new MqttPublish(this.getName(), message);
+ }
+
+ /**
+ * Returns a string representation of this topic.
+ * @return a string representation of this topic.
+ */
+ public String toString() {
+ return getName();
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientComms.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientComms.java
new file mode 100644
index 0000000..0f1aa2d
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/ClientComms.java
@@ -0,0 +1,582 @@
+/*
+* Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. 
This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Dave Locke - initial API and implementation and/or initial documentation + */ +package org.eclipse.paho.client.mqttv3.internal; + +import java.util.Enumeration; +import java.util.Properties; +import java.util.Vector; + +import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClientPersistence; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttPersistenceException; +import org.eclipse.paho.client.mqttv3.MqttToken; +import org.eclipse.paho.client.mqttv3.MqttTopic; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttConnack; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttConnect; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttDisconnect; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttPublish; +import org.eclipse.paho.client.mqttv3.internal.wire.MqttWireMessage; +import org.eclipse.paho.client.mqttv3.logging.Logger; +import org.eclipse.paho.client.mqttv3.logging.LoggerFactory; + +/** + * Handles client communications with the server. Sends and receives MQTT V3 + * messages. 
+ */ +public class ClientComms { + public static String VERSION = "${project.version}"; + public static String BUILD_LEVEL = "L${build.level}"; + + private IMqttAsyncClient client; + NetworkModule networkModule; + CommsReceiver receiver; + CommsSender sender; + CommsCallback callback; + ClientState clientState; + MqttConnectOptions conOptions; + private MqttClientPersistence persistence; + CommsTokenStore tokenStore; + boolean stoppingComms = false; + + final static byte CONNECTED =0; + final static byte CONNECTING =1; + final static byte DISCONNECTING =2; + final static byte DISCONNECTED =3; + final static byte CLOSED =4; + + private byte conState = DISCONNECTED; + Object conLock = new Object(); // Used to syncrhonize connection state + private boolean closePending = false; + + final static String className = ClientComms.class.getName(); + Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,className); + + + /** + * Creates a new ClientComms object, using the specified module to handle + * the network calls. + */ + public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence) throws MqttException { + this.conState = DISCONNECTED; + this.client = client; + this.persistence = persistence; + this.tokenStore = new CommsTokenStore(getClient().getClientId()); + this.callback = new CommsCallback(this); + this.clientState = new ClientState(persistence, tokenStore, this.callback, this); + + callback.setClientState(clientState); + log.setResourceName(getClient().getClientId()); + } + + /** + * Sends a message to the server. Does not check if connected this validation must be done + * by invoking routines. 
+ * @param message + * @param token + * @throws MqttException + */ + void internalSend(MqttWireMessage message, MqttToken token) throws MqttException { + final String methodName = "internalSend"; + //@TRACE 200=internalSend key={0} message={1} token={2} + log.fine(className, methodName, "200", new Object[]{message.getKey(), message, token}); + + if (token.getClient() == null ) { + // Associate the client with the token - also marks it as in use. + token.internalTok.setClient(getClient()); + } else { + // Token is already in use - cannot reuse + //@TRACE 213=fail: token in use: key={0} message={1} token={2} + log.fine(className, methodName, "213", new Object[]{message.getKey(), message, token}); + + throw new MqttException(MqttException.REASON_CODE_TOKEN_INUSE); + } + + try { + // Persist if needed and send the message + this.clientState.send(message, token); + } catch(MqttException e) { + if (message instanceof MqttPublish) { + this.clientState.undo((MqttPublish)message); + } + throw e; + } + } + + /** + * Sends a message to the broker if in connected state, but only waits for the message to be + * stored, before returning. + */ + public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException { + final String methodName = "sendNoWait"; + if (isConnected() || + (!isConnected() && message instanceof MqttConnect) || + (isDisconnecting() && message instanceof MqttDisconnect)) { + this.internalSend(message, token); + } else { + //@TRACE 208=failed: not connected + log.fine(className, methodName, "208"); + throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED); + } + } + + /** + * Tidy up + * - call each main class and let it tidy up e.g. releasing the token + * store which normally survives a disconnect. 
+ * @throws MqttException if not disconnected + */ + public void close() throws MqttException { + final String methodName = "close"; + synchronized (conLock) { + if (!isClosed()) { + // Must be disconnected before close can take place + if (!isDisconnected()) { + //@TRACE 224=failed: not disconnected + log.fine(className, methodName, "224"); + + if (isConnecting()) { + throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS); + } else if (isConnected()) { + throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); + } else if (isDisconnecting()) { + closePending = true; + return; + } + } + + conState = CLOSED; + + // ShutdownConnection has already cleaned most things + clientState.close(); + clientState = null; + callback = null; + persistence = null; + sender = null; + receiver = null; + networkModule = null; + conOptions = null; + tokenStore = null; + } + } + } + + /** + * Sends a connect message and waits for an ACK or NACK. + * Connecting is a special case which will also start up the + * network connection, receive thread, and keep alive thread. 
+ */ + public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { + final String methodName = "connect"; + synchronized (conLock) { + if (isDisconnected() && !closePending) { + //@TRACE 214=state=CONNECTING + log.fine(className,methodName,"214"); + + conState = CONNECTING; + + this.conOptions = options; + + MqttConnect connect = new MqttConnect(client.getClientId(), + options.isCleanSession(), + options.getKeepAliveInterval(), + options.getUserName(), + options.getPassword(), + options.getWillMessage(), + options.getWillDestination()); + + this.clientState.setKeepAliveSecs(options.getKeepAliveInterval()); + this.clientState.setCleanSession(options.isCleanSession()); + + tokenStore.open(); + ConnectBG conbg = new ConnectBG(this, token, connect); + conbg.start(); + } + else { + // @TRACE 207=connect failed: not disconnected {0} + log.fine(className,methodName,"207", new Object[] {new Byte(conState)}); + if (isClosed() || closePending) { + throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED); + } else if (isConnecting()) { + throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS); + } else if (isDisconnecting()) { + throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING); + } else { + throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED); + } + } + } + } + + public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException { + final String methodName = "connectComplete"; + int rc = cack.getReturnCode(); + synchronized (conLock) { + if (rc == 0) { + // We've successfully connected + // @TRACE 215=state=CONNECTED + log.fine(className,methodName,"215"); + + conState = CONNECTED; + return; + } + } + + // @TRACE 204=connect failed: rc={0} + log.fine(className,methodName,"204", new Object[]{new Integer(rc)}); + throw mex; + } + + /** + * Shuts down the connection to the server. 
+ * This may have been invoked as a result of a user calling disconnect or + * an abnormal disconnection. The method may be invoked multiple times + * in parallel as each thread when it receives an error uses this method + * to ensure that shutdown completes successfully. + */ + public void shutdownConnection(MqttToken token, MqttException reason) { + final String methodName = "shutdownConnection"; + boolean wasConnected; + MqttToken endToken = null; //Token to notify after disconnect completes + + // This method could concurrently be invoked from many places only allow it + // to run once. + synchronized(conLock) { + if (stoppingComms || closePending) { + return; + } + stoppingComms = true; + + //@TRACE 216=state=DISCONNECTING + log.fine(className,methodName,"216"); + + wasConnected = (isConnected() || isDisconnecting()); + conState = DISCONNECTING; + } + + // Update the token with the reason for shutdown if it + // is not already complete. + if (token != null && !token.isComplete()) { + token.internalTok.setException(reason); + } + + // Stop the thread that is used to call the user back + // when actions complete + if (callback!= null) {callback.stop(); } + + // Stop the network module, send and receive now not possible + try { + if (networkModule != null) {networkModule.stop();} + }catch(Exception ioe) { + // Ignore as we are shutting down + } + + // Stop the thread that handles inbound work from the network + if (receiver != null) {receiver.stop();} + + // Stop any new tokens being saved by app and throwing an exception if they do + tokenStore.quiesce(new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING)); + + // Notify any outstanding tokens with the exception of + // con or discon which may be returned and will be notified at + // the end + endToken = handleOldTokens(token, reason); + + try { + // Clean session handling and tidy up + clientState.disconnected(reason); + }catch(Exception ex) { + // Ignore as we are shutting down + } + + if (sender 
!= null) { sender.stop(); } + + try { + if (persistence != null) {persistence.close();} + }catch(Exception ex) { + // Ignore as we are shutting down + } + // All disconnect logic has been completed allowing the + // client to be marked as disconnected. + synchronized(conLock) { + //@TRACE 217=state=DISCONNECTED + log.fine(className,methodName,"217"); + + conState = DISCONNECTED; + stoppingComms = false; + } + + // Internal disconnect processing has completed. If there + // is a disconnect token or a connect in error notify + // it now. This is done at the end to allow a new connect + // to be processed and now throw a currently disconnecting error. + // any outstanding tokens and unblock any waiters + if (endToken != null & callback != null) { + callback.asyncOperationComplete(endToken); + } + + if (wasConnected && callback != null) { + // Let the user know client has disconnected either normally or abnormally + callback.connectionLost(reason); + } + + // While disconnecting, close may have been requested - try it now + synchronized(conLock) { + if (closePending) { + try { + close(); + } catch (Exception e) { // ignore any errors as closing + } + } + } + } + + // Tidy up. There may be tokens outstanding as the client was + // not disconnected/quiseced cleanly! Work out what tokens still + // need to be notified and waiters unblocked. Store the + // disconnect or connect token to notify after disconnect is + // complete. 
+ private MqttToken handleOldTokens(MqttToken token, MqttException reason) { + final String methodName = "handleOldTokens"; + //@TRACE 222=> + log.fine(className,methodName,"222"); + + MqttToken tokToNotifyLater = null; + try { + // First the token that was related to the disconnect / shutdown may + // not be in the token table - temporarily add it if not + if (token != null) { + if (tokenStore.getToken(token.internalTok.getKey())==null) { + tokenStore.saveToken(token, token.internalTok.getKey()); + } + } + + Vector toksToNot = clientState.resolveOldTokens(reason); + Enumeration toksToNotE = toksToNot.elements(); + while(toksToNotE.hasMoreElements()) { + MqttToken tok = (MqttToken)toksToNotE.nextElement(); + + if (tok.internalTok.getKey().equals(MqttDisconnect.KEY) || + tok.internalTok.getKey().equals(MqttConnect.KEY)) { + // Its con or discon so remember and notify @ end of disc routine + tokToNotifyLater = tok; + } else { + // notify waiters and callbacks of outstanding tokens + // that a problem has occurred and disconnect is in + // progress + callback.asyncOperationComplete(tok); + } + } + }catch(Exception ex) { + // Ignore as we are shutting down + } + return tokToNotifyLater; + } + + public void disconnect(MqttDisconnect disconnect, long quiesceTimeout, MqttToken token) throws MqttException { + final String methodName = "disconnect"; + synchronized (conLock){ + if (isClosed()) { + //@TRACE 223=failed: in closed state + log.fine(className,methodName,"223"); + throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CLOSED); + } else if (isDisconnected()) { + //@TRACE 211=failed: already disconnected + log.fine(className,methodName,"211"); + throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED); + } else if (isDisconnecting()) { + //@TRACE 219=failed: already disconnecting + log.fine(className,methodName,"219"); + throw 
ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING); + } else if (Thread.currentThread() == callback.getThread()) { + //@TRACE 210=failed: called on callback thread + log.fine(className,methodName,"210"); + // Not allowed to call disconnect() from the callback, as it will deadlock. + throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_DISCONNECT_PROHIBITED); + } + + //@TRACE 218=state=DISCONNECTING + log.fine(className,methodName,"218"); + conState = DISCONNECTING; + DisconnectBG discbg = new DisconnectBG(disconnect,quiesceTimeout,token); + discbg.start(); + } + } + + public boolean isConnected() { + return conState == CONNECTED; + } + + public boolean isConnecting() { + return conState == CONNECTING; + } + public boolean isDisconnected() { + return conState == DISCONNECTED; + } + + public boolean isDisconnecting() { + return conState == DISCONNECTING; + } + public boolean isClosed() { + return conState == CLOSED; + } + + + public void setCallback(MqttCallback mqttCallback) { + this.callback.setCallback(mqttCallback); + } + + protected MqttTopic getTopic(String topic) { + return new MqttTopic(topic, this); + } + public void setNetworkModule(NetworkModule networkModule) { + this.networkModule = networkModule; + } + public MqttDeliveryToken[] getPendingDeliveryTokens() { + return tokenStore.getOutstandingDelTokens(); + } + protected void deliveryComplete(MqttPublish msg) throws MqttPersistenceException { + this.clientState.deliveryComplete(msg); + } + + public IMqttAsyncClient getClient() { + return client; + } + + public long getKeepAlive() { + return this.clientState.getKeepAlive(); + } + + public ClientState getClientState() { + return clientState; + } + + public MqttConnectOptions getConOptions() { + return conOptions; + } + + public Properties getDebug() { + Properties props = new Properties(); + props.put("conState", new Integer(conState)); + props.put("serverURI", getClient().getServerURI()); + 
props.put("callback", callback); + props.put("stoppingComms", new Boolean(stoppingComms)); + return props; + } + + + + // Kick off the connect processing in the background so that it does not block. For instance + // the socket could take time to create. + private class ConnectBG implements Runnable { + ClientComms clientComms = null; + Thread cBg = null; + MqttToken conToken; + MqttConnect conPacket; + + ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) { + clientComms = cc; + conToken = cToken; + conPacket = cPacket; + cBg = new Thread(this, "MQTT Con: "+getClient().getClientId()); + } + + void start() { + cBg.start(); + } + + public void run() { + final String methodName = "connectBG:run"; + MqttException mqttEx = null; + //@TRACE 220=> + log.fine(className, methodName, "220"); + + try { + // Reset an exception on existing delivery tokens. + // This will have been set if disconnect occured before delivery was + // fully processed. + MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens(); + for (int i=0; i+ * The SSLSocketFactoryFactory is configured using IBM SSL properties, i.e. + * properties of the format "com.ibm.ssl.propertyName", e.g. + * "com.ibm.ssl.keyStore". The class supports multiple configurations, each + * configuration is identified using a name or configuration ID. The + * configuration ID with "null" is used as a default configuration. When a + * socket factory is being created for a given configuration, properties of that + * configuration are first picked. If a property is not defined there, then that + * property is looked up in the default configuration. Finally, if a property + * element is still not found, then the corresponding system property is + * inspected, i.e. javax.net.ssl.keyStore. If the system property is not set + * either, then the system's default value is used (if available) or an + * exception is thrown. + *
+ * The SSLSocketFactoryFactory can be reconfigured at any time. A + * reconfiguration does not affect existing socket factories. + *
+ * All properties share the same key space; i.e. the configuration ID is not + * part of the property keys. + *
+ * The methods should be called in the following order: + *
+ * <code>CountingInputStream</code> wrapping the supplied
+ * input stream.
+ */
+ public CountingInputStream(InputStream in) {
+ // Keep the wrapped stream; read() pulls its bytes from it.
+ this.in = in;
+ // Nothing has been read yet, so counting starts at zero.
+ this.counter = 0;
+ }
+
+ /**
+  * Reads a single byte from the wrapped stream and counts it.
+  *
+  * @return the value returned by the wrapped stream, or -1 at end of stream.
+  * @throws IOException if the wrapped stream fails to read.
+  */
+ public int read() throws IOException {
+     final int value = in.read();
+     if (value == -1) {
+         // End of stream: no byte was consumed, so the counter is left untouched.
+         return value;
+     }
+     counter++;
+     return value;
+ }
+
+ /**
+ * Returns the number of bytes read since the last reset.
+ * The value is the current content of the counter field, which read()
+ * increments for every byte it returns.
+ *
+ * @return the current byte count.
+ */
+ public int getCounter() {
+ return counter;
+ }
+
+ /**
+ * Resets the counter to zero.
+ * The wrapped stream itself is not touched; only the count restarts.
+ */
+ public void resetCounter() {
+ counter = 0;
+ }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttAck.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttAck.java
new file mode 100644
index 0000000..13ce622
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttAck.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+
+/**
+ * Abstract super-class of all acknowledgement messages.
+ */
+public abstract class MqttAck extends MqttWireMessage {
+ /**
+ * Creates an acknowledgement of the given message type.
+ * @param type the message type, passed unchanged to the superclass constructor.
+ */
+ public MqttAck(byte type) {
+ super(type);
+ }
+
+ /**
+ * Returns the message info value for an acknowledgement, which is always 0.
+ * NOTE(review): presumably this supplies the flag bits of the fixed header -
+ * confirm against MqttWireMessage.
+ */
+ protected byte getMessageInfo() {
+ return 0;
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttConnack.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttConnack.java
new file mode 100644
index 0000000..612c321
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttConnack.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+/**
+ * An on-the-wire representation of an MQTT CONNACK.
+ */
+public class MqttConnack extends MqttAck {
+    /** Return code decoded from the second byte of the variable header. */
+    private int returnCode;
+
+    /**
+     * Decodes a CONNACK from its variable header.
+     *
+     * @param info the message info byte; it is not used by this constructor.
+     * @param variableHeader the variable header bytes: one byte that is skipped,
+     *        followed by the return code read as an unsigned byte.
+     * @throws IOException if the header holds fewer than two bytes.
+     */
+    public MqttConnack(byte info, byte[] variableHeader) throws IOException {
+        super(MqttWireMessage.MESSAGE_TYPE_CONNACK);
+        DataInputStream header = new DataInputStream(new ByteArrayInputStream(variableHeader));
+        // The first byte is consumed but not interpreted.
+        header.readByte();
+        returnCode = header.readUnsignedByte();
+        header.close();
+    }
+
+    /**
+     * Returns the return code decoded from the variable header.
+     *
+     * @return the return code, in the range 0 to 255.
+     */
+    public int getReturnCode() {
+        return returnCode;
+    }
+
+    /**
+     * Returns an empty variable header.
+     */
+    protected byte[] getVariableHeader() throws MqttException {
+        // Not needed, as the client never encodes a CONNACK
+        return new byte[0];
+    }
+
+    /**
+     * Returns whether or not this message needs to include a message ID.
+     */
+    public boolean isMessageIdRequired() {
+        return false;
+    }
+
+    /**
+     * Returns the key of this message, "Con", which is the same value as
+     * <code>MqttConnect.KEY</code>.
+     * NOTE(review): presumably the shared value lets a CONNACK be matched to the
+     * token stored for the CONNECT - confirm against the token store.
+     */
+    public String getKey() {
+        // Strings are immutable, so the literal can be returned without a copy.
+        return "Con";
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttConnect.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttConnect.java
new file mode 100644
index 0000000..7c42e1f
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttConnect.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * An on-the-wire representation of an MQTT CONNECT message.
+ */
+public class MqttConnect extends MqttWireMessage {
+    private String clientId;
+    private boolean cleanSession;
+    private MqttMessage willMessage;
+    private String userName;
+    private char[] password;
+    private int keepAliveInterval;
+    private String willDestination;
+    /** Key returned by {@link #getKey()} for every CONNECT message. */
+    public static String KEY="Con";
+
+
+    /**
+     * Creates a CONNECT message from the supplied connection settings.
+     *
+     * @param clientId the client identifier; written first in the payload.
+     * @param cleanSession whether the clean-session flag is set.
+     * @param keepAliveInterval the keep-alive value, written as a 16-bit field
+     *        at the end of the variable header.
+     * @param userName the user name, or null to omit both user name and password.
+     * @param password the password, or null to omit it; ignored when
+     *        <code>userName</code> is null.
+     * @param willMessage the will message, or null when there is no will.
+     * @param willDestination the will topic; only written when
+     *        <code>willMessage</code> is not null.
+     */
+    public MqttConnect(String clientId,
+            boolean cleanSession,
+            int keepAliveInterval,
+            String userName,
+            char[] password,
+            MqttMessage willMessage,
+            String willDestination) {
+        super(MqttWireMessage.MESSAGE_TYPE_CONNECT);
+        this.clientId = clientId;
+        this.cleanSession = cleanSession;
+        this.keepAliveInterval = keepAliveInterval;
+        this.userName = userName;
+        this.password = password;
+        this.willMessage = willMessage;
+        this.willDestination = willDestination;
+    }
+
+    /**
+     * Returns the message info value, which is always 0 for a CONNECT.
+     */
+    protected byte getMessageInfo() {
+        return (byte) 0;
+    }
+
+    /**
+     * Returns whether the clean-session flag was requested for this connect.
+     */
+    public boolean isCleanSession() {
+        return cleanSession;
+    }
+
+    /**
+     * Encodes the variable header: the protocol name "MQIsdp", the version
+     * byte 3, one byte of connect flags and the 16-bit keep-alive interval.
+     *
+     * @return the encoded variable header.
+     * @throws MqttException wrapping any IOException raised while encoding.
+     */
+    protected byte[] getVariableHeader() throws MqttException {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(baos);
+            this.encodeUTF8(dos,"MQIsdp");
+            dos.write(3);
+            byte connectFlags = 0;
+
+            if (cleanSession) {
+                connectFlags |= 0x02; // clean session
+            }
+
+            if (willMessage != null ) {
+                connectFlags |= 0x04; // a will is present
+                connectFlags |= (willMessage.getQos()<<3); // will QoS occupies the bits above the will flag
+                if (willMessage.isRetained()) {
+                    connectFlags |= 0x20; // will is retained
+                }
+            }
+
+            if (userName != null) {
+                connectFlags |= 0x80; // user name present
+                // The password flag is only ever set together with the user name flag.
+                if (password != null) {
+                    connectFlags |= 0x40; // password present
+                }
+            }
+            dos.write(connectFlags);
+            dos.writeShort(keepAliveInterval);
+            dos.flush();
+            return baos.toByteArray();
+        } catch(IOException ioe) {
+            throw new MqttException(ioe);
+        }
+    }
+
+    /**
+     * Encodes the payload: the client identifier, then (when a will message is
+     * set) the will topic and the length-prefixed will payload, then (when a
+     * user name is set) the user name and, if present, the password.
+     *
+     * @return the encoded payload.
+     * @throws MqttException wrapping any IOException raised while encoding.
+     */
+    public byte[] getPayload() throws MqttException {
+        try {
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(baos);
+            this.encodeUTF8(dos,clientId);
+
+            if (willMessage != null) {
+                this.encodeUTF8(dos,willDestination);
+                // Fetch the payload once so the length prefix and the bytes that
+                // follow it always describe the same array.
+                byte[] willPayload = willMessage.getPayload();
+                dos.writeShort(willPayload.length);
+                dos.write(willPayload);
+            }
+
+            if (userName != null) {
+                this.encodeUTF8(dos,userName);
+                if (password != null) {
+                    this.encodeUTF8(dos,new String(password));
+                }
+            }
+            dos.flush();
+            return baos.toByteArray();
+        }
+        catch (IOException ex) {
+            throw new MqttException(ex);
+        }
+    }
+
+    /**
+     * Returns whether or not this message needs to include a message ID.
+     */
+    public boolean isMessageIdRequired() {
+        return false;
+    }
+
+    /**
+     * Returns the key of this message, which is the current value of {@link #KEY}.
+     */
+    public String getKey() {
+        // Strings are immutable, so KEY can be returned without a defensive copy.
+        return KEY;
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttDisconnect.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttDisconnect.java
new file mode 100644
index 0000000..1c48d47
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttDisconnect.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+/**
+ * An on-the-wire representation of an MQTT DISCONNECT message.
+ */
+public class MqttDisconnect extends MqttWireMessage {
+    /** Key returned by {@link #getKey()} for every DISCONNECT message. */
+    public static String KEY="Disc";
+
+    /**
+     * Creates a DISCONNECT message.
+     */
+    public MqttDisconnect() {
+        super(MqttWireMessage.MESSAGE_TYPE_DISCONNECT);
+    }
+
+    /**
+     * Returns the message info value, which is always 0 for a DISCONNECT.
+     */
+    protected byte getMessageInfo() {
+        return (byte) 0;
+    }
+
+    /**
+     * Returns an empty variable header.
+     */
+    protected byte[] getVariableHeader() throws MqttException {
+        return new byte[0];
+    }
+
+    /**
+     * Returns whether or not this message needs to include a message ID.
+     */
+    public boolean isMessageIdRequired() {
+        return false;
+    }
+
+    /**
+     * Returns the key of this message, which is the current value of {@link #KEY}.
+     */
+    public String getKey() {
+        // Strings are immutable, so KEY can be returned without a defensive copy.
+        return KEY;
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttInputStream.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttInputStream.java
new file mode 100644
index 0000000..9eaec05
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttInputStream.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.internal.ExceptionHelper;
+
+
+/**
+ * An <code>MqttInputStream</code> lets applications read instances of
+ * <code>MqttWireMessage</code>.
+ */
+public class MqttInputStream extends InputStream {
+    private DataInputStream in;
+
+    public MqttInputStream(InputStream in) {
+        this.in = new DataInputStream(in);
+    }
+
+    public int read() throws IOException {
+        return in.read();
+    }
+
+    public int available() throws IOException {
+        return in.available();
+    }
+
+    public void close() throws IOException {
+        in.close();
+    }
+
+    /**
+     * Reads one <code>MqttWireMessage</code> (first byte, remaining length, then the rest) from the stream.
+     */
+    public MqttWireMessage readMqttWireMessage() throws IOException, MqttException {
+        byte firstByte = in.readByte();
+        // The message type is the high nibble of the first byte.
+        byte msgType = (byte) ((firstByte >>> 4) & 0x0F);
+        boolean knownType = (msgType >= MqttWireMessage.MESSAGE_TYPE_CONNECT)
+                && (msgType <= MqttWireMessage.MESSAGE_TYPE_DISCONNECT);
+        if (!knownType) {
+            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_INVALID_MESSAGE);
+        }
+        long remainingLength = MqttWireMessage.readMBI(in).getValue();
+        // The length was decoded above; it is re-encoded here to rebuild the raw header bytes.
+        ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+        headerOut.write(firstByte);
+        headerOut.write(MqttWireMessage.encodeMBI(remainingLength));
+        byte[] header = headerOut.toByteArray();
+        byte[] packet = new byte[(int) (header.length + remainingLength)];
+        in.readFully(packet, header.length, packet.length - header.length);
+        System.arraycopy(header, 0, packet, 0, header.length);
+        return MqttWireMessage.createWireMessage(packet);
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java
new file mode 100644
index 0000000..5df98c6
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttOutputStream.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+/**
+ * An <code>MqttOutputStream</code> lets applications write instances of
+ * <code>MqttWireMessage</code>.
+ */
+public class MqttOutputStream extends OutputStream {
+    private BufferedOutputStream out;
+
+    /** Wraps the given stream in a <code>BufferedOutputStream</code>. */
+    public MqttOutputStream(OutputStream out) {
+        this.out = new BufferedOutputStream(out);
+    }
+
+    public void close() throws IOException {
+        out.close();
+    }
+
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    public void write(byte[] b) throws IOException {
+        out.write(b);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        out.write(b, off, len);
+    }
+
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    /**
+     * Writes an <code>MqttWireMessage</code> to the stream: header bytes, then payload bytes.
+     */
+    public void write(MqttWireMessage message) throws IOException, MqttException {
+        // Both parts are obtained before either one is written.
+        byte[] header = message.getHeader();
+        byte[] payload = message.getPayload();
+        out.write(header, 0, header.length);
+        out.write(payload, 0, payload.length);
+    }
+}
+
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPersistableWireMessage.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPersistableWireMessage.java
new file mode 100644
index 0000000..bc2d39f
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPersistableWireMessage.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttPersistable;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
+
+/** A wire message that exposes its header and payload through <code>MqttPersistable</code>. */
+public abstract class MqttPersistableWireMessage extends MqttWireMessage
+        implements MqttPersistable {
+
+    public MqttPersistableWireMessage(byte type) {
+        super(type);
+    }
+
+    /** Returns the header; an <code>MqttException</code> is rethrown as a persistence exception wrapping its cause. */
+    public byte[] getHeaderBytes() throws MqttPersistenceException {
+        try {
+            return getHeader();
+        } catch (MqttException e) {
+            throw new MqttPersistenceException(e.getCause());
+        }
+    }
+
+    /** Returns the length of the array produced by {@link #getHeaderBytes()}. */
+    public int getHeaderLength() throws MqttPersistenceException {
+        return getHeaderBytes().length;
+    }
+
+    /** Always returns 0. */
+    public int getHeaderOffset() throws MqttPersistenceException {
+        return 0;
+    }
+
+    /** Returns the payload; an <code>MqttException</code> is rethrown as a persistence exception wrapping its cause. */
+    public byte[] getPayloadBytes() throws MqttPersistenceException {
+        try {
+            return getPayload();
+        } catch (MqttException e) {
+            throw new MqttPersistenceException(e.getCause());
+        }
+    }
+
+    /** Always returns 0 in this base class. */
+    public int getPayloadLength() throws MqttPersistenceException {
+        return 0;
+    }
+
+    /** Always returns 0. */
+    public int getPayloadOffset() throws MqttPersistenceException {
+        return 0;
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPingReq.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPingReq.java
new file mode 100644
index 0000000..4f397fd
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPingReq.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+/** An on-the-wire representation of an MQTT PINGREQ message. */
+public class MqttPingReq extends MqttWireMessage {
+    /** Key returned by {@link #getKey()} for every PINGREQ message. */
+    public static final String KEY = "Ping";
+
+    public MqttPingReq() {
+        super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);
+    }
+
+    /** Returns <code>false</code> as message IDs are not required for MQTT PINGREQ messages. */
+    public boolean isMessageIdRequired() {
+        return false;
+    }
+
+    /** Returns an empty variable header. */
+    protected byte[] getVariableHeader() throws MqttException {
+        return new byte[0];
+    }
+
+    protected byte getMessageInfo() {
+        return 0;
+    }
+
+    /** Returns the shared {@link #KEY} constant instead of allocating a new string on each call. */
+    public String getKey() {
+        return KEY;
+    }
+}
+
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPingResp.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPingResp.java
new file mode 100644
index 0000000..abb28d3
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPingResp.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+/** An on-the-wire representation of an MQTT PINGRESP. */
+public class MqttPingResp extends MqttAck {
+    public static final String KEY = "Ping"; // returned by getKey()
+
+    /** Creates a PINGRESP; neither argument is used by this constructor. */
+    public MqttPingResp(byte info, byte[] variableHeader) {
+        super(MqttWireMessage.MESSAGE_TYPE_PINGRESP);
+    }
+
+    protected byte[] getVariableHeader() throws MqttException {
+        // Not needed, as the client never encodes a PINGRESP
+        return new byte[0];
+    }
+
+    /** Returns <code>false</code>: no message ID is required for this message. */
+    public boolean isMessageIdRequired() {
+        return false;
+    }
+
+    /** Returns the shared {@link #KEY} constant instead of allocating a new string on each call. */
+    public String getKey() {
+        return KEY;
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubAck.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubAck.java
new file mode 100644
index 0000000..4b92a43
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubAck.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+
+/** An on-the-wire representation of an MQTT PUBACK message. */
+public class MqttPubAck extends MqttAck {
+    /** Decodes a PUBACK from <code>data</code>, which starts with the message ID as an unsigned short. */
+    public MqttPubAck(byte info, byte[] data) throws IOException {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBACK);
+        DataInputStream input = new DataInputStream(new ByteArrayInputStream(data));
+        this.msgId = input.readUnsignedShort();
+        input.close();
+    }
+
+    /** Creates a PUBACK that takes its message ID from <code>publish</code>. */
+    public MqttPubAck(MqttPublish publish) {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBACK);
+        this.msgId = publish.getMessageId();
+    }
+
+    /** Returns the result of <code>encodeMessageId()</code> as the variable header. */
+    protected byte[] getVariableHeader() throws MqttException {
+        return encodeMessageId();
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubComp.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubComp.java
new file mode 100644
index 0000000..3176e0d
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubComp.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+
+/** An on-the-wire representation of an MQTT PUBCOMP message. */
+public class MqttPubComp extends MqttAck {
+    /** Decodes a PUBCOMP from <code>data</code>, which starts with the message ID as an unsigned short. */
+    public MqttPubComp(byte info, byte[] data) throws IOException {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBCOMP);
+        DataInputStream input = new DataInputStream(new ByteArrayInputStream(data));
+        this.msgId = input.readUnsignedShort();
+        input.close();
+    }
+
+    /** Creates a PUBCOMP that takes its message ID from <code>publish</code>. */
+    public MqttPubComp(MqttPublish publish) {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBCOMP);
+        msgId = publish.getMessageId();
+    }
+
+    /** Creates a PUBCOMP carrying the given message ID. */
+    public MqttPubComp(int msgId) {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBCOMP);
+        this.msgId = msgId;
+    }
+
+    protected byte[] getVariableHeader() throws MqttException {
+        return encodeMessageId();
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubRec.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubRec.java
new file mode 100644
index 0000000..864da02
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubRec.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+
+/** An on-the-wire representation of an MQTT PUBREC message. */
+public class MqttPubRec extends MqttAck {
+    /** Decodes a PUBREC from <code>data</code>, which starts with the message ID as an unsigned short. */
+    public MqttPubRec(byte info, byte[] data) throws IOException {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBREC);
+        DataInputStream input = new DataInputStream(new ByteArrayInputStream(data));
+        this.msgId = input.readUnsignedShort();
+        input.close();
+    }
+
+    /** Creates a PUBREC that takes its message ID from <code>publish</code>. */
+    public MqttPubRec(MqttPublish publish) {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBREC);
+        this.msgId = publish.getMessageId();
+    }
+
+    /** Returns the result of <code>encodeMessageId()</code> as the variable header. */
+    protected byte[] getVariableHeader() throws MqttException {
+        return encodeMessageId();
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubRel.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubRel.java
new file mode 100644
index 0000000..8c12d4c
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPubRel.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+/**
+ * An on-the-wire representation of an MQTT PUBREL message.
+ */
+public class MqttPubRel extends MqttPersistableWireMessage {
+
+ /**
+ * Createa a pubrel message based on a pubrec
+ * @param pubRec
+ */
+ public MqttPubRel(MqttPubRec pubRec) {
+ super(MqttWireMessage.MESSAGE_TYPE_PUBREL);
+ this.setMessageId(pubRec.getMessageId());
+ }
+
+ /**
+ * Creates a pubrel based on a pubrel set of bytes read fro the network
+ * @param info
+ * @param data
+ * @throws IOException
+ */
+ public MqttPubRel(byte info, byte[] data) throws IOException {
+ super(MqttWireMessage.MESSAGE_TYPE_PUBREL);
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ DataInputStream dis = new DataInputStream(bais);
+ msgId = dis.readUnsignedShort();
+ dis.close();
+ }
+
+ protected byte[] getVariableHeader() throws MqttException {
+ return encodeMessageId();
+ }
+
+ protected byte getMessageInfo() {
+ return (byte)( 2 | (this.duplicate?8:0));
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPublish.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPublish.java
new file mode 100644
index 0000000..7c9d620
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttPublish.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+
+/** An on-the-wire representation of an MQTT PUBLISH message. */
+public class MqttPublish extends MqttPersistableWireMessage {
+
+    private MqttMessage message;
+    private String topicName;
+
+    private byte[] encodedPayload = null;
+
+    public MqttPublish(String name, MqttMessage message) {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBLISH);
+        this.message = message;
+        this.topicName = name;
+    }
+
+    /**
+     * Constructs a new MqttPublish object from received bytes.
+     * @param info the message info byte (retained, QoS and duplicate bits)
+     * @param data the variable header and payload bytes
+     */
+    public MqttPublish(byte info, byte[] data) throws MqttException, IOException {
+        super(MqttWireMessage.MESSAGE_TYPE_PUBLISH);
+        MqttReceivedMessage received = new MqttReceivedMessage();
+        this.message = received;
+        received.setQos((info >> 1) & 0x03);
+        if ((info & 0x01) != 0) {
+            received.setRetained(true);
+        }
+        if ((info & 0x08) != 0) {
+            received.setDuplicate(true);
+        }
+        // The counter tracks how much of data the variable header used; the rest is payload.
+        CountingInputStream counter = new CountingInputStream(new ByteArrayInputStream(data));
+        DataInputStream input = new DataInputStream(counter);
+        topicName = this.decodeUTF8(input);
+        if (received.getQos() > 0) {
+            msgId = input.readUnsignedShort();
+        }
+        byte[] payload = new byte[data.length - counter.getCounter()];
+        input.readFully(payload);
+        input.close();
+        received.setPayload(payload);
+    }
+
+    /** Builds the info bits: QoS shifted left by one, 0x01 if retained, 0x08 if duplicate. */
+    protected byte getMessageInfo() {
+        byte flags = (byte) (message.getQos() << 1);
+        if (message.isRetained()) {
+            flags |= 0x01;
+        }
+        if (message.isDuplicate() || duplicate) {
+            flags |= 0x08;
+        }
+        return flags;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public MqttMessage getMessage() {
+        return message;
+    }
+
+    protected static byte[] encodePayload(MqttMessage message) {
+        return message.getPayload();
+    }
+
+    /** Returns the encoded payload, computing it on the first call and reusing it afterwards. */
+    public byte[] getPayload() throws MqttException {
+        if (encodedPayload == null) {
+            encodedPayload = encodePayload(message);
+        }
+        return encodedPayload;
+    }
+
+    /** Returns the payload length, or 0 if <code>getPayload()</code> throws an <code>MqttException</code>. */
+    public int getPayloadLength() {
+        try {
+            return getPayload().length;
+        } catch (MqttException ignored) {
+            return 0;
+        }
+    }
+
+    /** Sets the message ID here and, when the message is an <code>MqttReceivedMessage</code>, on it as well. */
+    public void setMessageId(int msgId) {
+        super.setMessageId(msgId);
+        if (message instanceof MqttReceivedMessage) {
+            ((MqttReceivedMessage) message).setMessageId(msgId);
+        }
+    }
+
+    /** Encodes the topic name, followed by the message ID when the QoS is above 0. */
+    protected byte[] getVariableHeader() throws MqttException {
+        try {
+            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+            DataOutputStream output = new DataOutputStream(buffer);
+            this.encodeUTF8(output, topicName);
+            if (message.getQos() > 0) {
+                output.writeShort(msgId);
+            }
+            output.flush();
+            return buffer.toByteArray();
+        } catch (IOException ioe) {
+            throw new MqttException(ioe);
+        }
+    }
+
+    public boolean isMessageIdRequired() {
+        // all publishes require a message ID as it's used as the key to the token store
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttReceivedMessage.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttReceivedMessage.java
new file mode 100644
index 0000000..68773d0
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttReceivedMessage.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/** An <code>MqttMessage</code> that additionally records a message ID. */
+public class MqttReceivedMessage extends MqttMessage {
+    private int receivedId;
+
+    public void setMessageId(int msgId) {
+        receivedId = msgId;
+    }
+
+    /** Returns the message ID last passed to <code>setMessageId</code> (0 if none was set). */
+    public int getMessageId() {
+        return receivedId;
+    }
+
+    // Re-declared as public to get around the protected visibility of the super class method.
+    public void setDuplicate(boolean value) {
+        super.setDuplicate(value);
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttSuback.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttSuback.java
new file mode 100644
index 0000000..39ef1f0
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttSuback.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+
+/**
+ * An on-the-wire representation of an MQTT SUBACK.
+ */
+public class MqttSuback extends MqttAck {
+    private int[] grantedQos; // Not currently made available to anyone.
+
+    /**
+     * Decodes a SUBACK: an unsigned short message ID followed by one granted
+     * QoS value per remaining byte of <code>data</code>.
+     */
+    public MqttSuback(byte info, byte[] data) throws IOException {
+        super(MqttWireMessage.MESSAGE_TYPE_SUBACK);
+        DataInputStream input = new DataInputStream(new ByteArrayInputStream(data));
+        msgId = input.readUnsignedShort();
+        grantedQos = new int[data.length - 2];
+        int next = 0;
+        for (int qos = input.read(); qos != -1; qos = input.read()) {
+            grantedQos[next++] = qos;
+        }
+        input.close();
+    }
+
+    protected byte[] getVariableHeader() throws MqttException {
+        // Not needed, as the client never encodes a SUBACK
+        return new byte[0];
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttSubscribe.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttSubscribe.java
new file mode 100644
index 0000000..0eb68fe
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/internal/wire/MqttSubscribe.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.internal.wire;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+
+/**
+ * An on-the-wire representation of an MQTT SUBSCRIBE message.
+ */
+public class MqttSubscribe extends MqttWireMessage {
+ private String[] names;
+ private int[] qos;
+
+ /**
+ * Constructor for an on-the-wire MQTT subscribe message
+ * @param names - one or more topics to subscribe to
+ * @param qos - the max QOS that each topic will be subscribed at
+ */
+ public MqttSubscribe(String[] names, int[] qos) {
+ super(MqttWireMessage.MESSAGE_TYPE_SUBSCRIBE);
+ this.names = names;
+ this.qos = qos;
+
+ if (names.length != qos.length) {
+ throw new IllegalArgumentException();
+ }
+
+ for (int i=0;i+ * Clients should use the the convenience methods such as severe() and fine() or + * one of the predefined level constants such as Logger.SEVERE and Logger.FINE + * with the appropriate log(int level...) or trace(int level...) methods. + *
+ * The levels in descending order are: + *
+ */ +public interface Logger { + /** + * SEVERE is a message level indicating a serious failure. + *
+ * In general SEVERE messages should describe events that are of + * considerable importance and which will prevent normal program execution. + * They should be reasonably intelligible to end users and to system + * administrators. + */ + public static final int SEVERE = 1; + /** + * WARNING is a message level indicating a potential problem. + *
+ * In general WARNING messages should describe events that will be of + * interest to end users or system managers, or which indicate potential + * problems. + */ + public static final int WARNING = 2; + /** + * INFO is a message level for informational messages. + *
+ * Typically INFO messages will be written to the console or its equivalent. + * So the INFO level should only be used for reasonably significant messages + * that will make sense to end users and system admins. + */ + public static final int INFO = 3; + /** + * CONFIG is a message level for static configuration messages. + *
+ * CONFIG messages are intended to provide a variety of static configuration + * information, to assist in debugging problems that may be associated with + * particular configurations. For example, CONFIG message might include the + * CPU type, the graphics depth, the GUI look-and-feel, etc. + */ + public static final int CONFIG = 4; + /** + * FINE is a message level providing tracing information. + *
+ * All of FINE, FINER, and FINEST are intended for relatively detailed + * tracing. The exact meaning of the three levels will vary between + * subsystems, but in general, FINEST should be used for the most voluminous + * detailed output, FINER for somewhat less detailed output, and FINE for + * the lowest volume (and most important) messages. + *
+ * In general the FINE level should be used for information that will be + * broadly interesting to developers who do not have a specialized interest + * in the specific subsystem. + *
+ * FINE messages might include things like minor (recoverable) failures.
+ * Issues indicating potential performance problems are also worth logging
+ * as FINE.
+ */
+ public static final int FINE = 5;
+ /**
+ * FINER indicates a fairly detailed tracing message. By default logging
+ * calls for entering, returning, or throwing an exception are traced at
+ * this level.
+ */
+ public static final int FINER = 6;
+ /**
+ * FINEST indicates a highly detailed tracing message.
+ */
+ public static final int FINEST = 7;
+
+ public void initialise(ResourceBundle messageCatalog, String loggerID, String resourceName);
+
+ /**
+ * Set a name that can be used to provide context with each log record.
+ * This overrides the value passed in on initialise
+ */
+ public void setResourceName(String logContext);
+
+ /**
+ * Check if a message of the given level would actually be logged by this
+ * logger. This check is based on the Logger's effective level, which may be
+ * inherited from its parent.
+ *
+ * @param level
+ * a message logging level.
+ * @return true if the given message level is currently being logged.
+ */
+ public boolean isLoggable(int level);
+
+ /**
+ * Log a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used.
+ */
+ public void severe(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void severe(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments and a throwable, if the logger is currently enabled for
+ * the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param thrown
+ * Throwable associated with log message.
+ */
+ public void severe(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable thrown);
+
+ /**
+ * Log a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used.
+ */
+ public void warning(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void warning(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments and a throwable, if the logger is currently enabled for
+ * the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param thrown
+ * Throwable associated with log message.
+ */
+ public void warning(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable thrown);
+
+ /**
+ * Log a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used.
+ */
+ public void info(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void info(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments and a throwable, if the logger is currently enabled for
+ * the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param thrown
+ * Throwable associated with log message.
+ */
+ public void info(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable thrown);
+
+ /**
+ * Log a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used.
+ */
+ public void config(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void config(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments and a throwable, if the logger is currently enabled for
+ * the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param thrown
+ * Throwable associated with log message.
+ */
+ public void config(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable thrown);
+
+ /**
+ * Trace a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used.
+ */
+ public void fine(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Trace a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used. The
+ * formatter uses java.text.MessageFormat style formatting to
+ * format parameters, so for example a format string "{0} {1}"
+ * would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void fine(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Trace a message, specifying source class and method, with an array of
+ * object arguments and a throwable.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself, using the same java.text.MessageFormat style
+ * inserts as the other fine methods.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param ex
+ * Throwable associated with the trace message.
+ */
+ public void fine(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable ex);
+
+ /**
+ * Trace a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used.
+ */
+ public void finer(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Trace a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used. The
+ * formatter uses java.text.MessageFormat style formatting to
+ * format parameters, so for example a format string "{0} {1}"
+ * would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void finer(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Trace a message, specifying source class and method, with an array of
+ * object arguments and a throwable.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself, using the same java.text.MessageFormat style
+ * inserts as the other finer methods.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param ex
+ * Throwable associated with the trace message.
+ */
+ public void finer(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable ex);
+
+ /**
+ * Trace a message, specifying source class and method, if the logger is
+ * currently enabled for the given message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used.
+ */
+ public void finest(String sourceClass, String sourceMethod, String msg);
+
+ /**
+ * Trace a message, specifying source class and method, with an array of
+ * object arguments, if the logger is currently enabled for the given
+ * message level.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used. The
+ * formatter uses java.text.MessageFormat style formatting to
+ * format parameters, so for example a format string "{0} {1}"
+ * would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ */
+ public void finest(String sourceClass, String sourceMethod, String msg, Object[] inserts);
+
+ /**
+ * Trace a message, specifying source class and method, with an array of
+ * object arguments and a throwable.
+ *
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself, using the same java.text.MessageFormat style
+ * inserts as the other finest methods.
+ * @param inserts
+ * Array of parameters to the message.
+ * @param ex
+ * Throwable associated with the trace message.
+ */
+ public void finest(String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable ex);
+
+ /**
+ * Log a message, specifying source class and method, with an array of
+ * object arguments and a throwable, if the logger is currently enabled for
+ * the given message level.
+ *
+ * @param level
+ * One of the message level identifiers, e.g. SEVERE.
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message, may be null.
+ * @param thrown
+ * Throwable associated with log message.
+ */
+ public void log(int level, String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable thrown);
+
+ /**
+ * Log a trace message, specifying source class and method, with an array of
+ * object arguments and a throwable, if the logger is currently enabled for
+ * the given message level.
+ *
+ * @param level
+ * One of the message level identifiers, e.g. SEVERE.
+ * @param sourceClass
+ * Name of class that issued the logging request.
+ * @param sourceMethod
+ * Name of method that issued the logging request.
+ * @param msg
+ * The key in the message catalog for the message or the actual
+ * message itself. During formatting, if the logger has a mapping
+ * for the msg string, then the msg string is replaced by the
+ * value. Otherwise the original msg string is used. The
+ * formatter uses java.text.MessageFormat style formatting to
+ * format parameters, so for example a format string "{0} {1}"
+ * would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message, may be null.
+ * @param ex
+ * Throwable associated with the trace message.
+ */
+ public void trace(int level, String sourceClass, String sourceMethod, String msg, Object[] inserts, Throwable ex);
+
+ /**
+ * Format a log message without causing it to be written to the log.
+ *
+ * @param msg
+ * The key in the message localization catalog for the message or
+ * the actual message itself. During formatting, if the logger
+ * has a mapping for the msg string, then the msg string is
+ * replaced by the localized value. Otherwise the original msg
+ * string is used. The formatter uses java.text.MessageFormat
+ * style formatting to format parameters, so for example a format
+ * string "{0} {1}" would format two inserts into the message.
+ * @param inserts
+ * Array of parameters to the message.
+ * @return The formatted message for the current locale.
+ */
+ public String formatMessage(String msg, Object[] inserts);
+
+ /**
+ * Ask the logger to dump its trace.
+ *
+ * NOTE(review): what is dumped, and where it goes, is decided by the
+ * implementing class and is not visible from this interface. Presumably
+ * any trace buffered in memory is pushed to its configured target -
+ * confirm against the implementation.
+ */
+ public void dumpTrace();
+}
\ No newline at end of file
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/LoggerFactory.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/LoggerFactory.java
new file mode 100644
index 0000000..348b185
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/LoggerFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.logging;
+
+import java.lang.reflect.Method;
+
+/**
+ * LoggerFactory will create a logger instance ready for use by the caller.
+ *
+ * The default is to create a logger that utilises Java's built-in
+ * logging facility java.util.logging (JSR47). It is possible to override
+ * this for systems where JSR47 is not available or an alternative logging
+ * facility is needed by using setLogger and passing the class name of
+ * a logger that implements {@link Logger}.
+ */
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+/**
+ * A factory that returns a logger for use by the MQTT client.
+ *
+ * The default log and trace facility uses Java's built-in log facility,
+ * java.util.logging. For systems where this is not available or where
+ * an alternative logging framework is required the logging facility can be
+ * replaced using {@link org.eclipse.paho.client.mqttv3.logging.LoggerFactory#setLogger(String)}
+ * which takes the class name of an implementation of the
+ * {@link org.eclipse.paho.client.mqttv3.logging.Logger} interface.
+ */
+public class LoggerFactory {
+    /**
+     * Default message catalog.
+     */
+    public final static String MQTT_CLIENT_MSG_CAT = "org.eclipse.paho.client.mqttv3.internal.nls.logcat";
+    private final static String className = LoggerFactory.class.getName();
+
+    /**
+     * Logger class name supplied through setLogger; null means "use the default".
+     */
+    private static String overrideloggerClassName = null;
+    /**
+     * Default logger that uses java.util.logging.
+     */
+    private static String jsr47LoggerClassName = "org.eclipse.paho.client.mqttv3.logging.JSR47Logger";
+
+    /**
+     * Create a logger for a named package/class.
+     * A new Logger instance is created and initialised on every call. The
+     * class set through setLogger is used when one has been set, otherwise
+     * the default logger that uses java.util.logging is used.
+     *
+     * @param messageCatalogName the resource bundle containing the logging messages.
+     * @param loggerID unique name to identify this logger.
+     * @return a suitable Logger, never null.
+     * @throws MissingResourceException if the message catalog cannot be found, or
+     *         if the logger class cannot be loaded, instantiated or used as a Logger.
+     */
+    public static Logger getLogger(String messageCatalogName, String loggerID) {
+        String loggerClassName = overrideloggerClassName;
+        if (loggerClassName == null) {
+            loggerClassName = jsr47LoggerClassName;
+        }
+
+        Logger logger = getLogger(loggerClassName, ResourceBundle.getBundle(messageCatalogName), loggerID, null);
+        if (null == logger) {
+            throw new MissingResourceException("Error locating the logging class", className, loggerID);
+        }
+
+        return logger;
+    }
+
+
+    /**
+     * Return an instance of a logger
+     *
+     * @param loggerClassName the class name of the logger to load
+     * @param messageCatalog the resource bundle containing messages
+     * @param loggerID an identifier for the logger
+     * @param resourceName a name or context to associate with this logger instance.
+     * @return a ready for use logger, or null if the class cannot be loaded,
+     *         cannot be instantiated or does not implement Logger
+     */
+    private static Logger getLogger(String loggerClassName, ResourceBundle messageCatalog, String loggerID, String resourceName) {
+        Logger logger = null;
+        Class logClass = null;
+
+        try {
+            logClass = Class.forName(loggerClassName);
+        } catch (NoClassDefFoundError ncdfe) {
+            return null;
+        } catch (ClassNotFoundException cnfe) {
+            return null;
+        }
+
+        // Now instantiate the log
+        try {
+            logger = (Logger)logClass.newInstance();
+        } catch (IllegalAccessException e) {
+            return null;
+        } catch (InstantiationException e) {
+            return null;
+        } catch (ExceptionInInitializerError e) {
+            return null;
+        } catch (SecurityException e) {
+            return null;
+        } catch (ClassCastException e) {
+            // The class named through setLogger does not implement Logger:
+            // report it like any other unusable logger class rather than
+            // letting the cast failure escape to the caller.
+            return null;
+        }
+        logger.initialise(messageCatalog, loggerID, resourceName);
+
+        return logger;
+    }
+
+    /**
+     * When run in JSR47, this allows access to the properties in the logging.properties
+     * file.
+     * If not run in JSR47, or the property isn't set, returns null.
+     * @param name the property to return
+     * @return the property value, or null if it isn't set or JSR47 isn't being used
+     */
+    public static String getLoggingProperty(String name) {
+        String result = null;
+        try {
+            // Hide behind reflection as java.util.logging is not guaranteed
+            // to be available.
+            Class logManagerClass = Class.forName("java.util.logging.LogManager");
+            Method m1 = logManagerClass.getMethod("getLogManager", new Class[]{});
+            // Static method: no target instance and no arguments.
+            Object logManagerInstance = m1.invoke(null, new Object[]{});
+            Method m2 = logManagerClass.getMethod("getProperty", new Class[]{String.class});
+            result = (String)m2.invoke(logManagerInstance, new Object[]{name});
+        } catch (Exception e) {
+            // Any error, assume JSR47 isn't available and return null
+            result = null;
+        } catch (NoClassDefFoundError e) {
+            // Treated the same way as when loading a logger class: JSR47 isn't available
+            result = null;
+        }
+        return result;
+    }
+
+    /**
+     * Set the class name of the logger that the LoggerFactory will load
+     * If not set getLogger will attempt to create a logger
+     * appropriate for the platform.
+     * @param loggerClassName - Logger implementation class name to use.
+     */
+    public static void setLogger(String loggerClassName) {
+        LoggerFactory.overrideloggerClassName = loggerClassName;
+    }
+}
\ No newline at end of file
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/SimpleLogFormatter.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/SimpleLogFormatter.java
new file mode 100644
index 0000000..893db7f
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/SimpleLogFormatter.java
@@ -0,0 +1,91 @@
+package org.eclipse.paho.client.mqttv3.logging;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.logging.Formatter;
+import java.util.logging.LogRecord;
+
+/**
+ * SimpleLogFormatter prints a single line
+ * log record in human readable form.
+ */
+public class SimpleLogFormatter extends Formatter {
+
+ final String ls = System.getProperty("line.separator");
+ /**
+ * Constructs a The default log and trace facility uses Java's build in log facility:-
+java.util.logging. For systems where this is not available or where
+an alternative logging framework is required the logging facility can be
+replaced using {@link org.eclipse.paho.client.mqttv3.logging.LoggerFactory#setLogger(String)}
+which takes an implementation of the {@link org.eclipse.paho.client.mqttv3.logging.Logger}
+interface.
+
+ A sample java.util.logging properties file - jsr47min.properties - is provided that demonstrates
+how to run with a memory based trace facility that runs with minimal performance
+overhead. The memory buffer can be dumped when a log/trace record is written matching
+the MemoryHandler's trigger level or when the push method is invoked on the MemoryHandler.
+{@link org.eclipse.paho.client.mqttv3.util.Debug Debug} provides methods that make it easy
+to dump the memory buffer as well as other useful debug info.
+
+
\ No newline at end of file
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/package.html b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/package.html
new file mode 100644
index 0000000..1adc965
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/package.html
@@ -0,0 +1,127 @@
+
+The MQ Telemetry Transport (MQTT) is a lightweight broker-based publish/subscribe
+messaging protocol designed to be open, simple, lightweight and easy to implement.
+These characteristics make it ideal for use in constrained environments, for example,
+but not limited to:
+ Features of the protocol include:
+ The basic means of operating the client is: The programming model and concepts like the protocol are small and easy to use. Key concepts
+to use when creating MQTT applications include:
+
+An MQTT client needs a persistence mechanism to store messages while they
+are in the process of being delivered. This package contains several
+implementations of the interface. If a persistence class is not
+specified on the constructor to an MQTT client,
+{@link org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence MqttDefaultFilePersistence}
+is used by default.
+
+SimpleFormatter
object.
+ */
+ public SimpleLogFormatter() {
+ // Only delegates to the java.util.logging.Formatter constructor.
+ super();
+ }
+
+ /**
+ * Render a log record as one line of tab separated columns: level name,
+ * timestamp, source class, source method, thread ID and the formatted
+ * message. When the record carries a throwable its stack trace is
+ * appended after the line.
+ *
+ * @param r the record to format
+ * @return the formatted text for the record
+ */
+ public String format(LogRecord r) {
+     final StringBuffer line = new StringBuffer();
+
+     line.append(r.getLevel().getName()).append("\t");
+
+     String stamp = MessageFormat.format("{0, date, yy-MM-dd} {0, time, kk:mm:ss.SSSS} ",
+             new Object[] { new Date(r.getMillis()) });
+     line.append(stamp).append("\t");
+
+     // Class column: names longer than 20 characters keep only their last
+     // 19 characters, shorter names are followed by a single space.
+     String sourceClass = r.getSourceClassName();
+     String classColumn = "";
+     if (sourceClass != null) {
+         int nameLength = sourceClass.length();
+         classColumn = (nameLength > 20)
+                 ? sourceClass.substring(nameLength - 19)
+                 : sourceClass + ' ';
+     }
+     line.append(classColumn).append("\t").append(" ");
+
+     line.append(left(r.getSourceMethodName(), 23, ' ')).append("\t");
+     line.append(r.getThreadID()).append("\t");
+     line.append(formatMessage(r)).append(ls);
+
+     Throwable thrown = r.getThrown();
+     if (thrown != null) {
+         line.append("Throwable occurred: ");
+         StringWriter trace = new StringWriter();
+         PrintWriter out = new PrintWriter(trace);
+         try {
+             thrown.printStackTrace(out);
+             line.append(trace.toString());
+         } finally {
+             out.close();
+         }
+     }
+     return line.toString();
+ }
+
+ /**
+ * Left justify a string, padding it on the right up to a field width.
+ *
+ * @param s the string to justify; null is treated as an empty string
+ * @param width the field width to justify within
+ * @param fillChar the character to fill with
+ *
+ * @return the string itself when it is already at least width characters
+ * long, otherwise the string followed by enough fill characters to reach
+ * width.
+ */
+ public static String left(String s, int width, char fillChar) {
+     // format() passes LogRecord.getSourceMethodName() here, which may be
+     // null; pad an empty column instead of failing the whole log line.
+     String text = (s == null) ? "" : s;
+     if (text.length() >= width) {
+         return text;
+     }
+     StringBuffer sb = new StringBuffer(width);
+     sb.append(text);
+     for (int i = width - text.length(); --i >= 0;) {
+         sb.append(fillChar);
+     }
+     return sb.toString();
+ }
+
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/jsr47min.properties b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/jsr47min.properties
new file mode 100644
index 0000000..0626551
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/logging/jsr47min.properties
@@ -0,0 +1,83 @@
+# Properties file which configures the operation of the JDK logging facility.
+#
+# The configuration in this file is the suggested configuration
+# for collecting trace to help debug problems related to the
+# Paho MQTT client. It configures trace to be continuously collected
+# in memory with minimal impact on performance.
+#
+# When the push trigger (by default a Severe level message) or a
+# specific request is made to "push" the in memory trace then it
+# is "pushed" to the configured target handler. By default
+# this is the standard java.util.logging.FileHandler. The Paho Debug
+# class can be used to push the memory trace to its target
+#
+# To enable trace either:
+# - use this properties file as is and set the logging facility up
+# to use it by configuring the util logging system property e.g.
+#
+# >java -Djava.util.logging.config.file=
+
+
+
+
+
+
+ The quality of service for message delivery is met even if the network connection
+ breaks, or the client or the server stop while a message is being delivered
+
+
+
+connect
to the server
+
+ publish messages
to the server,
+ via a topic
.subscribe
 to one or more topics
. The server will send any messages
+ it receives on those topics to the client. The client will be informed when a message
+ arrives via a callback
+ disconnect
from the server.
+
+
+
\ No newline at end of file
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MemoryPersistence.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MemoryPersistence.java
new file mode 100644
index 0000000..de1a209
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MemoryPersistence.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.persist;
+
+import java.util.Enumeration;
+import java.util.Hashtable;
+
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttPersistable;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
+
+/**
+ * Persistence that uses memory.
+ *
+ * In cases where reliability is not required across client or device
+ * restarts this memory persistence can be used. In cases where
+ * reliability is required, like when clean session is set to false,
+ * then a non-volatile form of persistence should be used.
+ *
+ */
+public class MemoryPersistence implements MqttClientPersistence {
+
+    // Backing store. It is created by open(), so it stays null until open()
+    // has been called.
+    private Hashtable data;
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#close()
+     */
+    public void close() throws MqttPersistenceException {
+        // close() can be reached without a preceding open(); there is
+        // nothing to discard in that case.
+        if (data != null) {
+            data.clear();
+        }
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#keys()
+     */
+    public Enumeration keys() throws MqttPersistenceException {
+        return data.keys();
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#get(java.lang.String)
+     */
+    public MqttPersistable get(String key) throws MqttPersistenceException {
+        return (MqttPersistable)data.get(key);
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#open(java.lang.String, java.lang.String)
+     */
+    public void open(String clientId, String serverURI) throws MqttPersistenceException {
+        // Always starts from a fresh, empty table; the arguments are not used.
+        this.data = new Hashtable();
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#put(java.lang.String, org.eclipse.paho.client.mqttv3.MqttPersistable)
+     */
+    public void put(String key, MqttPersistable persistable) throws MqttPersistenceException {
+        data.put(key, persistable);
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#remove(java.lang.String)
+     */
+    public void remove(String key) throws MqttPersistenceException {
+        data.remove(key);
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#clear()
+     */
+    public void clear() throws MqttPersistenceException {
+        data.clear();
+    }
+
+    /* (non-Javadoc)
+     * @see org.eclipse.paho.client.mqttv3.MqttClientPersistence#containsKey(java.lang.String)
+     */
+    public boolean containsKey(String key) throws MqttPersistenceException {
+        return data.containsKey(key);
+    }
+}
diff --git a/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java
new file mode 100644
index 0000000..2e4d44a
--- /dev/null
+++ b/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/persist/MqttDefaultFilePersistence.java
@@ -0,0 +1,288 @@
+/*
+ * Copyright (c) 2009, 2012 IBM Corp.
+ *
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Dave Locke - initial API and implementation and/or initial documentation
+ */
+package org.eclipse.paho.client.mqttv3.persist;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Vector;
+
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttPersistable;
+import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
+import org.eclipse.paho.client.mqttv3.internal.FileLock;
+import org.eclipse.paho.client.mqttv3.internal.MqttPersistentData;
+
+/**
+ * An implementation of the {@link MqttClientPersistence} interface that provides
+ * file based persistence.
+ *
+ * A directory is specified when the Persistence object is created. When the persistence
+ * is then opened (see {@link #open(String, String)}), a sub-directory is made beneath the base
+ * for this client ID and connection key. This allows one persistence base directory
+ * to be shared by multiple clients.
+ *
+ * The sub-directory's name is created from a concatenation of the client ID and connection key
+ * with any instance of '/', '\\', ':' or ' ' removed.
+ */
+public class MqttDefaultFilePersistence implements MqttClientPersistence {
+
+ private File dataDir;
+ private File clientDir = null;
+ private FileLock fileLock = null;
+ private static final String MESSAGE_FILE_EXTENSION = ".msg";
+ private static final String MESSAGE_BACKUP_FILE_EXTENSION = ".bup";
+ private static final String LOCK_FILENAME = ".lck";
+ /**
+ * Filename filter that accepts a name only when it ends with
+ * {@link #MESSAGE_FILE_EXTENSION} (".msg"); the directory argument is
+ * not consulted.
+ */
+ private static final FilenameFilter FILE_FILTER = new FilenameFilter() {
+ public boolean accept(File dir, String name) { return name.endsWith(MESSAGE_FILE_EXTENSION); }
+ };
+ /**
+ * Creates a file-based persistent data store whose base directory is the
+ * value of the <code>user.dir</code> system property, by delegating to
+ * {@link #MqttDefaultFilePersistence(String)}.
+ */
+ public MqttDefaultFilePersistence() {
+ this(System.getProperty("user.dir"));
+ }
+
+ /**
+ * Creates a file-based persistent data store within the specified directory.
+ * Only a {@link File} for the path is recorded here; the directory is not
+ * checked or created until {@link #open(String, String)} is called.
+ * @param directory the base directory to use.
+ */
+ public MqttDefaultFilePersistence(String directory) {
+ dataDir = new File(directory);
+ }
+
+ public void open(String clientId, String theConnection) throws MqttPersistenceException {
+
+ if (dataDir.exists() && !dataDir.isDirectory()) {
+ throw new MqttPersistenceException();
+ } else if (!dataDir.exists() ) {
+ if (!dataDir.mkdirs()) {
+ throw new MqttPersistenceException();
+ }
+ }
+ if (!dataDir.canWrite()) {
+ throw new MqttPersistenceException();
+ }
+
+
+ StringBuffer keyBuffer = new StringBuffer();
+ for (int i=0;i
+
+ If set to true:
+
+
+
+
+
+
+
+
+ A client registers interest in these notifications by registering a
+ {@link org.eclipse.paho.client.mqttv3.MqttCallback MqttCallback} on the client
+
+
+