Fixed data reconciliation problems when the server is not the master server (issue #1231)
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Externals
27 */
28 void UpdateSnmpTarget(SNMPTarget *target);
29 bool GetSnmpValue(const uuid& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
30
31 /**
32 * Data collector start indicator
33 */
34 static bool s_dataCollectorStarted = false;
35
36 /**
37 * Unsaved poll time indicator
38 */
39 static bool s_pollTimeChanged = false;
40
41 /**
42 * Data collection item
43 */
44 class DataCollectionItem : public RefCountObject
45 {
46 private:
47 UINT64 m_serverId;
48 UINT32 m_id;
49 INT32 m_pollingInterval;
50 TCHAR *m_name;
51 BYTE m_type;
52 BYTE m_origin;
53 UINT16 m_snmpPort;
54 BYTE m_snmpRawValueType;
55 BYTE m_busy;
56 uuid m_snmpTargetGuid;
57 time_t m_lastPollTime;
58
59 public:
60 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
61 DataCollectionItem(DB_RESULT hResult, int row);
62 DataCollectionItem(const DataCollectionItem *item);
63 virtual ~DataCollectionItem();
64
65 UINT32 getId() const { return m_id; }
66 UINT64 getServerId() const { return m_serverId; }
67 const TCHAR *getName() const { return m_name; }
68 int getType() const { return (int)m_type; }
69 int getOrigin() const { return (int)m_origin; }
70 const uuid& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
71 UINT16 getSnmpPort() const { return m_snmpPort; }
72 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
73 UINT32 getPollingInterval() const { return (UINT32)m_pollingInterval; }
74 time_t getLastPollTime() { return m_lastPollTime; }
75
76 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
77
78 void updateAndSave(const DataCollectionItem *item);
79 void saveToDatabase(bool newObject);
80 void deleteFromDatabase();
81 void setLastPollTime(time_t time);
82
83 void startDataCollection() { m_busy = 1; incRefCount(); }
84 void finishDataCollection() { m_busy = 0; decRefCount(); }
85
86 UINT32 getTimeToNextPoll(time_t now) const
87 {
88 if (m_busy) // being polled now - time to next poll should not be less than full polling interval
89 return m_pollingInterval;
90 time_t diff = now - m_lastPollTime;
91 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
92 }
93 };
94
95 /**
96 * Create data collection item from NXCP mesage
97 */
98 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
99 {
100 m_serverId = serverId;
101 m_id = msg->getFieldAsInt32(baseId);
102 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
103 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
104 m_name = msg->getFieldAsString(baseId + 3);
105 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
106 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
107 m_snmpTargetGuid = msg->getFieldAsGUID(baseId + 6);
108 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
109 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
110 m_busy = 0;
111 }
112
113 /**
114 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
115 */
116 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
117 {
118 m_serverId = DBGetFieldInt64(hResult, row, 0);
119 m_id = DBGetFieldULong(hResult, row, 1);
120 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
121 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
122 m_name = DBGetField(hResult, row, 4, NULL, 0);
123 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
124 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
125 m_snmpPort = DBGetFieldULong(hResult, row, 7);
126 m_snmpTargetGuid = DBGetFieldGUID(hResult, row, 8);
127 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
128 m_busy = 0;
129 }
130
131 /**
132 * Copy constructor
133 */
134 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
135 {
136 m_serverId = item->m_serverId;
137 m_id = item->m_id;
138 m_type = item->m_type;
139 m_origin = item->m_origin;
140 m_name = _tcsdup(item->m_name);
141 m_pollingInterval = item->m_pollingInterval;
142 m_lastPollTime = item->m_lastPollTime;
143 m_snmpTargetGuid = item->m_snmpTargetGuid;
144 m_snmpPort = item->m_snmpPort;
145 m_snmpRawValueType = item->m_snmpRawValueType;
146 m_busy = 0;
147 }
148
149 /**
150 * Data collection item destructor
151 */
152 DataCollectionItem::~DataCollectionItem()
153 {
154 safe_free(m_name);
155 }
156
157 /**
158 * Will check if object has changed. If at least one field is changed - all data will be updated and
159 * saved to database.
160 */
161 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
162 {
163 // if at least one of fields changed - set all fields and save to DB
164 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
165 (m_pollingInterval != item->m_pollingInterval) || m_snmpTargetGuid.compare(item->m_snmpTargetGuid) ||
166 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType) || (m_lastPollTime < item->m_lastPollTime))
167 {
168 m_type = item->m_type;
169 m_origin = item->m_origin;
170 m_name = _tcsdup(item->m_name);
171 m_pollingInterval = item->m_pollingInterval;
172 if (m_lastPollTime < item->m_lastPollTime)
173 m_lastPollTime = item->m_lastPollTime;
174 m_snmpTargetGuid = item->m_snmpTargetGuid;
175 m_snmpPort = item->m_snmpPort;
176 m_snmpRawValueType = item->m_snmpRawValueType;
177 saveToDatabase(false);
178 }
179 }
180
181 /**
182 * Save configuration object to database
183 */
184 void DataCollectionItem::saveToDatabase(bool newObject)
185 {
186 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) saved to database"),
187 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
188 DB_HANDLE db = GetLocalDatabaseHandle();
189 DB_STATEMENT hStmt;
190
191 if (newObject)
192 {
193 hStmt = DBPrepare(db,
194 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
195 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
196 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
197 }
198 else
199 {
200 hStmt = DBPrepare(db,
201 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
202 _T("polling_interval=?,last_poll=?,snmp_port=?,")
203 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
204 }
205
206 if (hStmt == NULL)
207 return;
208
209 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
210 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
211 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
212 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
213 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
214 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
215 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, m_snmpTargetGuid);
216 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
217 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
218 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
219
220 DBExecute(hStmt);
221 DBFreeStatement(hStmt);
222 }
223
224 /**
225 * Remove item form database and delete not synced data if exist
226 */
227 void DataCollectionItem::deleteFromDatabase()
228 {
229 DB_HANDLE db = GetLocalDatabaseHandle();
230 TCHAR query[256];
231 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
232 if (DBQuery(db, query))
233 {
234 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"), m_serverId, m_id);
235 if (DBQuery(db, query))
236 {
237 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=") UINT64X_FMT(_T("016")) _T(",dciId=%d) removed from database"), m_serverId, m_id);
238 }
239 }
240 }
241
242 /**
243 * Set last poll time for item
244 */
245 void DataCollectionItem::setLastPollTime(time_t time)
246 {
247 m_lastPollTime = time;
248 s_pollTimeChanged = true;
249 }
250
251 /**
252 * Collected data
253 */
254 class DataElement
255 {
256 private:
257 UINT64 m_serverId;
258 UINT32 m_dciId;
259 time_t m_timestamp;
260 int m_origin;
261 int m_type;
262 uuid m_snmpNode;
263 union
264 {
265 TCHAR *item;
266 StringList *list;
267 Table *table;
268 } m_value;
269
270 public:
271 DataElement(DataCollectionItem *dci, const TCHAR *value)
272 {
273 m_serverId = dci->getServerId();
274 m_dciId = dci->getId();
275 m_timestamp = time(NULL);
276 m_origin = dci->getOrigin();
277 m_type = DCO_TYPE_ITEM;
278 m_snmpNode = dci->getSnmpTargetGuid();
279 m_value.item = _tcsdup(value);
280 }
281
282 DataElement(DataCollectionItem *dci, StringList *value)
283 {
284 m_serverId = dci->getServerId();
285 m_dciId = dci->getId();
286 m_timestamp = time(NULL);
287 m_origin = dci->getOrigin();
288 m_type = DCO_TYPE_LIST;
289 m_snmpNode = dci->getSnmpTargetGuid();
290 m_value.list = value;
291 }
292
293 DataElement(DataCollectionItem *dci, Table *value)
294 {
295 m_serverId = dci->getServerId();
296 m_dciId = dci->getId();
297 m_timestamp = time(NULL);
298 m_origin = dci->getOrigin();
299 m_type = DCO_TYPE_TABLE;
300 m_snmpNode = dci->getSnmpTargetGuid();
301 m_value.table = value;
302 }
303
304 /**
305 * Create data element from database record
306 * Expected field order: server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value
307 */
308 DataElement(DB_RESULT hResult, int row)
309 {
310 m_serverId = DBGetFieldUInt64(hResult, row, 0);
311 m_dciId = DBGetFieldULong(hResult, row, 1);
312 m_timestamp = (time_t)DBGetFieldInt64(hResult, row, 5);
313 m_origin = DBGetFieldLong(hResult, row, 3);
314 m_type = DBGetFieldLong(hResult, row, 2);
315 m_snmpNode = DBGetFieldGUID(hResult, row, 4);
316 switch(m_type)
317 {
318 case DCO_TYPE_ITEM:
319 m_value.item = DBGetField(hResult, row, 6, NULL, 0);
320 break;
321 case DCO_TYPE_LIST:
322 {
323 m_value.list = new StringList();
324 TCHAR *text = DBGetField(hResult, row, 6, NULL, 0);
325 if (text != NULL)
326 {
327 m_value.list->splitAndAdd(text, _T("\n"));
328 free(text);
329 }
330 }
331 break;
332 case DCO_TYPE_TABLE:
333 {
334 char *xml = DBGetFieldUTF8(hResult, row, 6, NULL, 0);
335 if (xml != NULL)
336 {
337 m_value.table = Table::createFromXML(xml);
338 free(xml);
339 }
340 else
341 {
342 m_value.table = NULL;
343 }
344 }
345 break;
346 default:
347 m_type = DCO_TYPE_ITEM;
348 m_value.item = _tcsdup(_T(""));
349 break;
350 }
351 }
352
353 ~DataElement()
354 {
355 switch(m_type)
356 {
357 case DCO_TYPE_ITEM:
358 free(m_value.item);
359 break;
360 case DCO_TYPE_LIST:
361 delete m_value.list;
362 break;
363 case DCO_TYPE_TABLE:
364 delete m_value.table;
365 break;
366 }
367 }
368
369 time_t getTimestamp() { return m_timestamp; }
370 UINT64 getServerId() { return m_serverId; }
371 UINT32 getDciId() { return m_dciId; }
372 int getType() { return m_type; }
373
374 void saveToDatabase(DB_STATEMENT hStmt);
375 bool sendToServer(bool reconcillation);
376 void fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId);
377 };
378
379 /**
380 * Save data element to database
381 */
382 void DataElement::saveToDatabase(DB_STATEMENT hStmt)
383 {
384 DBBind(hStmt, 1, DB_SQLTYPE_BIGINT, m_serverId);
385 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_dciId);
386 DBBind(hStmt, 3, DB_SQLTYPE_INTEGER, (LONG)m_type);
387 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_origin);
388 DBBind(hStmt, 5, DB_SQLTYPE_VARCHAR, m_snmpNode);
389 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_timestamp);
390 switch(m_type)
391 {
392 case DCO_TYPE_ITEM:
393 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.item, DB_BIND_STATIC);
394 break;
395 case DCO_TYPE_LIST:
396 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.list->join(_T("\n")), DB_BIND_DYNAMIC);
397 break;
398 case DCO_TYPE_TABLE:
399 DBBind(hStmt, 7, DB_SQLTYPE_TEXT, m_value.table->createXML(), DB_BIND_DYNAMIC);
400 break;
401 }
402 DBExecute(hStmt);
403 }
404
405 /**
406 * Session comparator
407 */
408 static bool SessionComparator_Sender(AbstractCommSession *session, void *data)
409 {
410 return (session->getServerId() == *((INT64 *)data)) && session->canAcceptData();
411 }
412
413 /**
414 * Send collected data to server
415 */
416 bool DataElement::sendToServer(bool reconciliation)
417 {
418 // If data in database was invalid table may not be parsed correctly
419 // Consider sending a success in that case so data element can be dropped
420 if ((m_type == DCO_TYPE_TABLE) && (m_value.table == NULL))
421 return true;
422
423 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Sender, &m_serverId);
424 if (session == NULL)
425 return false;
426
427 NXCPMessage msg;
428 msg.setCode(CMD_DCI_DATA);
429 msg.setId(session->generateRequestId());
430 msg.setField(VID_DCI_ID, m_dciId);
431 msg.setField(VID_DCI_SOURCE_TYPE, (INT16)m_origin);
432 msg.setField(VID_DCOBJECT_TYPE, (INT16)m_type);
433 msg.setField(VID_NODE_ID, m_snmpNode);
434 msg.setFieldFromTime(VID_TIMESTAMP, m_timestamp);
435 msg.setField(VID_RECONCILIATION, (INT16)(reconciliation ? 1 : 0));
436 switch(m_type)
437 {
438 case DCO_TYPE_ITEM:
439 msg.setField(VID_VALUE, m_value.item);
440 break;
441 case DCO_TYPE_LIST:
442 m_value.list->fillMessage(&msg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
443 break;
444 case DCO_TYPE_TABLE:
445 m_value.table->setSource(m_origin);
446 m_value.table->fillMessage(msg, 0, -1);
447 break;
448 }
449 UINT32 rcc = session->doRequest(&msg, 2000);
450 session->decRefCount();
451
452 // consider internal error as success because it means that server
453 // cannot accept data for some reason and retry is not feasible
454 return (rcc == ERR_SUCCESS) || (rcc == ERR_INTERNAL_ERROR);
455 }
456
457 /**
458 * Fill bulk reconciliation message with DCI data
459 */
460 void DataElement::fillReconciliationMessage(NXCPMessage *msg, UINT32 baseId)
461 {
462 msg->setField(baseId, m_dciId);
463 msg->setField(baseId + 1, (INT16)m_origin);
464 msg->setField(baseId + 2, (INT16)m_type);
465 msg->setField(baseId + 3, m_snmpNode);
466 msg->setFieldFromTime(baseId + 4, m_timestamp);
467 msg->setField(baseId + 5, m_value.item);
468 }
469
470 /**
471 * Server data sync status object
472 */
473 struct ServerSyncStatus
474 {
475 INT32 queueSize;
476
477 ServerSyncStatus()
478 {
479 queueSize = 0;
480 }
481 };
482
483 /**
484 * Server sync status information
485 */
486 static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
487 static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
488
489 /**
490 * Database writer queue
491 */
492 static Queue s_databaseWriterQueue;
493
494 /**
495 * Database writer
496 */
497 static THREAD_RESULT THREAD_CALL DatabaseWriter(void *arg)
498 {
499 DB_HANDLE hdb = GetLocalDatabaseHandle();
500 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread started"));
501
502 while(true)
503 {
504 DataElement *e = (DataElement *)s_databaseWriterQueue.getOrBlock();
505 if (e == INVALID_POINTER_VALUE)
506 break;
507
508 DB_STATEMENT hStmt= DBPrepare(hdb, _T("INSERT INTO dc_queue (server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value) VALUES (?,?,?,?,?,?,?)"));
509 if (hStmt == NULL)
510 {
511 delete e;
512 continue;
513 }
514
515 int count = 0;
516
517 DBBegin(hdb);
518 while((e != NULL) && (e != INVALID_POINTER_VALUE))
519 {
520 e->saveToDatabase(hStmt);
521 delete e;
522
523 count++;
524 if (count > 200)
525 break;
526
527 e = (DataElement *)s_databaseWriterQueue.getOrBlock(500); // Wait up to 500 ms for next data block
528 }
529 DBCommit(hdb);
530 DBFreeStatement(hStmt);
531 DebugPrintf(INVALID_INDEX, 7, _T("Database writer: %d records inserted"), count);
532 }
533
534 DebugPrintf(INVALID_INDEX, 1, _T("Database writer thread stopped"));
535 return THREAD_OK;
536 }
537
538 /**
539 * List of all data collection items
540 */
541 static ObjectArray<DataCollectionItem> s_items(64, 64, true);
542 static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
543
544 /**
545 * Session comparator
546 */
547 static bool SessionComparator_Reconciliation(AbstractCommSession *session, void *data)
548 {
549 if ((session->getServerId() == 0) || !session->canAcceptData())
550 return false;
551 ServerSyncStatus *s = s_serverSyncStatus.get(session->getServerId());
552 if ((s != NULL) && (s->queueSize > 0))
553 {
554 return true;
555 }
556 return false;
557 }
558
559 /**
560 * Data reconciliation thread
561 */
562 static THREAD_RESULT THREAD_CALL ReconciliationThread(void *arg)
563 {
564 DB_HANDLE hdb = GetLocalDatabaseHandle();
565 UINT32 sleepTime = 30000;
566 DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread started"));
567
568 bool vacuumNeeded = false;
569 while(!AgentSleepAndCheckForShutdown(sleepTime))
570 {
571 // Check if there is something to sync
572 MutexLock(s_serverSyncStatusLock);
573 CommSession *session = (CommSession *)FindServerSession(SessionComparator_Reconciliation, NULL);
574 MutexUnlock(s_serverSyncStatusLock);
575 if (session == NULL)
576 {
577 // Save last poll times when reconciliation thread is idle
578 MutexLock(s_itemLock);
579 if (s_pollTimeChanged)
580 {
581 DBBegin(hdb);
582 TCHAR query[256];
583 for(int i = 0; i < s_items.size(); i++)
584 {
585 DataCollectionItem *dci = s_items.get(i);
586 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
587 dci->getLastPollTime(), dci->getServerId(), dci->getId());
588 DBQuery(hdb, query);
589 }
590 DBCommit(hdb);
591 s_pollTimeChanged = false;
592 }
593 MutexUnlock(s_itemLock);
594
595 if (vacuumNeeded)
596 {
597 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: vacuum local database"));
598 DBQuery(hdb, _T("VACUUM"));
599 vacuumNeeded = false;
600 }
601 sleepTime = 30000;
602 continue;
603 }
604
605 TCHAR query[1024];
606 _sntprintf(query, 1024, _T("SELECT server_id,dci_id,dci_type,dci_origin,snmp_target_guid,timestamp,value FROM dc_queue WHERE server_id=") UINT64_FMT _T(" ORDER BY timestamp LIMIT %d"), session->getServerId(), BULK_DATA_BLOCK_SIZE);
607
608 TCHAR sqlError[DBDRV_MAX_ERROR_TEXT];
609 DB_RESULT hResult = DBSelectEx(hdb, query, sqlError);
610 if (hResult == NULL)
611 {
612 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: database query failed: %s"), sqlError);
613 sleepTime = 30000;
614 session->decRefCount();
615 continue;
616 }
617
618 int count = DBGetNumRows(hResult);
619 if (count > 0)
620 {
621 ObjectArray<DataElement> bulkSendList(count, 10, true);
622 ObjectArray<DataElement> deleteList(count, 10, true);
623 for(int i = 0; i < count; i++)
624 {
625 DataElement *e = new DataElement(hResult, i);
626 if ((e->getType() == DCO_TYPE_ITEM) && session->isBulkReconciliationSupported())
627 {
628 bulkSendList.add(e);
629 }
630 else
631 {
632 MutexLock(s_serverSyncStatusLock);
633 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
634 if (status != NULL)
635 {
636 if (e->sendToServer(true))
637 {
638 status->queueSize--;
639 deleteList.add(e);
640 }
641 else
642 {
643 delete e;
644 }
645 }
646 else
647 {
648 DebugPrintf(INVALID_INDEX, 5, _T("INTERNAL ERROR: cached DCI value without server sync status object"));
649 deleteList.add(e); // record should be deleted
650 }
651 MutexUnlock(s_serverSyncStatusLock);
652 }
653 }
654
655 if (bulkSendList.size() > 0)
656 {
657 DebugPrintf(INVALID_INDEX, 6, _T("ReconciliationThread: %d records to be sent in bulk mode"), bulkSendList.size());
658
659 NXCPMessage msg;
660 msg.setCode(CMD_DCI_DATA);
661 msg.setId(session->generateRequestId());
662 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
663 msg.setField(VID_NUM_ELEMENTS, (INT16)bulkSendList.size());
664
665 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
666 for(int i = 0; i < bulkSendList.size(); i++)
667 {
668 bulkSendList.get(i)->fillReconciliationMessage(&msg, fieldId);
669 fieldId += 10;
670 }
671
672 NXCPMessage *response = session->doRequestEx(&msg, 2000);
673 if (response != NULL)
674 {
675 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
676 if (rcc == ERR_SUCCESS)
677 {
678 MutexLock(s_serverSyncStatusLock);
679 ServerSyncStatus *serverSyncStatus = s_serverSyncStatus.get(session->getServerId());
680
681 // Check status for each data element
682 BYTE status[BULK_DATA_BLOCK_SIZE];
683 memset(status, 0, BULK_DATA_BLOCK_SIZE);
684 response->getFieldAsBinary(VID_STATUS, status, BULK_DATA_BLOCK_SIZE);
685 bulkSendList.setOwner(false);
686 for(int i = 0; i < bulkSendList.size(); i++)
687 {
688 DataElement *e = bulkSendList.get(i);
689 if (status[i] != BULK_DATA_REC_RETRY)
690 {
691 deleteList.add(e);
692 serverSyncStatus->queueSize--;
693 }
694 else
695 {
696 delete e;
697 }
698 }
699
700 MutexUnlock(s_serverSyncStatusLock);
701 }
702 else
703 {
704 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: bulk send failed (%d)"), rcc);
705 }
706 delete response;
707 }
708 else
709 {
710 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: timeout on bulk send"));
711 }
712 }
713
714 if (deleteList.size() > 0)
715 {
716 DBBegin(hdb);
717 for(int i = 0; i < deleteList.size(); i++)
718 {
719 DataElement *e = deleteList.get(i);
720 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d AND timestamp=") INT64_FMT,
721 e->getServerId(), e->getDciId(), (INT64)e->getTimestamp());
722 DBQuery(hdb, query);
723 }
724 DBCommit(hdb);
725 DebugPrintf(INVALID_INDEX, 4, _T("ReconciliationThread: %d records sent"), deleteList.size());
726 vacuumNeeded = true;
727 }
728 }
729 DBFreeResult(hResult);
730
731 session->decRefCount();
732 sleepTime = (count > 0) ? 50 : 30000;
733 }
734
735 DebugPrintf(INVALID_INDEX, 1, _T("Data reconciliation thread stopped"));
736 return THREAD_OK;
737 }
738
739 /**
740 * Data sender queue
741 */
742 static Queue s_dataSenderQueue;
743
744 /**
745 * Data sender
746 */
747 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
748 {
749 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
750 while(true)
751 {
752 DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
753 if (e == INVALID_POINTER_VALUE)
754 break;
755
756 MutexLock(s_serverSyncStatusLock);
757 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
758 if (status == NULL)
759 {
760 status = new ServerSyncStatus();
761 s_serverSyncStatus.set(e->getServerId(), status);
762 }
763
764 if (status->queueSize == 0)
765 {
766 if (!e->sendToServer(false))
767 {
768 status->queueSize++;
769 s_databaseWriterQueue.put(e);
770 e = NULL;
771 }
772 }
773 else
774 {
775 status->queueSize++;
776 s_databaseWriterQueue.put(e);
777 e = NULL;
778 }
779 MutexUnlock(s_serverSyncStatusLock);
780
781 delete e;
782 }
783 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
784 return THREAD_OK;
785 }
786
787 /**
788 * Pseudo-session for cached data collection
789 */
790 class VirtualSession : public AbstractCommSession
791 {
792 private:
793 UINT64 m_serverId;
794
795 public:
796 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
797
798 virtual bool isMasterServer() { return false; }
799 virtual bool isControlServer() { return false; }
800 virtual bool canAcceptData() { return true; }
801 virtual bool canAcceptTraps() { return true; }
802 virtual bool canAcceptFileUpdates() { return false; }
803 virtual UINT64 getServerId() { return m_serverId; };
804 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
805
806 virtual bool isIPv6Aware() { return true; }
807
808 virtual void sendMessage(NXCPMessage *pMsg) { }
809 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
810 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
811 virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout) { return RCC_NOT_IMPLEMENTED; }
812 virtual NXCPMessage *doRequestEx(NXCPMessage *msg, UINT32 timeout) { return NULL; }
813 virtual UINT32 generateRequestId() { return 0; }
814 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
815 };
816
817 /**
818 * Collect data from agent
819 */
820 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
821 {
822 VirtualSession session(dci->getServerId());
823
824 DataElement *e = NULL;
825 if (dci->getType() == DCO_TYPE_ITEM)
826 {
827 TCHAR value[MAX_RESULT_LENGTH];
828 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
829 e = new DataElement(dci, value);
830 }
831 else if (dci->getType() == DCO_TYPE_LIST)
832 {
833 StringList *value = new StringList;
834 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
835 e = new DataElement(dci, value);
836 }
837 else if (dci->getType() == DCO_TYPE_TABLE)
838 {
839 Table *value = new Table;
840 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
841 e = new DataElement(dci, value);
842 }
843
844 return e;
845 }
846
847 /**
848 * Collect data from SNMP
849 */
850 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
851 {
852 DataElement *e = NULL;
853 if (dci->getType() == DCO_TYPE_ITEM)
854 {
855 DebugPrintf(INVALID_INDEX, 8, _T("Read SNMP parameter %s"), dci->getName());
856
857 TCHAR value[MAX_RESULT_LENGTH];
858 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
859 e = new DataElement(dci, value);
860 }
861 return e;
862 }
863
864 /**
865 * Local data collection callback
866 */
867 static void LocalDataCollectionCallback(void *arg)
868 {
869 DataCollectionItem *dci = (DataCollectionItem *)arg;
870
871 DataElement *e = CollectDataFromAgent(dci);
872 if (e != NULL)
873 {
874 s_dataSenderQueue.put(e);
875 }
876 else
877 {
878 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
879 }
880
881 dci->setLastPollTime(time(NULL));
882 dci->finishDataCollection();
883 }
884
885 /**
886 * SNMP data collection callback
887 */
888 static void SnmpDataCollectionCallback(void *arg)
889 {
890 DataCollectionItem *dci = (DataCollectionItem *)arg;
891
892 DataElement *e = CollectDataFromSNMP(dci);
893 if (e != NULL)
894 {
895 s_dataSenderQueue.put(e);
896 }
897 else
898 {
899 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
900 }
901
902 dci->setLastPollTime(time(NULL));
903 dci->finishDataCollection();
904 }
905
906 /**
907 * Data collectors thread pool
908 */
909 static ThreadPool *s_dataCollectorPool = NULL;
910
911 /**
912 * Single data collection scheduler run - schedule data collection if needed and calculate sleep time
913 */
914 static UINT32 DataCollectionSchedulerRun()
915 {
916 UINT32 sleepTime = 60;
917
918 MutexLock(s_itemLock);
919 for(int i = 0; i < s_items.size(); i++)
920 {
921 DataCollectionItem *dci = s_items.get(i);
922 time_t now = time(NULL);
923 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
924 if (timeToPoll == 0)
925 {
926 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
927
928 if (dci->getOrigin() == DS_NATIVE_AGENT)
929 {
930 dci->startDataCollection();
931 ThreadPoolExecute(s_dataCollectorPool, LocalDataCollectionCallback, dci);
932 }
933 else if (dci->getOrigin() == DS_SNMP_AGENT)
934 {
935 dci->startDataCollection();
936 TCHAR key[64];
937 ThreadPoolExecuteSerialized(s_dataCollectorPool, dci->getSnmpTargetGuid().toString(key), SnmpDataCollectionCallback, dci);
938 }
939 else
940 {
941 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
942 }
943
944 timeToPoll = dci->getPollingInterval();
945 }
946
947 if (sleepTime > timeToPoll)
948 sleepTime = timeToPoll;
949 }
950 MutexUnlock(s_itemLock);
951 return sleepTime;
952 }
953
954 /**
955 * Data collection scheduler thread
956 */
957 static THREAD_RESULT THREAD_CALL DataCollectionScheduler(void *arg)
958 {
959 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread started"));
960 s_dataCollectorPool = ThreadPoolCreate(1, 64, _T("DATACOLL"));
961
962 UINT32 sleepTime = DataCollectionSchedulerRun();
963 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
964 {
965 sleepTime = DataCollectionSchedulerRun();
966 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
967 }
968
969 ThreadPoolDestroy(s_dataCollectorPool);
970 DebugPrintf(INVALID_INDEX, 1, _T("Data collection scheduler thread stopped"));
971 return THREAD_OK;
972 }
973
974 /**
975 * Configure data collection
976 */
977 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
978 {
979 if (!s_dataCollectorStarted)
980 {
981 DebugPrintf(INVALID_INDEX, 1, _T("Local data collector was not started, ignoring configuration received from server ") UINT64X_FMT(_T("016")), serverId);
982 return;
983 }
984
985 DB_HANDLE hdb = GetLocalDatabaseHandle();
986
987 int count = msg->getFieldAsInt32(VID_NUM_NODES);
988 if (count > 0)
989 {
990 DBBegin(hdb);
991 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
992 for(int i = 0; i < count; i++)
993 {
994 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
995 UpdateSnmpTarget(target);
996 fieldId += 50;
997 }
998 DBCommit(hdb);
999 }
1000 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
1001
1002 ObjectArray<DataCollectionItem> config(32, 32, true);
1003
1004 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
1005 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
1006 for(int i = 0; i < count; i++)
1007 {
1008 config.add(new DataCollectionItem(serverId, msg, fieldId));
1009 fieldId += 10;
1010 }
1011 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
1012
1013 bool txnOpen = false;
1014
1015 MutexLock(s_itemLock);
1016
1017 // Update and add new
1018 for(int j = 0; j < config.size(); j++)
1019 {
1020 DataCollectionItem *item = config.get(j);
1021 bool exist = false;
1022 for(int i = 0; i < s_items.size(); i++)
1023 {
1024 if (item->equals(s_items.get(i)))
1025 {
1026 s_items.get(i)->updateAndSave(item);
1027 exist = true;
1028 }
1029 }
1030 if (!exist)
1031 {
1032 DataCollectionItem *newItem = new DataCollectionItem(item);
1033 s_items.add(newItem);
1034 if (!txnOpen)
1035 {
1036 DBBegin(hdb);
1037 txnOpen = true;
1038 }
1039 newItem->saveToDatabase(true);
1040 }
1041 }
1042
1043 // Remove not existing configuration and data for it
1044 for(int i = 0; i < s_items.size(); i++)
1045 {
1046 DataCollectionItem *item = s_items.get(i);
1047 //If item is from other server, then, do not search it in list of this server
1048 if(item->getServerId() != serverId)
1049 continue;
1050 bool exist = false;
1051 for(int j = 0; j < config.size(); j++)
1052 {
1053 if (item->equals(config.get(j)))
1054 {
1055 exist = true;
1056 }
1057 }
1058 if (!exist)
1059 {
1060 if (!txnOpen)
1061 {
1062 DBBegin(hdb);
1063 txnOpen = true;
1064 }
1065 item->deleteFromDatabase();
1066 s_items.unlink(i);
1067 item->decRefCount();
1068 i--;
1069 }
1070 }
1071
1072 if (txnOpen)
1073 DBCommit(hdb);
1074
1075 MutexUnlock(s_itemLock);
1076
1077 DebugPrintf(INVALID_INDEX, 4, _T("Data collection for server ") UINT64X_FMT(_T("016")) _T(" reconfigured"), serverId);
1078 }
1079
1080 /**
1081 * Load saved state of local data collection
1082 */
1083 static void LoadState()
1084 {
1085 DB_HANDLE hdb = GetLocalDatabaseHandle();
1086 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
1087 if (hResult != NULL)
1088 {
1089 int count = DBGetNumRows(hResult);
1090 for(int i = 0; i < count; i++)
1091 {
1092 s_items.add(new DataCollectionItem(hResult, i));
1093 }
1094 DBFreeResult(hResult);
1095 }
1096
1097 hResult = DBSelect(hdb, _T("SELECT guid,server_id,ip_address,snmp_version,port,auth_type,enc_type,auth_name,auth_pass,enc_pass FROM dc_snmp_targets"));
1098 if (hResult != NULL)
1099 {
1100 int count = DBGetNumRows(hResult);
1101 for(int i = 0; i < count; i++)
1102 {
1103 SNMPTarget *t = new SNMPTarget(hResult, i);
1104 UpdateSnmpTarget(t);
1105 }
1106 DBFreeResult(hResult);
1107 }
1108
1109 hResult = DBSelect(hdb, _T("SELECT server_id,count(*) FROM dc_queue GROUP BY server_id"));
1110 if (hResult != NULL)
1111 {
1112 int count = DBGetNumRows(hResult);
1113 for(int i = 0; i < count; i++)
1114 {
1115 ServerSyncStatus *s = new ServerSyncStatus;
1116 s->queueSize = DBGetFieldLong(hResult, i, 1);
1117 UINT64 serverId = DBGetFieldUInt64(hResult, i, 0);
1118 s_serverSyncStatus.set(serverId, s);
1119 DebugPrintf(INVALID_INDEX, 2, _T("%d elements in queue for server ID ") UINT64X_FMT(_T("016")), s->queueSize, serverId);
1120 }
1121 DBFreeResult(hResult);
1122 }
1123 }
1124
1125 /**
1126 * Data collector and sender thread handles
1127 */
1128 static THREAD s_dataCollectionSchedulerThread = INVALID_THREAD_HANDLE;
1129 static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
1130 static THREAD s_databaseWriterThread = INVALID_THREAD_HANDLE;
1131 static THREAD s_reconciliationThread = INVALID_THREAD_HANDLE;
1132
1133 /**
1134 * Initialize and start local data collector
1135 */
1136 void StartLocalDataCollector()
1137 {
1138 DB_HANDLE db = GetLocalDatabaseHandle();
1139 if (db == NULL)
1140 {
1141 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
1142 return;
1143 }
1144
1145 s_itemLock = MutexCreate();
1146 s_serverSyncStatusLock = MutexCreate();
1147
1148 LoadState();
1149
1150 s_dataCollectionSchedulerThread = ThreadCreateEx(DataCollectionScheduler, 0, NULL);
1151 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
1152 s_databaseWriterThread = ThreadCreateEx(DatabaseWriter, 0, NULL);
1153 s_reconciliationThread = ThreadCreateEx(ReconciliationThread, 0, NULL);
1154
1155 s_dataCollectorStarted = true;
1156 }
1157
1158 /**
1159 * Shutdown local data collector
1160 */
1161 void ShutdownLocalDataCollector()
1162 {
1163 if (!s_dataCollectorStarted)
1164 {
1165 DebugPrintf(INVALID_INDEX, 5, _T("Local data collector was not started"));
1166 return;
1167 }
1168
1169 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
1170 ThreadJoin(s_dataCollectionSchedulerThread);
1171
1172 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
1173 s_dataSenderQueue.put(INVALID_POINTER_VALUE);
1174 ThreadJoin(s_dataSenderThread);
1175
1176 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for database writer thread termination"));
1177 s_databaseWriterQueue.put(INVALID_POINTER_VALUE);
1178 ThreadJoin(s_databaseWriterThread);
1179
1180 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconciliation thread termination"));
1181 ThreadJoin(s_reconciliationThread);
1182
1183 MutexDestroy(s_itemLock);
1184 MutexDestroy(s_serverSyncStatusLock);
1185 }
1186
1187 /**
1188 * Clear data collection configuration
1189 */
1190 void ClearDataCollectionConfiguration()
1191 {
1192 MutexLock(s_itemLock);
1193 DB_HANDLE db = GetLocalDatabaseHandle();
1194 DBQuery(db, _T("DELETE FROM dc_queue"));
1195 DBQuery(db, _T("DELETE FROM dc_config"));
1196 DBQuery(db, _T("DELETE FROM dc_snmp_targets"));
1197 s_items.clear();
1198 MutexUnlock(s_itemLock);
1199
1200 MutexLock(s_serverSyncStatusLock);
1201 s_serverSyncStatus.clear();
1202 MutexUnlock(s_serverSyncStatusLock);
1203 }
1204
1205 /**
1206 * Handler for data collector queue size
1207 */
1208 LONG H_DataCollectorQueueSize(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
1209 {
1210 if (!s_dataCollectorStarted)
1211 return SYSINFO_RC_UNSUPPORTED;
1212
1213 UINT32 count = 0;
1214 MutexLock(s_serverSyncStatusLock);
1215 Iterator<ServerSyncStatus> *it = s_serverSyncStatus.iterator();
1216 while(it->hasNext())
1217 count += (UINT32)it->next()->queueSize;
1218 delete it;
1219 MutexUnlock(s_serverSyncStatusLock);
1220
1221 ret_uint(value, count);
1222 return SYSINFO_RC_SUCCESS;
1223 }