fixed broken DCI threshold check
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003-2015 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
/**
 * Version of the local database schema required by the data collector.
 * At startup it is compared with the "DatacollSchemaVersion" metadata
 * value to decide which upgrade scripts still have to be applied.
 */
#define DATACOLL_SCHEMA_VERSION     1
29
87fff547
VK
30/**
31 * Data collection item
32 */
33class DataCollectionItem : public RefCountObject
34{
35private:
36 UINT64 m_serverId;
37 UINT32 m_id;
38 INT32 m_pollingInterval;
39 TCHAR *m_name;
40 BYTE m_type;
41 BYTE m_origin;
42 UINT16 m_snmpPort;
d9c628e3 43 uuid_t m_snmpTargetGuid;
87fff547
VK
44 time_t m_lastPollTime;
45
46public:
47 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
d9c628e3 48 DataCollectionItem(DB_RESULT hResult, int row);
49 DataCollectionItem(DataCollectionItem *item);
87fff547
VK
50 virtual ~DataCollectionItem();
51
52 UINT32 getId() { return m_id; }
53 UINT64 getServerId() { return m_serverId; }
54 const TCHAR *getName() { return m_name; }
55 int getType() { return (int)m_type; }
56 int getOrigin() { return (int)m_origin; }
d9c628e3 57 bool equals(DataCollectionItem *item);
58 void updateAndSave(DataCollectionItem *item);
59 void saveToDatabase(bool newObject);
60 void deleteFromDatabase();
93c62d54 61 void setLastPollTime(time_t time);
87fff547
VK
62
63 UINT32 getTimeToNextPoll(time_t now)
64 {
65 time_t diff = now - m_lastPollTime;
66 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
67 }
68};
69
70/**
71 * Create data collection item from NXCP mesage
72 */
73DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
74{
75 m_serverId = serverId;
76 m_id = msg->getFieldAsInt32(baseId);
77 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
78 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
79 m_name = msg->getFieldAsString(baseId + 3);
80 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
81 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
d9c628e3 82 msg->getFieldAsBinary(baseId + 6, m_snmpTargetGuid, UUID_LENGTH);
87fff547
VK
83 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
84}
85
d9c628e3 86/**
87 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid
88 */
89DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
90{
91 m_serverId = DBGetFieldInt64(hResult, row, 0);
92 m_id = DBGetFieldULong(hResult, row, 1);
93 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
94 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
95 m_name = (TCHAR *)malloc(sizeof(TCHAR) * 1025);
96 DBGetField(hResult, row, 4, m_name, 1025);
97 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
98 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
99 DBGetFieldGUID(hResult, row, 7, m_snmpTargetGuid);
100 m_snmpPort = DBGetFieldULong(hResult, row, 8);
101}
102
103/**
104 * Copy constructor
105 */
106 DataCollectionItem::DataCollectionItem(DataCollectionItem *item)
107 {
108 m_serverId = item->m_serverId;
109 m_id = item->m_id;
110 m_type = item->m_type;
111 m_origin = item->m_origin;
112 m_name = _tcsdup(item->m_name);
113 m_pollingInterval = item->m_pollingInterval;
114 m_lastPollTime = item->m_lastPollTime;
115 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
116 m_snmpPort = item->m_snmpPort;
117 }
118
87fff547
VK
119/**
120 * Data collection item destructor
121 */
122DataCollectionItem::~DataCollectionItem()
123{
124 safe_free(m_name);
125}
126
d9c628e3 127bool DataCollectionItem::equals(DataCollectionItem *item)
128{
129 if(m_serverId == item->m_serverId && m_id == item->m_id)
130 return true;
131 return false;
132}
133
134/**
135 * Will check if object has changed. If at least one field is changed - all data will be updated and
136 * saved to database.
137 */
138void DataCollectionItem::updateAndSave(DataCollectionItem *item)
139{
140 //if at leas one of fields changed - set all fields and save to DB
141 if(m_type != item->m_type || m_origin != item->m_origin || _tcscmp(m_name, item->m_name) != 0 ||
142 m_pollingInterval != item->m_pollingInterval || uuid_compare(m_snmpTargetGuid, item->m_snmpTargetGuid)!= 0 ||
143 m_snmpPort != item->m_snmpPort)
144 {
145 m_type = item->m_type;
146 m_origin = item->m_origin;
147 m_name = _tcsdup(item->m_name);
148 m_pollingInterval = item->m_pollingInterval;
149 m_lastPollTime = item->m_lastPollTime;
150 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
151 m_snmpPort = item->m_snmpPort;
152 saveToDatabase(false);
153 }
154
155}
156
157void DataCollectionItem::saveToDatabase(bool newObject)
158{
159 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=%ld,dciId=%d) saved to database"),
160 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
161 DB_HANDLE db = GetLocalDatabaseHandle();
162 DB_STATEMENT hStmt;
163
164 if(newObject)
165 {
166 hStmt = DBPrepare(db,
167 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
168 _T("last_poll,snmp_port,snmp_target_guid,server_id,dci_id)")
169 _T("VALUES (?,?,?,?,?,?,?,?,?)"));
170 }
171 else
172 {
173 hStmt = DBPrepare(db,
174 _T("UPDATE dc_config SET type=?,origin=?,name=?")
175 _T("polling_interval=?,last_poll=?,snmp_port=?")
176 _T("snmp_target_guid=? WHERE server_id=? dci_id=?"));
177 }
178
179 if (hStmt == NULL)
180 {
181 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::saveToDatabase: not possible to prepare save quary for %s object(serverId=%ld,dciId=%d)"),
182 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
183 return;
184 }
185
186 TCHAR buffer[64];
187 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
188 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
189 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
190 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
191 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
d5034b1a 192 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
193 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, uuid_to_string(m_snmpTargetGuid, buffer), DB_BIND_STATIC);
97c912f2 194 DBBind(hStmt, 8, DB_SQLTYPE_BIGINT, m_serverId);
d9c628e3 195 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (LONG)m_id);
196
97c912f2 197 if(!DBExecute(hStmt))
d9c628e3 198 {
199 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::saveToDatabase: not possible to save %s object(serverId=%ld,dciId=%d) to database"),
200 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
201 }
202}
203
204/**
205 * Remove item form database and delete not synced data if exist
206 */
207void DataCollectionItem::deleteFromDatabase()
208{
209 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=%ld,dciId=%d) removed from database"),
210 m_serverId, m_id);
211 DB_HANDLE db = GetLocalDatabaseHandle();
212 TCHAR query[256];
97c912f2 213 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=%ld AND dci_id=%d"), m_serverId, m_id);
d9c628e3 214 if(!DBQuery(db, query))
215 {
216 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::deleteFromDatabase: error wile removing object(serverId=%ld,dciId=%d) from dc_config database table"),
217 m_serverId, m_id);
218 }
97c912f2 219 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=%ld AND dci_id=%d"), m_serverId, m_id);
d9c628e3 220 if(!DBQuery(db, query))
221 {
222 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::deleteFromDatabase: error wile removing object(serverId=%ld,dciId=%d) from dc_queue database table"),
223 m_serverId, m_id);
224
225 }
226}
227
93c62d54 228void DataCollectionItem::setLastPollTime(time_t time)
229{
230 m_pollingInterval = time;
231 TCHAR query[256];
232 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=%d WHERE server_id=%ld AND dci_id=%d"), (int)m_lastPollTime, m_serverId, m_id);
233}
234
87fff547
VK
235/**
236 * Collected data
237 */
238class DataElement
239{
240private:
241 UINT64 m_serverId;
242 UINT32 m_dciId;
243 time_t m_timestamp;
244 int m_type;
245 union
246 {
247 TCHAR *item;
248 StringList *list;
249 Table *table;
250 } m_value;
251
252public:
253 DataElement(DataCollectionItem *dci, const TCHAR *value)
254 {
255 m_serverId = dci->getServerId();
256 m_dciId = dci->getId();
257 m_timestamp = time(NULL);
258 m_type = DCO_TYPE_ITEM;
259 m_value.item = _tcsdup(value);
260 }
261
262 DataElement(DataCollectionItem *dci, StringList *value)
263 {
264 m_serverId = dci->getServerId();
265 m_dciId = dci->getId();
266 m_timestamp = time(NULL);
267 m_type = DCO_TYPE_LIST;
268 m_value.list = value;
269 }
270
271 DataElement(DataCollectionItem *dci, Table *value)
272 {
273 m_serverId = dci->getServerId();
274 m_dciId = dci->getId();
275 m_timestamp = time(NULL);
276 m_type = DCO_TYPE_TABLE;
277 m_value.table = value;
278 }
279
280 ~DataElement()
281 {
282 switch(m_type)
283 {
284 case DCO_TYPE_ITEM:
285 free(m_value.item);
286 break;
287 case DCO_TYPE_LIST:
288 delete m_value.list;
289 break;
290 case DCO_TYPE_TABLE:
291 delete m_value.table;
292 break;
293 }
294 }
93c62d54 295
296 time_t getTimestamp() { return m_timestamp; }
87fff547
VK
297};
298
299/**
300 * Data sender queue
301 */
302static Queue s_dataSenderQueue;
303
304/**
305 * Data sender
306 */
307static THREAD_RESULT THREAD_CALL DataSender(void *arg)
308{
309 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
310 while(true)
311 {
312 DataElement *e = (DataElement *)s_dataSenderQueue.GetOrBlock();
313 if (e == INVALID_POINTER_VALUE)
314 break;
315
316 delete e;
317 }
318 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
319 return THREAD_OK;
320}
321
322/**
323 * Pseudo-session for cached data collection
324 */
325class VirtualSession : public AbstractCommSession
326{
327private:
328 UINT64 m_serverId;
329
330public:
331 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
332
333 virtual bool isMasterServer() { return false; }
334 virtual bool isControlServer() { return false; }
335 virtual bool canAcceptTraps() { return true; }
336 virtual UINT64 getServerId() { return m_serverId; };
337 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
338
339 virtual bool isIPv6Aware() { return true; }
340
341 virtual void sendMessage(NXCPMessage *pMsg) { }
342 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
343 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
344 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
345};
346
347/**
348 * Collect data from agent
349 */
350DataElement *CollectDataFromAgent(DataCollectionItem *dci)
351{
352 VirtualSession session(dci->getServerId());
353
354 DataElement *e = NULL;
355 if (dci->getType() == DCO_TYPE_ITEM)
356 {
357 TCHAR value[MAX_RESULT_LENGTH];
358 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
359 e = new DataElement(dci, value);
360 }
361 else if (dci->getType() == DCO_TYPE_LIST)
362 {
363 StringList *value = new StringList;
364 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
365 e = new DataElement(dci, value);
366 }
367 else if (dci->getType() == DCO_TYPE_TABLE)
368 {
369 Table *value = new Table;
370 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
371 e = new DataElement(dci, value);
372 }
373
374 return e;
375}
376
377/**
378 * Collect data from SNMP
379 */
380DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
381{
382 /* TODO: implement SNMP data collection */
383 return NULL;
384}
385
386/**
387 * List of all data collection items
388 */
389static ObjectArray<DataCollectionItem> s_items(64, 64, false);
390static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
391
392/**
393 * Single data collection run - collect data if needed and calculate sleep time
394 */
395static UINT32 DataCollectionRun()
396{
397 time_t now = time(NULL);
398 UINT32 sleepTime = 60;
d9c628e3 399
87fff547
VK
400 MutexLock(s_itemLock);
401 for(int i = 0; i < s_items.size(); i++)
402 {
403 DataCollectionItem *dci = s_items.get(i);
404 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
405 if (timeToPoll == 0)
406 {
407 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
408 DataElement *e;
409 if (dci->getOrigin() == DS_NATIVE_AGENT)
410 {
411 e = CollectDataFromAgent(dci);
412 }
413 else if (dci->getOrigin() == DS_SNMP_AGENT)
414 {
415 e = CollectDataFromSNMP(dci);
416 }
417 else
418 {
419 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
420 e = NULL;
421 }
422
423 if (e != NULL)
424 {
93c62d54 425 dci->setLastPollTime(e->getTimestamp());
87fff547
VK
426 s_dataSenderQueue.Put(e);
427 }
428 else
429 {
430 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
431 }
432 }
433 else
434 {
435 if (sleepTime > timeToPoll)
436 sleepTime = timeToPoll;
437 }
438 }
439 MutexUnlock(s_itemLock);
440 return sleepTime;
441}
442
443/**
444 * Data collector thread
445 */
446static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
447{
448 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
449
450 UINT32 sleepTime = DataCollectionRun();
9aa171c1 451 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547
VK
452 {
453 sleepTime = DataCollectionRun();
454 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
455 }
456
457 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
458 return THREAD_OK;
459}
460
461/**
462 * Configure data collection
463 */
464void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
465{
466 ObjectArray<DataCollectionItem> config(32, 32, true);
467
468 int count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
469 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
470 for(int i = 0; i < count; i++)
471 {
472 config.add(new DataCollectionItem(serverId, msg, fieldId));
473 fieldId += 10;
474 }
475 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
476
d9c628e3 477 MutexLock(s_itemLock);
478 //Update and add new
479 for(int j = 0; j < config.size(); j++)
480 {
481 DataCollectionItem *item = config.get(j);
482 bool exist = false;
483 for(int i = 0; i < s_items.size(); i++)
484 {
485 if(item->equals(s_items.get(i)))
486 {
487 s_items.get(i)->updateAndSave(item);
488 exist = true;
489 }
490 }
491 if(!exist)
492 {
493 DataCollectionItem *newItem = new DataCollectionItem(item);
494 s_items.add(newItem);
495 newItem->saveToDatabase(true);
496 }
497 }
498 //Remove not existing configuration and data for it
499 for(int i = 0; i < s_items.size(); i++)
500 {
501 DataCollectionItem *item = s_items.get(i);
502 bool exist = false;
503 for(int j = 0; j < config.size(); j++)
504 {
505 if(item->equals(config.get(j)))
506 {
507 exist = true;
508 }
509 }
510 if(!exist)
511 {
512 item->deleteFromDatabase();
513 s_items.remove(i);
514 i--;
515 delete item;
516 }
517 }
518 MutexUnlock(s_itemLock);
87fff547
VK
519}
520
521/**
522 * Data collector and sender thread handles
523 */
524static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
525static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
526
d9c628e3 527/**
528 * Loads configuration to for DCI
529 */
530static void LoadConfiguration()
531{
532 DB_HANDLE db = GetLocalDatabaseHandle();
533 const TCHAR *query = _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid FROM dc_config");
534 DB_RESULT hResult = DBSelect(db, query);
535 if(hResult != NULL)
536 {
537 for(int i = 0; i < DBGetNumRows(hResult); i++)
538 {
539 s_items.add(new DataCollectionItem(hResult, i));
540 }
541 DBFreeResult(hResult);
542 }
543}
544
545/**
546 * SQL script array
547 */
548const static TCHAR *Update [] =
549{
550 _T("CREATE TABLE dc_queue (")
551 _T("server_id number(20) not null,")
552 _T("dci_id integer not null,")
553 _T("timestamp integer not null,")
554 _T("value varchar not null,")
555 _T("PRIMARY KEY(server_id,dci_id,timestamp));")
556 _T("CREATE TABLE dc_config (")
557 _T("server_id number(20) not null,")
558 _T("dci_id integer not null,")
559 _T("type integer not null,")
560 _T("origin integer not null,")
561 _T("name varchar(1023) null,")
562 _T("polling_interval integer not null,")
563 _T("last_poll integer not null,")
564 _T("snmp_port integer not null,")
565 _T("snmp_target_guid varchar(36) not null,")
566 _T("PRIMARY KEY(server_id,dci_id));")
567 _T("CREATE TABLE snmp_targets (")
568 _T("guid varchar(36) not null,")
569 _T("version integer not null,")
570 _T("ip_address varchar(48) not null,")
571 _T("port integer not null,")
572 _T("auth_type integer not null,")
573 _T("enc_type integer not null,")
574 _T("auth_pass varchar(255),")
575 _T("enc_pass varchar(255),")
576 _T("username varchar(255)")
577 _T("PRIMARY KEY(guid));")
578};
579
87fff547
VK
580/**
581 * Initialize and start local data collector
582 */
583void StartLocalDataCollector()
584{
585 /* TODO: database init and configuration load */
d9c628e3 586 DB_HANDLE db = GetLocalDatabaseHandle();
587 if(db == NULL)
588 {
589 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: Not possible to load Data Collector. Database not initialized."));
590 return;
591 }
592 int dbVersion = ReadMetadataAsInt(_T("DatacollSchemaVersion"));
593 while(DATACOLL_SCHEMA_VERSION > dbVersion)
594 {
ee733deb 595 bool result = DBQuery(db, Update[dbVersion]);
d9c628e3 596 TCHAR query[256];
597 _sntprintf(query, 256, _T("INSERT INTO metadata (attribute, value) VALUES ('DatacollSchemaVersion', '%d')"), ++dbVersion);
ee733deb 598 if(result)
599 {
600 DBQuery(db, query);
601 }
602 else
603 {
604 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: Not possible to upgdate database for Data Collector. Collection not started."));
605 return;
606 }
d9c628e3 607 }
608
609 LoadConfiguration();
610 /* TODO: add reading form database snmp_targets table */
87fff547
VK
611
612 s_itemLock = MutexCreate();
613 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
614 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
615}
616
617/**
618 * Shutdown local data collector
619 */
620void ShutdownLocalDataCollector()
621{
622 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
623 ThreadJoin(s_dataCollectorThread);
624
625 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
626 s_dataSenderQueue.Put(INVALID_POINTER_VALUE);
627 ThreadJoin(s_dataSenderThread);
628
629 MutexDestroy(s_itemLock);
630}