fixed possible NULL pointer dereference in agent
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
21ec6e4f 3** Copyright (C) 2003-2016 Victor Kirhenshtein
87fff547
VK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
296ae03d
VK
25/**
26 * Externals
27 */
28void UpdateSnmpTarget(SNMPTarget *target);
de4af576 29bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
296ae03d 30
d9c628e3 31/**
32 * Database schema version
33 */
a97f02e1 34#define DATACOLL_SCHEMA_VERSION 3
d9c628e3 35
dec46d8a
VK
36/**
37 * Data collector start indicator
38 */
39static bool s_dataCollectorStarted = false;
40
13783551
VK
41/**
42 * Unsaved poll time indicator
43 */
44static bool s_pollTimeChanged = false;
45
87fff547
VK
46/**
47 * Data collection item
48 */
49class DataCollectionItem : public RefCountObject
50{
51private:
52 UINT64 m_serverId;
53 UINT32 m_id;
54 INT32 m_pollingInterval;
55 TCHAR *m_name;
56 BYTE m_type;
57 BYTE m_origin;
58 UINT16 m_snmpPort;
296ae03d 59 BYTE m_snmpRawValueType;
374afd7b 60 BYTE m_busy;
de4af576 61 uuid m_snmpTargetGuid;
87fff547
VK
62 time_t m_lastPollTime;
63
64public:
65 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
d9c628e3 66 DataCollectionItem(DB_RESULT hResult, int row);
61c0e619 67 DataCollectionItem(const DataCollectionItem *item);
87fff547
VK
68 virtual ~DataCollectionItem();
69
296ae03d
VK
70 UINT32 getId() const { return m_id; }
71 UINT64 getServerId() const { return m_serverId; }
72 const TCHAR *getName() const { return m_name; }
73 int getType() const { return (int)m_type; }
74 int getOrigin() const { return (int)m_origin; }
de4af576 75 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
296ae03d
VK
76 UINT16 getSnmpPort() const { return m_snmpPort; }
77 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
86ef6701 78 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
13783551 79 time_t getLastPollTime() { return m_lastPollTime; }
296ae03d
VK
80
81 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
82
61c0e619 83 void updateAndSave(const DataCollectionItem *item);
d9c628e3 84 void saveToDatabase(bool newObject);
85 void deleteFromDatabase();
93c62d54 86 void setLastPollTime(time_t time);
87fff547 87
374afd7b
VK
88 void startDataCollection() { m_busy = 1; incRefCount(); }
89 void finishDataCollection() { m_busy = 0; decRefCount(); }
90
296ae03d 91 UINT32 getTimeToNextPoll(time_t now) const
87fff547 92 {
374afd7b
VK
93 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
94 return m_pollingInterval;
87fff547
VK
95 time_t diff = now - m_lastPollTime;
96 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
97 }
98};
99
100/**
101 * Create data collection item from NXCP mesage
102 */
103DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
104{
105 m_serverId = serverId;
106 m_id = msg->getFieldAsInt32(baseId);
107 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
108 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
109 m_name = msg->getFieldAsString(baseId + 3);
110 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
111 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
de4af576 112 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
87fff547 113 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
296ae03d 114 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
374afd7b 115 m_busy = 0;
87fff547
VK
116}
117
d9c628e3 118/**
296ae03d 119 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
d9c628e3 120 */
121DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
122{
123 m_serverId = DBGetFieldInt64(hResult, row, 0);
124 m_id = DBGetFieldULong(hResult, row, 1);
125 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
126 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
d610afbd 127 m_name = DBGetField(hResult, row, 4, NULL, 0);
d9c628e3 128 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
129 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
f470ae5d 130 m_snmpPort = DBGetFieldULong(hResult, row, 7);
de4af576 131 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
296ae03d 132 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
374afd7b 133 m_busy = 0;
d9c628e3 134}
135
136/**
137 * Copy constructor
138 */
61c0e619 139 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
d9c628e3 140 {
141 m_serverId = item->m_serverId;
142 m_id = item->m_id;
143 m_type = item->m_type;
144 m_origin = item->m_origin;
145 m_name = _tcsdup(item->m_name);
146 m_pollingInterval = item->m_pollingInterval;
147 m_lastPollTime = item->m_lastPollTime;
de4af576 148 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 149 m_snmpPort = item->m_snmpPort;
296ae03d 150 m_snmpRawValueType = item->m_snmpRawValueType;
374afd7b 151 m_busy = 0;
d9c628e3 152 }
153
87fff547
VK
154/**
155 * Data collection item destructor
156 */
157DataCollectionItem::~DataCollectionItem()
158{
159 safe_free(m_name);
160}
161
d9c628e3 162/**
163 * Will check if object has changed. If at least one field is changed - all data will be updated and
164 * saved to database.
165 */
61c0e619 166void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
d9c628e3 167{
13783551 168 // if at least one of fields changed - set all fields and save to DB
296ae03d 169 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
de4af576 170 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
d610afbd 171 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
d9c628e3 172 {
173 m_type = item->m_type;
174 m_origin = item->m_origin;
175 m_name = _tcsdup(item->m_name);
176 m_pollingInterval = item->m_pollingInterval;
d610afbd
VK
177 if (m_lastPollTime < item->m_lastPollTime)
178 m_lastPollTime = item->m_lastPollTime;
de4af576 179 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 180 m_snmpPort = item->m_snmpPort;
296ae03d 181 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 182 saveToDatabase(false);
183 }
d9c628e3 184}
185
296ae03d
VK
186/**
187 * Save configuration object to database
188 */
d9c628e3 189void DataCollectionItem::saveToDatabase(bool newObject)
190{
e1c30b6f 191 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
d9c628e3 192 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
193 DB_HANDLE db = GetLocalDatabaseHandle();
194 DB_STATEMENT hStmt;
195
296ae03d 196 if (newObject)
d9c628e3 197 {
198 hStmt = DBPrepare(db,
199 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
296ae03d
VK
200 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
201 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
d9c628e3 202 }
203 else
204 {
205 hStmt = DBPrepare(db,
61c0e619 206 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
207 _T("polling_interval=?,last_poll=?,snmp_port=?,")
296ae03d 208 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
d9c628e3 209 }
210
211 if (hStmt == NULL)
d9c628e3 212 return;
d9c628e3 213
d9c628e3 214 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
215 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
216 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
217 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
218 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
d5034b1a 219 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
de4af576 220 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
296ae03d
VK
221 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
222 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
223 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
d9c628e3 224
296ae03d
VK
225 DBExecute(hStmt);
226 DBFreeStatement(hStmt);
d9c628e3 227}
228
229/**
230 * Remove item form database and delete not synced data if exist
231 */
232void DataCollectionItem::deleteFromDatabase()
233{
d9c628e3 234 DB_HANDLE db = GetLocalDatabaseHandle();
235 TCHAR query[256];
366440e8
VK
236 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
237 if (DBQuery(db, query))
d9c628e3 238 {
366440e8
VK
239 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
240 if (DBQuery(db, query))
241 {
242 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
243 }
d9c628e3 244 }
245}
246
a97f02e1
VK
247/**
248 * Set last poll time for item
249 */
93c62d54 250void DataCollectionItem::setLastPollTime(time_t time)
251{
a97f02e1 252 m_lastPollTime = time;
13783551 253 s_pollTimeChanged = true;
93c62d54 254}
255
87fff547
VK
256/**
257 * Collected data
258 */
259class DataElement
260{
261private:
262 UINT64 m_serverId;
263 UINT32 m_dciId;
264 time_t m_timestamp;
02d936bd 265 int m_origin;
87fff547 266 int m_type;
de4af576 267 uuid m_snmpNode;
87fff547
VK
268 union
269 {
270 TCHAR *item;
271 StringList *list;
272 Table *table;
273 } m_value;
274
275public:
276 DataElement(DataCollectionItem *dci, const TCHAR *value)
277 {
278 m_serverId = dci->getServerId();
279 m_dciId = dci->getId();
280 m_timestamp = time(NULL);
02d936bd 281 m_origin = dci->getOrigin();
87fff547 282 m_type = DCO_TYPE_ITEM;
de4af576 283 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
284 m_value.item = _tcsdup(value);
285 }
286
287 DataElement(DataCollectionItem *dci, StringList *value)
288 {
289 m_serverId = dci->getServerId();
290 m_dciId = dci->getId();
291 m_timestamp = time(NULL);
02d936bd 292 m_origin = dci->getOrigin();
87fff547 293 m_type = DCO_TYPE_LIST;
de4af576 294 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
295 m_value.list = value;
296 }
297
298 DataElement(DataCollectionItem *dci, Table *value)
299 {
300 m_serverId = dci->getServerId();
301 m_dciId = dci->getId();
302 m_timestamp = time(NULL);
02d936bd 303 m_origin = dci->getOrigin();
87fff547 304 m_type = DCO_TYPE_TABLE;
de4af576 305 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
306 m_value.table = value;
307 }
308
63ff3c9d
VK
309 /**
310 * Create data element from database record
311 * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
312 */
313 DataElement(DB_RESULT hResult, int row)
314 {
315 m_serverId = DBGetFieldUInt64(hResult, row, 0);
316 m_dciId = DBGetFieldULong(hResult, row, 1);
317 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
318 m_origin = DBGetFieldLong(hResult, row, 3);
319 m_type = DBGetFieldLong(hResult, row, 2);
de4af576 320 m_snmpNode = DBGetFieldGUID(hResult, row, 4);
63ff3c9d
VK
321 switch(m_type)
322 {
323 case DCO_TYPE_ITEM:
324 m_value.item = DBGetField(hResult, row, 6, NULL, 0);
325 break;
326 case DCO_TYPE_LIST:
327 {
328 m_value.list = new StringList();
329 TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
330 if (text != NULL)
331 {
332 m_value.list->splitAndAdd(text, _T("\n"));
333 free(text);
334 }
335 }
336 break;
337 case DCO_TYPE_TABLE:
338 {
339 char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
340 if (xml != NULL)
341 {
342 m_value.table = Table::createFromXML(xml);
343 free(xml);
344 }
b7b1c821
VK
345 else
346 {
347 m_value.table = NULL;
348 }
63ff3c9d
VK
349 }
350 break;
351 default:
352 m_type = DCO_TYPE_ITEM;
353 m_value.item = _tcsdup(_T(""));
354 break;
355 }
356 }
357
87fff547
VK
358 ~DataElement()
359 {
360 switch(m_type)
361 {
362 case DCO_TYPE_ITEM:
363 free(m_value.item);
364 break;
365 case DCO_TYPE_LIST:
366 delete m_value.list;
367 break;
368 case DCO_TYPE_TABLE:
369 delete m_value.table;
370 break;
371 }
372 }
93c62d54 373
374 time_t getTimestamp() { return m_timestamp; }
61c0e619 375 UINT64 getServerId() { return m_serverId; }
63ff3c9d 376 UINT32 getDciId() { return m_dciId; }
a1273b42 377 int getType() { return m_type; }
a3d3f9d5 378
3ec58bb8 379 void saveToDatabase(DB_STATEMENT hStmt);
63ff3c9d 380 bool sendToServer(bool reconcillation);
a1273b42 381 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
87fff547
VK
382};
383
61c0e619 384/**
296ae03d 385 * Save data element to database
61c0e619 386 */
3ec58bb8 387void DataElement::saveToDatabase(DB_STATEMENT hStmt)
61c0e619 388{
8199376b
VK
389 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
390 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
391 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
392 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
de4af576 393 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_snmpNode);
63ff3c9d 394 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
8199376b 395 switch(m_type)
a3d3f9d5 396 {
8199376b 397 case DCO_TYPE_ITEM:
63ff3c9d 398 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
8199376b
VK
399 break;
400 case DCO_TYPE_LIST:
63ff3c9d 401 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
8199376b
VK
402 break;
403 case DCO_TYPE_TABLE:
63ff3c9d 404 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
8199376b 405 break;
a3d3f9d5 406 }
8199376b 407 DBExecute(hStmt);
61c0e619 408}
409
296ae03d
VK
410/**
411 * Send collected data to server
412 */
883c14ae 413bool DataElement::sendToServer(bool reconciliation)
61c0e619 414{
b7b1c821
VK
415 // If data in database was invalid table may not be parsed correctly
416 // Consider sending a success in that case so data element can be dropped
417 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
418 return true;
419
6fbaa926
VK
420 CommSession *session = (CommSession *)FindServerSession(m_serverId);
421 if (session == NULL)
422 return false;
423
424 NXCPMessage msg;
425 msg.setCode(CMD_DCI_DATA);
426 msg.setId(session->generateRequestId());
427 msg.setField(VID_DCI_ID, m_dciId);
02d936bd
VK
428 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
429 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
de4af576 430 msg.setField(VID_NODE_ID, m_snmpNode);
6fbaa926 431 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
883c14ae 432 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
6fbaa926
VK
433 switch(m_type)
434 {
435 case DCO_TYPE_ITEM:
436 msg.setField(VID_VALUE, m_value.item);
437 break;
438 case DCO_TYPE_LIST:
6fbaa926
VK
439 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
440 break;
441 case DCO_TYPE_TABLE:
66816abf 442 m_value.table->setSource(m_origin);
6fbaa926
VK
443 m_value.table->fillMessage(msg, 0, -1);
444 break;
445 }
63ff3c9d 446 UINT32 rcc = session->doRequest(&msg, 2000);
6fbaa926 447 session->decRefCount();
63ff3c9d
VK
448
449 // consider internal error as success because it means that server
450 // cannot accept data for some reason and retry is not feasible
451 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
61c0e619 452}
453
a1273b42
VK
454/**
455 * Fill bulk reconciliation message with DCI data
456 */
457void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
458{
459 msg->setField(baseId, m_dciId);
460 msg->setField(baseId + 1, (INT16)m_origin);
461 msg->setField(baseId + 2, (INT16)m_type);
462 msg->setField(baseId + 3, m_snmpNode);
463 msg->setFieldFromTime(baseId + 4, m_timestamp);
464 msg->setField(baseId + 5, m_value.item);
465}
466
61c0e619 467/**
296ae03d 468 * Server data sync status object
61c0e619 469 */
296ae03d 470struct ServerSyncStatus
61c0e619 471{
296ae03d 472 INT32 queueSize;
61c0e619 473
296ae03d
VK
474 ServerSyncStatus()
475 {
476 queueSize = 0;
477 }
61c0e619 478};
479
480/**
296ae03d 481 * Server sync status information
61c0e619 482 */
296ae03d
VK
483static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
484static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
87fff547 485
3ec58bb8
VK
486/**
487 * Database writer queue
488 */
489static Queue s_databaseWriterQueue;
490
491/**
492 * Database writer
493 */
494static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
495{
496 DB_HANDLE hdb = GetLocalDatabaseHandle();
497 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread started"));
498
499 while(true)
500 {
501 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
502 if (e == INVALID_POINTER_VALUE)
503 break;
504
505 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
506 if (hStmt == NULL)
507 {
508 delete e;
509 continue;
510 }
511
512 int count = 0;
513
514 DBBegin(hdb);
515 while((e != NULL) && (e != INVALID_POINTER_VALUE))
516 {
517 e->saveToDatabase(hStmt);
518 delete e;
519
520 count++;
521 if (count > 200)
522 break;
523
524 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
525 }
526 DBCommit(hdb);
527 DBFreeStatement(hStmt);
528 DebugPrintf(INVALID_INDEX, 7, _T("Database writer: %d records inserted"), count);
529 }
530
531 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread stopped"));
532 return THREAD_OK;
533}
534
13783551
VK
535/**
536 * List of all data collection items
537 */
538static ObjectArray<DataCollectionItem> s_items(64, 64, true);
539static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
540
53c3d1e7 541/**
a1273b42 542 * Session comparator
53c3d1e7 543 */
a1273b42 544static bool SessionComparator(AbstractCommSession *session, void *data)
53c3d1e7 545{
a1273b42
VK
546 if ((session->getServerId() == 0) || !session->canAcceptTraps())
547 return false;
53c3d1e7 548 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
9e698daa
VK
549 if ((s != NULL) && (s->queueSize > 0))
550 {
a1273b42 551 return true;
9e698daa 552 }
a1273b42 553 return false;
53c3d1e7
VK
554}
555
61c0e619 556/**
883c14ae 557 * Data reconciliation thread
61c0e619 558 */
883c14ae 559static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
61c0e619 560{
296ae03d 561 DB_HANDLE hdb = GetLocalDatabaseHandle();
b7b1c821 562 UINT32 sleepTime = 30000;
883c14ae 563 DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread started"));
a3d3f9d5 564
5e12ef62 565 bool vacuumNeeded = false;
296ae03d 566 while(!AgentSleepAndCheckForShutdown(sleepTime))
61c0e619 567 {
53c3d1e7
VK
568 // Check if there is something to sync
569 MutexLock(s_serverSyncStatusLock);
a1273b42 570 CommSession *session = (CommSession *)FindServerSession(SessionComparator, NULL);
53c3d1e7 571 MutexUnlock(s_serverSyncStatusLock);
a1273b42 572 if (session == NULL)
53c3d1e7 573 {
13783551
VK
574 // Save last poll times when reconciliation thread is idle
575 MutexLock(s_itemLock);
576 if (s_pollTimeChanged)
577 {
578 DBBegin(hdb);
579 TCHAR query[256];
580 for(int i = 0; i < s_items.size(); i++)
581 {
582 DataCollectionItem *dci = s_items.get(i);
583 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
584 dci->getLastPollTime(), dci->getServerId(), dci->getId());
585 DBQuery(hdb, query);
586 }
587 DBCommit(hdb);
588 s_pollTimeChanged = false;
589 }
590 MutexUnlock(s_itemLock);
591
5e12ef62
VK
592 if (vacuumNeeded)
593 {
883c14ae 594 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: vacuum local database"));
5e12ef62
VK
595 DBQuery(hdb, _T("VACUUM"));
596 vacuumNeeded = false;
597 }
53c3d1e7
VK
598 sleepTime = 30000;
599 continue;
600 }
601
9e698daa 602 TCHAR query[1024];
f7319eac 603 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), BULK_DATA_BLOCK_SIZE);
9e698daa 604
13783551
VK
605 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
606 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
296ae03d 607 if (hResult == NULL)
53c3d1e7 608 {
13783551 609 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: database query failed: %s"), sqlError);
53c3d1e7 610 sleepTime = 30000;
a1273b42 611 session->decRefCount();
296ae03d 612 continue;
53c3d1e7 613 }
296ae03d
VK
614
615 int count = DBGetNumRows(hResult);
b7b1c821 616 if (count > 0)
61c0e619 617 {
a1273b42 618 ObjectArray<DataElement> bulkSendList(count, 10, true);
b7b1c821
VK
619 ObjectArray<DataElement> deleteList(count, 10, true);
620 for(int i = 0; i < count; i++)
63ff3c9d 621 {
b7b1c821 622 DataElement *e = new DataElement(hResult, i);
a1273b42 623 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
b7b1c821 624 {
a1273b42 625 bulkSendList.add(e);
b7b1c821
VK
626 }
627 else
628 {
a1273b42
VK
629 MutexLock(s_serverSyncStatusLock);
630 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
631 if (status != NULL)
632 {
21ec6e4f
VK
633 if (e->sendToServer(true))
634 {
635 status->queueSize--;
636 deleteList.add(e);
637 }
638 else
639 {
640 delete e;
641 }
a1273b42
VK
642 }
643 else
644 {
645 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
21ec6e4f 646 deleteList.add(e); // record should be deleted
a1273b42
VK
647 }
648 MutexUnlock(s_serverSyncStatusLock);
b7b1c821 649 }
a1273b42 650 }
b7b1c821 651
a1273b42
VK
652 if (bulkSendList.size() > 0)
653 {
654 DebugPrintf(INVALID_INDEX, 6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
655
656 NXCPMessage msg;
657 msg.setCode(CMD_DCI_DATA);
658 msg.setId(session->generateRequestId());
659 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
660 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
661
662 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
663 for(int i = 0; i < bulkSendList.size(); i++)
b7b1c821 664 {
a1273b42
VK
665 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
666 fieldId += 10;
667 }
668
669 NXCPMessage *response = session->doRequestEx(&msg, 2000);
670 if (response != NULL)
671 {
672 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
673 if (rcc == ERR_SUCCESS)
674 {
675 MutexLock(s_serverSyncStatusLock);
676 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
677
678 // Check status for each data element
f7319eac
VK
679 BYTE status[BULK_DATA_BLOCK_SIZE];
680 memset(status, 0, BULK_DATA_BLOCK_SIZE);
681 response->getFieldAsBinary(VID_STATUS, status, BULK_DATA_BLOCK_SIZE);
a1273b42
VK
682 bulkSendList.setOwner(false);
683 for(int i = 0; i < bulkSendList.size(); i++)
684 {
685 DataElement *e = bulkSendList.get(i);
686 if (status[i] != BULK_DATA_REC_RETRY)
687 {
688 deleteList.add(e);
689 serverSyncStatus->queueSize--;
690 }
691 else
692 {
693 delete e;
694 }
695 }
696
697 MutexUnlock(s_serverSyncStatusLock);
698 }
699 else
700 {
701 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
702 }
703 delete response;
b7b1c821
VK
704 }
705 else
706 {
a1273b42 707 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: timeout on bulk send"));
b7b1c821 708 }
63ff3c9d
VK
709 }
710
b7b1c821 711 if (deleteList.size() > 0)
63ff3c9d 712 {
b7b1c821
VK
713 DBBegin(hdb);
714 for(int i = 0; i < deleteList.size(); i++)
63ff3c9d 715 {
b7b1c821
VK
716 DataElement *e = deleteList.get(i);
717 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
718 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
719 DBQuery(hdb, query);
63ff3c9d 720 }
b7b1c821 721 DBCommit(hdb);
883c14ae 722 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: %d records sent"), deleteList.size());
5e12ef62 723 vacuumNeeded = true;
63ff3c9d 724 }
61c0e619 725 }
296ae03d
VK
726 DBFreeResult(hResult);
727
a1273b42 728 session->decRefCount();
f7319eac 729 sleepTime = (count > 0) ? 50 : 30000;
61c0e619 730 }
a3d3f9d5 731
883c14ae 732 DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread stopped"));
296ae03d 733 return THREAD_OK;
61c0e619 734}
735
296ae03d
VK
736/**
737 * Data sender queue
738 */
739static Queue s_dataSenderQueue;
740
87fff547
VK
741/**
742 * Data sender
743 */
744static THREAD_RESULT THREAD_CALL DataSender(void *arg)
745{
746 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
747 while(true)
748 {
19dbc8ef 749 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
87fff547
VK
750 if (e == INVALID_POINTER_VALUE)
751 break;
752
296ae03d
VK
753 MutexLock(s_serverSyncStatusLock);
754 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
755 if (status == NULL)
61c0e619 756 {
296ae03d
VK
757 status = new ServerSyncStatus();
758 s_serverSyncStatus.set(e->getServerId(), status);
759 }
760
761 if (status->queueSize == 0)
762 {
63ff3c9d 763 if (!e->sendToServer(false))
61c0e619 764 {
a3d3f9d5 765 status->queueSize++;
3ec58bb8
VK
766 s_databaseWriterQueue.put(e);
767 e = NULL;
61c0e619 768 }
769 }
770 else
771 {
6fbaa926 772 status->queueSize++;
3ec58bb8
VK
773 s_databaseWriterQueue.put(e);
774 e = NULL;
61c0e619 775 }
296ae03d 776 MutexUnlock(s_serverSyncStatusLock);
61c0e619 777
87fff547
VK
778 delete e;
779 }
780 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
781 return THREAD_OK;
782}
783
784/**
785 * Pseudo-session for cached data collection
786 */
787class VirtualSession : public AbstractCommSession
788{
789private:
790 UINT64 m_serverId;
791
792public:
793 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
794
795 virtual bool isMasterServer() { return false; }
796 virtual bool isControlServer() { return false; }
797 virtual bool canAcceptTraps() { return true; }
e13420c1 798 virtual bool canAcceptFileUpdates() { return false; }
87fff547
VK
799 virtual UINT64 getServerId() { return m_serverId; };
800 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
801
802 virtual bool isIPv6Aware() { return true; }
803
804 virtual void sendMessage(NXCPMessage *pMsg) { }
805 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
806 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
63ff3c9d 807 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
a1273b42 808 virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) { return NULL; }
6fbaa926 809 virtual UINT32 generateRequestId() { return 0; }
87fff547
VK
810 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
811};
812
813/**
814 * Collect data from agent
815 */
296ae03d 816static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
87fff547
VK
817{
818 VirtualSession session(dci->getServerId());
819
820 DataElement *e = NULL;
821 if (dci->getType() == DCO_TYPE_ITEM)
822 {
823 TCHAR value[MAX_RESULT_LENGTH];
824 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
825 e = new DataElement(dci, value);
826 }
827 else if (dci->getType() == DCO_TYPE_LIST)
828 {
829 StringList *value = new StringList;
830 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
831 e = new DataElement(dci, value);
832 }
833 else if (dci->getType() == DCO_TYPE_TABLE)
834 {
835 Table *value = new Table;
836 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
837 e = new DataElement(dci, value);
838 }
839
840 return e;
841}
842
843/**
844 * Collect data from SNMP
845 */
296ae03d 846static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
87fff547 847{
296ae03d
VK
848 DataElement *e = NULL;
849 if (dci->getType() == DCO_TYPE_ITEM)
850 {
c1bc623e
VK
851 DebugPrintf(INVALID_INDEX, 8, _T("Read SNMP parameter %s"), dci->getName());
852
296ae03d
VK
853 TCHAR value[MAX_RESULT_LENGTH];
854 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
855 e = new DataElement(dci, value);
856 }
857 return e;
87fff547
VK
858}
859
374afd7b 860/**
c1bc623e 861 * Local data collection callback
374afd7b 862 */
c1bc623e 863static void LocalDataCollectionCallback(void *arg)
374afd7b
VK
864{
865 DataCollectionItem *dci = (DataCollectionItem *)arg;
866
c1bc623e
VK
867 DataElement *e = CollectDataFromAgent(dci);
868 if (e != NULL)
374afd7b 869 {
c1bc623e 870 s_dataSenderQueue.put(e);
374afd7b
VK
871 }
872 else
873 {
c1bc623e 874 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
374afd7b
VK
875 }
876
c1bc623e
VK
877 dci->setLastPollTime(time(NULL));
878 dci->finishDataCollection();
879}
880
881/**
882 * SNMP data collection callback
883 */
884static void SnmpDataCollectionCallback(void *arg)
885{
886 DataCollectionItem *dci = (DataCollectionItem *)arg;
887
888 DataElement *e = CollectDataFromSNMP(dci);
374afd7b
VK
889 if (e != NULL)
890 {
891 s_dataSenderQueue.put(e);
892 }
893 else
894 {
895 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
896 }
897
898 dci->setLastPollTime(time(NULL));
899 dci->finishDataCollection();
900}
901
87fff547 902/**
374afd7b 903 * Data collectors thread pool
87fff547 904 */
374afd7b
VK
905static ThreadPool *s_dataCollectorPool = NULL;
906
907/**
908 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
909 */
910static UINT32 DataCollectionSchedulerRun()
87fff547 911{
87fff547 912 UINT32 sleepTime = 60;
d9c628e3 913
87fff547
VK
914 MutexLock(s_itemLock);
915 for(int i = 0; i < s_items.size(); i++)
916 {
917 DataCollectionItem *dci = s_items.get(i);
231b0aad 918 time_t now = time(NULL);
87fff547
VK
919 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
920 if (timeToPoll == 0)
921 {
922 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
c1bc623e
VK
923
924 if (dci->getOrigin() == DS_NATIVE_AGENT)
925 {
926 dci->startDataCollection();
927 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
928 }
929 else if (dci->getOrigin() == DS_SNMP_AGENT)
930 {
931 dci->startDataCollection();
932 TCHAR key[64];
933 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
934 }
935 else
936 {
937 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
938 }
939
63ff3c9d 940 timeToPoll = dci->getPollingInterval();
87fff547 941 }
63ff3c9d
VK
942
943 if (sleepTime > timeToPoll)
944 sleepTime = timeToPoll;
87fff547
VK
945 }
946 MutexUnlock(s_itemLock);
947 return sleepTime;
948}
949
950/**
374afd7b 951 * Data collection scheduler thread
87fff547 952 */
374afd7b 953static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
87fff547 954{
374afd7b 955 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread started"));
9e698daa 956 s_dataCollectorPool = ThreadPoolCreate(1, 64, _T("DATACOLL"));
87fff547 957
374afd7b 958 UINT32 sleepTime = DataCollectionSchedulerRun();
9aa171c1 959 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547 960 {
374afd7b 961 sleepTime = DataCollectionSchedulerRun();
87fff547
VK
962 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
963 }
964
374afd7b
VK
965 ThreadPoolDestroy(s_dataCollectorPool);
966 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread stopped"));
87fff547
VK
967 return THREAD_OK;
968}
969
970/**
971 * Configure data collection
972 */
973void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
974{
dec46d8a
VK
975 if (!s_dataCollectorStarted)
976 {
977 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
978 return;
979 }
980
4c84b8a5
VK
981 DB_HANDLE hdb = GetLocalDatabaseHandle();
982
296ae03d 983 int count = msg->getFieldAsInt32(VID_NUM_NODES);
4c84b8a5 984 if (count > 0)
296ae03d 985 {
4c84b8a5
VK
986 DBBegin(hdb);
987 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
988 for(int i = 0; i < count; i++)
989 {
990 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
991 UpdateSnmpTarget(target);
992 fieldId += 50;
993 }
994 DBCommit(hdb);
296ae03d
VK
995 }
996 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
997
87fff547
VK
998 ObjectArray<DataCollectionItem> config(32, 32, true);
999
296ae03d 1000 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
4c84b8a5 1001 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
87fff547
VK
1002 for(int i = 0; i < count; i++)
1003 {
1004 config.add(new DataCollectionItem(serverId, msg, fieldId));
1005 fieldId += 10;
1006 }
1007 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
1008
8edd7506
VK
1009 bool txnOpen = false;
1010
d9c628e3 1011 MutexLock(s_itemLock);
a3d3f9d5 1012
296ae03d 1013 // Update and add new
d9c628e3 1014 for(int j = 0; j < config.size(); j++)
1015 {
1016 DataCollectionItem *item = config.get(j);
1017 bool exist = false;
1018 for(int i = 0; i < s_items.size(); i++)
1019 {
d610afbd 1020 if (item->equals(s_items.get(i)))
d9c628e3 1021 {
1022 s_items.get(i)->updateAndSave(item);
1023 exist = true;
1024 }
1025 }
296ae03d 1026 if (!exist)
d9c628e3 1027 {
1028 DataCollectionItem *newItem = new DataCollectionItem(item);
1029 s_items.add(newItem);
8edd7506
VK
1030 if (!txnOpen)
1031 {
1032 DBBegin(hdb);
1033 txnOpen = true;
1034 }
d9c628e3 1035 newItem->saveToDatabase(true);
1036 }
1037 }
a3d3f9d5 1038
296ae03d 1039 // Remove not existing configuration and data for it
d9c628e3 1040 for(int i = 0; i < s_items.size(); i++)
1041 {
1042 DataCollectionItem *item = s_items.get(i);
61c0e619 1043 //If item is from other server, then, do not search it in list of this server
1044 if(item->getServerId() != serverId)
1045 continue;
d9c628e3 1046 bool exist = false;
1047 for(int j = 0; j < config.size(); j++)
1048 {
374afd7b 1049 if (item->equals(config.get(j)))
d9c628e3 1050 {
1051 exist = true;
1052 }
1053 }
6fbaa926 1054 if (!exist)
d9c628e3 1055 {
8edd7506
VK
1056 if (!txnOpen)
1057 {
1058 DBBegin(hdb);
1059 txnOpen = true;
1060 }
d9c628e3 1061 item->deleteFromDatabase();
374afd7b
VK
1062 s_items.unlink(i);
1063 item->decRefCount();
d9c628e3 1064 i--;
d9c628e3 1065 }
1066 }
8edd7506
VK
1067
1068 if (txnOpen)
1069 DBCommit(hdb);
1070
d9c628e3 1071 MutexUnlock(s_itemLock);
8edd7506
VK
1072
1073 DebugPrintf(INVALID_INDEX, 4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
87fff547
VK
1074}
1075
d9c628e3 1076/**
63ff3c9d 1077 * Load saved state of local data collection
d9c628e3 1078 */
63ff3c9d 1079static void LoadState()
d9c628e3 1080{
63ff3c9d
VK
1081 DB_HANDLE hdb = GetLocalDatabaseHandle();
1082 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
296ae03d 1083 if (hResult != NULL)
d9c628e3 1084 {
63ff3c9d
VK
1085 int count = DBGetNumRows(hResult);
1086 for(int i = 0; i < count; i++)
d9c628e3 1087 {
1088 s_items.add(new DataCollectionItem(hResult, i));
1089 }
1090 DBFreeResult(hResult);
1091 }
63ff3c9d
VK
1092
1093 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1094 if (hResult != NULL)
1095 {
1096 int count = DBGetNumRows(hResult);
1097 for(int i = 0; i < count; i++)
1098 {
1099 SNMPTarget *t = new SNMPTarget(hResult, i);
1100 UpdateSnmpTarget(t);
1101 }
1102 DBFreeResult(hResult);
1103 }
1104
1105 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1106 if (hResult != NULL)
1107 {
1108 int count = DBGetNumRows(hResult);
1109 for(int i = 0; i < count; i++)
1110 {
1111 ServerSyncStatus *s = new ServerSyncStatus;
1112 s->queueSize = DBGetFieldLong(hResult, i, 1);
1113 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1114 s_serverSyncStatus.set(serverId, s);
1115 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
1116 }
1117 DBFreeResult(hResult);
1118 }
d9c628e3 1119}
1120
/**
 * SQL script array for data collection schema upgrade.
 *
 * Element N is the statement executed by StartLocalDataCollector() to move
 * the schema from version N to version N + 1, so the number of elements has
 * to match DATACOLL_SCHEMA_VERSION.
 */
static const TCHAR *s_upgradeQueries[] =
{
   // Version 0 -> 1: queue of collected values waiting to be processed
   _T("CREATE TABLE dc_queue (")
   _T(" server_id number(20) not null,")
   _T(" dci_id integer not null,")
   _T(" dci_type integer not null,")
   _T(" dci_origin integer not null,")
   _T(" snmp_target_guid varchar(36) not null,")
   _T(" timestamp integer not null,")
   _T(" value varchar not null,")
   _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),

   // Version 1 -> 2: data collection item configuration
   _T("CREATE TABLE dc_config (")
   _T(" server_id number(20) not null,")
   _T(" dci_id integer not null,")
   _T(" type integer not null,")
   _T(" origin integer not null,")
   _T(" name varchar(1023) null,")
   _T(" polling_interval integer not null,")
   _T(" last_poll integer not null,")
   _T(" snmp_port integer not null,")
   _T(" snmp_target_guid varchar(36) not null,")
   _T(" snmp_raw_type integer not null,")
   _T(" PRIMARY KEY(server_id,dci_id))"),

   // Version 2 -> 3: SNMP targets
   _T("CREATE TABLE dc_snmp_targets (")
   _T(" guid varchar(36) not null,")
   _T(" server_id number(20) not null,")
   _T(" ip_address varchar(48) not null,")
   _T(" snmp_version integer not null,")
   _T(" port integer not null,")
   _T(" auth_type integer not null,")
   _T(" enc_type integer not null,")
   _T(" auth_name varchar(63),")
   _T(" auth_pass varchar(63),")
   _T(" enc_pass varchar(63),")
   _T(" PRIMARY KEY(guid))")
};
1162
296ae03d
VK
/**
 * Handles of data collector worker threads (scheduler, data sender,
 * database writer and reconciliation). Assigned in StartLocalDataCollector()
 * and joined in ShutdownLocalDataCollector().
 */
static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;
static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;
static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;
296ae03d 1170
87fff547
VK
1171/**
1172 * Initialize and start local data collector
1173 */
1174void StartLocalDataCollector()
1175{
d9c628e3 1176 DB_HANDLE db = GetLocalDatabaseHandle();
a97f02e1 1177 if (db == NULL)
d9c628e3 1178 {
a97f02e1 1179 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
d9c628e3 1180 return;
1181 }
296ae03d 1182
a97f02e1
VK
1183 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
1184 while(dbVersion < DATACOLL_SCHEMA_VERSION)
d9c628e3 1185 {
a97f02e1 1186 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
ee733deb 1187 {
a97f02e1 1188 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
ee733deb 1189 return;
1190 }
a97f02e1
VK
1191 dbVersion++;
1192 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
d9c628e3 1193 }
1194
87fff547 1195 s_itemLock = MutexCreate();
296ae03d 1196 s_serverSyncStatusLock = MutexCreate();
63ff3c9d
VK
1197
1198 LoadState();
1199
374afd7b 1200 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
87fff547 1201 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
3ec58bb8 1202 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
883c14ae 1203 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
dec46d8a
VK
1204
1205 s_dataCollectorStarted = true;
87fff547
VK
1206}
1207
1208/**
1209 * Shutdown local data collector
1210 */
1211void ShutdownLocalDataCollector()
1212{
dec46d8a
VK
1213 if (!s_dataCollectorStarted)
1214 {
1215 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
1216 return;
1217 }
1218
87fff547 1219 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
374afd7b 1220 ThreadJoin(s_dataCollectionSchedulerThread);
87fff547
VK
1221
1222 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
19dbc8ef 1223 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
87fff547
VK
1224 ThreadJoin(s_dataSenderThread);
1225
3ec58bb8
VK
1226 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for database writer thread termination"));
1227 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1228 ThreadJoin(s_databaseWriterThread);
1229
883c14ae
VK
1230 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconciliation thread termination"));
1231 ThreadJoin(s_reconciliationThread);
61c0e619 1232
87fff547 1233 MutexDestroy(s_itemLock);
296ae03d 1234 MutexDestroy(s_serverSyncStatusLock);
87fff547 1235}
4b535d78 1236
81222795
VK
1237/**
1238 * Clear data collection configuration
1239 */
1240void ClearDataCollectionConfiguration()
4b535d78 1241{
1242 MutexLock(s_itemLock);
1243 DB_HANDLE db = GetLocalDatabaseHandle();
1244 DBQuery(db, _T("DELETE FROM dc_queue"));
1245 DBQuery(db, _T("DELETE FROM dc_config"));
1246 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1247 s_items.clear();
1248 MutexUnlock(s_itemLock);
1249
1250 MutexLock(s_serverSyncStatusLock);
4b535d78 1251 s_serverSyncStatus.clear();
1252 MutexUnlock(s_serverSyncStatusLock);
1253}
df26f039
VK
1254
1255/**
1256 * Handler for data collector queue size
1257 */
1258LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1259{
1260 if (!s_dataCollectorStarted)
1261 return SYSINFO_RC_UNSUPPORTED;
1262
1263 UINT32 count = 0;
1264 MutexLock(s_serverSyncStatusLock);
1265 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1266 while(it->hasNext())
1267 count += (UINT32)it->next()->queueSize;
1268 delete it;
1269 MutexUnlock(s_serverSyncStatusLock);
1270
1271 ret_uint(value, count);
1272 return SYSINFO_RC_SUCCESS;
1273}