agent data reconciliation block size and timeout can be configured
authorVictor Kirhenshtein <victor@netxms.org>
Tue, 14 Jun 2016 08:01:58 +0000 (11:01 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Tue, 14 Jun 2016 08:01:58 +0000 (11:01 +0300)
include/nms_agent.h
src/agent/core/datacoll.cpp
src/agent/core/nxagentd.cpp
src/server/core/agent.cpp

index dae9859..a433241 100644 (file)
 /**
  * Max bulk data block size
  */
-#define BULK_DATA_BLOCK_SIZE     1024
+#define MAX_BULK_DATA_BLOCK_SIZE 8192
 
 /**
  * Parameter handler return codes
index f223739..9bdd787 100644 (file)
@@ -28,6 +28,9 @@
 void UpdateSnmpTarget(SNMPTarget *target);
 UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
 
+extern UINT32 g_dcReconciliationBlockSize;
+extern UINT32 g_dcReconciliationTimeout;
+
 /**
  * Data collector start indicator
  */
@@ -572,7 +575,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
 {
    DB_HANDLE hdb = GetLocalDatabaseHandle();
    UINT32 sleepTime = 30000;
-   DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread started"));
+   nxlog_debug(1, _T("Data reconciliation thread started (block size %d, timeout %d ms)"), g_dcReconciliationBlockSize, g_dcReconciliationTimeout);
 
    bool vacuumNeeded = false;
    while(!AgentSleepAndCheckForShutdown(sleepTime))
@@ -612,7 +615,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
       }
 
       TCHAR query[1024];
-      _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), BULK_DATA_BLOCK_SIZE);
+      _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), g_dcReconciliationBlockSize);
 
       TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
       DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
@@ -678,7 +681,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
                fieldId += 10;
             }
 
-            NXCPMessage *response = session->doRequestEx(&msg, 15000);
+            NXCPMessage *response = session->doRequestEx(&msg, g_dcReconciliationTimeout);
             if (response != NULL)
             {
                UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
@@ -688,9 +691,9 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
                   ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
 
                   // Check status for each data element
-                  BYTE status[BULK_DATA_BLOCK_SIZE];
-                  memset(status, 0, BULK_DATA_BLOCK_SIZE);
-                  response->getFieldAsBinary(VID_STATUS, status, BULK_DATA_BLOCK_SIZE);
+                  BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
+                  memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
+                  response->getFieldAsBinary(VID_STATUS, status, MAX_BULK_DATA_BLOCK_SIZE);
                   bulkSendList.setOwner(false);
                   for(int i = 0; i < bulkSendList.size(); i++)
                   {
@@ -731,7 +734,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
                DBQuery(hdb, query);
             }
             DBCommit(hdb);
-            DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: %d records sent"), deleteList.size());
+            nxlog_debug(4, _T("ReconciliationThread: %d records sent"), deleteList.size());
             vacuumNeeded = true;
          }
       }
@@ -741,7 +744,7 @@ static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
       sleepTime = (count > 0) ? 50 : 30000;
    }
 
-   DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread stopped"));
+   nxlog_debug(1, _T("Data reconciliation thread stopped"));
    return THREAD_OK;
 }
 
@@ -1152,6 +1155,28 @@ void StartLocalDataCollector()
       return;
    }
 
+   if (g_dcReconciliationBlockSize < 16)
+   {
+      nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to 16"), g_dcReconciliationBlockSize);
+      g_dcReconciliationBlockSize = 16;
+   }
+   else if (g_dcReconciliationBlockSize > MAX_BULK_DATA_BLOCK_SIZE)
+   {
+      nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to %d"), g_dcReconciliationBlockSize, MAX_BULK_DATA_BLOCK_SIZE);
+      g_dcReconciliationBlockSize = MAX_BULK_DATA_BLOCK_SIZE;
+   }
+
+   if (g_dcReconciliationTimeout < 1000)
+   {
+      nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 1000"), g_dcReconciliationTimeout);
+      g_dcReconciliationTimeout = 1000;
+   }
+   else if (g_dcReconciliationTimeout > 600000)
+   {
+      nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 600000"), g_dcReconciliationTimeout);
+      g_dcReconciliationTimeout = 600000;
+   }
+
    s_itemLock = MutexCreate();
    s_serverSyncStatusLock = MutexCreate();
 
index 768463a..6741fc2 100644 (file)
@@ -163,6 +163,8 @@ UINT32 g_dwStartupDelay = 0;
 UINT32 g_dwMaxSessions = 32;
 UINT32 g_dwSNMPTrapPort = 162;
 UINT32 g_longRunningQueryThreshold = 250;
+UINT32 g_dcReconciliationBlockSize = 1024;
+UINT32 g_dcReconciliationTimeout = 15000;
 #ifdef _WIN32
 UINT16 g_sessionAgentPort = 28180;
 #else
@@ -231,6 +233,8 @@ static NX_CFG_TEMPLATE m_cfgTemplate[] =
    { _T("ControlServers"), CT_STRING_LIST, ',', 0, 0, 0, &m_pszControlServerList, NULL },
    { _T("CreateCrashDumps"), CT_BOOLEAN, 0, 0, AF_CATCH_EXCEPTIONS, 0, &g_dwFlags, NULL },
        { _T("DataDirectory"), CT_STRING, 0, 0, MAX_PATH, 0, g_szDataDirectory, NULL },
+   { _T("DataReconciliationBlockSize"), CT_LONG, 0, 0, 0, 0, &g_dcReconciliationBlockSize, NULL },
+   { _T("DataReconciliationTimeout"), CT_LONG, 0, 0, 0, 0, &g_dcReconciliationTimeout, NULL },
    { _T("DailyLogFileSuffix"), CT_STRING, 0, 0, 64, 0, s_dailyLogFileSuffix, NULL },
        { _T("DebugLevel"), CT_LONG, 0, 0, 0, 0, &s_debugLevel, &s_debugLevel },
    { _T("DisableIPv4"), CT_BOOLEAN, 0, 0, AF_DISABLE_IPV4, 0, &g_dwFlags, NULL },
@@ -712,7 +716,8 @@ BOOL Initialize()
                   ((tail != '\\') && (tail != '/')) ? FS_PATH_SEPARATOR : _T(""),
               CONFIG_AP_FOLDER FS_PATH_SEPARATOR);
        CreateFolder(g_szConfigPolicyDir);
-       //load configuration
+
+       // Load configuration
    g_config->loadConfigDirectory(g_szConfigPolicyDir, _T("agent"));
        g_config->parseTemplate(_T("agent"), m_cfgTemplate);
 
@@ -835,6 +840,7 @@ BOOL Initialize()
 
                // Add built-in actions
                AddAction(_T("Agent.Restart"), AGENT_ACTION_SUBAGENT, NULL, H_RestartAgent, _T("CORE"), _T("Restart agent"));
+
                //Add build-in DCIs
       AddParameter(_T("Agent.LogFile"), H_FailStatusProvider, _T("L"), DCI_DT_UINT, _T("Get log status"));
       AddParameter(_T("Agent.LocalDatabase"), H_FailStatusProvider, _T("D"), DCI_DT_UINT, _T("Get database status"));
index 68a7922..053e76f 100644 (file)
@@ -617,12 +617,12 @@ UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMes
    }
 
    int count = request->getFieldAsInt16(VID_NUM_ELEMENTS);
-   if (count > BULK_DATA_BLOCK_SIZE)
-      count = BULK_DATA_BLOCK_SIZE;
+   if (count > MAX_BULK_DATA_BLOCK_SIZE)
+      count = MAX_BULK_DATA_BLOCK_SIZE;
    DbgPrintf(5, _T("AgentConnectionEx::processBulkCollectedData: %d elements from node %s [%d]"), count, node->getName(), node->getId());
 
-   BYTE status[BULK_DATA_BLOCK_SIZE];
-   memset(status, 0, BULK_DATA_BLOCK_SIZE);
+   BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
+   memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
    UINT32 fieldId = VID_ELEMENT_LIST_BASE;
    for(int i = 0; i < count; i++, fieldId += 10)
    {
@@ -700,6 +700,6 @@ UINT32 AgentConnectionEx::processBulkCollectedData(NXCPMessage *request, NXCPMes
       free(value);
    }
 
-   response->setField(VID_STATUS, status, BULK_DATA_BLOCK_SIZE);
+   response->setField(VID_STATUS, status, count);
    return ERR_SUCCESS;
 }