All messages with codes 0x1100 - 0x11ff now forwarded to Reporting Server
author Alex Kirhenshtein <alk@netxms.org>
Mon, 10 Jun 2013 20:52:09 +0000 (20:52 +0000)
committer Alex Kirhenshtein <alk@netxms.org>
Mon, 10 Jun 2013 20:52:09 +0000 (20:52 +0000)
src/server/core/Makefile.am
src/server/core/main.cpp
src/server/core/reporting.cpp [new file with mode: 0644]
src/server/core/session.cpp
src/server/include/nms_core.h
src/server/libnxsrv/isc.cpp

index e66727a..387c9ba 100644 (file)
@@ -27,7 +27,7 @@ libnxcore_la_SOURCES =  accesspoint.cpp acl.cpp actions.cpp admin.cpp \
                        syslogd.cpp template.cpp tools.cpp tracert.cpp \
                        uniroot.cpp upload_job.cpp uptimecalc.cpp userdb.cpp \
                        userdb_objects.cpp vpnconn.cpp vrrp.cpp watchdog.cpp \
-                       winperf.cpp zone.cpp
+                       winperf.cpp zone.cpp reporting.cpp
 libnxcore_la_CPPFLAGS=-I@top_srcdir@/include -I@top_srcdir@/src/server/include
 libnxcore_la_LDFLAGS = -version-info $(NETXMS_LIBRARY_VERSION)
 libnxcore_la_LIBADD = \
index 8e36347..1752554 100644 (file)
@@ -94,6 +94,7 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *);
 THREAD_RESULT THREAD_CALL BeaconPoller(void *);
 THREAD_RESULT THREAD_CALL JobManagerThread(void *);
 THREAD_RESULT THREAD_CALL UptimeCalculator(void *);
+THREAD_RESULT THREAD_CALL ReportingServerConnector(void *);
 
 /**
  * Global variables
@@ -789,6 +790,10 @@ retry_db_lock:
        if (ConfigReadInt(_T("EnableISCListener"), 0))
                ThreadCreate(ISCListener, 0, NULL);
 
+       // Start reporting server connector
+       if (ConfigReadInt(_T("EnableReportingServer"), 0))
+               ThreadCreate(ReportingServerConnector, 0, NULL);
+
        // Allow clients to connect
        InitClientListeners();
        ThreadCreate(ClientListener, 0, NULL);
diff --git a/src/server/core/reporting.cpp b/src/server/core/reporting.cpp
new file mode 100644 (file)
index 0000000..5d656aa
--- /dev/null
@@ -0,0 +1,100 @@
+/* 
+** NetXMS - Network Management System
+** Copyright (C) 2013 Alex Kirhenshtein
+**
+** This program is free software; you can redistribute it and/or modify
+** it under the terms of the GNU General Public License as published by
+** the Free Software Foundation; either version 2 of the License, or
+** (at your option) any later version.
+**
+** This program is distributed in the hope that it will be useful,
+** but WITHOUT ANY WARRANTY; without even the implied warranty of
+** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+** GNU General Public License for more details.
+**
+** You should have received a copy of the GNU General Public License
+** along with this program; if not, write to the Free Software
+** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+**
+** File: reporting.cpp
+**
+**/
+
+#include "nxcore.h"
+
+/**
+ * ISC subclass used for the Reporting Server link. Its PrintMsg()
+ * hands the printf-style message over to DbgPrintf2() at debug level 7.
+ */
+class RSConnector : public ISC
+{
+public:
+   /**
+    * Create connector; both arguments are passed to the ISC constructor as is
+    */
+   RSConnector(DWORD addr, DWORD port) : ISC(addr, port) { }
+
+   /**
+    * Forward format string and variable arguments to DbgPrintf2() (level 7)
+    */
+   virtual void PrintMsg(const TCHAR *format, ...)
+   {
+      va_list ap;
+      va_start(ap, format);
+      DbgPrintf2(7, format, ap);
+      va_end(ap);
+   }
+};
+
+/**
+ * Connector instance shared inside this file. It is created and destroyed
+ * by ReportingServerConnector() and read by ForwardMessageToReportingServer().
+ * NOTE(review): no lock protects this pointer in this file, although the two
+ * functions appear to run on different threads - confirm whether that is safe.
+ */
+static RSConnector *m_connector = NULL;
+
+THREAD_RESULT THREAD_CALL ReportingServerConnector(void *arg)
+{
+       TCHAR hostname[256];
+       ConfigReadStr(_T("ReportingServerHostname"), hostname, 256, _T("localhost"));
+   DWORD port = ConfigReadInt(_T("ReportingServerPort"), 4710);
+
+       DbgPrintf(1, _T("Reporting Server connector started (%s:%d)"), hostname, port);
+
+   // Keep connection open
+   m_connector = new RSConnector(ResolveHostName(hostname), port);
+   while(!IsShutdownInProgress())
+   {
+      if (m_connector->Nop() == ISC_ERR_SUCCESS)
+      {
+         ThreadSleep(1);
+      }
+      else
+      {
+         if (m_connector->Connect(0) == ISC_ERR_SUCCESS)
+         {
+            DbgPrintf(6, _T("Connection to Reporting Server restored"));
+         }
+         else
+         {
+            ThreadSleep(1);
+         }
+      }
+       }
+   m_connector->Disconnect();
+       delete m_connector;
+   m_connector = NULL;
+
+       DbgPrintf(1, _T("Reporting Server connector stopped"));
+   return THREAD_OK;
+}
+
+CSCPMessage *ForwardMessageToReportingServer(CSCPMessage *request)
+{
+   CSCPMessage *reply = NULL;
+
+   DWORD originalId = request->GetId();
+
+   if (m_connector != NULL)
+   {
+      request->SetId(0); // force ISC to generate unique ID
+      if (m_connector->SendMessage(request))
+      {
+         reply = m_connector->WaitForMessage(CMD_REQUEST_COMPLETED, request->GetId(), 10000);
+      }
+   }
+
+   if (reply != NULL)
+   {
+      reply->SetId(originalId);
+   }
+
+   return reply;
+}
index 8741061..fbeed1a 100644 (file)
@@ -71,6 +71,7 @@ extern Queue *g_pItemQueue;
 
 void UnregisterClientSession(DWORD dwIndex);
 void ResetDiscoveryPoller();
+CSCPMessage *ForwardMessageToReportingServer(CSCPMessage *request);
 
 /**
  * Node poller start data
@@ -167,6 +168,7 @@ DEFINE_THREAD_STARTER(getNetworkPath)
 DEFINE_THREAD_STARTER(queryParameter)
 DEFINE_THREAD_STARTER(queryAgentTable)
 DEFINE_THREAD_STARTER(getAlarmEvents)
+DEFINE_THREAD_STARTER(forwardToReportingServer)
 
 /**
  * Client communication read thread starter
@@ -1334,6 +1336,13 @@ void ClientSession::processingThread()
             querySummaryTable(pMsg);
             break;
          default:
+            if ((m_wCurrentCmd >> 8) == 0x11)
+            {
+               // Reporting Server range (0x1100 - 0x11FF)
+               CALL_IN_NEW_THREAD(forwardToReportingServer, pMsg);
+               break;
+            }
+
             // Pass message to loaded modules
             for(i = 0; i < g_dwNumModules; i++)
                                {
@@ -12472,3 +12481,23 @@ void ClientSession::querySummaryTable(CSCPMessage *request)
    // Send response
    sendMessage(&msg);
 }
+
+/**
+ * Forward client request to Reporting Server and send the answer back to
+ * the client. If no reply is obtained, a CMD_REQUEST_COMPLETED message with
+ * RCC_COMM_FAILURE is sent instead, carrying the ID of the original request.
+ *
+ * @param request client request to forward
+ */
+void ClientSession::forwardToReportingServer(CSCPMessage *request)
+{
+   TCHAR buffer[256];
+   debugPrintf(7, _T("RS: Forwarding message %s"), NXCPMessageCodeName(request->GetCode(), buffer));
+
+   // Capture the ID up front, so the failure reply does not depend on
+   // whatever ID the request is left with after forwarding
+   DWORD requestId = request->GetId();
+
+   CSCPMessage *msg = ForwardMessageToReportingServer(request);
+   if (msg == NULL)
+   {
+      msg = new CSCPMessage();
+      msg->SetCode(CMD_REQUEST_COMPLETED);
+      msg->SetId(requestId);   // client matches the response to its request by ID
+      msg->SetVariable(VID_RCC, RCC_COMM_FAILURE);
+   }
+
+   sendMessage(msg);
+   delete msg;
+}
index f1fd6c9..9281f4b 100644 (file)
@@ -444,6 +444,7 @@ private:
        DECLARE_THREAD_STARTER(renderReport)
        DECLARE_THREAD_STARTER(getNetworkPath)
        DECLARE_THREAD_STARTER(getAlarmEvents)
+       DECLARE_THREAD_STARTER(forwardToReportingServer)
 
    void readThread();
    void writeThread();
@@ -642,6 +643,7 @@ private:
    void modifySummaryTable(CSCPMessage *request);
    void deleteSummaryTable(CSCPMessage *request);
    void querySummaryTable(CSCPMessage *request);
+   void forwardToReportingServer(CSCPMessage *request);
 
 public:
    ClientSession(SOCKET hSocket, struct sockaddr *addr);
index 13f56d6..7f31144 100644 (file)
@@ -445,6 +445,11 @@ BOOL ISC::SendMessage(CSCPMessage *pMsg)
        if (!(m_flags & ISCF_IS_CONNECTED))
                return FALSE;
 
+   if (pMsg->GetId() == 0)
+   {
+      pMsg->SetId(m_requestId++);
+   }
+
    pRawMsg = pMsg->CreateMessage();
    if (m_ctx != NULL)
    {