implemented syslog proxy in agent (issue #568)
[public/netxms.git] / src / server / core / syslogd.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: syslogd.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24 #include <nxlog.h>
25 #include <nxlpapi.h>
26
27 /**
28 * Max syslog message length
29 */
30 #define MAX_SYSLOG_MSG_LEN 1024
31
32 /**
33 * Queued syslog message structure
34 */
35 class QueuedSyslogMessage
36 {
37 public:
38 InetAddress sourceAddr;
39 time_t timestamp;
40 UINT32 zoneId;
41 char *message;
42 int messageLength;
43
44 QueuedSyslogMessage(const InetAddress& addr, const char *msg, int msgLen) : sourceAddr(addr)
45 {
46 message = (char *)nx_memdup(msg, msgLen + 1);
47 messageLength = msgLen;
48 timestamp = time(NULL);
49 zoneId = 0;
50 }
51
52 QueuedSyslogMessage(const InetAddress& addr, time_t t, UINT32 zid, const char *msg, int msgLen) : sourceAddr(addr)
53 {
54 message = (char *)nx_memdup(msg, msgLen + 1);
55 messageLength = msgLen;
56 timestamp = t;
57 zoneId = zid;
58 }
59
60 ~QueuedSyslogMessage()
61 {
62 free(message);
63 }
64 };
65
66 /**
67 * Queues
68 */
69 Queue g_syslogProcessingQueue(1000, 100);
70 Queue g_syslogWriteQueue(1000, 100);
71
72 /**
73 * Total number of received syslog messages
74 */
75 UINT64 g_syslogMessagesReceived = 0;
76
77 /**
78 * Node matching policy
79 */
80 enum NodeMatchingPolicy
81 {
82 SOURCE_IP_THEN_HOSTNAME = 0,
83 HOSTNAME_THEN_SOURCE_IP = 1
84 };
85
86 /**
87 * Static data
88 */
89 static UINT64 s_msgId = 1;
90 static LogParser *s_parser = NULL;
91 static MUTEX s_parserLock = INVALID_MUTEX_HANDLE;
92 static NodeMatchingPolicy s_nodeMatchingPolicy = SOURCE_IP_THEN_HOSTNAME;
93 static THREAD s_receiverThread = INVALID_THREAD_HANDLE;
94 static THREAD s_processingThread = INVALID_THREAD_HANDLE;
95 static THREAD s_writerThread = INVALID_THREAD_HANDLE;
96 static bool s_running = true;
97 static bool s_alwaysUseServerTime = false;
98
99 /**
100 * Parse timestamp field
101 */
102 static BOOL ParseTimeStamp(char **ppStart, int nMsgSize, int *pnPos, time_t *ptmTime)
103 {
104 static char psMonth[12][5] = { "Jan ", "Feb ", "Mar ", "Apr ",
105 "May ", "Jun ", "Jul ", "Aug ",
106 "Sep ", "Oct ", "Nov ", "Dec " };
107 struct tm timestamp;
108 time_t t;
109 char szBuffer[16], *pCurr = *ppStart;
110 int i;
111
112 if (nMsgSize - *pnPos < 16)
113 return FALSE; // Timestamp cannot be shorter than 16 bytes
114
115 // Prepare local time structure
116 t = time(NULL);
117 memcpy(&timestamp, localtime(&t), sizeof(struct tm));
118
119 // Month
120 for(i = 0; i < 12; i++)
121 if (!memcmp(pCurr, psMonth[i], 4))
122 {
123 timestamp.tm_mon = i;
124 break;
125 }
126 if (i == 12)
127 return FALSE;
128 pCurr += 4;
129
130 // Day of week
131 if (isdigit(*pCurr))
132 {
133 timestamp.tm_mday = *pCurr - '0';
134 }
135 else
136 {
137 if (*pCurr != ' ')
138 return FALSE; // Invalid day of month
139 timestamp.tm_mday = 0;
140 }
141 pCurr++;
142 if (isdigit(*pCurr))
143 {
144 timestamp.tm_mday = timestamp.tm_mday * 10 + (*pCurr - '0');
145 }
146 else
147 {
148 return FALSE; // Invalid day of month
149 }
150 pCurr++;
151 if (*pCurr != ' ')
152 return FALSE;
153 pCurr++;
154
155 // HH:MM:SS
156 memcpy(szBuffer, pCurr, 8);
157 szBuffer[8] = 0;
158 if (sscanf(szBuffer, "%02d:%02d:%02d", &timestamp.tm_hour,
159 &timestamp.tm_min, &timestamp.tm_sec) != 3)
160 return FALSE; // Invalid time format
161 pCurr += 8;
162
163 // Check for Cisco variant - HH:MM:SS.nnn
164 if (*pCurr == '.')
165 {
166 pCurr++;
167 if (isdigit(*pCurr))
168 pCurr++;
169 if (isdigit(*pCurr))
170 pCurr++;
171 if (isdigit(*pCurr))
172 pCurr++;
173 }
174
175 if (*pCurr != ' ')
176 return FALSE; // Space should follow timestamp
177 pCurr++;
178
179 // Convert to system time
180 *ptmTime = mktime(&timestamp);
181 if (*ptmTime == ((time_t)-1))
182 return FALSE;
183
184 // Adjust current position
185 *pnPos += (int)(pCurr - *ppStart);
186 *ppStart = pCurr;
187 return TRUE;
188 }
189
190 /**
191 * Parse syslog message
192 */
193 static BOOL ParseSyslogMessage(char *psMsg, int nMsgLen, time_t receiverTime, NX_SYSLOG_RECORD *pRec)
194 {
195 int i, nLen, nPos = 0;
196 char *pCurr = psMsg;
197
198 memset(pRec, 0, sizeof(NX_SYSLOG_RECORD));
199
200 // Parse PRI part
201 if (*psMsg == '<')
202 {
203 int nPri = 0, nCount = 0;
204
205 for(pCurr++, nPos++; isdigit(*pCurr) && (nPos < nMsgLen); pCurr++, nPos++, nCount++)
206 nPri = nPri * 10 + (*pCurr - '0');
207 if (nPos >= nMsgLen)
208 return FALSE; // Unexpected end of message
209
210 if ((*pCurr == '>') && (nCount > 0) && (nCount <4))
211 {
212 pRec->nFacility = nPri / 8;
213 pRec->nSeverity = nPri % 8;
214 pCurr++;
215 nPos++;
216 }
217 else
218 {
219 return FALSE; // Invalid message
220 }
221 }
222 else
223 {
224 // Set default PRI of 13
225 pRec->nFacility = 1;
226 pRec->nSeverity = SYSLOG_SEVERITY_NOTICE;
227 }
228
229 // Parse HEADER part
230 if (ParseTimeStamp(&pCurr, nMsgLen, &nPos, &pRec->tmTimeStamp))
231 {
232 // Use server time if configured
233 // We still had to parse timestamp to get correct start position for MSG part
234 if (s_alwaysUseServerTime)
235 {
236 pRec->tmTimeStamp = receiverTime;
237 }
238
239 // Hostname
240 for(i = 0; (*pCurr >= 33) && (*pCurr <= 126) && (i < MAX_SYSLOG_HOSTNAME_LEN - 1) && (nPos < nMsgLen); i++, nPos++, pCurr++)
241 pRec->szHostName[i] = *pCurr;
242 if ((nPos >= nMsgLen) || (*pCurr != ' '))
243 {
244 // Not a valid hostname, assuming to be a part of message
245 pCurr -= i;
246 nPos -= i;
247 pRec->szHostName[0] = 0;
248 }
249 else
250 {
251 pCurr++;
252 nPos++;
253 }
254 }
255 else
256 {
257 pRec->tmTimeStamp = receiverTime;
258 }
259
260 // Parse MSG part
261 for(i = 0; isalnum(*pCurr) && (i < MAX_SYSLOG_TAG_LEN) && (nPos < nMsgLen); i++, nPos++, pCurr++)
262 pRec->szTag[i] = *pCurr;
263 if ((i == MAX_SYSLOG_TAG_LEN) || (nPos >= nMsgLen))
264 {
265 // Too long tag, assuming that it's a part of message
266 pRec->szTag[0] = 0;
267 }
268 pCurr -= i;
269 nPos -= i;
270 nLen = min(nMsgLen - nPos, MAX_LOG_MSG_LENGTH);
271 memcpy(pRec->szMessage, pCurr, nLen);
272
273 return TRUE;
274 }
275
276 /**
277 * Find node by host name
278 */
279 static Node *FindNodeByHostname(const char *hostName, UINT32 zoneId)
280 {
281 if (hostName[0] == 0)
282 return NULL;
283
284 Node *node = NULL;
285 InetAddress ipAddr = InetAddress::resolveHostName(hostName);
286 if (ipAddr.isValidUnicast())
287 {
288 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : zoneId, ipAddr);
289 }
290
291 if (node == NULL)
292 {
293 #ifdef UNICODE
294 WCHAR wname[MAX_OBJECT_NAME];
295 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, hostName, -1, wname, MAX_OBJECT_NAME);
296 wname[MAX_OBJECT_NAME - 1] = 0;
297 node = (Node *)FindObjectByName(wname, OBJECT_NODE);
298 #else
299 node = (Node *)FindObjectByName(hostName, OBJECT_NODE);
300 #endif
301 }
302 return node;
303 }
304
305 /**
306 * Bind syslog message to NetXMS node object
307 * sourceAddr is an IP address from which we receive message
308 */
309 static Node *BindMsgToNode(NX_SYSLOG_RECORD *pRec, const InetAddress& sourceAddr, UINT32 zoneId)
310 {
311 Node *node = NULL;
312
313 if (s_nodeMatchingPolicy == SOURCE_IP_THEN_HOSTNAME)
314 {
315 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : zoneId, sourceAddr);
316 if (node == NULL)
317 {
318 node = FindNodeByHostname(pRec->szHostName, zoneId);
319 }
320 }
321 else
322 {
323 node = FindNodeByHostname(pRec->szHostName, zoneId);
324 if (node == NULL)
325 {
326 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : zoneId, sourceAddr);
327 }
328 }
329
330 if (node != NULL)
331 {
332 node->incSyslogMessageCount();
333 pRec->dwSourceObject = node->getId();
334 if (pRec->szHostName[0] == 0)
335 {
336 #ifdef UNICODE
337 WideCharToMultiByte(CP_ACP, WC_DEFAULTCHAR | WC_COMPOSITECHECK, node->getName(), -1, pRec->szHostName, MAX_SYSLOG_HOSTNAME_LEN, NULL, NULL);
338 pRec->szHostName[MAX_SYSLOG_HOSTNAME_LEN - 1] = 0;
339 #else
340 nx_strncpy(pRec->szHostName, node->getName(), MAX_SYSLOG_HOSTNAME_LEN);
341 #endif
342 }
343 }
344 else
345 {
346 if (pRec->szHostName[0] == 0)
347 {
348 sourceAddr.toStringA(pRec->szHostName);
349 }
350 }
351
352 return node;
353 }
354
355 /**
356 * Handler for EnumerateSessions()
357 */
358 static void BroadcastSyslogMessage(ClientSession *pSession, void *pArg)
359 {
360 if (pSession->isAuthenticated())
361 pSession->onSyslogMessage((NX_SYSLOG_RECORD *)pArg);
362 }
363
364 /**
365 * Syslog writer thread
366 */
367 static THREAD_RESULT THREAD_CALL SyslogWriterThread(void *arg)
368 {
369 DbgPrintf(1, _T("Syslog writer thread started"));
370 while(true)
371 {
372 NX_SYSLOG_RECORD *r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.getOrBlock();
373 if (r == INVALID_POINTER_VALUE)
374 break;
375
376 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
377
378 DB_STATEMENT hStmt = DBPrepare(hdb, _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,source_object_id,hostname,msg_tag,msg_text) VALUES (?,?,?,?,?,?,?,?)"));
379 if (hStmt == NULL)
380 {
381 free(r);
382 DBConnectionPoolReleaseConnection(hdb);
383 continue;
384 }
385
386 int count = 0;
387 DBBegin(hdb);
388 while(true)
389 {
390 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, r->qwMsgId);
391 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)r->tmTimeStamp);
392 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, r->nFacility);
393 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, r->nSeverity);
394 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, r->dwSourceObject);
395 #ifdef UNICODE
396 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szHostName), DB_BIND_DYNAMIC);
397 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szTag), DB_BIND_DYNAMIC);
398 DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szMessage), DB_BIND_DYNAMIC);
399 #else
400 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, r->szHostName, DB_BIND_STATIC);
401 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, r->szTag, DB_BIND_STATIC);
402 DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, r->szMessage, DB_BIND_STATIC);
403 #endif
404
405 if (!DBExecute(hStmt))
406 {
407 free(r);
408 break;
409 }
410 free(r);
411 count++;
412 if (count == 1000)
413 break;
414 r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.get();
415 if ((r == NULL) || (r == INVALID_POINTER_VALUE))
416 break;
417 }
418 DBCommit(hdb);
419 DBFreeStatement(hStmt);
420 DBConnectionPoolReleaseConnection(hdb);
421 if (r == INVALID_POINTER_VALUE)
422 break;
423 }
424 DbgPrintf(1, _T("Syslog writer thread stopped"));
425 return THREAD_OK;
426 }
427
428 /**
429 * Process syslog message
430 */
431 static void ProcessSyslogMessage(QueuedSyslogMessage *msg)
432 {
433 NX_SYSLOG_RECORD record;
434
435 DbgPrintf(6, _T("ProcessSyslogMessage: Raw syslog message to process:\n%hs"), msg->message);
436 if (ParseSyslogMessage(msg->message, msg->messageLength, msg->timestamp, &record))
437 {
438 g_syslogMessagesReceived++;
439
440 record.qwMsgId = s_msgId++;
441 Node *node = BindMsgToNode(&record, msg->sourceAddr, msg->zoneId);
442
443 g_syslogWriteQueue.put(nx_memdup(&record, sizeof(NX_SYSLOG_RECORD)));
444
445 // Send message to all connected clients
446 EnumerateClientSessions(BroadcastSyslogMessage, &record);
447
448 TCHAR ipAddr[64];
449 DbgPrintf(6, _T("Syslog message: ipAddr=%s objectId=%d tag=\"%hs\" msg=\"%hs\""),
450 msg->sourceAddr.toString(ipAddr), record.dwSourceObject, record.szTag, record.szMessage);
451
452 MutexLock(s_parserLock);
453 if ((record.dwSourceObject != 0) && (s_parser != NULL) &&
454 ((node->getStatus() != STATUS_UNMANAGED) || (g_flags & AF_TRAPS_FROM_UNMANAGED_NODES)))
455 {
456 #ifdef UNICODE
457 WCHAR wtag[MAX_SYSLOG_TAG_LEN];
458 WCHAR wmsg[MAX_LOG_MSG_LENGTH];
459 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szTag, -1, wtag, MAX_SYSLOG_TAG_LEN);
460 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szMessage, -1, wmsg, MAX_LOG_MSG_LENGTH);
461 s_parser->matchEvent(wtag, record.nFacility, 1 << record.nSeverity, wmsg, record.dwSourceObject);
462 #else
463 s_parser->matchEvent(record.szTag, record.nFacility, 1 << record.nSeverity, record.szMessage, record.dwSourceObject);
464 #endif
465 }
466 MutexUnlock(s_parserLock);
467 }
468 else
469 {
470 DbgPrintf(6, _T("ProcessSyslogMessage: Cannot parse syslog message"));
471 }
472 }
473
474 /**
475 * Syslog processing thread
476 */
477 static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
478 {
479 QueuedSyslogMessage *msg;
480
481 while(true)
482 {
483 msg = (QueuedSyslogMessage *)g_syslogProcessingQueue.getOrBlock();
484 if (msg == INVALID_POINTER_VALUE)
485 break;
486
487 ProcessSyslogMessage(msg);
488 delete msg;
489 }
490 return THREAD_OK;
491 }
492
493 /**
494 * Queue syslog message for processing
495 */
496 static void QueueSyslogMessage(char *msg, int msgLen, const InetAddress& sourceAddr)
497 {
498 g_syslogProcessingQueue.put(new QueuedSyslogMessage(sourceAddr, msg, msgLen));
499 }
500
501 /**
502 * Queue proxied syslog message for processing
503 */
504 void QueueProxiedSyslogMessage(const InetAddress &addr, UINT32 zoneId, time_t timestamp, const char *msg, int msgLen)
505 {
506 g_syslogProcessingQueue.put(new QueuedSyslogMessage(addr, timestamp, zoneId, msg, msgLen));
507 }
508
509 /**
510 * Callback for syslog parser
511 */
512 static void SyslogParserCallback(UINT32 eventCode, const TCHAR *eventName, const TCHAR *line,
513 const TCHAR *source, UINT32 facility, UINT32 severity,
514 int paramCount, TCHAR **params, UINT32 objectId, int repeatCount,
515 void *userArg)
516 {
517 char format[] = "sssssssssssssssssssssssssssssssss";
518 TCHAR *plist[33];
519 TCHAR repeatCountText[16];
520
521 int count = min(paramCount, 32);
522 format[count + 1] = 0;
523 for(int i = 0; i < count; i++)
524 plist[i] = params[i];
525 _sntprintf(repeatCountText, 16, _T("%d"), repeatCount);
526 plist[count] = repeatCountText;
527 PostEvent(eventCode, objectId, format,
528 plist[0], plist[1], plist[2], plist[3],
529 plist[4], plist[5], plist[6], plist[7],
530 plist[8], plist[9], plist[10], plist[11],
531 plist[12], plist[13], plist[14], plist[15],
532 plist[16], plist[17], plist[18], plist[19],
533 plist[20], plist[21], plist[22], plist[23],
534 plist[24], plist[25], plist[26], plist[27],
535 plist[28], plist[29], plist[30], plist[31]);
536 }
537
538 /**
539 * Event name resolver
540 */
541 static bool EventNameResolver(const TCHAR *name, UINT32 *code)
542 {
543 bool success = false;
544 EventTemplate *event = FindEventTemplateByName(name);
545 if (event != NULL)
546 {
547 *code = event->getCode();
548 event->decRefCount();
549 success = true;
550 }
551 return success;
552 }
553
554 /**
555 * Create syslog parser from config
556 */
557 static void CreateParserFromConfig()
558 {
559 char *xml;
560
561 MutexLock(s_parserLock);
562 delete_and_null(s_parser);
563 #ifdef UNICODE
564 WCHAR *wxml = ConfigReadCLOB(_T("SyslogParser"), _T("<parser></parser>"));
565 if (wxml != NULL)
566 {
567 xml = UTF8StringFromWideString(wxml);
568 free(wxml);
569 }
570 else
571 {
572 xml = NULL;
573 }
574 #else
575 xml = ConfigReadCLOB("SyslogParser", "<parser></parser>");
576 #endif
577 if (xml != NULL)
578 {
579 TCHAR parseError[256];
580 ObjectArray<LogParser> *parsers = LogParser::createFromXml(xml, -1, parseError, 256, EventNameResolver);
581 if ((parsers != NULL) && (parsers->size() > 0))
582 {
583 s_parser = parsers->get(0);
584 s_parser->setCallback(SyslogParserCallback);
585 DbgPrintf(3, _T("syslogd: parser successfully created from config"));
586 }
587 else
588 {
589 nxlog_write(MSG_SYSLOG_PARSER_INIT_FAILED, EVENTLOG_ERROR_TYPE, "s", parseError);
590 }
591 free(xml);
592 delete parsers;
593 }
594 MutexUnlock(s_parserLock);
595 }
596
597 /**
598 * Syslog messages receiver thread
599 */
600 static THREAD_RESULT THREAD_CALL SyslogReceiver(void *pArg)
601 {
602 SOCKET hSocket = socket(AF_INET, SOCK_DGRAM, 0);
603 #ifdef WITH_IPV6
604 SOCKET hSocket6 = socket(AF_INET6, SOCK_DGRAM, 0);
605 #endif
606 if ((hSocket == INVALID_SOCKET)
607 #ifdef WITH_IPV6
608 && (hSocket6 == INVALID_SOCKET)
609 #endif
610 )
611 {
612 nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("SyslogReceiver"));
613 return THREAD_OK;
614 }
615
616 SetSocketExclusiveAddrUse(hSocket);
617 SetSocketReuseFlag(hSocket);
618 #ifndef _WIN32
619 fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
620 #endif
621
622 #ifdef WITH_IPV6
623 SetSocketExclusiveAddrUse(hSocket6);
624 SetSocketReuseFlag(hSocket6);
625 #ifndef _WIN32
626 fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
627 #endif
628 #ifdef IPV6_V6ONLY
629 int on = 1;
630 setsockopt(hSocket6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
631 #endif
632 #endif
633
634 // Get listen port number
635 int port = ConfigReadInt(_T("SyslogListenPort"), 514);
636 if ((port < 1) || (port > 65535))
637 {
638 DbgPrintf(2, _T("Syslog: invalid listen port number %d, using default"), port);
639 port = 514;
640 }
641
642 // Fill in local address structure
643 struct sockaddr_in servAddr;
644 memset(&servAddr, 0, sizeof(struct sockaddr_in));
645 servAddr.sin_family = AF_INET;
646
647 #ifdef WITH_IPV6
648 struct sockaddr_in6 servAddr6;
649 memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
650 servAddr6.sin6_family = AF_INET6;
651 #endif
652
653 if (!_tcscmp(g_szListenAddress, _T("*")))
654 {
655 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
656 #ifdef WITH_IPV6
657 memset(servAddr6.sin6_addr.s6_addr, 0, 16);
658 #endif
659 }
660 else
661 {
662 InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
663 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
664 {
665 servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
666 }
667 else
668 {
669 servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
670 }
671 #ifdef WITH_IPV6
672 bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
673 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
674 {
675 memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
676 }
677 else
678 {
679 memset(servAddr6.sin6_addr.s6_addr, 0, 15);
680 servAddr6.sin6_addr.s6_addr[15] = 1;
681 }
682 #endif
683 }
684 servAddr.sin_port = htons((UINT16)port);
685 #ifdef WITH_IPV6
686 servAddr6.sin6_port = htons((UINT16)port);
687 #endif
688
689 // Bind socket
690 TCHAR buffer[64];
691 int bindFailures = 0;
692 DbgPrintf(5, _T("Trying to bind on UDP %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
693 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
694 {
695 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("SyslogReceiver"), WSAGetLastError());
696 bindFailures++;
697 closesocket(hSocket);
698 hSocket = INVALID_SOCKET;
699 }
700
701 #ifdef WITH_IPV6
702 DbgPrintf(5, _T("Trying to bind on UDP [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
703 if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
704 {
705 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("SyslogReceiver"), WSAGetLastError());
706 bindFailures++;
707 closesocket(hSocket6);
708 hSocket6 = INVALID_SOCKET;
709 }
710 #else
711 bindFailures++;
712 #endif
713
714 // Abort if cannot bind to at least one socket
715 if (bindFailures == 2)
716 {
717 DbgPrintf(1, _T("Syslog receiver aborted - cannot bind at least one socket"));
718 return THREAD_OK;
719 }
720
721 if (hSocket != INVALID_SOCKET)
722 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), port);
723 #ifdef WITH_IPV6
724 if (hSocket6 != INVALID_SOCKET)
725 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr6.sin6_addr.s6_addr, port);
726 #endif
727
728 DbgPrintf(1, _T("Syslog receiver thread started"));
729
730 // Wait for packets
731 while(s_running)
732 {
733 struct timeval tv;
734 tv.tv_sec = 1;
735 tv.tv_usec = 0;
736
737 fd_set rdfs;
738 FD_ZERO(&rdfs);
739 if (hSocket != INVALID_SOCKET)
740 FD_SET(hSocket, &rdfs);
741 #ifdef WITH_IPV6
742 if (hSocket6 != INVALID_SOCKET)
743 FD_SET(hSocket6, &rdfs);
744 #endif
745
746 #if defined(WITH_IPV6) && !defined(_WIN32)
747 SOCKET nfds = 0;
748 if (hSocket != INVALID_SOCKET)
749 nfds = hSocket;
750 if ((hSocket6 != INVALID_SOCKET) && (hSocket6 > nfds))
751 nfds = hSocket6;
752 int rc = select(SELECT_NFDS(nfds + 1), &rdfs, NULL, NULL, &tv);
753 #else
754 int rc = select(SELECT_NFDS(hSocket + 1), &rdfs, NULL, NULL, &tv);
755 #endif
756 if (rc > 0)
757 {
758 char syslogMessage[MAX_SYSLOG_MSG_LEN + 1];
759 SockAddrBuffer addr;
760 socklen_t addrLen = sizeof(SockAddrBuffer);
761 #ifdef WITH_IPV6
762 SOCKET s = FD_ISSET(hSocket, &rdfs) ? hSocket : hSocket6;
763 #else
764 SOCKET s = hSocket;
765 #endif
766 int bytes = recvfrom(s, syslogMessage, MAX_SYSLOG_MSG_LEN, 0, (struct sockaddr *)&addr, &addrLen);
767 if (bytes > 0)
768 {
769 syslogMessage[bytes] = 0;
770 QueueSyslogMessage(syslogMessage, bytes, InetAddress::createFromSockaddr((struct sockaddr *)&addr));
771 }
772 else
773 {
774 // Sleep on error
775 ThreadSleepMs(100);
776 }
777 }
778 else if (rc == -1)
779 {
780 // Sleep on error
781 ThreadSleepMs(100);
782 }
783 }
784
785 if (hSocket != INVALID_SOCKET)
786 closesocket(hSocket);
787 #ifdef WITH_IPV6
788 if (hSocket6 != INVALID_SOCKET)
789 closesocket(hSocket6);
790 #endif
791
792 nxlog_debug(1, _T("Syslog receiver thread stopped"));
793 return THREAD_OK;
794 }
795
796 /**
797 * Create NXCP message from NX_SYSLOG_RECORD structure
798 */
799 void CreateMessageFromSyslogMsg(NXCPMessage *pMsg, NX_SYSLOG_RECORD *pRec)
800 {
801 UINT32 dwId = VID_SYSLOG_MSG_BASE;
802
803 pMsg->setField(VID_NUM_RECORDS, (UINT32)1);
804 pMsg->setField(dwId++, pRec->qwMsgId);
805 pMsg->setField(dwId++, (UINT32)pRec->tmTimeStamp);
806 pMsg->setField(dwId++, (WORD)pRec->nFacility);
807 pMsg->setField(dwId++, (WORD)pRec->nSeverity);
808 pMsg->setField(dwId++, pRec->dwSourceObject);
809 pMsg->setFieldFromMBString(dwId++, pRec->szHostName);
810 pMsg->setFieldFromMBString(dwId++, pRec->szTag);
811 pMsg->setFieldFromMBString(dwId++, pRec->szMessage);
812 }
813
814 /**
815 * Reinitialize parser on configuration change
816 */
817 void ReinitializeSyslogParser()
818 {
819 if (s_parserLock == INVALID_MUTEX_HANDLE)
820 return; // Syslog daemon not initialized
821 CreateParserFromConfig();
822 }
823
824 /**
825 * Handler for syslog related configuration changes
826 */
827 void OnSyslogConfigurationChange(const TCHAR *name, const TCHAR *value)
828 {
829 if (!_tcscmp(name, _T("SyslogIgnoreMessageTimestamp")))
830 {
831 s_alwaysUseServerTime = _tcstol(value, NULL, 0) ? true : false;
832 nxlog_debug(4, _T("Syslog: ignore message timestamp option set to %s"), s_alwaysUseServerTime ? _T("ON") : _T("OFF"));
833 }
834 }
835
836 /**
837 * Start built-in syslog server
838 */
839 void StartSyslogServer()
840 {
841 s_nodeMatchingPolicy = (NodeMatchingPolicy)ConfigReadInt(_T("SyslogNodeMatchingPolicy"), SOURCE_IP_THEN_HOSTNAME);
842 s_alwaysUseServerTime = ConfigReadInt(_T("SyslogIgnoreMessageTimestamp"), 0) ? true : false;
843
844 // Determine first available message id
845 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
846 DB_RESULT hResult = DBSelect(hdb, _T("SELECT max(msg_id) FROM syslog"));
847 if (hResult != NULL)
848 {
849 if (DBGetNumRows(hResult) > 0)
850 {
851 s_msgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, s_msgId);
852 }
853 DBFreeResult(hResult);
854 }
855 DBConnectionPoolReleaseConnection(hdb);
856
857 SetLogParserTraceCallback(nxlog_debug2);
858 InitLogParserLibrary();
859
860 // Create message parser
861 s_parserLock = MutexCreate();
862 CreateParserFromConfig();
863
864 // Start processing thread
865 s_processingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
866 s_writerThread = ThreadCreateEx(SyslogWriterThread, 0, NULL);
867
868 if (ConfigReadInt(_T("EnableSyslogReceiver"), 0))
869 s_receiverThread = ThreadCreateEx(SyslogReceiver, 0, NULL);
870 }
871
872 /**
873 * Stop built-in syslog server
874 */
875 void StopSyslogServer()
876 {
877 s_running = false;
878 ThreadJoin(s_receiverThread);
879
880 // Stop processing thread
881 g_syslogProcessingQueue.put(INVALID_POINTER_VALUE);
882 ThreadJoin(s_processingThread);
883
884 // Stop writer thread - it must be done after processing thread already finished
885 g_syslogWriteQueue.put(INVALID_POINTER_VALUE);
886 ThreadJoin(s_writerThread);
887
888 delete s_parser;
889 CleanupLogParserLibrary();
890 }