06d32a3fe4c382a64b156226a4675a19f23333f0
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Externals
27 */
28 void UpdateSnmpTarget(SNMPTarget *target);
29 bool GetSnmpValue(const uuid_t& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
30
31 /**
32 * Database schema version
33 */
34 #define DATACOLL_SCHEMA_VERSION 3
35
36 /**
37 * Data collector start indicator
38 */
39 static bool s_dataCollectorStarted = false;
40
41 /**
42 * Data collection item
43 */
44 class DataCollectionItem : public RefCountObject
45 {
46 private:
47 UINT64 m_serverId;
48 UINT32 m_id;
49 INT32 m_pollingInterval;
50 TCHAR *m_name;
51 BYTE m_type;
52 BYTE m_origin;
53 UINT16 m_snmpPort;
54 BYTE m_snmpRawValueType;
55 uuid_t m_snmpTargetGuid;
56 time_t m_lastPollTime;
57
58 public:
59 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
60 DataCollectionItem(DB_RESULT hResult, int row);
61 DataCollectionItem(const DataCollectionItem *item);
62 virtual ~DataCollectionItem();
63
64 UINT32 getId() const { return m_id; }
65 UINT64 getServerId() const { return m_serverId; }
66 const TCHAR *getName() const { return m_name; }
67 int getType() const { return (int)m_type; }
68 int getOrigin() const { return (int)m_origin; }
69 const uuid_t& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
70 UINT16 getSnmpPort() const { return m_snmpPort; }
71 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
72 UINT32 getPollingInterval() { return (UINT32)m_pollingInterval; }
73
74 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
75
76 void updateAndSave(const DataCollectionItem *item);
77 void saveToDatabase(bool newObject);
78 void deleteFromDatabase();
79 void setLastPollTime(time_t time);
80
81 UINT32 getTimeToNextPoll(time_t now) const
82 {
83 time_t diff = now - m_lastPollTime;
84 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
85 }
86 };
87
88 /**
89 * Create data collection item from NXCP mesage
90 */
91 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
92 {
93 m_serverId = serverId;
94 m_id = msg->getFieldAsInt32(baseId);
95 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
96 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
97 m_name = msg->getFieldAsString(baseId + 3);
98 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
99 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
100 memset(m_snmpTargetGuid, 0, UUID_LENGTH);
101 msg->getFieldAsBinary(baseId + 6, m_snmpTargetGuid, UUID_LENGTH);
102 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
103 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
104 }
105
106 /**
107 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
108 */
109 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
110 {
111 m_serverId = DBGetFieldInt64(hResult, row, 0);
112 m_id = DBGetFieldULong(hResult, row, 1);
113 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
114 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
115 m_name = DBGetField(hResult, row, 4, NULL, 0);
116 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
117 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
118 memset(m_snmpTargetGuid, 0, UUID_LENGTH);
119 m_snmpPort = DBGetFieldULong(hResult, row, 7);
120 DBGetFieldGUID(hResult, row, 8, m_snmpTargetGuid);
121 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
122 }
123
124 /**
125 * Copy constructor
126 */
127 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
128 {
129 m_serverId = item->m_serverId;
130 m_id = item->m_id;
131 m_type = item->m_type;
132 m_origin = item->m_origin;
133 m_name = _tcsdup(item->m_name);
134 m_pollingInterval = item->m_pollingInterval;
135 m_lastPollTime = item->m_lastPollTime;
136 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
137 m_snmpPort = item->m_snmpPort;
138 m_snmpRawValueType = item->m_snmpRawValueType;
139 }
140
141 /**
142 * Data collection item destructor
143 */
144 DataCollectionItem::~DataCollectionItem()
145 {
146 safe_free(m_name);
147 }
148
149 /**
150 * Will check if object has changed. If at least one field is changed - all data will be updated and
151 * saved to database.
152 */
153 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
154 {
155 //if at leas one of fields changed - set all fields and save to DB
156 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
157 (m_pollingInterval != item->m_pollingInterval) || uuid_compare(m_snmpTargetGuid, item->m_snmpTargetGuid) ||
158 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
159 {
160 m_type = item->m_type;
161 m_origin = item->m_origin;
162 m_name = _tcsdup(item->m_name);
163 m_pollingInterval = item->m_pollingInterval;
164 if (m_lastPollTime < item->m_lastPollTime)
165 m_lastPollTime = item->m_lastPollTime;
166 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
167 m_snmpPort = item->m_snmpPort;
168 m_snmpRawValueType = item->m_snmpRawValueType;
169 saveToDatabase(false);
170 }
171 }
172
173 /**
174 * Save configuration object to database
175 */
176 void DataCollectionItem::saveToDatabase(bool newObject)
177 {
178 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
179 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
180 DB_HANDLE db = GetLocalDatabaseHandle();
181 DB_STATEMENT hStmt;
182
183 if (newObject)
184 {
185 hStmt = DBPrepare(db,
186 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
187 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
188 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
189 }
190 else
191 {
192 hStmt = DBPrepare(db,
193 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
194 _T("polling_interval=?,last_poll=?,snmp_port=?,")
195 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
196 }
197
198 if (hStmt == NULL)
199 return;
200
201 TCHAR buffer[64];
202 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
203 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
204 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
205 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
206 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
207 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
208 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, uuid_to_string(m_snmpTargetGuid, buffer), DB_BIND_STATIC);
209 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
210 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
211 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
212
213 DBExecute(hStmt);
214 DBFreeStatement(hStmt);
215 }
216
217 /**
218 * Remove item form database and delete not synced data if exist
219 */
220 void DataCollectionItem::deleteFromDatabase()
221 {
222 DB_HANDLE db = GetLocalDatabaseHandle();
223 TCHAR query[256];
224 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
225 if (DBQuery(db, query))
226 {
227 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
228 if (DBQuery(db, query))
229 {
230 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
231 }
232 }
233 }
234
235 /**
236 * Set last poll time for item
237 */
238 void DataCollectionItem::setLastPollTime(time_t time)
239 {
240 m_lastPollTime = time;
241 TCHAR query[256];
242 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
243 (UINT64)m_lastPollTime, m_serverId, m_id);
244 DBQuery(GetLocalDatabaseHandle(), query);
245 }
246
247 /**
248 * Collected data
249 */
250 class DataElement
251 {
252 private:
253 UINT64 m_serverId;
254 UINT32 m_dciId;
255 time_t m_timestamp;
256 int m_origin;
257 int m_type;
258 uuid_t m_snmpNode;
259 union
260 {
261 TCHAR *item;
262 StringList *list;
263 Table *table;
264 } m_value;
265
266 public:
267 DataElement(DataCollectionItem *dci, const TCHAR *value)
268 {
269 m_serverId = dci->getServerId();
270 m_dciId = dci->getId();
271 m_timestamp = time(NULL);
272 m_origin = dci->getOrigin();
273 m_type = DCO_TYPE_ITEM;
274 memcpy(m_snmpNode, dci->getSnmpTargetGuid(), UUID_LENGTH);
275 m_value.item = _tcsdup(value);
276 }
277
278 DataElement(DataCollectionItem *dci, StringList *value)
279 {
280 m_serverId = dci->getServerId();
281 m_dciId = dci->getId();
282 m_timestamp = time(NULL);
283 m_origin = dci->getOrigin();
284 m_type = DCO_TYPE_LIST;
285 memcpy(m_snmpNode, dci->getSnmpTargetGuid(), UUID_LENGTH);
286 m_value.list = value;
287 }
288
289 DataElement(DataCollectionItem *dci, Table *value)
290 {
291 m_serverId = dci->getServerId();
292 m_dciId = dci->getId();
293 m_timestamp = time(NULL);
294 m_origin = dci->getOrigin();
295 m_type = DCO_TYPE_TABLE;
296 memcpy(m_snmpNode, dci->getSnmpTargetGuid(), UUID_LENGTH);
297 m_value.table = value;
298 }
299
300 /**
301 * Create data element from database record
302 * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
303 */
304 DataElement(DB_RESULT hResult, int row)
305 {
306 m_serverId = DBGetFieldUInt64(hResult, row, 0);
307 m_dciId = DBGetFieldULong(hResult, row, 1);
308 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
309 m_origin = DBGetFieldLong(hResult, row, 3);
310 m_type = DBGetFieldLong(hResult, row, 2);
311 DBGetFieldGUID(hResult, row, 4, m_snmpNode);
312 switch(m_type)
313 {
314 case DCO_TYPE_ITEM:
315 m_value.item = DBGetField(hResult, row, 6, NULL, 0);
316 break;
317 case DCO_TYPE_LIST:
318 {
319 m_value.list = new StringList();
320 TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
321 if (text != NULL)
322 {
323 m_value.list->splitAndAdd(text, _T("\n"));
324 free(text);
325 }
326 }
327 break;
328 case DCO_TYPE_TABLE:
329 {
330 char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
331 if (xml != NULL)
332 {
333 m_value.table = Table::createFromXML(xml);
334 free(xml);
335 }
336 else
337 {
338 m_value.table = NULL;
339 }
340 }
341 break;
342 default:
343 m_type = DCO_TYPE_ITEM;
344 m_value.item = _tcsdup(_T(""));
345 break;
346 }
347 }
348
349 ~DataElement()
350 {
351 switch(m_type)
352 {
353 case DCO_TYPE_ITEM:
354 free(m_value.item);
355 break;
356 case DCO_TYPE_LIST:
357 delete m_value.list;
358 break;
359 case DCO_TYPE_TABLE:
360 delete m_value.table;
361 break;
362 }
363 }
364
365 time_t getTimestamp() { return m_timestamp; }
366 UINT64 getServerId() { return m_serverId; }
367 UINT32 getDciId() { return m_dciId; }
368
369 void saveToDatabase();
370 bool sendToServer(bool reconcillation);
371 };
372
373 /**
374 * Save data element to database
375 */
376 void DataElement::saveToDatabase()
377 {
378 DB_HANDLE db = GetLocalDatabaseHandle();
379 DB_STATEMENT hStmt= DBPrepare(db, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
380 if (hStmt == NULL)
381 return;
382
383 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
384 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
385 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
386 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
387 TCHAR buffer[64];
388 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, uuid_to_string(m_snmpNode, buffer), DB_BIND_STATIC);
389 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
390 switch(m_type)
391 {
392 case DCO_TYPE_ITEM:
393 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
394 break;
395 case DCO_TYPE_LIST:
396 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
397 break;
398 case DCO_TYPE_TABLE:
399 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
400 break;
401 }
402 DBExecute(hStmt);
403 DBFreeStatement(hStmt);
404 }
405
406 /**
407 * Send collected data to server
408 */
409 bool DataElement::sendToServer(bool reconcillation)
410 {
411 // If data in database was invalid table may not be parsed correctly
412 // Consider sending a success in that case so data element can be dropped
413 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
414 return true;
415
416 CommSession *session = (CommSession *)FindServerSession(m_serverId);
417 if (session == NULL)
418 return false;
419
420 NXCPMessage msg;
421 msg.setCode(CMD_DCI_DATA);
422 msg.setId(session->generateRequestId());
423 msg.setField(VID_DCI_ID, m_dciId);
424 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
425 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
426 msg.setField(VID_NODE_ID, m_snmpNode, UUID_LENGTH);
427 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
428 msg.setField(VID_RECONCILLATION, (INT16)(reconcillation ? 1 : 0));
429 switch(m_type)
430 {
431 case DCO_TYPE_ITEM:
432 msg.setField(VID_VALUE, m_value.item);
433 break;
434 case DCO_TYPE_LIST:
435 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
436 break;
437 case DCO_TYPE_TABLE:
438 m_value.table->setSource(m_origin);
439 m_value.table->fillMessage(msg, 0, -1);
440 break;
441 }
442 UINT32 rcc = session->doRequest(&msg, 2000);
443 session->decRefCount();
444
445 // consider internal error as success because it means that server
446 // cannot accept data for some reason and retry is not feasible
447 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
448 }
449
450 /**
451 * Server data sync status object
452 */
453 struct ServerSyncStatus
454 {
455 INT32 queueSize;
456
457 ServerSyncStatus()
458 {
459 queueSize = 0;
460 }
461 };
462
463 /**
464 * Server sync status information
465 */
466 static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
467 static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
468
469 /**
470 * Callback to check if reconcillation is needed for session
471 */
472 static EnumerationCallbackResult ReconcillationQueryCallback(AbstractCommSession *session, void *arg)
473 {
474 if (session->getServerId() == 0)
475 return _CONTINUE;
476 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
477 return ((s != NULL) && (s->queueSize > 0)) ? _STOP : _CONTINUE;
478 }
479
480 /**
481 * Data reconcillation thread
482 */
483 static THREAD_RESULT THREAD_CALL ReconcillationThread(void *arg)
484 {
485 DB_HANDLE hdb = GetLocalDatabaseHandle();
486 UINT32 sleepTime = 30000;
487 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread started"));
488
489 bool vacuumNeeded = false;
490 while(!AgentSleepAndCheckForShutdown(sleepTime))
491 {
492 // Check if there is something to sync
493 MutexLock(s_serverSyncStatusLock);
494 bool run = EnumerateSessions(ReconcillationQueryCallback, NULL);
495 MutexUnlock(s_serverSyncStatusLock);
496 if (!run)
497 {
498 if (vacuumNeeded)
499 {
500 DebugPrintf(INVALID_INDEX, 4, _T("ReconcillationThread: vacuum local database"));
501 DBQuery(hdb, _T("VACUUM"));
502 vacuumNeeded = false;
503 }
504 sleepTime = 30000;
505 continue;
506 }
507
508 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue ORDER BY timestamp LIMIT 100"));
509 if (hResult == NULL)
510 {
511 sleepTime = 30000;
512 continue;
513 }
514
515 int count = DBGetNumRows(hResult);
516 if (count > 0)
517 {
518 ObjectArray<DataElement> deleteList(count, 10, true);
519 for(int i = 0; i < count; i++)
520 {
521 DataElement *e = new DataElement(hResult, i);
522 bool sent = false;
523 MutexLock(s_serverSyncStatusLock);
524 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
525 if (status != NULL)
526 {
527 sent = e->sendToServer(true);
528 }
529 else
530 {
531 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
532 }
533
534 if (sent)
535 {
536 status->queueSize--;
537 deleteList.add(e);
538 }
539 else
540 {
541 delete e;
542 }
543 MutexUnlock(s_serverSyncStatusLock);
544 }
545
546 if (deleteList.size() > 0)
547 {
548 TCHAR query[256];
549
550 DBBegin(hdb);
551 for(int i = 0; i < deleteList.size(); i++)
552 {
553 DataElement *e = deleteList.get(i);
554 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
555 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
556 DBQuery(hdb, query);
557 }
558 DBCommit(hdb);
559 DebugPrintf(INVALID_INDEX, 4, _T("ReconcillationThread: %d records sent"), deleteList.size());
560 vacuumNeeded = true;
561 }
562 }
563 DBFreeResult(hResult);
564
565 sleepTime = (count == 100) ? 100 : 30000;
566 }
567
568 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread stopped"));
569 return THREAD_OK;
570 }
571
572 /**
573 * Data sender queue
574 */
575 static Queue s_dataSenderQueue;
576
577 /**
578 * Data sender
579 */
580 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
581 {
582 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
583 while(true)
584 {
585 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
586 if (e == INVALID_POINTER_VALUE)
587 break;
588
589 MutexLock(s_serverSyncStatusLock);
590 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
591 if (status == NULL)
592 {
593 status = new ServerSyncStatus();
594 s_serverSyncStatus.set(e->getServerId(), status);
595 }
596
597 if (status->queueSize == 0)
598 {
599 if (!e->sendToServer(false))
600 {
601 e->saveToDatabase();
602 status->queueSize++;
603 }
604 }
605 else
606 {
607 e->saveToDatabase();
608 status->queueSize++;
609 }
610 MutexUnlock(s_serverSyncStatusLock);
611
612 delete e;
613 }
614 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
615 return THREAD_OK;
616 }
617
618 /**
619 * Pseudo-session for cached data collection
620 */
621 class VirtualSession : public AbstractCommSession
622 {
623 private:
624 UINT64 m_serverId;
625
626 public:
627 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
628
629 virtual bool isMasterServer() { return false; }
630 virtual bool isControlServer() { return false; }
631 virtual bool canAcceptTraps() { return true; }
632 virtual bool canAcceptFileUpdates() { return false; }
633 virtual UINT64 getServerId() { return m_serverId; };
634 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
635
636 virtual bool isIPv6Aware() { return true; }
637
638 virtual void sendMessage(NXCPMessage *pMsg) { }
639 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
640 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
641 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
642 virtual UINT32 generateRequestId() { return 0; }
643 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
644 };
645
646 /**
647 * Collect data from agent
648 */
649 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
650 {
651 VirtualSession session(dci->getServerId());
652
653 DataElement *e = NULL;
654 if (dci->getType() == DCO_TYPE_ITEM)
655 {
656 TCHAR value[MAX_RESULT_LENGTH];
657 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
658 e = new DataElement(dci, value);
659 }
660 else if (dci->getType() == DCO_TYPE_LIST)
661 {
662 StringList *value = new StringList;
663 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
664 e = new DataElement(dci, value);
665 }
666 else if (dci->getType() == DCO_TYPE_TABLE)
667 {
668 Table *value = new Table;
669 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
670 e = new DataElement(dci, value);
671 }
672
673 return e;
674 }
675
676 /**
677 * Collect data from SNMP
678 */
679 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
680 {
681 DataElement *e = NULL;
682 if (dci->getType() == DCO_TYPE_ITEM)
683 {
684 TCHAR value[MAX_RESULT_LENGTH];
685 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
686 e = new DataElement(dci, value);
687 }
688 return e;
689 }
690
691 /**
692 * List of all data collection items
693 */
694 static ObjectArray<DataCollectionItem> s_items(64, 64, true);
695 static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
696
697 /**
698 * Single data collection run - collect data if needed and calculate sleep time
699 */
700 static UINT32 DataCollectionRun()
701 {
702 UINT32 sleepTime = 60;
703
704 MutexLock(s_itemLock);
705 for(int i = 0; i < s_items.size(); i++)
706 {
707 DataCollectionItem *dci = s_items.get(i);
708 time_t now = time(NULL);
709 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
710 if (timeToPoll == 0)
711 {
712 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
713 DataElement *e;
714 if (dci->getOrigin() == DS_NATIVE_AGENT)
715 {
716 e = CollectDataFromAgent(dci);
717 }
718 else if (dci->getOrigin() == DS_SNMP_AGENT)
719 {
720 e = CollectDataFromSNMP(dci);
721 }
722 else
723 {
724 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
725 e = NULL;
726 }
727
728 if (e != NULL)
729 {
730 s_dataSenderQueue.put(e);
731 }
732 else
733 {
734 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
735 }
736
737 dci->setLastPollTime(now);
738 timeToPoll = dci->getPollingInterval();
739 }
740
741 if (sleepTime > timeToPoll)
742 sleepTime = timeToPoll;
743 }
744 MutexUnlock(s_itemLock);
745 return sleepTime;
746 }
747
748 /**
749 * Data collector thread
750 */
751 static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
752 {
753 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
754
755 UINT32 sleepTime = DataCollectionRun();
756 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
757 {
758 sleepTime = DataCollectionRun();
759 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
760 }
761
762 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
763 return THREAD_OK;
764 }
765
766 /**
767 * Configure data collection
768 */
769 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
770 {
771 if (!s_dataCollectorStarted)
772 {
773 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
774 return;
775 }
776
777 int count = msg->getFieldAsInt32(VID_NUM_NODES);
778 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
779 for(int i = 0; i < count; i++)
780 {
781 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
782 UpdateSnmpTarget(target);
783 fieldId += 50;
784 }
785 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
786
787 ObjectArray<DataCollectionItem> config(32, 32, true);
788
789 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
790 fieldId = VID_ELEMENT_LIST_BASE;
791 for(int i = 0; i < count; i++)
792 {
793 config.add(new DataCollectionItem(serverId, msg, fieldId));
794 fieldId += 10;
795 }
796 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
797
798 MutexLock(s_itemLock);
799
800 // Update and add new
801 for(int j = 0; j < config.size(); j++)
802 {
803 DataCollectionItem *item = config.get(j);
804 bool exist = false;
805 for(int i = 0; i < s_items.size(); i++)
806 {
807 if (item->equals(s_items.get(i)))
808 {
809 s_items.get(i)->updateAndSave(item);
810 exist = true;
811 }
812 }
813 if (!exist)
814 {
815 DataCollectionItem *newItem = new DataCollectionItem(item);
816 s_items.add(newItem);
817 newItem->saveToDatabase(true);
818 }
819 }
820
821 // Remove not existing configuration and data for it
822 for(int i = 0; i < s_items.size(); i++)
823 {
824 DataCollectionItem *item = s_items.get(i);
825 //If item is from other server, then, do not search it in list of this server
826 if(item->getServerId() != serverId)
827 continue;
828 bool exist = false;
829 for(int j = 0; j < config.size(); j++)
830 {
831 if(item->equals(config.get(j)))
832 {
833 exist = true;
834 }
835 }
836 if (!exist)
837 {
838 item->deleteFromDatabase();
839 s_items.remove(i);
840 i--;
841 }
842 }
843 MutexUnlock(s_itemLock);
844 }
845
846 /**
847 * Load saved state of local data collection
848 */
849 static void LoadState()
850 {
851 DB_HANDLE hdb = GetLocalDatabaseHandle();
852 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
853 if (hResult != NULL)
854 {
855 int count = DBGetNumRows(hResult);
856 for(int i = 0; i < count; i++)
857 {
858 s_items.add(new DataCollectionItem(hResult, i));
859 }
860 DBFreeResult(hResult);
861 }
862
863 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
864 if (hResult != NULL)
865 {
866 int count = DBGetNumRows(hResult);
867 for(int i = 0; i < count; i++)
868 {
869 SNMPTarget *t = new SNMPTarget(hResult, i);
870 UpdateSnmpTarget(t);
871 }
872 DBFreeResult(hResult);
873 }
874
875 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
876 if (hResult != NULL)
877 {
878 int count = DBGetNumRows(hResult);
879 for(int i = 0; i < count; i++)
880 {
881 ServerSyncStatus *s = new ServerSyncStatus;
882 s->queueSize = DBGetFieldLong(hResult, i, 1);
883 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
884 s_serverSyncStatus.set(serverId, s);
885 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
886 }
887 DBFreeResult(hResult);
888 }
889 }
890
891 /**
892 * SQL script array
893 */
894 static const TCHAR *s_upgradeQueries[] =
895 {
896 _T("CREATE TABLE dc_queue (")
897 _T(" server_id number(20) not null,")
898 _T(" dci_id integer not null,")
899 _T(" dci_type integer not null,")
900 _T(" dci_origin integer not null,")
901 _T(" snmp_target_guid varchar(36) not null,")
902 _T(" timestamp integer not null,")
903 _T(" value varchar not null,")
904 _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),
905
906 _T("CREATE TABLE dc_config (")
907 _T(" server_id number(20) not null,")
908 _T(" dci_id integer not null,")
909 _T(" type integer not null,")
910 _T(" origin integer not null,")
911 _T(" name varchar(1023) null,")
912 _T(" polling_interval integer not null,")
913 _T(" last_poll integer not null,")
914 _T(" snmp_port integer not null,")
915 _T(" snmp_target_guid varchar(36) not null,")
916 _T(" snmp_raw_type integer not null,")
917 _T(" PRIMARY KEY(server_id,dci_id))"),
918
919 _T("CREATE TABLE dc_snmp_targets (")
920 _T(" guid varchar(36) not null,")
921 _T(" server_id number(20) not null,")
922 _T(" ip_address varchar(48) not null,")
923 _T(" snmp_version integer not null,")
924 _T(" port integer not null,")
925 _T(" auth_type integer not null,")
926 _T(" enc_type integer not null,")
927 _T(" auth_name varchar(63),")
928 _T(" auth_pass varchar(63),")
929 _T(" enc_pass varchar(63),")
930 _T(" PRIMARY KEY(guid))")
931 };
932
933 /**
934 * Data collector and sender thread handles
935 */
936 static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
937 static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
938 static THREAD s_reconcillationThread = INVALID_THREAD_HANDLE;
939
940 /**
941 * Initialize and start local data collector
942 */
943 void StartLocalDataCollector()
944 {
945 DB_HANDLE db = GetLocalDatabaseHandle();
946 if (db == NULL)
947 {
948 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
949 return;
950 }
951
952 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
953 while(dbVersion < DATACOLL_SCHEMA_VERSION)
954 {
955 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
956 {
957 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
958 return;
959 }
960 dbVersion++;
961 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
962 }
963
964 s_itemLock = MutexCreate();
965 s_serverSyncStatusLock = MutexCreate();
966
967 LoadState();
968
969 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
970 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
971 s_reconcillationThread = ThreadCreateEx(ReconcillationThread, 0, NULL);
972
973 s_dataCollectorStarted = true;
974 }
975
976 /**
977 * Shutdown local data collector
978 */
979 void ShutdownLocalDataCollector()
980 {
981 if (!s_dataCollectorStarted)
982 {
983 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
984 return;
985 }
986
987 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
988 ThreadJoin(s_dataCollectorThread);
989
990 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
991 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
992 ThreadJoin(s_dataSenderThread);
993
994 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconcillation thread termination"));
995 ThreadJoin(s_reconcillationThread);
996
997 MutexDestroy(s_itemLock);
998 MutexDestroy(s_serverSyncStatusLock);
999 }