fixed OpenBSD 4.8 compatibility issues (non-UNICODE build only)
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
51e38637 3** Copyright (C) 2003-2017 Victor Kirhenshtein
87fff547
VK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
296ae03d
VK
25/**
26 * Externals
27 */
28void UpdateSnmpTarget(SNMPTarget *target);
df94243f 29UINT32 GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
296ae03d 30
9cf1b53c
VK
31extern UINT32 g_dcReconciliationBlockSize;
32extern UINT32 g_dcReconciliationTimeout;
1304b91d 33extern UINT32 g_dcMaxCollectorPoolSize;
9cf1b53c 34
dec46d8a
VK
35/**
36 * Data collector start indicator
37 */
38static bool s_dataCollectorStarted = false;
39
13783551
VK
40/**
41 * Unsaved poll time indicator
42 */
43static bool s_pollTimeChanged = false;
44
87fff547
VK
45/**
46 * Data collection item
47 */
48class DataCollectionItem : public RefCountObject
49{
50private:
51 UINT64 m_serverId;
52 UINT32 m_id;
53 INT32 m_pollingInterval;
54 TCHAR *m_name;
55 BYTE m_type;
56 BYTE m_origin;
57 UINT16 m_snmpPort;
296ae03d 58 BYTE m_snmpRawValueType;
374afd7b 59 BYTE m_busy;
de4af576 60 uuid m_snmpTargetGuid;
87fff547
VK
61 time_t m_lastPollTime;
62
63public:
64 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
d9c628e3 65 DataCollectionItem(DB_RESULT hResult, int row);
61c0e619 66 DataCollectionItem(const DataCollectionItem *item);
87fff547
VK
67 virtual ~DataCollectionItem();
68
296ae03d
VK
69 UINT32 getId() const { return m_id; }
70 UINT64 getServerId() const { return m_serverId; }
71 const TCHAR *getName() const { return m_name; }
72 int getType() const { return (int)m_type; }
73 int getOrigin() const { return (int)m_origin; }
de4af576 74 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
296ae03d
VK
75 UINT16 getSnmpPort() const { return m_snmpPort; }
76 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
86ef6701 77 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
13783551 78 time_t getLastPollTime() { return m_lastPollTime; }
296ae03d
VK
79
80 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
81
61c0e619 82 void updateAndSave(const DataCollectionItem *item);
d9c628e3 83 void saveToDatabase(bool newObject);
84 void deleteFromDatabase();
93c62d54 85 void setLastPollTime(time_t time);
87fff547 86
374afd7b
VK
87 void startDataCollection() { m_busy = 1; incRefCount(); }
88 void finishDataCollection() { m_busy = 0; decRefCount(); }
89
296ae03d 90 UINT32 getTimeToNextPoll(time_t now) const
87fff547 91 {
374afd7b
VK
92 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
93 return m_pollingInterval;
87fff547
VK
94 time_t diff = now - m_lastPollTime;
95 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
96 }
97};
98
99/**
100 * Create data collection item from NXCP mesage
101 */
102DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
103{
104 m_serverId = serverId;
105 m_id = msg->getFieldAsInt32(baseId);
106 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
107 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
108 m_name = msg->getFieldAsString(baseId + 3);
109 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
110 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
de4af576 111 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
87fff547 112 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
296ae03d 113 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
374afd7b 114 m_busy = 0;
87fff547
VK
115}
116
d9c628e3 117/**
296ae03d 118 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
d9c628e3 119 */
120DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
121{
122 m_serverId = DBGetFieldInt64(hResult, row, 0);
123 m_id = DBGetFieldULong(hResult, row, 1);
124 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
125 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
d610afbd 126 m_name = DBGetField(hResult, row, 4, NULL, 0);
d9c628e3 127 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
128 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
f470ae5d 129 m_snmpPort = DBGetFieldULong(hResult, row, 7);
de4af576 130 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
296ae03d 131 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
374afd7b 132 m_busy = 0;
d9c628e3 133}
134
135/**
136 * Copy constructor
137 */
61c0e619 138 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
d9c628e3 139 {
140 m_serverId = item->m_serverId;
141 m_id = item->m_id;
142 m_type = item->m_type;
143 m_origin = item->m_origin;
144 m_name = _tcsdup(item->m_name);
145 m_pollingInterval = item->m_pollingInterval;
146 m_lastPollTime = item->m_lastPollTime;
de4af576 147 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 148 m_snmpPort = item->m_snmpPort;
296ae03d 149 m_snmpRawValueType = item->m_snmpRawValueType;
374afd7b 150 m_busy = 0;
d9c628e3 151 }
152
87fff547
VK
153/**
154 * Data collection item destructor
155 */
156DataCollectionItem::~DataCollectionItem()
157{
158 safe_free(m_name);
159}
160
d9c628e3 161/**
162 * Will check if object has changed. If at least one field is changed - all data will be updated and
163 * saved to database.
164 */
61c0e619 165void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
d9c628e3 166{
13783551 167 // if at least one of fields changed - set all fields and save to DB
296ae03d 168 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
de4af576 169 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
d610afbd 170 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
d9c628e3 171 {
172 m_type = item->m_type;
173 m_origin = item->m_origin;
174 m_name = _tcsdup(item->m_name);
175 m_pollingInterval = item->m_pollingInterval;
d610afbd
VK
176 if (m_lastPollTime < item->m_lastPollTime)
177 m_lastPollTime = item->m_lastPollTime;
de4af576 178 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 179 m_snmpPort = item->m_snmpPort;
296ae03d 180 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 181 saveToDatabase(false);
182 }
d9c628e3 183}
184
296ae03d
VK
185/**
186 * Save configuration object to database
187 */
d9c628e3 188void DataCollectionItem::saveToDatabase(bool newObject)
189{
da8453a5 190 DebugPrintf(6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
d9c628e3 191 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
192 DB_HANDLE db = GetLocalDatabaseHandle();
193 DB_STATEMENT hStmt;
194
296ae03d 195 if (newObject)
d9c628e3 196 {
197 hStmt = DBPrepare(db,
198 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
296ae03d
VK
199 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
200 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
d9c628e3 201 }
202 else
203 {
204 hStmt = DBPrepare(db,
61c0e619 205 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
206 _T("polling_interval=?,last_poll=?,snmp_port=?,")
296ae03d 207 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
d9c628e3 208 }
209
210 if (hStmt == NULL)
d9c628e3 211 return;
d9c628e3 212
d9c628e3 213 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
214 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
215 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
216 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
217 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
d5034b1a 218 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
de4af576 219 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
296ae03d
VK
220 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
221 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
222 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
d9c628e3 223
296ae03d
VK
224 DBExecute(hStmt);
225 DBFreeStatement(hStmt);
d9c628e3 226}
227
228/**
229 * Remove item form database and delete not synced data if exist
230 */
231void DataCollectionItem::deleteFromDatabase()
232{
d9c628e3 233 DB_HANDLE db = GetLocalDatabaseHandle();
234 TCHAR query[256];
366440e8
VK
235 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
236 if (DBQuery(db, query))
d9c628e3 237 {
366440e8
VK
238 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
239 if (DBQuery(db, query))
240 {
da8453a5 241 DebugPrintf(6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
366440e8 242 }
d9c628e3 243 }
244}
245
a97f02e1
VK
246/**
247 * Set last poll time for item
248 */
93c62d54 249void DataCollectionItem::setLastPollTime(time_t time)
250{
a97f02e1 251 m_lastPollTime = time;
13783551 252 s_pollTimeChanged = true;
93c62d54 253}
254
87fff547
VK
255/**
256 * Collected data
257 */
258class DataElement
259{
260private:
261 UINT64 m_serverId;
262 UINT32 m_dciId;
263 time_t m_timestamp;
02d936bd 264 int m_origin;
87fff547 265 int m_type;
df94243f 266 UINT32 m_statusCode;
de4af576 267 uuid m_snmpNode;
87fff547
VK
268 union
269 {
270 TCHAR *item;
271 StringList *list;
272 Table *table;
273 } m_value;
274
275public:
df94243f 276 DataElement(DataCollectionItem *dci, const TCHAR *value, UINT32 status)
87fff547
VK
277 {
278 m_serverId = dci->getServerId();
279 m_dciId = dci->getId();
280 m_timestamp = time(NULL);
02d936bd 281 m_origin = dci->getOrigin();
87fff547 282 m_type = DCO_TYPE_ITEM;
df94243f 283 m_statusCode = status;
de4af576 284 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
285 m_value.item = _tcsdup(value);
286 }
287
df94243f 288 DataElement(DataCollectionItem *dci, StringList *value, UINT32 status)
87fff547
VK
289 {
290 m_serverId = dci->getServerId();
291 m_dciId = dci->getId();
292 m_timestamp = time(NULL);
02d936bd 293 m_origin = dci->getOrigin();
87fff547 294 m_type = DCO_TYPE_LIST;
df94243f 295 m_statusCode = status;
de4af576 296 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
297 m_value.list = value;
298 }
299
df94243f 300 DataElement(DataCollectionItem *dci, Table *value, UINT32 status)
87fff547
VK
301 {
302 m_serverId = dci->getServerId();
303 m_dciId = dci->getId();
304 m_timestamp = time(NULL);
02d936bd 305 m_origin = dci->getOrigin();
87fff547 306 m_type = DCO_TYPE_TABLE;
df94243f 307 m_statusCode = status;
de4af576 308 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
309 m_value.table = value;
310 }
311
63ff3c9d
VK
312 /**
313 * Create data element from database record
df94243f 314 * Expected field order: server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value
63ff3c9d
VK
315 */
316 DataElement(DB_RESULT hResult, int row)
317 {
318 m_serverId = DBGetFieldUInt64(hResult, row, 0);
319 m_dciId = DBGetFieldULong(hResult, row, 1);
df94243f 320 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 6);
63ff3c9d
VK
321 m_origin = DBGetFieldLong(hResult, row, 3);
322 m_type = DBGetFieldLong(hResult, row, 2);
df94243f 323 m_statusCode = DBGetFieldLong(hResult, row, 4);
324 m_snmpNode = DBGetFieldGUID(hResult, row, 5);
63ff3c9d
VK
325 switch(m_type)
326 {
327 case DCO_TYPE_ITEM:
df94243f 328 m_value.item = DBGetField(hResult, row, 7, NULL, 0);
63ff3c9d
VK
329 break;
330 case DCO_TYPE_LIST:
331 {
332 m_value.list = new StringList();
df94243f 333 TCHAR *text = DBGetField(hResult, row, 7, NULL, 0);
63ff3c9d
VK
334 if (text != NULL)
335 {
336 m_value.list->splitAndAdd(text, _T("\n"));
337 free(text);
338 }
339 }
340 break;
341 case DCO_TYPE_TABLE:
342 {
df94243f 343 char *xml = DBGetFieldUTF8(hResult, row, 7, NULL, 0);
63ff3c9d
VK
344 if (xml != NULL)
345 {
346 m_value.table = Table::createFromXML(xml);
347 free(xml);
348 }
b7b1c821
VK
349 else
350 {
351 m_value.table = NULL;
352 }
63ff3c9d
VK
353 }
354 break;
355 default:
356 m_type = DCO_TYPE_ITEM;
357 m_value.item = _tcsdup(_T(""));
358 break;
359 }
360 }
361
87fff547
VK
362 ~DataElement()
363 {
364 switch(m_type)
365 {
366 case DCO_TYPE_ITEM:
367 free(m_value.item);
368 break;
369 case DCO_TYPE_LIST:
370 delete m_value.list;
371 break;
372 case DCO_TYPE_TABLE:
373 delete m_value.table;
374 break;
375 }
376 }
93c62d54 377
378 time_t getTimestamp() { return m_timestamp; }
61c0e619 379 UINT64 getServerId() { return m_serverId; }
63ff3c9d 380 UINT32 getDciId() { return m_dciId; }
a1273b42 381 int getType() { return m_type; }
df94243f 382 UINT32 getStatusCode() { return m_statusCode; }
a3d3f9d5 383
3ec58bb8 384 void saveToDatabase(DB_STATEMENT hStmt);
63ff3c9d 385 bool sendToServer(bool reconcillation);
a1273b42 386 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
87fff547
VK
387};
388
61c0e619 389/**
296ae03d 390 * Save data element to database
61c0e619 391 */
3ec58bb8 392void DataElement::saveToDatabase(DB_STATEMENT hStmt)
61c0e619 393{
8199376b
VK
394 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
395 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
396 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
397 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
df94243f 398 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_statusCode);
399 DBBind(hStmt, 6, DB_SQLTYPE_VARCHAR, m_snmpNode);
400 DBBind(hStmt, 7, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
8199376b 401 switch(m_type)
a3d3f9d5 402 {
8199376b 403 case DCO_TYPE_ITEM:
df94243f 404 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
8199376b
VK
405 break;
406 case DCO_TYPE_LIST:
df94243f 407 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
8199376b
VK
408 break;
409 case DCO_TYPE_TABLE:
df94243f 410 DBBind(hStmt, 8, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
8199376b 411 break;
a3d3f9d5 412 }
8199376b 413 DBExecute(hStmt);
61c0e619 414}
415
d1dfba1a
VK
416/**
417 * Session comparator
418 */
419static bool SessionComparator_Sender(AbstractCommSession *session, void *data)
420{
421 return (session->getServerId() == *((INT64 *)data)) && session->canAcceptData();
422}
423
296ae03d
VK
424/**
425 * Send collected data to server
426 */
883c14ae 427bool DataElement::sendToServer(bool reconciliation)
61c0e619 428{
b7b1c821
VK
429 // If data in database was invalid table may not be parsed correctly
430 // Consider sending a success in that case so data element can be dropped
431 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
432 return true;
433
d1dfba1a 434 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Sender, &m_serverId);
6fbaa926
VK
435 if (session == NULL)
436 return false;
437
438 NXCPMessage msg;
439 msg.setCode(CMD_DCI_DATA);
440 msg.setId(session->generateRequestId());
441 msg.setField(VID_DCI_ID, m_dciId);
02d936bd
VK
442 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
443 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
df94243f 444 msg.setField(VID_STATUS, m_statusCode);
de4af576 445 msg.setField(VID_NODE_ID, m_snmpNode);
6fbaa926 446 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
883c14ae 447 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
6fbaa926
VK
448 switch(m_type)
449 {
450 case DCO_TYPE_ITEM:
451 msg.setField(VID_VALUE, m_value.item);
452 break;
453 case DCO_TYPE_LIST:
6fbaa926
VK
454 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
455 break;
456 case DCO_TYPE_TABLE:
66816abf 457 m_value.table->setSource(m_origin);
6fbaa926
VK
458 m_value.table->fillMessage(msg, 0, -1);
459 break;
460 }
63ff3c9d 461 UINT32 rcc = session->doRequest(&msg, 2000);
6fbaa926 462 session->decRefCount();
63ff3c9d
VK
463
464 // consider internal error as success because it means that server
465 // cannot accept data for some reason and retry is not feasible
466 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
61c0e619 467}
468
a1273b42
VK
469/**
470 * Fill bulk reconciliation message with DCI data
471 */
472void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
473{
474 msg->setField(baseId, m_dciId);
475 msg->setField(baseId + 1, (INT16)m_origin);
476 msg->setField(baseId + 2, (INT16)m_type);
477 msg->setField(baseId + 3, m_snmpNode);
478 msg->setFieldFromTime(baseId + 4, m_timestamp);
479 msg->setField(baseId + 5, m_value.item);
df94243f 480 msg->setField(baseId + 6, m_statusCode);
a1273b42
VK
481}
482
61c0e619 483/**
296ae03d 484 * Server data sync status object
61c0e619 485 */
296ae03d 486struct ServerSyncStatus
61c0e619 487{
296ae03d 488 INT32 queueSize;
61c0e619 489
296ae03d
VK
490 ServerSyncStatus()
491 {
492 queueSize = 0;
493 }
61c0e619 494};
495
496/**
296ae03d 497 * Server sync status information
61c0e619 498 */
296ae03d
VK
499static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
500static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
87fff547 501
3ec58bb8
VK
502/**
503 * Database writer queue
504 */
505static Queue s_databaseWriterQueue;
506
507/**
508 * Database writer
509 */
510static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
511{
512 DB_HANDLE hdb = GetLocalDatabaseHandle();
da8453a5 513 DebugPrintf(1, _T("Database writer thread started"));
3ec58bb8
VK
514
515 while(true)
516 {
517 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
518 if (e == INVALID_POINTER_VALUE)
519 break;
520
df94243f 521 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?,?)"));
3ec58bb8
VK
522 if (hStmt == NULL)
523 {
524 delete e;
525 continue;
526 }
527
528 int count = 0;
529
530 DBBegin(hdb);
531 while((e != NULL) && (e != INVALID_POINTER_VALUE))
532 {
533 e->saveToDatabase(hStmt);
534 delete e;
535
536 count++;
537 if (count > 200)
538 break;
539
540 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
541 }
542 DBCommit(hdb);
543 DBFreeStatement(hStmt);
da8453a5 544 DebugPrintf(7, _T("Database writer: %d records inserted"), count);
51e38637
VK
545 if (e == INVALID_POINTER_VALUE)
546 break;
3ec58bb8
VK
547 }
548
da8453a5 549 DebugPrintf(1, _T("Database writer thread stopped"));
3ec58bb8
VK
550 return THREAD_OK;
551}
552
13783551
VK
553/**
554 * List of all data collection items
555 */
556static ObjectArray<DataCollectionItem> s_items(64, 64, true);
557static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
558
53c3d1e7 559/**
a1273b42 560 * Session comparator
53c3d1e7 561 */
d1dfba1a 562static bool SessionComparator_Reconciliation(AbstractCommSession *session, void *data)
53c3d1e7 563{
d1dfba1a 564 if ((session->getServerId() == 0) || !session->canAcceptData())
a1273b42 565 return false;
53c3d1e7 566 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
9e698daa
VK
567 if ((s != NULL) && (s->queueSize > 0))
568 {
a1273b42 569 return true;
9e698daa 570 }
a1273b42 571 return false;
53c3d1e7
VK
572}
573
61c0e619 574/**
883c14ae 575 * Data reconciliation thread
61c0e619 576 */
883c14ae 577static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
61c0e619 578{
296ae03d 579 DB_HANDLE hdb = GetLocalDatabaseHandle();
b7b1c821 580 UINT32 sleepTime = 30000;
9cf1b53c 581 nxlog_debug(1, _T("Data reconciliation thread started (block size %d, timeout %d ms)"), g_dcReconciliationBlockSize, g_dcReconciliationTimeout);
a3d3f9d5 582
5e12ef62 583 bool vacuumNeeded = false;
296ae03d 584 while(!AgentSleepAndCheckForShutdown(sleepTime))
61c0e619 585 {
53c3d1e7
VK
586 // Check if there is something to sync
587 MutexLock(s_serverSyncStatusLock);
d1dfba1a 588 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Reconciliation, NULL);
53c3d1e7 589 MutexUnlock(s_serverSyncStatusLock);
a1273b42 590 if (session == NULL)
53c3d1e7 591 {
13783551
VK
592 // Save last poll times when reconciliation thread is idle
593 MutexLock(s_itemLock);
594 if (s_pollTimeChanged)
595 {
596 DBBegin(hdb);
597 TCHAR query[256];
598 for(int i = 0; i < s_items.size(); i++)
599 {
600 DataCollectionItem *dci = s_items.get(i);
601 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
c6d5c1b0 602 (UINT64)dci->getLastPollTime(), (UINT64)dci->getServerId(), dci->getId());
13783551
VK
603 DBQuery(hdb, query);
604 }
605 DBCommit(hdb);
606 s_pollTimeChanged = false;
607 }
608 MutexUnlock(s_itemLock);
609
5e12ef62
VK
610 if (vacuumNeeded)
611 {
da8453a5 612 DebugPrintf(4, _T("ReconciliationThread: vacuum local database"));
5e12ef62
VK
613 DBQuery(hdb, _T("VACUUM"));
614 vacuumNeeded = false;
615 }
53c3d1e7
VK
616 sleepTime = 30000;
617 continue;
618 }
619
9e698daa 620 TCHAR query[1024];
51e38637 621 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,status_code,snmp_target_guid,timestamp,value FROM dc_queue INDEXED BY idx_dc_queue_timestamp WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), g_dcReconciliationBlockSize);
9e698daa 622
13783551
VK
623 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
624 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
296ae03d 625 if (hResult == NULL)
53c3d1e7 626 {
da8453a5 627 DebugPrintf(4, _T("ReconciliationThread: database query failed: %s"), sqlError);
53c3d1e7 628 sleepTime = 30000;
a1273b42 629 session->decRefCount();
296ae03d 630 continue;
53c3d1e7 631 }
296ae03d
VK
632
633 int count = DBGetNumRows(hResult);
b7b1c821 634 if (count > 0)
61c0e619 635 {
a1273b42 636 ObjectArray<DataElement> bulkSendList(count, 10, true);
b7b1c821
VK
637 ObjectArray<DataElement> deleteList(count, 10, true);
638 for(int i = 0; i < count; i++)
63ff3c9d 639 {
b7b1c821 640 DataElement *e = new DataElement(hResult, i);
a1273b42 641 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
b7b1c821 642 {
a1273b42 643 bulkSendList.add(e);
b7b1c821
VK
644 }
645 else
646 {
a1273b42
VK
647 MutexLock(s_serverSyncStatusLock);
648 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
649 if (status != NULL)
650 {
21ec6e4f
VK
651 if (e->sendToServer(true))
652 {
653 status->queueSize--;
654 deleteList.add(e);
655 }
656 else
657 {
658 delete e;
659 }
a1273b42
VK
660 }
661 else
662 {
da8453a5 663 DebugPrintf(5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
21ec6e4f 664 deleteList.add(e); // record should be deleted
a1273b42
VK
665 }
666 MutexUnlock(s_serverSyncStatusLock);
b7b1c821 667 }
a1273b42 668 }
b7b1c821 669
a1273b42
VK
670 if (bulkSendList.size() > 0)
671 {
da8453a5 672 DebugPrintf(6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
a1273b42
VK
673
674 NXCPMessage msg;
675 msg.setCode(CMD_DCI_DATA);
676 msg.setId(session->generateRequestId());
677 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
678 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
679
680 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
681 for(int i = 0; i < bulkSendList.size(); i++)
b7b1c821 682 {
a1273b42
VK
683 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
684 fieldId += 10;
685 }
686
9cf1b53c 687 NXCPMessage *response = session->doRequestEx(&msg, g_dcReconciliationTimeout);
a1273b42
VK
688 if (response != NULL)
689 {
690 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
691 if (rcc == ERR_SUCCESS)
692 {
693 MutexLock(s_serverSyncStatusLock);
694 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
695
696 // Check status for each data element
9cf1b53c
VK
697 BYTE status[MAX_BULK_DATA_BLOCK_SIZE];
698 memset(status, 0, MAX_BULK_DATA_BLOCK_SIZE);
699 response->getFieldAsBinary(VID_STATUS, status, MAX_BULK_DATA_BLOCK_SIZE);
a1273b42
VK
700 bulkSendList.setOwner(false);
701 for(int i = 0; i < bulkSendList.size(); i++)
702 {
703 DataElement *e = bulkSendList.get(i);
704 if (status[i] != BULK_DATA_REC_RETRY)
705 {
706 deleteList.add(e);
707 serverSyncStatus->queueSize--;
708 }
709 else
710 {
711 delete e;
712 }
713 }
714
715 MutexUnlock(s_serverSyncStatusLock);
716 }
717 else
718 {
da8453a5 719 DebugPrintf(4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
a1273b42
VK
720 }
721 delete response;
b7b1c821
VK
722 }
723 else
724 {
da8453a5 725 DebugPrintf(4, _T("ReconciliationThread: timeout on bulk send"));
b7b1c821 726 }
63ff3c9d
VK
727 }
728
b7b1c821 729 if (deleteList.size() > 0)
63ff3c9d 730 {
b7b1c821
VK
731 DBBegin(hdb);
732 for(int i = 0; i < deleteList.size(); i++)
63ff3c9d 733 {
b7b1c821
VK
734 DataElement *e = deleteList.get(i);
735 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
736 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
737 DBQuery(hdb, query);
63ff3c9d 738 }
b7b1c821 739 DBCommit(hdb);
9cf1b53c 740 nxlog_debug(4, _T("ReconciliationThread: %d records sent"), deleteList.size());
5e12ef62 741 vacuumNeeded = true;
63ff3c9d 742 }
61c0e619 743 }
296ae03d
VK
744 DBFreeResult(hResult);
745
a1273b42 746 session->decRefCount();
f7319eac 747 sleepTime = (count > 0) ? 50 : 30000;
61c0e619 748 }
a3d3f9d5 749
9cf1b53c 750 nxlog_debug(1, _T("Data reconciliation thread stopped"));
296ae03d 751 return THREAD_OK;
61c0e619 752}
753
296ae03d
VK
754/**
755 * Data sender queue
756 */
757static Queue s_dataSenderQueue;
758
87fff547
VK
759/**
760 * Data sender
761 */
762static THREAD_RESULT THREAD_CALL DataSender(void *arg)
763{
da8453a5 764 DebugPrintf(1, _T("Data sender thread started"));
87fff547
VK
765 while(true)
766 {
19dbc8ef 767 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
87fff547
VK
768 if (e == INVALID_POINTER_VALUE)
769 break;
770
296ae03d
VK
771 MutexLock(s_serverSyncStatusLock);
772 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
773 if (status == NULL)
61c0e619 774 {
296ae03d
VK
775 status = new ServerSyncStatus();
776 s_serverSyncStatus.set(e->getServerId(), status);
777 }
778
779 if (status->queueSize == 0)
780 {
63ff3c9d 781 if (!e->sendToServer(false))
61c0e619 782 {
a3d3f9d5 783 status->queueSize++;
3ec58bb8
VK
784 s_databaseWriterQueue.put(e);
785 e = NULL;
61c0e619 786 }
787 }
788 else
789 {
6fbaa926 790 status->queueSize++;
3ec58bb8
VK
791 s_databaseWriterQueue.put(e);
792 e = NULL;
61c0e619 793 }
296ae03d 794 MutexUnlock(s_serverSyncStatusLock);
61c0e619 795
87fff547
VK
796 delete e;
797 }
da8453a5 798 DebugPrintf(1, _T("Data sender thread stopped"));
87fff547
VK
799 return THREAD_OK;
800}
801
87fff547
VK
802/**
803 * Collect data from agent
804 */
296ae03d 805static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
87fff547
VK
806{
807 VirtualSession session(dci->getServerId());
808
809 DataElement *e = NULL;
df94243f 810 UINT32 status;
87fff547
VK
811 if (dci->getType() == DCO_TYPE_ITEM)
812 {
813 TCHAR value[MAX_RESULT_LENGTH];
da8453a5 814 status = GetParameterValue(dci->getName(), value, &session);
df94243f 815 e = new DataElement(dci, (status == ERR_SUCCESS) ? value : _T(""), status);
87fff547
VK
816 }
817 else if (dci->getType() == DCO_TYPE_LIST)
818 {
df94243f 819 StringList *value = new StringList();
da8453a5 820 status = GetListValue(dci->getName(), value, &session);
df94243f 821 e = new DataElement(dci, value, status);
87fff547
VK
822 }
823 else if (dci->getType() == DCO_TYPE_TABLE)
824 {
df94243f 825 Table *value = new Table();
da8453a5 826 status = GetTableValue(dci->getName(), value, &session);
df94243f 827 e = new DataElement(dci, value, status);
87fff547
VK
828 }
829
830 return e;
831}
832
833/**
834 * Collect data from SNMP
835 */
296ae03d 836static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
87fff547 837{
296ae03d
VK
838 DataElement *e = NULL;
839 if (dci->getType() == DCO_TYPE_ITEM)
840 {
da8453a5 841 DebugPrintf(8, _T("Read SNMP parameter %s"), dci->getName());
c1bc623e 842
296ae03d 843 TCHAR value[MAX_RESULT_LENGTH];
df94243f 844 UINT32 status = GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType());
845 e = new DataElement(dci, status == ERR_SUCCESS ? value : _T(""), status);
296ae03d
VK
846 }
847 return e;
87fff547
VK
848}
849
374afd7b 850/**
c1bc623e 851 * Local data collection callback
374afd7b 852 */
c1bc623e 853static void LocalDataCollectionCallback(void *arg)
374afd7b
VK
854{
855 DataCollectionItem *dci = (DataCollectionItem *)arg;
856
c1bc623e
VK
857 DataElement *e = CollectDataFromAgent(dci);
858 if (e != NULL)
374afd7b 859 {
c1bc623e 860 s_dataSenderQueue.put(e);
374afd7b
VK
861 }
862 else
863 {
da8453a5 864 DebugPrintf(6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
374afd7b
VK
865 }
866
c1bc623e
VK
867 dci->setLastPollTime(time(NULL));
868 dci->finishDataCollection();
869}
870
871/**
872 * SNMP data collection callback
873 */
874static void SnmpDataCollectionCallback(void *arg)
875{
876 DataCollectionItem *dci = (DataCollectionItem *)arg;
877
878 DataElement *e = CollectDataFromSNMP(dci);
374afd7b
VK
879 if (e != NULL)
880 {
881 s_dataSenderQueue.put(e);
882 }
883 else
884 {
da8453a5 885 DebugPrintf(6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
374afd7b
VK
886 }
887
888 dci->setLastPollTime(time(NULL));
889 dci->finishDataCollection();
890}
891
87fff547 892/**
374afd7b 893 * Data collectors thread pool
87fff547 894 */
374afd7b
VK
895static ThreadPool *s_dataCollectorPool = NULL;
896
897/**
898 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
899 */
900static UINT32 DataCollectionSchedulerRun()
87fff547 901{
87fff547 902 UINT32 sleepTime = 60;
d9c628e3 903
87fff547
VK
904 MutexLock(s_itemLock);
905 for(int i = 0; i < s_items.size(); i++)
906 {
907 DataCollectionItem *dci = s_items.get(i);
231b0aad 908 time_t now = time(NULL);
87fff547
VK
909 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
910 if (timeToPoll == 0)
911 {
da8453a5 912 DebugPrintf(7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
c1bc623e
VK
913
914 if (dci->getOrigin() == DS_NATIVE_AGENT)
915 {
916 dci->startDataCollection();
917 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
918 }
919 else if (dci->getOrigin() == DS_SNMP_AGENT)
920 {
921 dci->startDataCollection();
922 TCHAR key[64];
923 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
924 }
925 else
926 {
da8453a5 927 DebugPrintf(7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
c1bc623e
VK
928 }
929
63ff3c9d 930 timeToPoll = dci->getPollingInterval();
87fff547 931 }
63ff3c9d
VK
932
933 if (sleepTime > timeToPoll)
934 sleepTime = timeToPoll;
87fff547
VK
935 }
936 MutexUnlock(s_itemLock);
937 return sleepTime;
938}
939
940/**
374afd7b 941 * Data collection scheduler thread
87fff547 942 */
374afd7b 943static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
87fff547 944{
da8453a5 945 DebugPrintf(1, _T("Data collection scheduler thread started"));
1304b91d 946 s_dataCollectorPool = ThreadPoolCreate(1, g_dcMaxCollectorPoolSize, _T("DATACOLL"));
87fff547 947
374afd7b 948 UINT32 sleepTime = DataCollectionSchedulerRun();
9aa171c1 949 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547 950 {
374afd7b 951 sleepTime = DataCollectionSchedulerRun();
da8453a5 952 DebugPrintf(7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
87fff547
VK
953 }
954
374afd7b 955 ThreadPoolDestroy(s_dataCollectorPool);
da8453a5 956 DebugPrintf(1, _T("Data collection scheduler thread stopped"));
87fff547
VK
957 return THREAD_OK;
958}
959
960/**
961 * Configure data collection
962 */
963void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
964{
dec46d8a
VK
965 if (!s_dataCollectorStarted)
966 {
da8453a5 967 DebugPrintf(1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
dec46d8a
VK
968 return;
969 }
970
4c84b8a5
VK
971 DB_HANDLE hdb = GetLocalDatabaseHandle();
972
296ae03d 973 int count = msg->getFieldAsInt32(VID_NUM_NODES);
4c84b8a5 974 if (count > 0)
296ae03d 975 {
4c84b8a5
VK
976 DBBegin(hdb);
977 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
978 for(int i = 0; i < count; i++)
979 {
980 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
981 UpdateSnmpTarget(target);
982 fieldId += 50;
983 }
984 DBCommit(hdb);
296ae03d 985 }
da8453a5 986 DebugPrintf(4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
296ae03d 987
87fff547
VK
988 ObjectArray<DataCollectionItem> config(32, 32, true);
989
296ae03d 990 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
4c84b8a5 991 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
87fff547
VK
992 for(int i = 0; i < count; i++)
993 {
994 config.add(new DataCollectionItem(serverId, msg, fieldId));
995 fieldId += 10;
996 }
da8453a5 997 DebugPrintf(4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
87fff547 998
8edd7506
VK
999 bool txnOpen = false;
1000
d9c628e3 1001 MutexLock(s_itemLock);
a3d3f9d5 1002
296ae03d 1003 // Update and add new
d9c628e3 1004 for(int j = 0; j < config.size(); j++)
1005 {
1006 DataCollectionItem *item = config.get(j);
1007 bool exist = false;
1008 for(int i = 0; i < s_items.size(); i++)
1009 {
d610afbd 1010 if (item->equals(s_items.get(i)))
d9c628e3 1011 {
1012 s_items.get(i)->updateAndSave(item);
1013 exist = true;
1014 }
1015 }
296ae03d 1016 if (!exist)
d9c628e3 1017 {
1018 DataCollectionItem *newItem = new DataCollectionItem(item);
1019 s_items.add(newItem);
8edd7506
VK
1020 if (!txnOpen)
1021 {
1022 DBBegin(hdb);
1023 txnOpen = true;
1024 }
d9c628e3 1025 newItem->saveToDatabase(true);
1026 }
1027 }
a3d3f9d5 1028
296ae03d 1029 // Remove not existing configuration and data for it
d9c628e3 1030 for(int i = 0; i < s_items.size(); i++)
1031 {
1032 DataCollectionItem *item = s_items.get(i);
61c0e619 1033 //If item is from other server, then, do not search it in list of this server
1034 if(item->getServerId() != serverId)
1035 continue;
d9c628e3 1036 bool exist = false;
1037 for(int j = 0; j < config.size(); j++)
1038 {
374afd7b 1039 if (item->equals(config.get(j)))
d9c628e3 1040 {
1041 exist = true;
1042 }
1043 }
6fbaa926 1044 if (!exist)
d9c628e3 1045 {
8edd7506
VK
1046 if (!txnOpen)
1047 {
1048 DBBegin(hdb);
1049 txnOpen = true;
1050 }
d9c628e3 1051 item->deleteFromDatabase();
374afd7b
VK
1052 s_items.unlink(i);
1053 item->decRefCount();
d9c628e3 1054 i--;
d9c628e3 1055 }
1056 }
8edd7506
VK
1057
1058 if (txnOpen)
1059 DBCommit(hdb);
1060
d9c628e3 1061 MutexUnlock(s_itemLock);
8edd7506 1062
da8453a5 1063 DebugPrintf(4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
87fff547
VK
1064}
1065
d9c628e3 1066/**
63ff3c9d 1067 * Load saved state of local data collection
d9c628e3 1068 */
63ff3c9d 1069static void LoadState()
d9c628e3 1070{
63ff3c9d
VK
1071 DB_HANDLE hdb = GetLocalDatabaseHandle();
1072 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
296ae03d 1073 if (hResult != NULL)
d9c628e3 1074 {
63ff3c9d
VK
1075 int count = DBGetNumRows(hResult);
1076 for(int i = 0; i < count; i++)
d9c628e3 1077 {
1078 s_items.add(new DataCollectionItem(hResult, i));
1079 }
1080 DBFreeResult(hResult);
1081 }
63ff3c9d
VK
1082
1083 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1084 if (hResult != NULL)
1085 {
1086 int count = DBGetNumRows(hResult);
1087 for(int i = 0; i < count; i++)
1088 {
1089 SNMPTarget *t = new SNMPTarget(hResult, i);
1090 UpdateSnmpTarget(t);
1091 }
1092 DBFreeResult(hResult);
1093 }
1094
1095 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1096 if (hResult != NULL)
1097 {
1098 int count = DBGetNumRows(hResult);
1099 for(int i = 0; i < count; i++)
1100 {
1101 ServerSyncStatus *s = new ServerSyncStatus;
1102 s->queueSize = DBGetFieldLong(hResult, i, 1);
1103 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1104 s_serverSyncStatus.set(serverId, s);
da8453a5 1105 DebugPrintf(2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
63ff3c9d
VK
1106 }
1107 DBFreeResult(hResult);
1108 }
d9c628e3 1109}
1110
296ae03d
VK
1111/**
1112 * Data collector and sender thread handles
1113 */
374afd7b 1114static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;
296ae03d 1115static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
3ec58bb8 1116static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;
883c14ae 1117static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;
296ae03d 1118
87fff547
VK
1119/**
1120 * Initialize and start local data collector
1121 */
1122void StartLocalDataCollector()
1123{
d9c628e3 1124 DB_HANDLE db = GetLocalDatabaseHandle();
a97f02e1 1125 if (db == NULL)
d9c628e3 1126 {
da8453a5 1127 DebugPrintf(5, _T("StartLocalDataCollector: local database unavailable"));
d9c628e3 1128 return;
1129 }
296ae03d 1130
9cf1b53c
VK
1131 if (g_dcReconciliationBlockSize < 16)
1132 {
1133 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to 16"), g_dcReconciliationBlockSize);
1134 g_dcReconciliationBlockSize = 16;
1135 }
1136 else if (g_dcReconciliationBlockSize > MAX_BULK_DATA_BLOCK_SIZE)
1137 {
1138 nxlog_debug(1, _T("Invalid data reconciliation block size %d, resetting to %d"), g_dcReconciliationBlockSize, MAX_BULK_DATA_BLOCK_SIZE);
1139 g_dcReconciliationBlockSize = MAX_BULK_DATA_BLOCK_SIZE;
1140 }
1141
1142 if (g_dcReconciliationTimeout < 1000)
1143 {
1144 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 1000"), g_dcReconciliationTimeout);
1145 g_dcReconciliationTimeout = 1000;
1146 }
1147 else if (g_dcReconciliationTimeout > 600000)
1148 {
1149 nxlog_debug(1, _T("Invalid data reconciliation timeout %d, resetting to 600000"), g_dcReconciliationTimeout);
1150 g_dcReconciliationTimeout = 600000;
1151 }
1152
87fff547 1153 s_itemLock = MutexCreate();
296ae03d 1154 s_serverSyncStatusLock = MutexCreate();
63ff3c9d
VK
1155
1156 LoadState();
1157
374afd7b 1158 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
87fff547 1159 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
3ec58bb8 1160 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
883c14ae 1161 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
dec46d8a
VK
1162
1163 s_dataCollectorStarted = true;
87fff547
VK
1164}
1165
1166/**
1167 * Shutdown local data collector
1168 */
1169void ShutdownLocalDataCollector()
1170{
dec46d8a
VK
1171 if (!s_dataCollectorStarted)
1172 {
da8453a5 1173 DebugPrintf(5, _T("Local data collector was not started"));
dec46d8a
VK
1174 return;
1175 }
1176
da8453a5 1177 DebugPrintf(5, _T("Waiting for data collector thread termination"));
374afd7b 1178 ThreadJoin(s_dataCollectionSchedulerThread);
87fff547 1179
da8453a5 1180 DebugPrintf(5, _T("Waiting for data sender thread termination"));
19dbc8ef 1181 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
87fff547
VK
1182 ThreadJoin(s_dataSenderThread);
1183
da8453a5 1184 DebugPrintf(5, _T("Waiting for database writer thread termination"));
3ec58bb8
VK
1185 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1186 ThreadJoin(s_databaseWriterThread);
1187
da8453a5 1188 DebugPrintf(5, _T("Waiting for data reconciliation thread termination"));
883c14ae 1189 ThreadJoin(s_reconciliationThread);
61c0e619 1190
87fff547 1191 MutexDestroy(s_itemLock);
296ae03d 1192 MutexDestroy(s_serverSyncStatusLock);
87fff547 1193}
4b535d78 1194
81222795
VK
1195/**
1196 * Clear data collection configuration
1197 */
1198void ClearDataCollectionConfiguration()
4b535d78 1199{
1200 MutexLock(s_itemLock);
1201 DB_HANDLE db = GetLocalDatabaseHandle();
1202 DBQuery(db, _T("DELETE FROM dc_queue"));
1203 DBQuery(db, _T("DELETE FROM dc_config"));
1204 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1205 s_items.clear();
1206 MutexUnlock(s_itemLock);
1207
1208 MutexLock(s_serverSyncStatusLock);
4b535d78 1209 s_serverSyncStatus.clear();
1210 MutexUnlock(s_serverSyncStatusLock);
1211}
df26f039
VK
1212
1213/**
1214 * Handler for data collector queue size
1215 */
1216LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1217{
1218 if (!s_dataCollectorStarted)
1219 return SYSINFO_RC_UNSUPPORTED;
1220
1221 UINT32 count = 0;
1222 MutexLock(s_serverSyncStatusLock);
1223 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1224 while(it->hasNext())
1225 count += (UINT32)it->next()->queueSize;
1226 delete it;
1227 MutexUnlock(s_serverSyncStatusLock);
1228
1229 ret_uint(value, count);
1230 return SYSINFO_RC_SUCCESS;
1231}