syslog message write to database optimized
authorVictor Kirhenshtein <victor@netxms.org>
Tue, 3 Dec 2013 18:50:30 +0000 (20:50 +0200)
committerVictor Kirhenshtein <victor@netxms.org>
Tue, 3 Dec 2013 18:50:30 +0000 (20:50 +0200)
include/nxlog.h
include/nxqueue.h
src/agent/subagents/hpux/net.cpp
src/libnetxms/queue.cpp
src/server/core/syslogd.cpp

index 62336d3..97d839d 100644 (file)
 #ifndef _nxlog_h_
 #define _nxlog_h_
 
-
-//
-// Constants
-//
-
+/**
+ * Constants
+ */
 #define MAX_OBJECT_NAME          64
 #define MAX_LOG_MSG_LENGTH       1024
 #define MAX_SYSLOG_HOSTNAME_LEN  128
 #define MAX_SYSLOG_TAG_LEN       33
 
-
-//
-// Syslog severity codes
-//
-
+/**
+ * Syslog severity codes
+ */
 #define SYSLOG_SEVERITY_EMERGENCY      0
 #define SYSLOG_SEVERITY_ALERT          1
 #define SYSLOG_SEVERITY_CRITICAL       2
 #define SYSLOG_SEVERITY_INFORMATIONAL  6
 #define SYSLOG_SEVERITY_DEBUG          7
 
-
-//
-// Policy flags
-//
-
+/**
+ * Policy flags
+ */
 #define NX_LPPF_WINDOWS_EVENT_LOG      0x0001
 #define NX_LPPF_REPORT_UNMATCHED       0x0002
 
-
-//
-// Syslog message structure
-//
-
+/**
+ * Syslog message structure
+ */
 typedef struct
 {
-   QWORD qwMsgId;       // NetXMS internal message ID
+   UINT64 qwMsgId;       // NetXMS internal message ID
    time_t tmTimeStamp;
-   int nFacility;
-   int nSeverity;
+   INT32 nFacility;
+   INT32 nSeverity;
    UINT32 dwSourceObject;
    char szHostName[MAX_SYSLOG_HOSTNAME_LEN];
    char szTag[MAX_SYSLOG_TAG_LEN];
    char szMessage[MAX_LOG_MSG_LENGTH];
 } NX_SYSLOG_RECORD;
 
-
 #endif
index 7600b32..54b4c14 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** NetXMS - Network Management System
-** Copyright (C) 2003-2010 Victor Kirhenshtein
+** Copyright (C) 2003-2013 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU Lesser General Public License as published
 
 #include <nms_util.h>
 
-
-//
-// Queue class
-//
-
+/**
+ * Comparator for queue search
+ */
 typedef bool (*QUEUE_COMPARATOR)(void *key, void *object);
 
+/**
+ * Queue class
+ */
 class LIBNETXMS_EXPORTABLE Queue
 {
 private:
@@ -45,9 +46,9 @@ private:
    UINT32 m_dwBufferIncrement;
        BOOL m_bShutdownFlag;
 
-       void CommonInit();
-   void Lock() { MutexLock(m_mutexQueueAccess); }
-   void Unlock() { MutexUnlock(m_mutexQueueAccess); }
+       void commonInit();
+   void lock() { MutexLock(m_mutexQueueAccess); }
+   void unlock() { MutexUnlock(m_mutexQueueAccess); }
 
 public:
    Queue();
index 3dfbdc5..dc58aaa 100644 (file)
@@ -61,13 +61,11 @@ struct NETIF
        IPADDR ipAddrList[MAX_IPADDR_COUNT];
 };
 
-
-//
-// Find interface name by interface index
-//
-
 #if HAVE_DECL_SIOCGIFNAME
 
+/**
+ * Find interface name by interface index
+ */
 static char *IfIndexToName(int index, char *buffer)
 {
        int fd;
@@ -91,13 +89,11 @@ static char *IfIndexToName(int index, char *buffer)
 
 #endif
 
-
-//
-// Find interface index by name
-//
-
 #if HAVE_DECL_SIOCGIFINDEX
 
+/**
+ * Find interface index by name
+ */
 static int IfNameToIndex(const char *name)
 {
        int fd, index = -1;
@@ -119,11 +115,9 @@ static int IfNameToIndex(const char *name)
 
 #endif
 
-
-//
-// Get interface list
-//
-
+/**
+ * Get interface list
+ */
 static int GetInterfaceList(NETIF **iflist)
 {
        nmapi_iftable ift[1024];
@@ -228,11 +222,9 @@ static int GetInterfaceList(NETIF **iflist)
        return ifcount;
 }
 
-
-//
-// Handler for Net.InterfaceList enum
-//
-
+/**
+ * Handler for Net.InterfaceList enum
+ */
 LONG H_NetIfList(const char *pszParam, const char *pArg, StringList *pValue)
 {
        int i, j, ifCount;
@@ -269,11 +261,9 @@ LONG H_NetIfList(const char *pszParam, const char *pArg, StringList *pValue)
        return SYSINFO_RC_SUCCESS;
 }
 
-
-//
-// Handler for Net.ArpCache enum
-//
-
+/**
+ * Handler for Net.ArpCache enum
+ */
 LONG H_NetArpCache(const char *pszParam, const char *pArg, StringList *pValue)
 {
        int i, mib;
@@ -310,11 +300,9 @@ LONG H_NetArpCache(const char *pszParam, const char *pArg, StringList *pValue)
        return nRet;
 }
 
-
-//
-// Handler for Net.IP.Forwarding parameter
-//
-
+/**
+ * Handler for Net.IP.Forwarding parameter
+ */
 LONG H_NetIpForwarding(const char *pszParam, const char *pArg, char *pValue)
 {
        int ipVer = CAST_FROM_POINTER(pArg, int);
index eee4ea0..b589b96 100644 (file)
@@ -30,7 +30,7 @@ Queue::Queue(UINT32 dwInitialSize, UINT32 dwBufferIncrement)
 {
    m_dwBufferSize = dwInitialSize;
    m_dwBufferIncrement = dwBufferIncrement;
-       CommonInit();
+       commonInit();
 }
 
 /**
@@ -40,15 +40,13 @@ Queue::Queue()
 {
    m_dwBufferSize = 256;
    m_dwBufferIncrement = 32;
-       CommonInit();
+       commonInit();
 }
 
-
-//
-// Common initialization (used by all constructors)
-//
-
-void Queue::CommonInit()
+/**
+ * Common initialization (used by all constructors)
+ */
+void Queue::commonInit()
 {
    m_mutexQueueAccess = MutexCreate();
    m_condWakeup = ConditionCreate(FALSE);
@@ -59,11 +57,9 @@ void Queue::CommonInit()
        m_bShutdownFlag = FALSE;
 }
 
-
-//
-// Destructor
-//
-
+/**
+ * Destructor
+ */
 Queue::~Queue()
 {
    MutexDestroy(m_mutexQueueAccess);
@@ -71,14 +67,12 @@ Queue::~Queue()
    safe_free(m_pElements);
 }
 
-
-//
-// Put new element into queue
-//
-
+/**
+ * Put new element into queue
+ */
 void Queue::Put(void *pElement)
 {
-   Lock();
+   lock();
    if (m_dwNumElements == m_dwBufferSize)
    {
       // Extend buffer
@@ -95,17 +89,15 @@ void Queue::Put(void *pElement)
       m_dwLast = 0;
    m_dwNumElements++;
    ConditionSet(m_condWakeup);
-   Unlock();
+   unlock();
 }
 
-
-//
-// Insert new element into the beginning of a queue
-//
-
+/**
+ * Insert new element into the beginning of a queue
+ */
 void Queue::Insert(void *pElement)
 {
-   Lock();
+   lock();
    if (m_dwNumElements == m_dwBufferSize)
    {
       // Extend buffer
@@ -122,19 +114,17 @@ void Queue::Insert(void *pElement)
    m_pElements[--m_dwFirst] = pElement;
    m_dwNumElements++;
    ConditionSet(m_condWakeup);
-   Unlock();
+   unlock();
 }
 
-
-//
-// Get object from queue. Return NULL if queue is empty
-//
-
+/**
+ * Get object from queue. Return NULL if queue is empty
+ */
 void *Queue::Get()
 {
    void *pElement = NULL;
 
-   Lock();
+   lock();
        if (m_bShutdownFlag)
        {
                pElement = INVALID_POINTER_VALUE;
@@ -149,15 +139,13 @@ void *Queue::Get()
                        m_dwNumElements--;
                }
    }
-   Unlock();
+   unlock();
    return pElement;
 }
 
-
-//
-// Get object from queue or block if queue if empty
-//
-
+/**
+ * Get object from queue or block if queue if empty
+ */
 void *Queue::GetOrBlock()
 {
    void *pElement;
@@ -176,47 +164,41 @@ void *Queue::GetOrBlock()
    return pElement;
 }
 
-
-//
-// Clear queue
-//
-
+/**
+ * Clear queue
+ */
 void Queue::Clear()
 {
-   Lock();
+   lock();
    m_dwNumElements = 0;
    m_dwFirst = 0;
    m_dwLast = 0;
-   Unlock();
+   unlock();
 }
 
-
-//
-// Set shutdown flag
-// When this flag is set, Get() always return INVALID_POINTER_VALUE
-//
-
+/**
+ * Set shutdown flag
+ * When this flag is set, Get() always return INVALID_POINTER_VALUE
+ */
 void Queue::SetShutdownMode()
 {
-       Lock();
+       lock();
        m_bShutdownFlag = TRUE;
        ConditionSet(m_condWakeup);
-       Unlock();
+       unlock();
 }
 
-
-//
-// Find element in queue using given key and comparator
-// Returns pointer to element or NULL if element was not found.
-// Element remains in the queue
-//
-
+/**
+ * Find element in queue using given key and comparator
+ * Returns pointer to element or NULL if element was not found.
+ * Element remains in the queue
+ */
 void *Queue::find(void *key, QUEUE_COMPARATOR comparator)
 {
        void *element = NULL;
        UINT32 i, pos;
 
-       Lock();
+       lock();
        for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
        {
                if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
@@ -228,23 +210,20 @@ void *Queue::find(void *key, QUEUE_COMPARATOR comparator)
                if (pos == m_dwBufferSize)
                        pos = 0;
        }
-       Unlock();
+       unlock();
        return element;
 }
 
-
-//
-// Find element in queue using given key and comparator
-// Returns pointer to element or NULL if element was not found.
-// Element remains in the queue
-//
-
+/**
+ * Find element in queue using given key and comparator and remove it.
+ * Returns true if element was removed.
+ */
 bool Queue::remove(void *key, QUEUE_COMPARATOR comparator)
 {
        bool success = false;
        UINT32 i, pos;
 
-       Lock();
+       lock();
        for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
        {
                if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
@@ -257,6 +236,6 @@ bool Queue::remove(void *key, QUEUE_COMPARATOR comparator)
                if (pos == m_dwBufferSize)
                        pos = 0;
        }
-       Unlock();
+       unlock();
        return success;
 }
index b9bc7bf..69aded1 100644 (file)
@@ -42,10 +42,11 @@ struct QUEUED_SYSLOG_MESSAGE
 /**
  * Static data
  */
-static UINT64 m_qwMsgId = 1;
-static Queue *m_pSyslogQueue = NULL;
-static LogParser *m_parser = NULL;
-static MUTEX m_mutexParserAccess = INVALID_MUTEX_HANDLE;
+static UINT64 s_msgId = 1;
+static Queue s_syslogProcessingQueue(1000, 100);
+static Queue s_syslogWriteQueue(1000, 100);
+static LogParser *s_parser = NULL;
+static MUTEX s_parserLock = INVALID_MUTEX_HANDLE;
 
 /**
  * Parse timestamp field
@@ -284,28 +285,82 @@ static void BroadcastSyslogMessage(ClientSession *pSession, void *pArg)
 }
 
 /**
+ * Syslog writer thread
+ */
+static THREAD_RESULT THREAD_CALL SyslogWriterThread(void *arg)
+{
+   DbgPrintf(1, _T("Syslog writer thread started"));
+   while(true)
+   {
+      NX_SYSLOG_RECORD *r = (NX_SYSLOG_RECORD *)s_syslogWriteQueue.GetOrBlock();
+      if (r == INVALID_POINTER_VALUE)
+         break;
+
+      DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
+
+      DB_STATEMENT hStmt = DBPrepare(hdb, _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,source_object_id,hostname,msg_tag,msg_text) VALUES (?,?,?,?,?,?,?,?)"));
+      if (hStmt == NULL)
+      {
+         free(r);
+         continue;
+      }
+
+      int count = 0;
+      DBBegin(hdb);
+      while(true)
+      {
+         DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, r->qwMsgId);
+         DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)r->tmTimeStamp);
+         DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, r->nFacility);
+         DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, r->nSeverity);
+         DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, r->dwSourceObject);
+#ifdef UNICODE
+         DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szHostName), DB_BIND_DYNAMIC);
+         DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szTag), DB_BIND_DYNAMIC);
+         DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szMessage), DB_BIND_DYNAMIC);
+#else
+         DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, r->szHostName, DB_BIND_STATIC);
+         DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, r->szTag, DB_BIND_STATIC);
+         DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, r->szMessage, DB_BIND_STATIC);
+#endif
+
+         if (!DBExecute(hStmt))
+         {
+            free(r);
+            break;
+         }
+         free(r);
+         count++;
+         if (count == 1000)
+            break;
+         r = (NX_SYSLOG_RECORD *)s_syslogWriteQueue.Get();
+         if ((r == NULL) || (r == INVALID_POINTER_VALUE))
+            break;
+      }
+      DBCommit(hdb);
+      DBFreeStatement(hStmt);
+      DBConnectionPoolReleaseConnection(hdb);
+      if (r == INVALID_POINTER_VALUE)
+         break;
+   }
+   DbgPrintf(1, _T("Syslog writer thread stopped"));
+   return THREAD_OK;
+}
+
+/**
  * Process syslog message
  */
 static void ProcessSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
 {
    NX_SYSLOG_RECORD record;
-   TCHAR szQuery[4096];
 
        DbgPrintf(6, _T("ProcessSyslogMessage: Raw syslog message to process:\n%hs"), psMsg);
    if (ParseSyslogMessage(psMsg, nMsgLen, &record))
    {
-      record.qwMsgId = m_qwMsgId++;
+      record.qwMsgId = s_msgId++;
       BindMsgToNode(&record, dwSourceIP);
-      _sntprintf(szQuery, 4096, 
-                 _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,")
-                 _T("source_object_id,hostname,msg_tag,msg_text) VALUES ")
-                 _T("(") UINT64_FMT _T(",") INT64_FMT _T(",%d,%d,%d,%s,%s,%s)"),
-                 record.qwMsgId, (INT64)record.tmTimeStamp, record.nFacility,
-                 record.nSeverity, record.dwSourceObject,
-                                         (const TCHAR *)DBPrepareStringA(g_hCoreDB, record.szHostName),
-                                         (const TCHAR *)DBPrepareStringA(g_hCoreDB, record.szTag),
-                                         (const TCHAR *)DBPrepareStringA(g_hCoreDB, record.szMessage));
-      DBQuery(g_hCoreDB, szQuery);
+
+      s_syslogWriteQueue.Put(nx_memdup(&record, sizeof(NX_SYSLOG_RECORD)));
 
       // Send message to all connected clients
       EnumerateClientSessions(BroadcastSyslogMessage, &record);
@@ -314,22 +369,20 @@ static void ProcessSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
                DbgPrintf(6, _T("Syslog message: ipAddr=%s objectId=%d tag=\"%hs\" msg=\"%hs\""),
                          IpToStr(dwSourceIP, ipAddr), record.dwSourceObject, record.szTag, record.szMessage);
 
-               MutexLock(m_mutexParserAccess);
-               if ((record.dwSourceObject != 0) && (m_parser != NULL))
+               MutexLock(s_parserLock);
+               if ((record.dwSourceObject != 0) && (s_parser != NULL))
                {
 #ifdef UNICODE
                        WCHAR wtag[MAX_SYSLOG_TAG_LEN];
                        WCHAR wmsg[MAX_LOG_MSG_LENGTH];
                        MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szTag, -1, wtag, MAX_SYSLOG_TAG_LEN);
                        MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szMessage, -1, wmsg, MAX_LOG_MSG_LENGTH);
-                       m_parser->matchEvent(wtag, record.nFacility, 1 << record.nSeverity,
-                                            wmsg, record.dwSourceObject);
+                       s_parser->matchEvent(wtag, record.nFacility, 1 << record.nSeverity, wmsg, record.dwSourceObject);
 #else
-                       m_parser->matchEvent(record.szTag, record.nFacility, 1 << record.nSeverity,
-                                            record.szMessage, record.dwSourceObject);
+                       s_parser->matchEvent(record.szTag, record.nFacility, 1 << record.nSeverity, record.szMessage, record.dwSourceObject);
 #endif
                }
-               MutexUnlock(m_mutexParserAccess);
+               MutexUnlock(s_parserLock);
    }
        else
        {
@@ -346,7 +399,7 @@ static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
 
    while(1)
    {
-      pMsg = (QUEUED_SYSLOG_MESSAGE *)m_pSyslogQueue->GetOrBlock();
+      pMsg = (QUEUED_SYSLOG_MESSAGE *)s_syslogProcessingQueue.GetOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)
          break;
 
@@ -368,7 +421,7 @@ static void QueueSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
    pMsg->dwSourceIP = dwSourceIP;
    pMsg->nBytes = nMsgLen;
    pMsg->psMsg = (char *)nx_memdup(psMsg, nMsgLen + 1);
-   m_pSyslogQueue->Put(pMsg);
+   s_syslogProcessingQueue.Put(pMsg);
 }
 
 /**
@@ -420,8 +473,8 @@ static void CreateParserFromConfig()
 {
        char *xml;
 
-       MutexLock(m_mutexParserAccess);
-       delete_and_null(m_parser);
+       MutexLock(s_parserLock);
+       delete_and_null(s_parser);
 #ifdef UNICODE
        WCHAR *wxml = ConfigReadCLOB(_T("SyslogParser"), _T("<parser></parser>"));
        if (wxml != NULL)
@@ -442,8 +495,8 @@ static void CreateParserFromConfig()
                ObjectArray<LogParser> *parsers = LogParser::createFromXml(xml, -1, parseError, 256, EventNameResolver);
                if ((parsers != NULL) && (parsers->size() > 0))
                {
-                       m_parser = parsers->get(0);
-                       m_parser->setCallback(SyslogParserCallback);
+                       s_parser = parsers->get(0);
+                       s_parser->setCallback(SyslogParserCallback);
                        DbgPrintf(3, _T("syslogd: parser successfully created from config"));
                }
                else
@@ -453,7 +506,7 @@ static void CreateParserFromConfig()
                free(xml);
                delete parsers;
        }
-       MutexUnlock(m_mutexParserAccess);
+       MutexUnlock(s_parserLock);
 }
 
 /**
@@ -467,7 +520,6 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
    socklen_t nAddrLen;
    char sMsg[MAX_SYSLOG_MSG_LEN + 1];
    DB_RESULT hResult;
-   THREAD hProcessingThread;
    fd_set rdfs;
    struct timeval tv;
 
@@ -477,7 +529,7 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
    {
       if (DBGetNumRows(hResult) > 0)
       {
-         m_qwMsgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, m_qwMsgId);
+         s_msgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, s_msgId);
       }
       DBFreeResult(hResult);
    }
@@ -513,12 +565,12 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
        nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(addr.sin_addr.s_addr), nPort);
 
        // Create message parser
-       m_mutexParserAccess = MutexCreate();
+       s_parserLock = MutexCreate();
        CreateParserFromConfig();
 
    // Start processing thread
-   m_pSyslogQueue = new Queue(1000, 100);
-   hProcessingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
+   THREAD hProcessingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
+   THREAD hWriterThread = ThreadCreateEx(SyslogWriterThread, 0, NULL);
 
    DbgPrintf(1, _T("Syslog Daemon started"));
 
@@ -533,8 +585,7 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
       if (nRet > 0)
       {
          nAddrLen = sizeof(struct sockaddr_in);
-         nBytes = recvfrom(hSocket, sMsg, MAX_SYSLOG_MSG_LEN, 0,
-                           (struct sockaddr *)&addr, &nAddrLen);
+         nBytes = recvfrom(hSocket, sMsg, MAX_SYSLOG_MSG_LEN, 0, (struct sockaddr *)&addr, &nAddrLen);
          if (nBytes > 0)
          {
                                sMsg[nBytes] = 0;
@@ -554,10 +605,14 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
    }
 
    // Stop processing thread
-   m_pSyslogQueue->Put(INVALID_POINTER_VALUE);
+   s_syslogProcessingQueue.Put(INVALID_POINTER_VALUE);
    ThreadJoin(hProcessingThread);
-   delete m_pSyslogQueue;
-       delete m_parser;
+
+   // Stop writer thread - it must be done after processing thread already finished
+   s_syslogWriteQueue.Put(INVALID_POINTER_VALUE);
+   ThreadJoin(hWriterThread);
+
+       delete s_parser;
 
    DbgPrintf(1, _T("Syslog Daemon stopped"));
    return THREAD_OK;
@@ -586,7 +641,7 @@ void CreateMessageFromSyslogMsg(CSCPMessage *pMsg, NX_SYSLOG_RECORD *pRec)
  */
 void ReinitializeSyslogParser()
 {
-       if (m_mutexParserAccess == INVALID_MUTEX_HANDLE)
+       if (s_parserLock == INVALID_MUTEX_HANDLE)
                return; // Syslog daemon not initialized
        CreateParserFromConfig();
 }