b9bc7bf768a708ae83294e30f4f293a6ffe6a4cb
[public/netxms.git] / src / server / core / syslogd.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2013 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: syslogd.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24 #include <nxlog.h>
25 #include <nxlpapi.h>
26
27 /**
28 * Max syslog message length
29 */
30 #define MAX_SYSLOG_MSG_LEN 1024
31
32 /**
33 * Queued syslog message structure
34 */
35 struct QUEUED_SYSLOG_MESSAGE
36 {
37 UINT32 dwSourceIP;
38 int nBytes;
39 char *psMsg;
40 };
41
42 /**
43 * Static data
44 */
45 static UINT64 m_qwMsgId = 1;
46 static Queue *m_pSyslogQueue = NULL;
47 static LogParser *m_parser = NULL;
48 static MUTEX m_mutexParserAccess = INVALID_MUTEX_HANDLE;
49
50 /**
51 * Parse timestamp field
52 */
53 static BOOL ParseTimeStamp(char **ppStart, int nMsgSize, int *pnPos, time_t *ptmTime)
54 {
55 static char psMonth[12][5] = { "Jan ", "Feb ", "Mar ", "Apr ",
56 "May ", "Jun ", "Jul ", "Aug ",
57 "Sep ", "Oct ", "Nov ", "Dec " };
58 struct tm timestamp;
59 time_t t;
60 char szBuffer[16], *pCurr = *ppStart;
61 int i;
62
63 if (nMsgSize - *pnPos < 16)
64 return FALSE; // Timestamp cannot be shorter than 16 bytes
65
66 // Prepare local time structure
67 t = time(NULL);
68 memcpy(&timestamp, localtime(&t), sizeof(struct tm));
69
70 // Month
71 for(i = 0; i < 12; i++)
72 if (!memcmp(pCurr, psMonth[i], 4))
73 {
74 timestamp.tm_mon = i;
75 break;
76 }
77 if (i == 12)
78 return FALSE;
79 pCurr += 4;
80
81 // Day of week
82 if (isdigit(*pCurr))
83 {
84 timestamp.tm_mday = *pCurr - '0';
85 }
86 else
87 {
88 if (*pCurr != ' ')
89 return FALSE; // Invalid day of month
90 timestamp.tm_mday = 0;
91 }
92 pCurr++;
93 if (isdigit(*pCurr))
94 {
95 timestamp.tm_mday = timestamp.tm_mday * 10 + (*pCurr - '0');
96 }
97 else
98 {
99 return FALSE; // Invalid day of month
100 }
101 pCurr++;
102 if (*pCurr != ' ')
103 return FALSE;
104 pCurr++;
105
106 // HH:MM:SS
107 memcpy(szBuffer, pCurr, 8);
108 szBuffer[8] = 0;
109 if (sscanf(szBuffer, "%02d:%02d:%02d", &timestamp.tm_hour,
110 &timestamp.tm_min, &timestamp.tm_sec) != 3)
111 return FALSE; // Invalid time format
112 pCurr += 8;
113
114 // Check for Cisco variant - HH:MM:SS.nnn
115 if (*pCurr == '.')
116 {
117 pCurr++;
118 if (isdigit(*pCurr))
119 pCurr++;
120 if (isdigit(*pCurr))
121 pCurr++;
122 if (isdigit(*pCurr))
123 pCurr++;
124 }
125
126 if (*pCurr != ' ')
127 return FALSE; // Space should follow timestamp
128 pCurr++;
129
130 // Convert to system time
131 *ptmTime = mktime(&timestamp);
132 if (*ptmTime == ((time_t)-1))
133 return FALSE;
134
135 // Adjust current position
136 *pnPos += (int)(pCurr - *ppStart);
137 *ppStart = pCurr;
138 return TRUE;
139 }
140
141 /**
142 * Parse syslog message
143 */
144 static BOOL ParseSyslogMessage(char *psMsg, int nMsgLen, NX_SYSLOG_RECORD *pRec)
145 {
146 int i, nLen, nPos = 0;
147 char *pCurr = psMsg;
148
149 memset(pRec, 0, sizeof(NX_SYSLOG_RECORD));
150
151 // Parse PRI part
152 if (*psMsg == '<')
153 {
154 int nPri = 0, nCount = 0;
155
156 for(pCurr++, nPos++; isdigit(*pCurr) && (nPos < nMsgLen); pCurr++, nPos++, nCount++)
157 nPri = nPri * 10 + (*pCurr - '0');
158 if (nPos >= nMsgLen)
159 return FALSE; // Unexpected end of message
160
161 if ((*pCurr == '>') && (nCount > 0) && (nCount <4))
162 {
163 pRec->nFacility = nPri / 8;
164 pRec->nSeverity = nPri % 8;
165 pCurr++;
166 nPos++;
167 }
168 else
169 {
170 return FALSE; // Invalid message
171 }
172 }
173 else
174 {
175 // Set default PRI of 13
176 pRec->nFacility = 1;
177 pRec->nSeverity = SYSLOG_SEVERITY_NOTICE;
178 }
179
180 // Parse HEADER part
181 if (ParseTimeStamp(&pCurr, nMsgLen, &nPos, &pRec->tmTimeStamp))
182 {
183 // Hostname
184 for(i = 0; (*pCurr >= 33) && (*pCurr <= 126) && (i < MAX_SYSLOG_HOSTNAME_LEN - 1) && (nPos < nMsgLen); i++, nPos++, pCurr++)
185 pRec->szHostName[i] = *pCurr;
186 if ((nPos >= nMsgLen) || (*pCurr != ' '))
187 {
188 // Not a valid hostname, assuming to be a part of message
189 pCurr -= i;
190 nPos -= i;
191 pRec->szHostName[0] = 0;
192 }
193 else
194 {
195 pCurr++;
196 nPos++;
197 }
198 }
199 else
200 {
201 pRec->tmTimeStamp = time(NULL);
202 }
203
204 // Parse MSG part
205 for(i = 0; isalnum(*pCurr) && (i < MAX_SYSLOG_TAG_LEN) && (nPos < nMsgLen); i++, nPos++, pCurr++)
206 pRec->szTag[i] = *pCurr;
207 if ((i == MAX_SYSLOG_TAG_LEN) || (nPos >= nMsgLen))
208 {
209 // Too long tag, assuming that it's a part of message
210 pRec->szTag[0] = 0;
211 }
212 pCurr -= i;
213 nPos -= i;
214 nLen = min(nMsgLen - nPos, MAX_LOG_MSG_LENGTH);
215 memcpy(pRec->szMessage, pCurr, nLen);
216
217 return TRUE;
218 }
219
220 /**
221 * Bind syslog message to NetXMS node object
222 * dwSourceIP is an IP address from which we receive message
223 */
224 static void BindMsgToNode(NX_SYSLOG_RECORD *pRec, UINT32 dwSourceIP)
225 {
226 Node *pNode = NULL;
227 UINT32 dwIpAddr;
228
229 // Determine IP address of a source
230 if (pRec->szHostName[0] == 0)
231 {
232 // Hostname was not defined in the message
233 dwIpAddr = dwSourceIP;
234 }
235 else
236 {
237 dwIpAddr = ntohl(ResolveHostNameA(pRec->szHostName));
238 if ((dwIpAddr == INADDR_NONE) || (dwIpAddr == INADDR_ANY))
239 {
240 #ifdef UNICODE
241 WCHAR wname[MAX_OBJECT_NAME];
242 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, pRec->szHostName, -1, wname, MAX_OBJECT_NAME);
243 wname[MAX_OBJECT_NAME - 1] = 0;
244 pNode = (Node *)FindObjectByName(wname, OBJECT_NODE);
245 #else
246 pNode = (Node *)FindObjectByName(pRec->szHostName, OBJECT_NODE);
247 #endif
248 if (pNode == NULL)
249 dwIpAddr = dwSourceIP;
250 }
251 }
252
253 // Match source IP to NetXMS object
254 if ((dwIpAddr != INADDR_NONE) && (pNode == NULL))
255 pNode = FindNodeByIP(0, dwIpAddr);
256
257 if (pNode != NULL)
258 {
259 pRec->dwSourceObject = pNode->Id();
260 if (pRec->szHostName[0] == 0)
261 {
262 #ifdef UNICODE
263 WideCharToMultiByte(CP_ACP, WC_DEFAULTCHAR | WC_COMPOSITECHECK, pNode->Name(), -1, pRec->szHostName, MAX_SYSLOG_HOSTNAME_LEN, NULL, NULL);
264 pRec->szHostName[MAX_SYSLOG_HOSTNAME_LEN - 1] = 0;
265 #else
266 nx_strncpy(pRec->szHostName, pNode->Name(), MAX_SYSLOG_HOSTNAME_LEN);
267 #endif
268 }
269 }
270 else
271 {
272 if (pRec->szHostName[0] == 0)
273 IpToStrA(dwSourceIP, pRec->szHostName);
274 }
275 }
276
277 /**
278 * Handler for EnumerateSessions()
279 */
280 static void BroadcastSyslogMessage(ClientSession *pSession, void *pArg)
281 {
282 if (pSession->isAuthenticated())
283 pSession->onSyslogMessage((NX_SYSLOG_RECORD *)pArg);
284 }
285
286 /**
287 * Process syslog message
288 */
289 static void ProcessSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
290 {
291 NX_SYSLOG_RECORD record;
292 TCHAR szQuery[4096];
293
294 DbgPrintf(6, _T("ProcessSyslogMessage: Raw syslog message to process:\n%hs"), psMsg);
295 if (ParseSyslogMessage(psMsg, nMsgLen, &record))
296 {
297 record.qwMsgId = m_qwMsgId++;
298 BindMsgToNode(&record, dwSourceIP);
299 _sntprintf(szQuery, 4096,
300 _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,")
301 _T("source_object_id,hostname,msg_tag,msg_text) VALUES ")
302 _T("(") UINT64_FMT _T(",") INT64_FMT _T(",%d,%d,%d,%s,%s,%s)"),
303 record.qwMsgId, (INT64)record.tmTimeStamp, record.nFacility,
304 record.nSeverity, record.dwSourceObject,
305 (const TCHAR *)DBPrepareStringA(g_hCoreDB, record.szHostName),
306 (const TCHAR *)DBPrepareStringA(g_hCoreDB, record.szTag),
307 (const TCHAR *)DBPrepareStringA(g_hCoreDB, record.szMessage));
308 DBQuery(g_hCoreDB, szQuery);
309
310 // Send message to all connected clients
311 EnumerateClientSessions(BroadcastSyslogMessage, &record);
312
313 TCHAR ipAddr[32];
314 DbgPrintf(6, _T("Syslog message: ipAddr=%s objectId=%d tag=\"%hs\" msg=\"%hs\""),
315 IpToStr(dwSourceIP, ipAddr), record.dwSourceObject, record.szTag, record.szMessage);
316
317 MutexLock(m_mutexParserAccess);
318 if ((record.dwSourceObject != 0) && (m_parser != NULL))
319 {
320 #ifdef UNICODE
321 WCHAR wtag[MAX_SYSLOG_TAG_LEN];
322 WCHAR wmsg[MAX_LOG_MSG_LENGTH];
323 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szTag, -1, wtag, MAX_SYSLOG_TAG_LEN);
324 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szMessage, -1, wmsg, MAX_LOG_MSG_LENGTH);
325 m_parser->matchEvent(wtag, record.nFacility, 1 << record.nSeverity,
326 wmsg, record.dwSourceObject);
327 #else
328 m_parser->matchEvent(record.szTag, record.nFacility, 1 << record.nSeverity,
329 record.szMessage, record.dwSourceObject);
330 #endif
331 }
332 MutexUnlock(m_mutexParserAccess);
333 }
334 else
335 {
336 DbgPrintf(6, _T("ProcessSyslogMessage: Cannot parse syslog message"));
337 }
338 }
339
340 /**
341 * Syslog processing thread
342 */
343 static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
344 {
345 QUEUED_SYSLOG_MESSAGE *pMsg;
346
347 while(1)
348 {
349 pMsg = (QUEUED_SYSLOG_MESSAGE *)m_pSyslogQueue->GetOrBlock();
350 if (pMsg == INVALID_POINTER_VALUE)
351 break;
352
353 ProcessSyslogMessage(pMsg->psMsg, pMsg->nBytes, pMsg->dwSourceIP);
354 free(pMsg->psMsg);
355 free(pMsg);
356 }
357 return THREAD_OK;
358 }
359
360 /**
361 * Queue syslog message for processing
362 */
363 static void QueueSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
364 {
365 QUEUED_SYSLOG_MESSAGE *pMsg;
366
367 pMsg = (QUEUED_SYSLOG_MESSAGE *)malloc(sizeof(QUEUED_SYSLOG_MESSAGE));
368 pMsg->dwSourceIP = dwSourceIP;
369 pMsg->nBytes = nMsgLen;
370 pMsg->psMsg = (char *)nx_memdup(psMsg, nMsgLen + 1);
371 m_pSyslogQueue->Put(pMsg);
372 }
373
374 /**
375 * Callback for syslog parser
376 */
377 static void SyslogParserCallback(DWORD eventCode, const TCHAR *eventName, const TCHAR *line, int paramCount,
378 TCHAR **params, DWORD objectId, void *userArg)
379 {
380 char format[] = "ssssssssssssssssssssssssssssssss";
381 TCHAR *plist[32];
382 int i, count;
383
384 count = min(paramCount, 32);
385 format[count] = 0;
386 for(i = 0; i < count; i++)
387 plist[i] = params[i];
388 PostEvent(eventCode, objectId, format,
389 plist[0], plist[1], plist[2], plist[3],
390 plist[4], plist[5], plist[6], plist[7],
391 plist[8], plist[9], plist[10], plist[11],
392 plist[12], plist[13], plist[14], plist[15],
393 plist[16], plist[17], plist[18], plist[19],
394 plist[20], plist[21], plist[22], plist[23],
395 plist[24], plist[25], plist[26], plist[27],
396 plist[28], plist[29], plist[30], plist[31]);
397 }
398
399 /**
400 * Event name resolver
401 */
402 static bool EventNameResolver(const TCHAR *name, DWORD *code)
403 {
404 EVENT_TEMPLATE *event;
405 bool success = false;
406
407 event = FindEventTemplateByName(name);
408 if (event != NULL)
409 {
410 *code = event->dwCode;
411 success = true;
412 }
413 return success;
414 }
415
416 /**
417 * Create syslog parser from config
418 */
419 static void CreateParserFromConfig()
420 {
421 char *xml;
422
423 MutexLock(m_mutexParserAccess);
424 delete_and_null(m_parser);
425 #ifdef UNICODE
426 WCHAR *wxml = ConfigReadCLOB(_T("SyslogParser"), _T("<parser></parser>"));
427 if (wxml != NULL)
428 {
429 xml = UTF8StringFromWideString(wxml);
430 free(wxml);
431 }
432 else
433 {
434 xml = NULL;
435 }
436 #else
437 xml = ConfigReadCLOB("SyslogParser", "<parser></parser>");
438 #endif
439 if (xml != NULL)
440 {
441 TCHAR parseError[256];
442 ObjectArray<LogParser> *parsers = LogParser::createFromXml(xml, -1, parseError, 256, EventNameResolver);
443 if ((parsers != NULL) && (parsers->size() > 0))
444 {
445 m_parser = parsers->get(0);
446 m_parser->setCallback(SyslogParserCallback);
447 DbgPrintf(3, _T("syslogd: parser successfully created from config"));
448 }
449 else
450 {
451 nxlog_write(MSG_SYSLOG_PARSER_INIT_FAILED, EVENTLOG_ERROR_TYPE, "s", parseError);
452 }
453 free(xml);
454 delete parsers;
455 }
456 MutexUnlock(m_mutexParserAccess);
457 }
458
459 /**
460 * Syslog messages receiver thread
461 */
462 THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
463 {
464 SOCKET hSocket;
465 struct sockaddr_in addr;
466 int nBytes, nPort, nRet;
467 socklen_t nAddrLen;
468 char sMsg[MAX_SYSLOG_MSG_LEN + 1];
469 DB_RESULT hResult;
470 THREAD hProcessingThread;
471 fd_set rdfs;
472 struct timeval tv;
473
474 // Determine first available message id
475 hResult = DBSelect(g_hCoreDB, _T("SELECT max(msg_id) FROM syslog"));
476 if (hResult != NULL)
477 {
478 if (DBGetNumRows(hResult) > 0)
479 {
480 m_qwMsgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, m_qwMsgId);
481 }
482 DBFreeResult(hResult);
483 }
484
485 hSocket = socket(AF_INET, SOCK_DGRAM, 0);
486 if (hSocket == INVALID_SOCKET)
487 {
488 nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("SyslogDaemon"));
489 return THREAD_OK;
490 }
491
492 SetSocketExclusiveAddrUse(hSocket);
493 SetSocketReuseFlag(hSocket);
494
495 // Get listen port number
496 nPort = ConfigReadInt(_T("SyslogListenPort"), 514);
497 if ((nPort < 1) || (nPort > 65535))
498 nPort = 514;
499
500 // Fill in local address structure
501 memset(&addr, 0, sizeof(struct sockaddr_in));
502 addr.sin_family = AF_INET;
503 addr.sin_addr.s_addr = ResolveHostName(g_szListenAddress);
504 addr.sin_port = htons((WORD)nPort);
505
506 // Bind socket
507 if (bind(hSocket, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) != 0)
508 {
509 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", nPort, _T("SyslogDaemon"), WSAGetLastError());
510 closesocket(hSocket);
511 return THREAD_OK;
512 }
513 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(addr.sin_addr.s_addr), nPort);
514
515 // Create message parser
516 m_mutexParserAccess = MutexCreate();
517 CreateParserFromConfig();
518
519 // Start processing thread
520 m_pSyslogQueue = new Queue(1000, 100);
521 hProcessingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
522
523 DbgPrintf(1, _T("Syslog Daemon started"));
524
525 // Wait for packets
526 while(!IsShutdownInProgress())
527 {
528 FD_ZERO(&rdfs);
529 FD_SET(hSocket, &rdfs);
530 tv.tv_sec = 1;
531 tv.tv_usec = 0;
532 nRet = select((int)hSocket + 1, &rdfs, NULL, NULL, &tv);
533 if (nRet > 0)
534 {
535 nAddrLen = sizeof(struct sockaddr_in);
536 nBytes = recvfrom(hSocket, sMsg, MAX_SYSLOG_MSG_LEN, 0,
537 (struct sockaddr *)&addr, &nAddrLen);
538 if (nBytes > 0)
539 {
540 sMsg[nBytes] = 0;
541 QueueSyslogMessage(sMsg, nBytes, ntohl(addr.sin_addr.s_addr));
542 }
543 else
544 {
545 // Sleep on error
546 ThreadSleepMs(100);
547 }
548 }
549 else if (nRet == -1)
550 {
551 // Sleep on error
552 ThreadSleepMs(100);
553 }
554 }
555
556 // Stop processing thread
557 m_pSyslogQueue->Put(INVALID_POINTER_VALUE);
558 ThreadJoin(hProcessingThread);
559 delete m_pSyslogQueue;
560 delete m_parser;
561
562 DbgPrintf(1, _T("Syslog Daemon stopped"));
563 return THREAD_OK;
564 }
565
566 /**
567 * Create NXCP message from NX_SYSLOG_RECORD structure
568 */
569 void CreateMessageFromSyslogMsg(CSCPMessage *pMsg, NX_SYSLOG_RECORD *pRec)
570 {
571 UINT32 dwId = VID_SYSLOG_MSG_BASE;
572
573 pMsg->SetVariable(VID_NUM_RECORDS, (UINT32)1);
574 pMsg->SetVariable(dwId++, pRec->qwMsgId);
575 pMsg->SetVariable(dwId++, (UINT32)pRec->tmTimeStamp);
576 pMsg->SetVariable(dwId++, (WORD)pRec->nFacility);
577 pMsg->SetVariable(dwId++, (WORD)pRec->nSeverity);
578 pMsg->SetVariable(dwId++, pRec->dwSourceObject);
579 pMsg->SetVariableFromMBString(dwId++, pRec->szHostName);
580 pMsg->SetVariableFromMBString(dwId++, pRec->szTag);
581 pMsg->SetVariableFromMBString(dwId++, pRec->szMessage);
582 }
583
584 /**
585 * Reinitialize parser on configuration change
586 */
587 void ReinitializeSyslogParser()
588 {
589 if (m_mutexParserAccess == INVALID_MUTEX_HANDLE)
590 return; // Syslog daemon not initialized
591 CreateParserFromConfig();
592 }