fix in local data collection config update
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003-2015 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
296ae03d
VK
25/**
26 * Externals
27 */
28void UpdateSnmpTarget(SNMPTarget *target);
29bool GetSnmpValue(const uuid_t& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
30
d9c628e3 31/**
32 * Database schema version
33 */
a97f02e1 34#define DATACOLL_SCHEMA_VERSION 3
d9c628e3 35
87fff547
VK
36/**
37 * Data collection item
38 */
39class DataCollectionItem : public RefCountObject
40{
41private:
42 UINT64 m_serverId;
43 UINT32 m_id;
44 INT32 m_pollingInterval;
45 TCHAR *m_name;
46 BYTE m_type;
47 BYTE m_origin;
48 UINT16 m_snmpPort;
296ae03d 49 BYTE m_snmpRawValueType;
d9c628e3 50 uuid_t m_snmpTargetGuid;
87fff547
VK
51 time_t m_lastPollTime;
52
53public:
54 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
d9c628e3 55 DataCollectionItem(DB_RESULT hResult, int row);
61c0e619 56 DataCollectionItem(const DataCollectionItem *item);
87fff547
VK
57 virtual ~DataCollectionItem();
58
296ae03d
VK
59 UINT32 getId() const { return m_id; }
60 UINT64 getServerId() const { return m_serverId; }
61 const TCHAR *getName() const { return m_name; }
62 int getType() const { return (int)m_type; }
63 int getOrigin() const { return (int)m_origin; }
64 const uuid_t& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
65 UINT16 getSnmpPort() const { return m_snmpPort; }
66 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
67
68 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
69
61c0e619 70 void updateAndSave(const DataCollectionItem *item);
d9c628e3 71 void saveToDatabase(bool newObject);
72 void deleteFromDatabase();
93c62d54 73 void setLastPollTime(time_t time);
87fff547 74
296ae03d 75 UINT32 getTimeToNextPoll(time_t now) const
87fff547
VK
76 {
77 time_t diff = now - m_lastPollTime;
78 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
79 }
80};
81
82/**
83 * Create data collection item from NXCP mesage
84 */
85DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
86{
87 m_serverId = serverId;
88 m_id = msg->getFieldAsInt32(baseId);
89 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
90 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
91 m_name = msg->getFieldAsString(baseId + 3);
92 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
93 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
d610afbd 94 memset(m_snmpTargetGuid, 0, UUID_LENGTH);
d9c628e3 95 msg->getFieldAsBinary(baseId + 6, m_snmpTargetGuid, UUID_LENGTH);
87fff547 96 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
296ae03d 97 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
87fff547
VK
98}
99
d9c628e3 100/**
296ae03d 101 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
d9c628e3 102 */
103DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
104{
105 m_serverId = DBGetFieldInt64(hResult, row, 0);
106 m_id = DBGetFieldULong(hResult, row, 1);
107 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
108 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
d610afbd 109 m_name = DBGetField(hResult, row, 4, NULL, 0);
d9c628e3 110 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
111 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
112 DBGetFieldGUID(hResult, row, 7, m_snmpTargetGuid);
113 m_snmpPort = DBGetFieldULong(hResult, row, 8);
296ae03d 114 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
d9c628e3 115}
116
117/**
118 * Copy constructor
119 */
61c0e619 120 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
d9c628e3 121 {
122 m_serverId = item->m_serverId;
123 m_id = item->m_id;
124 m_type = item->m_type;
125 m_origin = item->m_origin;
126 m_name = _tcsdup(item->m_name);
127 m_pollingInterval = item->m_pollingInterval;
128 m_lastPollTime = item->m_lastPollTime;
129 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
130 m_snmpPort = item->m_snmpPort;
296ae03d 131 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 132 }
133
87fff547
VK
134/**
135 * Data collection item destructor
136 */
137DataCollectionItem::~DataCollectionItem()
138{
139 safe_free(m_name);
140}
141
d9c628e3 142/**
143 * Will check if object has changed. If at least one field is changed - all data will be updated and
144 * saved to database.
145 */
61c0e619 146void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
d9c628e3 147{
148 //if at leas one of fields changed - set all fields and save to DB
296ae03d
VK
149 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
150 (m_pollingInterval != item->m_pollingInterval) || uuid_compare(m_snmpTargetGuid, item->m_snmpTargetGuid) ||
d610afbd 151 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
d9c628e3 152 {
153 m_type = item->m_type;
154 m_origin = item->m_origin;
155 m_name = _tcsdup(item->m_name);
156 m_pollingInterval = item->m_pollingInterval;
d610afbd
VK
157 if (m_lastPollTime < item->m_lastPollTime)
158 m_lastPollTime = item->m_lastPollTime;
d9c628e3 159 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
160 m_snmpPort = item->m_snmpPort;
296ae03d 161 m_snmpRawValueType = item->m_snmpRawValueType;
d9c628e3 162 saveToDatabase(false);
163 }
d9c628e3 164}
165
296ae03d
VK
166/**
167 * Save configuration object to database
168 */
d9c628e3 169void DataCollectionItem::saveToDatabase(bool newObject)
170{
171 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=%ld,dciId=%d) saved to database"),
172 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
173 DB_HANDLE db = GetLocalDatabaseHandle();
174 DB_STATEMENT hStmt;
175
296ae03d 176 if (newObject)
d9c628e3 177 {
178 hStmt = DBPrepare(db,
179 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
296ae03d
VK
180 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
181 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
d9c628e3 182 }
183 else
184 {
185 hStmt = DBPrepare(db,
61c0e619 186 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
187 _T("polling_interval=?,last_poll=?,snmp_port=?,")
296ae03d 188 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
d9c628e3 189 }
190
191 if (hStmt == NULL)
d9c628e3 192 return;
d9c628e3 193
194 TCHAR buffer[64];
195 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
196 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
197 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
198 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
199 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
d5034b1a 200 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
201 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, uuid_to_string(m_snmpTargetGuid, buffer), DB_BIND_STATIC);
296ae03d
VK
202 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
203 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
204 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
d9c628e3 205
296ae03d
VK
206 DBExecute(hStmt);
207 DBFreeStatement(hStmt);
d9c628e3 208}
209
210/**
211 * Remove item form database and delete not synced data if exist
212 */
213void DataCollectionItem::deleteFromDatabase()
214{
d9c628e3 215 DB_HANDLE db = GetLocalDatabaseHandle();
216 TCHAR query[256];
366440e8
VK
217 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
218 if (DBQuery(db, query))
d9c628e3 219 {
366440e8
VK
220 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
221 if (DBQuery(db, query))
222 {
223 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
224 }
d9c628e3 225 }
226}
227
a97f02e1
VK
228/**
229 * Set last poll time for item
230 */
93c62d54 231void DataCollectionItem::setLastPollTime(time_t time)
232{
a97f02e1 233 m_lastPollTime = time;
93c62d54 234 TCHAR query[256];
a3d3f9d5 235 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
a97f02e1
VK
236 (UINT64)m_lastPollTime, m_serverId, m_id);
237 DBQuery(GetLocalDatabaseHandle(), query);
93c62d54 238}
239
87fff547
VK
240/**
241 * Collected data
242 */
243class DataElement
244{
245private:
246 UINT64 m_serverId;
247 UINT32 m_dciId;
248 time_t m_timestamp;
02d936bd 249 int m_origin;
87fff547 250 int m_type;
366440e8 251 uuid_t m_snmpNode;
87fff547
VK
252 union
253 {
254 TCHAR *item;
255 StringList *list;
256 Table *table;
257 } m_value;
258
259public:
260 DataElement(DataCollectionItem *dci, const TCHAR *value)
261 {
262 m_serverId = dci->getServerId();
263 m_dciId = dci->getId();
264 m_timestamp = time(NULL);
02d936bd 265 m_origin = dci->getOrigin();
87fff547 266 m_type = DCO_TYPE_ITEM;
366440e8 267 memcpy(m_snmpNode, dci->getSnmpTargetGuid(), UUID_LENGTH);
87fff547
VK
268 m_value.item = _tcsdup(value);
269 }
270
271 DataElement(DataCollectionItem *dci, StringList *value)
272 {
273 m_serverId = dci->getServerId();
274 m_dciId = dci->getId();
275 m_timestamp = time(NULL);
02d936bd 276 m_origin = dci->getOrigin();
87fff547 277 m_type = DCO_TYPE_LIST;
366440e8 278 memcpy(m_snmpNode, dci->getSnmpTargetGuid(), UUID_LENGTH);
87fff547
VK
279 m_value.list = value;
280 }
281
282 DataElement(DataCollectionItem *dci, Table *value)
283 {
284 m_serverId = dci->getServerId();
285 m_dciId = dci->getId();
286 m_timestamp = time(NULL);
02d936bd 287 m_origin = dci->getOrigin();
87fff547 288 m_type = DCO_TYPE_TABLE;
366440e8 289 memcpy(m_snmpNode, dci->getSnmpTargetGuid(), UUID_LENGTH);
87fff547
VK
290 m_value.table = value;
291 }
292
293 ~DataElement()
294 {
295 switch(m_type)
296 {
297 case DCO_TYPE_ITEM:
298 free(m_value.item);
299 break;
300 case DCO_TYPE_LIST:
301 delete m_value.list;
302 break;
303 case DCO_TYPE_TABLE:
304 delete m_value.table;
305 break;
306 }
307 }
93c62d54 308
309 time_t getTimestamp() { return m_timestamp; }
61c0e619 310 UINT64 getServerId() { return m_serverId; }
a3d3f9d5 311
61c0e619 312 void saveToDatabase();
313 bool sendToServer();
87fff547
VK
314};
315
61c0e619 316/**
296ae03d 317 * Save data element to database
61c0e619 318 */
319void DataElement::saveToDatabase()
320{
a3d3f9d5 321 DB_HANDLE db = GetLocalDatabaseHandle();
8199376b
VK
322 DB_STATEMENT hStmt= DBPrepare(db, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,timestamp,value) VALUES (?,?,?,?,?,?)"));
323 if (hStmt == NULL)
324 return;
325
326 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
327 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
328 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
329 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
330 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
331 switch(m_type)
a3d3f9d5 332 {
8199376b
VK
333 case DCO_TYPE_ITEM:
334 DBBind(hStmt, 6, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
335 break;
336 case DCO_TYPE_LIST:
337 DBBind(hStmt, 6, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
338 break;
339 case DCO_TYPE_TABLE:
340 DBBind(hStmt, 6, DB_SQLTYPE_TEXT, m_value.table->getTableAsXML(), DB_BIND_DYNAMIC);
341 break;
a3d3f9d5 342 }
8199376b
VK
343 DBExecute(hStmt);
344 DBFreeStatement(hStmt);
61c0e619 345}
346
296ae03d
VK
347/**
348 * Send collected data to server
349 */
61c0e619 350bool DataElement::sendToServer()
351{
6fbaa926
VK
352 CommSession *session = (CommSession *)FindServerSession(m_serverId);
353 if (session == NULL)
354 return false;
355
356 NXCPMessage msg;
357 msg.setCode(CMD_DCI_DATA);
358 msg.setId(session->generateRequestId());
359 msg.setField(VID_DCI_ID, m_dciId);
02d936bd
VK
360 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
361 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
366440e8 362 msg.setField(VID_NODE_ID, m_snmpNode, UUID_LENGTH);
6fbaa926
VK
363 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
364 switch(m_type)
365 {
366 case DCO_TYPE_ITEM:
367 msg.setField(VID_VALUE, m_value.item);
368 break;
369 case DCO_TYPE_LIST:
6fbaa926
VK
370 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
371 break;
372 case DCO_TYPE_TABLE:
373 m_value.table->fillMessage(msg, 0, -1);
374 break;
375 }
376 bool success = session->doRequest(&msg, 2000);
377 session->decRefCount();
378 return success;
61c0e619 379}
380
381/**
296ae03d 382 * Server data sync status object
61c0e619 383 */
296ae03d 384struct ServerSyncStatus
61c0e619 385{
296ae03d 386 INT32 queueSize;
61c0e619 387
296ae03d
VK
388 ServerSyncStatus()
389 {
390 queueSize = 0;
391 }
61c0e619 392};
393
394/**
296ae03d 395 * Server sync status information
61c0e619 396 */
296ae03d
VK
397static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
398static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
87fff547 399
61c0e619 400/**
296ae03d 401 * Data reconcillation thread
61c0e619 402 */
296ae03d 403static THREAD_RESULT THREAD_CALL ReconcillationThread(void *arg)
61c0e619 404{
296ae03d
VK
405 DB_HANDLE hdb = GetLocalDatabaseHandle();
406 UINT32 sleepTime = 60000;
407 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread started"));
a3d3f9d5 408
296ae03d 409 while(!AgentSleepAndCheckForShutdown(sleepTime))
61c0e619 410 {
8199376b 411 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue ORDER BY timestamp DESC LIMIT 100"));
296ae03d
VK
412 if (hResult == NULL)
413 continue;
414
415 int count = DBGetNumRows(hResult);
416 for(int i = 0; i < count; i++)
61c0e619 417 {
61c0e619 418 }
296ae03d
VK
419 DBFreeResult(hResult);
420
421 sleepTime = (count == 100) ? 1000 : 60000;
61c0e619 422 }
a3d3f9d5 423
296ae03d
VK
424 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread stopped"));
425 return THREAD_OK;
61c0e619 426}
427
296ae03d
VK
428/**
429 * Data sender queue
430 */
431static Queue s_dataSenderQueue;
432
87fff547
VK
433/**
434 * Data sender
435 */
436static THREAD_RESULT THREAD_CALL DataSender(void *arg)
437{
438 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
439 while(true)
440 {
441 DataElement *e = (DataElement *)s_dataSenderQueue.GetOrBlock();
442 if (e == INVALID_POINTER_VALUE)
443 break;
444
296ae03d
VK
445 MutexLock(s_serverSyncStatusLock);
446 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
447 if (status == NULL)
61c0e619 448 {
296ae03d
VK
449 status = new ServerSyncStatus();
450 s_serverSyncStatus.set(e->getServerId(), status);
451 }
452
453 if (status->queueSize == 0)
454 {
455 if (!e->sendToServer())
61c0e619 456 {
457 e->saveToDatabase();
a3d3f9d5 458 status->queueSize++;
61c0e619 459 }
460 }
461 else
462 {
463 e->saveToDatabase();
6fbaa926 464 status->queueSize++;
61c0e619 465 }
296ae03d 466 MutexUnlock(s_serverSyncStatusLock);
61c0e619 467
87fff547
VK
468 delete e;
469 }
470 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
471 return THREAD_OK;
472}
473
474/**
475 * Pseudo-session for cached data collection
476 */
477class VirtualSession : public AbstractCommSession
478{
479private:
480 UINT64 m_serverId;
481
482public:
483 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
484
485 virtual bool isMasterServer() { return false; }
486 virtual bool isControlServer() { return false; }
487 virtual bool canAcceptTraps() { return true; }
488 virtual UINT64 getServerId() { return m_serverId; };
489 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
490
491 virtual bool isIPv6Aware() { return true; }
492
493 virtual void sendMessage(NXCPMessage *pMsg) { }
494 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
495 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
6fbaa926
VK
496 virtual bool doRequest(NXCPMessage *msg, UINT32 timeout) { return false; }
497 virtual UINT32 generateRequestId() { return 0; }
87fff547
VK
498 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
499};
500
501/**
502 * Collect data from agent
503 */
296ae03d 504static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
87fff547
VK
505{
506 VirtualSession session(dci->getServerId());
507
508 DataElement *e = NULL;
509 if (dci->getType() == DCO_TYPE_ITEM)
510 {
511 TCHAR value[MAX_RESULT_LENGTH];
512 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
513 e = new DataElement(dci, value);
514 }
515 else if (dci->getType() == DCO_TYPE_LIST)
516 {
517 StringList *value = new StringList;
518 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
519 e = new DataElement(dci, value);
520 }
521 else if (dci->getType() == DCO_TYPE_TABLE)
522 {
523 Table *value = new Table;
524 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
525 e = new DataElement(dci, value);
526 }
527
528 return e;
529}
530
531/**
532 * Collect data from SNMP
533 */
296ae03d 534static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
87fff547 535{
296ae03d
VK
536 DataElement *e = NULL;
537 if (dci->getType() == DCO_TYPE_ITEM)
538 {
539 TCHAR value[MAX_RESULT_LENGTH];
540 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
541 e = new DataElement(dci, value);
542 }
543 return e;
87fff547
VK
544}
545
546/**
547 * List of all data collection items
548 */
61c0e619 549static ObjectArray<DataCollectionItem> s_items(64, 64, true);
87fff547
VK
550static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
551
552/**
553 * Single data collection run - collect data if needed and calculate sleep time
554 */
555static UINT32 DataCollectionRun()
556{
557 time_t now = time(NULL);
558 UINT32 sleepTime = 60;
d9c628e3 559
87fff547
VK
560 MutexLock(s_itemLock);
561 for(int i = 0; i < s_items.size(); i++)
562 {
563 DataCollectionItem *dci = s_items.get(i);
564 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
565 if (timeToPoll == 0)
566 {
567 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
568 DataElement *e;
569 if (dci->getOrigin() == DS_NATIVE_AGENT)
570 {
571 e = CollectDataFromAgent(dci);
572 }
573 else if (dci->getOrigin() == DS_SNMP_AGENT)
574 {
575 e = CollectDataFromSNMP(dci);
576 }
577 else
578 {
579 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
580 e = NULL;
581 }
582
583 if (e != NULL)
584 {
93c62d54 585 dci->setLastPollTime(e->getTimestamp());
87fff547
VK
586 s_dataSenderQueue.Put(e);
587 }
588 else
589 {
590 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
591 }
592 }
593 else
594 {
595 if (sleepTime > timeToPoll)
596 sleepTime = timeToPoll;
597 }
598 }
599 MutexUnlock(s_itemLock);
600 return sleepTime;
601}
602
603/**
604 * Data collector thread
605 */
606static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
607{
608 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
609
610 UINT32 sleepTime = DataCollectionRun();
9aa171c1 611 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547
VK
612 {
613 sleepTime = DataCollectionRun();
614 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
615 }
616
617 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
618 return THREAD_OK;
619}
620
621/**
622 * Configure data collection
623 */
624void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
625{
296ae03d
VK
626 int count = msg->getFieldAsInt32(VID_NUM_NODES);
627 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
628 for(int i = 0; i < count; i++)
629 {
630 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
631 UpdateSnmpTarget(target);
632 fieldId += 50;
633 }
634 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
635
87fff547
VK
636 ObjectArray<DataCollectionItem> config(32, 32, true);
637
296ae03d
VK
638 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
639 fieldId = VID_ELEMENT_LIST_BASE;
87fff547
VK
640 for(int i = 0; i < count; i++)
641 {
642 config.add(new DataCollectionItem(serverId, msg, fieldId));
643 fieldId += 10;
644 }
645 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
646
d9c628e3 647 MutexLock(s_itemLock);
a3d3f9d5 648
296ae03d 649 // Update and add new
d9c628e3 650 for(int j = 0; j < config.size(); j++)
651 {
652 DataCollectionItem *item = config.get(j);
653 bool exist = false;
654 for(int i = 0; i < s_items.size(); i++)
655 {
d610afbd 656 if (item->equals(s_items.get(i)))
d9c628e3 657 {
658 s_items.get(i)->updateAndSave(item);
659 exist = true;
660 }
661 }
296ae03d 662 if (!exist)
d9c628e3 663 {
664 DataCollectionItem *newItem = new DataCollectionItem(item);
665 s_items.add(newItem);
666 newItem->saveToDatabase(true);
667 }
668 }
a3d3f9d5 669
296ae03d 670 // Remove not existing configuration and data for it
d9c628e3 671 for(int i = 0; i < s_items.size(); i++)
672 {
673 DataCollectionItem *item = s_items.get(i);
61c0e619 674 //If item is from other server, then, do not search it in list of this server
675 if(item->getServerId() != serverId)
676 continue;
d9c628e3 677 bool exist = false;
678 for(int j = 0; j < config.size(); j++)
679 {
680 if(item->equals(config.get(j)))
681 {
682 exist = true;
683 }
684 }
6fbaa926 685 if (!exist)
d9c628e3 686 {
687 item->deleteFromDatabase();
688 s_items.remove(i);
689 i--;
690 delete item;
691 }
692 }
693 MutexUnlock(s_itemLock);
87fff547
VK
694}
695
d9c628e3 696/**
697 * Loads configuration to for DCI
698 */
699static void LoadConfiguration()
700{
701 DB_HANDLE db = GetLocalDatabaseHandle();
296ae03d
VK
702 DB_RESULT hResult = DBSelect(db, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
703 if (hResult != NULL)
d9c628e3 704 {
705 for(int i = 0; i < DBGetNumRows(hResult); i++)
706 {
707 s_items.add(new DataCollectionItem(hResult, i));
708 }
709 DBFreeResult(hResult);
710 }
711}
712
713/**
714 * SQL script array
715 */
a97f02e1 716static const TCHAR *s_upgradeQueries[] =
d9c628e3 717{
718 _T("CREATE TABLE dc_queue (")
a97f02e1
VK
719 _T(" server_id number(20) not null,")
720 _T(" dci_id integer not null,")
296ae03d 721 _T(" dci_type integer not null,")
8199376b 722 _T(" dci_origin integer not null,")
366440e8 723 _T(" snmp_target_guid varchar(36) not null,")
a97f02e1
VK
724 _T(" timestamp integer not null,")
725 _T(" value varchar not null,")
726 _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),
727
d9c628e3 728 _T("CREATE TABLE dc_config (")
a97f02e1
VK
729 _T(" server_id number(20) not null,")
730 _T(" dci_id integer not null,")
731 _T(" type integer not null,")
732 _T(" origin integer not null,")
733 _T(" name varchar(1023) null,")
734 _T(" polling_interval integer not null,")
735 _T(" last_poll integer not null,")
736 _T(" snmp_port integer not null,")
737 _T(" snmp_target_guid varchar(36) not null,")
296ae03d 738 _T(" snmp_raw_type integer not null,")
a97f02e1
VK
739 _T(" PRIMARY KEY(server_id,dci_id))"),
740
741 _T("CREATE TABLE dc_snmp_targets (")
742 _T(" guid varchar(36) not null,")
296ae03d 743 _T(" server_id number(20) not null,")
a97f02e1 744 _T(" ip_address varchar(48) not null,")
296ae03d 745 _T(" snmp_version integer not null,")
a97f02e1
VK
746 _T(" port integer not null,")
747 _T(" auth_type integer not null,")
748 _T(" enc_type integer not null,")
296ae03d
VK
749 _T(" auth_name varchar(63),")
750 _T(" auth_pass varchar(63),")
751 _T(" enc_pass varchar(63),")
a97f02e1 752 _T(" PRIMARY KEY(guid))")
d9c628e3 753};
754
296ae03d
VK
755/**
756 * Data collector and sender thread handles
757 */
758static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
759static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
760static THREAD s_reconcillationThread = INVALID_THREAD_HANDLE;
761
87fff547
VK
762/**
763 * Initialize and start local data collector
764 */
765void StartLocalDataCollector()
766{
d9c628e3 767 DB_HANDLE db = GetLocalDatabaseHandle();
a97f02e1 768 if (db == NULL)
d9c628e3 769 {
a97f02e1 770 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
d9c628e3 771 return;
772 }
296ae03d 773
a97f02e1
VK
774 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
775 while(dbVersion < DATACOLL_SCHEMA_VERSION)
d9c628e3 776 {
a97f02e1 777 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
ee733deb 778 {
a97f02e1 779 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
ee733deb 780 return;
781 }
a97f02e1
VK
782 dbVersion++;
783 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
d9c628e3 784 }
785
786 LoadConfiguration();
787 /* TODO: add reading form database snmp_targets table */
87fff547
VK
788
789 s_itemLock = MutexCreate();
296ae03d 790 s_serverSyncStatusLock = MutexCreate();
87fff547
VK
791 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
792 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
296ae03d 793 s_reconcillationThread = ThreadCreateEx(ReconcillationThread, 0, NULL);
87fff547
VK
794}
795
796/**
797 * Shutdown local data collector
798 */
799void ShutdownLocalDataCollector()
800{
801 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
802 ThreadJoin(s_dataCollectorThread);
803
804 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
805 s_dataSenderQueue.Put(INVALID_POINTER_VALUE);
806 ThreadJoin(s_dataSenderThread);
807
296ae03d
VK
808 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconcillation thread termination"));
809 ThreadJoin(s_reconcillationThread);
61c0e619 810
87fff547 811 MutexDestroy(s_itemLock);
296ae03d 812 MutexDestroy(s_serverSyncStatusLock);
87fff547 813}