[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
[paho-dev] Multiple Messages on subscribing
|
Hello,
With paho-mqtt I publish 10656 datasets (messages) in a loop to
mosquitto with QoS level 2.
During the iteration I count the messages and print out the total at
the end.
In parallel I process the messages with a paho-mqtt subscriber (QoS
level 1). In my tests the client currently processes messages more than
19742 times and I do not know why.
Do you have any clue why this can happen?
I expect that each message is sent once and that one client
processes it exactly once.
Here is the source of my client (Python):
#!/usr/bin/env python3
import paho.mqtt.client as mqtt
import pgdb
import sys
# MQTT parameters
MQTT_HOST = "10.0.1.94"
MQTT_PORT = 1883
MQTT_TOPIC = "xxx/pcap"
MQTT_CLIENT_ID = "xxx_app1"
MQTT_QOS = 1  # Only 0 or 1 is valid!

# Database parameters
DB_HOST = "10.0.1.98"
DB_PORT = 5432  # NOTE(review): defined but never passed to pgdb.connect() below
DB_DATABASE = "xxxx"
DB_USERNAME = "xxx"
DB_PASSWORD = "xxx"

# Establish the connection to the PostgreSQL database.
dbConnection = pgdb.connect(
    host=DB_HOST,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    database=DB_DATABASE,
)

# A single cursor is created once and reused for every insert.
cursor = dbConnection.cursor()

# Parameterised INSERT statement. It is assembled from adjacent string
# literals so that it can span several source lines: a plain quoted string
# may not contain a line break. The eleven placeholders are filled, in
# column order, from the tuple passed to cursor.execute().
insertQuery = (
    "INSERT INTO raw_client_data (frame_time, "
    "radiotap_present_channel, radiotap_present_dbm_antsignal, "
    "radiotap_channel_freq, radiotap_channel_type, radiotap_dbm_antsignal, "
    "wlan_addr, wlan_addr_resolved, data_data, tracker_serial, sequence_id) "
    "VALUES (%s, %i,%s, %s, %s, %s, %s, %s, %s, %s, %i);"
)

i_data = 0  # Running count of datasets, incremented in on_message
# MQTT connect callback
def on_connect(client, userdata, flags, rc):
    """Print the connect result code and subscribe to the configured topic.

    The subscription is issued from this callback, so it is sent every time
    the callback fires. ``userdata`` and ``flags`` are not used.
    """
    print("Connected with result code", rc)
    client.subscribe(MQTT_TOPIC, qos=MQTT_QOS)
# MQTT disconnect callback
def on_disconnect(client, userdata, rc, retry_delay=1.0):
    """Reconnect to the broker after an unexpected disconnect.

    A zero ``rc`` is treated as a requested disconnect and nothing is done.
    For any other value ``client.reconnect()`` is retried until it returns 0.

    Args:
        client: Object providing ``reconnect()``; a return value of 0 means
            the reconnect succeeded.
        userdata: Unused.
        rc: Disconnect result code passed to the callback.
        retry_delay: Seconds to wait after a failed attempt before trying
            again. The callback is still invoked with three positional
            arguments; this parameter only has to be given to change the
            pause.
    """
    import time  # local import keeps this block self-contained

    while rc != 0:
        print("Unexpected MQTT disconnection. Will auto-reconnect")
        try:
            rc = client.reconnect()
        except OSError as err:
            # reconnect() opens a new socket and raises (rather than returning
            # an error code) when the broker cannot be reached. rc keeps its
            # non-zero value, so the loop tries again.
            print("Reconnect failed: " + str(err))
        if rc != 0:
            # Pause between attempts instead of spinning at full speed.
            time.sleep(retry_delay)
def _parse_fields(fields):
    """Build the INSERT parameter tuple from the 11 fields of one row.

    Double quotes are stripped from every field. The result is ordered like
    the column list of ``insertQuery``.

    Raises:
        ValueError: if field 3 is not an integer or field 7 is not a number.
    """
    clean = [field.replace('"', '') for field in fields]
    return (
        clean[2],         # frame_time
        int(clean[3]),    # radiotap_present_channel
        clean[4],         # radiotap_present_dbm_antsignal
        clean[5],         # radiotap_channel_freq
        clean[6],         # radiotap_channel_type
        float(clean[7]),  # radiotap_dbm_antsignal
        clean[8],         # wlan_addr
        clean[9],         # wlan_addr_resolved
        '',               # data_data (intentionally left empty)
        clean[0],         # tracker_serial
        0,                # sequence_id
    )


# On every incoming message
def on_message(client, userdata, msg):
    """Parse one ';'-separated capture row and insert it into the database.

    Rows containing "frame.number" are treated as header rows and skipped.
    A row is also rejected, with a printed notice, when it does not have
    exactly 11 fields, when field 7 is empty, when a numeric field cannot be
    converted, or when the WLAN address is empty or the broadcast address.

    Each accepted row is written with its own INSERT and commit. A database
    error is printed and the transaction rolled back, so one bad row does not
    stop the subscriber.

    NOTE(review): QoS 1 is "at least once" delivery, so the broker may deliver
    the same message again. This handler does not de-duplicate, so a
    redelivered message would be inserted again; confirm whether the table
    needs a unique key.
    """
    global i_data

    # Decode once and reuse the text for logging and parsing.
    payload = msg.payload.decode()
    print(str(i_data) + ". " + payload + "' on topic '"
          + msg.topic + "' with QoS " + str(msg.qos))

    if "frame.number" in payload:
        # Header row: nothing to store.
        return
    print(payload)

    # Split row
    fields = payload.split(';')
    if len(fields) != 11 or fields[7] == '':
        # Either a field is missing or the signal strength is empty.
        print("Länge: " + str(len(fields)))
        return

    i_data += 1
    try:
        data = _parse_fields(fields)
    except ValueError as err:
        # A non-numeric channel/signal field must not escape the callback.
        print("Invalid dataset: " + str(err))
        return

    # Ignore some entries because we do not need them
    wlan_addr = data[6]
    if not wlan_addr or "ff:ff:ff:ff:ff:ff" in wlan_addr:
        print("Invalid dataset.")
        return

    # Insert row into database
    try:
        cursor.execute(insertQuery, data)
        dbConnection.commit()
    except Exception as e:
        print(e)
        dbConnection.rollback()
# Script entry: connect to the MQTT broker and run the network loop until
# the process is interrupted with Ctrl+C.
#
# clean_session=False asks the broker for a persistent session for this
# client id.
client = mqtt.Client(client_id=MQTT_CLIENT_ID, clean_session=False)

# Register the callbacks before connecting so none can be missed.
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect

try:
    # Establish connection to mqtt broker
    client.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
    while True:
        client.loop()
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the subscriber.
    pass
finally:
    # Always release both connections, whatever ended the loop.
    try:
        client.disconnect()
    finally:
        dbConnection.close()
Here is my mosquitto.conf:
# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
log_dest file /var/log/mosquitto/mosquitto.log
include_dir /etc/mosquitto/conf.d
# Append Michael Post
# Limit on the number of messages held in the queue per client.
max_queued_messages 1000000
# Limit on the number of QoS 1/2 messages in flight per client at once.
max_inflight_messages 500
#autosave_interval 60
# Log all message types.
log_type all
# NOTE(review): presumably the number of seconds the broker waits for an
# acknowledgement before it resends a QoS 1/2 message. A subscriber that
# acknowledges more slowly than this would then receive duplicates; confirm
# against the mosquitto.conf man page for the installed version.
retry_interval 5