agent data reconciliation block size and timeout can be configured
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Externals
27 */
28 void UpdateSnmpTarget(SNMPTarget *target);
29 UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
30
31 extern UINT32 g_dcReconciliationBlockSize;
32 extern UINT32 g_dcReconciliationTimeout;
33
34 /**
35 * Data collector start indicator
36 */
37 static bool s_dataCollectorStarted = false;
38
39 /**
40 * Unsaved poll time indicator
41 */
42 static bool s_pollTimeChanged = false;
43
44 /**
45 * Data collection item
46 */
47 class DataCollectionItem : public RefCountObject
48 {
49 private:
50 UINT64 m_serverId;
51 UINT32 m_id;
52 INT32 m_pollingInterval;
53 TCHAR *m_name;
54 BYTE m_type;
55 BYTE m_origin;
56 UINT16 m_snmpPort;
57 BYTE m_snmpRawValueType;
58 BYTE m_busy;
59 uuid m_snmpTargetGuid;
60 time_t m_lastPollTime;
61
62 public:
63 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
64 DataCollectionItem(DB_RESULT hResult, int row);
65 DataCollectionItem(const DataCollectionItem *item);
66 virtual ~DataCollectionItem();
67
68 UINT32 getId() const { return m_id; }
69 UINT64 getServerId() const { return m_serverId; }
70 const TCHAR *getName() const { return m_name; }
71 int getType() const { return (int)m_type; }
72 int getOrigin() const { return (int)m_origin; }
73 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
74 UINT16 getSnmpPort() const { return m_snmpPort; }
75 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
76 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
77 time_t getLastPollTime() { return m_lastPollTime; }
78
79 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
80
81 void updateAndSave(const DataCollectionItem *item);
82 void saveToDatabase(bool newObject);
83 void deleteFromDatabase();
84 void setLastPollTime(time_t time);
85
86 void startDataCollection() { m_busy = 1; incRefCount(); }
87 void finishDataCollection() { m_busy = 0; decRefCount(); }
88
89 UINT32 getTimeToNextPoll(time_t now) const
90 {
91 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
92 return m_pollingInterval;
93 time_t diff = now - m_lastPollTime;
94 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
95 }
96 };
97
98 /**
99 * Create data collection item from NXCP mesage
100 */
101 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
102 {
103 m_serverId = serverId;
104 m_id = msg->getFieldAsInt32(baseId);
105 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
106 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
107 m_name = msg->getFieldAsString(baseId + 3);
108 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
109 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
110 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
111 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
112 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
113 m_busy = 0;
114 }
115
116 /**
117 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
118 */
119 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
120 {
121 m_serverId = DBGetFieldInt64(hResult, row, 0);
122 m_id = DBGetFieldULong(hResult, row, 1);
123 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
124 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
125 m_name = DBGetField(hResult, row, 4, NULL, 0);
126 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
127 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
128 m_snmpPort = DBGetFieldULong(hResult, row, 7);
129 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
130 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
131 m_busy = 0;
132 }
133
134 /**
135 * Copy constructor
136 */
137 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
138 {
139 m_serverId = item->m_serverId;
140 m_id = item->m_id;
141 m_type = item->m_type;
142 m_origin = item->m_origin;
143 m_name = _tcsdup(item->m_name);
144 m_pollingInterval = item->m_pollingInterval;
145 m_lastPollTime = item->m_lastPollTime;
146 m_snmpTargetGuid = item->m_snmpTargetGuid;
147 m_snmpPort = item->m_snmpPort;
148 m_snmpRawValueType = item->m_snmpRawValueType;
149 m_busy = 0;
150 }
151
152 /**
153 * Data collection item destructor
154 */
155 DataCollectionItem::~DataCollectionItem()
156 {
157 safe_free(m_name);
158 }
159
160 /**
161 * Will check if object has changed. If at least one field is changed - all data will be updated and
162 * saved to database.
163 */
164 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
165 {
166 // if at least one of fields changed - set all fields and save to DB
167 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
168 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
169 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
170 {
171 m_type = item->m_type;
172 m_origin = item->m_origin;
173 m_name = _tcsdup(item->m_name);
174 m_pollingInterval = item->m_pollingInterval;
175 if (m_lastPollTime < item->m_lastPollTime)
176 m_lastPollTime = item->m_lastPollTime;
177 m_snmpTargetGuid = item->m_snmpTargetGuid;
178 m_snmpPort = item->m_snmpPort;
179 m_snmpRawValueType = item->m_snmpRawValueType;
180 saveToDatabase(false);
181 }
182 }
183
184 /**
185 * Save configuration object to database
186 */
187 void DataCollectionItem::saveToDatabase(bool newObject)
188 {
189 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
190 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
191 DB_HANDLE db = GetLocalDatabaseHandle();
192 DB_STATEMENT hStmt;
193
194 if (newObject)
195 {
196 hStmt = DBPrepare(db,
197 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
198 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
199 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
200 }
201 else
202 {
203 hStmt = DBPrepare(db,
204 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
205 _T("polling_interval=?,last_poll=?,snmp_port=?,")
206 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
207 }
208
209 if (hStmt == NULL)
210 return;
211
212 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
213 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
214 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
215 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
216 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
217 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
218 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
219 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
220 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
221 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
222
223 DBExecute(hStmt);
224 DBFreeStatement(hStmt);
225 }
226
227 /**
228 * Remove item form database and delete not synced data if exist
229 */
230 void DataCollectionItem::deleteFromDatabase()
231 {
232 DB_HANDLE db = GetLocalDatabaseHandle();
233 TCHAR query[256];
234 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
235 if (DBQuery(db, query))
236 {
237 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
238 if (DBQuery(db, query))
239 {
240 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
241 }
242 }
243 }
244
245 /**
246 * Set last poll time for item
247 */
248 void DataCollectionItem::setLastPollTime(time_t time)
249 {
250 m_lastPollTime = time;
251 s_pollTimeChanged = true;
252 }
253
254 /**
255 * Collected data
256 */
257 class DataElement
258 {
259 private:
260 UINT64 m_serverId;
261 UINT32 m_dciId;
262 time_t m_timestamp;
263 int m_origin;
264 int m_type;
265 UINT32 m_statusCode;
266 uuid m_snmpNode;
267 union
268 {
269 TCHAR *item;
270 StringList *list;
271 Table *table;
272 } m_value;
273
274 public:
275 DataElement(DataCollectionItem *dci, const TCHAR *value, UINT32 status)
276 {
277 m_serverId = dci->getServerId();
278 m_dciId = dci->getId();
279 m_timestamp = time(NULL);
280 m_origin = dci->getOrigin();
281 m_type = DCO_TYPE_ITEM;
282 m_statusCode = status;
283 m_snmpNode = dci->getSnmpTargetGuid();
284 m_value.item = _tcsdup(value);
285 }
286
287 DataElement(DataCollectionItem *dci, StringList *value, UINT32 status)
288 {
289 m_serverId = dci->getServerId();
290 m_dciId = dci->getId();
291 m_timestamp = time(NULL);
292 m_origin = dci->getOrigin();
293 m_type = DCO_TYPE_LIST;
294 m_statusCode = status;
295 m_snmpNode = dci->getSnmpTargetGuid();
296 m_value.list = value;
297 }
298
299 DataElement(DataCollectionItem *dci, Table *value, UINT32 status)
300 {
301 m_serverId = dci->getServerId();
302 m_dciId = dci->getId();
303 m_timestamp = time(NULL);
304 m_origin = dci->getOrigin();
305 m_type = DCO_TYPE_TABLE;
306 m_statusCode = status;
307 m_snmpNode = dci->getSnmpTargetGuid();
308 m_value.table = value;
309 }
310
311 /**
312 * Create data element from database record
313 * Expected field order: server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value
314 */
315 DataElement(DB_RESULT hResult, int row)
316 {
317 m_serverId = DBGetFieldUInt64(hResult, row, 0);
318 m_dciId = DBGetFieldULong(hResult, row, 1);
319 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 6);
320 m_origin = DBGetFieldLong(hResult, row, 3);
321 m_type = DBGetFieldLong(hResult, row, 2);
322 m_statusCode = DBGetFieldLong(hResult, row, 4);
323 m_snmpNode = DBGetFieldGUID(hResult, row, 5);
324 switch(m_type)
325 {
326 case DCO_TYPE_ITEM:
327 m_value.item = DBGetField(hResult, row, 7, NULL, 0);
328 break;
329 case DCO_TYPE_LIST:
330 {
331 m_value.list = new StringList();
332 TCHAR *text = DBGetField(hResult, row, 7, NULL, 0);
333 if (text != NULL)
334 {
335 m_value.list->splitAndAdd(text, _T("\n"));
336 free(text);
337 }
338 }
339 break;
340 case DCO_TYPE_TABLE:
341 {
342 char *xml = DBGetFieldUTF8(hResult, row, 7, NULL, 0);
343 if (xml != NULL)
344 {
345 m_value.table = Table::createFromXML(xml);
346 free(xml);
347 }
348 else
349 {
350 m_value.table = NULL;
351 }
352 }
353 break;
354 default:
355 m_type = DCO_TYPE_ITEM;
356 m_value.item = _tcsdup(_T(""));
357 break;
358 }
359 }
360
361 ~DataElement()
362 {
363 switch(m_type)
364 {
365 case DCO_TYPE_ITEM:
366 free(m_value.item);
367 break;
368 case DCO_TYPE_LIST:
369 delete m_value.list;
370 break;
371 case DCO_TYPE_TABLE:
372 delete m_value.table;
373 break;
374 }
375 }
376
377 time_t getTimestamp() { return m_timestamp; }
378 UINT64 getServerId() { return m_serverId; }
379 UINT32 getDciId() { return m_dciId; }
380 int getType() { return m_type; }
381 UINT32 getStatusCode() { return m_statusCode; }
382
383 void saveToDatabase(DB_STATEMENT hStmt);
384 bool sendToServer(bool reconcillation);
385 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
386 };
387
388 /**
389 * Save data element to database
390 */
391 void DataElement::saveToDatabase(DB_STATEMENT hStmt)
392 {
393 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
394 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
395 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
396 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
397 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_statusCode);
398 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, m_snmpNode);
399 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
400 switch(m_type)
401 {
402 case DCO_TYPE_ITEM:
403 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
404 break;
405 case DCO_TYPE_LIST:
406 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
407 break;
408 case DCO_TYPE_TABLE:
409 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
410 break;
411 }
412 DBExecute(hStmt);
413 }
414
415 /**
416 * Session comparator
417 */
418 static bool SessionComparator_Sender(AbstractCommSession *session, void *data)
419 {
420 return (session->getServerId() == *((INT64 *)data)) && session->canAcceptData();
421 }
422
423 /**
424 * Send collected data to server
425 */
426 bool DataElement::sendToServer(bool reconciliation)
427 {
428 // If data in database was invalid table may not be parsed correctly
429 // Consider sending a success in that case so data element can be dropped
430 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
431 return true;
432
433 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Sender, &m_serverId);
434 if (session == NULL)
435 return false;
436
437 NXCPMessage msg;
438 msg.setCode(CMD_DCI_DATA);
439 msg.setId(session->generateRequestId());
440 msg.setField(VID_DCI_ID, m_dciId);
441 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
442 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
443 msg.setField(VID_STATUS, m_statusCode);
444 msg.setField(VID_NODE_ID, m_snmpNode);
445 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
446 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
447 switch(m_type)
448 {
449 case DCO_TYPE_ITEM:
450 msg.setField(VID_VALUE, m_value.item);
451 break;
452 case DCO_TYPE_LIST:
453 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
454 break;
455 case DCO_TYPE_TABLE:
456 m_value.table->setSource(m_origin);
457 m_value.table->fillMessage(msg, 0, -1);
458 break;
459 }
460 UINT32 rcc = session->doRequest(&msg, 2000);
461 session->decRefCount();
462
463 // consider internal error as success because it means that server
464 // cannot accept data for some reason and retry is not feasible
465 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
466 }
467
468 /**
469 * Fill bulk reconciliation message with DCI data
470 */
471 void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
472 {
473 msg->setField(baseId, m_dciId);
474 msg->setField(baseId + 1, (INT16)m_origin);
475 msg->setField(baseId + 2, (INT16)m_type);
476 msg->setField(baseId + 3, m_snmpNode);
477 msg->setFieldFromTime(baseId + 4, m_timestamp);
478 msg->setField(baseId + 5, m_value.item);
479 msg->setField(baseId + 6, m_statusCode);
480 }
481
482 /**
483 * Server data sync status object
484 */
485 struct ServerSyncStatus
486 {
487 INT32 queueSize;
488
489 ServerSyncStatus()
490 {
491 queueSize = 0;
492 }
493 };
494
495 /**
496 * Server sync status information
497 */
498 static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
499 static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
500
501 /**
502 * Database writer queue
503 */
504 static Queue s_databaseWriterQueue;
505
506 /**
507 * Database writer
508 */
509 static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
510 {
511 DB_HANDLE hdb = GetLocalDatabaseHandle();
512 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread started"));
513
514 while(true)
515 {
516 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
517 if (e == INVALID_POINTER_VALUE)
518 break;
519
520 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?,?)"));
521 if (hStmt == NULL)
522 {
523 delete e;
524 continue;
525 }
526
527 int count = 0;
528
529 DBBegin(hdb);
530 while((e != NULL) && (e != INVALID_POINTER_VALUE))
531 {
532 e->saveToDatabase(hStmt);
533 delete e;
534
535 count++;
536 if (count > 200)
537 break;
538
539 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
540 }
541 DBCommit(hdb);
542 DBFreeStatement(hStmt);
543 DebugPrintf(INVALID_INDEX, 7, _T("Database writer: %d records inserted"), count);
544 }
545
546 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread stopped"));
547 return THREAD_OK;
548 }
549
550 /**
551 * List of all data collection items
552 */
553 static ObjectArray<DataCollectionItem> s_items(64, 64, true);
554 static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
555
556 /**
557 * Session comparator
558 */
559 static bool SessionComparator_Reconciliation(AbstractCommSession *session, void *data)
560 {
561 if ((session->getServerId() == 0) || !session->canAcceptData())
562 return false;
563 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
564 if ((s != NULL) && (s->queueSize > 0))
565 {
566 return true;
567 }
568 return false;
569 }
570
571 /**
572 * Data reconciliation thread
573 */
574 static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
575 {
576 DB_HANDLE hdb = GetLocalDatabaseHandle();
577 UINT32 sleepTime = 30000;
578 nxlog_debug(1, _T("Data reconciliation thread started (block size %d, timeout %d ms)"), g_dcReconciliationBlockSize, g_dcReconciliationTimeout);
579
580 bool vacuumNeeded = false;
581 while(!AgentSleepAndCheckForShutdown(sleepTime))
582 {
583 // Check if there is something to sync
584 MutexLock(s_serverSyncStatusLock);
585 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Reconciliation, NULL);
586 MutexUnlock(s_serverSyncStatusLock);
587 if (session == NULL)
588 {
589 // Save last poll times when reconciliation thread is idle
590 MutexLock(s_itemLock);
591 if (s_pollTimeChanged)
592 {
593 DBBegin(hdb);
594 TCHAR query[256];
595 for(int i = 0; i < s_items.size(); i++)
596 {
597 DataCollectionItem *dci = s_items.get(i);
598 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
599 dci->getLastPollTime(), dci->getServerId(), dci->getId());
600 DBQuery(hdb, query);
601 }
602 DBCommit(hdb);
603 s_pollTimeChanged = false;
604 }
605 MutexUnlock(s_itemLock);
606
607 if (vacuumNeeded)
608 {
609 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: vacuum local database"));
610 DBQuery(hdb, _T("VACUUM"));
611 vacuumNeeded = false;
612 }
613 sleepTime = 30000;
614 continue;
615 }
616
617 TCHAR query[1024];
618 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), g_dcReconciliationBlockSize);
619
620 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
621 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
622 if (hResult == NULL)
623 {
624 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: database query failed: %s"), sqlError);
625 sleepTime = 30000;
626 session->decRefCount();
627 continue;
628 }
629
630 int count = DBGetNumRows(hResult);
631 if (count > 0)
632 {
633 ObjectArray<DataElement> bulkSendList(count, 10, true);
634 ObjectArray<DataElement> deleteList(count, 10, true);
635 for(int i = 0; i < count; i++)
636 {
637 DataElement *e = new DataElement(hResult, i);
638 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
639 {
640 bulkSendList.add(e);
641 }
642 else
643 {
644 MutexLock(s_serverSyncStatusLock);
645 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
646 if (status != NULL)
647 {
648 if (e->sendToServer(true))
649 {
650 status->queueSize--;
651 deleteList.add(e);
652 }
653 else
654 {
655 delete e;
656 }
657 }
658 else
659 {
660 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
661 deleteList.add(e); // record should be deleted
662 }
663 MutexUnlock(s_serverSyncStatusLock);
664 }
665 }
666
667 if (bulkSendList.size() > 0)
668 {
669 DebugPrintf(INVALID_INDEX, 6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
670
671 NXCPMessage msg;
672 msg.setCode(CMD_DCI_DATA);
673 msg.setId(session->generateRequestId());
674 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
675 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
676
677 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
678 for(int i = 0; i < bulkSendList.size(); i++)
679 {
680 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
681 fieldId += 10;
682 }
683
684 NXCPMessage *response = session->doRequestEx(&msg, g_dcReconciliationTimeout);
685 if (response != NULL)
686 {
687 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
688 if (rcc == ERR_SUCCESS)
689 {
690 MutexLock(s_serverSyncStatusLock);
691 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
692
693 // Check status for each data element
694 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
695 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
696 response->getFieldAsBinary(VID_STATUS, status, MAX_BULK_DATA_BLOCK_SIZE);
697 bulkSendList.setOwner(false);
698 for(int i = 0; i < bulkSendList.size(); i++)
699 {
700 DataElement *e = bulkSendList.get(i);
701 if (status[i] != BULK_DATA_REC_RETRY)
702 {
703 deleteList.add(e);
704 serverSyncStatus->queueSize--;
705 }
706 else
707 {
708 delete e;
709 }
710 }
711
712 MutexUnlock(s_serverSyncStatusLock);
713 }
714 else
715 {
716 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
717 }
718 delete response;
719 }
720 else
721 {
722 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: timeout on bulk send"));
723 }
724 }
725
726 if (deleteList.size() > 0)
727 {
728 DBBegin(hdb);
729 for(int i = 0; i < deleteList.size(); i++)
730 {
731 DataElement *e = deleteList.get(i);
732 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
733 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
734 DBQuery(hdb, query);
735 }
736 DBCommit(hdb);
737 nxlog_debug(4, _T("ReconciliationThread: %d records sent"), deleteList.size());
738 vacuumNeeded = true;
739 }
740 }
741 DBFreeResult(hResult);
742
743 session->decRefCount();
744 sleepTime = (count > 0) ? 50 : 30000;
745 }
746
747 nxlog_debug(1, _T("Data reconciliation thread stopped"));
748 return THREAD_OK;
749 }
750
751 /**
752 * Data sender queue
753 */
754 static Queue s_dataSenderQueue;
755
756 /**
757 * Data sender
758 */
759 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
760 {
761 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
762 while(true)
763 {
764 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
765 if (e == INVALID_POINTER_VALUE)
766 break;
767
768 MutexLock(s_serverSyncStatusLock);
769 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
770 if (status == NULL)
771 {
772 status = new ServerSyncStatus();
773 s_serverSyncStatus.set(e->getServerId(), status);
774 }
775
776 if (status->queueSize == 0)
777 {
778 if (!e->sendToServer(false))
779 {
780 status->queueSize++;
781 s_databaseWriterQueue.put(e);
782 e = NULL;
783 }
784 }
785 else
786 {
787 status->queueSize++;
788 s_databaseWriterQueue.put(e);
789 e = NULL;
790 }
791 MutexUnlock(s_serverSyncStatusLock);
792
793 delete e;
794 }
795 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
796 return THREAD_OK;
797 }
798
799 /**
800 * Pseudo-session for cached data collection
801 */
802 class VirtualSession : public AbstractCommSession
803 {
804 private:
805 UINT64 m_serverId;
806
807 public:
808 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
809
810 virtual bool isMasterServer() { return false; }
811 virtual bool isControlServer() { return false; }
812 virtual bool canAcceptData() { return true; }
813 virtual bool canAcceptTraps() { return true; }
814 virtual bool canAcceptFileUpdates() { return false; }
815 virtual UINT64 getServerId() { return m_serverId; };
816 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
817
818 virtual bool isIPv6Aware() { return true; }
819
820 virtual void sendMessage(NXCPMessage *pMsg) { }
821 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
822 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
823 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
824 virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) { return NULL; }
825 virtual UINT32 generateRequestId() { return 0; }
826 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
827 };
828
829 /**
830 * Collect data from agent
831 */
832 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
833 {
834 VirtualSession session(dci->getServerId());
835
836 DataElement *e = NULL;
837 UINT32 status;
838 if (dci->getType() == DCO_TYPE_ITEM)
839 {
840 TCHAR value[MAX_RESULT_LENGTH];
841 status = GetParameterValue(INVALID_INDEX, dci->getName(), value, &session);
842 e = new DataElement(dci, (status == ERR_SUCCESS) ? value : _T(""), status);
843 }
844 else if (dci->getType() == DCO_TYPE_LIST)
845 {
846 StringList *value = new StringList();
847 status = GetListValue(INVALID_INDEX, dci->getName(), value, &session);
848 e = new DataElement(dci, value, status);
849 }
850 else if (dci->getType() == DCO_TYPE_TABLE)
851 {
852 Table *value = new Table();
853 status = GetTableValue(INVALID_INDEX, dci->getName(), value, &session);
854 e = new DataElement(dci, value, status);
855 }
856
857 return e;
858 }
859
860 /**
861 * Collect data from SNMP
862 */
863 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
864 {
865 DataElement *e = NULL;
866 if (dci->getType() == DCO_TYPE_ITEM)
867 {
868 DebugPrintf(INVALID_INDEX, 8, _T("Read SNMP parameter %s"), dci->getName());
869
870 TCHAR value[MAX_RESULT_LENGTH];
871 UINT32 status = GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType());
872 e = new DataElement(dci, status == ERR_SUCCESS ? value : _T(""), status);
873 }
874 return e;
875 }
876
877 /**
878 * Local data collection callback
879 */
880 static void LocalDataCollectionCallback(void *arg)
881 {
882 DataCollectionItem *dci = (DataCollectionItem *)arg;
883
884 DataElement *e = CollectDataFromAgent(dci);
885 if (e != NULL)
886 {
887 s_dataSenderQueue.put(e);
888 }
889 else
890 {
891 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
892 }
893
894 dci->setLastPollTime(time(NULL));
895 dci->finishDataCollection();
896 }
897
898 /**
899 * SNMP data collection callback
900 */
901 static void SnmpDataCollectionCallback(void *arg)
902 {
903 DataCollectionItem *dci = (DataCollectionItem *)arg;
904
905 DataElement *e = CollectDataFromSNMP(dci);
906 if (e != NULL)
907 {
908 s_dataSenderQueue.put(e);
909 }
910 else
911 {
912 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
913 }
914
915 dci->setLastPollTime(time(NULL));
916 dci->finishDataCollection();
917 }
918
919 /**
920 * Data collectors thread pool
921 */
922 static ThreadPool *s_dataCollectorPool = NULL;
923
924 /**
925 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
926 */
927 static UINT32 DataCollectionSchedulerRun()
928 {
929 UINT32 sleepTime = 60;
930
931 MutexLock(s_itemLock);
932 for(int i = 0; i < s_items.size(); i++)
933 {
934 DataCollectionItem *dci = s_items.get(i);
935 time_t now = time(NULL);
936 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
937 if (timeToPoll == 0)
938 {
939 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
940
941 if (dci->getOrigin() == DS_NATIVE_AGENT)
942 {
943 dci->startDataCollection();
944 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
945 }
946 else if (dci->getOrigin() == DS_SNMP_AGENT)
947 {
948 dci->startDataCollection();
949 TCHAR key[64];
950 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
951 }
952 else
953 {
954 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
955 }
956
957 timeToPoll = dci->getPollingInterval();
958 }
959
960 if (sleepTime > timeToPoll)
961 sleepTime = timeToPoll;
962 }
963 MutexUnlock(s_itemLock);
964 return sleepTime;
965 }
966
967 /**
968 * Data collection scheduler thread
969 */
970 static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
971 {
972 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread started"));
973 s_dataCollectorPool = ThreadPoolCreate(1, 64, _T("DATACOLL"));
974
975 UINT32 sleepTime = DataCollectionSchedulerRun();
976 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
977 {
978 sleepTime = DataCollectionSchedulerRun();
979 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
980 }
981
982 ThreadPoolDestroy(s_dataCollectorPool);
983 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread stopped"));
984 return THREAD_OK;
985 }
986
987 /**
988 * Configure data collection
989 */
990 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
991 {
992 if (!s_dataCollectorStarted)
993 {
994 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
995 return;
996 }
997
998 DB_HANDLE hdb = GetLocalDatabaseHandle();
999
1000 int count = msg->getFieldAsInt32(VID_NUM_NODES);
1001 if (count > 0)
1002 {
1003 DBBegin(hdb);
1004 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
1005 for(int i = 0; i < count; i++)
1006 {
1007 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
1008 UpdateSnmpTarget(target);
1009 fieldId += 50;
1010 }
1011 DBCommit(hdb);
1012 }
1013 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
1014
1015 ObjectArray<DataCollectionItem> config(32, 32, true);
1016
1017 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
1018 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
1019 for(int i = 0; i < count; i++)
1020 {
1021 config.add(new DataCollectionItem(serverId, msg, fieldId));
1022 fieldId += 10;
1023 }
1024 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
1025
1026 bool txnOpen = false;
1027
1028 MutexLock(s_itemLock);
1029
1030 // Update and add new
1031 for(int j = 0; j < config.size(); j++)
1032 {
1033 DataCollectionItem *item = config.get(j);
1034 bool exist = false;
1035 for(int i = 0; i < s_items.size(); i++)
1036 {
1037 if (item->equals(s_items.get(i)))
1038 {
1039 s_items.get(i)->updateAndSave(item);
1040 exist = true;
1041 }
1042 }
1043 if (!exist)
1044 {
1045 DataCollectionItem *newItem = new DataCollectionItem(item);
1046 s_items.add(newItem);
1047 if (!txnOpen)
1048 {
1049 DBBegin(hdb);
1050 txnOpen = true;
1051 }
1052 newItem->saveToDatabase(true);
1053 }
1054 }
1055
1056 // Remove not existing configuration and data for it
1057 for(int i = 0; i < s_items.size(); i++)
1058 {
1059 DataCollectionItem *item = s_items.get(i);
1060 //If item is from other server, then, do not search it in list of this server
1061 if(item->getServerId() != serverId)
1062 continue;
1063 bool exist = false;
1064 for(int j = 0; j < config.size(); j++)
1065 {
1066 if (item->equals(config.get(j)))
1067 {
1068 exist = true;
1069 }
1070 }
1071 if (!exist)
1072 {
1073 if (!txnOpen)
1074 {
1075 DBBegin(hdb);
1076 txnOpen = true;
1077 }
1078 item->deleteFromDatabase();
1079 s_items.unlink(i);
1080 item->decRefCount();
1081 i--;
1082 }
1083 }
1084
1085 if (txnOpen)
1086 DBCommit(hdb);
1087
1088 MutexUnlock(s_itemLock);
1089
1090 DebugPrintf(INVALID_INDEX, 4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
1091 }
1092
1093 /**
1094 * Load saved state of local data collection
1095 */
1096 static void LoadState()
1097 {
1098 DB_HANDLE hdb = GetLocalDatabaseHandle();
1099 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
1100 if (hResult != NULL)
1101 {
1102 int count = DBGetNumRows(hResult);
1103 for(int i = 0; i < count; i++)
1104 {
1105 s_items.add(new DataCollectionItem(hResult, i));
1106 }
1107 DBFreeResult(hResult);
1108 }
1109
1110 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1111 if (hResult != NULL)
1112 {
1113 int count = DBGetNumRows(hResult);
1114 for(int i = 0; i < count; i++)
1115 {
1116 SNMPTarget *t = new SNMPTarget(hResult, i);
1117 UpdateSnmpTarget(t);
1118 }
1119 DBFreeResult(hResult);
1120 }
1121
1122 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1123 if (hResult != NULL)
1124 {
1125 int count = DBGetNumRows(hResult);
1126 for(int i = 0; i < count; i++)
1127 {
1128 ServerSyncStatus *s = new ServerSyncStatus;
1129 s->queueSize = DBGetFieldLong(hResult, i, 1);
1130 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1131 s_serverSyncStatus.set(serverId, s);
1132 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
1133 }
1134 DBFreeResult(hResult);
1135 }
1136 }
1137
1138 /**
1139 * Data collector and sender thread handles
1140 */
1141 static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;
1142 static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
1143 static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;
1144 static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;
1145
1146 /**
1147 * Initialize and start local data collector
1148 */
1149 void StartLocalDataCollector()
1150 {
1151 DB_HANDLE db = GetLocalDatabaseHandle();
1152 if (db == NULL)
1153 {
1154 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
1155 return;
1156 }
1157
1158 if (g_dcReconciliationBlockSize < 16)
1159 {
1160 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to 16"), g_dcReconciliationBlockSize);
1161 g_dcReconciliationBlockSize = 16;
1162 }
1163 else if (g_dcReconciliationBlockSize > MAX_BULK_DATA_BLOCK_SIZE)
1164 {
1165 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to %d"), g_dcReconciliationBlockSize, MAX_BULK_DATA_BLOCK_SIZE);
1166 g_dcReconciliationBlockSize = MAX_BULK_DATA_BLOCK_SIZE;
1167 }
1168
1169 if (g_dcReconciliationTimeout < 1000)
1170 {
1171 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 1000"), g_dcReconciliationTimeout);
1172 g_dcReconciliationTimeout = 1000;
1173 }
1174 else if (g_dcReconciliationTimeout > 600000)
1175 {
1176 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 600000"), g_dcReconciliationTimeout);
1177 g_dcReconciliationTimeout = 600000;
1178 }
1179
1180 s_itemLock = MutexCreate();
1181 s_serverSyncStatusLock = MutexCreate();
1182
1183 LoadState();
1184
1185 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
1186 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
1187 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
1188 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
1189
1190 s_dataCollectorStarted = true;
1191 }
1192
1193 /**
1194 * Shutdown local data collector
1195 */
1196 void ShutdownLocalDataCollector()
1197 {
1198 if (!s_dataCollectorStarted)
1199 {
1200 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
1201 return;
1202 }
1203
1204 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
1205 ThreadJoin(s_dataCollectionSchedulerThread);
1206
1207 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
1208 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
1209 ThreadJoin(s_dataSenderThread);
1210
1211 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for database writer thread termination"));
1212 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1213 ThreadJoin(s_databaseWriterThread);
1214
1215 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconciliation thread termination"));
1216 ThreadJoin(s_reconciliationThread);
1217
1218 MutexDestroy(s_itemLock);
1219 MutexDestroy(s_serverSyncStatusLock);
1220 }
1221
1222 /**
1223 * Clear data collection configuration
1224 */
1225 void ClearDataCollectionConfiguration()
1226 {
1227 MutexLock(s_itemLock);
1228 DB_HANDLE db = GetLocalDatabaseHandle();
1229 DBQuery(db, _T("DELETE FROM dc_queue"));
1230 DBQuery(db, _T("DELETE FROM dc_config"));
1231 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1232 s_items.clear();
1233 MutexUnlock(s_itemLock);
1234
1235 MutexLock(s_serverSyncStatusLock);
1236 s_serverSyncStatus.clear();
1237 MutexUnlock(s_serverSyncStatusLock);
1238 }
1239
1240 /**
1241 * Handler for data collector queue size
1242 */
1243 LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1244 {
1245 if (!s_dataCollectorStarted)
1246 return SYSINFO_RC_UNSUPPORTED;
1247
1248 UINT32 count = 0;
1249 MutexLock(s_serverSyncStatusLock);
1250 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1251 while(it->hasNext())
1252 count += (UINT32)it->next()->queueSize;
1253 delete it;
1254 MutexUnlock(s_serverSyncStatusLock);
1255
1256 ret_uint(value, count);
1257 return SYSINFO_RC_SUCCESS;
1258 }