a629658a1a5d0fa0e59b39109941d3836ce4bd1e
[public/netxms.git] / src / libnxlp / file.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Log Parsing Library
4 ** Copyright (C) 2003-2017 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: file.cpp
21 **
22 **/
23
24 #include "libnxlp.h"
25
26 #ifdef _WIN32
27 #include <share.h>
28 #endif
29
30
/**
 * Select stat() flavour and matching structure type:
 * _tstati64 / struct _stati64 on Windows, stat64 / struct stat64 when the
 * build configuration reports both HAVE_STAT64 and HAVE_STRUCT_STAT64,
 * plain stat / struct stat otherwise.
 */
#if defined(_WIN32)
#define NX_STAT _tstati64
#define NX_STAT_STRUCT struct _stati64
#elif HAVE_STAT64 && HAVE_STRUCT_STAT64
#define NX_STAT stat64
#define NX_STAT_STRUCT struct stat64
#else
#define NX_STAT stat
#define NX_STAT_STRUCT struct stat
#endif

/**
 * Select fstat() flavour that fills NX_STAT_STRUCT:
 * _fstati64 on Windows, fstat64 when HAVE_FSTAT64 and HAVE_STRUCT_STAT64
 * are both set, plain fstat otherwise.
 */
#if defined(_WIN32)
#define NX_FSTAT _fstati64
#elif HAVE_FSTAT64 && HAVE_STRUCT_STAT64
#define NX_FSTAT fstat64
#else
#define NX_FSTAT fstat
#endif
49
#if defined(UNICODE) && !defined(_WIN32)
/**
 * Wrapper for NX_STAT taking wide character file name: the name is converted
 * to multibyte string, passed to NX_STAT, and the temporary copy is released.
 *
 * @param f file name as wide character string
 * @param s structure to be filled by NX_STAT
 * @return value returned by NX_STAT
 */
inline int __call_stat(const WCHAR *f, NX_STAT_STRUCT *s)
{
   char *multibyteName = MBStringFromWideString(f);
   const int result = NX_STAT(multibyteName, s);
   free(multibyteName);
   return result;
}
#define CALL_STAT(f, s) __call_stat(f, s)
#else
/* File names are already in the form expected by NX_STAT */
#define CALL_STAT(f, s) NX_STAT(f, s)
#endif
62
63
/**
 * Size of the buffer used by ParseNewRecords for reading file data (in bytes)
 * and for the converted text of a single line (in characters)
 */
#define READ_BUFFER_SIZE 4096
68
/**
 * Find byte sequence in the stream.
 *
 * Candidate positions are start, start + seqLength, start + 2 * seqLength and
 * so on - the sequence is never matched at an offset that is not a multiple
 * of its own length.
 *
 * @param start beginning of the data to search
 * @param length number of bytes available at start
 * @param sequence byte sequence to look for
 * @param seqLength length of the sequence in bytes
 * @return pointer to first matching position or NULL if there is none
 */
static char *FindSequence(char *start, int length, const char *sequence, int seqLength)
{
   for(int offset = 0; length - offset >= seqLength; offset += seqLength)
   {
      char *candidate = start + offset;
      if (memcmp(candidate, sequence, seqLength) == 0)
         return candidate;
   }
   return NULL;
}
85
86 /**
87 * Find end-of-line marker
88 */
89 static char *FindEOL(char *start, int length, int encoding)
90 {
91 char *eol = NULL;
92 switch(encoding)
93 {
94 case LP_FCP_UCS2:
95 #if WORDS_BIGENDIAN
96 eol = FindSequence(start, length, "\0\n", 2);
97 #else
98 eol = FindSequence(start, length, "\n\0", 2);
99 #endif
100 break;
101 case LP_FCP_UCS2_LE:
102 eol = FindSequence(start, length, "\n\0", 2);
103 break;
104 case LP_FCP_UCS2_BE:
105 eol = FindSequence(start, length, "\0\n", 2);
106 break;
107 case LP_FCP_UCS4:
108 #if WORDS_BIGENDIAN
109 eol = FindSequence(start, length, "\0\0\0\n", 4);
110 #else
111 eol = FindSequence(start, length, "\n\0\0\0", 4);
112 #endif
113 break;
114 case LP_FCP_UCS4_LE:
115 eol = FindSequence(start, length, "\n\0\0\0", 4);
116 break;
117 case LP_FCP_UCS4_BE:
118 eol = FindSequence(start, length, "\0\0\0\n", 4);
119 break;
120 default:
121 eol = (char *)memchr(start, '\n', length);
122 break;
123 }
124
125 if (eol == NULL)
126 {
127 // Try to find CR
128 switch(encoding)
129 {
130 case LP_FCP_UCS2:
131 #if WORDS_BIGENDIAN
132 eol = FindSequence(start, length, "\0\r", 2);
133 #else
134 eol = FindSequence(start, length, "\r\0", 2);
135 #endif
136 break;
137 case LP_FCP_UCS2_LE:
138 eol = FindSequence(start, length, "\r\0", 2);
139 break;
140 case LP_FCP_UCS2_BE:
141 eol = FindSequence(start, length, "\0\r", 2);
142 break;
143 case LP_FCP_UCS4:
144 #if WORDS_BIGENDIAN
145 eol = FindSequence(start, length, "\0\0\0\r", 4);
146 #else
147 eol = FindSequence(start, length, "\r\0\0\0", 4);
148 #endif
149 break;
150 case LP_FCP_UCS4_LE:
151 eol = FindSequence(start, length, "\r\0\0\0", 4);
152 break;
153 case LP_FCP_UCS4_BE:
154 eol = FindSequence(start, length, "\0\0\0\r", 4);
155 break;
156 default:
157 eol = (char *)memchr(start, '\r', length);
158 break;
159 }
160 }
161
162 return eol;
163 }
164
165 /**
166 * Parse new log records
167 */
168 static off_t ParseNewRecords(LogParser *parser, int fh)
169 {
170 char *ptr, *eptr, buffer[READ_BUFFER_SIZE];
171 int bytes, bufPos = 0;
172 off_t resetPos;
173 int encoding = parser->getFileEncoding();
174 TCHAR text[READ_BUFFER_SIZE];
175
176 do
177 {
178 resetPos = lseek(fh, 0, SEEK_CUR);
179 if ((bytes = _read(fh, &buffer[bufPos], READ_BUFFER_SIZE - bufPos)) > 0)
180 {
181 bytes += bufPos;
182 for(ptr = buffer;; ptr = eptr + 1)
183 {
184 bufPos = (int)(ptr - buffer);
185 eptr = FindEOL(ptr, bytes - bufPos, encoding);
186 if (eptr == NULL)
187 {
188 int remaining = bytes - bufPos;
189 resetPos = lseek(fh, 0, SEEK_CUR) - remaining;
190 if (remaining > 0)
191 {
192 memmove(buffer, ptr, remaining);
193 if (parser->isFilePreallocated() && !memcmp(buffer, "\x00\x00\x00\x00", std::min(remaining, 4)))
194 {
195 // Found zeroes in preallocated file, next read should be after last known EOL
196 return resetPos;
197 }
198 }
199 break;
200 }
201 // remove possible CR character and put 0 to indicate end of line
202 switch(encoding)
203 {
204 case LP_FCP_UCS2:
205 #if WORDS_BIGENDIAN
206 if ((eptr - ptr >= 2) && !memcmp(eptr - 2, "\0\r", 2))
207 eptr -= 2;
208 *eptr = 0;
209 *(eptr + 1) = 0;
210 #else
211
212 if ((eptr - ptr >= 2) && !memcmp(eptr - 2, "\r\0", 2))
213 eptr -= 2;
214 *eptr = 0;
215 *(eptr + 1) = 0;
216 #endif
217 break;
218 case LP_FCP_UCS2_LE:
219 if ((eptr - ptr >= 2) && !memcmp(eptr - 2, "\r\0", 2))
220 eptr -= 2;
221 *eptr = 0;
222 *(eptr + 1) = 0;
223 break;
224 case LP_FCP_UCS2_BE:
225 if ((eptr - ptr >= 2) && !memcmp(eptr - 2, "\0\r", 2))
226 eptr -= 2;
227 *eptr = 0;
228 *(eptr + 1) = 0;
229 break;
230 case LP_FCP_UCS4:
231 #if WORDS_BIGENDIAN
232 if ((eptr - ptr >= 4) && !memcmp(eptr - 4, "\0\0\0\r", 4))
233 eptr -= 4;
234 memset(eptr, 0, 4);
235 #else
236
237 if ((eptr - ptr >= 4) && !memcmp(eptr - 4, "\r\0\0\0", 4))
238 eptr -= 4;
239 memset(eptr, 0, 4);
240 #endif
241 break;
242 case LP_FCP_UCS4_LE:
243 if ((eptr - ptr >= 4) && !memcmp(eptr - 4, "\r\0\0\0", 4))
244 eptr -= 4;
245 memset(eptr, 0, 4);
246 break;
247 case LP_FCP_UCS4_BE:
248 if ((eptr - ptr >= 4) && !memcmp(eptr - 4, "\0\0\0\r", 4))
249 eptr -= 4;
250 memset(eptr, 0, 4);
251 break;
252 default:
253 if (*(eptr - 1) == '\r')
254 eptr--;
255 *eptr = 0;
256 break;
257 }
258 // Now ptr points to null-terminated string in original encoding
259 // Do the conversion to platform encoding
260 #ifdef UNICODE
261 switch(encoding)
262 {
263 case LP_FCP_ACP:
264 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, ptr, -1, text, READ_BUFFER_SIZE);
265 break;
266 case LP_FCP_UTF8:
267 MultiByteToWideChar(CP_UTF8, 0, ptr, -1, text, READ_BUFFER_SIZE);
268 break;
269 case LP_FCP_UCS2_LE:
270 #if WORDS_BIGENDIAN
271 bswap_array_16((UINT16 *)ptr, -1);
272 #endif
273 #ifdef UNICODE_UCS2
274 nx_strncpy(text, (TCHAR *)ptr, READ_BUFFER_SIZE);
275 #else
276 ucs2_to_ucs4((UCS2CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
277 #endif
278 break;
279 case LP_FCP_UCS2_BE:
280 #if !WORDS_BIGENDIAN
281 bswap_array_16((UINT16 *)ptr, -1);
282 #endif
283 #ifdef UNICODE_UCS2
284 nx_strncpy(text, (TCHAR *)ptr, READ_BUFFER_SIZE);
285 #else
286 ucs2_to_ucs4((UCS2CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
287 #endif
288 break;
289 case LP_FCP_UCS2:
290 #ifdef UNICODE_UCS2
291 nx_strncpy(text, (TCHAR *)ptr, READ_BUFFER_SIZE);
292 #else
293 ucs2_to_ucs4((UCS2CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
294 #endif
295 break;
296 case LP_FCP_UCS4_LE:
297 #if WORDS_BIGENDIAN
298 bswap_array_32((UINT32 *)ptr, -1);
299 #endif
300 #ifdef UNICODE_UCS2
301 ucs4_to_ucs2((UCS4CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
302 #else
303 nx_strncpy(text, (TCHAR *)ptr, READ_BUFFER_SIZE);
304 #endif
305 break;
306 case LP_FCP_UCS4_BE:
307 #if !WORDS_BIGENDIAN
308 bswap_array_32((UINT32 *)ptr, -1);
309 #endif
310 #ifdef UNICODE_UCS2
311 ucs4_to_ucs2((UCS4CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
312 #else
313 nx_strncpy(text, (TCHAR *)ptr, READ_BUFFER_SIZE);
314 #endif
315 break;
316 case LP_FCP_UCS4:
317 #ifdef UNICODE_UCS2
318 ucs4_to_ucs2((UCS4CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
319 #else
320 nx_strncpy(text, (TCHAR *)ptr, READ_BUFFER_SIZE);
321 #endif
322 break;
323 default:
324 break;
325 }
326 #else
327 switch(encoding)
328 {
329 case LP_FCP_ACP:
330 nx_strncpy(text, ptr, READ_BUFFER_SIZE);
331 break;
332 case LP_FCP_UTF8:
333 utf8_to_mb(ptr, -1, text, READ_BUFFER_SIZE);
334 break;
335 case LP_FCP_UCS2_LE:
336 #if WORDS_BIGENDIAN
337 bswap_array_16((UINT16 *)ptr, -1);
338 #endif
339 ucs2_to_mb((UCS2CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
340 break;
341 case LP_FCP_UCS2_BE:
342 #if !WORDS_BIGENDIAN
343 bswap_array_16((UINT16 *)ptr, -1);
344 #endif
345 ucs2_to_mb((UCS2CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
346 break;
347 case LP_FCP_UCS2:
348 ucs2_to_mb((UCS2CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
349 break;
350 case LP_FCP_UCS4_LE:
351 #if WORDS_BIGENDIAN
352 bswap_array_32((UINT32 *)ptr, -1);
353 #endif
354 ucs4_to_mb((UCS4CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
355 break;
356 case LP_FCP_UCS4_BE:
357 #if !WORDS_BIGENDIAN
358 bswap_array_32((UINT32 *)ptr, -1);
359 #endif
360 ucs4_to_mb((UCS4CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
361 break;
362 case LP_FCP_UCS4:
363 ucs4_to_mb((UCS4CHAR *)ptr, -1, text, READ_BUFFER_SIZE);
364 break;
365 default:
366 break;
367 }
368 #endif
369 parser->matchLine(text);
370 }
371 }
372 else
373 {
374 bytes = 0;
375 }
376 } while(bytes == READ_BUFFER_SIZE);
377 return resetPos;
378 }
379
380 /**
381 * Scan first 10 bytes of a file to find its encoding
382 */
383 static int ScanFileEncoding(int fh)
384 {
385 char buffer[10];
386 if (_read(fh, buffer, 4) > 3)
387 {
388 if (!memcmp(buffer, "\x00\x00\xFE\xFF", 4))
389 return LP_FCP_UCS4_BE;
390 if (!memcmp(buffer, "\xFF\xFE\x00\x00", 4))
391 return LP_FCP_UCS4_LE;
392 if (!memcmp(buffer, "\xEF\xBB\xBF", 3))
393 return LP_FCP_UTF8;
394 if (!memcmp(buffer, "\xFE\xFF", 2))
395 return LP_FCP_UCS2_BE;
396 if (!memcmp(buffer, "\xFF\xFE", 2))
397 return LP_FCP_UCS2_LE;
398 }
399
400 return LP_FCP_ACP;
401 }
402
403 /**
404 * Seek file to the beginning of zeroes block
405 */
406 static void SeekToZero(int fh, int chsize)
407 {
408 char buffer[4096];
409 while(true)
410 {
411 int bytes = _read(fh, buffer, 4096);
412 if (bytes <= 0)
413 break;
414 char *p = buffer;
415 for(int i = 0; i < bytes - chsize + 1; i++, p++)
416 {
417 if ((*p == 0) && ((chsize == 1) || !memcmp(p, "\x00\x00\x00\x00", chsize)))
418 {
419 off_t pos = lseek(fh, i - bytes, SEEK_CUR);
420 LogParserTrace(6, _T("LogParser: beginning of zero block found at %ld"), (long)pos);
421 return;
422 }
423 }
424 if (chsize > 1)
425 lseek(fh, 1 - chsize, SEEK_CUR); // re-read potentially incomplete last character
426 }
427 }
428
429 /**
430 * File parser thread
431 */
432 bool LogParser::monitorFile(CONDITION stopCondition, bool readFromCurrPos)
433 {
434 TCHAR fname[MAX_PATH], temp[MAX_PATH];
435 NX_STAT_STRUCT st, stn;
436 size_t size;
437 int fh;
438 bool readFromStart = !readFromCurrPos;
439
440 if (m_fileName == NULL)
441 {
442 LogParserTrace(0, _T("LogParser: parser thread will not start, file name not set"));
443 return false;
444 }
445
446 LogParserTrace(0, _T("LogParser: parser thread for file \"%s\" started"), m_fileName);
447 bool exclusionPeriod = false;
448 while(true)
449 {
450 if (isExclusionPeriod())
451 {
452 if (!exclusionPeriod)
453 {
454 exclusionPeriod = true;
455 LogParserTrace(6, _T("LogParser: will not open file \"%s\" because of exclusion period"), getFileName());
456 setStatus(LPS_SUSPENDED);
457 }
458 if (ConditionWait(stopCondition, 30000))
459 break;
460 continue;
461 }
462
463 if (exclusionPeriod)
464 {
465 exclusionPeriod = false;
466 LogParserTrace(6, _T("LogParser: exclusion period for file \"%s\" ended"), getFileName());
467 }
468
469 ExpandFileName(getFileName(), fname, MAX_PATH, true);
470 if (CALL_STAT(fname, &st) == 0)
471 {
472 #ifdef _WIN32
473 fh = _tsopen(fname, O_RDONLY, _SH_DENYNO);
474 #else
475 fh = _topen(fname, O_RDONLY);
476 #endif
477 if (fh != -1)
478 {
479 setStatus(LPS_RUNNING);
480 LogParserTrace(3, _T("LogParser: file \"%s\" (pattern \"%s\") successfully opened"), fname, m_fileName);
481
482 if (m_fileEncoding == -1)
483 {
484 m_fileEncoding = ScanFileEncoding(fh);
485 lseek(fh, 0, SEEK_SET);
486 }
487
488 size = (size_t)st.st_size;
489 if (readFromStart)
490 {
491 LogParserTrace(5, _T("LogParser: parsing existing records in file \"%s\""), fname);
492 off_t resetPos = ParseNewRecords(this, fh);
493 lseek(fh, resetPos, SEEK_SET);
494 }
495 else if (m_preallocatedFile)
496 {
497 SeekToZero(fh, getCharSize());
498 }
499 else
500 {
501 lseek(fh, 0, SEEK_END);
502 }
503
504 while(true)
505 {
506 if (ConditionWait(stopCondition, 5000))
507 goto stop_parser;
508
509 // Check if file name was changed
510 ExpandFileName(getFileName(), temp, MAX_PATH, true);
511 if (_tcscmp(temp, fname))
512 {
513 LogParserTrace(5, _T("LogParser: file name change for \"%s\" (\"%s\" -> \"%s\")"), m_fileName, fname, temp);
514 readFromStart = true;
515 break;
516 }
517
518 if (NX_FSTAT(fh, &st) < 0)
519 {
520 LogParserTrace(1, _T("LogParser: fstat(%d) failed, errno=%d"), fh, errno);
521 readFromStart = true;
522 break;
523 }
524
525 if (CALL_STAT(fname, &stn) < 0)
526 {
527 LogParserTrace(1, _T("LogParser: stat(%s) failed, errno=%d"), fname, errno);
528 readFromStart = true;
529 break;
530 }
531
532 #ifdef _WIN32
533 if (st.st_ctime != stn.st_ctime)
534 {
535 LogParserTrace(3, _T("LogParser: creation time for fstat(%d) is not equal to creation time for stat(%s), assume file rename"), fh, fname);
536 readFromStart = true;
537 break;
538 }
539 #else
540 if ((st.st_ino != stn.st_ino) || (st.st_dev != stn.st_dev))
541 {
542 LogParserTrace(3, _T("LogParser: file device or inode differs for stat(%d) and fstat(%s), assume file rename"), fh, fname);
543 readFromStart = true;
544 break;
545 }
546 #endif
547
548 if ((size_t)st.st_size != size)
549 {
550 if ((size_t)st.st_size < size)
551 {
552 // File was cleared, start from the beginning
553 lseek(fh, 0, SEEK_SET);
554 LogParserTrace(3, _T("LogParser: file \"%s\" st_size != size"), fname);
555 }
556 size = (size_t)st.st_size;
557 LogParserTrace(6, _T("LogParser: new data available in file \"%s\""), fname);
558 off_t resetPos = ParseNewRecords(this, fh);
559 lseek(fh, resetPos, SEEK_SET);
560 }
561 else if (m_preallocatedFile)
562 {
563 char buffer[4];
564 int bytes = _read(fh, buffer, 4);
565 if ((bytes == 4) && memcmp(buffer, "\x00\x00\x00\x00", 4))
566 {
567 lseek(fh, -4, SEEK_CUR);
568 LogParserTrace(6, _T("LogParser: new data available in file \"%s\""), fname);
569 off_t resetPos = ParseNewRecords(this, fh);
570 lseek(fh, resetPos, SEEK_SET);
571 }
572 else
573 {
574 off_t pos = lseek(fh, -bytes, SEEK_CUR);
575 if (pos > 0)
576 {
577 int readSize = std::min(pos, (off_t)4);
578 lseek(fh, -readSize, SEEK_CUR);
579 int bytes = _read(fh, buffer, readSize);
580 if ((bytes == readSize) && !memcmp(buffer, "\x00\x00\x00\x00", readSize))
581 {
582 LogParserTrace(6, _T("LogParser: detected reset of preallocated file \"%s\""), fname);
583 lseek(fh, 0, SEEK_SET);
584 off_t resetPos = ParseNewRecords(this, fh);
585 lseek(fh, resetPos, SEEK_SET);
586 }
587 }
588 }
589 }
590
591 if (isExclusionPeriod())
592 {
593 LogParserTrace(6, _T("LogParser: closing file \"%s\" because of exclusion period"), fname);
594 exclusionPeriod = true;
595 setStatus(LPS_SUSPENDED);
596 break;
597 }
598 }
599 _close(fh);
600 }
601 else
602 {
603 setStatus(LPS_OPEN_ERROR);
604 }
605 }
606 else
607 {
608 setStatus(LPS_NO_FILE);
609 if (ConditionWait(stopCondition, 10000))
610 break;
611 }
612 }
613
614 stop_parser:
615 LogParserTrace(0, _T("LogParser: parser thread for file \"%s\" stopped"), m_fileName);
616 return true;
617 }