Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[paho-dev] Multiple Messages on subscribing

Hello,

With paho-mqtt I publish 10656 datasets (messages) in a loop to
mosquitto with QoS level 2.
During the iteration I count the messages and print the total at
the end.

In parallel, I process the messages with a paho-mqtt subscriber (QoS
level 1). In my tests the client's message handler currently runs more
than 19742 times, and I do not know why.

Do you have any idea why this happens?

I expect that each message is sent once and that a single client
processes it exactly once.


Here is the source of my client (Python):


#!/usr/bin/env python3

import paho.mqtt.client as mqtt
import pgdb
import sys



# MQTT parameters
# Host and port are handed to client.connect(), the client id to mqtt.Client(),
# and topic and QoS to client.subscribe() in on_connect().
MQTT_HOST="10.0.1.94"
MQTT_PORT=1883
MQTT_TOPIC="xxx/pcap"
MQTT_CLIENT_ID="xxx_app1"
# NOTE(review): the remark below says only 0 or 1 is valid; MQTT itself also
# defines QoS 2 -- confirm why it is excluded for this subscriber.
MQTT_QOS=1   # Only 0 or 1 is valid!


# Database parameters
# Host, user, password and database name are passed to pgdb.connect().
# NOTE(review): DB_PORT is not referenced by the pgdb.connect() call in this
# script -- confirm whether the port should be passed along.
DB_HOST="10.0.1.98"
DB_PORT=5432
DB_DATABASE="xxxx"
DB_USERNAME="xxx"
DB_PASSWORD="xxx"


# Establish the connection to the PostgreSQL database. This runs at import
# time; the connection is shared by every on_message() call.
dbConnection = pgdb.connect(
    host=DB_HOST,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    database=DB_DATABASE,
)

# Module-wide cursor and INSERT statement used by on_message() for every row.
cursor = dbConnection.cursor()

# The statement is assembled from adjacent string literals inside parentheses
# so it can span several source lines; a plain "..." literal must not contain
# a raw line break. The 11 placeholders match the 11 listed columns in order.
insertQuery = (
    "INSERT INTO raw_client_data (frame_time, "
    "radiotap_present_channel, radiotap_present_dbm_antsignal, "
    "radiotap_channel_freq, radiotap_channel_type, radiotap_dbm_antsignal, "
    "wlan_addr, wlan_addr_resolved, data_data, tracker_serial, sequence_id) "
    "VALUES (%s, %i,%s, %s, %s, %s, %s, %s, %s, %s, %i);"
)

# Counter of datasets accepted for processing; incremented in on_message().
i_data = 0


# This is the MQTT Broker Subscriber
def on_connect(client, userdata, flags, rc):
    """Print the connect result code and subscribe to the configured topic.

    The subscription is issued from this callback, so it is repeated every
    time the callback fires (for example after a reconnect).
    """
    print("Connected with result code %s" % (rc,))
    client.subscribe(MQTT_TOPIC, MQTT_QOS)

# This is the MQTT Broker onDisconnect
def on_disconnect(client, userdata, rc, retry_delay=1.0):
    """Reconnect after an unexpected disconnect, retrying until it succeeds.

    Args:
        client: The MQTT client instance that lost its connection.
        userdata: User data registered on the client (unused).
        rc: Disconnect result code; 0 means the disconnect was requested and
            nothing is done.
        retry_delay: Seconds to wait after a failed attempt before trying
            again. Optional, so the three-argument callback call still works.
    """
    import time

    while rc != 0:
        print ("Unexpected MQTT disconnection. Will auto-reconnect")
        try:
            rc = client.reconnect()
        except OSError as err:
            # reconnect() raises a socket error while the broker is
            # unreachable; keep rc non-zero and try again instead of letting
            # the exception escape the callback and stop the network loop.
            print(err)
        if rc != 0:
            # Back off so an unreachable broker is not hammered in a tight loop.
            time.sleep(retry_delay)


# On every incoming message
def on_message(client, userdata, msg):
    """Insert one semicolon-separated capture row from MQTT into the database.

    The payload is decoded, split on ';' and, if it has all 11 columns and a
    non-empty signal value, converted and written with ``insertQuery``.
    Rows that cannot be used are reported on stdout and skipped.

    Note: this handler inserts on every invocation. The subscription uses
    MQTT_QOS, and with QoS 1 MQTT only guarantees at-least-once delivery, so
    the broker may hand the same message to this callback more than once.

    Args:
        client: The MQTT client instance (unused).
        userdata: User data registered on the client (unused).
        msg: The received message; ``payload``, ``topic`` and ``qos`` are read.
    """
    global i_data

    # Decode once instead of on every use.
    payload = msg.payload.decode()

    print(str(i_data) + ". " + payload + "' on topic '" + msg.topic
          + "' with QoS " + str(msg.qos))

    # Rows containing "frame.number" are skipped -- presumably the
    # column-header line of the capture export.
    if "frame.number" in payload:
        return

    print(payload)

    fields = payload.split(';')

    # Only rows with all 11 columns and a non-empty column 7 are processed;
    # anything else is reported with its column count.
    if len(fields) != 11 or fields[7] == '':
        print("Länge: " + str(len(fields)))
        return

    i_data = i_data + 1

    try:
        data = _parse_dataset(fields)
    except ValueError:
        # A non-numeric channel or signal value must not abort the network
        # loop; treat the row like any other unusable dataset.
        data = None

    if data is None:
        print("Invalid dataset.")
        return

    try:
        cursor.execute(insertQuery, data)
        dbConnection.commit()
    except Exception as e:
        # Best effort: report the failure and discard the failed transaction
        # so that later rows can still be inserted.
        print(e)
        dbConnection.rollback()


def _parse_dataset(fields):
    """Convert the 11 split columns into the parameter tuple for insertQuery.

    Args:
        fields: The 11 raw column strings of one row.

    Returns:
        The parameter tuple in the column order of ``insertQuery``, or None
        when the row is to be ignored (empty or broadcast address).

    Raises:
        ValueError: If column 3 is not an integer or column 7 is not a number.
    """
    # Remove every double-quote character from each column.
    cols = [field.replace('"', '') for field in fields]

    tracker_serial = cols[0]
    frame_time = cols[2]
    radiotap_present_channel = int(cols[3])
    radiotap_present_dbm_antsignal = cols[4]
    radiotap_channel_freq = cols[5]
    radiotap_channel_type = cols[6]
    radiotap_dbm_antsignal = float(cols[7])
    wlan_addr = cols[8]
    wlan_addr_resolved = cols[9]
    # These two columns are always written with fixed values.
    data_data = ''
    sequence_id = 0

    # Ignore entries without an address and broadcast entries; they are not
    # needed.
    if not wlan_addr or "ff:ff:ff:ff:ff:ff" in wlan_addr:
        return None

    return (frame_time, radiotap_present_channel,
            radiotap_present_dbm_antsignal, radiotap_channel_freq,
            radiotap_channel_type, radiotap_dbm_antsignal, wlan_addr,
            wlan_addr_resolved, data_data, tracker_serial, sequence_id)

# Create the MQTT client with a fixed client id and a persistent
# (non-clean) session.
client = mqtt.Client(client_id=MQTT_CLIENT_ID, clean_session=False)

# Register the callbacks before connecting so none of them can be missed.
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

try:
    # Establish connection to mqtt broker
    client.connect(MQTT_HOST, MQTT_PORT, 60)

    # Drive the network loop until the process is interrupted.
    while True:
        client.loop()
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the subscriber.
    pass
finally:
    # Always release the connections, whatever ended the loop.
    client.disconnect()
    dbConnection.close()


Here is my mosquitto.conf:

# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example

pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest file /var/log/mosquitto/mosquitto.log

include_dir /etc/mosquitto/conf.d

# Append Michael Post
max_queued_messages 1000000
max_inflight_messages 500
#autosave_interval 60
log_type all
retry_interval 5


Back to the top