c0e5ecbcfb20501843e3c264c9d84ad4a81f873c
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
/**
 * Externals
 */
// Both functions are defined outside of this file. UpdateSnmpTarget() receives
// freshly allocated SNMPTarget objects from ConfigureDataCollection() and LoadState();
// presumably it takes ownership of them - TODO confirm against its implementation.
void UpdateSnmpTarget(SNMPTarget *target);
// Reads single value for given OID from SNMP target identified by GUID; returns agent
// error code (compared against ERR_SUCCESS by the caller in this file).
UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);

// Maximum number of dc_queue records selected per reconciliation pass (used as LIMIT in the query)
extern UINT32 g_dcReconciliationBlockSize;
// Timeout (logged as milliseconds) passed to doRequestEx() for bulk reconciliation requests
extern UINT32 g_dcReconciliationTimeout;
// Passed as second argument to ThreadPoolCreate() for the collector pool (presumably maximum size)
extern UINT32 g_dcMaxCollectorPoolSize;

/**
 * Data collector start indicator
 * (checked by ConfigureDataCollection(); configuration is ignored while it is false)
 */
static bool s_dataCollectorStarted = false;

/**
 * Unsaved poll time indicator
 * (set by DataCollectionItem::setLastPollTime(), cleared by reconciliation thread
 * after last poll times were written to database)
 */
static bool s_pollTimeChanged = false;
44
45 /**
46 * Data collection item
47 */
48 class DataCollectionItem : public RefCountObject
49 {
50 private:
51 UINT64 m_serverId;
52 UINT32 m_id;
53 INT32 m_pollingInterval;
54 TCHAR *m_name;
55 BYTE m_type;
56 BYTE m_origin;
57 UINT16 m_snmpPort;
58 BYTE m_snmpRawValueType;
59 BYTE m_busy;
60 uuid m_snmpTargetGuid;
61 time_t m_lastPollTime;
62
63 public:
64 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
65 DataCollectionItem(DB_RESULT hResult, int row);
66 DataCollectionItem(const DataCollectionItem *item);
67 virtual ~DataCollectionItem();
68
69 UINT32 getId() const { return m_id; }
70 UINT64 getServerId() const { return m_serverId; }
71 const TCHAR *getName() const { return m_name; }
72 int getType() const { return (int)m_type; }
73 int getOrigin() const { return (int)m_origin; }
74 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
75 UINT16 getSnmpPort() const { return m_snmpPort; }
76 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
77 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
78 time_t getLastPollTime() { return m_lastPollTime; }
79
80 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
81
82 void updateAndSave(const DataCollectionItem *item);
83 void saveToDatabase(bool newObject);
84 void deleteFromDatabase();
85 void setLastPollTime(time_t time);
86
87 void startDataCollection() { m_busy = 1; incRefCount(); }
88 void finishDataCollection() { m_busy = 0; decRefCount(); }
89
90 UINT32 getTimeToNextPoll(time_t now) const
91 {
92 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
93 return m_pollingInterval;
94 time_t diff = now - m_lastPollTime;
95 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
96 }
97 };
98
99 /**
100 * Create data collection item from NXCP mesage
101 */
102 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
103 {
104 m_serverId = serverId;
105 m_id = msg->getFieldAsInt32(baseId);
106 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
107 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
108 m_name = msg->getFieldAsString(baseId + 3);
109 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
110 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
111 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
112 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
113 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
114 m_busy = 0;
115 }
116
117 /**
118 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
119 */
120 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
121 {
122 m_serverId = DBGetFieldInt64(hResult, row, 0);
123 m_id = DBGetFieldULong(hResult, row, 1);
124 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
125 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
126 m_name = DBGetField(hResult, row, 4, NULL, 0);
127 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
128 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
129 m_snmpPort = DBGetFieldULong(hResult, row, 7);
130 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
131 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
132 m_busy = 0;
133 }
134
135 /**
136 * Copy constructor
137 */
138 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
139 {
140 m_serverId = item->m_serverId;
141 m_id = item->m_id;
142 m_type = item->m_type;
143 m_origin = item->m_origin;
144 m_name = _tcsdup(item->m_name);
145 m_pollingInterval = item->m_pollingInterval;
146 m_lastPollTime = item->m_lastPollTime;
147 m_snmpTargetGuid = item->m_snmpTargetGuid;
148 m_snmpPort = item->m_snmpPort;
149 m_snmpRawValueType = item->m_snmpRawValueType;
150 m_busy = 0;
151 }
152
153 /**
154 * Data collection item destructor
155 */
156 DataCollectionItem::~DataCollectionItem()
157 {
158 safe_free(m_name);
159 }
160
161 /**
162 * Will check if object has changed. If at least one field is changed - all data will be updated and
163 * saved to database.
164 */
165 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
166 {
167 // if at least one of fields changed - set all fields and save to DB
168 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
169 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
170 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
171 {
172 m_type = item->m_type;
173 m_origin = item->m_origin;
174 m_name = _tcsdup(item->m_name);
175 m_pollingInterval = item->m_pollingInterval;
176 if (m_lastPollTime < item->m_lastPollTime)
177 m_lastPollTime = item->m_lastPollTime;
178 m_snmpTargetGuid = item->m_snmpTargetGuid;
179 m_snmpPort = item->m_snmpPort;
180 m_snmpRawValueType = item->m_snmpRawValueType;
181 saveToDatabase(false);
182 }
183 }
184
185 /**
186 * Save configuration object to database
187 */
188 void DataCollectionItem::saveToDatabase(bool newObject)
189 {
190 DebugPrintf(6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
191 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
192 DB_HANDLE db = GetLocalDatabaseHandle();
193 DB_STATEMENT hStmt;
194
195 if (newObject)
196 {
197 hStmt = DBPrepare(db,
198 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
199 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
200 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
201 }
202 else
203 {
204 hStmt = DBPrepare(db,
205 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
206 _T("polling_interval=?,last_poll=?,snmp_port=?,")
207 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
208 }
209
210 if (hStmt == NULL)
211 return;
212
213 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
214 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
215 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
216 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
217 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
218 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
219 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
220 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
221 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
222 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
223
224 DBExecute(hStmt);
225 DBFreeStatement(hStmt);
226 }
227
228 /**
229 * Remove item form database and delete not synced data if exist
230 */
231 void DataCollectionItem::deleteFromDatabase()
232 {
233 DB_HANDLE db = GetLocalDatabaseHandle();
234 TCHAR query[256];
235 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
236 if (DBQuery(db, query))
237 {
238 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
239 if (DBQuery(db, query))
240 {
241 DebugPrintf(6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
242 }
243 }
244 }
245
246 /**
247 * Set last poll time for item
248 */
249 void DataCollectionItem::setLastPollTime(time_t time)
250 {
251 m_lastPollTime = time;
252 s_pollTimeChanged = true;
253 }
254
255 /**
256 * Collected data
257 */
258 class DataElement
259 {
260 private:
261 UINT64 m_serverId;
262 UINT32 m_dciId;
263 time_t m_timestamp;
264 int m_origin;
265 int m_type;
266 UINT32 m_statusCode;
267 uuid m_snmpNode;
268 union
269 {
270 TCHAR *item;
271 StringList *list;
272 Table *table;
273 } m_value;
274
275 public:
276 DataElement(DataCollectionItem *dci, const TCHAR *value, UINT32 status)
277 {
278 m_serverId = dci->getServerId();
279 m_dciId = dci->getId();
280 m_timestamp = time(NULL);
281 m_origin = dci->getOrigin();
282 m_type = DCO_TYPE_ITEM;
283 m_statusCode = status;
284 m_snmpNode = dci->getSnmpTargetGuid();
285 m_value.item = _tcsdup(value);
286 }
287
288 DataElement(DataCollectionItem *dci, StringList *value, UINT32 status)
289 {
290 m_serverId = dci->getServerId();
291 m_dciId = dci->getId();
292 m_timestamp = time(NULL);
293 m_origin = dci->getOrigin();
294 m_type = DCO_TYPE_LIST;
295 m_statusCode = status;
296 m_snmpNode = dci->getSnmpTargetGuid();
297 m_value.list = value;
298 }
299
300 DataElement(DataCollectionItem *dci, Table *value, UINT32 status)
301 {
302 m_serverId = dci->getServerId();
303 m_dciId = dci->getId();
304 m_timestamp = time(NULL);
305 m_origin = dci->getOrigin();
306 m_type = DCO_TYPE_TABLE;
307 m_statusCode = status;
308 m_snmpNode = dci->getSnmpTargetGuid();
309 m_value.table = value;
310 }
311
312 /**
313 * Create data element from database record
314 * Expected field order: server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value
315 */
316 DataElement(DB_RESULT hResult, int row)
317 {
318 m_serverId = DBGetFieldUInt64(hResult, row, 0);
319 m_dciId = DBGetFieldULong(hResult, row, 1);
320 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 6);
321 m_origin = DBGetFieldLong(hResult, row, 3);
322 m_type = DBGetFieldLong(hResult, row, 2);
323 m_statusCode = DBGetFieldLong(hResult, row, 4);
324 m_snmpNode = DBGetFieldGUID(hResult, row, 5);
325 switch(m_type)
326 {
327 case DCO_TYPE_ITEM:
328 m_value.item = DBGetField(hResult, row, 7, NULL, 0);
329 break;
330 case DCO_TYPE_LIST:
331 {
332 m_value.list = new StringList();
333 TCHAR *text = DBGetField(hResult, row, 7, NULL, 0);
334 if (text != NULL)
335 {
336 m_value.list->splitAndAdd(text, _T("\n"));
337 free(text);
338 }
339 }
340 break;
341 case DCO_TYPE_TABLE:
342 {
343 char *xml = DBGetFieldUTF8(hResult, row, 7, NULL, 0);
344 if (xml != NULL)
345 {
346 m_value.table = Table::createFromXML(xml);
347 free(xml);
348 }
349 else
350 {
351 m_value.table = NULL;
352 }
353 }
354 break;
355 default:
356 m_type = DCO_TYPE_ITEM;
357 m_value.item = _tcsdup(_T(""));
358 break;
359 }
360 }
361
362 ~DataElement()
363 {
364 switch(m_type)
365 {
366 case DCO_TYPE_ITEM:
367 free(m_value.item);
368 break;
369 case DCO_TYPE_LIST:
370 delete m_value.list;
371 break;
372 case DCO_TYPE_TABLE:
373 delete m_value.table;
374 break;
375 }
376 }
377
378 time_t getTimestamp() { return m_timestamp; }
379 UINT64 getServerId() { return m_serverId; }
380 UINT32 getDciId() { return m_dciId; }
381 int getType() { return m_type; }
382 UINT32 getStatusCode() { return m_statusCode; }
383
384 void saveToDatabase(DB_STATEMENT hStmt);
385 bool sendToServer(bool reconcillation);
386 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
387 };
388
389 /**
390 * Save data element to database
391 */
392 void DataElement::saveToDatabase(DB_STATEMENT hStmt)
393 {
394 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
395 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
396 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
397 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
398 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_statusCode);
399 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, m_snmpNode);
400 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
401 switch(m_type)
402 {
403 case DCO_TYPE_ITEM:
404 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
405 break;
406 case DCO_TYPE_LIST:
407 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
408 break;
409 case DCO_TYPE_TABLE:
410 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
411 break;
412 }
413 DBExecute(hStmt);
414 }
415
416 /**
417 * Session comparator
418 */
419 static bool SessionComparator_Sender(AbstractCommSession *session, void *data)
420 {
421 return (session->getServerId() == *((INT64 *)data)) && session->canAcceptData();
422 }
423
424 /**
425 * Send collected data to server
426 */
427 bool DataElement::sendToServer(bool reconciliation)
428 {
429 // If data in database was invalid table may not be parsed correctly
430 // Consider sending a success in that case so data element can be dropped
431 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
432 return true;
433
434 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Sender, &m_serverId);
435 if (session == NULL)
436 return false;
437
438 NXCPMessage msg;
439 msg.setCode(CMD_DCI_DATA);
440 msg.setId(session->generateRequestId());
441 msg.setField(VID_DCI_ID, m_dciId);
442 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
443 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
444 msg.setField(VID_STATUS, m_statusCode);
445 msg.setField(VID_NODE_ID, m_snmpNode);
446 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
447 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
448 switch(m_type)
449 {
450 case DCO_TYPE_ITEM:
451 msg.setField(VID_VALUE, m_value.item);
452 break;
453 case DCO_TYPE_LIST:
454 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
455 break;
456 case DCO_TYPE_TABLE:
457 m_value.table->setSource(m_origin);
458 m_value.table->fillMessage(msg, 0, -1);
459 break;
460 }
461 UINT32 rcc = session->doRequest(&msg, 2000);
462 session->decRefCount();
463
464 // consider internal error as success because it means that server
465 // cannot accept data for some reason and retry is not feasible
466 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
467 }
468
469 /**
470 * Fill bulk reconciliation message with DCI data
471 */
472 void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
473 {
474 msg->setField(baseId, m_dciId);
475 msg->setField(baseId + 1, (INT16)m_origin);
476 msg->setField(baseId + 2, (INT16)m_type);
477 msg->setField(baseId + 3, m_snmpNode);
478 msg->setFieldFromTime(baseId + 4, m_timestamp);
479 msg->setField(baseId + 5, m_value.item);
480 msg->setField(baseId + 6, m_statusCode);
481 }
482
483 /**
484 * Server data sync status object
485 */
486 struct ServerSyncStatus
487 {
488 INT32 queueSize;
489
490 ServerSyncStatus()
491 {
492 queueSize = 0;
493 }
494 };
495
/**
 * Server sync status information
 */
// Sync status objects keyed by server ID. Filled by LoadState() from dc_queue
// and extended by data sender thread when data for new server appears.
// Constructor argument is presumably "map owns its values" flag - TODO confirm.
static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
// Locked by data sender and reconciliation threads around access to s_serverSyncStatus
static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;

/**
 * Database writer queue
 * (DataElement objects that could not be sent to server; consumed by DatabaseWriter thread)
 */
static Queue s_databaseWriterQueue;
506
507 /**
508 * Database writer
509 */
510 static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
511 {
512 DB_HANDLE hdb = GetLocalDatabaseHandle();
513 DebugPrintf(1, _T("Database writer thread started"));
514
515 while(true)
516 {
517 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
518 if (e == INVALID_POINTER_VALUE)
519 break;
520
521 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?,?)"));
522 if (hStmt == NULL)
523 {
524 delete e;
525 continue;
526 }
527
528 int count = 0;
529
530 DBBegin(hdb);
531 while((e != NULL) && (e != INVALID_POINTER_VALUE))
532 {
533 e->saveToDatabase(hStmt);
534 delete e;
535
536 count++;
537 if (count > 200)
538 break;
539
540 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
541 }
542 DBCommit(hdb);
543 DBFreeStatement(hStmt);
544 DebugPrintf(7, _T("Database writer: %d records inserted"), count);
545 if (e == INVALID_POINTER_VALUE)
546 break;
547 }
548
549 DebugPrintf(1, _T("Database writer thread stopped"));
550 return THREAD_OK;
551 }
552
/**
 * List of all data collection items
 * (items of all servers are kept in the same list; s_itemLock is held by scheduler,
 * configuration update and poll time saving code while list is accessed)
 */
static ObjectArray<DataCollectionItem> s_items(64, 64, true);
static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
558
559 /**
560 * Session comparator
561 */
562 static bool SessionComparator_Reconciliation(AbstractCommSession *session, void *data)
563 {
564 if ((session->getServerId() == 0) || !session->canAcceptData())
565 return false;
566 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
567 if ((s != NULL) && (s->queueSize > 0))
568 {
569 return true;
570 }
571 return false;
572 }
573
574 /**
575 * Data reconciliation thread
576 */
577 static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
578 {
579 DB_HANDLE hdb = GetLocalDatabaseHandle();
580 UINT32 sleepTime = 30000;
581 nxlog_debug(1, _T("Data reconciliation thread started (block size %d, timeout %d ms)"), g_dcReconciliationBlockSize, g_dcReconciliationTimeout);
582
583 bool vacuumNeeded = false;
584 while(!AgentSleepAndCheckForShutdown(sleepTime))
585 {
586 // Check if there is something to sync
587 MutexLock(s_serverSyncStatusLock);
588 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Reconciliation, NULL);
589 MutexUnlock(s_serverSyncStatusLock);
590 if (session == NULL)
591 {
592 // Save last poll times when reconciliation thread is idle
593 MutexLock(s_itemLock);
594 if (s_pollTimeChanged)
595 {
596 DBBegin(hdb);
597 TCHAR query[256];
598 for(int i = 0; i < s_items.size(); i++)
599 {
600 DataCollectionItem *dci = s_items.get(i);
601 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
602 dci->getLastPollTime(), dci->getServerId(), dci->getId());
603 DBQuery(hdb, query);
604 }
605 DBCommit(hdb);
606 s_pollTimeChanged = false;
607 }
608 MutexUnlock(s_itemLock);
609
610 if (vacuumNeeded)
611 {
612 DebugPrintf(4, _T("ReconciliationThread: vacuum local database"));
613 DBQuery(hdb, _T("VACUUM"));
614 vacuumNeeded = false;
615 }
616 sleepTime = 30000;
617 continue;
618 }
619
620 TCHAR query[1024];
621 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue INDEXED BY idx_dc_queue_timestamp WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), g_dcReconciliationBlockSize);
622
623 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
624 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
625 if (hResult == NULL)
626 {
627 DebugPrintf(4, _T("ReconciliationThread: database query failed: %s"), sqlError);
628 sleepTime = 30000;
629 session->decRefCount();
630 continue;
631 }
632
633 int count = DBGetNumRows(hResult);
634 if (count > 0)
635 {
636 ObjectArray<DataElement> bulkSendList(count, 10, true);
637 ObjectArray<DataElement> deleteList(count, 10, true);
638 for(int i = 0; i < count; i++)
639 {
640 DataElement *e = new DataElement(hResult, i);
641 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
642 {
643 bulkSendList.add(e);
644 }
645 else
646 {
647 MutexLock(s_serverSyncStatusLock);
648 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
649 if (status != NULL)
650 {
651 if (e->sendToServer(true))
652 {
653 status->queueSize--;
654 deleteList.add(e);
655 }
656 else
657 {
658 delete e;
659 }
660 }
661 else
662 {
663 DebugPrintf(5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
664 deleteList.add(e); // record should be deleted
665 }
666 MutexUnlock(s_serverSyncStatusLock);
667 }
668 }
669
670 if (bulkSendList.size() > 0)
671 {
672 DebugPrintf(6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
673
674 NXCPMessage msg;
675 msg.setCode(CMD_DCI_DATA);
676 msg.setId(session->generateRequestId());
677 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
678 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
679
680 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
681 for(int i = 0; i < bulkSendList.size(); i++)
682 {
683 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
684 fieldId += 10;
685 }
686
687 NXCPMessage *response = session->doRequestEx(&msg, g_dcReconciliationTimeout);
688 if (response != NULL)
689 {
690 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
691 if (rcc == ERR_SUCCESS)
692 {
693 MutexLock(s_serverSyncStatusLock);
694 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
695
696 // Check status for each data element
697 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
698 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
699 response->getFieldAsBinary(VID_STATUS, status, MAX_BULK_DATA_BLOCK_SIZE);
700 bulkSendList.setOwner(false);
701 for(int i = 0; i < bulkSendList.size(); i++)
702 {
703 DataElement *e = bulkSendList.get(i);
704 if (status[i] != BULK_DATA_REC_RETRY)
705 {
706 deleteList.add(e);
707 serverSyncStatus->queueSize--;
708 }
709 else
710 {
711 delete e;
712 }
713 }
714
715 MutexUnlock(s_serverSyncStatusLock);
716 }
717 else
718 {
719 DebugPrintf(4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
720 }
721 delete response;
722 }
723 else
724 {
725 DebugPrintf(4, _T("ReconciliationThread: timeout on bulk send"));
726 }
727 }
728
729 if (deleteList.size() > 0)
730 {
731 DBBegin(hdb);
732 for(int i = 0; i < deleteList.size(); i++)
733 {
734 DataElement *e = deleteList.get(i);
735 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
736 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
737 DBQuery(hdb, query);
738 }
739 DBCommit(hdb);
740 nxlog_debug(4, _T("ReconciliationThread: %d records sent"), deleteList.size());
741 vacuumNeeded = true;
742 }
743 }
744 DBFreeResult(hResult);
745
746 session->decRefCount();
747 sleepTime = (count > 0) ? 50 : 30000;
748 }
749
750 nxlog_debug(1, _T("Data reconciliation thread stopped"));
751 return THREAD_OK;
752 }
753
/**
 * Data sender queue
 * (DataElement objects produced by collection callbacks; consumed by DataSender thread)
 */
static Queue s_dataSenderQueue;
758
759 /**
760 * Data sender
761 */
762 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
763 {
764 DebugPrintf(1, _T("Data sender thread started"));
765 while(true)
766 {
767 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
768 if (e == INVALID_POINTER_VALUE)
769 break;
770
771 MutexLock(s_serverSyncStatusLock);
772 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
773 if (status == NULL)
774 {
775 status = new ServerSyncStatus();
776 s_serverSyncStatus.set(e->getServerId(), status);
777 }
778
779 if (status->queueSize == 0)
780 {
781 if (!e->sendToServer(false))
782 {
783 status->queueSize++;
784 s_databaseWriterQueue.put(e);
785 e = NULL;
786 }
787 }
788 else
789 {
790 status->queueSize++;
791 s_databaseWriterQueue.put(e);
792 e = NULL;
793 }
794 MutexUnlock(s_serverSyncStatusLock);
795
796 delete e;
797 }
798 DebugPrintf(1, _T("Data sender thread stopped"));
799 return THREAD_OK;
800 }
801
802 /**
803 * Collect data from agent
804 */
805 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
806 {
807 VirtualSession session(dci->getServerId());
808
809 DataElement *e = NULL;
810 UINT32 status;
811 if (dci->getType() == DCO_TYPE_ITEM)
812 {
813 TCHAR value[MAX_RESULT_LENGTH];
814 status = GetParameterValue(dci->getName(), value, &session);
815 e = new DataElement(dci, (status == ERR_SUCCESS) ? value : _T(""), status);
816 }
817 else if (dci->getType() == DCO_TYPE_LIST)
818 {
819 StringList *value = new StringList();
820 status = GetListValue(dci->getName(), value, &session);
821 e = new DataElement(dci, value, status);
822 }
823 else if (dci->getType() == DCO_TYPE_TABLE)
824 {
825 Table *value = new Table();
826 status = GetTableValue(dci->getName(), value, &session);
827 e = new DataElement(dci, value, status);
828 }
829
830 return e;
831 }
832
833 /**
834 * Collect data from SNMP
835 */
836 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
837 {
838 DataElement *e = NULL;
839 if (dci->getType() == DCO_TYPE_ITEM)
840 {
841 DebugPrintf(8, _T("Read SNMP parameter %s"), dci->getName());
842
843 TCHAR value[MAX_RESULT_LENGTH];
844 UINT32 status = GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType());
845 e = new DataElement(dci, status == ERR_SUCCESS ? value : _T(""), status);
846 }
847 return e;
848 }
849
850 /**
851 * Local data collection callback
852 */
853 static void LocalDataCollectionCallback(void *arg)
854 {
855 DataCollectionItem *dci = (DataCollectionItem *)arg;
856
857 DataElement *e = CollectDataFromAgent(dci);
858 if (e != NULL)
859 {
860 s_dataSenderQueue.put(e);
861 }
862 else
863 {
864 DebugPrintf(6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
865 }
866
867 dci->setLastPollTime(time(NULL));
868 dci->finishDataCollection();
869 }
870
871 /**
872 * SNMP data collection callback
873 */
874 static void SnmpDataCollectionCallback(void *arg)
875 {
876 DataCollectionItem *dci = (DataCollectionItem *)arg;
877
878 DataElement *e = CollectDataFromSNMP(dci);
879 if (e != NULL)
880 {
881 s_dataSenderQueue.put(e);
882 }
883 else
884 {
885 DebugPrintf(6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
886 }
887
888 dci->setLastPollTime(time(NULL));
889 dci->finishDataCollection();
890 }
891
/**
 * Data collectors thread pool
 * (created and destroyed by data collection scheduler thread)
 */
static ThreadPool *s_dataCollectorPool = NULL;
896
897 /**
898 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
899 */
900 static UINT32 DataCollectionSchedulerRun()
901 {
902 UINT32 sleepTime = 60;
903
904 MutexLock(s_itemLock);
905 for(int i = 0; i < s_items.size(); i++)
906 {
907 DataCollectionItem *dci = s_items.get(i);
908 time_t now = time(NULL);
909 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
910 if (timeToPoll == 0)
911 {
912 DebugPrintf(7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
913
914 if (dci->getOrigin() == DS_NATIVE_AGENT)
915 {
916 dci->startDataCollection();
917 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
918 }
919 else if (dci->getOrigin() == DS_SNMP_AGENT)
920 {
921 dci->startDataCollection();
922 TCHAR key[64];
923 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
924 }
925 else
926 {
927 DebugPrintf(7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
928 }
929
930 timeToPoll = dci->getPollingInterval();
931 }
932
933 if (sleepTime > timeToPoll)
934 sleepTime = timeToPoll;
935 }
936 MutexUnlock(s_itemLock);
937 return sleepTime;
938 }
939
940 /**
941 * Data collection scheduler thread
942 */
943 static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
944 {
945 DebugPrintf(1, _T("Data collection scheduler thread started"));
946 s_dataCollectorPool = ThreadPoolCreate(1, g_dcMaxCollectorPoolSize, _T("DATACOLL"));
947
948 UINT32 sleepTime = DataCollectionSchedulerRun();
949 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
950 {
951 sleepTime = DataCollectionSchedulerRun();
952 DebugPrintf(7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
953 }
954
955 ThreadPoolDestroy(s_dataCollectorPool);
956 DebugPrintf(1, _T("Data collection scheduler thread stopped"));
957 return THREAD_OK;
958 }
959
960 /**
961 * Configure data collection
962 */
963 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
964 {
965 if (!s_dataCollectorStarted)
966 {
967 DebugPrintf(1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
968 return;
969 }
970
971 DB_HANDLE hdb = GetLocalDatabaseHandle();
972
973 int count = msg->getFieldAsInt32(VID_NUM_NODES);
974 if (count > 0)
975 {
976 DBBegin(hdb);
977 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
978 for(int i = 0; i < count; i++)
979 {
980 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
981 UpdateSnmpTarget(target);
982 fieldId += 50;
983 }
984 DBCommit(hdb);
985 }
986 DebugPrintf(4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
987
988 ObjectArray<DataCollectionItem> config(32, 32, true);
989
990 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
991 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
992 for(int i = 0; i < count; i++)
993 {
994 config.add(new DataCollectionItem(serverId, msg, fieldId));
995 fieldId += 10;
996 }
997 DebugPrintf(4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
998
999 bool txnOpen = false;
1000
1001 MutexLock(s_itemLock);
1002
1003 // Update and add new
1004 for(int j = 0; j < config.size(); j++)
1005 {
1006 DataCollectionItem *item = config.get(j);
1007 bool exist = false;
1008 for(int i = 0; i < s_items.size(); i++)
1009 {
1010 if (item->equals(s_items.get(i)))
1011 {
1012 s_items.get(i)->updateAndSave(item);
1013 exist = true;
1014 }
1015 }
1016 if (!exist)
1017 {
1018 DataCollectionItem *newItem = new DataCollectionItem(item);
1019 s_items.add(newItem);
1020 if (!txnOpen)
1021 {
1022 DBBegin(hdb);
1023 txnOpen = true;
1024 }
1025 newItem->saveToDatabase(true);
1026 }
1027 }
1028
1029 // Remove not existing configuration and data for it
1030 for(int i = 0; i < s_items.size(); i++)
1031 {
1032 DataCollectionItem *item = s_items.get(i);
1033 //If item is from other server, then, do not search it in list of this server
1034 if(item->getServerId() != serverId)
1035 continue;
1036 bool exist = false;
1037 for(int j = 0; j < config.size(); j++)
1038 {
1039 if (item->equals(config.get(j)))
1040 {
1041 exist = true;
1042 }
1043 }
1044 if (!exist)
1045 {
1046 if (!txnOpen)
1047 {
1048 DBBegin(hdb);
1049 txnOpen = true;
1050 }
1051 item->deleteFromDatabase();
1052 s_items.unlink(i);
1053 item->decRefCount();
1054 i--;
1055 }
1056 }
1057
1058 if (txnOpen)
1059 DBCommit(hdb);
1060
1061 MutexUnlock(s_itemLock);
1062
1063 DebugPrintf(4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
1064 }
1065
1066 /**
1067 * Load saved state of local data collection
1068 */
1069 static void LoadState()
1070 {
1071 DB_HANDLE hdb = GetLocalDatabaseHandle();
1072 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
1073 if (hResult != NULL)
1074 {
1075 int count = DBGetNumRows(hResult);
1076 for(int i = 0; i < count; i++)
1077 {
1078 s_items.add(new DataCollectionItem(hResult, i));
1079 }
1080 DBFreeResult(hResult);
1081 }
1082
1083 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1084 if (hResult != NULL)
1085 {
1086 int count = DBGetNumRows(hResult);
1087 for(int i = 0; i < count; i++)
1088 {
1089 SNMPTarget *t = new SNMPTarget(hResult, i);
1090 UpdateSnmpTarget(t);
1091 }
1092 DBFreeResult(hResult);
1093 }
1094
1095 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1096 if (hResult != NULL)
1097 {
1098 int count = DBGetNumRows(hResult);
1099 for(int i = 0; i < count; i++)
1100 {
1101 ServerSyncStatus *s = new ServerSyncStatus;
1102 s->queueSize = DBGetFieldLong(hResult, i, 1);
1103 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1104 s_serverSyncStatus.set(serverId, s);
1105 DebugPrintf(2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
1106 }
1107 DBFreeResult(hResult);
1108 }
1109 }
1110
1111 /**
1112 * Handles of the four worker threads of the local data collector.
 * Each handle starts as INVALID_THREAD_HANDLE, is assigned by
 * StartLocalDataCollector() and is joined by ShutdownLocalDataCollector().
1113 */
1114 static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;   // runs DataCollectionScheduler
1115 static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;                // runs DataSender
1116 static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;            // runs DatabaseWriter
1117 static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;            // runs ReconciliationThread
1118
1119 /**
1120 * Initialize and start local data collector
1121 */
1122 void StartLocalDataCollector()
1123 {
1124 DB_HANDLE db = GetLocalDatabaseHandle();
1125 if (db == NULL)
1126 {
1127 DebugPrintf(5, _T("StartLocalDataCollector: local database unavailable"));
1128 return;
1129 }
1130
1131 if (g_dcReconciliationBlockSize < 16)
1132 {
1133 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to 16"), g_dcReconciliationBlockSize);
1134 g_dcReconciliationBlockSize = 16;
1135 }
1136 else if (g_dcReconciliationBlockSize > MAX_BULK_DATA_BLOCK_SIZE)
1137 {
1138 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to %d"), g_dcReconciliationBlockSize, MAX_BULK_DATA_BLOCK_SIZE);
1139 g_dcReconciliationBlockSize = MAX_BULK_DATA_BLOCK_SIZE;
1140 }
1141
1142 if (g_dcReconciliationTimeout < 1000)
1143 {
1144 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 1000"), g_dcReconciliationTimeout);
1145 g_dcReconciliationTimeout = 1000;
1146 }
1147 else if (g_dcReconciliationTimeout > 600000)
1148 {
1149 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 600000"), g_dcReconciliationTimeout);
1150 g_dcReconciliationTimeout = 600000;
1151 }
1152
1153 s_itemLock = MutexCreate();
1154 s_serverSyncStatusLock = MutexCreate();
1155
1156 LoadState();
1157
1158 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
1159 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
1160 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
1161 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
1162
1163 s_dataCollectorStarted = true;
1164 }
1165
1166 /**
1167 * Shutdown local data collector
1168 */
1169 void ShutdownLocalDataCollector()
1170 {
1171 if (!s_dataCollectorStarted)
1172 {
1173 DebugPrintf(5, _T("Local data collector was not started"));
1174 return;
1175 }
1176
1177 DebugPrintf(5, _T("Waiting for data collector thread termination"));
1178 ThreadJoin(s_dataCollectionSchedulerThread);
1179
1180 DebugPrintf(5, _T("Waiting for data sender thread termination"));
1181 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
1182 ThreadJoin(s_dataSenderThread);
1183
1184 DebugPrintf(5, _T("Waiting for database writer thread termination"));
1185 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1186 ThreadJoin(s_databaseWriterThread);
1187
1188 DebugPrintf(5, _T("Waiting for data reconciliation thread termination"));
1189 ThreadJoin(s_reconciliationThread);
1190
1191 MutexDestroy(s_itemLock);
1192 MutexDestroy(s_serverSyncStatusLock);
1193 }
1194
1195 /**
1196 * Clear data collection configuration
1197 */
1198 void ClearDataCollectionConfiguration()
1199 {
1200 MutexLock(s_itemLock);
1201 DB_HANDLE db = GetLocalDatabaseHandle();
1202 DBQuery(db, _T("DELETE FROM dc_queue"));
1203 DBQuery(db, _T("DELETE FROM dc_config"));
1204 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1205 s_items.clear();
1206 MutexUnlock(s_itemLock);
1207
1208 MutexLock(s_serverSyncStatusLock);
1209 s_serverSyncStatus.clear();
1210 MutexUnlock(s_serverSyncStatusLock);
1211 }
1212
1213 /**
1214 * Handler for data collector queue size
1215 */
1216 LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1217 {
1218 if (!s_dataCollectorStarted)
1219 return SYSINFO_RC_UNSUPPORTED;
1220
1221 UINT32 count = 0;
1222 MutexLock(s_serverSyncStatusLock);
1223 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1224 while(it->hasNext())
1225 count += (UINT32)it->next()->queueSize;
1226 delete it;
1227 MutexUnlock(s_serverSyncStatusLock);
1228
1229 ret_uint(value, count);
1230 return SYSINFO_RC_SUCCESS;
1231 }