set thread names in server
[public/netxms.git] / src / server / core / evproc.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: evproc.cpp
20 **
21 **/
22
23 #include "nxcore.h"
24
25 #if WITH_ZMQ
26 #include "zeromq.h"
27 #endif
28
29 /**
30 * Number of processed events since start
31 */
32 INT64 g_totalEventsProcessed = 0;
33
34 /**
35 * Static data
36 */
37 static THREAD s_threadStormDetector = INVALID_THREAD_HANDLE;
38 static THREAD s_threadLogger = INVALID_THREAD_HANDLE;
39 static Queue *s_loggerQueue = NULL;
40
41 /**
42 * Handler for EnumerateSessions()
43 */
44 static void BroadcastEvent(ClientSession *pSession, void *pArg)
45 {
46 if (pSession->isAuthenticated())
47 pSession->onNewEvent((Event *)pArg);
48 }
49
50 /**
51 * Event storm detector thread
52 */
53 static THREAD_RESULT THREAD_CALL EventStormDetector(void *arg)
54 {
55 ThreadSetName("EvtStormDetect");
56
57 INT64 numEvents, prevEvents, eventsPerSecond;
58 int duration, actualDuration = 0;
59
60 if (!ConfigReadInt(_T("EnableEventStormDetection"), 0))
61 {
62 // Event storm detection is off
63 DbgPrintf(1, _T("Event storm detector thread stopped because event storm detection is off"));
64 return THREAD_OK;
65 }
66
67 eventsPerSecond = ConfigReadInt(_T("EventStormEventsPerSecond"), 100);
68 duration = ConfigReadInt(_T("EventStormDuraction"), 15);
69
70 prevEvents = g_totalEventsProcessed;
71 while(!(g_flags & AF_SHUTDOWN))
72 {
73 ThreadSleepMs(1000);
74 numEvents = g_totalEventsProcessed - prevEvents;
75 prevEvents = g_totalEventsProcessed;
76 if ((numEvents >= eventsPerSecond) && (!(g_flags & AF_EVENT_STORM_DETECTED)))
77 {
78 actualDuration++;
79 if (actualDuration >= duration)
80 {
81 g_flags |= AF_EVENT_STORM_DETECTED;
82 DbgPrintf(2, _T("Event storm detected: threshold=") INT64_FMT _T(" eventsPerSecond=") INT64_FMT, eventsPerSecond, numEvents);
83 PostEvent(EVENT_EVENT_STORM_DETECTED, g_dwMgmtNode, "DdD", numEvents, duration, eventsPerSecond);
84 }
85 }
86 else if ((numEvents < eventsPerSecond) && (g_flags & AF_EVENT_STORM_DETECTED))
87 {
88 actualDuration = 0;
89 g_flags &= ~AF_EVENT_STORM_DETECTED;
90 DbgPrintf(2, _T("Event storm condition cleared"));
91 PostEvent(EVENT_EVENT_STORM_ENDED, g_dwMgmtNode, "DdD", numEvents, duration, eventsPerSecond);
92 }
93 }
94 DbgPrintf(1, _T("Event storm detector thread stopped"));
95 return THREAD_OK;
96 }
97
98 /**
99 * Event logger
100 */
101 static THREAD_RESULT THREAD_CALL EventLogger(void *arg)
102 {
103 ThreadSetName("EventLogger");
104
105 while(!IsShutdownInProgress())
106 {
107 Event *pEvent = (Event *)s_loggerQueue->getOrBlock();
108 if (pEvent == INVALID_POINTER_VALUE)
109 break; // Shutdown indicator
110
111 DB_HANDLE hdb = DBConnectionPoolAcquireConnection();
112 int syntaxId = DBGetSyntax(hdb);
113 if (syntaxId == DB_SYNTAX_SQLITE)
114 {
115 TCHAR szQuery[4096];
116 _sntprintf(szQuery, 4096, _T("INSERT INTO event_log (event_id,event_code,event_timestamp,event_source,")
117 _T("dci_id,event_severity,event_message,root_event_id,user_tag) VALUES (") UINT64_FMT
118 _T(",%d,%d,%d,%d,%d,%s,") UINT64_FMT _T(",%s)"), pEvent->getId(), pEvent->getCode(),
119 (UINT32)pEvent->getTimeStamp(), pEvent->getSourceId(), pEvent->getDciId(), pEvent->getSeverity(),
120 (const TCHAR *)DBPrepareString(hdb, pEvent->getMessage(), MAX_EVENT_MSG_LENGTH),
121 pEvent->getRootId(), (const TCHAR *)DBPrepareString(hdb, pEvent->getUserTag(), 63));
122 DBQuery(hdb, szQuery);
123 DbgPrintf(8, _T("EventLogger: DBQuery: id=%d,code=%d"), (int)pEvent->getId(), (int)pEvent->getCode());
124 delete pEvent;
125 }
126 else
127 {
128 DB_STATEMENT hStmt = DBPrepare(hdb, _T("INSERT INTO event_log (event_id,event_code,event_timestamp,")
129 _T("event_source,dci_id,event_severity,event_message,root_event_id,user_tag) ")
130 _T("VALUES (?,?,?,?,?,?,?,?,?)"));
131 if (hStmt != NULL)
132 {
133 do
134 {
135 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, pEvent->getId());
136 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, pEvent->getCode());
137 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (UINT32)pEvent->getTimeStamp());
138 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, pEvent->getSourceId());
139 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, pEvent->getDciId());
140 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, pEvent->getSeverity());
141 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, pEvent->getMessage(), DB_BIND_STATIC, MAX_EVENT_MSG_LENGTH);
142 DBBind(hStmt, 8, DB_SQLTYPE_BIGINT, pEvent->getRootId());
143 DBBind(hStmt, 9, DB_SQLTYPE_VARCHAR, pEvent->getUserTag(), DB_BIND_STATIC, 63);
144 DBExecute(hStmt);
145 DbgPrintf(8, _T("EventLogger: DBExecute: id=%d,code=%d"), (int)pEvent->getId(), (int)pEvent->getCode());
146 delete pEvent;
147 pEvent = (Event *)s_loggerQueue->get();
148 } while((pEvent != NULL) && (pEvent != INVALID_POINTER_VALUE));
149 DBFreeStatement(hStmt);
150 }
151 else
152 {
153 delete pEvent;
154 }
155 }
156
157 DBConnectionPoolReleaseConnection(hdb);
158
159 if (pEvent == INVALID_POINTER_VALUE)
160 break; // Shutdown indicator (need second check if got it in inner loop)
161 }
162 return THREAD_OK;
163 }
164
165 /**
166 * Event processing thread
167 */
168 THREAD_RESULT THREAD_CALL EventProcessor(void *arg)
169 {
170 ThreadSetName("EventProcessor");
171
172 s_loggerQueue = new Queue;
173 s_threadLogger = ThreadCreateEx(EventLogger, 0, NULL);
174 s_threadStormDetector = ThreadCreateEx(EventStormDetector, 0, NULL);
175 while(!IsShutdownInProgress())
176 {
177 Event *pEvent = (Event *)g_pEventQueue->getOrBlock();
178 if (pEvent == INVALID_POINTER_VALUE)
179 break; // Shutdown indicator
180
181 if (g_flags & AF_EVENT_STORM_DETECTED)
182 {
183 delete pEvent;
184 g_totalEventsProcessed++;
185 continue;
186 }
187
188 // Expand message text
189 // We cannot expand message text in PostEvent because of
190 // possible deadlock on g_rwlockIdIndex
191 pEvent->expandMessageText();
192
193 // Attempt to correlate event to some of previous events
194 CorrelateEvent(pEvent);
195
196 // Pass event to modules
197 CALL_ALL_MODULES(pfEventHandler, (pEvent));
198
199 // Send event to all connected clients
200 EnumerateClientSessions(BroadcastEvent, pEvent);
201
202 // Write event information to debug
203 if (nxlog_get_debug_level() >= 5)
204 {
205 NetObj *pObject = FindObjectById(pEvent->getSourceId());
206 if (pObject == NULL)
207 pObject = g_pEntireNet;
208 nxlog_debug(5, _T("EVENT %s [%d] (ID:") UINT64_FMT _T(" F:0x%04X S:%d TAG:\"%s\"%s) FROM %s: %s"),
209 pEvent->getName(), pEvent->getCode(), pEvent->getId(), pEvent->getFlags(), pEvent->getSeverity(),
210 CHECK_NULL_EX(pEvent->getUserTag()),
211 (pEvent->getRootId() == 0) ? _T("") : _T(" CORRELATED"),
212 pObject->getName(), pEvent->getMessage());
213 }
214
215 // Pass event through event processing policy if it is not correlated
216 if (pEvent->getRootId() == 0)
217 {
218 #ifdef WITH_ZMQ
219 ZmqPublishEvent(pEvent);
220 #endif
221
222 g_pEventPolicy->processEvent(pEvent);
223 nxlog_debug(7, _T("Event ") UINT64_FMT _T(" with code %d passed event processing policy"), pEvent->getId(), pEvent->getCode());
224 }
225
226 // Write event to log if required, otherwise destroy it
227 // Don't write SYS_DB_QUERY_FAILED to log to prevent
228 // possible event recursion in case of severe DB failure
229 // Logger will destroy event object after logging
230 if ((pEvent->getFlags() & EF_LOG) && (pEvent->getCode() != EVENT_DB_QUERY_FAILED))
231 {
232 s_loggerQueue->put(pEvent);
233 }
234 else
235 {
236 delete pEvent;
237 DbgPrintf(7, _T("Event object destroyed"));
238 }
239
240 g_totalEventsProcessed++;
241 }
242
243 s_loggerQueue->put(INVALID_POINTER_VALUE);
244 ThreadJoin(s_threadStormDetector);
245 ThreadJoin(s_threadLogger);
246 delete s_loggerQueue;
247 DbgPrintf(1, _T("Event processing thread stopped"));
248 return THREAD_OK;
249 }