Rollback from r3608 to r3606
[public/netxms.git] / src / server / core / syslogd.cpp
CommitLineData
5039dede
AK
1/*
2** NetXMS - Network Management System
3** Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: syslogd.cpp
20**
21**/
22
23#include "nxcore.h"
24#include <nxlog.h>
25#include <nxlpapi.h>
26
27
28//
29// Constants
30//
31
// Size in bytes of the datagram receive buffer used by SyslogDaemon()
#define MAX_SYSLOG_MSG_LEN    1024
33
34
35//
36// Queued syslog message structure
37//
38
39struct QUEUED_SYSLOG_MESSAGE
40{
41 DWORD dwSourceIP;
42 int nBytes;
43 char *psMsg;
44};
45
46
47//
48// Static data
49//
50
51static QWORD m_qwMsgId = 1;
52static Queue *m_pSyslogQueue = NULL;
53static LogParser *m_parser = NULL;
54
55
56//
57// Parse timestamp field
58//
59
60static BOOL ParseTimeStamp(char **ppStart, int nMsgSize, int *pnPos, time_t *ptmTime)
61{
62 static char psMonth[12][5] = { "Jan ", "Feb ", "Mar ", "Apr ",
63 "May ", "Jun ", "Jul ", "Aug ",
64 "Sep ", "Oct ", "Nov ", "Dec " };
65 struct tm timestamp;
66 time_t t;
67 char szBuffer[16], *pCurr = *ppStart;
68 int i;
69
70 if (nMsgSize - *pnPos < 16)
71 return FALSE; // Timestamp cannot be shorter than 16 bytes
72
73 // Prepare local time structure
74 t = time(NULL);
75 memcpy(&timestamp, localtime(&t), sizeof(struct tm));
76
77 // Month
78 for(i = 0; i < 12; i++)
79 if (!memcmp(pCurr, psMonth[i], 4))
80 {
81 timestamp.tm_mon = i;
82 break;
83 }
84 if (i == 12)
85 return FALSE;
86 pCurr += 4;
87
88 // Day of week
89 if (isdigit(*pCurr))
90 {
91 timestamp.tm_mday = *pCurr - '0';
92 }
93 else
94 {
95 if (*pCurr != ' ')
96 return FALSE; // Invalid day of month
97 timestamp.tm_mday = 0;
98 }
99 pCurr++;
100 if (isdigit(*pCurr))
101 {
102 timestamp.tm_mday = timestamp.tm_mday * 10 + (*pCurr - '0');
103 }
104 else
105 {
106 return FALSE; // Invalid day of month
107 }
108 pCurr++;
109 if (*pCurr != ' ')
110 return FALSE;
111 pCurr++;
112
113 // HH:MM:SS
114 memcpy(szBuffer, pCurr, 8);
115 szBuffer[8] = 0;
116 if (sscanf(szBuffer, "%02d:%02d:%02d", &timestamp.tm_hour,
117 &timestamp.tm_min, &timestamp.tm_sec) != 3)
118 return FALSE; // Invalid time format
119 pCurr += 8;
120
121 // Check for Cisco variant - HH:MM:SS.nnn
122 if (*pCurr == '.')
123 {
124 pCurr++;
125 if (isdigit(*pCurr))
126 pCurr++;
127 if (isdigit(*pCurr))
128 pCurr++;
129 if (isdigit(*pCurr))
130 pCurr++;
131 }
132
133 if (*pCurr != ' ')
134 return FALSE; // Space should follow timestamp
135 pCurr++;
136
137 // Convert to system time
138 *ptmTime = mktime(&timestamp);
139 if (*ptmTime == ((time_t)-1))
140 return FALSE;
141
142 // Adjust current position
143 *pnPos += (int)(pCurr - *ppStart);
144 *ppStart = pCurr;
145 return TRUE;
146}
147
148
149//
150// Parse syslog message
151//
152
153static BOOL ParseSyslogMessage(char *psMsg, int nMsgLen, NX_LOG_RECORD *pRec)
154{
155 int i, nLen, nPos = 0;
156 char *pCurr = psMsg;
157
158 memset(pRec, 0, sizeof(NX_LOG_RECORD));
159
160 // Parse PRI part
161 if (*psMsg == '<')
162 {
163 int nPri = 0, nCount = 0;
164
165 for(pCurr++, nPos++; isdigit(*pCurr) && (nPos < nMsgLen); pCurr++, nPos++, nCount++)
166 nPri = nPri * 10 + (*pCurr - '0');
167 if (nPos >= nMsgLen)
168 return FALSE; // Unexpected end of message
169
170 if ((*pCurr == '>') && (nCount > 0) && (nCount <4))
171 {
172 pRec->nFacility = nPri / 8;
173 pRec->nSeverity = nPri % 8;
174 pCurr++;
175 nPos++;
176 }
177 else
178 {
179 return FALSE; // Invalid message
180 }
181 }
182 else
183 {
184 // Set default PRI of 13
185 pRec->nFacility = 1;
186 pRec->nSeverity = SYSLOG_SEVERITY_NOTICE;
187 }
188
189 // Parse HEADER part
190 if (ParseTimeStamp(&pCurr, nMsgLen, &nPos, &pRec->tmTimeStamp))
191 {
192 // Hostname
193 for(i = 0; isalnum(*pCurr) && (i < MAX_SYSLOG_HOSTNAME_LEN) && (nPos < nMsgLen); i++, nPos++, pCurr++)
194 pRec->szHostName[i] = *pCurr;
195 if ((nPos >= nMsgLen) || (*pCurr != ' '))
196 {
197 // Not a valid hostname, assuming to be a part of message
198 pCurr -= i;
199 nPos -= i;
200 }
201 else
202 {
203 pCurr++;
204 nPos++;
205 }
206 }
207 else
208 {
209 pRec->tmTimeStamp = time(NULL);
210 }
211
212 // Parse MSG part
213 for(i = 0; isalnum(*pCurr) && (i < MAX_SYSLOG_TAG_LEN) && (nPos < nMsgLen); i++, nPos++, pCurr++)
214 pRec->szTag[i] = *pCurr;
215 if ((i == MAX_SYSLOG_TAG_LEN) || (nPos >= nMsgLen))
216 {
217 // Too long tag, assuming that it's a part of message
218 pRec->szTag[0] = 0;
219 }
220 pCurr -= i;
221 nPos -= i;
222 nLen = min(nMsgLen - nPos, MAX_LOG_MSG_LENGTH);
223 memcpy(pRec->szMessage, pCurr, nLen);
224
225 return TRUE;
226}
227
228
229//
230// Bind syslog message to NetXMS node object
231// dwSourceIP is an IP address from which we receive message
232//
233
234static void BindMsgToNode(NX_LOG_RECORD *pRec, DWORD dwSourceIP)
235{
236 Node *pNode;
237 DWORD dwIpAddr;
238
239 // Determine IP address of a source
240 if (pRec->szHostName[0] == 0)
241 {
242 // Hostname was not defined in the message
243 dwIpAddr = dwSourceIP;
244 }
245 else
246 {
247 dwIpAddr = ResolveHostName(pRec->szHostName);
248 }
249
250 // Match source IP to NetXMS object
251 if (dwIpAddr != INADDR_NONE)
252 {
253 pNode = FindNodeByIP(dwIpAddr);
254 if (pNode != NULL)
255 {
256 pRec->dwSourceObject = pNode->Id();
257 if (pRec->szHostName[0] == 0)
258 nx_strncpy(pRec->szHostName, pNode->Name(), MAX_SYSLOG_HOSTNAME_LEN);
259 }
260 else
261 {
262 if (pRec->szHostName[0] == 0)
263 IpToStr(dwSourceIP, pRec->szHostName);
264 }
265 }
266}
267
268
269//
270// Handler for EnumerateSessions()
271//
272
273static void BroadcastSyslogMessage(ClientSession *pSession, void *pArg)
274{
275 if (pSession->IsAuthenticated())
276 pSession->OnSyslogMessage((NX_LOG_RECORD *)pArg);
277}
278
279
280//
281// Process syslog message
282//
283
284static void ProcessSyslogMessage(char *psMsg, int nMsgLen, DWORD dwSourceIP)
285{
286 NX_LOG_RECORD record;
287 TCHAR *pszEscMsg, szQuery[4096];
288
289 if (ParseSyslogMessage(psMsg, nMsgLen, &record))
290 {
291 record.qwMsgId = m_qwMsgId++;
292 BindMsgToNode(&record, dwSourceIP);
293 pszEscMsg = EncodeSQLString(record.szMessage);
294 _sntprintf(szQuery, 4096,
295 _T("INSERT INTO syslog (msg_id,msg_timestamp,facility,severity,")
296 _T("source_object_id,hostname,msg_tag,msg_text) VALUES ")
297 _T("(" UINT64_FMT "," TIME_T_FMT ",%d,%d,%d,'%s','%s','%s')"),
298 record.qwMsgId, record.tmTimeStamp, record.nFacility,
299 record.nSeverity, record.dwSourceObject,
300 record.szHostName, record.szTag, pszEscMsg);
301 free(pszEscMsg);
302 DBQuery(g_hCoreDB, szQuery);
303
304 // Send message to all connected clients
305 EnumerateClientSessions(BroadcastSyslogMessage, &record);
306
307 if ((record.dwSourceObject != 0) && (m_parser != NULL))
308 {
309 m_parser->MatchLine(record.szMessage, record.dwSourceObject);
310 }
311 }
312}
313
314
315//
316// Syslog processing thread
317//
318
319static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
320{
321 QUEUED_SYSLOG_MESSAGE *pMsg;
322
323 while(1)
324 {
325 pMsg = (QUEUED_SYSLOG_MESSAGE *)m_pSyslogQueue->GetOrBlock();
326 if (pMsg == INVALID_POINTER_VALUE)
327 break;
328
329 ProcessSyslogMessage(pMsg->psMsg, pMsg->nBytes, pMsg->dwSourceIP);
330 free(pMsg->psMsg);
331 free(pMsg);
332 }
333 return THREAD_OK;
334}
335
336
337//
338// Queue syslog message for processing
339//
340
341static void QueueSyslogMessage(char *psMsg, int nMsgLen, DWORD dwSourceIP)
342{
343 QUEUED_SYSLOG_MESSAGE *pMsg;
344
345 pMsg = (QUEUED_SYSLOG_MESSAGE *)malloc(sizeof(QUEUED_SYSLOG_MESSAGE));
346 pMsg->dwSourceIP = dwSourceIP;
347 pMsg->nBytes = nMsgLen;
348 pMsg->psMsg = (char *)nx_memdup(psMsg, nMsgLen);
349 m_pSyslogQueue->Put(pMsg);
350}
351
352
353//
354// Callback for syslog parser
355//
356
357static void SyslogParserCallback(DWORD event, const char *line, int paramCount,
358 char **params, DWORD objectId, void *userArg)
359{
360 char format[] = "ssssssssssssssssssssssssssssssss";
361 char *plist[32];
362 int i, count;
363
364 count = min(paramCount, 32);
365 format[count] = 0;
366 for(i = 0; i < count; i++)
367 plist[i] = params[i];
368 PostEvent(event, objectId, format,
369 plist[0], plist[1], plist[2], plist[3],
370 plist[4], plist[5], plist[6], plist[7],
371 plist[8], plist[9], plist[10], plist[11],
372 plist[12], plist[13], plist[14], plist[15],
373 plist[16], plist[17], plist[18], plist[19],
374 plist[20], plist[21], plist[22], plist[23],
375 plist[24], plist[25], plist[26], plist[27],
376 plist[28], plist[29], plist[30], plist[31]);
377}
378
379
380//
381// Event name resolver
382//
383
384static BOOL EventNameResolver(const TCHAR *name, DWORD *code)
385{
386 EVENT_TEMPLATE *event;
387 BOOL success = FALSE;
388
389 event = FindEventTemplateByName(name);
390 if (event != NULL)
391 {
392 *code = event->dwCode;
393 success = TRUE;
394 }
395 return success;
396}
397
398
399//
400// Syslog messages receiver thread
401//
402
403THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
404{
405 SOCKET hSocket;
406 struct sockaddr_in addr;
407 int nBytes, nPort, nRet;
408 socklen_t nAddrLen;
409 char sMsg[MAX_SYSLOG_MSG_LEN];
410 DB_RESULT hResult;
411 THREAD hProcessingThread;
412 fd_set rdfs;
413 struct timeval tv;
414 TCHAR *xml;
415
416 // Determine first available message id
417 hResult = DBSelect(g_hCoreDB, _T("SELECT max(msg_id) FROM syslog"));
418 if (hResult != NULL)
419 {
420 if (DBGetNumRows(hResult) > 0)
421 {
422 m_qwMsgId = max(DBGetFieldUInt64(hResult, 0, 0) + 1, m_qwMsgId);
423 }
424 DBFreeResult(hResult);
425 }
426
427 hSocket = socket(AF_INET, SOCK_DGRAM, 0);
428 if (hSocket == -1)
429 {
430 nxlog_write(MSG_SOCKET_FAILED, EVENTLOG_ERROR_TYPE, "s", "SyslogDaemon");
431 return THREAD_OK;
432 }
433
434 SetSocketReuseFlag(hSocket);
435
436 // Get listen port number
437 nPort = ConfigReadInt(_T("SyslogListenPort"), 514);
438 if ((nPort < 1) || (nPort > 65535))
439 nPort = 514;
440
441 // Fill in local address structure
442 memset(&addr, 0, sizeof(struct sockaddr_in));
443 addr.sin_family = AF_INET;
444 addr.sin_addr.s_addr = ResolveHostName(g_szListenAddress);
445 addr.sin_port = htons((WORD)nPort);
446
447 // Bind socket
448 if (bind(hSocket, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) != 0)
449 {
450 nxlog_write(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "dse", nPort, "SyslogDaemon", WSAGetLastError());
451 closesocket(hSocket);
452 return THREAD_OK;
453 }
454 nxlog_write(MSG_LISTENING_FOR_SYSLOG, EVENTLOG_INFORMATION_TYPE, "ad", ntohl(addr.sin_addr.s_addr), nPort);
455
456 // Create message parser
457 xml = ConfigReadCLOB(_T("SyslogParser"), _T("<parser></parser>"));
458 if (xml != NULL)
459 {
460 TCHAR parseError[256];
461
462 m_parser = new LogParser;
463 m_parser->SetEventNameResolver(EventNameResolver);
464 if (m_parser->CreateFromXML(xml, -1, parseError, 256))
465 {
466 m_parser->SetCallback(SyslogParserCallback);
467 }
468 else
469 {
470 delete_and_null(m_parser);
471 nxlog_write(MSG_SYSLOG_PARSER_INIT_FAILED, EVENTLOG_ERROR_TYPE, "s", parseError);
472 }
473 free(xml);
474 }
475
476 // Start processing thread
477 m_pSyslogQueue = new Queue(1000, 100);
478 hProcessingThread = ThreadCreateEx(SyslogProcessingThread, 0, NULL);
479
480 DbgPrintf(1, _T("Syslog Daemon started"));
481
482 // Wait for packets
483 while(!ShutdownInProgress())
484 {
485 FD_ZERO(&rdfs);
486 FD_SET(hSocket, &rdfs);
487 tv.tv_sec = 1;
488 tv.tv_usec = 0;
489 nRet = select((int)hSocket + 1, &rdfs, NULL, NULL, &tv);
490 if (nRet > 0)
491 {
492 nAddrLen = sizeof(struct sockaddr_in);
493 nBytes = recvfrom(hSocket, sMsg, MAX_SYSLOG_MSG_LEN, 0,
494 (struct sockaddr *)&addr, &nAddrLen);
495 if (nBytes > 0)
496 {
497 QueueSyslogMessage(sMsg, nBytes, ntohl(addr.sin_addr.s_addr));
498 }
499 else
500 {
501 // Sleep on error
502 ThreadSleepMs(100);
503 }
504 }
505 else if (nRet == -1)
506 {
507 // Sleep on error
508 ThreadSleepMs(100);
509 }
510 }
511
512 // Stop processing thread
513 m_pSyslogQueue->Put(INVALID_POINTER_VALUE);
514 ThreadJoin(hProcessingThread);
515 delete m_pSyslogQueue;
516 delete m_parser;
517
518 DbgPrintf(1, _T("Syslog Daemon stopped"));
519 return THREAD_OK;
520}
521
522
//
// Create NXCP message from NX_LOG_RECORD structure
//
526
527void CreateMessageFromSyslogMsg(CSCPMessage *pMsg, NX_LOG_RECORD *pRec)
528{
529 DWORD dwId = VID_SYSLOG_MSG_BASE;
530
531 pMsg->SetVariable(VID_NUM_RECORDS, (DWORD)1);
532 pMsg->SetVariable(dwId++, pRec->qwMsgId);
533 pMsg->SetVariable(dwId++, (DWORD)pRec->tmTimeStamp);
534 pMsg->SetVariable(dwId++, (WORD)pRec->nFacility);
535 pMsg->SetVariable(dwId++, (WORD)pRec->nSeverity);
536 pMsg->SetVariable(dwId++, pRec->dwSourceObject);
537 pMsg->SetVariable(dwId++, pRec->szHostName);
538 pMsg->SetVariable(dwId++, pRec->szTag);
539 pMsg->SetVariable(dwId++, pRec->szMessage);
540}