optimizations and fixes in log watcher repeat count processing
[public/netxms.git] / src / server / core / syslogd.cpp
CommitLineData
c06c063c 1/*
5039dede 2** NetXMS - Network Management System
2df047f4 3** Copyright (C) 2003-2016 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: syslogd.cpp
20**
21**/
22
23#include "nxcore.h"
24#include <nxlog.h>
25#include <nxlpapi.h>
26
271c3551
VK
27/**
28 * Max syslog message length
29 */
5039dede
AK
30#define MAX_SYSLOG_MSG_LEN 1024
31
271c3551
VK
32/**
33 * Queued syslog message structure
34 */
af4a09df 35class QueuedSyslogMessage
5039dede 36{
af4a09df
VK
37public:
38 InetAddress sourceAddr;
39 char *message;
40 int messageLength;
41
42 QueuedSyslogMessage(const InetAddress& addr, char *msg, int msgLen) : sourceAddr(addr)
43 {
043b72bb 44 message = (char *)nx_memdup(msg, msgLen + 1);
af4a09df
VK
45 messageLength = msgLen;
46 }
47
48 ~QueuedSyslogMessage()
49 {
50 free(message);
51 }
5039dede
AK
52};
53
f1784ab6
VK
54/**
55 * Queues
56 */
57Queue g_syslogProcessingQueue(1000, 100);
58Queue g_syslogWriteQueue(1000, 100);
59
47cacc1f
VK
60/**
61 * Node matching policy
62 */
63enum NodeMatchingPolicy
64{
65 SOURCE_IP_THEN_HOSTNAME = 0,
66 HOSTNAME_THEN_SOURCE_IP = 1
67};
68
271c3551
VK
69/**
70 * Static data
71 */
1499848d 72static UINT64 s_msgId = 1;
1499848d
VK
73static LogParser *s_parser = NULL;
74static MUTEX s_parserLock = INVALID_MUTEX_HANDLE;
47cacc1f 75static NodeMatchingPolicy s_nodeMatchingPolicy = SOURCE_IP_THEN_HOSTNAME;
af4a09df
VK
76static THREAD s_receiverThread = INVALID_THREAD_HANDLE;
77static bool s_running = true;
b8c8a8a7 78static bool s_alwaysUseServerTime = false;
5039dede 79
e9491562
VK
80/**
81 * Parse timestamp field
82 */
5039dede
AK
83static BOOL ParseTimeStamp(char **ppStart, int nMsgSize, int *pnPos, time_t *ptmTime)
84{
85 static char psMonth[12][5] = { "Jan ", "Feb ", "Mar ", "Apr ",
86 "May ", "Jun ", "Jul ", "Aug ",
87 "Sep ", "Oct ", "Nov ", "Dec " };
88 struct tm timestamp;
89 time_t t;
90 char szBuffer[16], *pCurr = *ppStart;
91 int i;
92
93 if (nMsgSize - *pnPos < 16)
94 return FALSE; // Timestamp cannot be shorter than 16 bytes
95
96 // Prepare local time structure
97 t = time(NULL);
98 memcpy(&timestamp, localtime(&t), sizeof(struct tm));
99
100 // Month
101 for(i = 0; i < 12; i++)
102 if (!memcmp(pCurr, psMonth[i], 4))
103 {
104 timestamp.tm_mon = i;
105 break;
106 }
107 if (i == 12)
108 return FALSE;
109 pCurr += 4;
110
111 // Day of week
112 if (isdigit(*pCurr))
113 {
114 timestamp.tm_mday = *pCurr - '0';
115 }
c06c063c 116 else
5039dede
AK
117 {
118 if (*pCurr != ' ')
119 return FALSE; // Invalid day of month
120 timestamp.tm_mday = 0;
121 }
122 pCurr++;
123 if (isdigit(*pCurr))
124 {
125 timestamp.tm_mday = timestamp.tm_mday * 10 + (*pCurr - '0');
126 }
127 else
128 {
129 return FALSE; // Invalid day of month
130 }
131 pCurr++;
132 if (*pCurr != ' ')
133 return FALSE;
134 pCurr++;
135
136 // HH:MM:SS
137 memcpy(szBuffer, pCurr, 8);
138 szBuffer[8] = 0;
139 if (sscanf(szBuffer, "%02d:%02d:%02d", &timestamp.tm_hour,
140 &timestamp.tm_min, &timestamp.tm_sec) != 3)
141 return FALSE; // Invalid time format
142 pCurr += 8;
143
144 // Check for Cisco variant - HH:MM:SS.nnn
145 if (*pCurr == '.')
146 {
147 pCurr++;
148 if (isdigit(*pCurr))
149 pCurr++;
150 if (isdigit(*pCurr))
151 pCurr++;
152 if (isdigit(*pCurr))
153 pCurr++;
154 }
155
156 if (*pCurr != ' ')
157 return FALSE; // Space should follow timestamp
158 pCurr++;
159
160 // Convert to system time
161 *ptmTime = mktime(&timestamp);
162 if (*ptmTime == ((time_t)-1))
163 return FALSE;
164
165 // Adjust current position
166 *pnPos += (int)(pCurr - *ppStart);
167 *ppStart = pCurr;
168 return TRUE;
169}
170
e9491562
VK
171/**
172 * Parse syslog message
173 */
e0f99bf0 174static BOOL ParseSyslogMessage(char *psMsg, int nMsgLen, NX_SYSLOG_RECORD *pRec)
5039dede
AK
175{
176 int i, nLen, nPos = 0;
177 char *pCurr = psMsg;
178
e0f99bf0 179 memset(pRec, 0, sizeof(NX_SYSLOG_RECORD));
5039dede
AK
180
181 // Parse PRI part
182 if (*psMsg == '<')
183 {
184 int nPri = 0, nCount = 0;
185
186 for(pCurr++, nPos++; isdigit(*pCurr) && (nPos < nMsgLen); pCurr++, nPos++, nCount++)
187 nPri = nPri * 10 + (*pCurr - '0');
188 if (nPos >= nMsgLen)
189 return FALSE; // Unexpected end of message
190
191 if ((*pCurr == '>') && (nCount > 0) && (nCount <4))
192 {
193 pRec->nFacility = nPri / 8;
194 pRec->nSeverity = nPri % 8;
195 pCurr++;
196 nPos++;
197 }
198 else
199 {
200 return FALSE; // Invalid message
201 }
202 }
203 else
204 {
205 // Set default PRI of 13
206 pRec->nFacility = 1;
207 pRec->nSeverity = SYSLOG_SEVERITY_NOTICE;
208 }
209
210 // Parse HEADER part
211 if (ParseTimeStamp(&pCurr, nMsgLen, &nPos, &pRec->tmTimeStamp))
212 {
b8c8a8a7
VK
213 // Use server time if configured
214 // We still had to parse timestamp to get correct start position for MSG part
30089197 215 if (s_alwaysUseServerTime)
5039dede 216 {
30089197
VK
217 pRec->tmTimeStamp = time(NULL);
218 }
219
220 // Hostname
221 for(i = 0; (*pCurr >= 33) && (*pCurr <= 126) && (i < MAX_SYSLOG_HOSTNAME_LEN - 1) && (nPos < nMsgLen); i++, nPos++, pCurr++)
222 pRec->szHostName[i] = *pCurr;
223 if ((nPos >= nMsgLen) || (*pCurr != ' '))
224 {
225 // Not a valid hostname, assuming to be a part of message
226 pCurr -= i;
227 nPos -= i;
228 pRec->szHostName[0] = 0;
5039dede
AK
229 }
230 else
231 {
30089197
VK
232 pCurr++;
233 nPos++;
5039dede
AK
234 }
235 }
236 else
237 {
238 pRec->tmTimeStamp = time(NULL);
239 }
240
241 // Parse MSG part
242 for(i = 0; isalnum(*pCurr) && (i < MAX_SYSLOG_TAG_LEN) && (nPos < nMsgLen); i++, nPos++, pCurr++)
243 pRec->szTag[i] = *pCurr;
244 if ((i == MAX_SYSLOG_TAG_LEN) || (nPos >= nMsgLen))
245 {
246 // Too long tag, assuming that it's a part of message
247 pRec->szTag[0] = 0;
248 }
249 pCurr -= i;
250 nPos -= i;
251 nLen = min(nMsgLen - nPos, MAX_LOG_MSG_LENGTH);
252 memcpy(pRec->szMessage, pCurr, nLen);
253
254 return TRUE;
255}
256
e9491562 257/**
47cacc1f 258 * Find node by host name
e9491562 259 */
47cacc1f 260static Node *FindNodeByHostname(const char *hostName)
5039dede 261{
47cacc1f
VK
262 if (hostName[0] == 0)
263 return NULL;
5039dede 264
47cacc1f
VK
265 Node *node = NULL;
266 UINT32 ipAddr = ntohl(ResolveHostNameA(hostName));
267 if ((ipAddr != INADDR_NONE) && (ipAddr != INADDR_ANY))
5039dede 268 {
0eff2ce4 269 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : 0, ipAddr);
5039dede 270 }
47cacc1f
VK
271
272 if (node == NULL)
273 {
e0f99bf0 274#ifdef UNICODE
47cacc1f
VK
275 WCHAR wname[MAX_OBJECT_NAME];
276 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, hostName, -1, wname, MAX_OBJECT_NAME);
277 wname[MAX_OBJECT_NAME - 1] = 0;
278 node = (Node *)FindObjectByName(wname, OBJECT_NODE);
e0f99bf0 279#else
47cacc1f 280 node = (Node *)FindObjectByName(hostName, OBJECT_NODE);
e0f99bf0 281#endif
5039dede 282 }
47cacc1f
VK
283 return node;
284}
5039dede 285
47cacc1f
VK
286/**
287 * Bind syslog message to NetXMS node object
af4a09df 288 * sourceAddr is an IP address from which we receive message
47cacc1f 289 */
af4a09df 290static Node *BindMsgToNode(NX_SYSLOG_RECORD *pRec, const InetAddress& sourceAddr)
47cacc1f
VK
291{
292 Node *node = NULL;
3aa66b65 293
47cacc1f 294 if (s_nodeMatchingPolicy == SOURCE_IP_THEN_HOSTNAME)
299f1678 295 {
af4a09df 296 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : 0, sourceAddr);
47cacc1f
VK
297 if (node == NULL)
298 {
299 node = FindNodeByHostname(pRec->szHostName);
300 }
301 }
302 else
303 {
304 node = FindNodeByHostname(pRec->szHostName);
305 if (node == NULL)
306 {
af4a09df 307 node = FindNodeByIP((g_flags & AF_TRAP_SOURCES_IN_ALL_ZONES) ? ALL_ZONES : 0, sourceAddr);
47cacc1f 308 }
299f1678
VK
309 }
310
47cacc1f 311 if (node != NULL)
3aa66b65 312 {
c42b4551 313 pRec->dwSourceObject = node->getId();
3aa66b65 314 if (pRec->szHostName[0] == 0)
e0f99bf0
VK
315 {
316#ifdef UNICODE
c42b4551 317 WideCharToMultiByte(CP_ACP, WC_DEFAULTCHAR | WC_COMPOSITECHECK, node->getName(), -1, pRec->szHostName, MAX_SYSLOG_HOSTNAME_LEN, NULL, NULL);
e0f99bf0
VK
318 pRec->szHostName[MAX_SYSLOG_HOSTNAME_LEN - 1] = 0;
319#else
c42b4551 320 nx_strncpy(pRec->szHostName, node->getName(), MAX_SYSLOG_HOSTNAME_LEN);
e0f99bf0
VK
321#endif
322 }
3aa66b65
VK
323 }
324 else
325 {
326 if (pRec->szHostName[0] == 0)
af4a09df
VK
327 {
328 sourceAddr.toStringA(pRec->szHostName);
329 }
5039dede 330 }
f8106e8b 331
47cacc1f 332 return node;
5039dede
AK
333}
334
e9491562
VK
335/**
336 * Handler for EnumerateSessions()
337 */
5039dede
AK
338static void BroadcastSyslogMessage(ClientSession *pSession, void *pArg)
339{
e05b1945 340 if (pSession->isAuthenticated())
e0f99bf0 341 pSession->onSyslogMessage((NX_SYSLOG_RECORD *)pArg);
5039dede
AK
342}
343
1499848d
VK
344/**
345 * Syslog writer thread
346 */
347static THREAD_RESULT THREAD_CALL SyslogWriterThread(void *arg)
348{
349 DbgPrintf(1, _T("Syslog writer thread started"));
350 while(true)
351 {
19dbc8ef 352 NX_SYSLOG_RECORD *r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.getOrBlock();
1499848d
VK
353 if (r == INVALID_POINTER_VALUE)
354 break;
355
356 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
357
358 DB_STATEMENT hStmt = DBPrepare(hdb, _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,source_object_id,hostname,msg_tag,msg_text) VALUES (?,?,?,?,?,?,?,?)"));
359 if (hStmt == NULL)
360 {
361 free(r);
ec5829b2 362 DBConnectionPoolReleaseConnection(hdb);
1499848d
VK
363 continue;
364 }
365
366 int count = 0;
367 DBBegin(hdb);
368 while(true)
369 {
370 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, r->qwMsgId);
371 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (INT32)r->tmTimeStamp);
372 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, r->nFacility);
373 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, r->nSeverity);
374 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, r->dwSourceObject);
375#ifdef UNICODE
376 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szHostName), DB_BIND_DYNAMIC);
377 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szTag), DB_BIND_DYNAMIC);
378 DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, WideStringFromMBString(r->szMessage), DB_BIND_DYNAMIC);
379#else
380 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, r->szHostName, DB_BIND_STATIC);
381 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, r->szTag, DB_BIND_STATIC);
382 DBBind(hStmt, 8, DB_SQLTYPE_VARCHAR, r->szMessage, DB_BIND_STATIC);
383#endif
384
385 if (!DBExecute(hStmt))
386 {
387 free(r);
388 break;
389 }
390 free(r);
391 count++;
392 if (count == 1000)
393 break;
19dbc8ef 394 r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.get();
1499848d
VK
395 if ((r == NULL) || (r == INVALID_POINTER_VALUE))
396 break;
397 }
398 DBCommit(hdb);
399 DBFreeStatement(hStmt);
400 DBConnectionPoolReleaseConnection(hdb);
401 if (r == INVALID_POINTER_VALUE)
402 break;
403 }
404 DbgPrintf(1, _T("Syslog writer thread stopped"));
405 return THREAD_OK;
406}
407
e9491562
VK
408/**
409 * Process syslog message
410 */
af4a09df 411static void ProcessSyslogMessage(char *psMsg, int nMsgLen, const InetAddress& sourceAddr)
5039dede 412{
e0f99bf0 413 NX_SYSLOG_RECORD record;
5039dede 414
271c3551 415 DbgPrintf(6, _T("ProcessSyslogMessage: Raw syslog message to process:\n%hs"), psMsg);
5039dede
AK
416 if (ParseSyslogMessage(psMsg, nMsgLen, &record))
417 {
1499848d 418 record.qwMsgId = s_msgId++;
af4a09df 419 Node *node = BindMsgToNode(&record, sourceAddr);
1499848d 420
19dbc8ef 421 g_syslogWriteQueue.put(nx_memdup(&record, sizeof(NX_SYSLOG_RECORD)));
5039dede
AK
422
423 // Send message to all connected clients
424 EnumerateClientSessions(BroadcastSyslogMessage, &record);
425
af4a09df 426 TCHAR ipAddr[64];
271c3551 427 DbgPrintf(6, _T("Syslog message: ipAddr=%s objectId=%d tag=\"%hs\" msg=\"%hs\""),
af4a09df 428 sourceAddr.toString(ipAddr), record.dwSourceObject, record.szTag, record.szMessage);
271c3551 429
1499848d 430 MutexLock(s_parserLock);
f8106e8b
VK
431 if ((record.dwSourceObject != 0) && (s_parser != NULL) &&
432 ((node->Status() != STATUS_UNMANAGED) || (g_flags & AF_TRAPS_FROM_UNMANAGED_NODES)))
5039dede 433 {
6646dca4
VK
434#ifdef UNICODE
435 WCHAR wtag[MAX_SYSLOG_TAG_LEN];
436 WCHAR wmsg[MAX_LOG_MSG_LENGTH];
437 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szTag, -1, wtag, MAX_SYSLOG_TAG_LEN);
438 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, record.szMessage, -1, wmsg, MAX_LOG_MSG_LENGTH);
1499848d 439 s_parser->matchEvent(wtag, record.nFacility, 1 << record.nSeverity, wmsg, record.dwSourceObject);
6646dca4 440#else
1499848d 441 s_parser->matchEvent(record.szTag, record.nFacility, 1 << record.nSeverity, record.szMessage, record.dwSourceObject);
6646dca4 442#endif
5039dede 443 }
1499848d 444 MutexUnlock(s_parserLock);
5039dede 445 }
271c3551
VK
446 else
447 {
448 DbgPrintf(6, _T("ProcessSyslogMessage: Cannot parse syslog message"));
449 }
5039dede
AK
450}
451
e9491562
VK
452/**
453 * Syslog processing thread
454 */
5039dede
AK
455static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
456{
af4a09df 457 QueuedSyslogMessage *msg;
5039dede 458
af4a09df 459 while(true)
5039dede 460 {
af4a09df
VK
461 msg = (QueuedSyslogMessage *)g_syslogProcessingQueue.getOrBlock();
462 if (msg == INVALID_POINTER_VALUE)
5039dede
AK
463 break;
464
af4a09df
VK
465 ProcessSyslogMessage(msg->message, msg->messageLength, msg->sourceAddr);
466 delete msg;
5039dede
AK
467 }
468 return THREAD_OK;
469}
470
271c3551
VK
471/**
472 * Queue syslog message for processing
473 */
af4a09df 474static void QueueSyslogMessage(char *msg, int msgLen, const InetAddress& sourceAddr)
5039dede 475{
af4a09df 476 g_syslogProcessingQueue.put(new QueuedSyslogMessage(sourceAddr, msg, msgLen));
5039dede
AK
477}
478
271c3551
VK
479/**
480 * Callback for syslog parser
481 */
c06c063c 482static void SyslogParserCallback(UINT32 eventCode, const TCHAR *eventName, const TCHAR *line,
da623f8b 483 const TCHAR *source, UINT32 facility, UINT32 severity,
d930bfc9
VK
484 int paramCount, TCHAR **params, UINT32 objectId, int repeatCount,
485 void *userArg)
5039dede 486{
d930bfc9
VK
487 char format[] = "sssssssssssssssssssssssssssssssss";
488 TCHAR *plist[33];
489 TCHAR repeatCountText[16];
490
491 int count = min(paramCount, 32);
492 format[count + 1] = 0;
493 for(int i = 0; i < count; i++)
5039dede 494 plist[i] = params[i];
d930bfc9
VK
495 _sntprintf(repeatCountText, 16, _T("%d"), repeatCount);
496 plist[count] = repeatCountText;
2dd24569 497 PostEvent(eventCode, objectId, format,
5039dede
AK
498 plist[0], plist[1], plist[2], plist[3],
499 plist[4], plist[5], plist[6], plist[7],
500 plist[8], plist[9], plist[10], plist[11],
501 plist[12], plist[13], plist[14], plist[15],
502 plist[16], plist[17], plist[18], plist[19],
503 plist[20], plist[21], plist[22], plist[23],
504 plist[24], plist[25], plist[26], plist[27],
505 plist[28], plist[29], plist[30], plist[31]);
506}
507
271c3551
VK
508/**
509 * Event name resolver
510 */
da623f8b 511static bool EventNameResolver(const TCHAR *name, UINT32 *code)
5039dede 512{
4d0c32f3 513 bool success = false;
50963ced 514 EventTemplate *event = FindEventTemplateByName(name);
5039dede
AK
515 if (event != NULL)
516 {
50963ced
VK
517 *code = event->getCode();
518 event->decRefCount();
4d0c32f3 519 success = true;
5039dede
AK
520 }
521 return success;
522}
523
271c3551
VK
524/**
525 * Create syslog parser from config
526 */
4d0c32f3
VK
527static void CreateParserFromConfig()
528{
9f24efb3 529 char *xml;
4d0c32f3 530
1499848d
VK
531 MutexLock(s_parserLock);
532 delete_and_null(s_parser);
9f24efb3
VK
533#ifdef UNICODE
534 WCHAR *wxml = ConfigReadCLOB(_T("SyslogParser"), _T("<parser></parser>"));
535 if (wxml != NULL)
536 {
537 xml = UTF8StringFromWideString(wxml);
538 free(wxml);
539 }
540 else
541 {
542 xml = NULL;
543 }
544#else
6646dca4 545 xml = ConfigReadCLOB("SyslogParser", "<parser></parser>");
9f24efb3 546#endif
4d0c32f3
VK
547 if (xml != NULL)
548 {
6646dca4 549 TCHAR parseError[256];
271c3551
VK
550 ObjectArray<LogParser> *parsers = LogParser::createFromXml(xml, -1, parseError, 256, EventNameResolver);
551 if ((parsers != NULL) && (parsers->size() > 0))
4d0c32f3 552 {
1499848d
VK
553 s_parser = parsers->get(0);
554 s_parser->setCallback(SyslogParserCallback);
4d0c32f3
VK
555 DbgPrintf(3, _T("syslogd: parser successfully created from config"));
556 }
557 else
558 {
4d0c32f3
VK
559 nxlog_write(MSG_SYSLOG_PARSER_INIT_FAILED, EVENTLOG_ERROR_TYPE, "s", parseError);
560 }
561 free(xml);
271c3551 562 delete parsers;
4d0c32f3 563 }
1499848d 564 MutexUnlock(s_parserLock);
4d0c32f3
VK
565}
566
271c3551
VK
567/**
568 * Syslog messages receiver thread
569 */
af4a09df 570static THREAD_RESULT THREAD_CALL SyslogReceiver(void *pArg)
5039dede 571{
af4a09df
VK
572 SOCKET hSocket = socket(AF_INET, SOCK_DGRAM, 0);
573#ifdef WITH_IPV6
574 SOCKET hSocket6 = socket(AF_INET6, SOCK_DGRAM, 0);
575#endif
576 if ((hSocket == INVALID_SOCKET)
577#ifdef WITH_IPV6
578 && (hSocket6 == INVALID_SOCKET)
579#endif
580 )
5039dede 581 {
af4a09df 582 nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", _T("SyslogReceiver"));
5039dede
AK
583 return THREAD_OK;
584 }
585
1ddf3f0c 586 SetSocketExclusiveAddrUse(hSocket);
5039dede 587 SetSocketReuseFlag(hSocket);
af4a09df
VK
588#ifndef _WIN32
589 fcntl(hSocket, F_SETFD, fcntl(hSocket, F_GETFD) | FD_CLOEXEC);
590#endif
591
592#ifdef WITH_IPV6
593 SetSocketExclusiveAddrUse(hSocket6);
594 SetSocketReuseFlag(hSocket6);
595#ifndef _WIN32
596 fcntl(hSocket6, F_SETFD, fcntl(hSocket6, F_GETFD) | FD_CLOEXEC);
597#endif
598#ifdef IPV6_V6ONLY
599 int on = 1;
600 setsockopt(hSocket6, IPPROTO_IPV6, IPV6_V6ONLY, (char *)&on, sizeof(int));
601#endif
602#endif
5039dede
AK
603
604 // Get listen port number
af4a09df
VK
605 int port = ConfigReadInt(_T("SyslogListenPort"), 514);
606 if ((port < 1) || (port > 65535))
607 {
608 DbgPrintf(2, _T("Syslog: invalid listen port number %d, using default"), port);
609 port = 514;
610 }
5039dede
AK
611
612 // Fill in local address structure
af4a09df
VK
613 struct sockaddr_in servAddr;
614 memset(&servAddr, 0, sizeof(struct sockaddr_in));
615 servAddr.sin_family = AF_INET;
616
617#ifdef WITH_IPV6
618 struct sockaddr_in6 servAddr6;
619 memset(&servAddr6, 0, sizeof(struct sockaddr_in6));
620 servAddr6.sin6_family = AF_INET6;
621#endif
622
623 if (!_tcscmp(g_szListenAddress, _T("*")))
624 {
625 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
626#ifdef WITH_IPV6
627 memset(servAddr6.sin6_addr.s6_addr, 0, 16);
628#endif
629 }
630 else
631 {
632 InetAddress bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET);
633 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET))
634 {
635 servAddr.sin_addr.s_addr = htonl(bindAddress.getAddressV4());
636 }
637 else
638 {
639 servAddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
640 }
641#ifdef WITH_IPV6
642 bindAddress = InetAddress::resolveHostName(g_szListenAddress, AF_INET6);
643 if (bindAddress.isValid() && (bindAddress.getFamily() == AF_INET6))
644 {
645 memcpy(servAddr6.sin6_addr.s6_addr, bindAddress.getAddressV6(), 16);
646 }
647 else
648 {
649 memset(servAddr6.sin6_addr.s6_addr, 0, 15);
650 servAddr6.sin6_addr.s6_addr[15] = 1;
651 }
652#endif
653 }
654 servAddr.sin_port = htons((UINT16)port);
655#ifdef WITH_IPV6
656 servAddr6.sin6_port = htons((UINT16)port);
657#endif
5039dede
AK
658
659 // Bind socket
af4a09df
VK
660 TCHAR buffer[64];
661 int bindFailures = 0;
662 DbgPrintf(5, _T("Trying to bind on UDP %s:%d"), SockaddrToStr((struct sockaddr *)&servAddr, buffer), ntohs(servAddr.sin_port));
663 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
5039dede 664 {
af4a09df
VK
665 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("SyslogReceiver"), WSAGetLastError());
666 bindFailures++;
5039dede 667 closesocket(hSocket);
af4a09df
VK
668 hSocket = INVALID_SOCKET;
669 }
670
671#ifdef WITH_IPV6
672 DbgPrintf(5, _T("Trying to bind on UDP [%s]:%d"), SockaddrToStr((struct sockaddr *)&servAddr6, buffer), ntohs(servAddr6.sin6_port));
673 if (bind(hSocket6, (struct sockaddr *)&servAddr6, sizeof(struct sockaddr_in6)) != 0)
674 {
675 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", port, _T("SyslogReceiver"), WSAGetLastError());
676 bindFailures++;
677 closesocket(hSocket6);
678 hSocket6 = INVALID_SOCKET;
679 }
680#else
681 bindFailures++;
682#endif
683
684 // Abort if cannot bind to at least one socket
685 if (bindFailures == 2)
686 {
687 DbgPrintf(1, _T("Syslog receiver aborted - cannot bind at least one socket"));
5039dede
AK
688 return THREAD_OK;
689 }
af4a09df
VK
690
691 if (hSocket != INVALID_SOCKET)
692 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(servAddr.sin_addr.s_addr), port);
693#ifdef WITH_IPV6
694 if (hSocket6 != INVALID_SOCKET)
695 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "Hd", servAddr6.sin6_addr.s6_addr, port);
696#endif
5039dede 697
2df047f4 698 SetLogParserTraceCallback(nxlog_debug2);
07a1156a
VK
699 InitLogParserLibrary();
700
5039dede 701 // Create message parser
1499848d 702 s_parserLock = MutexCreate();
4d0c32f3 703 CreateParserFromConfig();
5039dede
AK
704
705 // Start processing thread
1499848d
VK
706 THREAD hProcessingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
707 THREAD hWriterThread = ThreadCreateEx(SyslogWriterThread, 0, NULL);
5039dede 708
af4a09df 709 DbgPrintf(1, _T("Syslog receiver thread started"));
5039dede
AK
710
711 // Wait for packets
af4a09df 712 while(s_running)
5039dede 713 {
af4a09df 714 struct timeval tv;
5039dede
AK
715 tv.tv_sec = 1;
716 tv.tv_usec = 0;
af4a09df
VK
717
718 fd_set rdfs;
719 FD_ZERO(&rdfs);
720 if (hSocket != INVALID_SOCKET)
721 FD_SET(hSocket, &rdfs);
722#ifdef WITH_IPV6
723 if (hSocket6 != INVALID_SOCKET)
724 FD_SET(hSocket6, &rdfs);
725#endif
726
727#if defined(WITH_IPV6) && !defined(_WIN32)
728 SOCKET nfds = 0;
729 if (hSocket != INVALID_SOCKET)
730 nfds = hSocket;
731 if ((hSocket6 != INVALID_SOCKET) && (hSocket6 > nfds))
732 nfds = hSocket6;
733 int rc = select(SELECT_NFDS(nfds + 1), &rdfs, NULL, NULL, &tv);
734#else
735 int rc = select(SELECT_NFDS(hSocket + 1), &rdfs, NULL, NULL, &tv);
736#endif
737 if (rc > 0)
5039dede 738 {
af4a09df
VK
739 char syslogMessage[MAX_SYSLOG_MSG_LEN + 1];
740 SockAddrBuffer addr;
741 socklen_t addrLen = sizeof(SockAddrBuffer);
742#ifdef WITH_IPV6
743 SOCKET s = FD_ISSET(hSocket, &rdfs) ? hSocket : hSocket6;
744#else
745 SOCKET s = hSocket;
746#endif
747 int bytes = recvfrom(s, syslogMessage, MAX_SYSLOG_MSG_LEN, 0, (struct sockaddr *)&addr, &addrLen);
748 if (bytes > 0)
5039dede 749 {
af4a09df
VK
750 syslogMessage[bytes] = 0;
751 QueueSyslogMessage(syslogMessage, bytes, InetAddress::createFromSockaddr((struct sockaddr *)&addr));
5039dede
AK
752 }
753 else
754 {
755 // Sleep on error
756 ThreadSleepMs(100);
757 }
758 }
af4a09df 759 else if (rc == -1)
5039dede
AK
760 {
761 // Sleep on error
762 ThreadSleepMs(100);
763 }
764 }
765
766 // Stop processing thread
19dbc8ef 767 g_syslogProcessingQueue.put(INVALID_POINTER_VALUE);
5039dede 768 ThreadJoin(hProcessingThread);
1499848d
VK
769
770 // Stop writer thread - it must be done after processing thread already finished
19dbc8ef 771 g_syslogWriteQueue.put(INVALID_POINTER_VALUE);
1499848d
VK
772 ThreadJoin(hWriterThread);
773
774 delete s_parser;
07a1156a 775 CleanupLogParserLibrary();
5039dede 776
af4a09df 777 DbgPrintf(1, _T("Syslog receiver thread stopped"));
5039dede
AK
778 return THREAD_OK;
779}
780
af4a09df
VK
781/**
782 * Start built-in syslog server
783 */
784void StartSyslogServer()
785{
786 s_nodeMatchingPolicy = (NodeMatchingPolicy)ConfigReadInt(_T("SyslogNodeMatchingPolicy"), SOURCE_IP_THEN_HOSTNAME);
b8c8a8a7 787 s_alwaysUseServerTime = ConfigReadInt(_T("SyslogIgnoreMessageTimestamp"), 0) ? true : false;
af4a09df
VK
788
789 // Determine first available message id
790 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
791 DB_RESULT hResult = DBSelect(hdb, _T("SELECT max(msg_id) FROM syslog"));
792 if (hResult != NULL)
793 {
794 if (DBGetNumRows(hResult) > 0)
795 {
796 s_msgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, s_msgId);
797 }
798 DBFreeResult(hResult);
799 }
800 DBConnectionPoolReleaseConnection(hdb);
801
802 s_receiverThread = ThreadCreateEx(SyslogReceiver, 0, NULL);
803}
804
805/**
806 * Stop built-in syslog server
807 */
808void StopSyslogServer()
809{
810 s_running = false;
811 ThreadJoin(s_receiverThread);
812}
813
271c3551
VK
814/**
815 * Create NXCP message from NX_SYSLOG_RECORD structure
816 */
b368969c 817void CreateMessageFromSyslogMsg(NXCPMessage *pMsg, NX_SYSLOG_RECORD *pRec)
5039dede 818{
967893bb 819 UINT32 dwId = VID_SYSLOG_MSG_BASE;
5039dede 820
b368969c
VK
821 pMsg->setField(VID_NUM_RECORDS, (UINT32)1);
822 pMsg->setField(dwId++, pRec->qwMsgId);
823 pMsg->setField(dwId++, (UINT32)pRec->tmTimeStamp);
824 pMsg->setField(dwId++, (WORD)pRec->nFacility);
825 pMsg->setField(dwId++, (WORD)pRec->nSeverity);
826 pMsg->setField(dwId++, pRec->dwSourceObject);
827 pMsg->setFieldFromMBString(dwId++, pRec->szHostName);
828 pMsg->setFieldFromMBString(dwId++, pRec->szTag);
829 pMsg->setFieldFromMBString(dwId++, pRec->szMessage);
5039dede 830}
4d0c32f3 831
271c3551
VK
832/**
833 * Reinitialize parser on configuration change
834 */
4d0c32f3
VK
835void ReinitializeSyslogParser()
836{
1499848d 837 if (s_parserLock == INVALID_MUTEX_HANDLE)
4d0c32f3
VK
838 return; // Syslog daemon not initialized
839 CreateParserFromConfig();
840}
b8c8a8a7
VK
841
842/**
843 * Handler for syslog related configuration changes
844 */
845void OnSyslogConfigurationChange(const TCHAR *name, const TCHAR *value)
846{
847 if (!_tcscmp(name, _T("SyslogIgnoreMessageTimestamp")))
848 {
849 s_alwaysUseServerTime = _tcstol(value, NULL, 0) ? true : false;
850 nxlog_debug(4, _T("Syslog: ignore message timestamp option set to %s"), s_alwaysUseServerTime ? _T("ON") : _T("OFF"));
851 }
852}