raw_dci_values UPDATE optimization; next build number; minor fixes in Windows build
author Victor Kirhenshtein <victor@netxms.org>
Fri, 5 Sep 2014 12:31:03 +0000 (15:31 +0300)
committer Victor Kirhenshtein <victor@netxms.org>
Fri, 5 Sep 2014 12:31:03 +0000 (15:31 +0300)
21 files changed:
android/src/agent/res/values/build_number.xml
android/src/console/res/values/build_number.xml
build/build_number
include/build.h
include/nms_cscp.h
src/install/windows/netxms-x64.iss
src/java/build/set_build_number.cmd
src/java/netxms-base/src/main/java/org/netxms/base/BuildNumber.java
src/java/netxms-base/src/main/java/org/netxms/base/NXCPCodes.java
src/libnetxms/main.cpp
src/server/core/datacoll.cpp
src/server/core/dbwrite.cpp
src/server/core/dcitem.cpp
src/server/core/dctarget.cpp
src/server/core/ldap.cpp
src/server/core/main.cpp
src/server/core/node.cpp
src/server/core/nxslext.cpp
src/server/core/session.cpp
src/server/include/nms_core.h
src/server/include/nms_dcoll.h

index e1c06de..dfd624f 100644 (file)
@@ -1,4 +1,4 @@
 <?xml version="1.0" encoding="utf-8"?>
 <resources>
-       <string name="build_number">7887</string>
+       <string name="build_number">7888</string>
 </resources>
index e1c06de..dfd624f 100644 (file)
@@ -1,4 +1,4 @@
 <?xml version="1.0" encoding="utf-8"?>
 <resources>
-       <string name="build_number">7887</string>
+       <string name="build_number">7888</string>
 </resources>
index f2d0560..786ea6e 100644 (file)
@@ -1 +1 @@
-7887
+7888
index 437945a..5c2e062 100644 (file)
@@ -1,5 +1,5 @@
 #ifndef __build_h
 #define __build_h
-#define NETXMS_VERSION_BUILD 7887
-#define NETXMS_VERSION_BUILD_STRING _T("7887")
+#define NETXMS_VERSION_BUILD 7888
+#define NETXMS_VERSION_BUILD_STRING _T("7888")
 #endif
index a437e5f..46e1090 100644 (file)
@@ -994,6 +994,7 @@ typedef struct
 #define VID_INCLUDE_NOVALUE_OBJECTS ((UINT32)481)
 #define VID_RECEIVE_OUTPUT          ((UINT32)482)
 #define VID_SESSION_STATE           ((UINT32)483)
+#define VID_PAGE_SIZE               ((UINT32)484)
 
 // Base variabe for single threshold in message
 #define VID_THRESHOLD_BASE          ((UINT32)0x00800000)
index aa1abd6..b62fd06 100644 (file)
@@ -1,7 +1,7 @@
 ; Installation script for NetXMS Server / Windows x64
 
 #include "setup.iss"
-OutputBaseFilename=netxms-1.2.17-x64
+OutputBaseFilename=netxms-1.2.17-7888-x64
 ArchitecturesInstallIn64BitMode=x64
 ArchitecturesAllowed=x64
 
index 0c2fed6..707f09e 100644 (file)
@@ -1,5 +1,5 @@
 package org.netxms.base;
 public final class BuildNumber {
-   public static final String TEXT = "7887";
-   public static final int NUMBER = 7887;
+   public static final String TEXT = "7888";
+   public static final int NUMBER = 7888;
 }
index 701f12f..cf2ad23 100644 (file)
@@ -819,6 +819,8 @@ public class NXCPCodes
    public static final long VID_ROOT = 480;
    public static final long VID_INCLUDE_NOVALUE_OBJECTS = 481;
    public static final long VID_RECEIVE_OUTPUT = 482;
+   public static final long VID_SESSION_STATE = 483;
+   public static final long VID_PAGE_SIZE = 484;
 
        public static final long VID_ACL_USER_BASE = 0x00001000L;
        public static final long VID_ACL_USER_LAST = 0x00001FFFL;
index c5732c7..f32ac60 100644 (file)
@@ -43,11 +43,9 @@ QWORD LIBNETXMS_EXPORTABLE __bswap_64(QWORD qwVal)
 
 #endif
 
-
-//
-// Swap bytes in double
-//
-
+/**
+ * Swap bytes in double
+ */
 double LIBNETXMS_EXPORTABLE __bswap_double(double dVal)
 {
    double dResult;
@@ -61,11 +59,9 @@ double LIBNETXMS_EXPORTABLE __bswap_double(double dVal)
    return dResult;
 }
 
-
-//
-// Swap bytes in wide string (UCS-2)
-//
-
+/**
+ * Swap bytes in wide string (UCS-2)
+ */
 void LIBNETXMS_EXPORTABLE __bswap_wstr(UCS2CHAR *pStr)
 {
    UCS2CHAR *pch;
@@ -74,13 +70,11 @@ void LIBNETXMS_EXPORTABLE __bswap_wstr(UCS2CHAR *pStr)
       *pch = htons(*pch);
 }
 
-
-//
-// strupr() implementation for non-windows platforms
-//
-
 #if !defined(_WIN32) && !defined(_NETWARE)
 
+/**
+ * strupr() implementation for non-windows platforms
+ */
 void LIBNETXMS_EXPORTABLE strupr(char *in)
 {
        char *p = in;
@@ -99,6 +93,9 @@ void LIBNETXMS_EXPORTABLE strupr(char *in)
 
 #if defined(UNICODE_UCS2) || defined(UNICODE_UCS4)
 
+/**
+ * wcsupr() implementation for non-windows platforms
+ */
 void LIBNETXMS_EXPORTABLE wcsupr(WCHAR *in)
 {
        WCHAR *p = in;
index 7adc696..c0a59ba 100644 (file)
@@ -41,6 +41,7 @@ extern Queue g_syslogWriteQueue;
 double g_dAvgPollerQueueSize = 0;
 double g_dAvgDBWriterQueueSize = 0;
 double g_dAvgIDataWriterQueueSize = 0;
+double g_dAvgRawDataWriterQueueSize = 0;
 double g_dAvgDBAndIDataWriterQueueSize = 0;
 double g_dAvgStatusPollerQueueSize = 0;
 double g_dAvgConfigPollerQueueSize = 0;
@@ -357,14 +358,15 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
 {
    UINT32 i, currPos = 0;
    UINT32 pollerQS[12], dbWriterQS[12];
-   UINT32 iDataWriterQS[12], dbAndIDataWriterQS[12];
+   UINT32 iDataWriterQS[12], rawDataWriterQS[12], dbAndIDataWriterQS[12];
    UINT32 statusPollerQS[12], configPollerQS[12];
    UINT32 syslogProcessingQS[12], syslogWriterQS[12];
-   double sum1, sum2, sum3, sum4, sum5, sum6, sum7, sum8;
+   double sum1, sum2, sum3, sum4, sum5, sum6, sum7, sum8, sum9;
 
    memset(pollerQS, 0, sizeof(UINT32) * 12);
    memset(dbWriterQS, 0, sizeof(UINT32) * 12);
    memset(iDataWriterQS, 0, sizeof(UINT32) * 12);
+   memset(rawDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(dbAndIDataWriterQS, 0, sizeof(UINT32) * 12);
    memset(statusPollerQS, 0, sizeof(UINT32) * 12);
    memset(configPollerQS, 0, sizeof(UINT32) * 12);
@@ -373,6 +375,7 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
    g_dAvgPollerQueueSize = 0;
    g_dAvgDBWriterQueueSize = 0;
    g_dAvgIDataWriterQueueSize = 0;
+   g_dAvgRawDataWriterQueueSize = 0;
    g_dAvgDBAndIDataWriterQueueSize = 0;
    g_dAvgStatusPollerQueueSize = 0;
    g_dAvgConfigPollerQueueSize = 0;
@@ -385,9 +388,10 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
 
       // Get current values
       pollerQS[currPos] = g_pItemQueue->Size();
-      dbWriterQS[currPos] = g_pLazyRequestQueue->Size();
-      iDataWriterQS[currPos] = g_pIDataInsertQueue->Size();
-      dbAndIDataWriterQS[currPos] = g_pLazyRequestQueue->Size() + g_pIDataInsertQueue->Size();
+      dbWriterQS[currPos] = g_dbWriterQueue->Size();
+      iDataWriterQS[currPos] = g_dciDataWriterQueue->Size();
+      rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->Size();
+      dbAndIDataWriterQS[currPos] = g_dbWriterQueue->Size() + g_dciDataWriterQueue->Size() + g_dciRawDataWriterQueue->Size();
       statusPollerQS[currPos] = g_statusPollQueue.Size();
       configPollerQS[currPos] = g_configPollQueue.Size();
       syslogProcessingQS[currPos] = g_syslogProcessingQueue.Size();
@@ -397,25 +401,27 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
          currPos = 0;
 
       // Calculate new averages
-      for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum6 = 0, sum7 = 0, sum8 = 0; i < 12; i++)
+      for(i = 0, sum1 = 0, sum2 = 0, sum3 = 0, sum4 = 0, sum5 = 0, sum6 = 0, sum7 = 0, sum8 = 0, sum9 = 0; i < 12; i++)
       {
          sum1 += pollerQS[i];
          sum2 += dbWriterQS[i];
          sum3 += iDataWriterQS[i];
-         sum4 += dbAndIDataWriterQS[i];
-         sum5 += statusPollerQS[i];
-         sum6 += configPollerQS[i];
-         sum7 += syslogProcessingQS[i];
-         sum8 += syslogWriterQS[i];
+         sum4 += rawDataWriterQS[i];
+         sum5 += dbAndIDataWriterQS[i];
+         sum6 += statusPollerQS[i];
+         sum7 += configPollerQS[i];
+         sum8 += syslogProcessingQS[i];
+         sum9 += syslogWriterQS[i];
       }
       g_dAvgPollerQueueSize = sum1 / 12;
       g_dAvgDBWriterQueueSize = sum2 / 12;
       g_dAvgIDataWriterQueueSize = sum3 / 12;
-      g_dAvgDBAndIDataWriterQueueSize = sum4 / 12;
-      g_dAvgStatusPollerQueueSize = sum5 / 12;
-      g_dAvgConfigPollerQueueSize = sum6 / 12;
-      g_dAvgSyslogProcessingQueueSize = sum7 / 12;
-      g_dAvgSyslogWriterQueueSize = sum8 / 12;
+      g_dAvgRawDataWriterQueueSize = sum4 / 12;
+      g_dAvgDBAndIDataWriterQueueSize = sum5 / 12;
+      g_dAvgStatusPollerQueueSize = sum6 / 12;
+      g_dAvgConfigPollerQueueSize = sum7 / 12;
+      g_dAvgSyslogProcessingQueueSize = sum8 / 12;
+      g_dAvgSyslogWriterQueueSize = sum9 / 12;
    }
    return THREAD_OK;
 }
index 9bad421..53c0507 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** NetXMS - Network Management System
-** Copyright (C) 2003-2011 Victor Kirhenshtein
+** Copyright (C) 2003-2014 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
 
 #include "nxcore.h"
 
-
-//
-// Constants
-//
-
+/**
+ * Maximum supported number of database writers
+ */
 #define MAX_DB_WRITERS     16
 
+/**
+ * Generic DB writer queue
+ */
+Queue *g_dbWriterQueue = NULL;
 
-//
-// Global variables
-//
-
-Queue *g_pLazyRequestQueue = NULL;
-Queue *g_pIDataInsertQueue = NULL;
-
+/**
+ * DCI data (idata_* tables) writer queue
+ */
+Queue *g_dciDataWriterQueue = NULL;
 
-//
-// Static data
-//
+/**
+ * Raw DCI data writer queue
+ */
+Queue *g_dciRawDataWriterQueue = NULL;
 
-static int m_iNumWriters = 1;
+/**
+ * Static data
+ */
+static int m_numWriters = 1;
 static THREAD m_hWriteThreadList[MAX_DB_WRITERS];
 static THREAD m_hIDataWriterThread;
+static THREAD m_hRawDataWriterThread;
 
-
-//
-// Put SQL request into queue for later execution
-//
-
+/**
+ * Put SQL request into queue for later execution
+ */
 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query)
 {
        DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)malloc(sizeof(DELAYED_SQL_REQUEST) + (_tcslen(query) + 1) * sizeof(TCHAR));
        rq->query = (TCHAR *)&rq->bindings[0];
        _tcscpy(rq->query, query);
        rq->bindCount = 0;
-   g_pLazyRequestQueue->Put(rq);
+   g_dbWriterQueue->Put(rq);
        DbgPrintf(8, _T("SQL request queued: %s"), query);
 }
 
@@ -95,15 +97,13 @@ void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query, int bindCount, int *s
                        pos += align - pos % align;
        }
 
-   g_pLazyRequestQueue->Put(rq);
+   g_dbWriterQueue->Put(rq);
        DbgPrintf(8, _T("SQL request queued: %s"), query);
 }
 
-
-//
-// Queue INSERT request for idata_xxx table
-//
-
+/**
+ * Queue INSERT request for idata_xxx table
+ */
 void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR *value)
 {
        DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)malloc(sizeof(DELAYED_IDATA_INSERT));
@@ -111,7 +111,20 @@ void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR
        rq->nodeId = nodeId;
        rq->dciId = dciId;
        nx_strncpy(rq->value, value, MAX_RESULT_LENGTH);
-       g_pIDataInsertQueue->Put(rq);
+       g_dciDataWriterQueue->Put(rq);
+}
+
+/**
+ * Queue UPDATE request for raw_dci_values table
+ */
+void QueueRawDciDataUpdate(time_t timestamp, UINT32 dciId, const TCHAR *rawValue, const TCHAR *transformedValue)
+{
+       DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)malloc(sizeof(DELAYED_RAW_DATA_UPDATE));
+       rq->timestamp = timestamp;
+       rq->dciId = dciId;
+       nx_strncpy(rq->rawValue, rawValue, MAX_RESULT_LENGTH);
+       nx_strncpy(rq->transformedValue, transformedValue, MAX_RESULT_LENGTH);
+       g_dciRawDataWriterQueue->Put(rq);
 }
 
 /**
@@ -138,7 +151,7 @@ static THREAD_RESULT THREAD_CALL DBWriteThread(void *arg)
 
    while(1)
    {
-      DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_pLazyRequestQueue->GetOrBlock();
+      DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_dbWriterQueue->GetOrBlock();
       if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
          break;
 
@@ -193,7 +206,7 @@ static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
 
    while(1)
    {
-               DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_pIDataInsertQueue->GetOrBlock();
+               DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->GetOrBlock();
       if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
          break;
 
@@ -225,7 +238,83 @@ static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
                                if (!success || (count > 1000))
                                        break;
 
-                               rq = (DELAYED_IDATA_INSERT *)g_pIDataInsertQueue->Get();
+                               rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->Get();
+                               if (rq == NULL)
+                                       break;
+                               if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
+                                       goto stop;
+                       }
+                       DBCommit(hdb);
+               }
+               else
+               {
+                       free(rq);
+               }
+       }
+
+stop:
+   if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
+   {
+      DBDisconnect(hdb);
+   }
+   return THREAD_OK;
+}
+
+/**
+ * Database "lazy" write thread for raw_dci_values UPDATEs
+ */
+static THREAD_RESULT THREAD_CALL RawDataWriteThread(void *arg)
+{
+   DB_HANDLE hdb;
+
+   if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
+   {
+               TCHAR errorText[DBDRV_MAX_ERROR_TEXT];
+      hdb = DBConnect(g_dbDriver, g_szDbServer, g_szDbName, g_szDbLogin, g_szDbPassword, g_szDbSchema, errorText);
+      if (hdb == NULL)
+      {
+         nxlog_write(MSG_DB_CONNFAIL, EVENTLOG_ERROR_TYPE, "s", errorText);
+         return THREAD_OK;
+      }
+   }
+   else
+   {
+      hdb = g_hCoreDB;
+   }
+
+   while(1)
+   {
+               DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->GetOrBlock();
+      if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
+         break;
+
+               if (DBBegin(hdb))
+               {
+                       int count = 0;
+                       while(1)
+                       {
+                               BOOL success;
+                               DB_STATEMENT hStmt = DBPrepare(hdb, _T("UPDATE raw_dci_values SET raw_value=?,transformed_value=?,last_poll_time=? WHERE item_id=?"));
+                               if (hStmt != NULL)
+                               {
+                                       DBBind(hStmt, 1, DB_SQLTYPE_VARCHAR, rq->rawValue, DB_BIND_STATIC);
+                                       DBBind(hStmt, 2, DB_SQLTYPE_VARCHAR, rq->transformedValue, DB_BIND_STATIC);
+                                       DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (INT64)rq->timestamp);
+                                       DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, rq->dciId);
+                                       success = DBExecute(hStmt);
+                                       DBFreeStatement(hStmt);
+                               }
+                               else
+                               {
+                                       success = FALSE;
+                               }
+                               free(rq);
+
+                               count++;
+                               if (!success || (count > 1000))
+                                       break;
+
+                               rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->Get();
                                if (rq == NULL)
                                        break;
                                if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
@@ -256,17 +345,18 @@ void StartDBWriter()
 
    if (g_flags & AF_ENABLE_MULTIPLE_DB_CONN)
    {
-      m_iNumWriters = ConfigReadInt(_T("NumberOfDatabaseWriters"), 1);
-      if (m_iNumWriters < 1)
-         m_iNumWriters = 1;
-      if (m_iNumWriters > MAX_DB_WRITERS)
-         m_iNumWriters = MAX_DB_WRITERS;
+      m_numWriters = ConfigReadInt(_T("NumberOfDatabaseWriters"), 1);
+      if (m_numWriters < 1)
+         m_numWriters = 1;
+      if (m_numWriters > MAX_DB_WRITERS)
+         m_numWriters = MAX_DB_WRITERS;
    }
 
-   for(i = 0; i < m_iNumWriters; i++)
+   for(i = 0; i < m_numWriters; i++)
       m_hWriteThreadList[i] = ThreadCreateEx(DBWriteThread, 0, NULL);
 
        m_hIDataWriterThread = ThreadCreateEx(IDataWriteThread, 0, NULL);
+       m_hRawDataWriterThread = ThreadCreateEx(RawDataWriteThread, 0, NULL);
 }
 
 /**
@@ -276,11 +366,13 @@ void StopDBWriter()
 {
    int i;
 
-   for(i = 0; i < m_iNumWriters; i++)
-      g_pLazyRequestQueue->Put(INVALID_POINTER_VALUE);
-   for(i = 0; i < m_iNumWriters; i++)
+   for(i = 0; i < m_numWriters; i++)
+      g_dbWriterQueue->Put(INVALID_POINTER_VALUE);
+   for(i = 0; i < m_numWriters; i++)
       ThreadJoin(m_hWriteThreadList[i]);
 
-       g_pIDataInsertQueue->Put(INVALID_POINTER_VALUE);
+       g_dciDataWriterQueue->Put(INVALID_POINTER_VALUE);
+       g_dciRawDataWriterQueue->Put(INVALID_POINTER_VALUE);
        ThreadJoin(m_hIDataWriterThread);
+       ThreadJoin(m_hRawDataWriterThread);
 }
index 8fd040b..d7e6613 100644 (file)
@@ -691,31 +691,8 @@ bool DCItem::processNewValue(time_t tmTimeStamp, void *originalValue)
    m_prevRawValue = rawValue;
    m_tPrevValueTimeStamp = tmTimeStamp;
 
-       // Prepare SQL statement bindings
-       TCHAR dciId[32], pollTime[32];
-       _sntprintf(dciId, 32, _T("%d"), (int)m_dwId);
-       _sntprintf(pollTime, 32, _T("%ld"), (long)tmTimeStamp);
-
    // Save raw value into database
-       const TCHAR *values[4];
-       if (_tcslen((const TCHAR *)originalValue) >= MAX_DB_STRING)
-       {
-               // need to be truncated
-               TCHAR *temp = _tcsdup((const TCHAR *)originalValue);
-               temp[MAX_DB_STRING - 1] = 0;
-               values[0] = temp;
-       }
-       else
-       {
-               values[0] = (const TCHAR *)originalValue;
-       }
-       values[1] = pValue->getString();
-       values[2] = pollTime;
-       values[3] = dciId;
-       QueueSQLRequest(_T("UPDATE raw_dci_values SET raw_value=?,transformed_value=?,last_poll_time=? WHERE item_id=?"),
-                       4, updateRawValueTypes, values);
-       if ((void *)values[0] != originalValue)
-               free((void *)values[0]);
+   QueueRawDciDataUpdate(tmTimeStamp, m_dwId, (const TCHAR *)originalValue, pValue->getString());
 
        // Save transformed value to database
    if ((m_flags & DCF_NO_STORAGE) == 0)
index 8509323..419716c 100644 (file)
@@ -494,7 +494,7 @@ UINT32 DataCollectionTarget::getInternalItem(const TCHAR *param, size_t bufSize,
          dwError = DCE_NOT_SUPPORTED;
       }
    }
-   else if(MatchString(_T("PingTime(*)"), param, FALSE))
+   else if (MatchString(_T("PingTime(*)"), param, FALSE))
    {
       TCHAR *pEnd, szArg[256];
       UINT32 i, dwId;
@@ -530,13 +530,13 @@ UINT32 DataCollectionTarget::getInternalItem(const TCHAR *param, size_t bufSize,
          dwError = DCE_NOT_SUPPORTED;
       }
    }
-   else if(MatchString(_T("PingTime"), param, FALSE))
+   else if (!_tcsicmp(_T("PingTime"), param))
    {
       NetObj *pObject = NULL;
 
       // Find child object with requested ID or name
       LockChildList(FALSE);
-      for(int i = 0; i < m_dwChildCount; i++)
+      for(int i = 0; i < (int)m_dwChildCount; i++)
       {
          if (m_pChildList[i]->IpAddr() == m_dwIpAddr)
          {
index 1490906..2f4cbff 100644 (file)
@@ -213,14 +213,14 @@ void LDAPConnection::getAllSyncParameters()
    ConfigReadStr(_T("LdapSyncUser"), m_userDN, MAX_DB_STRING, _T(""));
    ConfigReadStr(_T("LdapSearchBase"), m_searchBase, MAX_DB_STRING, _T(""));
    ConfigReadStr(_T("LdapSearchFilter"), m_searchFilter, MAX_DB_STRING, _T("(objectClass=*)"));
-   if(!wcscmp("", m_searchFilter))
-      wcscpy(m_searchFilter, _T("(objectClass=*)"));
+   if (m_searchFilter[0] == 0)
+      _tcscpy(m_searchFilter, _T("(objectClass=*)"));
 #else
    ConfigReadStrUTF8(_T("LdapConnectionString"), m_connList, MAX_DB_STRING, "");
    ConfigReadStrUTF8(_T("LdapSyncUser"), m_userDN, MAX_DB_STRING, "");
    ConfigReadStrUTF8(_T("LdapSearchBase"), m_searchBase, MAX_DB_STRING, "");
    ConfigReadStrUTF8(_T("LdapSearchFilter"), m_searchFilter, MAX_DB_STRING, "(objectClass=*)");
-   if(!strcmp("", m_searchFilter))
+   if (m_searchFilter[0] == 0)
       strcmp(m_searchFilter, "(objectClass=*)");
 #endif
    ConfigReadStrUTF8(_T("LdapSyncUserPassword"), m_userPassword, MAX_DB_STRING, "");
index 781e010..c03df8f 100644 (file)
@@ -610,8 +610,9 @@ BOOL NXCORE_EXPORTABLE Initialize()
        InitLocalNetInfo();
 
        // Create queue for delayed SQL queries
-       g_pLazyRequestQueue = new Queue(256, 64);
-       g_pIDataInsertQueue = new Queue(1024, 1024);
+       g_dbWriterQueue = new Queue(256, 64);
+       g_dciDataWriterQueue = new Queue(1024, 1024);
+       g_dciRawDataWriterQueue = new Queue(1024, 1024);
 
        // Initialize database driver and connect to database
        DBSetDebugPrintCallback(DbgPrintf2);
@@ -1359,8 +1360,9 @@ int ProcessConsoleCommand(const TCHAR *pszCmdLine, CONSOLE_CTX pCtx)
                        ShowQueueStats(pCtx, &g_configPollQueue, _T("Configuration poller"));
                        ShowQueueStats(pCtx, &g_topologyPollQueue, _T("Topology poller"));
                        ShowQueueStats(pCtx, g_pItemQueue, _T("Data collector"));
-                       ShowQueueStats(pCtx, g_pLazyRequestQueue, _T("Database writer"));
-                       ShowQueueStats(pCtx, g_pIDataInsertQueue, _T("Database writer (IData)"));
+                       ShowQueueStats(pCtx, g_dbWriterQueue, _T("Database writer"));
+                       ShowQueueStats(pCtx, g_dciDataWriterQueue, _T("Database writer (IData)"));
+                       ShowQueueStats(pCtx, g_dciRawDataWriterQueue, _T("Database writer (raw DCI values)"));
                        ShowQueueStats(pCtx, g_pEventQueue, _T("Event processor"));
                        ShowQueueStats(pCtx, &g_discoveryPollQueue, _T("Network discovery poller"));
                        ShowQueueStats(pCtx, &g_nodePollerQueue, _T("Node poller"));
index 4ee90fc..53b689f 100644 (file)
@@ -3712,6 +3712,10 @@ UINT32 Node::getInternalItem(const TCHAR *param, size_t bufSize, TCHAR *buffer)
       {
          _sntprintf(buffer, bufSize, _T("%f"), g_dAvgIDataWriterQueueSize);
       }
+      else if (!_tcsicmp(param, _T("Server.AverageDBWriterQueueSize.RawData")))
+      {
+         _sntprintf(buffer, bufSize, _T("%f"), g_dAvgRawDataWriterQueueSize);
+      }
       else if (!_tcsicmp(param, _T("Server.AverageStatusPollerQueueSize")))
       {
          _sntprintf(buffer, bufSize, _T("%f"), g_dAvgStatusPollerQueueSize);
index c7ce0ce..ed3dc6c 100644 (file)
@@ -111,7 +111,6 @@ static int F_SetCustomAttribute(int argc, NXSL_Value **argv, NXSL_Value **ppResu
 static int F_DeleteCustomAttribute(int argc, NXSL_Value **argv, NXSL_Value **ppResult, NXSL_VM *vm)
 {
        NXSL_Object *object;
-       const TCHAR *value;
 
        if (!argv[0]->isObject())
                return NXSL_ERR_NOT_OBJECT;
index 448f263..600bd22 100644 (file)
@@ -7915,7 +7915,7 @@ void ClientSession::sendServerStats(UINT32 dwRqId)
        msg.SetVariable(VID_QSIZE_CONDITION_POLLER, g_conditionPollerQueue.Size());
        msg.SetVariable(VID_QSIZE_CONF_POLLER, g_configPollQueue.Size());
        msg.SetVariable(VID_QSIZE_DCI_POLLER, g_pItemQueue->Size());
-       msg.SetVariable(VID_QSIZE_DBWRITER, g_pLazyRequestQueue->Size());
+       msg.SetVariable(VID_QSIZE_DBWRITER, g_dbWriterQueue->Size());
        msg.SetVariable(VID_QSIZE_EVENT, g_pEventQueue->Size());
        msg.SetVariable(VID_QSIZE_DISCOVERY, g_discoveryPollQueue.Size());
        msg.SetVariable(VID_QSIZE_NODE_POLLER, g_nodePollerQueue.Size());
index b737f7b..33aa98a 100644 (file)
@@ -770,6 +770,17 @@ typedef struct
 } DELAYED_IDATA_INSERT;
 
 /**
+ * Delayed request for raw_dci_values UPDATE
+ */
+typedef struct
+{
+       time_t timestamp;
+       UINT32 dciId;
+       TCHAR rawValue[MAX_RESULT_LENGTH];
+       TCHAR transformedValue[MAX_RESULT_LENGTH];
+} DELAYED_RAW_DATA_UPDATE;
+
+/**
  * Graph ACL entry
  */
 struct GRAPH_ACL_ENTRY
@@ -837,6 +848,7 @@ void NXCORE_EXPORTABLE ObjectTransactionEnd();
 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query);
 void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query, int bindCount, int *sqlTypes, const TCHAR **values);
 void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR *value);
+void QueueRawDciDataUpdate(time_t timestamp, UINT32 dciId, const TCHAR *rawValue, const TCHAR *transformedValue);
 void StartDBWriter();
 void StopDBWriter();
 
@@ -1031,8 +1043,9 @@ extern TCHAR g_szDbSchema[];
 extern TCHAR NXCORE_EXPORTABLE g_szJavaPath[];
 extern DB_DRIVER g_dbDriver;
 extern DB_HANDLE NXCORE_EXPORTABLE g_hCoreDB;
-extern Queue *g_pLazyRequestQueue;
-extern Queue *g_pIDataInsertQueue;
+extern Queue *g_dbWriterQueue;
+extern Queue *g_dciDataWriterQueue;
+extern Queue *g_dciRawDataWriterQueue;
 
 extern int NXCORE_EXPORTABLE g_dbSyntax;
 extern FileMonitoringList g_monitoringList;
index dedbffe..fffae3d 100644 (file)
@@ -605,6 +605,7 @@ void CalculateItemValueMax(ItemValue &result, int nDataType, int nNumValues, Ite
 extern double g_dAvgPollerQueueSize;
 extern double g_dAvgDBWriterQueueSize;
 extern double g_dAvgIDataWriterQueueSize;
+extern double g_dAvgRawDataWriterQueueSize;
 extern double g_dAvgDBAndIDataWriterQueueSize;
 extern double g_dAvgStatusPollerQueueSize;
 extern double g_dAvgConfigPollerQueueSize;