fixed possible NULL pointer dereference in agent
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
/**
 * Externals (declared here, defined outside of the visible part of this file)
 */
void UpdateSnmpTarget(SNMPTarget *target);
bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);

/**
 * Database schema version
 */
#define DATACOLL_SCHEMA_VERSION 3

/**
 * Data collector start indicator.
 * ConfigureDataCollection() ignores configuration received from server while this flag is false.
 */
static bool s_dataCollectorStarted = false;

/**
 * Unsaved poll time indicator.
 * Set by DataCollectionItem::setLastPollTime() and reset by reconciliation thread
 * after last poll times were written to dc_config table.
 */
static bool s_pollTimeChanged = false;
45
46 /**
47 * Data collection item
48 */
49 class DataCollectionItem : public RefCountObject
50 {
51 private:
52 UINT64 m_serverId;
53 UINT32 m_id;
54 INT32 m_pollingInterval;
55 TCHAR *m_name;
56 BYTE m_type;
57 BYTE m_origin;
58 UINT16 m_snmpPort;
59 BYTE m_snmpRawValueType;
60 BYTE m_busy;
61 uuid m_snmpTargetGuid;
62 time_t m_lastPollTime;
63
64 public:
65 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
66 DataCollectionItem(DB_RESULT hResult, int row);
67 DataCollectionItem(const DataCollectionItem *item);
68 virtual ~DataCollectionItem();
69
70 UINT32 getId() const { return m_id; }
71 UINT64 getServerId() const { return m_serverId; }
72 const TCHAR *getName() const { return m_name; }
73 int getType() const { return (int)m_type; }
74 int getOrigin() const { return (int)m_origin; }
75 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
76 UINT16 getSnmpPort() const { return m_snmpPort; }
77 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
78 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
79 time_t getLastPollTime() { return m_lastPollTime; }
80
81 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
82
83 void updateAndSave(const DataCollectionItem *item);
84 void saveToDatabase(bool newObject);
85 void deleteFromDatabase();
86 void setLastPollTime(time_t time);
87
88 void startDataCollection() { m_busy = 1; incRefCount(); }
89 void finishDataCollection() { m_busy = 0; decRefCount(); }
90
91 UINT32 getTimeToNextPoll(time_t now) const
92 {
93 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
94 return m_pollingInterval;
95 time_t diff = now - m_lastPollTime;
96 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
97 }
98 };
99
100 /**
101 * Create data collection item from NXCP mesage
102 */
103 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
104 {
105 m_serverId = serverId;
106 m_id = msg->getFieldAsInt32(baseId);
107 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
108 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
109 m_name = msg->getFieldAsString(baseId + 3);
110 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
111 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
112 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
113 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
114 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
115 m_busy = 0;
116 }
117
118 /**
119 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
120 */
121 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
122 {
123 m_serverId = DBGetFieldInt64(hResult, row, 0);
124 m_id = DBGetFieldULong(hResult, row, 1);
125 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
126 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
127 m_name = DBGetField(hResult, row, 4, NULL, 0);
128 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
129 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
130 m_snmpPort = DBGetFieldULong(hResult, row, 7);
131 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
132 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
133 m_busy = 0;
134 }
135
136 /**
137 * Copy constructor
138 */
139 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
140 {
141 m_serverId = item->m_serverId;
142 m_id = item->m_id;
143 m_type = item->m_type;
144 m_origin = item->m_origin;
145 m_name = _tcsdup(item->m_name);
146 m_pollingInterval = item->m_pollingInterval;
147 m_lastPollTime = item->m_lastPollTime;
148 m_snmpTargetGuid = item->m_snmpTargetGuid;
149 m_snmpPort = item->m_snmpPort;
150 m_snmpRawValueType = item->m_snmpRawValueType;
151 m_busy = 0;
152 }
153
/**
 * Data collection item destructor - releases item name
 */
DataCollectionItem::~DataCollectionItem()
{
   // NOTE(review): safe_free is presumably a NULL-tolerant free() - confirm against its definition
   safe_free(m_name);
}
161
162 /**
163 * Will check if object has changed. If at least one field is changed - all data will be updated and
164 * saved to database.
165 */
166 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
167 {
168 // if at least one of fields changed - set all fields and save to DB
169 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
170 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
171 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
172 {
173 m_type = item->m_type;
174 m_origin = item->m_origin;
175 m_name = _tcsdup(item->m_name);
176 m_pollingInterval = item->m_pollingInterval;
177 if (m_lastPollTime < item->m_lastPollTime)
178 m_lastPollTime = item->m_lastPollTime;
179 m_snmpTargetGuid = item->m_snmpTargetGuid;
180 m_snmpPort = item->m_snmpPort;
181 m_snmpRawValueType = item->m_snmpRawValueType;
182 saveToDatabase(false);
183 }
184 }
185
186 /**
187 * Save configuration object to database
188 */
189 void DataCollectionItem::saveToDatabase(bool newObject)
190 {
191 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
192 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
193 DB_HANDLE db = GetLocalDatabaseHandle();
194 DB_STATEMENT hStmt;
195
196 if (newObject)
197 {
198 hStmt = DBPrepare(db,
199 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
200 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
201 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
202 }
203 else
204 {
205 hStmt = DBPrepare(db,
206 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
207 _T("polling_interval=?,last_poll=?,snmp_port=?,")
208 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
209 }
210
211 if (hStmt == NULL)
212 return;
213
214 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
215 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
216 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
217 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
218 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
219 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
220 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
221 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
222 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
223 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
224
225 DBExecute(hStmt);
226 DBFreeStatement(hStmt);
227 }
228
229 /**
230 * Remove item form database and delete not synced data if exist
231 */
232 void DataCollectionItem::deleteFromDatabase()
233 {
234 DB_HANDLE db = GetLocalDatabaseHandle();
235 TCHAR query[256];
236 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
237 if (DBQuery(db, query))
238 {
239 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
240 if (DBQuery(db, query))
241 {
242 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
243 }
244 }
245 }
246
/**
 * Set last poll time for item.
 * New value is kept in memory only; global "poll time changed" flag is raised so that
 * reconciliation thread writes last poll times to database when it is idle.
 */
void DataCollectionItem::setLastPollTime(time_t time)
{
   m_lastPollTime = time;
   s_pollTimeChanged = true;
}
255
256 /**
257 * Collected data
258 */
259 class DataElement
260 {
261 private:
262 UINT64 m_serverId;
263 UINT32 m_dciId;
264 time_t m_timestamp;
265 int m_origin;
266 int m_type;
267 uuid m_snmpNode;
268 union
269 {
270 TCHAR *item;
271 StringList *list;
272 Table *table;
273 } m_value;
274
275 public:
276 DataElement(DataCollectionItem *dci, const TCHAR *value)
277 {
278 m_serverId = dci->getServerId();
279 m_dciId = dci->getId();
280 m_timestamp = time(NULL);
281 m_origin = dci->getOrigin();
282 m_type = DCO_TYPE_ITEM;
283 m_snmpNode = dci->getSnmpTargetGuid();
284 m_value.item = _tcsdup(value);
285 }
286
287 DataElement(DataCollectionItem *dci, StringList *value)
288 {
289 m_serverId = dci->getServerId();
290 m_dciId = dci->getId();
291 m_timestamp = time(NULL);
292 m_origin = dci->getOrigin();
293 m_type = DCO_TYPE_LIST;
294 m_snmpNode = dci->getSnmpTargetGuid();
295 m_value.list = value;
296 }
297
298 DataElement(DataCollectionItem *dci, Table *value)
299 {
300 m_serverId = dci->getServerId();
301 m_dciId = dci->getId();
302 m_timestamp = time(NULL);
303 m_origin = dci->getOrigin();
304 m_type = DCO_TYPE_TABLE;
305 m_snmpNode = dci->getSnmpTargetGuid();
306 m_value.table = value;
307 }
308
309 /**
310 * Create data element from database record
311 * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
312 */
313 DataElement(DB_RESULT hResult, int row)
314 {
315 m_serverId = DBGetFieldUInt64(hResult, row, 0);
316 m_dciId = DBGetFieldULong(hResult, row, 1);
317 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
318 m_origin = DBGetFieldLong(hResult, row, 3);
319 m_type = DBGetFieldLong(hResult, row, 2);
320 m_snmpNode = DBGetFieldGUID(hResult, row, 4);
321 switch(m_type)
322 {
323 case DCO_TYPE_ITEM:
324 m_value.item = DBGetField(hResult, row, 6, NULL, 0);
325 break;
326 case DCO_TYPE_LIST:
327 {
328 m_value.list = new StringList();
329 TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
330 if (text != NULL)
331 {
332 m_value.list->splitAndAdd(text, _T("\n"));
333 free(text);
334 }
335 }
336 break;
337 case DCO_TYPE_TABLE:
338 {
339 char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
340 if (xml != NULL)
341 {
342 m_value.table = Table::createFromXML(xml);
343 free(xml);
344 }
345 else
346 {
347 m_value.table = NULL;
348 }
349 }
350 break;
351 default:
352 m_type = DCO_TYPE_ITEM;
353 m_value.item = _tcsdup(_T(""));
354 break;
355 }
356 }
357
358 ~DataElement()
359 {
360 switch(m_type)
361 {
362 case DCO_TYPE_ITEM:
363 free(m_value.item);
364 break;
365 case DCO_TYPE_LIST:
366 delete m_value.list;
367 break;
368 case DCO_TYPE_TABLE:
369 delete m_value.table;
370 break;
371 }
372 }
373
374 time_t getTimestamp() { return m_timestamp; }
375 UINT64 getServerId() { return m_serverId; }
376 UINT32 getDciId() { return m_dciId; }
377 int getType() { return m_type; }
378
379 void saveToDatabase(DB_STATEMENT hStmt);
380 bool sendToServer(bool reconcillation);
381 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
382 };
383
384 /**
385 * Save data element to database
386 */
387 void DataElement::saveToDatabase(DB_STATEMENT hStmt)
388 {
389 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
390 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
391 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
392 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
393 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_snmpNode);
394 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
395 switch(m_type)
396 {
397 case DCO_TYPE_ITEM:
398 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
399 break;
400 case DCO_TYPE_LIST:
401 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
402 break;
403 case DCO_TYPE_TABLE:
404 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
405 break;
406 }
407 DBExecute(hStmt);
408 }
409
410 /**
411 * Send collected data to server
412 */
413 bool DataElement::sendToServer(bool reconciliation)
414 {
415 // If data in database was invalid table may not be parsed correctly
416 // Consider sending a success in that case so data element can be dropped
417 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
418 return true;
419
420 CommSession *session = (CommSession *)FindServerSession(m_serverId);
421 if (session == NULL)
422 return false;
423
424 NXCPMessage msg;
425 msg.setCode(CMD_DCI_DATA);
426 msg.setId(session->generateRequestId());
427 msg.setField(VID_DCI_ID, m_dciId);
428 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
429 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
430 msg.setField(VID_NODE_ID, m_snmpNode);
431 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
432 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
433 switch(m_type)
434 {
435 case DCO_TYPE_ITEM:
436 msg.setField(VID_VALUE, m_value.item);
437 break;
438 case DCO_TYPE_LIST:
439 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
440 break;
441 case DCO_TYPE_TABLE:
442 m_value.table->setSource(m_origin);
443 m_value.table->fillMessage(msg, 0, -1);
444 break;
445 }
446 UINT32 rcc = session->doRequest(&msg, 2000);
447 session->decRefCount();
448
449 // consider internal error as success because it means that server
450 // cannot accept data for some reason and retry is not feasible
451 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
452 }
453
454 /**
455 * Fill bulk reconciliation message with DCI data
456 */
457 void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
458 {
459 msg->setField(baseId, m_dciId);
460 msg->setField(baseId + 1, (INT16)m_origin);
461 msg->setField(baseId + 2, (INT16)m_type);
462 msg->setField(baseId + 3, m_snmpNode);
463 msg->setFieldFromTime(baseId + 4, m_timestamp);
464 msg->setField(baseId + 5, m_value.item);
465 }
466
467 /**
468 * Server data sync status object
469 */
470 struct ServerSyncStatus
471 {
472 INT32 queueSize;
473
474 ServerSyncStatus()
475 {
476 queueSize = 0;
477 }
478 };
479
/**
 * Server sync status information (status objects indexed by server ID).
 * Accessed with s_serverSyncStatusLock held by data sender and reconciliation threads.
 */
static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;

/**
 * Database writer queue.
 * Filled by data sender with elements that could not be sent to server;
 * consumed by database writer thread.
 */
static Queue s_databaseWriterQueue;
490
491 /**
492 * Database writer
493 */
494 static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
495 {
496 DB_HANDLE hdb = GetLocalDatabaseHandle();
497 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread started"));
498
499 while(true)
500 {
501 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
502 if (e == INVALID_POINTER_VALUE)
503 break;
504
505 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
506 if (hStmt == NULL)
507 {
508 delete e;
509 continue;
510 }
511
512 int count = 0;
513
514 DBBegin(hdb);
515 while((e != NULL) && (e != INVALID_POINTER_VALUE))
516 {
517 e->saveToDatabase(hStmt);
518 delete e;
519
520 count++;
521 if (count > 200)
522 break;
523
524 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
525 }
526 DBCommit(hdb);
527 DBFreeStatement(hStmt);
528 DebugPrintf(INVALID_INDEX, 7, _T("Database writer: %d records inserted"), count);
529 }
530
531 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread stopped"));
532 return THREAD_OK;
533 }
534
/**
 * List of all data collection items (for all servers).
 * Scheduler, reconciliation thread, and ConfigureDataCollection() access it with s_itemLock held.
 */
static ObjectArray<DataCollectionItem> s_items(64, 64, true);
static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
540
541 /**
542 * Session comparator
543 */
544 static bool SessionComparator(AbstractCommSession *session, void *data)
545 {
546 if ((session->getServerId() == 0) || !session->canAcceptTraps())
547 return false;
548 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
549 if ((s != NULL) && (s->queueSize > 0))
550 {
551 return true;
552 }
553 return false;
554 }
555
556 /**
557 * Data reconciliation thread
558 */
559 static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
560 {
561 DB_HANDLE hdb = GetLocalDatabaseHandle();
562 UINT32 sleepTime = 30000;
563 DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread started"));
564
565 bool vacuumNeeded = false;
566 while(!AgentSleepAndCheckForShutdown(sleepTime))
567 {
568 // Check if there is something to sync
569 MutexLock(s_serverSyncStatusLock);
570 CommSession *session = (CommSession *)FindServerSession(SessionComparator, NULL);
571 MutexUnlock(s_serverSyncStatusLock);
572 if (session == NULL)
573 {
574 // Save last poll times when reconciliation thread is idle
575 MutexLock(s_itemLock);
576 if (s_pollTimeChanged)
577 {
578 DBBegin(hdb);
579 TCHAR query[256];
580 for(int i = 0; i < s_items.size(); i++)
581 {
582 DataCollectionItem *dci = s_items.get(i);
583 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
584 dci->getLastPollTime(), dci->getServerId(), dci->getId());
585 DBQuery(hdb, query);
586 }
587 DBCommit(hdb);
588 s_pollTimeChanged = false;
589 }
590 MutexUnlock(s_itemLock);
591
592 if (vacuumNeeded)
593 {
594 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: vacuum local database"));
595 DBQuery(hdb, _T("VACUUM"));
596 vacuumNeeded = false;
597 }
598 sleepTime = 30000;
599 continue;
600 }
601
602 TCHAR query[1024];
603 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), BULK_DATA_BLOCK_SIZE);
604
605 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
606 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
607 if (hResult == NULL)
608 {
609 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: database query failed: %s"), sqlError);
610 sleepTime = 30000;
611 session->decRefCount();
612 continue;
613 }
614
615 int count = DBGetNumRows(hResult);
616 if (count > 0)
617 {
618 ObjectArray<DataElement> bulkSendList(count, 10, true);
619 ObjectArray<DataElement> deleteList(count, 10, true);
620 for(int i = 0; i < count; i++)
621 {
622 DataElement *e = new DataElement(hResult, i);
623 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
624 {
625 bulkSendList.add(e);
626 }
627 else
628 {
629 MutexLock(s_serverSyncStatusLock);
630 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
631 if (status != NULL)
632 {
633 if (e->sendToServer(true))
634 {
635 status->queueSize--;
636 deleteList.add(e);
637 }
638 else
639 {
640 delete e;
641 }
642 }
643 else
644 {
645 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
646 deleteList.add(e); // record should be deleted
647 }
648 MutexUnlock(s_serverSyncStatusLock);
649 }
650 }
651
652 if (bulkSendList.size() > 0)
653 {
654 DebugPrintf(INVALID_INDEX, 6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
655
656 NXCPMessage msg;
657 msg.setCode(CMD_DCI_DATA);
658 msg.setId(session->generateRequestId());
659 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
660 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
661
662 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
663 for(int i = 0; i < bulkSendList.size(); i++)
664 {
665 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
666 fieldId += 10;
667 }
668
669 NXCPMessage *response = session->doRequestEx(&msg, 2000);
670 if (response != NULL)
671 {
672 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
673 if (rcc == ERR_SUCCESS)
674 {
675 MutexLock(s_serverSyncStatusLock);
676 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
677
678 // Check status for each data element
679 BYTE status[BULK_DATA_BLOCK_SIZE];
680 memset(status, 0, BULK_DATA_BLOCK_SIZE);
681 response->getFieldAsBinary(VID_STATUS, status, BULK_DATA_BLOCK_SIZE);
682 bulkSendList.setOwner(false);
683 for(int i = 0; i < bulkSendList.size(); i++)
684 {
685 DataElement *e = bulkSendList.get(i);
686 if (status[i] != BULK_DATA_REC_RETRY)
687 {
688 deleteList.add(e);
689 serverSyncStatus->queueSize--;
690 }
691 else
692 {
693 delete e;
694 }
695 }
696
697 MutexUnlock(s_serverSyncStatusLock);
698 }
699 else
700 {
701 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
702 }
703 delete response;
704 }
705 else
706 {
707 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: timeout on bulk send"));
708 }
709 }
710
711 if (deleteList.size() > 0)
712 {
713 DBBegin(hdb);
714 for(int i = 0; i < deleteList.size(); i++)
715 {
716 DataElement *e = deleteList.get(i);
717 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
718 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
719 DBQuery(hdb, query);
720 }
721 DBCommit(hdb);
722 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: %d records sent"), deleteList.size());
723 vacuumNeeded = true;
724 }
725 }
726 DBFreeResult(hResult);
727
728 session->decRefCount();
729 sleepTime = (count > 0) ? 50 : 30000;
730 }
731
732 DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread stopped"));
733 return THREAD_OK;
734 }
735
/**
 * Data sender queue.
 * Filled by data collection callbacks with collected elements; consumed by data sender thread.
 */
static Queue s_dataSenderQueue;
740
741 /**
742 * Data sender
743 */
744 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
745 {
746 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
747 while(true)
748 {
749 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
750 if (e == INVALID_POINTER_VALUE)
751 break;
752
753 MutexLock(s_serverSyncStatusLock);
754 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
755 if (status == NULL)
756 {
757 status = new ServerSyncStatus();
758 s_serverSyncStatus.set(e->getServerId(), status);
759 }
760
761 if (status->queueSize == 0)
762 {
763 if (!e->sendToServer(false))
764 {
765 status->queueSize++;
766 s_databaseWriterQueue.put(e);
767 e = NULL;
768 }
769 }
770 else
771 {
772 status->queueSize++;
773 s_databaseWriterQueue.put(e);
774 e = NULL;
775 }
776 MutexUnlock(s_serverSyncStatusLock);
777
778 delete e;
779 }
780 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
781 return THREAD_OK;
782 }
783
784 /**
785 * Pseudo-session for cached data collection
786 */
787 class VirtualSession : public AbstractCommSession
788 {
789 private:
790 UINT64 m_serverId;
791
792 public:
793 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
794
795 virtual bool isMasterServer() { return false; }
796 virtual bool isControlServer() { return false; }
797 virtual bool canAcceptTraps() { return true; }
798 virtual bool canAcceptFileUpdates() { return false; }
799 virtual UINT64 getServerId() { return m_serverId; };
800 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
801
802 virtual bool isIPv6Aware() { return true; }
803
804 virtual void sendMessage(NXCPMessage *pMsg) { }
805 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
806 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
807 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
808 virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) { return NULL; }
809 virtual UINT32 generateRequestId() { return 0; }
810 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
811 };
812
813 /**
814 * Collect data from agent
815 */
816 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
817 {
818 VirtualSession session(dci->getServerId());
819
820 DataElement *e = NULL;
821 if (dci->getType() == DCO_TYPE_ITEM)
822 {
823 TCHAR value[MAX_RESULT_LENGTH];
824 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
825 e = new DataElement(dci, value);
826 }
827 else if (dci->getType() == DCO_TYPE_LIST)
828 {
829 StringList *value = new StringList;
830 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
831 e = new DataElement(dci, value);
832 }
833 else if (dci->getType() == DCO_TYPE_TABLE)
834 {
835 Table *value = new Table;
836 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
837 e = new DataElement(dci, value);
838 }
839
840 return e;
841 }
842
843 /**
844 * Collect data from SNMP
845 */
846 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
847 {
848 DataElement *e = NULL;
849 if (dci->getType() == DCO_TYPE_ITEM)
850 {
851 DebugPrintf(INVALID_INDEX, 8, _T("Read SNMP parameter %s"), dci->getName());
852
853 TCHAR value[MAX_RESULT_LENGTH];
854 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
855 e = new DataElement(dci, value);
856 }
857 return e;
858 }
859
860 /**
861 * Local data collection callback
862 */
863 static void LocalDataCollectionCallback(void *arg)
864 {
865 DataCollectionItem *dci = (DataCollectionItem *)arg;
866
867 DataElement *e = CollectDataFromAgent(dci);
868 if (e != NULL)
869 {
870 s_dataSenderQueue.put(e);
871 }
872 else
873 {
874 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
875 }
876
877 dci->setLastPollTime(time(NULL));
878 dci->finishDataCollection();
879 }
880
881 /**
882 * SNMP data collection callback
883 */
884 static void SnmpDataCollectionCallback(void *arg)
885 {
886 DataCollectionItem *dci = (DataCollectionItem *)arg;
887
888 DataElement *e = CollectDataFromSNMP(dci);
889 if (e != NULL)
890 {
891 s_dataSenderQueue.put(e);
892 }
893 else
894 {
895 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
896 }
897
898 dci->setLastPollTime(time(NULL));
899 dci->finishDataCollection();
900 }
901
/**
 * Data collectors thread pool.
 * Created and destroyed by data collection scheduler thread.
 */
static ThreadPool *s_dataCollectorPool = NULL;
906
907 /**
908 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
909 */
910 static UINT32 DataCollectionSchedulerRun()
911 {
912 UINT32 sleepTime = 60;
913
914 MutexLock(s_itemLock);
915 for(int i = 0; i < s_items.size(); i++)
916 {
917 DataCollectionItem *dci = s_items.get(i);
918 time_t now = time(NULL);
919 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
920 if (timeToPoll == 0)
921 {
922 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
923
924 if (dci->getOrigin() == DS_NATIVE_AGENT)
925 {
926 dci->startDataCollection();
927 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
928 }
929 else if (dci->getOrigin() == DS_SNMP_AGENT)
930 {
931 dci->startDataCollection();
932 TCHAR key[64];
933 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
934 }
935 else
936 {
937 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
938 }
939
940 timeToPoll = dci->getPollingInterval();
941 }
942
943 if (sleepTime > timeToPoll)
944 sleepTime = timeToPoll;
945 }
946 MutexUnlock(s_itemLock);
947 return sleepTime;
948 }
949
950 /**
951 * Data collection scheduler thread
952 */
953 static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
954 {
955 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread started"));
956 s_dataCollectorPool = ThreadPoolCreate(1, 64, _T("DATACOLL"));
957
958 UINT32 sleepTime = DataCollectionSchedulerRun();
959 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
960 {
961 sleepTime = DataCollectionSchedulerRun();
962 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
963 }
964
965 ThreadPoolDestroy(s_dataCollectorPool);
966 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread stopped"));
967 return THREAD_OK;
968 }
969
970 /**
971 * Configure data collection
972 */
973 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
974 {
975 if (!s_dataCollectorStarted)
976 {
977 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
978 return;
979 }
980
981 DB_HANDLE hdb = GetLocalDatabaseHandle();
982
983 int count = msg->getFieldAsInt32(VID_NUM_NODES);
984 if (count > 0)
985 {
986 DBBegin(hdb);
987 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
988 for(int i = 0; i < count; i++)
989 {
990 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
991 UpdateSnmpTarget(target);
992 fieldId += 50;
993 }
994 DBCommit(hdb);
995 }
996 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
997
998 ObjectArray<DataCollectionItem> config(32, 32, true);
999
1000 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
1001 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
1002 for(int i = 0; i < count; i++)
1003 {
1004 config.add(new DataCollectionItem(serverId, msg, fieldId));
1005 fieldId += 10;
1006 }
1007 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
1008
1009 bool txnOpen = false;
1010
1011 MutexLock(s_itemLock);
1012
1013 // Update and add new
1014 for(int j = 0; j < config.size(); j++)
1015 {
1016 DataCollectionItem *item = config.get(j);
1017 bool exist = false;
1018 for(int i = 0; i < s_items.size(); i++)
1019 {
1020 if (item->equals(s_items.get(i)))
1021 {
1022 s_items.get(i)->updateAndSave(item);
1023 exist = true;
1024 }
1025 }
1026 if (!exist)
1027 {
1028 DataCollectionItem *newItem = new DataCollectionItem(item);
1029 s_items.add(newItem);
1030 if (!txnOpen)
1031 {
1032 DBBegin(hdb);
1033 txnOpen = true;
1034 }
1035 newItem->saveToDatabase(true);
1036 }
1037 }
1038
1039 // Remove not existing configuration and data for it
1040 for(int i = 0; i < s_items.size(); i++)
1041 {
1042 DataCollectionItem *item = s_items.get(i);
1043 //If item is from other server, then, do not search it in list of this server
1044 if(item->getServerId() != serverId)
1045 continue;
1046 bool exist = false;
1047 for(int j = 0; j < config.size(); j++)
1048 {
1049 if (item->equals(config.get(j)))
1050 {
1051 exist = true;
1052 }
1053 }
1054 if (!exist)
1055 {
1056 if (!txnOpen)
1057 {
1058 DBBegin(hdb);
1059 txnOpen = true;
1060 }
1061 item->deleteFromDatabase();
1062 s_items.unlink(i);
1063 item->decRefCount();
1064 i--;
1065 }
1066 }
1067
1068 if (txnOpen)
1069 DBCommit(hdb);
1070
1071 MutexUnlock(s_itemLock);
1072
1073 DebugPrintf(INVALID_INDEX, 4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
1074 }
1075
1076 /**
1077 * Load saved state of local data collection
1078 */
1079 static void LoadState()
1080 {
1081 DB_HANDLE hdb = GetLocalDatabaseHandle();
1082 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
1083 if (hResult != NULL)
1084 {
1085 int count = DBGetNumRows(hResult);
1086 for(int i = 0; i < count; i++)
1087 {
1088 s_items.add(new DataCollectionItem(hResult, i));
1089 }
1090 DBFreeResult(hResult);
1091 }
1092
1093 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1094 if (hResult != NULL)
1095 {
1096 int count = DBGetNumRows(hResult);
1097 for(int i = 0; i < count; i++)
1098 {
1099 SNMPTarget *t = new SNMPTarget(hResult, i);
1100 UpdateSnmpTarget(t);
1101 }
1102 DBFreeResult(hResult);
1103 }
1104
1105 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1106 if (hResult != NULL)
1107 {
1108 int count = DBGetNumRows(hResult);
1109 for(int i = 0; i < count; i++)
1110 {
1111 ServerSyncStatus *s = new ServerSyncStatus;
1112 s->queueSize = DBGetFieldLong(hResult, i, 1);
1113 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1114 s_serverSyncStatus.set(serverId, s);
1115 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
1116 }
1117 DBFreeResult(hResult);
1118 }
1119 }
1120
/**
 * SQL script array
 *
 * Schema upgrade statements for the data collection tables in the local
 * database. StartLocalDataCollector() executes s_upgradeQueries[v] while the
 * stored "DataCollectionSchemaVersion" value v is below
 * DATACOLL_SCHEMA_VERSION and then stores v + 1, so the element at index v
 * upgrades the schema from version v to version v + 1. The array must
 * therefore contain at least DATACOLL_SCHEMA_VERSION entries.
 */
static const TCHAR *s_upgradeQueries[] =
{
   // Version 0 -> 1: dc_queue table
   _T("CREATE TABLE dc_queue (")
   _T(" server_id number(20) not null,")
   _T(" dci_id integer not null,")
   _T(" dci_type integer not null,")
   _T(" dci_origin integer not null,")
   _T(" snmp_target_guid varchar(36) not null,")
   _T(" timestamp integer not null,")
   _T(" value varchar not null,")
   _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),

   // Version 1 -> 2: dc_config table (read back by LoadState)
   _T("CREATE TABLE dc_config (")
   _T(" server_id number(20) not null,")
   _T(" dci_id integer not null,")
   _T(" type integer not null,")
   _T(" origin integer not null,")
   _T(" name varchar(1023) null,")
   _T(" polling_interval integer not null,")
   _T(" last_poll integer not null,")
   _T(" snmp_port integer not null,")
   _T(" snmp_target_guid varchar(36) not null,")
   _T(" snmp_raw_type integer not null,")
   _T(" PRIMARY KEY(server_id,dci_id))"),

   // Version 2 -> 3: dc_snmp_targets table (read back by LoadState)
   _T("CREATE TABLE dc_snmp_targets (")
   _T(" guid varchar(36) not null,")
   _T(" server_id number(20) not null,")
   _T(" ip_address varchar(48) not null,")
   _T(" snmp_version integer not null,")
   _T(" port integer not null,")
   _T(" auth_type integer not null,")
   _T(" enc_type integer not null,")
   _T(" auth_name varchar(63),")
   _T(" auth_pass varchar(63),")
   _T(" enc_pass varchar(63),")
   _T(" PRIMARY KEY(guid))")
};
1162
/**
 * Data collector and sender thread handles
 *
 * All four handles start as INVALID_THREAD_HANDLE, are assigned by
 * StartLocalDataCollector() and joined by ShutdownLocalDataCollector().
 */
static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;  // runs DataCollectionScheduler
static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;               // runs DataSender
static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;           // runs DatabaseWriter
static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;           // runs ReconciliationThread
1170
1171 /**
1172 * Initialize and start local data collector
1173 */
1174 void StartLocalDataCollector()
1175 {
1176 DB_HANDLE db = GetLocalDatabaseHandle();
1177 if (db == NULL)
1178 {
1179 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
1180 return;
1181 }
1182
1183 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
1184 while(dbVersion < DATACOLL_SCHEMA_VERSION)
1185 {
1186 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
1187 {
1188 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
1189 return;
1190 }
1191 dbVersion++;
1192 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
1193 }
1194
1195 s_itemLock = MutexCreate();
1196 s_serverSyncStatusLock = MutexCreate();
1197
1198 LoadState();
1199
1200 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
1201 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
1202 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
1203 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
1204
1205 s_dataCollectorStarted = true;
1206 }
1207
1208 /**
1209 * Shutdown local data collector
1210 */
1211 void ShutdownLocalDataCollector()
1212 {
1213 if (!s_dataCollectorStarted)
1214 {
1215 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
1216 return;
1217 }
1218
1219 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
1220 ThreadJoin(s_dataCollectionSchedulerThread);
1221
1222 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
1223 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
1224 ThreadJoin(s_dataSenderThread);
1225
1226 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for database writer thread termination"));
1227 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1228 ThreadJoin(s_databaseWriterThread);
1229
1230 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconciliation thread termination"));
1231 ThreadJoin(s_reconciliationThread);
1232
1233 MutexDestroy(s_itemLock);
1234 MutexDestroy(s_serverSyncStatusLock);
1235 }
1236
1237 /**
1238 * Clear data collection configuration
1239 */
1240 void ClearDataCollectionConfiguration()
1241 {
1242 MutexLock(s_itemLock);
1243 DB_HANDLE db = GetLocalDatabaseHandle();
1244 DBQuery(db, _T("DELETE FROM dc_queue"));
1245 DBQuery(db, _T("DELETE FROM dc_config"));
1246 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1247 s_items.clear();
1248 MutexUnlock(s_itemLock);
1249
1250 MutexLock(s_serverSyncStatusLock);
1251 s_serverSyncStatus.clear();
1252 MutexUnlock(s_serverSyncStatusLock);
1253 }
1254
1255 /**
1256 * Handler for data collector queue size
1257 */
1258 LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1259 {
1260 if (!s_dataCollectorStarted)
1261 return SYSINFO_RC_UNSUPPORTED;
1262
1263 UINT32 count = 0;
1264 MutexLock(s_serverSyncStatusLock);
1265 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1266 while(it->hasNext())
1267 count += (UINT32)it->next()->queueSize;
1268 delete it;
1269 MutexUnlock(s_serverSyncStatusLock);
1270
1271 ret_uint(value, count);
1272 return SYSINFO_RC_SUCCESS;
1273 }