fixed data reconciliation problems when server is not master server (issue #1231)
authorVictor Kirhenshtein <victor@netxms.org>
Wed, 25 May 2016 18:06:57 +0000 (21:06 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Wed, 25 May 2016 18:06:57 +0000 (21:06 +0300)
include/nms_agent.h
src/agent/core/comm.cpp
src/agent/core/datacoll.cpp
src/agent/core/nxagentd.h
src/agent/core/session.cpp

index 22c0e8a..880cf0d 100644 (file)
@@ -426,6 +426,7 @@ class AbstractCommSession : public RefCountObject
 public:
    virtual bool isMasterServer() = 0;
    virtual bool isControlServer() = 0;
+   virtual bool canAcceptData() = 0;
    virtual bool canAcceptTraps() = 0;
    virtual bool canAcceptFileUpdates() = 0;
    virtual UINT64 getServerId() = 0;
index 2bf8f15..e65418b 100644 (file)
@@ -152,7 +152,7 @@ AbstractCommSession *FindServerSession(UINT64 serverId)
    MutexLock(g_hSessionListAccess);
    for(UINT32 i = 0; i < g_dwMaxSessions; i++)
    {
-      if ((g_pSessionList[i] != NULL) && (g_pSessionList[i]->getServerId() == serverId) && g_pSessionList[i]->canAcceptTraps())
+      if ((g_pSessionList[i] != NULL) && (g_pSessionList[i]->getServerId() == serverId))
       {
          session = g_pSessionList[i];
          session->incRefCount();
index 4308fb0..b596dd4 100644 (file)
@@ -402,6 +402,14 @@ void DataElement::saveToDatabase(DB_STATEMENT hStmt)
    DBExecute(hStmt);
 }
 
+/**
+ * Session comparator
+ */
+static bool SessionComparator_Sender(AbstractCommSession *session, void *data)
+{
+   return (session->getServerId() == *((INT64 *)data)) && session->canAcceptData();
+}
+
 /**
  * Send collected data to server
  */
@@ -412,7 +420,7 @@ bool DataElement::sendToServer(bool reconciliation)
    if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
       return true;
 
-   CommSession *session = (CommSession *)FindServerSession(m_serverId);
+   CommSession *session = (CommSession *)FindServerSession(SessionComparator_Sender, &m_serverId);
    if (session == NULL)
       return false;
 
@@ -536,9 +544,9 @@ static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
 /**
  * Session comparator
  */
-static bool SessionComparator(AbstractCommSession *session, void *data)
+static bool SessionComparator_Reconciliation(AbstractCommSession *session, void *data)
 {
-   if ((session->getServerId() == 0) || !session->canAcceptTraps())
+   if ((session->getServerId() == 0) || !session->canAcceptData())
       return false;
    ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
    if ((s != NULL) && (s->queueSize > 0))
@@ -562,7 +570,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
    {
       // Check if there is something to sync
       MutexLock(s_serverSyncStatusLock);
-      CommSession *session = (CommSession *)FindServerSession(SessionComparator, NULL);
+      CommSession *session = (CommSession *)FindServerSession(SessionComparator_Reconciliation, NULL);
       MutexUnlock(s_serverSyncStatusLock);
       if (session == NULL)
       {
@@ -789,6 +797,7 @@ public:
 
    virtual bool isMasterServer() { return false; }
    virtual bool isControlServer() { return false; }
+   virtual bool canAcceptData() { return true; }
    virtual bool canAcceptTraps() { return true; }
    virtual bool canAcceptFileUpdates() { return false; }
    virtual UINT64 getServerId() { return m_serverId; };
index a01ecf7..e3ddea2 100644 (file)
@@ -329,6 +329,7 @@ private:
    bool m_masterServer;
    bool m_controlServer;
    bool m_proxyConnection;
+   bool m_acceptData;
    bool m_acceptTraps;
    bool m_acceptFileUpdates;
    bool m_ipv6Aware;
@@ -393,6 +394,7 @@ public:
    time_t getTimeStamp() { return m_ts; }
        void updateTimeStamp() { m_ts = time(NULL); }
 
+   bool canAcceptData() { return m_acceptData; }
    bool canAcceptTraps() { return m_acceptTraps; }
    bool canAcceptFileUpdates() { return m_acceptFileUpdates; }
    bool isBulkReconciliationSupported() { return m_bulkReconciliationSupported; }
index bc22abf..8c803f1 100644 (file)
@@ -634,6 +634,7 @@ void CommSession::processingThread()
                if (m_serverId != 0)
                {
                   ConfigureDataCollection(m_serverId, pMsg);
+                  m_acceptData = true;
                   msg.setField(VID_RCC, ERR_SUCCESS);
                }
                else