implemented SNMP target sync with agent for cached DCIs; GUID functions refactoring...
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
/**
 * Externals - functions used by this file but defined elsewhere in the agent
 */
// Called once per SNMP target received from the server (see ConfigureDataCollection)
void UpdateSnmpTarget(SNMPTarget *target);
// Used for SNMP-origin items; returns true when a value was stored in "value" (see CollectDataFromSNMP)
bool GetSnmpValue(const uuid_t& target, UINT16 port, const TCHAR *oid, TCHAR *value, int interpretRawValue);
30
/**
 * Database schema version.
 * StartLocalDataCollector() runs s_upgradeQueries[version] for every stored
 * version below this value, so it must not exceed the number of entries in
 * s_upgradeQueries.
 */
#define DATACOLL_SCHEMA_VERSION 3
35
36 /**
37 * Data collection item
38 */
39 class DataCollectionItem : public RefCountObject
40 {
41 private:
42 UINT64 m_serverId;
43 UINT32 m_id;
44 INT32 m_pollingInterval;
45 TCHAR *m_name;
46 BYTE m_type;
47 BYTE m_origin;
48 UINT16 m_snmpPort;
49 BYTE m_snmpRawValueType;
50 uuid_t m_snmpTargetGuid;
51 time_t m_lastPollTime;
52
53 public:
54 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
55 DataCollectionItem(DB_RESULT hResult, int row);
56 DataCollectionItem(const DataCollectionItem *item);
57 virtual ~DataCollectionItem();
58
59 UINT32 getId() const { return m_id; }
60 UINT64 getServerId() const { return m_serverId; }
61 const TCHAR *getName() const { return m_name; }
62 int getType() const { return (int)m_type; }
63 int getOrigin() const { return (int)m_origin; }
64 const uuid_t& getSnmpTargetGuid() const { return m_snmpTargetGuid; }
65 UINT16 getSnmpPort() const { return m_snmpPort; }
66 int getSnmpRawValueType() const { return (int)m_snmpRawValueType; }
67
68 bool equals(const DataCollectionItem *item) const { return (m_serverId == item->m_serverId) && (m_id == item->m_id); }
69
70 void updateAndSave(const DataCollectionItem *item);
71 void saveToDatabase(bool newObject);
72 void deleteFromDatabase();
73 void setLastPollTime(time_t time);
74
75 UINT32 getTimeToNextPoll(time_t now) const
76 {
77 time_t diff = now - m_lastPollTime;
78 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
79 }
80 };
81
82 /**
83 * Create data collection item from NXCP mesage
84 */
85 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
86 {
87 m_serverId = serverId;
88 m_id = msg->getFieldAsInt32(baseId);
89 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
90 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
91 m_name = msg->getFieldAsString(baseId + 3);
92 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
93 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
94 msg->getFieldAsBinary(baseId + 6, m_snmpTargetGuid, UUID_LENGTH);
95 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
96 m_snmpRawValueType = (BYTE)msg->getFieldAsUInt16(baseId + 8);
97 }
98
99 /**
100 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type
101 */
102 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
103 {
104 m_serverId = DBGetFieldInt64(hResult, row, 0);
105 m_id = DBGetFieldULong(hResult, row, 1);
106 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
107 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
108 m_name = (TCHAR *)malloc(sizeof(TCHAR) * 1025);
109 DBGetField(hResult, row, 4, m_name, 1025);
110 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
111 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
112 DBGetFieldGUID(hResult, row, 7, m_snmpTargetGuid);
113 m_snmpPort = DBGetFieldULong(hResult, row, 8);
114 m_snmpRawValueType = (BYTE)DBGetFieldULong(hResult, row, 9);
115 }
116
117 /**
118 * Copy constructor
119 */
120 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
121 {
122 m_serverId = item->m_serverId;
123 m_id = item->m_id;
124 m_type = item->m_type;
125 m_origin = item->m_origin;
126 m_name = _tcsdup(item->m_name);
127 m_pollingInterval = item->m_pollingInterval;
128 m_lastPollTime = item->m_lastPollTime;
129 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
130 m_snmpPort = item->m_snmpPort;
131 m_snmpRawValueType = item->m_snmpRawValueType;
132 }
133
134 /**
135 * Data collection item destructor
136 */
137 DataCollectionItem::~DataCollectionItem()
138 {
139 safe_free(m_name);
140 }
141
142 /**
143 * Will check if object has changed. If at least one field is changed - all data will be updated and
144 * saved to database.
145 */
146 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
147 {
148 //if at leas one of fields changed - set all fields and save to DB
149 if ((m_type != item->m_type) || (m_origin != item->m_origin) || _tcscmp(m_name, item->m_name) ||
150 (m_pollingInterval != item->m_pollingInterval) || uuid_compare(m_snmpTargetGuid, item->m_snmpTargetGuid) ||
151 (m_snmpPort != item->m_snmpPort) || (m_snmpRawValueType != item->m_snmpRawValueType))
152 {
153 m_type = item->m_type;
154 m_origin = item->m_origin;
155 m_name = _tcsdup(item->m_name);
156 m_pollingInterval = item->m_pollingInterval;
157 m_lastPollTime = item->m_lastPollTime;
158 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
159 m_snmpPort = item->m_snmpPort;
160 m_snmpRawValueType = item->m_snmpRawValueType;
161 saveToDatabase(false);
162 }
163 }
164
165 /**
166 * Save configuration object to database
167 */
168 void DataCollectionItem::saveToDatabase(bool newObject)
169 {
170 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=%ld,dciId=%d) saved to database"),
171 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
172 DB_HANDLE db = GetLocalDatabaseHandle();
173 DB_STATEMENT hStmt;
174
175 if (newObject)
176 {
177 hStmt = DBPrepare(db,
178 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
179 _T("last_poll,snmp_port,snmp_target_guid,snmp_raw_type,server_id,dci_id)")
180 _T("VALUES (?,?,?,?,?,?,?,?,?,?)"));
181 }
182 else
183 {
184 hStmt = DBPrepare(db,
185 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
186 _T("polling_interval=?,last_poll=?,snmp_port=?,")
187 _T("snmp_target_guid=?,snmp_raw_type=? WHERE server_id=? AND dci_id=?"));
188 }
189
190 if (hStmt == NULL)
191 return;
192
193 TCHAR buffer[64];
194 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
195 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
196 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
197 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
198 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
199 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
200 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, uuid_to_string(m_snmpTargetGuid, buffer), DB_BIND_STATIC);
201 DBBind(hStmt, 8, DB_SQLTYPE_INTEGER, (LONG)m_snmpRawValueType);
202 DBBind(hStmt, 9, DB_SQLTYPE_BIGINT, m_serverId);
203 DBBind(hStmt, 10, DB_SQLTYPE_INTEGER, (LONG)m_id);
204
205 DBExecute(hStmt);
206 DBFreeStatement(hStmt);
207 }
208
209 /**
210 * Remove item form database and delete not synced data if exist
211 */
212 void DataCollectionItem::deleteFromDatabase()
213 {
214 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=%ld,dciId=%d) removed from database"),
215 m_serverId, m_id);
216 DB_HANDLE db = GetLocalDatabaseHandle();
217 TCHAR query[256];
218 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=%ld AND dci_id=%d"), m_serverId, m_id);
219 if(!DBQuery(db, query))
220 {
221 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::deleteFromDatabase: error wile removing object(serverId=%ld,dciId=%d) from dc_config database table"),
222 m_serverId, m_id);
223 }
224 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=%ld AND dci_id=%d"), m_serverId, m_id);
225 if(!DBQuery(db, query))
226 {
227 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::deleteFromDatabase: error wile removing object(serverId=%ld,dciId=%d) from dc_queue database table"),
228 m_serverId, m_id);
229 }
230 }
231
232 /**
233 * Set last poll time for item
234 */
235 void DataCollectionItem::setLastPollTime(time_t time)
236 {
237 m_lastPollTime = time;
238 TCHAR query[256];
239 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
240 (UINT64)m_lastPollTime, m_serverId, m_id);
241 DBQuery(GetLocalDatabaseHandle(), query);
242 }
243
244 /**
245 * Collected data
246 */
247 class DataElement
248 {
249 private:
250 UINT64 m_serverId;
251 UINT32 m_dciId;
252 time_t m_timestamp;
253 int m_type;
254 union
255 {
256 TCHAR *item;
257 StringList *list;
258 Table *table;
259 } m_value;
260
261 public:
262 DataElement(DataCollectionItem *dci, const TCHAR *value)
263 {
264 m_serverId = dci->getServerId();
265 m_dciId = dci->getId();
266 m_timestamp = time(NULL);
267 m_type = DCO_TYPE_ITEM;
268 m_value.item = _tcsdup(value);
269 }
270
271 DataElement(DataCollectionItem *dci, StringList *value)
272 {
273 m_serverId = dci->getServerId();
274 m_dciId = dci->getId();
275 m_timestamp = time(NULL);
276 m_type = DCO_TYPE_LIST;
277 m_value.list = value;
278 }
279
280 DataElement(DataCollectionItem *dci, Table *value)
281 {
282 m_serverId = dci->getServerId();
283 m_dciId = dci->getId();
284 m_timestamp = time(NULL);
285 m_type = DCO_TYPE_TABLE;
286 m_value.table = value;
287 }
288
289 ~DataElement()
290 {
291 switch(m_type)
292 {
293 case DCO_TYPE_ITEM:
294 free(m_value.item);
295 break;
296 case DCO_TYPE_LIST:
297 delete m_value.list;
298 break;
299 case DCO_TYPE_TABLE:
300 delete m_value.table;
301 break;
302 }
303 }
304
305 time_t getTimestamp() { return m_timestamp; }
306 UINT64 getServerId() { return m_serverId; }
307
308 void saveToDatabase();
309 bool sendToServer();
310 };
311
312 /**
313 * Save data element to database
314 */
315 void DataElement::saveToDatabase()
316 {
317 /* TODO: implement */
318 }
319
320 /**
321 * Send collected data to server
322 */
323 bool DataElement::sendToServer()
324 {
325 /* TODO: implement */
326 return true;
327 }
328
329 /**
330 * Server data sync status object
331 */
332 struct ServerSyncStatus
333 {
334 INT32 queueSize;
335
336 ServerSyncStatus()
337 {
338 queueSize = 0;
339 }
340 };
341
/**
 * Server sync status information, keyed by server ID.
 * The data sender thread holds s_serverSyncStatusLock while it reads or
 * updates the map; the mutex is created in StartLocalDataCollector().
 */
static HashMap<UINT64, ServerSyncStatus> s_serverSyncStatus(true);
static MUTEX s_serverSyncStatusLock = INVALID_MUTEX_HANDLE;
347
348 /**
349 * Data reconcillation thread
350 */
351 static THREAD_RESULT THREAD_CALL ReconcillationThread(void *arg)
352 {
353 DB_HANDLE hdb = GetLocalDatabaseHandle();
354 UINT32 sleepTime = 60000;
355 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread started"));
356
357 while(!AgentSleepAndCheckForShutdown(sleepTime))
358 {
359 DB_RESULT hResult = DBSelect(hdb, _T("SELECT server_id,dci_id,dci_type,timestamp,value FROM dc_queue ORDER BY timestamp DESC LIMIT 100"));
360 if (hResult == NULL)
361 continue;
362
363 int count = DBGetNumRows(hResult);
364 for(int i = 0; i < count; i++)
365 {
366 }
367 DBFreeResult(hResult);
368
369 sleepTime = (count == 100) ? 1000 : 60000;
370 }
371
372 DebugPrintf(INVALID_INDEX, 1, _T("Data reconcillation thread stopped"));
373 return THREAD_OK;
374 }
375
/**
 * Data sender queue.
 * Filled with DataElement objects by DataCollectionRun() and drained by the
 * DataSender thread; INVALID_POINTER_VALUE is used as the stop marker.
 */
static Queue s_dataSenderQueue;
380
381 /**
382 * Data sender
383 */
384 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
385 {
386 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
387 while(true)
388 {
389 DataElement *e = (DataElement *)s_dataSenderQueue.GetOrBlock();
390 if (e == INVALID_POINTER_VALUE)
391 break;
392
393 MutexLock(s_serverSyncStatusLock);
394 ServerSyncStatus *status = s_serverSyncStatus.get(e->getServerId());
395 if (status == NULL)
396 {
397 status = new ServerSyncStatus();
398 s_serverSyncStatus.set(e->getServerId(), status);
399 }
400
401 if (status->queueSize == 0)
402 {
403 if (!e->sendToServer())
404 {
405 e->saveToDatabase();
406 status->queueSize++;
407 }
408 }
409 else
410 {
411 e->saveToDatabase();
412 }
413 MutexUnlock(s_serverSyncStatusLock);
414
415 delete e;
416 }
417 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
418 return THREAD_OK;
419 }
420
421 /**
422 * Pseudo-session for cached data collection
423 */
424 class VirtualSession : public AbstractCommSession
425 {
426 private:
427 UINT64 m_serverId;
428
429 public:
430 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
431
432 virtual bool isMasterServer() { return false; }
433 virtual bool isControlServer() { return false; }
434 virtual bool canAcceptTraps() { return true; }
435 virtual UINT64 getServerId() { return m_serverId; };
436 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
437
438 virtual bool isIPv6Aware() { return true; }
439
440 virtual void sendMessage(NXCPMessage *pMsg) { }
441 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
442 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
443 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
444 };
445
446 /**
447 * Collect data from agent
448 */
449 static DataElement *CollectDataFromAgent(DataCollectionItem *dci)
450 {
451 VirtualSession session(dci->getServerId());
452
453 DataElement *e = NULL;
454 if (dci->getType() == DCO_TYPE_ITEM)
455 {
456 TCHAR value[MAX_RESULT_LENGTH];
457 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
458 e = new DataElement(dci, value);
459 }
460 else if (dci->getType() == DCO_TYPE_LIST)
461 {
462 StringList *value = new StringList;
463 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
464 e = new DataElement(dci, value);
465 }
466 else if (dci->getType() == DCO_TYPE_TABLE)
467 {
468 Table *value = new Table;
469 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
470 e = new DataElement(dci, value);
471 }
472
473 return e;
474 }
475
476 /**
477 * Collect data from SNMP
478 */
479 static DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
480 {
481 DataElement *e = NULL;
482 if (dci->getType() == DCO_TYPE_ITEM)
483 {
484 TCHAR value[MAX_RESULT_LENGTH];
485 if (GetSnmpValue(dci->getSnmpTargetGuid(), dci->getSnmpPort(), dci->getName(), value, dci->getSnmpRawValueType()))
486 e = new DataElement(dci, value);
487 }
488 return e;
489 }
490
/**
 * List of all data collection items (for all servers).
 * DataCollectionRun() and ConfigureDataCollection() hold s_itemLock while
 * working with the list; the mutex is created in StartLocalDataCollector().
 */
static ObjectArray<DataCollectionItem> s_items(64, 64, true);
static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
496
497 /**
498 * Single data collection run - collect data if needed and calculate sleep time
499 */
500 static UINT32 DataCollectionRun()
501 {
502 time_t now = time(NULL);
503 UINT32 sleepTime = 60;
504
505 MutexLock(s_itemLock);
506 for(int i = 0; i < s_items.size(); i++)
507 {
508 DataCollectionItem *dci = s_items.get(i);
509 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
510 if (timeToPoll == 0)
511 {
512 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
513 DataElement *e;
514 if (dci->getOrigin() == DS_NATIVE_AGENT)
515 {
516 e = CollectDataFromAgent(dci);
517 }
518 else if (dci->getOrigin() == DS_SNMP_AGENT)
519 {
520 e = CollectDataFromSNMP(dci);
521 }
522 else
523 {
524 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
525 e = NULL;
526 }
527
528 if (e != NULL)
529 {
530 dci->setLastPollTime(e->getTimestamp());
531 s_dataSenderQueue.Put(e);
532 }
533 else
534 {
535 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
536 }
537 }
538 else
539 {
540 if (sleepTime > timeToPoll)
541 sleepTime = timeToPoll;
542 }
543 }
544 MutexUnlock(s_itemLock);
545 return sleepTime;
546 }
547
548 /**
549 * Data collector thread
550 */
551 static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
552 {
553 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
554
555 UINT32 sleepTime = DataCollectionRun();
556 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
557 {
558 sleepTime = DataCollectionRun();
559 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
560 }
561
562 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
563 return THREAD_OK;
564 }
565
566 /**
567 * Configure data collection
568 */
569 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
570 {
571 int count = msg->getFieldAsInt32(VID_NUM_NODES);
572 UINT32 fieldId = VID_NODE_INFO_LIST_BASE;
573 for(int i = 0; i < count; i++)
574 {
575 SNMPTarget *target = new SNMPTarget(serverId, msg, fieldId);
576 UpdateSnmpTarget(target);
577 fieldId += 50;
578 }
579 DebugPrintf(INVALID_INDEX, 4, _T("%d SNMP targets received from server ") UINT64X_FMT(_T("016")), count, serverId);
580
581 ObjectArray<DataCollectionItem> config(32, 32, true);
582
583 count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
584 fieldId = VID_ELEMENT_LIST_BASE;
585 for(int i = 0; i < count; i++)
586 {
587 config.add(new DataCollectionItem(serverId, msg, fieldId));
588 fieldId += 10;
589 }
590 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
591
592 MutexLock(s_itemLock);
593
594 // Update and add new
595 for(int j = 0; j < config.size(); j++)
596 {
597 DataCollectionItem *item = config.get(j);
598 bool exist = false;
599 for(int i = 0; i < s_items.size(); i++)
600 {
601 if(item->equals(s_items.get(i)))
602 {
603 s_items.get(i)->updateAndSave(item);
604 exist = true;
605 }
606 }
607 if (!exist)
608 {
609 DataCollectionItem *newItem = new DataCollectionItem(item);
610 s_items.add(newItem);
611 newItem->saveToDatabase(true);
612 }
613 }
614
615 // Remove not existing configuration and data for it
616 for(int i = 0; i < s_items.size(); i++)
617 {
618 DataCollectionItem *item = s_items.get(i);
619 //If item is from other server, then, do not search it in list of this server
620 if(item->getServerId() != serverId)
621 continue;
622 bool exist = false;
623 for(int j = 0; j < config.size(); j++)
624 {
625 if(item->equals(config.get(j)))
626 {
627 exist = true;
628 }
629 }
630 if(!exist)
631 {
632 item->deleteFromDatabase();
633 s_items.remove(i);
634 i--;
635 delete item;
636 }
637 }
638 MutexUnlock(s_itemLock);
639 }
640
641 /**
642 * Loads configuration to for DCI
643 */
644 static void LoadConfiguration()
645 {
646 DB_HANDLE db = GetLocalDatabaseHandle();
647 DB_RESULT hResult = DBSelect(db, _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid,snmp_raw_type FROM dc_config"));
648 if (hResult != NULL)
649 {
650 for(int i = 0; i < DBGetNumRows(hResult); i++)
651 {
652 s_items.add(new DataCollectionItem(hResult, i));
653 }
654 DBFreeResult(hResult);
655 }
656 }
657
/**
 * SQL script array.
 * Entry N upgrades the schema from version N to version N + 1 - see the
 * upgrade loop in StartLocalDataCollector().
 */
static const TCHAR *s_upgradeQueries[] =
{
   // version 0 -> 1: queue of collected data
   _T("CREATE TABLE dc_queue (")
   _T(" server_id number(20) not null,")
   _T(" dci_id integer not null,")
   _T(" dci_type integer not null,")
   _T(" timestamp integer not null,")
   _T(" value varchar not null,")
   _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),

   // version 1 -> 2: data collection item configuration
   _T("CREATE TABLE dc_config (")
   _T(" server_id number(20) not null,")
   _T(" dci_id integer not null,")
   _T(" type integer not null,")
   _T(" origin integer not null,")
   _T(" name varchar(1023) null,")
   _T(" polling_interval integer not null,")
   _T(" last_poll integer not null,")
   _T(" snmp_port integer not null,")
   _T(" snmp_target_guid varchar(36) not null,")
   _T(" snmp_raw_type integer not null,")
   _T(" PRIMARY KEY(server_id,dci_id))"),

   // version 2 -> 3: SNMP targets
   _T("CREATE TABLE dc_snmp_targets (")
   _T(" guid varchar(36) not null,")
   _T(" server_id number(20) not null,")
   _T(" ip_address varchar(48) not null,")
   _T(" snmp_version integer not null,")
   _T(" port integer not null,")
   _T(" auth_type integer not null,")
   _T(" enc_type integer not null,")
   _T(" auth_name varchar(63),")
   _T(" auth_pass varchar(63),")
   _T(" enc_pass varchar(63),")
   _T(" PRIMARY KEY(guid))")
};
697
/**
 * Data collector, data sender and reconciliation thread handles.
 * Set by StartLocalDataCollector() and joined by ShutdownLocalDataCollector().
 */
static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
static THREAD s_reconcillationThread = INVALID_THREAD_HANDLE;
704
705 /**
706 * Initialize and start local data collector
707 */
708 void StartLocalDataCollector()
709 {
710 DB_HANDLE db = GetLocalDatabaseHandle();
711 if (db == NULL)
712 {
713 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
714 return;
715 }
716
717 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
718 while(dbVersion < DATACOLL_SCHEMA_VERSION)
719 {
720 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
721 {
722 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
723 return;
724 }
725 dbVersion++;
726 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
727 }
728
729 LoadConfiguration();
730 /* TODO: add reading form database snmp_targets table */
731
732 s_itemLock = MutexCreate();
733 s_serverSyncStatusLock = MutexCreate();
734 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
735 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
736 s_reconcillationThread = ThreadCreateEx(ReconcillationThread, 0, NULL);
737 }
738
739 /**
740 * Shutdown local data collector
741 */
742 void ShutdownLocalDataCollector()
743 {
744 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
745 ThreadJoin(s_dataCollectorThread);
746
747 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
748 s_dataSenderQueue.Put(INVALID_POINTER_VALUE);
749 ThreadJoin(s_dataSenderThread);
750
751 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconcillation thread termination"));
752 ThreadJoin(s_reconcillationThread);
753
754 MutexDestroy(s_itemLock);
755 MutexDestroy(s_serverSyncStatusLock);
756 }