added uuid class; code refactoring
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Externals
27 */
28 void UpdateSnmpTarget(SNMPTarget *target);
29 bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
30
31 /**
32 * Database schema version
33 */
34 #define DATACOLL_SCHEMA_VERSION 3
35
36 /**
37 * Data collector start indicator
38 */
39 static bool s_dataCollectorStarted = false;
40
41 /**
42 * Data collection item
43 */
44 class DataCollectionItem : public RefCountObject
45 {
46 private:
47 UINT64 m_serverId;
48 UINT32 m_id;
49 INT32 m_pollingInterval;
50 TCHAR *m_name;
51 BYTE m_type;
52 BYTE m_origin;
53 UINT16 m_snmpPort;
54 BYTE m_snmpRawValueType;
55 uuid m_snmpTargetGuid;
56 time_t m_lastPollTime;
57
58 public:
59 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
60 DataCollectionItem(DB_RESULT hResult, int row);
61 DataCollectionItem(const DataCollectionItem *item);
62 virtual ~DataCollectionItem();
63
64 UINT32 getId() const { return m_id; }
65 UINT64 getServerId() const { return m_serverId; }
66 const TCHAR *getName() const { return m_name; }
67 int getType() const { return (int)m_type; }
68 int getOrigin() const { return (int)m_origin; }
69 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
70 UINT16 getSnmpPort() const { return m_snmpPort; }
71 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
72 UINT32 getPollingInterval() { return (UINT32)m_pollingInterval; }
73
74 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
75
76 void updateAndSave(const DataCollectionItem *item);
77 void saveToDatabase(bool newObject);
78 void deleteFromDatabase();
79 void setLastPollTime(time_t time);
80
81 UINT32 getTimeToNextPoll(time_t now) const
82 {
83 time_t diff = now - m_lastPollTime;
84 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
85 }
86 };
87
88 /**
89 * Create data collection item from NXCP mesage
90 */
91 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
92 {
93 m_serverId = serverId;
94 m_id = msg->getFieldAsInt32(baseId);
95 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
96 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
97 m_name = msg->getFieldAsString(baseId + 3);
98 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
99 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
100 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
101 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
102 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
103 }
104
105 /**
106 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
107 */
108 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
109 {
110 m_serverId = DBGetFieldInt64(hResult, row, 0);
111 m_id = DBGetFieldULong(hResult, row, 1);
112 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
113 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
114 m_name = DBGetField(hResult, row, 4, NULL, 0);
115 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
116 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
117 m_snmpPort = DBGetFieldULong(hResult, row, 7);
118 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
119 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
120 }
121
122 /**
123 * Copy constructor
124 */
125 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
126 {
127 m_serverId = item->m_serverId;
128 m_id = item->m_id;
129 m_type = item->m_type;
130 m_origin = item->m_origin;
131 m_name = _tcsdup(item->m_name);
132 m_pollingInterval = item->m_pollingInterval;
133 m_lastPollTime = item->m_lastPollTime;
134 m_snmpTargetGuid = item->m_snmpTargetGuid;
135 m_snmpPort = item->m_snmpPort;
136 m_snmpRawValueType = item->m_snmpRawValueType;
137 }
138
139 /**
140 * Data collection item destructor
141 */
142 DataCollectionItem::~DataCollectionItem()
143 {
144 safe_free(m_name);
145 }
146
147 /**
148 * Will check if object has changed. If at least one field is changed - all data will be updated and
149 * saved to database.
150 */
151 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
152 {
153 //if at leas one of fields changed - set all fields and save to DB
154 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
155 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
156 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
157 {
158 m_type = item->m_type;
159 m_origin = item->m_origin;
160 m_name = _tcsdup(item->m_name);
161 m_pollingInterval = item->m_pollingInterval;
162 if (m_lastPollTime < item->m_lastPollTime)
163 m_lastPollTime = item->m_lastPollTime;
164 m_snmpTargetGuid = item->m_snmpTargetGuid;
165 m_snmpPort = item->m_snmpPort;
166 m_snmpRawValueType = item->m_snmpRawValueType;
167 saveToDatabase(false);
168 }
169 }
170
171 /**
172 * Save configuration object to database
173 */
174 void DataCollectionItem::saveToDatabase(bool newObject)
175 {
176 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
177 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
178 DB_HANDLE db = GetLocalDatabaseHandle();
179 DB_STATEMENT hStmt;
180
181 if (newObject)
182 {
183 hStmt = DBPrepare(db,
184 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
185 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
186 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
187 }
188 else
189 {
190 hStmt = DBPrepare(db,
191 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
192 _T("polling_interval=?,last_poll=?,snmp_port=?,")
193 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
194 }
195
196 if (hStmt == NULL)
197 return;
198
199 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
200 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
201 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
202 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
203 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
204 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
205 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
206 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
207 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
208 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
209
210 DBExecute(hStmt);
211 DBFreeStatement(hStmt);
212 }
213
214 /**
215 * Remove item form database and delete not synced data if exist
216 */
217 void DataCollectionItem::deleteFromDatabase()
218 {
219 DB_HANDLE db = GetLocalDatabaseHandle();
220 TCHAR query[256];
221 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
222 if (DBQuery(db, query))
223 {
224 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
225 if (DBQuery(db, query))
226 {
227 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
228 }
229 }
230 }
231
232 /**
233 * Set last poll time for item
234 */
235 void DataCollectionItem::setLastPollTime(time_t time)
236 {
237 m_lastPollTime = time;
238 TCHAR query[256];
239 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
240 (UINT64)m_lastPollTime, m_serverId, m_id);
241 DBQuery(GetLocalDatabaseHandle(), query);
242 }
243
244 /**
245 * Collected data
246 */
247 class DataElement
248 {
249 private:
250 UINT64 m_serverId;
251 UINT32 m_dciId;
252 time_t m_timestamp;
253 int m_origin;
254 int m_type;
255 uuid m_snmpNode;
256 union
257 {
258 TCHAR *item;
259 StringList *list;
260 Table *table;
261 } m_value;
262
263 public:
264 DataElement(DataCollectionItem *dci, const TCHAR *value)
265 {
266 m_serverId = dci->getServerId();
267 m_dciId = dci->getId();
268 m_timestamp = time(NULL);
269 m_origin = dci->getOrigin();
270 m_type = DCO_TYPE_ITEM;
271 m_snmpNode = dci->getSnmpTargetGuid();
272 m_value.item = _tcsdup(value);
273 }
274
275 DataElement(DataCollectionItem *dci, StringList *value)
276 {
277 m_serverId = dci->getServerId();
278 m_dciId = dci->getId();
279 m_timestamp = time(NULL);
280 m_origin = dci->getOrigin();
281 m_type = DCO_TYPE_LIST;
282 m_snmpNode = dci->getSnmpTargetGuid();
283 m_value.list = value;
284 }
285
286 DataElement(DataCollectionItem *dci, Table *value)
287 {
288 m_serverId = dci->getServerId();
289 m_dciId = dci->getId();
290 m_timestamp = time(NULL);
291 m_origin = dci->getOrigin();
292 m_type = DCO_TYPE_TABLE;
293 m_snmpNode = dci->getSnmpTargetGuid();
294 m_value.table = value;
295 }
296
297 /**
298 * Create data element from database record
299 * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
300 */
301 DataElement(DB_RESULT hResult, int row)
302 {
303 m_serverId = DBGetFieldUInt64(hResult, row, 0);
304 m_dciId = DBGetFieldULong(hResult, row, 1);
305 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
306 m_origin = DBGetFieldLong(hResult, row, 3);
307 m_type = DBGetFieldLong(hResult, row, 2);
308 m_snmpNode = DBGetFieldGUID(hResult, row, 4);
309 switch(m_type)
310 {
311 case DCO_TYPE_ITEM:
312 m_value.item = DBGetField(hResult, row, 6, NULL, 0);
313 break;
314 case DCO_TYPE_LIST:
315 {
316 m_value.list = new StringList();
317 TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
318 if (text != NULL)
319 {
320 m_value.list->splitAndAdd(text, _T("\n"));
321 free(text);
322 }
323 }
324 break;
325 case DCO_TYPE_TABLE:
326 {
327 char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
328 if (xml != NULL)
329 {
330 m_value.table = Table::createFromXML(xml);
331 free(xml);
332 }
333 else
334 {
335 m_value.table = NULL;
336 }
337 }
338 break;
339 default:
340 m_type = DCO_TYPE_ITEM;
341 m_value.item = _tcsdup(_T(""));
342 break;
343 }
344 }
345
346 ~DataElement()
347 {
348 switch(m_type)
349 {
350 case DCO_TYPE_ITEM:
351 free(m_value.item);
352 break;
353 case DCO_TYPE_LIST:
354 delete m_value.list;
355 break;
356 case DCO_TYPE_TABLE:
357 delete m_value.table;
358 break;
359 }
360 }
361
362 time_t getTimestamp() { return m_timestamp; }
363 UINT64 getServerId() { return m_serverId; }
364 UINT32 getDciId() { return m_dciId; }
365
366 void saveToDatabase();
367 bool sendToServer(bool reconcillation);
368 };
369
370 /**
371 * Save data element to database
372 */
373 void DataElement::saveToDatabase()
374 {
375 DB_HANDLE db = GetLocalDatabaseHandle();
376 DB_STATEMENT hStmt= DBPrepare(db, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
377 if (hStmt == NULL)
378 return;
379
380 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
381 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
382 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
383 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
384 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_snmpNode);
385 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
386 switch(m_type)
387 {
388 case DCO_TYPE_ITEM:
389 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
390 break;
391 case DCO_TYPE_LIST:
392 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
393 break;
394 case DCO_TYPE_TABLE:
395 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
396 break;
397 }
398 DBExecute(hStmt);
399 DBFreeStatement(hStmt);
400 }
401
402 /**
403 * Send collected data to server
404 */
405 bool DataElement::sendToServer(bool reconcillation)
406 {
407 // If data in database was invalid table may not be parsed correctly
408 // Consider sending a success in that case so data element can be dropped
409 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
410 return true;
411
412 CommSession *session = (CommSession *)FindServerSession(m_serverId);
413 if (session == NULL)
414 return false;
415
416 NXCPMessage msg;
417 msg.setCode(CMD_DCI_DATA);
418 msg.setId(session->generateRequestId());
419 msg.setField(VID_DCI_ID, m_dciId);
420 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
421 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
422 msg.setField(VID_NODE_ID, m_snmpNode);
423 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
424 msg.setField(VID_RECONCILLATION, (INT16)(reconcillation ? 1 : 0));
425 switch(m_type)
426 {
427 case DCO_TYPE_ITEM:
428 msg.setField(VID_VALUE, m_value.item);
429 break;
430 case DCO_TYPE_LIST:
431 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
432 break;
433 case DCO_TYPE_TABLE:
434 m_value.table->setSource(m_origin);
435 m_value.table->fillMessage(msg, 0, -1);
436 break;
437 }
438 UINT32 rcc = session->doRequest(&msg, 2000);
439 session->decRefCount();
440
441 // consider internal error as success because it means that server
442 // cannot accept data for some reason and retry is not feasible
443 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
444 }
445
446 /**
447 * Server data sync status object
448 */
449 struct ServerSyncStatus
450 {
451 INT32 queueSize;
452
453 ServerSyncStatus()
454 {
455 queueSize = 0;
456 }
457 };
458
459 /**
460 * Server sync status information
461 */
462 static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
463 static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
464
465 /**
466 * Callback to check if reconcillation is needed for session
467 */
468 static EnumerationCallbackResult ReconcillationQueryCallback(AbstractCommSession *session, void *arg)
469 {
470 if (session->getServerId() == 0)
471 return _CONTINUE;
472 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
473 return ((s != NULL) && (s->queueSize > 0)) ? _STOP : _CONTINUE;
474 }
475
476 /**
477 * Data reconcillation thread
478 */
479 static THREAD_RESULT THREAD_CALL ReconcillationThread(void *arg)
480 {
481 DB_HANDLE hdb = GetLocalDatabaseHandle();
482 UINT32 sleepTime = 30000;
483 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread started"));
484
485 bool vacuumNeeded = false;
486 while(!AgentSleepAndCheckForShutdown(sleepTime))
487 {
488 // Check if there is something to sync
489 MutexLock(s_serverSyncStatusLock);
490 bool run = EnumerateSessions(ReconcillationQueryCallback, NULL);
491 MutexUnlock(s_serverSyncStatusLock);
492 if (!run)
493 {
494 if (vacuumNeeded)
495 {
496 DebugPrintf(INVALID_INDEX, 4, _T("ReconcillationThread: vacuum local database"));
497 DBQuery(hdb, _T("VACUUM"));
498 vacuumNeeded = false;
499 }
500 sleepTime = 30000;
501 continue;
502 }
503
504 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue ORDER BY timestamp LIMIT 100"));
505 if (hResult == NULL)
506 {
507 sleepTime = 30000;
508 continue;
509 }
510
511 int count = DBGetNumRows(hResult);
512 if (count > 0)
513 {
514 ObjectArray<DataElement> deleteList(count, 10, true);
515 for(int i = 0; i < count; i++)
516 {
517 DataElement *e = new DataElement(hResult, i);
518 bool sent = false;
519 MutexLock(s_serverSyncStatusLock);
520 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
521 if (status != NULL)
522 {
523 sent = e->sendToServer(true);
524 }
525 else
526 {
527 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
528 }
529
530 if (sent)
531 {
532 status->queueSize--;
533 deleteList.add(e);
534 }
535 else
536 {
537 delete e;
538 }
539 MutexUnlock(s_serverSyncStatusLock);
540 }
541
542 if (deleteList.size() > 0)
543 {
544 TCHAR query[256];
545
546 DBBegin(hdb);
547 for(int i = 0; i < deleteList.size(); i++)
548 {
549 DataElement *e = deleteList.get(i);
550 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
551 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
552 DBQuery(hdb, query);
553 }
554 DBCommit(hdb);
555 DebugPrintf(INVALID_INDEX, 4, _T("ReconcillationThread: %d records sent"), deleteList.size());
556 vacuumNeeded = true;
557 }
558 }
559 DBFreeResult(hResult);
560
561 sleepTime = (count == 100) ? 100 : 30000;
562 }
563
564 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread stopped"));
565 return THREAD_OK;
566 }
567
568 /**
569 * Data sender queue
570 */
571 static Queue s_dataSenderQueue;
572
573 /**
574 * Data sender
575 */
576 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
577 {
578 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
579 while(true)
580 {
581 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
582 if (e == INVALID_POINTER_VALUE)
583 break;
584
585 MutexLock(s_serverSyncStatusLock);
586 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
587 if (status == NULL)
588 {
589 status = new ServerSyncStatus();
590 s_serverSyncStatus.set(e->getServerId(), status);
591 }
592
593 if (status->queueSize == 0)
594 {
595 if (!e->sendToServer(false))
596 {
597 e->saveToDatabase();
598 status->queueSize++;
599 }
600 }
601 else
602 {
603 e->saveToDatabase();
604 status->queueSize++;
605 }
606 MutexUnlock(s_serverSyncStatusLock);
607
608 delete e;
609 }
610 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
611 return THREAD_OK;
612 }
613
614 /**
615 * Pseudo-session for cached data collection
616 */
617 class VirtualSession : public AbstractCommSession
618 {
619 private:
620 UINT64 m_serverId;
621
622 public:
623 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
624
625 virtual bool isMasterServer() { return false; }
626 virtual bool isControlServer() { return false; }
627 virtual bool canAcceptTraps() { return true; }
628 virtual bool canAcceptFileUpdates() { return false; }
629 virtual UINT64 getServerId() { return m_serverId; };
630 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
631
632 virtual bool isIPv6Aware() { return true; }
633
634 virtual void sendMessage(NXCPMessage *pMsg) { }
635 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
636 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
637 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
638 virtual UINT32 generateRequestId() { return 0; }
639 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
640 };
641
642 /**
643 * Collect data from agent
644 */
645 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
646 {
647 VirtualSession session(dci->getServerId());
648
649 DataElement *e = NULL;
650 if (dci->getType() == DCO_TYPE_ITEM)
651 {
652 TCHAR value[MAX_RESULT_LENGTH];
653 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
654 e = new DataElement(dci, value);
655 }
656 else if (dci->getType() == DCO_TYPE_LIST)
657 {
658 StringList *value = new StringList;
659 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
660 e = new DataElement(dci, value);
661 }
662 else if (dci->getType() == DCO_TYPE_TABLE)
663 {
664 Table *value = new Table;
665 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
666 e = new DataElement(dci, value);
667 }
668
669 return e;
670 }
671
672 /**
673 * Collect data from SNMP
674 */
675 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
676 {
677 DataElement *e = NULL;
678 if (dci->getType() == DCO_TYPE_ITEM)
679 {
680 TCHAR value[MAX_RESULT_LENGTH];
681 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
682 e = new DataElement(dci, value);
683 }
684 return e;
685 }
686
687 /**
688 * List of all data collection items
689 */
690 static ObjectArray<DataCollectionItem> s_items(64, 64, true);
691 static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
692
693 /**
694 * Single data collection run - collect data if needed and calculate sleep time
695 */
696 static UINT32 DataCollectionRun()
697 {
698 UINT32 sleepTime = 60;
699
700 MutexLock(s_itemLock);
701 for(int i = 0; i < s_items.size(); i++)
702 {
703 DataCollectionItem *dci = s_items.get(i);
704 time_t now = time(NULL);
705 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
706 if (timeToPoll == 0)
707 {
708 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
709 DataElement *e;
710 if (dci->getOrigin() == DS_NATIVE_AGENT)
711 {
712 e = CollectDataFromAgent(dci);
713 }
714 else if (dci->getOrigin() == DS_SNMP_AGENT)
715 {
716 e = CollectDataFromSNMP(dci);
717 }
718 else
719 {
720 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
721 e = NULL;
722 }
723
724 if (e != NULL)
725 {
726 s_dataSenderQueue.put(e);
727 }
728 else
729 {
730 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
731 }
732
733 dci->setLastPollTime(now);
734 timeToPoll = dci->getPollingInterval();
735 }
736
737 if (sleepTime > timeToPoll)
738 sleepTime = timeToPoll;
739 }
740 MutexUnlock(s_itemLock);
741 return sleepTime;
742 }
743
744 /**
745 * Data collector thread
746 */
747 static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
748 {
749 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
750
751 UINT32 sleepTime = DataCollectionRun();
752 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
753 {
754 sleepTime = DataCollectionRun();
755 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
756 }
757
758 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
759 return THREAD_OK;
760 }
761
762 /**
763 * Configure data collection
764 */
765 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
766 {
767 if (!s_dataCollectorStarted)
768 {
769 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
770 return;
771 }
772
773 int count = msg->getFieldAsInt32(VID_NUM_NODES);
774 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
775 for(int i = 0; i < count; i++)
776 {
777 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
778 UpdateSnmpTarget(target);
779 fieldId += 50;
780 }
781 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
782
783 ObjectArray<DataCollectionItem> config(32, 32, true);
784
785 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
786 fieldId = VID_ELEMENT_LIST_BASE;
787 for(int i = 0; i < count; i++)
788 {
789 config.add(new DataCollectionItem(serverId, msg, fieldId));
790 fieldId += 10;
791 }
792 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
793
794 MutexLock(s_itemLock);
795
796 // Update and add new
797 for(int j = 0; j < config.size(); j++)
798 {
799 DataCollectionItem *item = config.get(j);
800 bool exist = false;
801 for(int i = 0; i < s_items.size(); i++)
802 {
803 if (item->equals(s_items.get(i)))
804 {
805 s_items.get(i)->updateAndSave(item);
806 exist = true;
807 }
808 }
809 if (!exist)
810 {
811 DataCollectionItem *newItem = new DataCollectionItem(item);
812 s_items.add(newItem);
813 newItem->saveToDatabase(true);
814 }
815 }
816
817 // Remove not existing configuration and data for it
818 for(int i = 0; i < s_items.size(); i++)
819 {
820 DataCollectionItem *item = s_items.get(i);
821 //If item is from other server, then, do not search it in list of this server
822 if(item->getServerId() != serverId)
823 continue;
824 bool exist = false;
825 for(int j = 0; j < config.size(); j++)
826 {
827 if(item->equals(config.get(j)))
828 {
829 exist = true;
830 }
831 }
832 if (!exist)
833 {
834 item->deleteFromDatabase();
835 s_items.remove(i);
836 i--;
837 }
838 }
839 MutexUnlock(s_itemLock);
840 }
841
842 /**
843 * Load saved state of local data collection
844 */
845 static void LoadState()
846 {
847 DB_HANDLE hdb = GetLocalDatabaseHandle();
848 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
849 if (hResult != NULL)
850 {
851 int count = DBGetNumRows(hResult);
852 for(int i = 0; i < count; i++)
853 {
854 s_items.add(new DataCollectionItem(hResult, i));
855 }
856 DBFreeResult(hResult);
857 }
858
859 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
860 if (hResult != NULL)
861 {
862 int count = DBGetNumRows(hResult);
863 for(int i = 0; i < count; i++)
864 {
865 SNMPTarget *t = new SNMPTarget(hResult, i);
866 UpdateSnmpTarget(t);
867 }
868 DBFreeResult(hResult);
869 }
870
871 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
872 if (hResult != NULL)
873 {
874 int count = DBGetNumRows(hResult);
875 for(int i = 0; i < count; i++)
876 {
877 ServerSyncStatus *s = new ServerSyncStatus;
878 s->queueSize = DBGetFieldLong(hResult, i, 1);
879 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
880 s_serverSyncStatus.set(serverId, s);
881 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
882 }
883 DBFreeResult(hResult);
884 }
885 }
886
887 /**
888 * SQL script array
889 */
890 static const TCHAR *s_upgradeQueries[] =
891 {
892 _T("CREATE TABLE dc_queue (")
893 _T(" server_id number(20) not null,")
894 _T(" dci_id integer not null,")
895 _T(" dci_type integer not null,")
896 _T(" dci_origin integer not null,")
897 _T(" snmp_target_guid varchar(36) not null,")
898 _T(" timestamp integer not null,")
899 _T(" value varchar not null,")
900 _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),
901
902 _T("CREATE TABLE dc_config (")
903 _T(" server_id number(20) not null,")
904 _T(" dci_id integer not null,")
905 _T(" type integer not null,")
906 _T(" origin integer not null,")
907 _T(" name varchar(1023) null,")
908 _T(" polling_interval integer not null,")
909 _T(" last_poll integer not null,")
910 _T(" snmp_port integer not null,")
911 _T(" snmp_target_guid varchar(36) not null,")
912 _T(" snmp_raw_type integer not null,")
913 _T(" PRIMARY KEY(server_id,dci_id))"),
914
915 _T("CREATE TABLE dc_snmp_targets (")
916 _T(" guid varchar(36) not null,")
917 _T(" server_id number(20) not null,")
918 _T(" ip_address varchar(48) not null,")
919 _T(" snmp_version integer not null,")
920 _T(" port integer not null,")
921 _T(" auth_type integer not null,")
922 _T(" enc_type integer not null,")
923 _T(" auth_name varchar(63),")
924 _T(" auth_pass varchar(63),")
925 _T(" enc_pass varchar(63),")
926 _T(" PRIMARY KEY(guid))")
927 };
928
929 /**
930 * Data collector and sender thread handles
931 */
932 static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
933 static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
934 static THREAD s_reconcillationThread = INVALID_THREAD_HANDLE;
935
936 /**
937 * Initialize and start local data collector
938 */
939 void StartLocalDataCollector()
940 {
941 DB_HANDLE db = GetLocalDatabaseHandle();
942 if (db == NULL)
943 {
944 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
945 return;
946 }
947
948 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
949 while(dbVersion < DATACOLL_SCHEMA_VERSION)
950 {
951 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
952 {
953 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
954 return;
955 }
956 dbVersion++;
957 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
958 }
959
960 s_itemLock = MutexCreate();
961 s_serverSyncStatusLock = MutexCreate();
962
963 LoadState();
964
965 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
966 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
967 s_reconcillationThread = ThreadCreateEx(ReconcillationThread, 0, NULL);
968
969 s_dataCollectorStarted = true;
970 }
971
972 /**
973 * Shutdown local data collector
974 */
975 void ShutdownLocalDataCollector()
976 {
977 if (!s_dataCollectorStarted)
978 {
979 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
980 return;
981 }
982
983 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
984 ThreadJoin(s_dataCollectorThread);
985
986 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
987 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
988 ThreadJoin(s_dataSenderThread);
989
990 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconcillation thread termination"));
991 ThreadJoin(s_reconcillationThread);
992
993 MutexDestroy(s_itemLock);
994 MutexDestroy(s_serverSyncStatusLock);
995 }