added uuid class; code rerfactoring
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003-2015 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
25/**
296ae03d
VK
26 * Externals
27 */
28void UpdateSnmpTarget(SNMPTarget *target);
de4af576 29bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
296ae03d
VK
30
31/**
d9c628e3 32 * Database schema version
33 */
a97f02e1 34#define DATACOLL_SCHEMA_VERSION 3
d9c628e3 35
36/**
dec46d8a
VK
37 * Data collector start indicator
38 */
39static bool s_dataCollectorStarted = false;
40
41/**
87fff547
VK
42 * Data collection item
43 */
44class DataCollectionItem : public RefCountObject
45{
46private:
47 UINT64 m_serverId;
48 UINT32 m_id;
49 INT32 m_pollingInterval;
50 TCHAR *m_name;
51 BYTE m_type;
52 BYTE m_origin;
53 UINT16 m_snmpPort;
296ae03d 54 BYTE m_snmpRawValueType;
de4af576 55 uuid m_snmpTargetGuid;
87fff547
VK
56 time_t m_lastPollTime;
57
58public:
59 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
d9c628e3 60 DataCollectionItem(DB_RESULT hResult, int row);
61c0e619 61 DataCollectionItem(const DataCollectionItem *item);
87fff547
VK
62 virtual ~DataCollectionItem();
63
296ae03d
VK
64 UINT32 getId() const { return m_id; }
65 UINT64 getServerId() const { return m_serverId; }
66 const TCHAR *getName() const { return m_name; }
67 int getType() const { return (int)m_type; }
68 int getOrigin() const { return (int)m_origin; }
de4af576 69 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
296ae03d
VK
70 UINT16 getSnmpPort() const { return m_snmpPort; }
71 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
63ff3c9d 72 UINT32 getPollingInterval() { return (UINT32)m_pollingInterval; }
296ae03d
VK
73
74 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
75
61c0e619 76 void updateAndSave(const DataCollectionItem *item);
d9c628e3 77 void saveToDatabase(bool newObject);
78 void deleteFromDatabase();
93c62d54 79 void setLastPollTime(time_t time);
87fff547 80
296ae03d 81 UINT32 getTimeToNextPoll(time_t now) const
87fff547
VK
82 {
83 time_t diff = now - m_lastPollTime;
84 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
85 }
86};
87
88/**
89 * Create data collection item from NXCP mesage
90 */
91DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
92{
93 m_serverId = serverId;
94 m_id = msg->getFieldAsInt32(baseId);
95 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
96 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
97 m_name = msg->getFieldAsString(baseId + 3);
98 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
99 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
de4af576 100 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
87fff547 101 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
296ae03d 102 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
87fff547
VK
103}
104
105/**
296ae03d 106 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
d9c628e3 107 */
108DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
109{
110 m_serverId = DBGetFieldInt64(hResult, row, 0);
111 m_id = DBGetFieldULong(hResult, row, 1);
112 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
113 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
d610afbd 114 m_name = DBGetField(hResult, row, 4, NULL, 0);
d9c628e3 115 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
116 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
f470ae5d 117 m_snmpPort = DBGetFieldULong(hResult, row, 7);
de4af576 118 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
296ae03d 119 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
d9c628e3 120}
121
122/**
123 * Copy constructor
124 */
61c0e619 125 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
d9c628e3 126 {
127 m_serverId = item->m_serverId;
128 m_id = item->m_id;
129 m_type = item->m_type;
130 m_origin = item->m_origin;
131 m_name = _tcsdup(item->m_name);
132 m_pollingInterval = item->m_pollingInterval;
133 m_lastPollTime = item->m_lastPollTime;
de4af576 134 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 135 m_snmpPort = item->m_snmpPort;
296ae03d 136 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 137 }
138
139/**
87fff547
VK
140 * Data collection item destructor
141 */
142DataCollectionItem::~DataCollectionItem()
143{
144 safe_free(m_name);
145}
146
d9c628e3 147/**
148 * Will check if object has changed. If at least one field is changed - all data will be updated and
149 * saved to database.
150 */
61c0e619 151void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
d9c628e3 152{
153 //if at leas one of fields changed - set all fields and save to DB
296ae03d 154 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
de4af576 155 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
d610afbd 156 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
d9c628e3 157 {
158 m_type = item->m_type;
159 m_origin = item->m_origin;
160 m_name = _tcsdup(item->m_name);
161 m_pollingInterval = item->m_pollingInterval;
d610afbd
VK
162 if (m_lastPollTime < item->m_lastPollTime)
163 m_lastPollTime = item->m_lastPollTime;
de4af576 164 m_snmpTargetGuid = item->m_snmpTargetGuid;
d9c628e3 165 m_snmpPort = item->m_snmpPort;
296ae03d 166 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 167 saveToDatabase(false);
168 }
d9c628e3 169}
170
296ae03d
VK
171/**
172 * Save configuration object to database
173 */
d9c628e3 174void DataCollectionItem::saveToDatabase(bool newObject)
175{
e1c30b6f 176 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
d9c628e3 177 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
178 DB_HANDLE db = GetLocalDatabaseHandle();
179 DB_STATEMENT hStmt;
180
296ae03d 181 if (newObject)
d9c628e3 182 {
183 hStmt = DBPrepare(db,
184 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
296ae03d
VK
185 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
186 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
d9c628e3 187 }
188 else
189 {
190 hStmt = DBPrepare(db,
61c0e619 191 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
192 _T("polling_interval=?,last_poll=?,snmp_port=?,")
296ae03d 193 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
d9c628e3 194 }
195
196 if (hStmt == NULL)
d9c628e3 197 return;
d9c628e3 198
d9c628e3 199 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
200 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
201 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
202 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
203 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
d5034b1a 204 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
de4af576 205 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
296ae03d
VK
206 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
207 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
208 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
d9c628e3 209
296ae03d
VK
210 DBExecute(hStmt);
211 DBFreeStatement(hStmt);
d9c628e3 212}
213
214/**
215 * Remove item form database and delete not synced data if exist
216 */
217void DataCollectionItem::deleteFromDatabase()
218{
d9c628e3 219 DB_HANDLE db = GetLocalDatabaseHandle();
220 TCHAR query[256];
366440e8
VK
221 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
222 if (DBQuery(db, query))
d9c628e3 223 {
366440e8
VK
224 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
225 if (DBQuery(db, query))
226 {
227 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
228 }
d9c628e3 229 }
230}
231
a97f02e1
VK
232/**
233 * Set last poll time for item
234 */
93c62d54 235void DataCollectionItem::setLastPollTime(time_t time)
236{
a97f02e1 237 m_lastPollTime = time;
93c62d54 238 TCHAR query[256];
a3d3f9d5 239 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
a97f02e1
VK
240 (UINT64)m_lastPollTime, m_serverId, m_id);
241 DBQuery(GetLocalDatabaseHandle(), query);
93c62d54 242}
243
87fff547
VK
244/**
245 * Collected data
246 */
247class DataElement
248{
249private:
250 UINT64 m_serverId;
251 UINT32 m_dciId;
252 time_t m_timestamp;
02d936bd 253 int m_origin;
87fff547 254 int m_type;
de4af576 255 uuid m_snmpNode;
87fff547
VK
256 union
257 {
258 TCHAR *item;
259 StringList *list;
260 Table *table;
261 } m_value;
262
263public:
264 DataElement(DataCollectionItem *dci, const TCHAR *value)
265 {
266 m_serverId = dci->getServerId();
267 m_dciId = dci->getId();
268 m_timestamp = time(NULL);
02d936bd 269 m_origin = dci->getOrigin();
87fff547 270 m_type = DCO_TYPE_ITEM;
de4af576 271 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
272 m_value.item = _tcsdup(value);
273 }
274
275 DataElement(DataCollectionItem *dci, StringList *value)
276 {
277 m_serverId = dci->getServerId();
278 m_dciId = dci->getId();
279 m_timestamp = time(NULL);
02d936bd 280 m_origin = dci->getOrigin();
87fff547 281 m_type = DCO_TYPE_LIST;
de4af576 282 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
283 m_value.list = value;
284 }
285
286 DataElement(DataCollectionItem *dci, Table *value)
287 {
288 m_serverId = dci->getServerId();
289 m_dciId = dci->getId();
290 m_timestamp = time(NULL);
02d936bd 291 m_origin = dci->getOrigin();
87fff547 292 m_type = DCO_TYPE_TABLE;
de4af576 293 m_snmpNode = dci->getSnmpTargetGuid();
87fff547
VK
294 m_value.table = value;
295 }
296
63ff3c9d
VK
297 /**
298 * Create data element from database record
299 * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
300 */
301 DataElement(DB_RESULT hResult, int row)
302 {
303 m_serverId = DBGetFieldUInt64(hResult, row, 0);
304 m_dciId = DBGetFieldULong(hResult, row, 1);
305 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
306 m_origin = DBGetFieldLong(hResult, row, 3);
307 m_type = DBGetFieldLong(hResult, row, 2);
de4af576 308 m_snmpNode = DBGetFieldGUID(hResult, row, 4);
63ff3c9d
VK
309 switch(m_type)
310 {
311 case DCO_TYPE_ITEM:
312 m_value.item = DBGetField(hResult, row, 6, NULL, 0);
313 break;
314 case DCO_TYPE_LIST:
315 {
316 m_value.list = new StringList();
317 TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
318 if (text != NULL)
319 {
320 m_value.list->splitAndAdd(text, _T("\n"));
321 free(text);
322 }
323 }
324 break;
325 case DCO_TYPE_TABLE:
326 {
327 char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
328 if (xml != NULL)
329 {
330 m_value.table = Table::createFromXML(xml);
331 free(xml);
332 }
b7b1c821
VK
333 else
334 {
335 m_value.table = NULL;
336 }
63ff3c9d
VK
337 }
338 break;
339 default:
340 m_type = DCO_TYPE_ITEM;
341 m_value.item = _tcsdup(_T(""));
342 break;
343 }
344 }
345
87fff547
VK
346 ~DataElement()
347 {
348 switch(m_type)
349 {
350 case DCO_TYPE_ITEM:
351 free(m_value.item);
352 break;
353 case DCO_TYPE_LIST:
354 delete m_value.list;
355 break;
356 case DCO_TYPE_TABLE:
357 delete m_value.table;
358 break;
359 }
360 }
93c62d54 361
362 time_t getTimestamp() { return m_timestamp; }
61c0e619 363 UINT64 getServerId() { return m_serverId; }
63ff3c9d 364 UINT32 getDciId() { return m_dciId; }
a3d3f9d5 365
61c0e619 366 void saveToDatabase();
63ff3c9d 367 bool sendToServer(bool reconcillation);
87fff547
VK
368};
369
370/**
296ae03d 371 * Save data element to database
61c0e619 372 */
373void DataElement::saveToDatabase()
374{
a3d3f9d5 375 DB_HANDLE db = GetLocalDatabaseHandle();
63ff3c9d 376 DB_STATEMENT hStmt= DBPrepare(db, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
8199376b
VK
377 if (hStmt == NULL)
378 return;
379
380 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
381 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
382 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
383 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
de4af576 384 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_snmpNode);
63ff3c9d 385 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
8199376b 386 switch(m_type)
a3d3f9d5 387 {
8199376b 388 case DCO_TYPE_ITEM:
63ff3c9d 389 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
8199376b
VK
390 break;
391 case DCO_TYPE_LIST:
63ff3c9d 392 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
8199376b
VK
393 break;
394 case DCO_TYPE_TABLE:
63ff3c9d 395 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
8199376b 396 break;
a3d3f9d5 397 }
8199376b
VK
398 DBExecute(hStmt);
399 DBFreeStatement(hStmt);
61c0e619 400}
401
296ae03d
VK
402/**
403 * Send collected data to server
404 */
63ff3c9d 405bool DataElement::sendToServer(bool reconcillation)
61c0e619 406{
b7b1c821
VK
407 // If data in database was invalid table may not be parsed correctly
408 // Consider sending a success in that case so data element can be dropped
409 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
410 return true;
411
6fbaa926
VK
412 CommSession *session = (CommSession *)FindServerSession(m_serverId);
413 if (session == NULL)
414 return false;
415
416 NXCPMessage msg;
417 msg.setCode(CMD_DCI_DATA);
418 msg.setId(session->generateRequestId());
419 msg.setField(VID_DCI_ID, m_dciId);
02d936bd
VK
420 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
421 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
de4af576 422 msg.setField(VID_NODE_ID, m_snmpNode);
6fbaa926 423 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
63ff3c9d 424 msg.setField(VID_RECONCILLATION, (INT16)(reconcillation ? 1 : 0));
6fbaa926
VK
425 switch(m_type)
426 {
427 case DCO_TYPE_ITEM:
428 msg.setField(VID_VALUE, m_value.item);
429 break;
430 case DCO_TYPE_LIST:
6fbaa926
VK
431 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
432 break;
433 case DCO_TYPE_TABLE:
66816abf 434 m_value.table->setSource(m_origin);
6fbaa926
VK
435 m_value.table->fillMessage(msg, 0, -1);
436 break;
437 }
63ff3c9d 438 UINT32 rcc = session->doRequest(&msg, 2000);
6fbaa926 439 session->decRefCount();
63ff3c9d
VK
440
441 // consider internal error as success because it means that server
442 // cannot accept data for some reason and retry is not feasible
443 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
61c0e619 444}
445
446/**
296ae03d 447 * Server data sync status object
61c0e619 448 */
296ae03d 449struct ServerSyncStatus
61c0e619 450{
296ae03d 451 INT32 queueSize;
61c0e619 452
296ae03d
VK
453 ServerSyncStatus()
454 {
455 queueSize = 0;
456 }
61c0e619 457};
458
459/**
296ae03d 460 * Server sync status information
61c0e619 461 */
296ae03d
VK
462static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
463static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
87fff547
VK
464
465/**
53c3d1e7
VK
466 * Callback to check if reconcillation is needed for session
467 */
468static EnumerationCallbackResult ReconcillationQueryCallback(AbstractCommSession *session, void *arg)
469{
470 if (session->getServerId() == 0)
471 return _CONTINUE;
472 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
473 return ((s != NULL) && (s->queueSize > 0)) ? _STOP : _CONTINUE;
474}
475
476/**
296ae03d 477 * Data reconcillation thread
61c0e619 478 */
296ae03d 479static THREAD_RESULT THREAD_CALL ReconcillationThread(void *arg)
61c0e619 480{
296ae03d 481 DB_HANDLE hdb = GetLocalDatabaseHandle();
b7b1c821 482 UINT32 sleepTime = 30000;
296ae03d 483 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread started"));
a3d3f9d5 484
5e12ef62 485 bool vacuumNeeded = false;
296ae03d 486 while(!AgentSleepAndCheckForShutdown(sleepTime))
61c0e619 487 {
53c3d1e7
VK
488 // Check if there is something to sync
489 MutexLock(s_serverSyncStatusLock);
490 bool run = EnumerateSessions(ReconcillationQueryCallback, NULL);
491 MutexUnlock(s_serverSyncStatusLock);
492 if (!run)
493 {
5e12ef62
VK
494 if (vacuumNeeded)
495 {
496 DebugPrintf(INVALID_INDEX, 4, _T("ReconcillationThread: vacuum local database"));
497 DBQuery(hdb, _T("VACUUM"));
498 vacuumNeeded = false;
499 }
53c3d1e7
VK
500 sleepTime = 30000;
501 continue;
502 }
503
63ff3c9d 504 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue ORDER BY timestamp LIMIT 100"));
296ae03d 505 if (hResult == NULL)
53c3d1e7
VK
506 {
507 sleepTime = 30000;
296ae03d 508 continue;
53c3d1e7 509 }
296ae03d
VK
510
511 int count = DBGetNumRows(hResult);
b7b1c821 512 if (count > 0)
61c0e619 513 {
b7b1c821
VK
514 ObjectArray<DataElement> deleteList(count, 10, true);
515 for(int i = 0; i < count; i++)
63ff3c9d 516 {
b7b1c821
VK
517 DataElement *e = new DataElement(hResult, i);
518 bool sent = false;
519 MutexLock(s_serverSyncStatusLock);
520 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
521 if (status != NULL)
522 {
523 sent = e->sendToServer(true);
524 }
525 else
526 {
527 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
528 }
529
530 if (sent)
531 {
532 status->queueSize--;
533 deleteList.add(e);
534 }
535 else
536 {
537 delete e;
538 }
539 MutexUnlock(s_serverSyncStatusLock);
63ff3c9d
VK
540 }
541
b7b1c821 542 if (deleteList.size() > 0)
63ff3c9d
VK
543 {
544 TCHAR query[256];
b7b1c821
VK
545
546 DBBegin(hdb);
547 for(int i = 0; i < deleteList.size(); i++)
63ff3c9d 548 {
b7b1c821
VK
549 DataElement *e = deleteList.get(i);
550 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
551 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
552 DBQuery(hdb, query);
63ff3c9d 553 }
b7b1c821 554 DBCommit(hdb);
5e12ef62
VK
555 DebugPrintf(INVALID_INDEX, 4, _T("ReconcillationThread: %d records sent"), deleteList.size());
556 vacuumNeeded = true;
63ff3c9d 557 }
61c0e619 558 }
296ae03d
VK
559 DBFreeResult(hResult);
560
b7b1c821 561 sleepTime = (count == 100) ? 100 : 30000;
61c0e619 562 }
a3d3f9d5 563
296ae03d
VK
564 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread stopped"));
565 return THREAD_OK;
61c0e619 566}
567
568/**
296ae03d
VK
569 * Data sender queue
570 */
571static Queue s_dataSenderQueue;
572
573/**
87fff547
VK
574 * Data sender
575 */
576static THREAD_RESULT THREAD_CALL DataSender(void *arg)
577{
578 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
579 while(true)
580 {
19dbc8ef 581 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
87fff547
VK
582 if (e == INVALID_POINTER_VALUE)
583 break;
584
296ae03d
VK
585 MutexLock(s_serverSyncStatusLock);
586 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
587 if (status == NULL)
61c0e619 588 {
296ae03d
VK
589 status = new ServerSyncStatus();
590 s_serverSyncStatus.set(e->getServerId(), status);
591 }
592
593 if (status->queueSize == 0)
594 {
63ff3c9d 595 if (!e->sendToServer(false))
61c0e619 596 {
597 e->saveToDatabase();
a3d3f9d5 598 status->queueSize++;
61c0e619 599 }
600 }
601 else
602 {
603 e->saveToDatabase();
6fbaa926 604 status->queueSize++;
61c0e619 605 }
296ae03d 606 MutexUnlock(s_serverSyncStatusLock);
61c0e619 607
87fff547
VK
608 delete e;
609 }
610 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
611 return THREAD_OK;
612}
613
614/**
615 * Pseudo-session for cached data collection
616 */
617class VirtualSession : public AbstractCommSession
618{
619private:
620 UINT64 m_serverId;
621
622public:
623 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
624
625 virtual bool isMasterServer() { return false; }
626 virtual bool isControlServer() { return false; }
627 virtual bool canAcceptTraps() { return true; }
e13420c1 628 virtual bool canAcceptFileUpdates() { return false; }
87fff547
VK
629 virtual UINT64 getServerId() { return m_serverId; };
630 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
631
632 virtual bool isIPv6Aware() { return true; }
633
634 virtual void sendMessage(NXCPMessage *pMsg) { }
635 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
636 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
63ff3c9d 637 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
6fbaa926 638 virtual UINT32 generateRequestId() { return 0; }
87fff547
VK
639 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
640};
641
642/**
643 * Collect data from agent
644 */
296ae03d 645static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
87fff547
VK
646{
647 VirtualSession session(dci->getServerId());
648
649 DataElement *e = NULL;
650 if (dci->getType() == DCO_TYPE_ITEM)
651 {
652 TCHAR value[MAX_RESULT_LENGTH];
653 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
654 e = new DataElement(dci, value);
655 }
656 else if (dci->getType() == DCO_TYPE_LIST)
657 {
658 StringList *value = new StringList;
659 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
660 e = new DataElement(dci, value);
661 }
662 else if (dci->getType() == DCO_TYPE_TABLE)
663 {
664 Table *value = new Table;
665 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
666 e = new DataElement(dci, value);
667 }
668
669 return e;
670}
671
672/**
673 * Collect data from SNMP
674 */
296ae03d 675static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
87fff547 676{
296ae03d
VK
677 DataElement *e = NULL;
678 if (dci->getType() == DCO_TYPE_ITEM)
679 {
680 TCHAR value[MAX_RESULT_LENGTH];
681 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
682 e = new DataElement(dci, value);
683 }
684 return e;
87fff547
VK
685}
686
687/**
688 * List of all data collection items
689 */
61c0e619 690static ObjectArray<DataCollectionItem> s_items(64, 64, true);
87fff547
VK
691static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
692
693/**
694 * Single data collection run - collect data if needed and calculate sleep time
695 */
696static UINT32 DataCollectionRun()
697{
87fff547 698 UINT32 sleepTime = 60;
d9c628e3 699
87fff547
VK
700 MutexLock(s_itemLock);
701 for(int i = 0; i < s_items.size(); i++)
702 {
703 DataCollectionItem *dci = s_items.get(i);
231b0aad 704 time_t now = time(NULL);
87fff547
VK
705 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
706 if (timeToPoll == 0)
707 {
708 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
709 DataElement *e;
710 if (dci->getOrigin() == DS_NATIVE_AGENT)
711 {
712 e = CollectDataFromAgent(dci);
713 }
714 else if (dci->getOrigin() == DS_SNMP_AGENT)
715 {
716 e = CollectDataFromSNMP(dci);
717 }
718 else
719 {
720 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
721 e = NULL;
722 }
723
724 if (e != NULL)
725 {
19dbc8ef 726 s_dataSenderQueue.put(e);
87fff547
VK
727 }
728 else
729 {
730 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
731 }
63ff3c9d 732
231b0aad 733 dci->setLastPollTime(now);
63ff3c9d 734 timeToPoll = dci->getPollingInterval();
87fff547 735 }
63ff3c9d
VK
736
737 if (sleepTime > timeToPoll)
738 sleepTime = timeToPoll;
87fff547
VK
739 }
740 MutexUnlock(s_itemLock);
741 return sleepTime;
742}
743
744/**
745 * Data collector thread
746 */
747static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
748{
749 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
750
751 UINT32 sleepTime = DataCollectionRun();
9aa171c1 752 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547
VK
753 {
754 sleepTime = DataCollectionRun();
755 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
756 }
757
758 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
759 return THREAD_OK;
760}
761
762/**
763 * Configure data collection
764 */
765void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
766{
dec46d8a
VK
767 if (!s_dataCollectorStarted)
768 {
769 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
770 return;
771 }
772
296ae03d
VK
773 int count = msg->getFieldAsInt32(VID_NUM_NODES);
774 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
775 for(int i = 0; i < count; i++)
776 {
777 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
778 UpdateSnmpTarget(target);
779 fieldId += 50;
780 }
781 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
782
87fff547
VK
783 ObjectArray<DataCollectionItem> config(32, 32, true);
784
296ae03d
VK
785 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
786 fieldId = VID_ELEMENT_LIST_BASE;
87fff547
VK
787 for(int i = 0; i < count; i++)
788 {
789 config.add(new DataCollectionItem(serverId, msg, fieldId));
790 fieldId += 10;
791 }
792 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
793
d9c628e3 794 MutexLock(s_itemLock);
a3d3f9d5 795
296ae03d 796 // Update and add new
d9c628e3 797 for(int j = 0; j < config.size(); j++)
798 {
799 DataCollectionItem *item = config.get(j);
800 bool exist = false;
801 for(int i = 0; i < s_items.size(); i++)
802 {
d610afbd 803 if (item->equals(s_items.get(i)))
d9c628e3 804 {
805 s_items.get(i)->updateAndSave(item);
806 exist = true;
807 }
808 }
296ae03d 809 if (!exist)
d9c628e3 810 {
811 DataCollectionItem *newItem = new DataCollectionItem(item);
812 s_items.add(newItem);
813 newItem->saveToDatabase(true);
814 }
815 }
a3d3f9d5 816
296ae03d 817 // Remove not existing configuration and data for it
d9c628e3 818 for(int i = 0; i < s_items.size(); i++)
819 {
820 DataCollectionItem *item = s_items.get(i);
61c0e619 821 //If item is from other server, then, do not search it in list of this server
822 if(item->getServerId() != serverId)
823 continue;
d9c628e3 824 bool exist = false;
825 for(int j = 0; j < config.size(); j++)
826 {
827 if(item->equals(config.get(j)))
828 {
829 exist = true;
830 }
831 }
6fbaa926 832 if (!exist)
d9c628e3 833 {
834 item->deleteFromDatabase();
835 s_items.remove(i);
836 i--;
d9c628e3 837 }
838 }
839 MutexUnlock(s_itemLock);
87fff547
VK
840}
841
842/**
63ff3c9d 843 * Load saved state of local data collection
d9c628e3 844 */
63ff3c9d 845static void LoadState()
d9c628e3 846{
63ff3c9d
VK
847 DB_HANDLE hdb = GetLocalDatabaseHandle();
848 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
296ae03d 849 if (hResult != NULL)
d9c628e3 850 {
63ff3c9d
VK
851 int count = DBGetNumRows(hResult);
852 for(int i = 0; i < count; i++)
d9c628e3 853 {
854 s_items.add(new DataCollectionItem(hResult, i));
855 }
856 DBFreeResult(hResult);
857 }
63ff3c9d
VK
858
859 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
860 if (hResult != NULL)
861 {
862 int count = DBGetNumRows(hResult);
863 for(int i = 0; i < count; i++)
864 {
865 SNMPTarget *t = new SNMPTarget(hResult, i);
866 UpdateSnmpTarget(t);
867 }
868 DBFreeResult(hResult);
869 }
870
871 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
872 if (hResult != NULL)
873 {
874 int count = DBGetNumRows(hResult);
875 for(int i = 0; i < count; i++)
876 {
877 ServerSyncStatus *s = new ServerSyncStatus;
878 s->queueSize = DBGetFieldLong(hResult, i, 1);
879 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
880 s_serverSyncStatus.set(serverId, s);
881 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
882 }
883 DBFreeResult(hResult);
884 }
d9c628e3 885}
886
887/**
888 * SQL script array
889 */
a97f02e1 890static const TCHAR *s_upgradeQueries[] =
d9c628e3 891{
892 _T("CREATE TABLE dc_queue (")
a97f02e1
VK
893 _T(" server_id number(20) not null,")
894 _T(" dci_id integer not null,")
296ae03d 895 _T(" dci_type integer not null,")
8199376b 896 _T(" dci_origin integer not null,")
366440e8 897 _T(" snmp_target_guid varchar(36) not null,")
a97f02e1
VK
898 _T(" timestamp integer not null,")
899 _T(" value varchar not null,")
900 _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),
901
d9c628e3 902 _T("CREATE TABLE dc_config (")
a97f02e1
VK
903 _T(" server_id number(20) not null,")
904 _T(" dci_id integer not null,")
905 _T(" type integer not null,")
906 _T(" origin integer not null,")
907 _T(" name varchar(1023) null,")
908 _T(" polling_interval integer not null,")
909 _T(" last_poll integer not null,")
910 _T(" snmp_port integer not null,")
911 _T(" snmp_target_guid varchar(36) not null,")
296ae03d 912 _T(" snmp_raw_type integer not null,")
a97f02e1
VK
913 _T(" PRIMARY KEY(server_id,dci_id))"),
914
915 _T("CREATE TABLE dc_snmp_targets (")
916 _T(" guid varchar(36) not null,")
296ae03d 917 _T(" server_id number(20) not null,")
a97f02e1 918 _T(" ip_address varchar(48) not null,")
296ae03d 919 _T(" snmp_version integer not null,")
a97f02e1
VK
920 _T(" port integer not null,")
921 _T(" auth_type integer not null,")
922 _T(" enc_type integer not null,")
296ae03d
VK
923 _T(" auth_name varchar(63),")
924 _T(" auth_pass varchar(63),")
925 _T(" enc_pass varchar(63),")
a97f02e1 926 _T(" PRIMARY KEY(guid))")
d9c628e3 927};
928
929/**
296ae03d
VK
930 * Data collector and sender thread handles
931 */
932static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
933static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
934static THREAD s_reconcillationThread = INVALID_THREAD_HANDLE;
935
936/**
87fff547
VK
937 * Initialize and start local data collector
938 */
939void StartLocalDataCollector()
940{
d9c628e3 941 DB_HANDLE db = GetLocalDatabaseHandle();
a97f02e1 942 if (db == NULL)
d9c628e3 943 {
a97f02e1 944 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
d9c628e3 945 return;
946 }
296ae03d 947
a97f02e1
VK
948 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
949 while(dbVersion < DATACOLL_SCHEMA_VERSION)
d9c628e3 950 {
a97f02e1 951 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
ee733deb 952 {
a97f02e1 953 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
ee733deb 954 return;
955 }
a97f02e1
VK
956 dbVersion++;
957 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
d9c628e3 958 }
959
87fff547 960 s_itemLock = MutexCreate();
296ae03d 961 s_serverSyncStatusLock = MutexCreate();
63ff3c9d
VK
962
963 LoadState();
964
87fff547
VK
965 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
966 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
296ae03d 967 s_reconcillationThread = ThreadCreateEx(ReconcillationThread, 0, NULL);
dec46d8a
VK
968
969 s_dataCollectorStarted = true;
87fff547
VK
970}
971
972/**
973 * Shutdown local data collector
974 */
975void ShutdownLocalDataCollector()
976{
dec46d8a
VK
977 if (!s_dataCollectorStarted)
978 {
979 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
980 return;
981 }
982
87fff547
VK
983 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
984 ThreadJoin(s_dataCollectorThread);
985
986 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
19dbc8ef 987 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
87fff547
VK
988 ThreadJoin(s_dataSenderThread);
989
296ae03d
VK
990 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconcillation thread termination"));
991 ThreadJoin(s_reconcillationThread);
61c0e619 992
87fff547 993 MutexDestroy(s_itemLock);
296ae03d 994 MutexDestroy(s_serverSyncStatusLock);
87fff547 995}