agent data reconciliation block size and timeout can be configured
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
21ec6e4f 3** Copyright (C) 2003-2016 Victor Kirhenshtein
87fff547
VK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
296ae03d
VK
25/**
26 * Externals
27 */
28void UpdateSnmpTarget(SNMPTarget *target);
df94243f 29UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
296ae03d 30
9cf1b53c
VK
31extern UINT32 g_dcReconciliationBlockSize;
32extern UINT32 g_dcReconciliationTimeout;
33
dec46d8a
VK
34/**
35 * Data collector start indicator
36 */
37static bool s_dataCollectorStarted = false;
38
13783551
VK
39/**
40 * Unsaved poll time indicator
41 */
42static bool s_pollTimeChanged = false;
43
87fff547
VK
44/**
45 * Data collection item
46 */
47class DataCollectionItem : public RefCountObject
48{
49private:
50 UINT64 m_serverId;
51 UINT32 m_id;
52 INT32 m_pollingInterval;
53 TCHAR *m_name;
54 BYTE m_type;
55 BYTE m_origin;
56 UINT16 m_snmpPort;
296ae03d 57 BYTE m_snmpRawValueType;
374afd7b 58 BYTE m_busy;
de4af576 59 uuid m_snmpTargetGuid;
87fff547
VK
60 time_t m_lastPollTime;
61
62public:
63 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
d9c628e3 64 DataCollectionItem(DB_RESULT hResult, int row);
61c0e619 65 DataCollectionItem(const DataCollectionItem *item);
87fff547
VK
66 virtual ~DataCollectionItem();
67
296ae03d
VK
68 UINT32 getId() const { return m_id; }
69 UINT64 getServerId() const { return m_serverId; }
70 const TCHAR *getName() const { return m_name; }
71 int getType() const { return (int)m_type; }
72 int getOrigin() const { return (int)m_origin; }
de4af576 73 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
296ae03d
VK
74 UINT16 getSnmpPort() const { return m_snmpPort; }
75 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
86ef6701 76 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
13783551 77 time_t getLastPollTime() { return m_lastPollTime; }
296ae03d
VK
78
79 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
80
61c0e619 81 void updateAndSave(const DataCollectionItem *item);
d9c628e3 82 void saveToDatabase(bool newObject);
83 void deleteFromDatabase();
93c62d54 84 void setLastPollTime(time_t time);
87fff547 85
374afd7b
VK
86 void startDataCollection() { m_busy = 1; incRefCount(); }
87 void finishDataCollection() { m_busy = 0; decRefCount(); }
88
296ae03d 89 UINT32 getTimeToNextPoll(time_t now) const
87fff547 90 {
374afd7b
VK
91 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
92 return m_pollingInterval;
87fff547
VK
93 time_t diff = now - m_lastPollTime;
94 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
95 }
96};
97
98/**
99 * Create data collection item from NXCP mesage
100 */
101DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
102{
103 m_serverId = serverId;
104 m_id = msg->getFieldAsInt32(baseId);
105 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
106 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
107 m_name = msg->getFieldAsString(baseId + 3);
108 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
109 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
de4af576 110 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
87fff547 111 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
296ae03d 112 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
374afd7b 113 m_busy = 0;
87fff547
VK
114}
115
d9c628e3 116/**
296ae03d 117 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
d9c628e3 118 */
119DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
120{
121 m_serverId = DBGetFieldInt64(hResult, row, 0);
122 m_id = DBGetFieldULong(hResult, row, 1);
123 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
124 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
d610afbd 125 m_name = DBGetField(hResult, row, 4, NULL, 0);
d9c628e3 126 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
127 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
f470ae5d 128 m_snmpPort = DBGetFieldULong(hResult, row, 7);
de4af576 129 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
296ae03d 130 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
374afd7b 131 m_busy = 0;
d9c628e3 132}
133
134/**
135 * Copy constructor
136 */
61c0e619 137 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
d9c628e3 138 {
139 m_serverId = item->m_serverId;
140 m_id = item->m_id;
141 m_type = item->m_type;
142 m_origin = item->m_origin;
143 m_name = _tcsdup(item->m_name);
144 m_pollingInterval = item->m_pollingInterval;
145 m_lastPollTime = item->m_lastPollTime;
de4af576 146 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 147 m_snmpPort = item->m_snmpPort;
296ae03d 148 m_snmpRawValueType = item->m_snmpRawValueType;
374afd7b 149 m_busy = 0;
d9c628e3 150 }
151
87fff547
VK
152/**
153 * Data collection item destructor
154 */
155DataCollectionItem::~DataCollectionItem()
156{
157 safe_free(m_name);
158}
159
d9c628e3 160/**
161 * Will check if object has changed. If at least one field is changed - all data will be updated and
162 * saved to database.
163 */
61c0e619 164void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
d9c628e3 165{
13783551 166 // if at least one of fields changed - set all fields and save to DB
296ae03d 167 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
de4af576 168 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
d610afbd 169 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
d9c628e3 170 {
171 m_type = item->m_type;
172 m_origin = item->m_origin;
173 m_name = _tcsdup(item->m_name);
174 m_pollingInterval = item->m_pollingInterval;
d610afbd
VK
175 if (m_lastPollTime < item->m_lastPollTime)
176 m_lastPollTime = item->m_lastPollTime;
de4af576 177 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 178 m_snmpPort = item->m_snmpPort;
296ae03d 179 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 180 saveToDatabase(false);
181 }
d9c628e3 182}
183
296ae03d
VK
184/**
185 * Save configuration object to database
186 */
d9c628e3 187void DataCollectionItem::saveToDatabase(bool newObject)
188{
e1c30b6f 189 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
d9c628e3 190 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
191 DB_HANDLE db = GetLocalDatabaseHandle();
192 DB_STATEMENT hStmt;
193
296ae03d 194 if (newObject)
d9c628e3 195 {
196 hStmt = DBPrepare(db,
197 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
296ae03d
VK
198 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
199 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
d9c628e3 200 }
201 else
202 {
203 hStmt = DBPrepare(db,
61c0e619 204 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
205 _T("polling_interval=?,last_poll=?,snmp_port=?,")
296ae03d 206 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
d9c628e3 207 }
208
209 if (hStmt == NULL)
d9c628e3 210 return;
d9c628e3 211
d9c628e3 212 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
213 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
214 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
215 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
216 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
d5034b1a 217 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
de4af576 218 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
296ae03d
VK
219 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
220 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
221 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
d9c628e3 222
296ae03d
VK
223 DBExecute(hStmt);
224 DBFreeStatement(hStmt);
d9c628e3 225}
226
227/**
228 * Remove item form database and delete not synced data if exist
229 */
230void DataCollectionItem::deleteFromDatabase()
231{
d9c628e3 232 DB_HANDLE db = GetLocalDatabaseHandle();
233 TCHAR query[256];
366440e8
VK
234 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
235 if (DBQuery(db, query))
d9c628e3 236 {
366440e8
VK
237 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
238 if (DBQuery(db, query))
239 {
240 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
241 }
d9c628e3 242 }
243}
244
a97f02e1
VK
245/**
246 * Set last poll time for item
247 */
93c62d54 248void DataCollectionItem::setLastPollTime(time_t time)
249{
a97f02e1 250 m_lastPollTime = time;
13783551 251 s_pollTimeChanged = true;
93c62d54 252}
253
87fff547
VK
254/**
255 * Collected data
256 */
257class DataElement
258{
259private:
260 UINT64 m_serverId;
261 UINT32 m_dciId;
262 time_t m_timestamp;
02d936bd 263 int m_origin;
87fff547 264 int m_type;
df94243f 265 UINT32 m_statusCode;
de4af576 266 uuid m_snmpNode;
87fff547
VK
267 union
268 {
269 TCHAR *item;
270 StringList *list;
271 Table *table;
272 } m_value;
273
274public:
df94243f 275 DataElement(DataCollectionItem *dci, const TCHAR *value, UINT32 status)
87fff547
VK
276 {
277 m_serverId = dci->getServerId();
278 m_dciId = dci->getId();
279 m_timestamp = time(NULL);
02d936bd 280 m_origin = dci->getOrigin();
87fff547 281 m_type = DCO_TYPE_ITEM;
df94243f 282 m_statusCode = status;
de4af576 283 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
284 m_value.item = _tcsdup(value);
285 }
286
df94243f 287 DataElement(DataCollectionItem *dci, StringList *value, UINT32 status)
87fff547
VK
288 {
289 m_serverId = dci->getServerId();
290 m_dciId = dci->getId();
291 m_timestamp = time(NULL);
02d936bd 292 m_origin = dci->getOrigin();
87fff547 293 m_type = DCO_TYPE_LIST;
df94243f 294 m_statusCode = status;
de4af576 295 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
296 m_value.list = value;
297 }
298
df94243f 299 DataElement(DataCollectionItem *dci, Table *value, UINT32 status)
87fff547
VK
300 {
301 m_serverId = dci->getServerId();
302 m_dciId = dci->getId();
303 m_timestamp = time(NULL);
02d936bd 304 m_origin = dci->getOrigin();
87fff547 305 m_type = DCO_TYPE_TABLE;
df94243f 306 m_statusCode = status;
de4af576 307 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
308 m_value.table = value;
309 }
310
63ff3c9d
VK
311 /**
312 * Create data element from database record
df94243f 313 * Expected field order: server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value
63ff3c9d
VK
314 */
315 DataElement(DB_RESULT hResult, int row)
316 {
317 m_serverId = DBGetFieldUInt64(hResult, row, 0);
318 m_dciId = DBGetFieldULong(hResult, row, 1);
df94243f 319 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 6);
63ff3c9d
VK
320 m_origin = DBGetFieldLong(hResult, row, 3);
321 m_type = DBGetFieldLong(hResult, row, 2);
df94243f 322 m_statusCode = DBGetFieldLong(hResult, row, 4);
323 m_snmpNode = DBGetFieldGUID(hResult, row, 5);
63ff3c9d
VK
324 switch(m_type)
325 {
326 case DCO_TYPE_ITEM:
df94243f 327 m_value.item = DBGetField(hResult, row, 7, NULL, 0);
63ff3c9d
VK
328 break;
329 case DCO_TYPE_LIST:
330 {
331 m_value.list = new StringList();
df94243f 332 TCHAR *text = DBGetField(hResult, row, 7, NULL, 0);
63ff3c9d
VK
333 if (text != NULL)
334 {
335 m_value.list->splitAndAdd(text, _T("\n"));
336 free(text);
337 }
338 }
339 break;
340 case DCO_TYPE_TABLE:
341 {
df94243f 342 char *xml = DBGetFieldUTF8(hResult, row, 7, NULL, 0);
63ff3c9d
VK
343 if (xml != NULL)
344 {
345 m_value.table = Table::createFromXML(xml);
346 free(xml);
347 }
b7b1c821
VK
348 else
349 {
350 m_value.table = NULL;
351 }
63ff3c9d
VK
352 }
353 break;
354 default:
355 m_type = DCO_TYPE_ITEM;
356 m_value.item = _tcsdup(_T(""));
357 break;
358 }
359 }
360
87fff547
VK
361 ~DataElement()
362 {
363 switch(m_type)
364 {
365 case DCO_TYPE_ITEM:
366 free(m_value.item);
367 break;
368 case DCO_TYPE_LIST:
369 delete m_value.list;
370 break;
371 case DCO_TYPE_TABLE:
372 delete m_value.table;
373 break;
374 }
375 }
93c62d54 376
377 time_t getTimestamp() { return m_timestamp; }
61c0e619 378 UINT64 getServerId() { return m_serverId; }
63ff3c9d 379 UINT32 getDciId() { return m_dciId; }
a1273b42 380 int getType() { return m_type; }
df94243f 381 UINT32 getStatusCode() { return m_statusCode; }
a3d3f9d5 382
3ec58bb8 383 void saveToDatabase(DB_STATEMENT hStmt);
63ff3c9d 384 bool sendToServer(bool reconcillation);
a1273b42 385 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
87fff547
VK
386};
387
61c0e619 388/**
296ae03d 389 * Save data element to database
61c0e619 390 */
3ec58bb8 391void DataElement::saveToDatabase(DB_STATEMENT hStmt)
61c0e619 392{
8199376b
VK
393 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
394 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
395 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
396 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
df94243f 397 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_statusCode);
398 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, m_snmpNode);
399 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
8199376b 400 switch(m_type)
a3d3f9d5 401 {
8199376b 402 case DCO_TYPE_ITEM:
df94243f 403 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
8199376b
VK
404 break;
405 case DCO_TYPE_LIST:
df94243f 406 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
8199376b
VK
407 break;
408 case DCO_TYPE_TABLE:
df94243f 409 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
8199376b 410 break;
a3d3f9d5 411 }
8199376b 412 DBExecute(hStmt);
61c0e619 413}
414
d1dfba1a
VK
415/**
416 * Session comparator
417 */
418static bool SessionComparator_Sender(AbstractCommSession *session, void *data)
419{
420 return (session->getServerId() == *((INT64 *)data)) && session->canAcceptData();
421}
422
296ae03d
VK
423/**
424 * Send collected data to server
425 */
883c14ae 426bool DataElement::sendToServer(bool reconciliation)
61c0e619 427{
b7b1c821
VK
428 // If data in database was invalid table may not be parsed correctly
429 // Consider sending a success in that case so data element can be dropped
430 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
431 return true;
432
d1dfba1a 433 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Sender, &m_serverId);
6fbaa926
VK
434 if (session == NULL)
435 return false;
436
437 NXCPMessage msg;
438 msg.setCode(CMD_DCI_DATA);
439 msg.setId(session->generateRequestId());
440 msg.setField(VID_DCI_ID, m_dciId);
02d936bd
VK
441 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
442 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
df94243f 443 msg.setField(VID_STATUS, m_statusCode);
de4af576 444 msg.setField(VID_NODE_ID, m_snmpNode);
6fbaa926 445 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
883c14ae 446 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
6fbaa926
VK
447 switch(m_type)
448 {
449 case DCO_TYPE_ITEM:
450 msg.setField(VID_VALUE, m_value.item);
451 break;
452 case DCO_TYPE_LIST:
6fbaa926
VK
453 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
454 break;
455 case DCO_TYPE_TABLE:
66816abf 456 m_value.table->setSource(m_origin);
6fbaa926
VK
457 m_value.table->fillMessage(msg, 0, -1);
458 break;
459 }
63ff3c9d 460 UINT32 rcc = session->doRequest(&msg, 2000);
6fbaa926 461 session->decRefCount();
63ff3c9d
VK
462
463 // consider internal error as success because it means that server
464 // cannot accept data for some reason and retry is not feasible
465 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
61c0e619 466}
467
a1273b42
VK
468/**
469 * Fill bulk reconciliation message with DCI data
470 */
471void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
472{
473 msg->setField(baseId, m_dciId);
474 msg->setField(baseId + 1, (INT16)m_origin);
475 msg->setField(baseId + 2, (INT16)m_type);
476 msg->setField(baseId + 3, m_snmpNode);
477 msg->setFieldFromTime(baseId + 4, m_timestamp);
478 msg->setField(baseId + 5, m_value.item);
df94243f 479 msg->setField(baseId + 6, m_statusCode);
a1273b42
VK
480}
481
61c0e619 482/**
296ae03d 483 * Server data sync status object
61c0e619 484 */
296ae03d 485struct ServerSyncStatus
61c0e619 486{
296ae03d 487 INT32 queueSize;
61c0e619 488
296ae03d
VK
489 ServerSyncStatus()
490 {
491 queueSize = 0;
492 }
61c0e619 493};
494
495/**
296ae03d 496 * Server sync status information
61c0e619 497 */
296ae03d
VK
498static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
499static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
87fff547 500
3ec58bb8
VK
501/**
502 * Database writer queue
503 */
504static Queue s_databaseWriterQueue;
505
506/**
507 * Database writer
508 */
509static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
510{
511 DB_HANDLE hdb = GetLocalDatabaseHandle();
512 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread started"));
513
514 while(true)
515 {
516 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
517 if (e == INVALID_POINTER_VALUE)
518 break;
519
df94243f 520 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?,?)"));
3ec58bb8
VK
521 if (hStmt == NULL)
522 {
523 delete e;
524 continue;
525 }
526
527 int count = 0;
528
529 DBBegin(hdb);
530 while((e != NULL) && (e != INVALID_POINTER_VALUE))
531 {
532 e->saveToDatabase(hStmt);
533 delete e;
534
535 count++;
536 if (count > 200)
537 break;
538
539 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
540 }
541 DBCommit(hdb);
542 DBFreeStatement(hStmt);
543 DebugPrintf(INVALID_INDEX, 7, _T("Database writer: %d records inserted"), count);
544 }
545
546 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread stopped"));
547 return THREAD_OK;
548}
549
13783551
VK
550/**
551 * List of all data collection items
552 */
553static ObjectArray<DataCollectionItem> s_items(64, 64, true);
554static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
555
53c3d1e7 556/**
a1273b42 557 * Session comparator
53c3d1e7 558 */
d1dfba1a 559static bool SessionComparator_Reconciliation(AbstractCommSession *session, void *data)
53c3d1e7 560{
d1dfba1a 561 if ((session->getServerId() == 0) || !session->canAcceptData())
a1273b42 562 return false;
53c3d1e7 563 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
9e698daa
VK
564 if ((s != NULL) && (s->queueSize > 0))
565 {
a1273b42 566 return true;
9e698daa 567 }
a1273b42 568 return false;
53c3d1e7
VK
569}
570
61c0e619 571/**
883c14ae 572 * Data reconciliation thread
61c0e619 573 */
883c14ae 574static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
61c0e619 575{
296ae03d 576 DB_HANDLE hdb = GetLocalDatabaseHandle();
b7b1c821 577 UINT32 sleepTime = 30000;
9cf1b53c 578 nxlog_debug(1, _T("Data reconciliation thread started (block size %d, timeout %d ms)"), g_dcReconciliationBlockSize, g_dcReconciliationTimeout);
a3d3f9d5 579
5e12ef62 580 bool vacuumNeeded = false;
296ae03d 581 while(!AgentSleepAndCheckForShutdown(sleepTime))
61c0e619 582 {
53c3d1e7
VK
583 // Check if there is something to sync
584 MutexLock(s_serverSyncStatusLock);
d1dfba1a 585 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Reconciliation, NULL);
53c3d1e7 586 MutexUnlock(s_serverSyncStatusLock);
a1273b42 587 if (session == NULL)
53c3d1e7 588 {
13783551
VK
589 // Save last poll times when reconciliation thread is idle
590 MutexLock(s_itemLock);
591 if (s_pollTimeChanged)
592 {
593 DBBegin(hdb);
594 TCHAR query[256];
595 for(int i = 0; i < s_items.size(); i++)
596 {
597 DataCollectionItem *dci = s_items.get(i);
598 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
599 dci->getLastPollTime(), dci->getServerId(), dci->getId());
600 DBQuery(hdb, query);
601 }
602 DBCommit(hdb);
603 s_pollTimeChanged = false;
604 }
605 MutexUnlock(s_itemLock);
606
5e12ef62
VK
607 if (vacuumNeeded)
608 {
883c14ae 609 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: vacuum local database"));
5e12ef62
VK
610 DBQuery(hdb, _T("VACUUM"));
611 vacuumNeeded = false;
612 }
53c3d1e7
VK
613 sleepTime = 30000;
614 continue;
615 }
616
9e698daa 617 TCHAR query[1024];
9cf1b53c 618 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), g_dcReconciliationBlockSize);
9e698daa 619
13783551
VK
620 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
621 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
296ae03d 622 if (hResult == NULL)
53c3d1e7 623 {
13783551 624 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: database query failed: %s"), sqlError);
53c3d1e7 625 sleepTime = 30000;
a1273b42 626 session->decRefCount();
296ae03d 627 continue;
53c3d1e7 628 }
296ae03d
VK
629
630 int count = DBGetNumRows(hResult);
b7b1c821 631 if (count > 0)
61c0e619 632 {
a1273b42 633 ObjectArray<DataElement> bulkSendList(count, 10, true);
b7b1c821
VK
634 ObjectArray<DataElement> deleteList(count, 10, true);
635 for(int i = 0; i < count; i++)
63ff3c9d 636 {
b7b1c821 637 DataElement *e = new DataElement(hResult, i);
a1273b42 638 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
b7b1c821 639 {
a1273b42 640 bulkSendList.add(e);
b7b1c821
VK
641 }
642 else
643 {
a1273b42
VK
644 MutexLock(s_serverSyncStatusLock);
645 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
646 if (status != NULL)
647 {
21ec6e4f
VK
648 if (e->sendToServer(true))
649 {
650 status->queueSize--;
651 deleteList.add(e);
652 }
653 else
654 {
655 delete e;
656 }
a1273b42
VK
657 }
658 else
659 {
660 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
21ec6e4f 661 deleteList.add(e); // record should be deleted
a1273b42
VK
662 }
663 MutexUnlock(s_serverSyncStatusLock);
b7b1c821 664 }
a1273b42 665 }
b7b1c821 666
a1273b42
VK
667 if (bulkSendList.size() > 0)
668 {
669 DebugPrintf(INVALID_INDEX, 6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
670
671 NXCPMessage msg;
672 msg.setCode(CMD_DCI_DATA);
673 msg.setId(session->generateRequestId());
674 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
675 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
676
677 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
678 for(int i = 0; i < bulkSendList.size(); i++)
b7b1c821 679 {
a1273b42
VK
680 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
681 fieldId += 10;
682 }
683
9cf1b53c 684 NXCPMessage *response = session->doRequestEx(&msg, g_dcReconciliationTimeout);
a1273b42
VK
685 if (response != NULL)
686 {
687 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
688 if (rcc == ERR_SUCCESS)
689 {
690 MutexLock(s_serverSyncStatusLock);
691 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
692
693 // Check status for each data element
9cf1b53c
VK
694 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
695 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
696 response->getFieldAsBinary(VID_STATUS, status, MAX_BULK_DATA_BLOCK_SIZE);
a1273b42
VK
697 bulkSendList.setOwner(false);
698 for(int i = 0; i < bulkSendList.size(); i++)
699 {
700 DataElement *e = bulkSendList.get(i);
701 if (status[i] != BULK_DATA_REC_RETRY)
702 {
703 deleteList.add(e);
704 serverSyncStatus->queueSize--;
705 }
706 else
707 {
708 delete e;
709 }
710 }
711
712 MutexUnlock(s_serverSyncStatusLock);
713 }
714 else
715 {
716 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
717 }
718 delete response;
b7b1c821
VK
719 }
720 else
721 {
a1273b42 722 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: timeout on bulk send"));
b7b1c821 723 }
63ff3c9d
VK
724 }
725
b7b1c821 726 if (deleteList.size() > 0)
63ff3c9d 727 {
b7b1c821
VK
728 DBBegin(hdb);
729 for(int i = 0; i < deleteList.size(); i++)
63ff3c9d 730 {
b7b1c821
VK
731 DataElement *e = deleteList.get(i);
732 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
733 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
734 DBQuery(hdb, query);
63ff3c9d 735 }
b7b1c821 736 DBCommit(hdb);
9cf1b53c 737 nxlog_debug(4, _T("ReconciliationThread: %d records sent"), deleteList.size());
5e12ef62 738 vacuumNeeded = true;
63ff3c9d 739 }
61c0e619 740 }
296ae03d
VK
741 DBFreeResult(hResult);
742
a1273b42 743 session->decRefCount();
f7319eac 744 sleepTime = (count > 0) ? 50 : 30000;
61c0e619 745 }
a3d3f9d5 746
9cf1b53c 747 nxlog_debug(1, _T("Data reconciliation thread stopped"));
296ae03d 748 return THREAD_OK;
61c0e619 749}
750
296ae03d
VK
751/**
752 * Data sender queue
753 */
754static Queue s_dataSenderQueue;
755
87fff547
VK
756/**
757 * Data sender
758 */
759static THREAD_RESULT THREAD_CALL DataSender(void *arg)
760{
761 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
762 while(true)
763 {
19dbc8ef 764 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
87fff547
VK
765 if (e == INVALID_POINTER_VALUE)
766 break;
767
296ae03d
VK
768 MutexLock(s_serverSyncStatusLock);
769 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
770 if (status == NULL)
61c0e619 771 {
296ae03d
VK
772 status = new ServerSyncStatus();
773 s_serverSyncStatus.set(e->getServerId(), status);
774 }
775
776 if (status->queueSize == 0)
777 {
63ff3c9d 778 if (!e->sendToServer(false))
61c0e619 779 {
a3d3f9d5 780 status->queueSize++;
3ec58bb8
VK
781 s_databaseWriterQueue.put(e);
782 e = NULL;
61c0e619 783 }
784 }
785 else
786 {
6fbaa926 787 status->queueSize++;
3ec58bb8
VK
788 s_databaseWriterQueue.put(e);
789 e = NULL;
61c0e619 790 }
296ae03d 791 MutexUnlock(s_serverSyncStatusLock);
61c0e619 792
87fff547
VK
793 delete e;
794 }
795 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
796 return THREAD_OK;
797}
798
799/**
800 * Pseudo-session for cached data collection
801 */
802class VirtualSession : public AbstractCommSession
803{
804private:
805 UINT64 m_serverId;
806
807public:
808 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
809
810 virtual bool isMasterServer() { return false; }
811 virtual bool isControlServer() { return false; }
d1dfba1a 812 virtual bool canAcceptData() { return true; }
87fff547 813 virtual bool canAcceptTraps() { return true; }
e13420c1 814 virtual bool canAcceptFileUpdates() { return false; }
87fff547
VK
815 virtual UINT64 getServerId() { return m_serverId; };
816 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
817
818 virtual bool isIPv6Aware() { return true; }
819
820 virtual void sendMessage(NXCPMessage *pMsg) { }
821 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
822 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
63ff3c9d 823 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
a1273b42 824 virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) { return NULL; }
6fbaa926 825 virtual UINT32 generateRequestId() { return 0; }
87fff547
VK
826 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
827};
828
829/**
830 * Collect data from agent
831 */
296ae03d 832static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
87fff547
VK
833{
834 VirtualSession session(dci->getServerId());
835
836 DataElement *e = NULL;
df94243f 837 UINT32 status;
87fff547
VK
838 if (dci->getType() == DCO_TYPE_ITEM)
839 {
840 TCHAR value[MAX_RESULT_LENGTH];
df94243f 841 status = GetParameterValue(INVALID_INDEX, dci->getName(), value, &session);
842 e = new DataElement(dci, (status == ERR_SUCCESS) ? value : _T(""), status);
87fff547
VK
843 }
844 else if (dci->getType() == DCO_TYPE_LIST)
845 {
df94243f 846 StringList *value = new StringList();
847 status = GetListValue(INVALID_INDEX, dci->getName(), value, &session);
848 e = new DataElement(dci, value, status);
87fff547
VK
849 }
850 else if (dci->getType() == DCO_TYPE_TABLE)
851 {
df94243f 852 Table *value = new Table();
853 status = GetTableValue(INVALID_INDEX, dci->getName(), value, &session);
854 e = new DataElement(dci, value, status);
87fff547
VK
855 }
856
857 return e;
858}
859
860/**
861 * Collect data from SNMP
862 */
296ae03d 863static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
87fff547 864{
296ae03d
VK
865 DataElement *e = NULL;
866 if (dci->getType() == DCO_TYPE_ITEM)
867 {
c1bc623e
VK
868 DebugPrintf(INVALID_INDEX, 8, _T("Read SNMP parameter %s"), dci->getName());
869
296ae03d 870 TCHAR value[MAX_RESULT_LENGTH];
df94243f 871 UINT32 status = GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType());
872 e = new DataElement(dci, status == ERR_SUCCESS ? value : _T(""), status);
296ae03d
VK
873 }
874 return e;
87fff547
VK
875}
876
374afd7b 877/**
c1bc623e 878 * Local data collection callback
374afd7b 879 */
c1bc623e 880static void LocalDataCollectionCallback(void *arg)
374afd7b
VK
881{
882 DataCollectionItem *dci = (DataCollectionItem *)arg;
883
c1bc623e
VK
884 DataElement *e = CollectDataFromAgent(dci);
885 if (e != NULL)
374afd7b 886 {
c1bc623e 887 s_dataSenderQueue.put(e);
374afd7b
VK
888 }
889 else
890 {
c1bc623e 891 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
374afd7b
VK
892 }
893
c1bc623e
VK
894 dci->setLastPollTime(time(NULL));
895 dci->finishDataCollection();
896}
897
898/**
899 * SNMP data collection callback
900 */
901static void SnmpDataCollectionCallback(void *arg)
902{
903 DataCollectionItem *dci = (DataCollectionItem *)arg;
904
905 DataElement *e = CollectDataFromSNMP(dci);
374afd7b
VK
906 if (e != NULL)
907 {
908 s_dataSenderQueue.put(e);
909 }
910 else
911 {
912 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
913 }
914
915 dci->setLastPollTime(time(NULL));
916 dci->finishDataCollection();
917}
918
87fff547 919/**
374afd7b 920 * Data collectors thread pool
87fff547 921 */
374afd7b
VK
922static ThreadPool *s_dataCollectorPool = NULL;
923
924/**
925 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
926 */
927static UINT32 DataCollectionSchedulerRun()
87fff547 928{
87fff547 929 UINT32 sleepTime = 60;
d9c628e3 930
87fff547
VK
931 MutexLock(s_itemLock);
932 for(int i = 0; i < s_items.size(); i++)
933 {
934 DataCollectionItem *dci = s_items.get(i);
231b0aad 935 time_t now = time(NULL);
87fff547
VK
936 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
937 if (timeToPoll == 0)
938 {
939 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
c1bc623e
VK
940
941 if (dci->getOrigin() == DS_NATIVE_AGENT)
942 {
943 dci->startDataCollection();
944 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
945 }
946 else if (dci->getOrigin() == DS_SNMP_AGENT)
947 {
948 dci->startDataCollection();
949 TCHAR key[64];
950 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
951 }
952 else
953 {
954 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
955 }
956
63ff3c9d 957 timeToPoll = dci->getPollingInterval();
87fff547 958 }
63ff3c9d
VK
959
960 if (sleepTime > timeToPoll)
961 sleepTime = timeToPoll;
87fff547
VK
962 }
963 MutexUnlock(s_itemLock);
964 return sleepTime;
965}
966
967/**
374afd7b 968 * Data collection scheduler thread
87fff547 969 */
374afd7b 970static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
87fff547 971{
374afd7b 972 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread started"));
9e698daa 973 s_dataCollectorPool = ThreadPoolCreate(1, 64, _T("DATACOLL"));
87fff547 974
374afd7b 975 UINT32 sleepTime = DataCollectionSchedulerRun();
9aa171c1 976 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547 977 {
374afd7b 978 sleepTime = DataCollectionSchedulerRun();
87fff547
VK
979 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
980 }
981
374afd7b
VK
982 ThreadPoolDestroy(s_dataCollectorPool);
983 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread stopped"));
87fff547
VK
984 return THREAD_OK;
985}
986
987/**
988 * Configure data collection
989 */
990void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
991{
dec46d8a
VK
992 if (!s_dataCollectorStarted)
993 {
994 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
995 return;
996 }
997
4c84b8a5
VK
998 DB_HANDLE hdb = GetLocalDatabaseHandle();
999
296ae03d 1000 int count = msg->getFieldAsInt32(VID_NUM_NODES);
4c84b8a5 1001 if (count > 0)
296ae03d 1002 {
4c84b8a5
VK
1003 DBBegin(hdb);
1004 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
1005 for(int i = 0; i < count; i++)
1006 {
1007 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
1008 UpdateSnmpTarget(target);
1009 fieldId += 50;
1010 }
1011 DBCommit(hdb);
296ae03d
VK
1012 }
1013 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
1014
87fff547
VK
1015 ObjectArray<DataCollectionItem> config(32, 32, true);
1016
296ae03d 1017 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
4c84b8a5 1018 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
87fff547
VK
1019 for(int i = 0; i < count; i++)
1020 {
1021 config.add(new DataCollectionItem(serverId, msg, fieldId));
1022 fieldId += 10;
1023 }
1024 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
1025
8edd7506
VK
1026 bool txnOpen = false;
1027
d9c628e3 1028 MutexLock(s_itemLock);
a3d3f9d5 1029
296ae03d 1030 // Update and add new
d9c628e3 1031 for(int j = 0; j < config.size(); j++)
1032 {
1033 DataCollectionItem *item = config.get(j);
1034 bool exist = false;
1035 for(int i = 0; i < s_items.size(); i++)
1036 {
d610afbd 1037 if (item->equals(s_items.get(i)))
d9c628e3 1038 {
1039 s_items.get(i)->updateAndSave(item);
1040 exist = true;
1041 }
1042 }
296ae03d 1043 if (!exist)
d9c628e3 1044 {
1045 DataCollectionItem *newItem = new DataCollectionItem(item);
1046 s_items.add(newItem);
8edd7506
VK
1047 if (!txnOpen)
1048 {
1049 DBBegin(hdb);
1050 txnOpen = true;
1051 }
d9c628e3 1052 newItem->saveToDatabase(true);
1053 }
1054 }
a3d3f9d5 1055
296ae03d 1056 // Remove not existing configuration and data for it
d9c628e3 1057 for(int i = 0; i < s_items.size(); i++)
1058 {
1059 DataCollectionItem *item = s_items.get(i);
61c0e619 1060 //If item is from other server, then, do not search it in list of this server
1061 if(item->getServerId() != serverId)
1062 continue;
d9c628e3 1063 bool exist = false;
1064 for(int j = 0; j < config.size(); j++)
1065 {
374afd7b 1066 if (item->equals(config.get(j)))
d9c628e3 1067 {
1068 exist = true;
1069 }
1070 }
6fbaa926 1071 if (!exist)
d9c628e3 1072 {
8edd7506
VK
1073 if (!txnOpen)
1074 {
1075 DBBegin(hdb);
1076 txnOpen = true;
1077 }
d9c628e3 1078 item->deleteFromDatabase();
374afd7b
VK
1079 s_items.unlink(i);
1080 item->decRefCount();
d9c628e3 1081 i--;
d9c628e3 1082 }
1083 }
8edd7506
VK
1084
1085 if (txnOpen)
1086 DBCommit(hdb);
1087
d9c628e3 1088 MutexUnlock(s_itemLock);
8edd7506
VK
1089
1090 DebugPrintf(INVALID_INDEX, 4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
87fff547
VK
1091}
1092
d9c628e3 1093/**
63ff3c9d 1094 * Load saved state of local data collection
d9c628e3 1095 */
63ff3c9d 1096static void LoadState()
d9c628e3 1097{
63ff3c9d
VK
1098 DB_HANDLE hdb = GetLocalDatabaseHandle();
1099 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
296ae03d 1100 if (hResult != NULL)
d9c628e3 1101 {
63ff3c9d
VK
1102 int count = DBGetNumRows(hResult);
1103 for(int i = 0; i < count; i++)
d9c628e3 1104 {
1105 s_items.add(new DataCollectionItem(hResult, i));
1106 }
1107 DBFreeResult(hResult);
1108 }
63ff3c9d
VK
1109
1110 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1111 if (hResult != NULL)
1112 {
1113 int count = DBGetNumRows(hResult);
1114 for(int i = 0; i < count; i++)
1115 {
1116 SNMPTarget *t = new SNMPTarget(hResult, i);
1117 UpdateSnmpTarget(t);
1118 }
1119 DBFreeResult(hResult);
1120 }
1121
1122 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1123 if (hResult != NULL)
1124 {
1125 int count = DBGetNumRows(hResult);
1126 for(int i = 0; i < count; i++)
1127 {
1128 ServerSyncStatus *s = new ServerSyncStatus;
1129 s->queueSize = DBGetFieldLong(hResult, i, 1);
1130 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1131 s_serverSyncStatus.set(serverId, s);
1132 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
1133 }
1134 DBFreeResult(hResult);
1135 }
d9c628e3 1136}
1137
296ae03d
VK
1138/**
1139 * Data collector and sender thread handles
1140 */
374afd7b 1141static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;
296ae03d 1142static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
3ec58bb8 1143static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;
883c14ae 1144static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;
296ae03d 1145
87fff547
VK
1146/**
1147 * Initialize and start local data collector
1148 */
1149void StartLocalDataCollector()
1150{
d9c628e3 1151 DB_HANDLE db = GetLocalDatabaseHandle();
a97f02e1 1152 if (db == NULL)
d9c628e3 1153 {
a97f02e1 1154 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
d9c628e3 1155 return;
1156 }
296ae03d 1157
9cf1b53c
VK
1158 if (g_dcReconciliationBlockSize < 16)
1159 {
1160 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to 16"), g_dcReconciliationBlockSize);
1161 g_dcReconciliationBlockSize = 16;
1162 }
1163 else if (g_dcReconciliationBlockSize > MAX_BULK_DATA_BLOCK_SIZE)
1164 {
1165 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to %d"), g_dcReconciliationBlockSize, MAX_BULK_DATA_BLOCK_SIZE);
1166 g_dcReconciliationBlockSize = MAX_BULK_DATA_BLOCK_SIZE;
1167 }
1168
1169 if (g_dcReconciliationTimeout < 1000)
1170 {
1171 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 1000"), g_dcReconciliationTimeout);
1172 g_dcReconciliationTimeout = 1000;
1173 }
1174 else if (g_dcReconciliationTimeout > 600000)
1175 {
1176 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 600000"), g_dcReconciliationTimeout);
1177 g_dcReconciliationTimeout = 600000;
1178 }
1179
87fff547 1180 s_itemLock = MutexCreate();
296ae03d 1181 s_serverSyncStatusLock = MutexCreate();
63ff3c9d
VK
1182
1183 LoadState();
1184
374afd7b 1185 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
87fff547 1186 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
3ec58bb8 1187 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
883c14ae 1188 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
dec46d8a
VK
1189
1190 s_dataCollectorStarted = true;
87fff547
VK
1191}
1192
1193/**
1194 * Shutdown local data collector
1195 */
1196void ShutdownLocalDataCollector()
1197{
dec46d8a
VK
1198 if (!s_dataCollectorStarted)
1199 {
1200 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
1201 return;
1202 }
1203
87fff547 1204 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
374afd7b 1205 ThreadJoin(s_dataCollectionSchedulerThread);
87fff547
VK
1206
1207 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
19dbc8ef 1208 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
87fff547
VK
1209 ThreadJoin(s_dataSenderThread);
1210
3ec58bb8
VK
1211 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for database writer thread termination"));
1212 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1213 ThreadJoin(s_databaseWriterThread);
1214
883c14ae
VK
1215 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconciliation thread termination"));
1216 ThreadJoin(s_reconciliationThread);
61c0e619 1217
87fff547 1218 MutexDestroy(s_itemLock);
296ae03d 1219 MutexDestroy(s_serverSyncStatusLock);
87fff547 1220}
4b535d78 1221
81222795
VK
1222/**
1223 * Clear data collection configuration
1224 */
1225void ClearDataCollectionConfiguration()
4b535d78 1226{
1227 MutexLock(s_itemLock);
1228 DB_HANDLE db = GetLocalDatabaseHandle();
1229 DBQuery(db, _T("DELETE FROM dc_queue"));
1230 DBQuery(db, _T("DELETE FROM dc_config"));
1231 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1232 s_items.clear();
1233 MutexUnlock(s_itemLock);
1234
1235 MutexLock(s_serverSyncStatusLock);
4b535d78 1236 s_serverSyncStatus.clear();
1237 MutexUnlock(s_serverSyncStatusLock);
1238}
df26f039
VK
1239
1240/**
1241 * Handler for data collector queue size
1242 */
1243LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1244{
1245 if (!s_dataCollectorStarted)
1246 return SYSINFO_RC_UNSUPPORTED;
1247
1248 UINT32 count = 0;
1249 MutexLock(s_serverSyncStatusLock);
1250 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1251 while(it->hasNext())
1252 count += (UINT32)it->next()->queueSize;
1253 delete it;
1254 MutexUnlock(s_serverSyncStatusLock);
1255
1256 ret_uint(value, count);
1257 return SYSINFO_RC_SUCCESS;
1258}