303c4f7d654cc1e79a165b308569dd91cd8b9a42
[public/netxms.git] / src / agent / core / datacoll.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2015 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: datacoll.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
/**
 * Current schema version of the local data collection tables.
 * StartLocalDataCollector() executes s_upgradeQueries[v] for every stored
 * version v below this value, so it has to match the number of entries
 * in that array.
 */
#define DATACOLL_SCHEMA_VERSION 3
29
30 /**
31 * Data collection item
32 */
33 class DataCollectionItem : public RefCountObject
34 {
35 private:
36 UINT64 m_serverId;
37 UINT32 m_id;
38 INT32 m_pollingInterval;
39 TCHAR *m_name;
40 BYTE m_type;
41 BYTE m_origin;
42 UINT16 m_snmpPort;
43 uuid_t m_snmpTargetGuid;
44 time_t m_lastPollTime;
45
46 public:
47 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
48 DataCollectionItem(DB_RESULT hResult, int row);
49 DataCollectionItem(const DataCollectionItem *item);
50 virtual ~DataCollectionItem();
51
52 UINT32 getId() { return m_id; }
53 UINT64 getServerId() { return m_serverId; }
54 const TCHAR *getName() { return m_name; }
55 int getType() { return (int)m_type; }
56 int getOrigin() { return (int)m_origin; }
57 bool equals(const DataCollectionItem *item);
58 void updateAndSave(const DataCollectionItem *item);
59 void saveToDatabase(bool newObject);
60 void deleteFromDatabase();
61 void setLastPollTime(time_t time);
62
63 UINT32 getTimeToNextPoll(time_t now)
64 {
65 time_t diff = now - m_lastPollTime;
66 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
67 }
68 };
69
70 /**
71 * Create data collection item from NXCP mesage
72 */
73 DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
74 {
75 m_serverId = serverId;
76 m_id = msg->getFieldAsInt32(baseId);
77 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
78 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
79 m_name = msg->getFieldAsString(baseId + 3);
80 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
81 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
82 msg->getFieldAsBinary(baseId + 6, m_snmpTargetGuid, UUID_LENGTH);
83 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
84 }
85
86 /**
87 * Data is selected in this order: server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid
88 */
89 DataCollectionItem::DataCollectionItem(DB_RESULT hResult, int row)
90 {
91 m_serverId = DBGetFieldInt64(hResult, row, 0);
92 m_id = DBGetFieldULong(hResult, row, 1);
93 m_type = (BYTE)DBGetFieldULong(hResult, row, 2);
94 m_origin = (BYTE)DBGetFieldULong(hResult, row, 3);
95 m_name = (TCHAR *)malloc(sizeof(TCHAR) * 1025);
96 DBGetField(hResult, row, 4, m_name, 1025);
97 m_pollingInterval = DBGetFieldULong(hResult, row, 5);
98 m_lastPollTime = (time_t)DBGetFieldULong(hResult, row, 6);
99 DBGetFieldGUID(hResult, row, 7, m_snmpTargetGuid);
100 m_snmpPort = DBGetFieldULong(hResult, row, 8);
101 }
102
103 /**
104 * Copy constructor
105 */
106 DataCollectionItem::DataCollectionItem(const DataCollectionItem *item)
107 {
108 m_serverId = item->m_serverId;
109 m_id = item->m_id;
110 m_type = item->m_type;
111 m_origin = item->m_origin;
112 m_name = _tcsdup(item->m_name);
113 m_pollingInterval = item->m_pollingInterval;
114 m_lastPollTime = item->m_lastPollTime;
115 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
116 m_snmpPort = item->m_snmpPort;
117 }
118
119 /**
120 * Data collection item destructor
121 */
122 DataCollectionItem::~DataCollectionItem()
123 {
124 safe_free(m_name);
125 }
126
127 bool DataCollectionItem::equals(const DataCollectionItem *item)
128 {
129 if(m_serverId == item->m_serverId && m_id == item->m_id)
130 return true;
131 return false;
132 }
133
134 /**
135 * Will check if object has changed. If at least one field is changed - all data will be updated and
136 * saved to database.
137 */
138 void DataCollectionItem::updateAndSave(const DataCollectionItem *item)
139 {
140 //if at leas one of fields changed - set all fields and save to DB
141 if(m_type != item->m_type || m_origin != item->m_origin || _tcscmp(m_name, item->m_name) != 0 ||
142 m_pollingInterval != item->m_pollingInterval || uuid_compare(m_snmpTargetGuid, (unsigned char*)item->m_snmpTargetGuid)!= 0 ||
143 m_snmpPort != item->m_snmpPort)
144 {
145 m_type = item->m_type;
146 m_origin = item->m_origin;
147 m_name = _tcsdup(item->m_name);
148 m_pollingInterval = item->m_pollingInterval;
149 m_lastPollTime = item->m_lastPollTime;
150 memcpy(m_snmpTargetGuid, item->m_snmpTargetGuid, UUID_LENGTH);
151 m_snmpPort = item->m_snmpPort;
152 saveToDatabase(false);
153 }
154
155 }
156
157 void DataCollectionItem::saveToDatabase(bool newObject)
158 {
159 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::saveToDatabase: %s object(serverId=%ld,dciId=%d) saved to database"),
160 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
161 DB_HANDLE db = GetLocalDatabaseHandle();
162 DB_STATEMENT hStmt;
163
164 if(newObject)
165 {
166 hStmt = DBPrepare(db,
167 _T("INSERT INTO dc_config (type,origin,name,polling_interval,")
168 _T("last_poll,snmp_port,snmp_target_guid,server_id,dci_id)")
169 _T("VALUES (?,?,?,?,?,?,?,?,?)"));
170 }
171 else
172 {
173 hStmt = DBPrepare(db,
174 _T("UPDATE dc_config SET type=?,origin=?,name=?,")
175 _T("polling_interval=?,last_poll=?,snmp_port=?,")
176 _T("snmp_target_guid=? WHERE server_id=? AND dci_id=?"));
177 }
178
179 if (hStmt == NULL)
180 {
181 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::saveToDatabase: not possible to prepare save quary for %s object(serverId=%ld,dciId=%d)"),
182 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
183 return;
184 }
185
186 TCHAR buffer[64];
187 DBBind(hStmt, 1, DB_SQLTYPE_INTEGER, (LONG)m_type);
188 DBBind(hStmt, 2, DB_SQLTYPE_INTEGER, (LONG)m_origin);
189 DBBind(hStmt, 3, DB_SQLTYPE_VARCHAR, m_name, DB_BIND_STATIC);
190 DBBind(hStmt, 4, DB_SQLTYPE_INTEGER, (LONG)m_pollingInterval);
191 DBBind(hStmt, 5, DB_SQLTYPE_INTEGER, (LONG)m_lastPollTime);
192 DBBind(hStmt, 6, DB_SQLTYPE_INTEGER, (LONG)m_snmpPort);
193 DBBind(hStmt, 7, DB_SQLTYPE_VARCHAR, uuid_to_string(m_snmpTargetGuid, buffer), DB_BIND_STATIC);
194 DBBind(hStmt, 8, DB_SQLTYPE_BIGINT, m_serverId);
195 DBBind(hStmt, 9, DB_SQLTYPE_INTEGER, (LONG)m_id);
196
197 if(!DBExecute(hStmt))
198 {
199 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::saveToDatabase: not possible to save %s object(serverId=%ld,dciId=%d) to database"),
200 newObject ? _T("new") : _T("existing"), m_serverId, m_id);
201 }
202 }
203
204 /**
205 * Remove item form database and delete not synced data if exist
206 */
207 void DataCollectionItem::deleteFromDatabase()
208 {
209 DebugPrintf(INVALID_INDEX, 6, _T("DataCollectionItem::deleteFromDatabase: object(serverId=%ld,dciId=%d) removed from database"),
210 m_serverId, m_id);
211 DB_HANDLE db = GetLocalDatabaseHandle();
212 TCHAR query[256];
213 _sntprintf(query, 256, _T("DELETE FROM dc_config WHERE server_id=%ld AND dci_id=%d"), m_serverId, m_id);
214 if(!DBQuery(db, query))
215 {
216 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::deleteFromDatabase: error wile removing object(serverId=%ld,dciId=%d) from dc_config database table"),
217 m_serverId, m_id);
218 }
219 _sntprintf(query, 256, _T("DELETE FROM dc_queue WHERE server_id=%ld AND dci_id=%d"), m_serverId, m_id);
220 if(!DBQuery(db, query))
221 {
222 DebugPrintf(INVALID_INDEX, 2, _T("DataCollectionItem::deleteFromDatabase: error wile removing object(serverId=%ld,dciId=%d) from dc_queue database table"),
223 m_serverId, m_id);
224 }
225 }
226
227 /**
228 * Set last poll time for item
229 */
230 void DataCollectionItem::setLastPollTime(time_t time)
231 {
232 m_lastPollTime = time;
233 TCHAR query[256];
234 _sntprintf(query, 256, _T("UPDATE dc_config SET last_poll=") UINT64_FMT _T(" WHERE server_id=") UINT64_FMT _T(" AND dci_id=%d"),
235 (UINT64)m_lastPollTime, m_serverId, m_id);
236 DBQuery(GetLocalDatabaseHandle(), query);
237 }
238
239 /**
240 * Collected data
241 */
242 class DataElement
243 {
244 private:
245 UINT64 m_serverId;
246 UINT32 m_dciId;
247 time_t m_timestamp;
248 int m_type;
249 union
250 {
251 TCHAR *item;
252 StringList *list;
253 Table *table;
254 } m_value;
255
256 public:
257 DataElement(DataCollectionItem *dci, const TCHAR *value)
258 {
259 m_serverId = dci->getServerId();
260 m_dciId = dci->getId();
261 m_timestamp = time(NULL);
262 m_type = DCO_TYPE_ITEM;
263 m_value.item = _tcsdup(value);
264 }
265
266 DataElement(DataCollectionItem *dci, StringList *value)
267 {
268 m_serverId = dci->getServerId();
269 m_dciId = dci->getId();
270 m_timestamp = time(NULL);
271 m_type = DCO_TYPE_LIST;
272 m_value.list = value;
273 }
274
275 DataElement(DataCollectionItem *dci, Table *value)
276 {
277 m_serverId = dci->getServerId();
278 m_dciId = dci->getId();
279 m_timestamp = time(NULL);
280 m_type = DCO_TYPE_TABLE;
281 m_value.table = value;
282 }
283
284 ~DataElement()
285 {
286 switch(m_type)
287 {
288 case DCO_TYPE_ITEM:
289 free(m_value.item);
290 break;
291 case DCO_TYPE_LIST:
292 delete m_value.list;
293 break;
294 case DCO_TYPE_TABLE:
295 delete m_value.table;
296 break;
297 }
298 }
299
300 time_t getTimestamp() { return m_timestamp; }
301 UINT64 getServerId() { return m_serverId; }
302 void saveToDatabase();
303 bool sendToServer();
304 };
305
306 /**
307 *
308 */
309 void DataElement::saveToDatabase()
310 {
311 /* TODO: implement */
312 }
313
314 bool DataElement::sendToServer()
315 {
316 /* TODO: implement */
317 return true;
318 }
319
320 /**
321 * Information about
322 */
323 class ServerQueueInfo
324 {
325 private:
326 UINT64 m_serverId;
327 bool m_saveToDatabase;
328
329 public:
330 ServerQueueInfo(UINT64 serverId){ m_serverId = serverId; m_saveToDatabase = false; }
331 bool equals(const UINT64 serverId) {return serverId == m_serverId; }
332 bool saveToDatabase() { return m_saveToDatabase; }
333 void setSaveToDatabase(bool saveToDB) { m_saveToDatabase = saveToDB; }
334 };
335
336 /**
337 * Server queue information, mutex lock for info, resend que reference
338 */
339 static ObjectArray<ServerQueueInfo> s_serversInfo(5, 5, false);
340 static MUTEX s_serverInfoLock = INVALID_MUTEX_HANDLE;
341 static THREAD s_dataReconnectThread = INVALID_THREAD_HANDLE;
342
343 /**
344 * Data sender queue
345 */
346 static Queue s_dataSenderQueue;
347
348 /**
349 * This method will find info about session or create new object and add it to structure.
350 * s_serverInfoLock should be locked before this function execution.
351 */
352 static ServerQueueInfo *FindServerInfo(UINT64 serverId)
353 {
354 ServerQueueInfo *info = NULL;
355 for(int i=0; i < s_serversInfo.size(); i++)
356 {
357 if(s_serversInfo.get(i)->equals(serverId))
358 {
359 info = s_serversInfo.get(i);
360 }
361 }
362 if(info == NULL)
363 {
364 info = new ServerQueueInfo(serverId);
365 s_serversInfo.add(info);
366 }
367 return info;
368 }
369
370 /**
371 * Data sender
372 */
373 static THREAD_RESULT THREAD_CALL DataSender(void *arg)
374 {
375 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
376 while(true)
377 {
378 DataElement *e = (DataElement *)s_dataSenderQueue.GetOrBlock();
379 if (e == INVALID_POINTER_VALUE)
380 break;
381
382 MutexLock(s_serverInfoLock);
383 ServerQueueInfo *info = FindServerInfo(e->getServerId());
384 if(!info->saveToDatabase())
385 {
386 if(!e->sendToServer())
387 {
388 e->saveToDatabase();
389 info->setSaveToDatabase(true);
390 /* TODO: start thread that will check if connection is restored */
391 }
392 }
393 else
394 {
395 e->saveToDatabase();
396 }
397 MutexUnlock(s_serverInfoLock);
398
399 delete e;
400 }
401 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
402 return THREAD_OK;
403 }
404
405 /**
406 * Pseudo-session for cached data collection
407 */
408 class VirtualSession : public AbstractCommSession
409 {
410 private:
411 UINT64 m_serverId;
412
413 public:
414 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
415
416 virtual bool isMasterServer() { return false; }
417 virtual bool isControlServer() { return false; }
418 virtual bool canAcceptTraps() { return true; }
419 virtual UINT64 getServerId() { return m_serverId; };
420 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
421
422 virtual bool isIPv6Aware() { return true; }
423
424 virtual void sendMessage(NXCPMessage *pMsg) { }
425 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
426 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
427 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
428 };
429
430 /**
431 * Collect data from agent
432 */
433 DataElement *CollectDataFromAgent(DataCollectionItem *dci)
434 {
435 VirtualSession session(dci->getServerId());
436
437 DataElement *e = NULL;
438 if (dci->getType() == DCO_TYPE_ITEM)
439 {
440 TCHAR value[MAX_RESULT_LENGTH];
441 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
442 e = new DataElement(dci, value);
443 }
444 else if (dci->getType() == DCO_TYPE_LIST)
445 {
446 StringList *value = new StringList;
447 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
448 e = new DataElement(dci, value);
449 }
450 else if (dci->getType() == DCO_TYPE_TABLE)
451 {
452 Table *value = new Table;
453 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
454 e = new DataElement(dci, value);
455 }
456
457 return e;
458 }
459
460 /**
461 * Collect data from SNMP
462 */
463 DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
464 {
465 /* TODO: implement SNMP data collection */
466 return NULL;
467 }
468
469 /**
470 * List of all data collection items
471 */
472 static ObjectArray<DataCollectionItem> s_items(64, 64, true);
473 static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
474
475 /**
476 * Single data collection run - collect data if needed and calculate sleep time
477 */
478 static UINT32 DataCollectionRun()
479 {
480 time_t now = time(NULL);
481 UINT32 sleepTime = 60;
482
483 MutexLock(s_itemLock);
484 for(int i = 0; i < s_items.size(); i++)
485 {
486 DataCollectionItem *dci = s_items.get(i);
487 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
488 if (timeToPoll == 0)
489 {
490 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
491 DataElement *e;
492 if (dci->getOrigin() == DS_NATIVE_AGENT)
493 {
494 e = CollectDataFromAgent(dci);
495 }
496 else if (dci->getOrigin() == DS_SNMP_AGENT)
497 {
498 e = CollectDataFromSNMP(dci);
499 }
500 else
501 {
502 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
503 e = NULL;
504 }
505
506 if (e != NULL)
507 {
508 dci->setLastPollTime(e->getTimestamp());
509 s_dataSenderQueue.Put(e);
510 }
511 else
512 {
513 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
514 }
515 }
516 else
517 {
518 if (sleepTime > timeToPoll)
519 sleepTime = timeToPoll;
520 }
521 }
522 MutexUnlock(s_itemLock);
523 return sleepTime;
524 }
525
526 /**
527 * Data collector thread
528 */
529 static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
530 {
531 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
532
533 UINT32 sleepTime = DataCollectionRun();
534 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
535 {
536 sleepTime = DataCollectionRun();
537 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
538 }
539
540 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
541 return THREAD_OK;
542 }
543
544 /**
545 * Configure data collection
546 */
547 void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
548 {
549 ObjectArray<DataCollectionItem> config(32, 32, true);
550
551 int count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
552 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
553 for(int i = 0; i < count; i++)
554 {
555 config.add(new DataCollectionItem(serverId, msg, fieldId));
556 fieldId += 10;
557 }
558 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
559
560 MutexLock(s_itemLock);
561 //Update and add new
562 for(int j = 0; j < config.size(); j++)
563 {
564 DataCollectionItem *item = config.get(j);
565 bool exist = false;
566 for(int i = 0; i < s_items.size(); i++)
567 {
568 if(item->equals(s_items.get(i)))
569 {
570 s_items.get(i)->updateAndSave(item);
571 exist = true;
572 }
573 }
574 if(!exist)
575 {
576 DataCollectionItem *newItem = new DataCollectionItem(item);
577 s_items.add(newItem);
578 newItem->saveToDatabase(true);
579 }
580 }
581 //Remove not existing configuration and data for it
582 for(int i = 0; i < s_items.size(); i++)
583 {
584 DataCollectionItem *item = s_items.get(i);
585 //If item is from other server, then, do not search it in list of this server
586 if(item->getServerId() != serverId)
587 continue;
588 bool exist = false;
589 for(int j = 0; j < config.size(); j++)
590 {
591 if(item->equals(config.get(j)))
592 {
593 exist = true;
594 }
595 }
596 if(!exist)
597 {
598 item->deleteFromDatabase();
599 s_items.remove(i);
600 i--;
601 delete item;
602 }
603 }
604 MutexUnlock(s_itemLock);
605 }
606
607 /**
608 * Data collector and sender thread handles
609 */
610 static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
611 static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
612
613 /**
614 * Loads configuration to for DCI
615 */
616 static void LoadConfiguration()
617 {
618 DB_HANDLE db = GetLocalDatabaseHandle();
619 const TCHAR *query = _T("SELECT server_id,dci_id,type,origin,name,polling_interval,last_poll,snmp_port,snmp_target_guid FROM dc_config");
620 DB_RESULT hResult = DBSelect(db, query);
621 if(hResult != NULL)
622 {
623 for(int i = 0; i < DBGetNumRows(hResult); i++)
624 {
625 s_items.add(new DataCollectionItem(hResult, i));
626 }
627 DBFreeResult(hResult);
628 }
629 }
630
631 /**
632 * SQL script array
633 */
634 static const TCHAR *s_upgradeQueries[] =
635 {
636 _T("CREATE TABLE dc_queue (")
637 _T(" server_id number(20) not null,")
638 _T(" dci_id integer not null,")
639 _T(" timestamp integer not null,")
640 _T(" value varchar not null,")
641 _T(" PRIMARY KEY(server_id,dci_id,timestamp))"),
642
643 _T("CREATE TABLE dc_config (")
644 _T(" server_id number(20) not null,")
645 _T(" dci_id integer not null,")
646 _T(" type integer not null,")
647 _T(" origin integer not null,")
648 _T(" name varchar(1023) null,")
649 _T(" polling_interval integer not null,")
650 _T(" last_poll integer not null,")
651 _T(" snmp_port integer not null,")
652 _T(" snmp_target_guid varchar(36) not null,")
653 _T(" PRIMARY KEY(server_id,dci_id))"),
654
655 _T("CREATE TABLE dc_snmp_targets (")
656 _T(" guid varchar(36) not null,")
657 _T(" version integer not null,")
658 _T(" ip_address varchar(48) not null,")
659 _T(" port integer not null,")
660 _T(" auth_type integer not null,")
661 _T(" enc_type integer not null,")
662 _T(" auth_pass varchar(255),")
663 _T(" enc_pass varchar(255),")
664 _T(" username varchar(255),")
665 _T(" PRIMARY KEY(guid))")
666 };
667
668 /**
669 * Initialize and start local data collector
670 */
671 void StartLocalDataCollector()
672 {
673 /* TODO: database init and configuration load */
674 DB_HANDLE db = GetLocalDatabaseHandle();
675 if (db == NULL)
676 {
677 DebugPrintf(INVALID_INDEX, 5, _T("StartLocalDataCollector: local database unavailable"));
678 return;
679 }
680 INT32 dbVersion = ReadMetadataAsInt(_T("DataCollectionSchemaVersion"));
681 while(dbVersion < DATACOLL_SCHEMA_VERSION)
682 {
683 if (!DBQuery(db, s_upgradeQueries[dbVersion]))
684 {
685 nxlog_write(MSG_DC_DBSCHEMA_UPGRADE_FAILED, NXLOG_ERROR, NULL);
686 return;
687 }
688 dbVersion++;
689 WriteMetadata(_T("DataCollectionSchemaVersion"), dbVersion);
690 }
691
692 LoadConfiguration();
693 /* TODO: add reading form database snmp_targets table */
694
695 s_itemLock = MutexCreate();
696 s_serverInfoLock = MutexCreate();
697 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
698 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
699 }
700
701 /**
702 * Shutdown local data collector
703 */
704 void ShutdownLocalDataCollector()
705 {
706 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
707 ThreadJoin(s_dataCollectorThread);
708
709 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
710 s_dataSenderQueue.Put(INVALID_POINTER_VALUE);
711 ThreadJoin(s_dataSenderThread);
712
713
714 if(s_dataReconnectThread != INVALID_THREAD_HANDLE)
715 {
716 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender resumption thread termination"));
717 ThreadJoin(s_dataReconnectThread);
718 }
719
720 MutexDestroy(s_itemLock);
721 MutexDestroy(s_serverInfoLock);
722 }