first working version of MQTT subagent
author Victor Kirhenshtein <victor@netxms.org>
Sat, 25 Feb 2017 19:55:53 +0000 (21:55 +0200)
committer Victor Kirhenshtein <victor@netxms.org>
Sat, 25 Feb 2017 19:55:53 +0000 (21:55 +0200)
include/nxconfig.h
src/agent/subagents/mqtt/.cproject [new file with mode: 0644]
src/agent/subagents/mqtt/.project [new file with mode: 0644]
src/agent/subagents/mqtt/.settings/language.settings.xml [new file with mode: 0644]
src/agent/subagents/mqtt/Makefile.am
src/agent/subagents/mqtt/broker.cpp [new file with mode: 0644]
src/agent/subagents/mqtt/main.cpp
src/agent/subagents/mqtt/mqtt_subagent.h
src/agent/subagents/mqtt/topic.cpp [new file with mode: 0644]
src/libnetxms/config.cpp

index f06d339..84fd305 100644 (file)
@@ -75,20 +75,20 @@ public:
        void addValuePreallocated(TCHAR *value);
        void setValue(const TCHAR*value);
 
-       const TCHAR *getSubEntryValue(const TCHAR *name, int index = 0, const TCHAR *defaultValue = NULL);
-       INT32 getSubEntryValueAsInt(const TCHAR *name, int index = 0, INT32 defaultValue = 0);
-       UINT32 getSubEntryValueAsUInt(const TCHAR *name, int index = 0, UINT32 defaultValue = 0);
-       INT64 getSubEntryValueAsInt64(const TCHAR *name, int index = 0, INT64 defaultValue = 0);
-       UINT64 getSubEntryValueAsUInt64(const TCHAR *name, int index = 0, UINT64 defaultValue = 0);
-       bool getSubEntryValueAsBoolean(const TCHAR *name, int index = 0, bool defaultValue = false);
-       uuid getSubEntryValueAsUUID(const TCHAR *name, int index = 0);
-
-   const TCHAR *getAttribute(const TCHAR *name) { return m_attributes.get(name); }
-       INT32 getAttributeAsInt(const TCHAR *name, INT32 defaultValue = 0);
-       UINT32 getAttributeAsUInt(const TCHAR *name, UINT32 defaultValue = 0);
-       INT64 getAttributeAsInt64(const TCHAR *name, INT64 defaultValue = 0);
-       UINT64 getAttributeAsUInt64(const TCHAR *name, UINT64 defaultValue = 0);
-       bool getAttributeAsBoolean(const TCHAR *name, bool defaultValue = false);
+       const TCHAR *getSubEntryValue(const TCHAR *name, int index = 0, const TCHAR *defaultValue = NULL) const;
+       INT32 getSubEntryValueAsInt(const TCHAR *name, int index = 0, INT32 defaultValue = 0) const;
+       UINT32 getSubEntryValueAsUInt(const TCHAR *name, int index = 0, UINT32 defaultValue = 0) const;
+       INT64 getSubEntryValueAsInt64(const TCHAR *name, int index = 0, INT64 defaultValue = 0) const;
+       UINT64 getSubEntryValueAsUInt64(const TCHAR *name, int index = 0, UINT64 defaultValue = 0) const;
+       bool getSubEntryValueAsBoolean(const TCHAR *name, int index = 0, bool defaultValue = false) const;
+       uuid getSubEntryValueAsUUID(const TCHAR *name, int index = 0) const;
+
+   const TCHAR *getAttribute(const TCHAR *name)  const { return m_attributes.get(name); }
+       INT32 getAttributeAsInt(const TCHAR *name, INT32 defaultValue = 0) const;
+       UINT32 getAttributeAsUInt(const TCHAR *name, UINT32 defaultValue = 0) const;
+       INT64 getAttributeAsInt64(const TCHAR *name, INT64 defaultValue = 0) const;
+       UINT64 getAttributeAsUInt64(const TCHAR *name, UINT64 defaultValue = 0) const;
+       bool getAttributeAsBoolean(const TCHAR *name, bool defaultValue = false) const;
 
    void setAttribute(const TCHAR *name, const TCHAR *value) { m_attributes.set(name, value); }
    void setAttributePreallocated(TCHAR *name, TCHAR *value) { m_attributes.setPreallocated(name, value); }
@@ -98,15 +98,15 @@ public:
    void setAttribute(const TCHAR *name, UINT64 value);
    void setAttribute(const TCHAR *name, bool value);
 
-       const TCHAR *getFile() { return m_file; }
-       int getLine() { return m_line; }
+       const TCHAR *getFile() const { return m_file; }
+       int getLine() const { return m_line; }
 
        void setName(const TCHAR *name);
 
        ConfigEntry *createEntry(const TCHAR *name);
-       ConfigEntry *findEntry(const TCHAR *name);
-       ObjectArray<ConfigEntry> *getSubEntries(const TCHAR *mask);
-       ObjectArray<ConfigEntry> *getOrderedSubEntries(const TCHAR *mask);
+       ConfigEntry *findEntry(const TCHAR *name) const;
+       ObjectArray<ConfigEntry> *getSubEntries(const TCHAR *mask) const;
+       ObjectArray<ConfigEntry> *getOrderedSubEntries(const TCHAR *mask) const;
        void unlinkEntry(ConfigEntry *entry);
 
        void print(FILE *file, int level, TCHAR *prefix);
diff --git a/src/agent/subagents/mqtt/.cproject b/src/agent/subagents/mqtt/.cproject
new file mode 100644 (file)
index 0000000..058ca66
--- /dev/null
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<?fileVersion 4.0.0?><cproject storage_type_id="org.eclipse.cdt.core.XmlProjectDescriptionStorage">
+       <storageModule moduleId="org.eclipse.cdt.core.settings">
+               <cconfiguration id="cdt.managedbuild.toolchain.gnu.base.1383361858">
+                       <storageModule buildSystemId="org.eclipse.cdt.managedbuilder.core.configurationDataProvider" id="cdt.managedbuild.toolchain.gnu.base.1383361858" moduleId="org.eclipse.cdt.core.settings" name="Default">
+                               <externalSettings/>
+                               <extensions>
+                                       <extension id="org.eclipse.cdt.core.GNU_ELF" point="org.eclipse.cdt.core.BinaryParser"/>
+                                       <extension id="org.eclipse.cdt.core.GASErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
+                                       <extension id="org.eclipse.cdt.core.GmakeErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
+                                       <extension id="org.eclipse.cdt.core.GLDErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
+                                       <extension id="org.eclipse.cdt.core.CWDLocator" point="org.eclipse.cdt.core.ErrorParser"/>
+                                       <extension id="org.eclipse.cdt.core.GCCErrorParser" point="org.eclipse.cdt.core.ErrorParser"/>
+                               </extensions>
+                       </storageModule>
+                       <storageModule moduleId="cdtBuildSystem" version="4.0.0">
+                               <configuration artifactName="${ProjName}" buildProperties="" description="" id="cdt.managedbuild.toolchain.gnu.base.1383361858" name="Default" parent="org.eclipse.cdt.build.core.emptycfg">
+                                       <folderInfo id="cdt.managedbuild.toolchain.gnu.base.1383361858.440231152" name="/" resourcePath="">
+                                               <toolChain id="cdt.managedbuild.toolchain.gnu.base.1529736728" name="Linux GCC" superClass="cdt.managedbuild.toolchain.gnu.base">
+                                                       <targetPlatform archList="all" binaryParser="org.eclipse.cdt.core.GNU_ELF" id="cdt.managedbuild.target.gnu.platform.base.243629133" name="Debug Platform" osList="linux,hpux,aix,qnx" superClass="cdt.managedbuild.target.gnu.platform.base"/>
+                                                       <builder id="cdt.managedbuild.target.gnu.builder.base.1640995991" keepEnvironmentInBuildfile="false" managedBuildOn="false" name="Gnu Make Builder" superClass="cdt.managedbuild.target.gnu.builder.base"/>
+                                                       <tool id="cdt.managedbuild.tool.gnu.archiver.base.1355805477" name="GCC Archiver" superClass="cdt.managedbuild.tool.gnu.archiver.base"/>
+                                                       <tool id="cdt.managedbuild.tool.gnu.cpp.compiler.base.1670013884" name="GCC C++ Compiler" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.base">
+                                                               <option id="gnu.cpp.compiler.option.include.paths.1541293056" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" useByScannerDiscovery="false" valueType="includePath">
+                                                                       <listOptionValue builtIn="false" value="&quot;${NETXMS_BASE}&quot;"/>
+                                                                       <listOptionValue builtIn="false" value="&quot;${NETXMS_BASE}/include&quot;"/>
+                                                               </option>
+                                                               <option id="gnu.cpp.compiler.option.preprocessor.def.1292646403" name="Defined symbols (-D)" superClass="gnu.cpp.compiler.option.preprocessor.def" useByScannerDiscovery="false" valueType="definedSymbols">
+                                                                       <listOptionValue builtIn="false" value="_THREAD_SAFE"/>
+                                                                       <listOptionValue builtIn="false" value="TRE_WCHAR=1"/>
+                                                                       <listOptionValue builtIn="false" value="UNICODE"/>
+                                                                       <listOptionValue builtIn="false" value="_GNU_SOURCE"/>
+                                                               </option>
+                                                               <inputType id="cdt.managedbuild.tool.gnu.cpp.compiler.input.2032331267" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.input"/>
+                                                       </tool>
+                                                       <tool id="cdt.managedbuild.tool.gnu.c.compiler.base.900818319" name="GCC C Compiler" superClass="cdt.managedbuild.tool.gnu.c.compiler.base">
+                                                               <option id="gnu.c.compiler.option.include.paths.1179611823" name="Include paths (-I)" superClass="gnu.c.compiler.option.include.paths" useByScannerDiscovery="false" valueType="includePath">
+                                                                       <listOptionValue builtIn="false" value="&quot;${NETXMS_BASE}&quot;"/>
+                                                                       <listOptionValue builtIn="false" value="&quot;${NETXMS_BASE}/include&quot;"/>
+                                                               </option>
+                                                               <option id="gnu.c.compiler.option.preprocessor.def.symbols.915566136" name="Defined symbols (-D)" superClass="gnu.c.compiler.option.preprocessor.def.symbols" useByScannerDiscovery="false" valueType="definedSymbols">
+                                                                       <listOptionValue builtIn="false" value="_THREAD_SAFE"/>
+                                                                       <listOptionValue builtIn="false" value="TRE_WCHAR=1"/>
+                                                                       <listOptionValue builtIn="false" value="UNICODE"/>
+                                                                       <listOptionValue builtIn="false" value="_GNU_SOURCE"/>
+                                                               </option>
+                                                               <inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.254562804" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
+                                                       </tool>
+                                                       <tool id="cdt.managedbuild.tool.gnu.c.linker.base.1161384594" name="GCC C Linker" superClass="cdt.managedbuild.tool.gnu.c.linker.base"/>
+                                                       <tool id="cdt.managedbuild.tool.gnu.cpp.linker.base.263939884" name="GCC C++ Linker" superClass="cdt.managedbuild.tool.gnu.cpp.linker.base">
+                                                               <inputType id="cdt.managedbuild.tool.gnu.cpp.linker.input.354098483" superClass="cdt.managedbuild.tool.gnu.cpp.linker.input">
+                                                                       <additionalInput kind="additionalinputdependency" paths="$(USER_OBJS)"/>
+                                                                       <additionalInput kind="additionalinput" paths="$(LIBS)"/>
+                                                               </inputType>
+                                                       </tool>
+                                                       <tool id="cdt.managedbuild.tool.gnu.assembler.base.780960992" name="GCC Assembler" superClass="cdt.managedbuild.tool.gnu.assembler.base">
+                                                               <option id="gnu.both.asm.option.include.paths.106999652" name="Include paths (-I)" superClass="gnu.both.asm.option.include.paths" valueType="includePath">
+                                                                       <listOptionValue builtIn="false" value="&quot;${NETXMS_BASE}&quot;"/>
+                                                                       <listOptionValue builtIn="false" value="&quot;${NETXMS_BASE}/include&quot;"/>
+                                                               </option>
+                                                               <inputType id="cdt.managedbuild.tool.gnu.assembler.input.695273975" superClass="cdt.managedbuild.tool.gnu.assembler.input"/>
+                                                       </tool>
+                                               </toolChain>
+                                       </folderInfo>
+                               </configuration>
+                       </storageModule>
+                       <storageModule moduleId="org.eclipse.cdt.core.externalSettings"/>
+               </cconfiguration>
+       </storageModule>
+       <storageModule moduleId="cdtBuildSystem" version="4.0.0">
+               <project id="mqtt.null.1072768025" name="mqtt"/>
+       </storageModule>
+       <storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/>
+       <storageModule moduleId="scannerConfiguration">
+               <autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
+               <scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.1383361858;cdt.managedbuild.toolchain.gnu.base.1383361858.440231152;cdt.managedbuild.tool.gnu.c.compiler.base.900818319;cdt.managedbuild.tool.gnu.c.compiler.input.254562804">
+                       <autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
+               </scannerConfigBuildInfo>
+               <scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.base.1383361858;cdt.managedbuild.toolchain.gnu.base.1383361858.440231152;cdt.managedbuild.tool.gnu.cpp.compiler.base.1670013884;cdt.managedbuild.tool.gnu.cpp.compiler.input.2032331267">
+                       <autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
+               </scannerConfigBuildInfo>
+       </storageModule>
+</cproject>
diff --git a/src/agent/subagents/mqtt/.project b/src/agent/subagents/mqtt/.project
new file mode 100644 (file)
index 0000000..bc7db6d
--- /dev/null
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+       <name>mqtt</name>
+       <comment></comment>
+       <projects>
+               <project>libnetxms</project>
+               <project>libnxagent</project>
+       </projects>
+       <buildSpec>
+               <buildCommand>
+                       <name>org.eclipse.cdt.managedbuilder.core.genmakebuilder</name>
+                       <triggers>clean,full,incremental,</triggers>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+               <buildCommand>
+                       <name>org.eclipse.cdt.managedbuilder.core.ScannerConfigBuilder</name>
+                       <triggers>full,incremental,</triggers>
+                       <arguments>
+                       </arguments>
+               </buildCommand>
+       </buildSpec>
+       <natures>
+               <nature>org.eclipse.cdt.core.cnature</nature>
+               <nature>org.eclipse.cdt.core.ccnature</nature>
+               <nature>org.eclipse.cdt.managedbuilder.core.managedBuildNature</nature>
+               <nature>org.eclipse.cdt.managedbuilder.core.ScannerConfigNature</nature>
+       </natures>
+       <filteredResources>
+               <filter>
+                       <id>1488016198957</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.o</arguments>
+                       </matcher>
+               </filter>
+               <filter>
+                       <id>1488016198969</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.lo</arguments>
+                       </matcher>
+               </filter>
+               <filter>
+                       <id>1488016198974</id>
+                       <name></name>
+                       <type>6</type>
+                       <matcher>
+                               <id>org.eclipse.ui.ide.multiFilter</id>
+                               <arguments>1.0-name-matches-false-false-*.la</arguments>
+                       </matcher>
+               </filter>
+       </filteredResources>
+</projectDescription>
diff --git a/src/agent/subagents/mqtt/.settings/language.settings.xml b/src/agent/subagents/mqtt/.settings/language.settings.xml
new file mode 100644 (file)
index 0000000..d831a84
--- /dev/null
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<project>
+       <configuration id="cdt.managedbuild.toolchain.gnu.base.1383361858" name="Default">
+               <extension point="org.eclipse.cdt.core.LanguageSettingsProvider">
+                       <provider class="org.eclipse.cdt.core.language.settings.providers.LanguageSettingsGenericProvider" id="org.eclipse.cdt.ui.UserLanguageSettingsProvider" name="CDT User Setting Entries" prefer-non-shared="true"/>
+                       <provider-reference id="org.eclipse.cdt.core.ReferencedProjectsLanguageSettingsProvider" ref="shared-provider"/>
+                       <provider copy-of="extension" id="org.eclipse.cdt.managedbuilder.core.GCCBuildCommandParser"/>
+                       <provider class="org.eclipse.cdt.managedbuilder.language.settings.providers.GCCBuiltinSpecsDetector" console="false" env-hash="1092834717854559371" id="org.eclipse.cdt.managedbuilder.core.GCCBuiltinSpecsDetector" keep-relative-paths="false" name="CDT GCC Built-in Compiler Settings" parameter="${COMMAND} ${FLAGS} -E -P -v -dD &quot;${INPUTS}&quot;" prefer-non-shared="true">
+                               <language-scope id="org.eclipse.cdt.core.gcc"/>
+                               <language-scope id="org.eclipse.cdt.core.g++"/>
+                       </provider>
+                       <provider-reference id="org.eclipse.cdt.managedbuilder.core.MBSLanguageSettingsProvider" ref="shared-provider"/>
+               </extension>
+       </configuration>
+</project>
index 4c445c1..6a12e08 100644 (file)
@@ -1,7 +1,7 @@
 SUBAGENT = mqtt
 
 pkglib_LTLIBRARIES = mqtt.la
-mqtt_la_SOURCES = main.cpp
+mqtt_la_SOURCES = broker.cpp main.cpp topic.cpp
 mqtt_la_CPPFLAGS=-I@top_srcdir@/include @MQTT_CPPFLAGS@
 mqtt_la_LDFLAGS = -module -avoid-version -export-symbols ../subagent.sym @MQTT_LDFLAGS@
 mqtt_la_LIBADD = ../../libnxagent/libnxagent.la ../../../libnetxms/libnetxms.la @MQTT_LIBS@
diff --git a/src/agent/subagents/mqtt/broker.cpp b/src/agent/subagents/mqtt/broker.cpp
new file mode 100644 (file)
index 0000000..9ac95c4
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ ** MQTT subagent
+ ** Copyright (C) 2017 Raden Solutions
+ **
+ ** This program is free software; you can redistribute it and/or modify
+ ** it under the terms of the GNU General Public License as published by
+ ** the Free Software Foundation; either version 2 of the License, or
+ ** (at your option) any later version.
+ **
+ ** This program is distributed in the hope that it will be useful,
+ ** but WITHOUT ANY WARRANTY; without even the implied warranty of
+ ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ ** GNU General Public License for more details.
+ **
+ ** You should have received a copy of the GNU General Public License
+ ** along with this program; if not, write to the Free Software
+ ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ **
+ **/
+#include "mqtt_subagent.h"
+
+/**
+ * Log callback installed on the mosquitto handle (see createFromConfig).
+ * Forwards the library message to the agent debug log: messages of level
+ * MOSQ_LOG_DEBUG are written at debug level 7, all others at debug level 3.
+ *
+ * @param handle mosquitto client handle (unused)
+ * @param userData user data attached to the handle (unused)
+ * @param level libmosquitto log level of the message
+ * @param message message text as single-byte string
+ */
+static void LogCallback(struct mosquitto *handle, void *userData, int level, const char *message)
+{
+   int debugLevel = 3;
+   if (level == MOSQ_LOG_DEBUG)
+      debugLevel = 7;
+   nxlog_debug(debugLevel, _T("MQTT: %hs"), message);
+}
+
+/**
+ * Handler for parameters backed by an MQTT topic.
+ *
+ * The handler argument is not a string: createFromConfig stores a pointer
+ * to the Topic object in NETXMS_SUBAGENT_PARAM::arg, and it is cast back here.
+ *
+ * @param name requested parameter name (unused)
+ * @param arg Topic object pointer disguised as handler argument
+ * @param result buffer for the value, MAX_RESULT_LENGTH characters
+ * @param session requesting session (unused)
+ * @return SYSINFO_RC_SUCCESS if the topic has data, SYSINFO_RC_ERROR otherwise
+ */
+static LONG H_TopicData(const TCHAR *name, const TCHAR *arg, TCHAR *result, AbstractCommSession *session)
+{
+   Topic *source = (Topic *)arg;
+   if (!source->retrieveData(result, MAX_RESULT_LENGTH))
+      return SYSINFO_RC_ERROR;
+   return SYSINFO_RC_SUCCESS;
+}
+
+/**
+ * Broker constructor.
+ *
+ * Leaves connection settings empty (they are filled in by createFromConfig)
+ * and creates the mosquitto client instance with this object as its user
+ * data, so that messageCallback can find the broker. m_handle is NULL if
+ * the client instance could not be created; the caller must check it.
+ * Initializers are listed in member declaration order.
+ */
+MqttBroker::MqttBroker() :
+   m_hostname(NULL),
+   m_port(0),
+   m_login(NULL),
+   m_password(NULL),
+   m_topics(16, 16, true),
+   m_handle(mosquitto_new(NULL, true, this)),
+   m_loopThread(INVALID_THREAD_HANDLE)
+{
+}
+
+/**
+ * Broker destructor.
+ *
+ * Waits for the network loop thread if one is still registered, destroys
+ * the mosquitto client instance if it was created, and releases the
+ * strings allocated by createFromConfig.
+ */
+MqttBroker::~MqttBroker()
+{
+   if (m_loopThread != INVALID_THREAD_HANDLE)
+   {
+      ThreadJoin(m_loopThread);
+   }
+
+   if (m_handle != NULL)
+   {
+      mosquitto_destroy(m_handle);
+   }
+
+   // free() accepts NULL, so unset fields need no check
+   free(m_password);
+   free(m_login);
+   free(m_hostname);
+}
+
+/**
+ * Create broker object from configuration
+ */
+MqttBroker *MqttBroker::createFromConfig(const ConfigEntry *config, StructArray<NETXMS_SUBAGENT_PARAM> *parameters)
+{
+   MqttBroker *broker = new MqttBroker();
+   if (broker->m_handle == NULL)
+   {
+      nxlog_debug(3, _T("MQTT: cannot create client instance"));
+      delete broker;
+      return NULL;
+   }
+
+   mosquitto_log_callback_set(broker->m_handle, LogCallback);
+   mosquitto_message_callback_set(broker->m_handle, MqttBroker::messageCallback);
+
+#ifdef UNICODE
+   broker->m_hostname = UTF8StringFromWideString(config->getSubEntryValue(L"Hostname", 0, L"127.0.0.1"));
+#else
+   broker->m_hostname = strdup(config->getSubEntryValue("Hostname", 0, "127.0.0.1"));
+#endif
+   broker->m_port = (UINT16)config->getSubEntryValueAsUInt(_T("Port"), 0, 1883);
+   broker->m_login = _tcsdup_ex(config->getSubEntryValue(_T("Login")));
+   broker->m_password = _tcsdup_ex(config->getSubEntryValue(_T("Password")));
+
+   const ConfigEntry *topicRoot = config->findEntry(_T("Topics"));
+   if (topicRoot != NULL)
+   {
+      ObjectArray<ConfigEntry> *topics = topicRoot->getSubEntries(_T("*"));
+      for(int i = 0; i < topics->size(); i++)
+      {
+         ConfigEntry *e = topics->get(i);
+         Topic *t = new Topic(e->getValue());
+         broker->m_topics.add(t);
+
+         NETXMS_SUBAGENT_PARAM p;
+         memset(&p, 0, sizeof(NETXMS_SUBAGENT_PARAM));
+         nx_strncpy(p.name, e->getName(), MAX_PARAM_NAME);
+         p.arg = (const TCHAR *)t;
+         p.dataType = DCI_DT_STRING;
+         p.handler = H_TopicData;
+         _sntprintf(p.description, MAX_DB_STRING, _T("MQTT topic %hs"), t->getPattern());
+         parameters->add(&p);
+      }
+      delete topics;
+   }
+
+   return broker;
+}
+
+/**
+ * Broker network loop
+ */
+void MqttBroker::networkLoop()
+{
+   while(mosquitto_connect(m_handle, m_hostname, m_port, 600) != MOSQ_ERR_SUCCESS)
+   {
+      nxlog_debug(4, _T("MQTT: unable to connect to broker at %hs:%d, will retry in 60 seconds"), m_hostname, (int)m_port);
+      if (AgentSleepAndCheckForShutdown(60000))
+         return;  // Agent shutdown
+   }
+
+   nxlog_debug(3, _T("MQTT: connected to broker %hs:%d"), m_hostname, (int)m_port);
+
+   for(int i = 0; i < m_topics.size(); i++)
+   {
+      Topic *t = m_topics.get(i);
+      if (mosquitto_subscribe(m_handle, NULL, t->getPattern(), 0) == MOSQ_ERR_SUCCESS)
+         nxlog_debug(4, _T("MQTT: subscribed to topic %hs on broker %hs:%d"), t->getPattern(), m_hostname, (int)m_port);
+      else
+         AgentWriteDebugLog(NXLOG_WARNING, _T("MQTT: cannot subscribe to topic %hs on broker %hs:%d"), t->getPattern(), m_hostname, (int)m_port);
+   }
+
+   mosquitto_loop_forever(m_handle, -1, 1);
+   nxlog_debug(3, _T("MQTT: network loop stopped for broker %hs:%d"), m_hostname, (int)m_port);
+}
+
+/**
+ * Thread entry point for the broker network loop.
+ *
+ * @param arg pointer to the MqttBroker object whose loop should run
+ * @return THREAD_OK after the loop has finished
+ */
+THREAD_RESULT THREAD_CALL MqttBroker::networkLoopStarter(void *arg)
+{
+   MqttBroker *broker = static_cast<MqttBroker *>(arg);
+   broker->networkLoop();
+   return THREAD_OK;
+}
+
+/**
+ * Start broker network loop in a separate thread. The thread handle is
+ * kept in m_loopThread so that the loop can be joined later.
+ */
+void MqttBroker::startNetworkLoop()
+{
+   THREAD loopThread = ThreadCreateEx(MqttBroker::networkLoopStarter, 0, this);
+   m_loopThread = loopThread;
+}
+
+/**
+ * Stop broker network loop.
+ *
+ * Requests disconnect from the broker and waits for the loop thread to
+ * finish. The join is skipped when no loop thread is registered (loop was
+ * never started or was already stopped), matching the check done in the
+ * destructor.
+ */
+void MqttBroker::stopNetworkLoop()
+{
+   mosquitto_disconnect(m_handle);
+   if (m_loopThread != INVALID_THREAD_HANDLE)
+   {
+      ThreadJoin(m_loopThread);
+      m_loopThread = INVALID_THREAD_HANDLE;  // prevents second join in destructor
+   }
+}
+
+/**
+ * Message callback installed on the mosquitto handle. The user data is
+ * the MqttBroker object given to mosquitto_new in the constructor; the
+ * message is passed on to that object.
+ *
+ * @param handle mosquitto client handle (unused)
+ * @param userData pointer to the owning MqttBroker object
+ * @param msg received message
+ */
+void MqttBroker::messageCallback(struct mosquitto *handle, void *userData, const struct mosquitto_message *msg)
+{
+   MqttBroker *broker = static_cast<MqttBroker *>(userData);
+   broker->processMessage(msg);
+}
+
+/**
+ * Process broker message.
+ *
+ * Messages without payload are ignored. The payload is described by
+ * pointer and length, so it is copied into a local, explicitly terminated
+ * buffer before being used as a C string. The copy is limited to
+ * MAX_RESULT_LENGTH - 1 bytes, which is the most a Topic object keeps
+ * anyway; longer payloads are therefore also truncated in the debug log.
+ *
+ * @param msg message received from the broker
+ */
+void MqttBroker::processMessage(const struct mosquitto_message *msg)
+{
+   if ((msg->payloadlen <= 0) || (msg->payload == NULL))
+      return;  // NULL message
+
+   char payload[MAX_RESULT_LENGTH];
+   size_t len = (size_t)msg->payloadlen;
+   if (len > sizeof(payload) - 1)
+      len = sizeof(payload) - 1;
+   memcpy(payload, msg->payload, len);
+   payload[len] = 0;
+
+   nxlog_debug(6, _T("MQTT: message received: %hs=\"%hs\""), msg->topic, payload);
+   for(int i = 0; i < m_topics.size(); i++)
+   {
+      // Each topic object decides itself whether the message matches its pattern
+      m_topics.get(i)->processMessage(msg->topic, payload);
+   }
+}
index 719c016..49089f3 100644 (file)
  ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  **
  **/
-
 #include "mqtt_subagent.h"
 
 /**
- * Add topic from config
+ * Registered brokers
  */
-static bool AddTopicFromConfig(StructArray<NETXMS_SUBAGENT_PARAM> *parameters, const TCHAR *config)
-{
-   return true;
-}
+static ObjectArray<MqttBroker> s_brokers(8, 8, true);
 
 /**
- * Add parameters from config
+ * Add brokers and parameters from config
  */
-static void AddParameters(StructArray<NETXMS_SUBAGENT_PARAM> *parameters, Config *config)
+static void RegisterBrokers(StructArray<NETXMS_SUBAGENT_PARAM> *parameters, Config *config)
 {
+   ObjectArray<ConfigEntry> *brokers = config->getSubEntries(_T("/MQTT/Brokers"), _T("*"));
+   for(int i = 0; i < brokers->size(); i++)
+   {
+      MqttBroker *b = MqttBroker::createFromConfig(brokers->get(i), parameters);
+      if (b != NULL)
+      {
+         s_brokers.add(b);
+      }
+      else
+      {
+         AgentWriteLog(NXLOG_WARNING, _T("MQTT: cannot add broker %s definition from config"), brokers->get(i)->getName());
+      }
+   }
+   delete brokers;
+
    nxlog_debug(3, _T("MQTT: %d parameters added from configuration"), parameters->size());
 }
 
@@ -47,6 +58,10 @@ static BOOL SubAgentInit(Config *config)
    mosquitto_lib_version(&major, &minor, &rev);
    nxlog_debug(2, _T("MQTT: using libmosquitto %d.%d.%d"), major, minor, rev);
 
+   // Start network loops
+   for(int i = 0; i < s_brokers.size(); i++)
+      s_brokers.get(i)->startNetworkLoop();
+
    return TRUE;
 }
 
@@ -55,6 +70,10 @@ static BOOL SubAgentInit(Config *config)
  */
 static void SubAgentShutdown()
 {
+   // Stop network loops
+   for(int i = 0; i < s_brokers.size(); i++)
+      s_brokers.get(i)->stopNetworkLoop();
+
    mosquitto_lib_cleanup();
    nxlog_debug(2, _T("MQTT subagent shutdown completed"));
 }
@@ -81,7 +100,7 @@ DECLARE_SUBAGENT_ENTRY_POINT(MQTT)
 {
    StructArray<NETXMS_SUBAGENT_PARAM> *parameters = new StructArray<NETXMS_SUBAGENT_PARAM>();
 
-   AddParameters(parameters, config);
+   RegisterBrokers(parameters, config);
 
    m_info.numParameters = parameters->size();
    m_info.parameters = (NETXMS_SUBAGENT_PARAM *)nx_memdup(parameters->getBuffer(),
index 990bc32..2a79e2a 100644 (file)
 #include <nms_agent.h>
 #include <mosquitto.h>
 
+/**
+ * Topic definition
+ *
+ * Holds one subscription pattern together with the name and value of the
+ * last message that matched it. processMessage and retrieveData take
+ * m_mutex around their accesses to the last name/value/timestamp.
+ *
+ * NOTE(review): the class owns m_pattern (released with free() in the
+ * destructor) but declares no copy constructor or assignment operator;
+ * copying a Topic object would presumably free the pattern twice.
+ */
+class Topic
+{
+private:
+   char *m_pattern;                       // subscription pattern as single-byte string; NULL if constructed without pattern
+   char m_lastName[MAX_DB_STRING];        // name of the last topic that matched the pattern
+   char m_lastValue[MAX_RESULT_LENGTH];   // payload of the last matching message
+   time_t m_timestamp;                    // time of the last matching message, 0 if none was received yet
+   Mutex m_mutex;                         // locked around updates and reads of m_lastName, m_lastValue and m_timestamp
+
+public:
+   Topic(const TCHAR *pattern);
+   ~Topic();
+
+   const char *getPattern() const { return m_pattern; }
+   time_t getTimestamp() const { return m_timestamp; }   // read without taking m_mutex
+
+   void processMessage(const char *topic, const char *msg);   // store message if topic name matches the pattern
+   bool retrieveData(TCHAR *buffer, size_t bufferLen);        // copy last value to buffer; false if no data yet
+};
+
 /**
  * Broker definition
  */
 class MqttBroker
 {
 private:
+   char *m_hostname;
+   UINT16 m_port;
+   TCHAR *m_login;
+   TCHAR *m_password;
+   ObjectArray<Topic> m_topics;
+   struct mosquitto *m_handle;
+   THREAD m_loopThread;
    
+   MqttBroker();
+
+   static void messageCallback(struct mosquitto *handle, void *userData, const struct mosquitto_message *msg);
+   void processMessage(const struct mosquitto_message *msg);
+
+   void networkLoop();
+   static THREAD_RESULT THREAD_CALL networkLoopStarter(void *arg);
+
+public:
+   ~MqttBroker();
+
+   void startNetworkLoop();
+   void stopNetworkLoop();
+
+   static MqttBroker *createFromConfig(const ConfigEntry *e, StructArray<NETXMS_SUBAGENT_PARAM> *parameters);
 };
 
 #endif
diff --git a/src/agent/subagents/mqtt/topic.cpp b/src/agent/subagents/mqtt/topic.cpp
new file mode 100644 (file)
index 0000000..25c77f3
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ ** MQTT subagent
+ ** Copyright (C) 2017 Raden Solutions
+ **
+ ** This program is free software; you can redistribute it and/or modify
+ ** it under the terms of the GNU General Public License as published by
+ ** the Free Software Foundation; either version 2 of the License, or
+ ** (at your option) any later version.
+ **
+ ** This program is distributed in the hope that it will be useful,
+ ** but WITHOUT ANY WARRANTY; without even the implied warranty of
+ ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ ** GNU General Public License for more details.
+ **
+ ** You should have received a copy of the GNU General Public License
+ ** along with this program; if not, write to the Free Software
+ ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ **
+ **/
+#include "mqtt_subagent.h"
+
+/**
+ * Topic constructor. Keeps its own copy of the pattern as single-byte
+ * string (UTF-8 in UNICODE build) and starts with no data collected.
+ *
+ * @param pattern subscription pattern; NULL leaves the topic without pattern
+ */
+Topic::Topic(const TCHAR *pattern)
+{
+   m_pattern = NULL;
+   if (pattern != NULL)
+   {
+#ifdef UNICODE
+      m_pattern = UTF8StringFromWideString(pattern);
+#else
+      m_pattern = strdup(pattern);
+#endif
+   }
+
+   // Zero timestamp and empty name mean "no message received yet"
+   m_timestamp = 0;
+   m_lastName[0] = 0;
+   m_lastValue[0] = 0;
+}
+
+/**
+ * Topic destructor
+ *
+ * Releases the pattern copy allocated by the constructor.
+ */
+Topic::~Topic()
+{
+   free(m_pattern);   // m_pattern may be NULL; free() accepts that
+}
+
+/**
+ * Process broker message.
+ *
+ * If the topic name matches this object's pattern, the name and value are
+ * stored as the last received data together with the current time. Both
+ * strings are truncated to the size of their buffers and always terminated,
+ * because retrieveData reads the stored value as a NUL-terminated string.
+ *
+ * @param topic name of the topic the message was published to
+ * @param msg message payload as NUL-terminated string
+ */
+void Topic::processMessage(const char *topic, const char *msg)
+{
+   // Topic without pattern can never match; NULL input cannot be stored
+   if ((m_pattern == NULL) || (topic == NULL) || (msg == NULL))
+      return;
+
+   bool match = false;
+   if (mosquitto_topic_matches_sub(m_pattern, topic, &match) != MOSQ_ERR_SUCCESS)
+      return;
+   if (!match)
+      return;
+
+   m_mutex.lock();
+   // strncpy does not terminate the destination when the source fills it completely
+   strncpy(m_lastName, topic, MAX_DB_STRING - 1);
+   m_lastName[MAX_DB_STRING - 1] = 0;
+   strncpy(m_lastValue, msg, MAX_RESULT_LENGTH - 1);
+   m_lastValue[MAX_RESULT_LENGTH - 1] = 0;
+   m_timestamp = time(NULL);
+   m_mutex.unlock();
+}
+
+/**
+ * Retrieve collected data.
+ *
+ * Copies the value of the last matching message into the supplied buffer,
+ * converting it from UTF-8 in UNICODE build. Nothing is copied while no
+ * message has been recorded (zero timestamp or empty topic name).
+ *
+ * @param buffer destination buffer
+ * @param bufferLen size of destination buffer in characters
+ * @return true if data was copied, false if no data is available yet
+ */
+bool Topic::retrieveData(TCHAR *buffer, size_t bufferLen)
+{
+   bool hasData = false;
+
+   m_mutex.lock();
+   if ((m_timestamp != 0) && (m_lastName[0] != 0))
+   {
+#ifdef UNICODE
+      MultiByteToWideChar(CP_UTF8, 0, m_lastValue, -1, buffer, (int)bufferLen);
+      buffer[bufferLen - 1] = 0;
+#else
+      nx_strncpy(buffer, m_lastValue, bufferLen);
+#endif
+      hasData = true;
+   }
+   m_mutex.unlock();
+
+   return hasData;
+}
index c047cd4..638e801 100644 (file)
@@ -155,12 +155,12 @@ ConfigEntry::~ConfigEntry()
       next = entry->getNext();
       delete entry;
    }
-   safe_free(m_name);
-   safe_free(m_file);
+   free(m_name);
+   free(m_file);
 
    for(int i = 0; i < m_valueCount; i++)
       safe_free(m_values[i]);
-   safe_free(m_values);
+   free(m_values);
 }
 
 /**
@@ -175,7 +175,7 @@ void ConfigEntry::setName(const TCHAR *name)
 /**
  * Find entry by name
  */
-ConfigEntry* ConfigEntry::findEntry(const TCHAR *name)
+ConfigEntry* ConfigEntry::findEntry(const TCHAR *name) const
 {
    const TCHAR *realName;
    if (name[0] == _T('%'))
@@ -264,7 +264,7 @@ void ConfigEntry::unlinkEntry(ConfigEntry *entry)
  * Get all subentries with names matched to mask.
  * Returned list ordered by ID
  */
-ObjectArray<ConfigEntry> *ConfigEntry::getSubEntries(const TCHAR *mask)
+ObjectArray<ConfigEntry> *ConfigEntry::getSubEntries(const TCHAR *mask) const
 {
    ObjectArray<ConfigEntry> *list = new ObjectArray<ConfigEntry>(16, 16, false);
    for(ConfigEntry *e = m_first; e != NULL; e = e->getNext())
@@ -288,7 +288,7 @@ static int CompareById(const void *p1, const void *p2)
 /**
  * Get all subentries with names matched to mask ordered by id
  */
-ObjectArray<ConfigEntry> *ConfigEntry::getOrderedSubEntries(const TCHAR *mask)
+ObjectArray<ConfigEntry> *ConfigEntry::getOrderedSubEntries(const TCHAR *mask) const
 {
    ObjectArray<ConfigEntry> *list = getSubEntries(mask);
    list->sort(CompareById);
@@ -427,7 +427,7 @@ int ConfigEntry::getConcatenatedValuesLength()
  * @param defaultValue value to be returned if requested sub-entry is missing (NULL if omited)
  * @return sub-entry value or default value if requested sub-entry does not exist
  */
-const TCHAR* ConfigEntry::getSubEntryValue(const TCHAR *name, int index, const TCHAR *defaultValue)
+const TCHAR* ConfigEntry::getSubEntryValue(const TCHAR *name, int index, const TCHAR *defaultValue) const
 {
    ConfigEntry *e = findEntry(name);
    if (e == NULL)
@@ -436,25 +436,25 @@ const TCHAR* ConfigEntry::getSubEntryValue(const TCHAR *name, int index, const T
    return (value != NULL) ? value : defaultValue;
 }
 
+/**
+ * Get sub-entry value as 32 bit signed integer
+ *
+ * The value is looked up with getSubEntryValue and parsed by _tcstol with
+ * base 0.
+ *
+ * @param name sub-entry name
+ * @param index sub-entry value index
+ * @param defaultValue value to be returned if getSubEntryValue returns NULL
+ * @return parsed sub-entry value or default value
+ */
-INT32 ConfigEntry::getSubEntryValueAsInt(const TCHAR *name, int index, INT32 defaultValue)
+INT32 ConfigEntry::getSubEntryValueAsInt(const TCHAR *name, int index, INT32 defaultValue) const
 {
    const TCHAR *value = getSubEntryValue(name, index);
    return (value != NULL) ? _tcstol(value, NULL, 0) : defaultValue;
 }
 
+/**
+ * Get sub-entry value as 32 bit unsigned integer
+ *
+ * The value is looked up with getSubEntryValue and parsed by _tcstoul with
+ * base 0.
+ *
+ * @param name sub-entry name
+ * @param index sub-entry value index
+ * @param defaultValue value to be returned if getSubEntryValue returns NULL
+ * @return parsed sub-entry value or default value
+ */
-UINT32 ConfigEntry::getSubEntryValueAsUInt(const TCHAR *name, int index, UINT32 defaultValue)
+UINT32 ConfigEntry::getSubEntryValueAsUInt(const TCHAR *name, int index, UINT32 defaultValue) const
 {
    const TCHAR *value = getSubEntryValue(name, index);
    return (value != NULL) ? _tcstoul(value, NULL, 0) : defaultValue;
 }
 
+/**
+ * Get sub-entry value as 64 bit signed integer
+ *
+ * The value is looked up with getSubEntryValue and parsed by _tcstoll with
+ * base 0. A 64 bit parser is required here: _tcstol returns long, which is
+ * 32 bit wide on some platforms and would truncate large values.
+ *
+ * @param name sub-entry name
+ * @param index sub-entry value index
+ * @param defaultValue value to be returned if getSubEntryValue returns NULL
+ * @return parsed sub-entry value or default value
+ */
-INT64 ConfigEntry::getSubEntryValueAsInt64(const TCHAR *name, int index, INT64 defaultValue)
+INT64 ConfigEntry::getSubEntryValueAsInt64(const TCHAR *name, int index, INT64 defaultValue) const
 {
    const TCHAR *value = getSubEntryValue(name, index);
-   return (value != NULL) ? _tcstol(value, NULL, 0) : defaultValue;
+   return (value != NULL) ? _tcstoll(value, NULL, 0) : defaultValue;
 }
 
+/**
+ * Get sub-entry value as 64 bit unsigned integer
+ *
+ * The value is looked up with getSubEntryValue and parsed by _tcstoull with
+ * base 0. A 64 bit parser is required here: _tcstoul returns unsigned long,
+ * which is 32 bit wide on some platforms and would truncate large values.
+ *
+ * @param name sub-entry name
+ * @param index sub-entry value index
+ * @param defaultValue value to be returned if getSubEntryValue returns NULL
+ * @return parsed sub-entry value or default value
+ */
-UINT64 ConfigEntry::getSubEntryValueAsUInt64(const TCHAR *name, int index, UINT64 defaultValue)
+UINT64 ConfigEntry::getSubEntryValueAsUInt64(const TCHAR *name, int index, UINT64 defaultValue) const
 {
    const TCHAR *value = getSubEntryValue(name, index);
-   return (value != NULL) ? _tcstoul(value, NULL, 0) : defaultValue;
+   return (value != NULL) ? _tcstoull(value, NULL, 0) : defaultValue;
@@ -464,7 +464,7 @@ UINT64 ConfigEntry::getSubEntryValueAsUInt64(const TCHAR *name, int index, UINT6
  * Get sub-entry value as boolean
  * (consider non-zero numerical value or strings "yes", "true", "on" as true)
  */
-bool ConfigEntry::getSubEntryValueAsBoolean(const TCHAR *name, int index, bool defaultValue)
+bool ConfigEntry::getSubEntryValueAsBoolean(const TCHAR *name, int index, bool defaultValue) const
 {
    const TCHAR *value = getSubEntryValue(name, index);
    if (value != NULL)
@@ -480,7 +480,7 @@ bool ConfigEntry::getSubEntryValueAsBoolean(const TCHAR *name, int index, bool d
 /**
  * Get sub-entry value as UUID
  */
-uuid ConfigEntry::getSubEntryValueAsUUID(const TCHAR *name, int index)
+uuid ConfigEntry::getSubEntryValueAsUUID(const TCHAR *name, int index) const
 {
    const TCHAR *value = getSubEntryValue(name, index);
    if (value != NULL)
@@ -496,7 +496,7 @@ uuid ConfigEntry::getSubEntryValueAsUUID(const TCHAR *name, int index)
 /**
  * Get attribute as integer
  */
-INT32 ConfigEntry::getAttributeAsInt(const TCHAR *name, INT32 defaultValue)
+INT32 ConfigEntry::getAttributeAsInt(const TCHAR *name, INT32 defaultValue) const
 {
    const TCHAR *value = getAttribute(name);
    return (value != NULL) ? _tcstol(value, NULL, 0) : defaultValue;
@@ -505,7 +505,7 @@ INT32 ConfigEntry::getAttributeAsInt(const TCHAR *name, INT32 defaultValue)
 /**
  * Get attribute as unsigned integer
  */
-UINT32 ConfigEntry::getAttributeAsUInt(const TCHAR *name, UINT32 defaultValue)
+UINT32 ConfigEntry::getAttributeAsUInt(const TCHAR *name, UINT32 defaultValue) const
 {
    const TCHAR *value = getAttribute(name);
    return (value != NULL) ? _tcstoul(value, NULL, 0) : defaultValue;
@@ -514,7 +514,7 @@ UINT32 ConfigEntry::getAttributeAsUInt(const TCHAR *name, UINT32 defaultValue)
 /**
  * Get attribute as 64 bit integer
  */
-INT64 ConfigEntry::getAttributeAsInt64(const TCHAR *name, INT64 defaultValue)
+INT64 ConfigEntry::getAttributeAsInt64(const TCHAR *name, INT64 defaultValue) const
 {
    const TCHAR *value = getAttribute(name);
    return (value != NULL) ? _tcstoll(value, NULL, 0) : defaultValue;
@@ -523,7 +523,7 @@ INT64 ConfigEntry::getAttributeAsInt64(const TCHAR *name, INT64 defaultValue)
 /**
  * Get attribute as unsigned 64 bit integer
  */
-UINT64 ConfigEntry::getAttributeAsUInt64(const TCHAR *name, UINT64 defaultValue)
+UINT64 ConfigEntry::getAttributeAsUInt64(const TCHAR *name, UINT64 defaultValue) const
 {
    const TCHAR *value = getAttribute(name);
    return (value != NULL) ? _tcstoull(value, NULL, 0) : defaultValue;
@@ -533,7 +533,7 @@ UINT64 ConfigEntry::getAttributeAsUInt64(const TCHAR *name, UINT64 defaultValue)
  * Get attribute as boolean
  * (consider non-zero numerical value or strings "yes", "true", "on" as true)
  */
-bool ConfigEntry::getAttributeAsBoolean(const TCHAR *name, bool defaultValue)
+bool ConfigEntry::getAttributeAsBoolean(const TCHAR *name, bool defaultValue) const
 {
    const TCHAR *value = getAttribute(name);
    if (value != NULL)