cc0a3f90727f9e2d92f6d09198a8dbd222af2d86
[public/netxms.git] / src / server / core / syslogd.cpp
/*
** NetXMS - Network Management System
** Copyright (C) 2003-2016 Victor Kirhenshtein
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
**
** File: syslogd.cpp
**
**/
22
23 #include "nxcore.h"
24 #include <nxlog.h>
25 #include <nxlpapi.h>
26
/**
 * Maximum number of bytes accepted from a single syslog datagram.
 * The receive buffer is declared one byte larger so that a terminating
 * NUL can always be appended after the received data.
 */
#define MAX_SYSLOG_MSG_LEN 1024
31
32 /**
33 * Queued syslog message structure
34 */
35 class QueuedSyslogMessage
36 {
37 public:
38 InetAddress sourceAddr;
39 char *message;
40 int messageLength;
41
42 QueuedSyslogMessage(const InetAddress& addr, char *msg, int msgLen) : sourceAddr(addr)
43 {
44 message = (char *)nx_memdup(msg, msgLen + 1);
45 messageLength = msgLen;
46 }
47
48 ~QueuedSyslogMessage()
49 {
50 free(message);
51 }
52 };
53
54 /**
55 * Queues
56 */
57 Queue g_syslogProcessingQueue(1000, 100);
58 Queue g_syslogWriteQueue(1000, 100);
59
60 /**
61 * Total number of received syslog messages
62 */
63 UINT64 g_syslogMessagesReceived = 0;
64
/**
 * Node matching policy - order in which the sender's IP address and the
 * host name found in the message are tried when looking up the source node.
 * Numeric values correspond to the integer read from the
 * "SyslogNodeMatchingPolicy" configuration parameter.
 */
enum NodeMatchingPolicy
{
   SOURCE_IP_THEN_HOSTNAME = 0,   // try sender's address first, host name second
   HOSTNAME_THEN_SOURCE_IP = 1    // try host name first, sender's address second
};
73
74 /**
75 * Static data
76 */
77 static UINT64 s_msgId = 1;
78 static LogParser *s_parser = NULL;
79 static MUTEX s_parserLock = INVALID_MUTEX_HANDLE;
80 static NodeMatchingPolicy s_nodeMatchingPolicy = SOURCE_IP_THEN_HOSTNAME;
81 static THREAD s_receiverThread = INVALID_THREAD_HANDLE;
82 static bool s_running = true;
83 static bool s_alwaysUseServerTime = false;
84
85 /**
86 * Parse timestamp field
87 */
88 static BOOL ParseTimeStamp(char **ppStart, int nMsgSize, int *pnPos, time_t *ptmTime)
89 {
90 static char psMonth[12][5] = { "Jan ", "Feb ", "Mar ", "Apr ",
91 "May ", "Jun ", "Jul ", "Aug ",
92 "Sep ", "Oct ", "Nov ", "Dec " };
93 struct tm timestamp;
94 time_t t;
95 char szBuffer[16], *pCurr = *ppStart;
96 int i;
97
98 if (nMsgSize - *pnPos < 16)
99 return FALSE; // Timestamp cannot be shorter than 16 bytes
100
101 // Prepare local time structure
102 t = time(NULL);
103 memcpy(&timestamp, localtime(&t), sizeof(struct tm));
104
105 // Month
106 for(i = 0; i < 12; i++)
107 if (!memcmp(pCurr, psMonth[i], 4))
108 {
109 timestamp.tm_mon = i;
110 break;
111 }
112 if (i == 12)
113 return FALSE;
114 pCurr += 4;
115
116 // Day of week
117 if (isdigit(*pCurr))
118 {
119 timestamp.tm_mday = *pCurr - '0';
120 }
121 else
122 {
123 if (*pCurr != ' ')
124 return FALSE; // Invalid day of month
125 timestamp.tm_mday = 0;
126 }
127 pCurr++;
128 if (isdigit(*pCurr))
129 {
130 timestamp.tm_mday = timestamp.tm_mday * 10 + (*pCurr - '0');
131 }
132 else
133 {
134 return FALSE; // Invalid day of month
135 }
136 pCurr++;
137 if (*pCurr != ' ')
138 return FALSE;
139 pCurr++;
140
141 // HH:MM:SS
142 memcpy(szBuffer, pCurr, 8);
143 szBuffer[8] = 0;
144 if (sscanf(szBuffer, "%02d:%02d:%02d", &timestamp.tm_hour,
145 &timestamp.tm_min, &timestamp.tm_sec) != 3)
146 return FALSE; // Invalid time format
147 pCurr += 8;
148
149 // Check for Cisco variant - HH:MM:SS.nnn
150 if (*pCurr == '.')
151 {
152 pCurr++;
153 if (isdigit(*pCurr))
154 pCurr++;
155 if (isdigit(*pCurr))
156 pCurr++;
157 if (isdigit(*pCurr))
158 pCurr++;
159 }
160
161 if (*pCurr != ' ')
162 return FALSE; // Space should follow timestamp
163 pCurr++;
164
165 // Convert to system time
166 *ptmTime = mktime(&timestamp);
167 if (*ptmTime == ((time_t)-1))
168 return FALSE;
169
170 // Adjust current position
171 *pnPos += (int)(pCurr - *ppStart);
172 *ppStart = pCurr;
173 return TRUE;
174 }
175
176 /**
177 * Parse syslog message
178 */
179 static BOOL ParseSyslogMessage(char *psMsg, int nMsgLen, NX_SYSLOG_RECORD *pRec)
180 {
181 int i, nLen, nPos = 0;
182 char *pCurr = psMsg;
183
184 memset(pRec, 0, sizeof(NX_SYSLOG_RECORD));
185
186 // Parse PRI part
187 if (*psMsg == '<')
188 {
189 int nPri = 0, nCount = 0;
190
191 for(pCurr++, nPos++; isdigit(*pCurr) && (nPos < nMsgLen); pCurr++, nPos++, nCount++)
192 nPri = nPri * 10 + (*pCurr - '0');
193 if (nPos >= nMsgLen)
194 return FALSE; // Unexpected end of message
195
196 if ((*pCurr == '>') && (nCount > 0) && (nCount <4))
197 {
198 pRec->nFacility = nPri / 8;
199 pRec->nSeverity = nPri % 8;
200 pCurr++;
201 nPos++;
202 }
203 else
204 {
205 return FALSE; // Invalid message
206 }
207 }
208 else
209 {
210 // Set default PRI of 13
211 pRec->nFacility = 1;
212 pRec->nSeverity = SYSLOG_SEVERITY_NOTICE;
213 }
214
215 // Parse HEADER part
216 if (ParseTimeStamp(&pCurr, nMsgLen, &nPos, &pRec->tmTimeStamp))
217 {
218 // Use server time if configured
219 // We still had to parse timestamp to get correct start position for MSG part
220 if (s_alwaysUseServerTime)
221 {
222 pRec->tmTimeStamp = time(NULL);
223 }
224
225 // Hostname
226 for(i = 0; (*pCurr >= 33) && (*pCurr <= 126) && (i < MAX_SYSLOG_HOSTNAME_LEN - 1) && (nPos < nMsgLen); i++, nPos++, pCurr++)
227 pRec->szHostName[i] = *pCurr;
228 if ((nPos >= nMsgLen) || (*pCurr != ' '))
229 {
230 // Not a valid hostname, assuming to be a part of message
231 pCurr -= i;
232 nPos -= i;
233 pRec->szHostName[0] = 0;
234 }
235 else
236 {
237 pCurr++;
238 nPos++;
239 }
240 }
241 else
242 {
243 pRec->tmTimeStamp = time(NULL);
244 }
245
246 // Parse MSG part
247 for(i = 0; isalnum(*pCurr) && (i < MAX_SYSLOG_TAG_LEN) && (nPos < nMsgLen); i++, nPos++, pCurr++)
248 pRec->szTag[i] = *pCurr;
249 if ((i == MAX_SYSLOG_TAG_LEN) || (nPos >= nMsgLen))
250 {
251 // Too long tag, assuming that it's a part of message
252 pRec->szTag[0] = 0;
253 }
254 pCurr -= i;
255 nPos -= i;
256 nLen = min(nMsgLen - nPos, MAX_LOG_MSG_LENGTH);
257 memcpy(pRec->szMessage, pCurr, nLen);
258
259 return TRUE;
260 }
261
262 /**
263 * Find node by host name
264 */
265 static Node *FindNodeByHostname(const char *hostName)
266 {
267 if (hostName[0] == 0)
268 return NULL;
269
270 Node *node = NULL;
271 InetAddress ipAddr = InetAddress::resolveHostName(hostName);
272 if (ipAddr.isValidUnicast())
273 {
274 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : 0, ipAddr);
275 }
276
277 if (node == NULL)
278 {
279 #ifdef UNICODE
280 WCHAR wname[MAX_OBJECT_NAME];
281 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, hostName, -1, wname, MAX_OBJECT_NAME);
282 wname[MAX_OBJECT_NAME - 1] = 0;
283 node = (Node *)FindObjectByName(wname, OBJECT_NODE);
284 #else
285 node = (Node *)FindObjectByName(hostName, OBJECT_NODE);
286 #endif
287 }
288 return node;
289 }
290
291 /**
292 * Bind syslog message to NetXMS node object
293 * sourceAddr is an IP address from which we receive message
294 */
295 static Node *BindMsgToNode(NX_SYSLOG_RECORD *pRec, const InetAddress& sourceAddr)
296 {
297 Node *node = NULL;
298
299 if (s_nodeMatchingPolicy == SOURCE_IP_THEN_HOSTNAME)
300 {
301 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : 0, sourceAddr);
302 if (node == NULL)
303 {
304 node = FindNodeByHostname(pRec->szHostName);
305 }
306 }
307 else
308 {
309 node = FindNodeByHostname(pRec->szHostName);
310 if (node == NULL)
311 {
312 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : 0, sourceAddr);
313 }
314 }
315
316 if (node != NULL)
317 {
318 node->incSyslogMessageCount();
319 pRec->dwSourceObject = node->getId();
320 if (pRec->szHostName[0] == 0)
321 {
322 #ifdef UNICODE
323 WideCharToMultiByte(CP_ACP, WC_DEFAULTCHAR | WC_COMPOSITECHECK, node->getName(), -1, pRec->szHostName, MAX_SYSLOG_HOSTNAME_LEN, NULL, NULL);
324 pRec->szHostName[MAX_SYSLOG_HOSTNAME_LEN - 1] = 0;
325 #else
326 nx_strncpy(pRec->szHostName, node->getName(), MAX_SYSLOG_HOSTNAME_LEN);
327 #endif
328 }
329 }
330 else
331 {
332 if (pRec->szHostName[0] == 0)
333 {
334 sourceAddr.toStringA(pRec->szHostName);
335 }
336 }
337
338 return node;
339 }
340
341 /**
342 * Handler for EnumerateSessions()
343 */
344 static void BroadcastSyslogMessage(ClientSession *pSession, void *pArg)
345 {
346 if (pSession->isAuthenticated())
347 pSession->onSyslogMessage((NX_SYSLOG_RECORD *)pArg);
348 }
349
350 /**
351 * Syslog writer thread
352 */
353 static THREAD_RESULT THREAD_CALL SyslogWriterThread(void *arg)
354 {
355 DbgPrintf(1, _T("Syslog writer thread started"));
356 while(true)
357 {
358 NX_SYSLOG_RECORD *r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.getOrBlock();
359 if (r == INVALID_POINTER_VALUE)
360 break;
361
362 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
363
364 DB_STATEMENT hStmt = DBPrepare(hdb, _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,source_object_id,hostname,msg_tag,msg_text) VALUES (?,?,?,?,?,?,?,?)"));
365 if (hStmt == NULL)
366 {
367 free(r);
368 DBConnectionPoolReleaseConnection(hdb);
369 continue;
370 }
371
372 int count = 0;
373 DBBegin(hdb);
374 while(true)
375 {
376 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, r->qwMsgId);
377 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)r->tmTimeStamp);
378 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, r->nFacility);
379 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, r->nSeverity);
380 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, r->dwSourceObject);
381 #ifdef UNICODE
382 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szHostName), DB_BIND_DYNAMIC);
383 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szTag), DB_BIND_DYNAMIC);
384 DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szMessage), DB_BIND_DYNAMIC);
385 #else
386 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, r->szHostName, DB_BIND_STATIC);
387 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, r->szTag, DB_BIND_STATIC);
388 DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, r->szMessage, DB_BIND_STATIC);
389 #endif
390
391 if (!DBExecute(hStmt))
392 {
393 free(r);
394 break;
395 }
396 free(r);
397 count++;
398 if (count == 1000)
399 break;
400 r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.get();
401 if ((r == NULL) || (r == INVALID_POINTER_VALUE))
402 break;
403 }
404 DBCommit(hdb);
405 DBFreeStatement(hStmt);
406 DBConnectionPoolReleaseConnection(hdb);
407 if (r == INVALID_POINTER_VALUE)
408 break;
409 }
410 DbgPrintf(1, _T("Syslog writer thread stopped"));
411 return THREAD_OK;
412 }
413
414 /**
415 * Process syslog message
416 */
417 static void ProcessSyslogMessage(char *psMsg, int nMsgLen, const InetAddress& sourceAddr)
418 {
419 NX_SYSLOG_RECORD record;
420
421 DbgPrintf(6, _T("ProcessSyslogMessage: Raw syslog message to process:\n%hs"), psMsg);
422 if (ParseSyslogMessage(psMsg, nMsgLen, &record))
423 {
424 g_syslogMessagesReceived++;
425
426 record.qwMsgId = s_msgId++;
427 Node *node = BindMsgToNode(&record, sourceAddr);
428
429 g_syslogWriteQueue.put(nx_memdup(&record, sizeof(NX_SYSLOG_RECORD)));
430
431 // Send message to all connected clients
432 EnumerateClientSessions(BroadcastSyslogMessage, &record);
433
434 TCHAR ipAddr[64];
435 DbgPrintf(6, _T("Syslog message: ipAddr=%s objectId=%d tag=\"%hs\" msg=\"%hs\""),
436 sourceAddr.toString(ipAddr), record.dwSourceObject, record.szTag, record.szMessage);
437
438 MutexLock(s_parserLock);
439 if ((record.dwSourceObject != 0) && (s_parser != NULL) &&
440 ((node->getStatus() != STATUS_UNMANAGED) || (g_flags & AF_TRAPS_FROM_UNMANAGED_NODES)))
441 {
442 #ifdef UNICODE
443 WCHAR wtag[MAX_SYSLOG_TAG_LEN];
444 WCHAR wmsg[MAX_LOG_MSG_LENGTH];
445 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szTag, -1, wtag, MAX_SYSLOG_TAG_LEN);
446 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szMessage, -1, wmsg, MAX_LOG_MSG_LENGTH);
447 s_parser->matchEvent(wtag, record.nFacility, 1 << record.nSeverity, wmsg, record.dwSourceObject);
448 #else
449 s_parser->matchEvent(record.szTag, record.nFacility, 1 << record.nSeverity, record.szMessage, record.dwSourceObject);
450 #endif
451 }
452 MutexUnlock(s_parserLock);
453 }
454 else
455 {
456 DbgPrintf(6, _T("ProcessSyslogMessage: Cannot parse syslog message"));
457 }
458 }
459
460 /**
461 * Syslog processing thread
462 */
463 static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
464 {
465 QueuedSyslogMessage *msg;
466
467 while(true)
468 {
469 msg = (QueuedSyslogMessage *)g_syslogProcessingQueue.getOrBlock();
470 if (msg == INVALID_POINTER_VALUE)
471 break;
472
473 ProcessSyslogMessage(msg->message, msg->messageLength, msg->sourceAddr);
474 delete msg;
475 }
476 return THREAD_OK;
477 }
478
479 /**
480 * Queue syslog message for processing
481 */
482 static void QueueSyslogMessage(char *msg, int msgLen, const InetAddress& sourceAddr)
483 {
484 g_syslogProcessingQueue.put(new QueuedSyslogMessage(sourceAddr, msg, msgLen));
485 }
486
487 /**
488 * Callback for syslog parser
489 */
490 static void SyslogParserCallback(UINT32 eventCode, const TCHAR *eventName, const TCHAR *line,
491 const TCHAR *source, UINT32 facility, UINT32 severity,
492 int paramCount, TCHAR **params, UINT32 objectId, int repeatCount,
493 void *userArg)
494 {
495 char format[] = "sssssssssssssssssssssssssssssssss";
496 TCHAR *plist[33];
497 TCHAR repeatCountText[16];
498
499 int count = min(paramCount, 32);
500 format[count + 1] = 0;
501 for(int i = 0; i < count; i++)
502 plist[i] = params[i];
503 _sntprintf(repeatCountText, 16, _T("%d"), repeatCount);
504 plist[count] = repeatCountText;
505 PostEvent(eventCode, objectId, format,
506 plist[0], plist[1], plist[2], plist[3],
507 plist[4], plist[5], plist[6], plist[7],
508 plist[8], plist[9], plist[10], plist[11],
509 plist[12], plist[13], plist[14], plist[15],
510 plist[16], plist[17], plist[18], plist[19],
511 plist[20], plist[21], plist[22], plist[23],
512 plist[24], plist[25], plist[26], plist[27],
513 plist[28], plist[29], plist[30], plist[31]);
514 }
515
516 /**
517 * Event name resolver
518 */
519 static bool EventNameResolver(const TCHAR *name, UINT32 *code)
520 {
521 bool success = false;
522 EventTemplate *event = FindEventTemplateByName(name);
523 if (event != NULL)
524 {
525 *code = event->getCode();
526 event->decRefCount();
527 success = true;
528 }
529 return success;
530 }
531
532 /**
533 * Create syslog parser from config
534 */
535 static void CreateParserFromConfig()
536 {
537 char *xml;
538
539 MutexLock(s_parserLock);
540 delete_and_null(s_parser);
541 #ifdef UNICODE
542 WCHAR *wxml = ConfigReadCLOB(_T("SyslogParser"), _T("<parser></parser>"));
543 if (wxml != NULL)
544 {
545 xml = UTF8StringFromWideString(wxml);
546 free(wxml);
547 }
548 else
549 {
550 xml = NULL;
551 }
552 #else
553 xml = ConfigReadCLOB("SyslogParser", "<parser></parser>");
554 #endif
555 if (xml != NULL)
556 {
557 TCHAR parseError[256];
558 ObjectArray<LogParser> *parsers = LogParser::createFromXml(xml, -1, parseError, 256, EventNameResolver);
559 if ((parsers != NULL) && (parsers->size() > 0))
560 {
561 s_parser = parsers->get(0);
562 s_parser->setCallback(SyslogParserCallback);
563 DbgPrintf(3, _T("syslogd: parser successfully created from config"));
564 }
565 else
566 {
567 nxlog_write(MSG_SYSLOG_PARSER_INIT_FAILED, EVENTLOG_ERROR_TYPE, "s", parseError);
568 }
569 free(xml);
570 delete parsers;
571 }
572 MutexUnlock(s_parserLock);
573 }
574
575 /**
576 * Syslog messages receiver thread
577 */
578 static THREAD_RESULT THREAD_CALL SyslogReceiver(void *pArg)
579 {
580 SOCKET hSocket = socket(AF_INET, SOCK_DGRAM, 0);
581 #ifdef WITH_IPV6
582 SOCKET hSocket6 = socket(AF_INET6, SOCK_DGRAM, 0);
583 #endif
584 if ((hSocket == INVALID_SOCKET)
585 #ifdef WITH_IPV6
586 && (hSocket6 == INVALID_SOCKET)
587 #endif
588 )
589 {
590 nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("SyslogReceiver"));
591 return THREAD_OK;
592 }
593
594 SetSocketExclusiveAddrUse(hSocket);
595 SetSocketReuseFlag(hSocket);
596 #ifndef _WIN32
597 fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
598 #endif
599
600 #ifdef WITH_IPV6
601 SetSocketExclusiveAddrUse(hSocket6);
602 SetSocketReuseFlag(hSocket6);
603 #ifndef _WIN32
604 fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
605 #endif
606 #ifdef IPV6_V6ONLY
607 int on = 1;
608 setsockopt(hSocket6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
609 #endif
610 #endif
611
612 // Get listen port number
613 int port = ConfigReadInt(_T("SyslogListenPort"), 514);
614 if ((port < 1) || (port > 65535))
615 {
616 DbgPrintf(2, _T("Syslog: invalid listen port number %d, using default"), port);
617 port = 514;
618 }
619
620 // Fill in local address structure
621 struct sockaddr_in servAddr;
622 memset(&servAddr, 0, sizeof(struct sockaddr_in));
623 servAddr.sin_family = AF_INET;
624
625 #ifdef WITH_IPV6
626 struct sockaddr_in6 servAddr6;
627 memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
628 servAddr6.sin6_family = AF_INET6;
629 #endif
630
631 if (!_tcscmp(g_szListenAddress, _T("*")))
632 {
633 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
634 #ifdef WITH_IPV6
635 memset(servAddr6.sin6_addr.s6_addr, 0, 16);
636 #endif
637 }
638 else
639 {
640 InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
641 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
642 {
643 servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
644 }
645 else
646 {
647 servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
648 }
649 #ifdef WITH_IPV6
650 bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
651 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
652 {
653 memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
654 }
655 else
656 {
657 memset(servAddr6.sin6_addr.s6_addr, 0, 15);
658 servAddr6.sin6_addr.s6_addr[15] = 1;
659 }
660 #endif
661 }
662 servAddr.sin_port = htons((UINT16)port);
663 #ifdef WITH_IPV6
664 servAddr6.sin6_port = htons((UINT16)port);
665 #endif
666
667 // Bind socket
668 TCHAR buffer[64];
669 int bindFailures = 0;
670 DbgPrintf(5, _T("Trying to bind on UDP %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
671 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
672 {
673 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("SyslogReceiver"), WSAGetLastError());
674 bindFailures++;
675 closesocket(hSocket);
676 hSocket = INVALID_SOCKET;
677 }
678
679 #ifdef WITH_IPV6
680 DbgPrintf(5, _T("Trying to bind on UDP [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
681 if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
682 {
683 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("SyslogReceiver"), WSAGetLastError());
684 bindFailures++;
685 closesocket(hSocket6);
686 hSocket6 = INVALID_SOCKET;
687 }
688 #else
689 bindFailures++;
690 #endif
691
692 // Abort if cannot bind to at least one socket
693 if (bindFailures == 2)
694 {
695 DbgPrintf(1, _T("Syslog receiver aborted - cannot bind at least one socket"));
696 return THREAD_OK;
697 }
698
699 if (hSocket != INVALID_SOCKET)
700 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), port);
701 #ifdef WITH_IPV6
702 if (hSocket6 != INVALID_SOCKET)
703 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr6.sin6_addr.s6_addr, port);
704 #endif
705
706 SetLogParserTraceCallback(nxlog_debug2);
707 InitLogParserLibrary();
708
709 // Create message parser
710 s_parserLock = MutexCreate();
711 CreateParserFromConfig();
712
713 // Start processing thread
714 THREAD hProcessingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
715 THREAD hWriterThread = ThreadCreateEx(SyslogWriterThread, 0, NULL);
716
717 DbgPrintf(1, _T("Syslog receiver thread started"));
718
719 // Wait for packets
720 while(s_running)
721 {
722 struct timeval tv;
723 tv.tv_sec = 1;
724 tv.tv_usec = 0;
725
726 fd_set rdfs;
727 FD_ZERO(&rdfs);
728 if (hSocket != INVALID_SOCKET)
729 FD_SET(hSocket, &rdfs);
730 #ifdef WITH_IPV6
731 if (hSocket6 != INVALID_SOCKET)
732 FD_SET(hSocket6, &rdfs);
733 #endif
734
735 #if defined(WITH_IPV6) && !defined(_WIN32)
736 SOCKET nfds = 0;
737 if (hSocket != INVALID_SOCKET)
738 nfds = hSocket;
739 if ((hSocket6 != INVALID_SOCKET) && (hSocket6 > nfds))
740 nfds = hSocket6;
741 int rc = select(SELECT_NFDS(nfds + 1), &rdfs, NULL, NULL, &tv);
742 #else
743 int rc = select(SELECT_NFDS(hSocket + 1), &rdfs, NULL, NULL, &tv);
744 #endif
745 if (rc > 0)
746 {
747 char syslogMessage[MAX_SYSLOG_MSG_LEN + 1];
748 SockAddrBuffer addr;
749 socklen_t addrLen = sizeof(SockAddrBuffer);
750 #ifdef WITH_IPV6
751 SOCKET s = FD_ISSET(hSocket, &rdfs) ? hSocket : hSocket6;
752 #else
753 SOCKET s = hSocket;
754 #endif
755 int bytes = recvfrom(s, syslogMessage, MAX_SYSLOG_MSG_LEN, 0, (struct sockaddr *)&addr, &addrLen);
756 if (bytes > 0)
757 {
758 syslogMessage[bytes] = 0;
759 QueueSyslogMessage(syslogMessage, bytes, InetAddress::createFromSockaddr((struct sockaddr *)&addr));
760 }
761 else
762 {
763 // Sleep on error
764 ThreadSleepMs(100);
765 }
766 }
767 else if (rc == -1)
768 {
769 // Sleep on error
770 ThreadSleepMs(100);
771 }
772 }
773
774 // Stop processing thread
775 g_syslogProcessingQueue.put(INVALID_POINTER_VALUE);
776 ThreadJoin(hProcessingThread);
777
778 // Stop writer thread - it must be done after processing thread already finished
779 g_syslogWriteQueue.put(INVALID_POINTER_VALUE);
780 ThreadJoin(hWriterThread);
781
782 delete s_parser;
783 CleanupLogParserLibrary();
784
785 DbgPrintf(1, _T("Syslog receiver thread stopped"));
786 return THREAD_OK;
787 }
788
789 /**
790 * Start built-in syslog server
791 */
792 void StartSyslogServer()
793 {
794 s_nodeMatchingPolicy = (NodeMatchingPolicy)ConfigReadInt(_T("SyslogNodeMatchingPolicy"), SOURCE_IP_THEN_HOSTNAME);
795 s_alwaysUseServerTime = ConfigReadInt(_T("SyslogIgnoreMessageTimestamp"), 0) ? true : false;
796
797 // Determine first available message id
798 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
799 DB_RESULT hResult = DBSelect(hdb, _T("SELECT max(msg_id) FROM syslog"));
800 if (hResult != NULL)
801 {
802 if (DBGetNumRows(hResult) > 0)
803 {
804 s_msgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, s_msgId);
805 }
806 DBFreeResult(hResult);
807 }
808 DBConnectionPoolReleaseConnection(hdb);
809
810 s_receiverThread = ThreadCreateEx(SyslogReceiver, 0, NULL);
811 }
812
813 /**
814 * Stop built-in syslog server
815 */
816 void StopSyslogServer()
817 {
818 s_running = false;
819 ThreadJoin(s_receiverThread);
820 }
821
822 /**
823 * Create NXCP message from NX_SYSLOG_RECORD structure
824 */
825 void CreateMessageFromSyslogMsg(NXCPMessage *pMsg, NX_SYSLOG_RECORD *pRec)
826 {
827 UINT32 dwId = VID_SYSLOG_MSG_BASE;
828
829 pMsg->setField(VID_NUM_RECORDS, (UINT32)1);
830 pMsg->setField(dwId++, pRec->qwMsgId);
831 pMsg->setField(dwId++, (UINT32)pRec->tmTimeStamp);
832 pMsg->setField(dwId++, (WORD)pRec->nFacility);
833 pMsg->setField(dwId++, (WORD)pRec->nSeverity);
834 pMsg->setField(dwId++, pRec->dwSourceObject);
835 pMsg->setFieldFromMBString(dwId++, pRec->szHostName);
836 pMsg->setFieldFromMBString(dwId++, pRec->szTag);
837 pMsg->setFieldFromMBString(dwId++, pRec->szMessage);
838 }
839
840 /**
841 * Reinitialize parser on configuration change
842 */
843 void ReinitializeSyslogParser()
844 {
845 if (s_parserLock == INVALID_MUTEX_HANDLE)
846 return; // Syslog daemon not initialized
847 CreateParserFromConfig();
848 }
849
850 /**
851 * Handler for syslog related configuration changes
852 */
853 void OnSyslogConfigurationChange(const TCHAR *name, const TCHAR *value)
854 {
855 if (!_tcscmp(name, _T("SyslogIgnoreMessageTimestamp")))
856 {
857 s_alwaysUseServerTime = _tcstol(value, NULL, 0) ? true : false;
858 nxlog_debug(4, _T("Syslog: ignore message timestamp option set to %s"), s_alwaysUseServerTime ? _T("ON") : _T("OFF"));
859 }
860 }