fixed server deadlock
[public/netxms.git] / src / agent / core / datacoll.cpp
CommitLineData
87fff547
VK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003-2015 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: datacoll.cpp
20**
21**/
22
23#include "nxagentd.h"
24
25/**
26 * Data collection item
27 */
28class DataCollectionItem : public RefCountObject
29{
30private:
31 UINT64 m_serverId;
32 UINT32 m_id;
33 INT32 m_pollingInterval;
34 TCHAR *m_name;
35 BYTE m_type;
36 BYTE m_origin;
37 UINT16 m_snmpPort;
38 InetAddress m_snmpTarget;
39 time_t m_lastPollTime;
40
41public:
42 DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId);
43 virtual ~DataCollectionItem();
44
45 UINT32 getId() { return m_id; }
46 UINT64 getServerId() { return m_serverId; }
47 const TCHAR *getName() { return m_name; }
48 int getType() { return (int)m_type; }
49 int getOrigin() { return (int)m_origin; }
50
51 UINT32 getTimeToNextPoll(time_t now)
52 {
53 time_t diff = now - m_lastPollTime;
54 return (diff >= m_pollingInterval) ? 0 : m_pollingInterval - (UINT32)diff;
55 }
56};
57
58/**
59 * Create data collection item from NXCP mesage
60 */
61DataCollectionItem::DataCollectionItem(UINT64 serverId, NXCPMessage *msg, UINT32 baseId) : RefCountObject()
62{
63 m_serverId = serverId;
64 m_id = msg->getFieldAsInt32(baseId);
65 m_type = (BYTE)msg->getFieldAsUInt16(baseId + 1);
66 m_origin = (BYTE)msg->getFieldAsUInt16(baseId + 2);
67 m_name = msg->getFieldAsString(baseId + 3);
68 m_pollingInterval = msg->getFieldAsInt32(baseId + 4);
69 m_lastPollTime = msg->getFieldAsTime(baseId + 5);
70 m_snmpTarget = msg->getFieldAsInetAddress(baseId + 6);
71 m_snmpPort = msg->getFieldAsUInt16(baseId + 7);
72}
73
74/**
75 * Data collection item destructor
76 */
77DataCollectionItem::~DataCollectionItem()
78{
79 safe_free(m_name);
80}
81
82/**
83 * Collected data
84 */
85class DataElement
86{
87private:
88 UINT64 m_serverId;
89 UINT32 m_dciId;
90 time_t m_timestamp;
91 int m_type;
92 union
93 {
94 TCHAR *item;
95 StringList *list;
96 Table *table;
97 } m_value;
98
99public:
100 DataElement(DataCollectionItem *dci, const TCHAR *value)
101 {
102 m_serverId = dci->getServerId();
103 m_dciId = dci->getId();
104 m_timestamp = time(NULL);
105 m_type = DCO_TYPE_ITEM;
106 m_value.item = _tcsdup(value);
107 }
108
109 DataElement(DataCollectionItem *dci, StringList *value)
110 {
111 m_serverId = dci->getServerId();
112 m_dciId = dci->getId();
113 m_timestamp = time(NULL);
114 m_type = DCO_TYPE_LIST;
115 m_value.list = value;
116 }
117
118 DataElement(DataCollectionItem *dci, Table *value)
119 {
120 m_serverId = dci->getServerId();
121 m_dciId = dci->getId();
122 m_timestamp = time(NULL);
123 m_type = DCO_TYPE_TABLE;
124 m_value.table = value;
125 }
126
127 ~DataElement()
128 {
129 switch(m_type)
130 {
131 case DCO_TYPE_ITEM:
132 free(m_value.item);
133 break;
134 case DCO_TYPE_LIST:
135 delete m_value.list;
136 break;
137 case DCO_TYPE_TABLE:
138 delete m_value.table;
139 break;
140 }
141 }
142};
143
144/**
145 * Data sender queue
146 */
147static Queue s_dataSenderQueue;
148
149/**
150 * Data sender
151 */
152static THREAD_RESULT THREAD_CALL DataSender(void *arg)
153{
154 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
155 while(true)
156 {
157 DataElement *e = (DataElement *)s_dataSenderQueue.GetOrBlock();
158 if (e == INVALID_POINTER_VALUE)
159 break;
160
161 delete e;
162 }
163 DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread stopped"));
164 return THREAD_OK;
165}
166
167/**
168 * Pseudo-session for cached data collection
169 */
170class VirtualSession : public AbstractCommSession
171{
172private:
173 UINT64 m_serverId;
174
175public:
176 VirtualSession(UINT64 serverId) { m_serverId = serverId; }
177
178 virtual bool isMasterServer() { return false; }
179 virtual bool isControlServer() { return false; }
180 virtual bool canAcceptTraps() { return true; }
181 virtual UINT64 getServerId() { return m_serverId; };
182 virtual const InetAddress& getServerAddress() { return InetAddress::LOOPBACK; }
183
184 virtual bool isIPv6Aware() { return true; }
185
186 virtual void sendMessage(NXCPMessage *pMsg) { }
187 virtual void sendRawMessage(NXCP_MESSAGE *pMsg) { }
188 virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset) { return false; }
189 virtual UINT32 openFile(TCHAR *fileName, UINT32 requestId) { return ERR_INTERNAL_ERROR; }
190};
191
192/**
193 * Collect data from agent
194 */
195DataElement *CollectDataFromAgent(DataCollectionItem *dci)
196{
197 VirtualSession session(dci->getServerId());
198
199 DataElement *e = NULL;
200 if (dci->getType() == DCO_TYPE_ITEM)
201 {
202 TCHAR value[MAX_RESULT_LENGTH];
203 if (GetParameterValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
204 e = new DataElement(dci, value);
205 }
206 else if (dci->getType() == DCO_TYPE_LIST)
207 {
208 StringList *value = new StringList;
209 if (GetListValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
210 e = new DataElement(dci, value);
211 }
212 else if (dci->getType() == DCO_TYPE_TABLE)
213 {
214 Table *value = new Table;
215 if (GetTableValue(INVALID_INDEX, dci->getName(), value, &session) == ERR_SUCCESS)
216 e = new DataElement(dci, value);
217 }
218
219 return e;
220}
221
222/**
223 * Collect data from SNMP
224 */
225DataElement *CollectDataFromSNMP(DataCollectionItem *dci)
226{
227 /* TODO: implement SNMP data collection */
228 return NULL;
229}
230
231/**
232 * List of all data collection items
233 */
234static ObjectArray<DataCollectionItem> s_items(64, 64, false);
235static MUTEX s_itemLock = INVALID_MUTEX_HANDLE;
236
237/**
238 * Single data collection run - collect data if needed and calculate sleep time
239 */
240static UINT32 DataCollectionRun()
241{
242 time_t now = time(NULL);
243 UINT32 sleepTime = 60;
244
245 MutexLock(s_itemLock);
246 for(int i = 0; i < s_items.size(); i++)
247 {
248 DataCollectionItem *dci = s_items.get(i);
249 UINT32 timeToPoll = dci->getTimeToNextPoll(now);
250 if (timeToPoll == 0)
251 {
252 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: polling DCI %d \"%s\""), dci->getId(), dci->getName());
253 DataElement *e;
254 if (dci->getOrigin() == DS_NATIVE_AGENT)
255 {
256 e = CollectDataFromAgent(dci);
257 }
258 else if (dci->getOrigin() == DS_SNMP_AGENT)
259 {
260 e = CollectDataFromSNMP(dci);
261 }
262 else
263 {
264 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: unsupported origin %d"), dci->getOrigin());
265 e = NULL;
266 }
267
268 if (e != NULL)
269 {
270 s_dataSenderQueue.Put(e);
271 }
272 else
273 {
274 DebugPrintf(INVALID_INDEX, 6, _T("DataCollector: collection error for DCI %d \"%s\""), dci->getId(), dci->getName());
275 }
276 }
277 else
278 {
279 if (sleepTime > timeToPoll)
280 sleepTime = timeToPoll;
281 }
282 }
283 MutexUnlock(s_itemLock);
284 return sleepTime;
285}
286
287/**
288 * Data collector thread
289 */
290static THREAD_RESULT THREAD_CALL DataCollector(void *arg)
291{
292 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread started"));
293
294 UINT32 sleepTime = DataCollectionRun();
9aa171c1 295 while(!AgentSleepAndCheckForShutdown(sleepTime * 1000))
87fff547
VK
296 {
297 sleepTime = DataCollectionRun();
298 DebugPrintf(INVALID_INDEX, 7, _T("DataCollector: sleeping for %d seconds"), sleepTime);
299 }
300
301 DebugPrintf(INVALID_INDEX, 1, _T("Data collector thread stopped"));
302 return THREAD_OK;
303}
304
305/**
306 * Configure data collection
307 */
308void ConfigureDataCollection(UINT64 serverId, NXCPMessage *msg)
309{
310 ObjectArray<DataCollectionItem> config(32, 32, true);
311
312 int count = msg->getFieldAsInt32(VID_NUM_ELEMENTS);
313 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
314 for(int i = 0; i < count; i++)
315 {
316 config.add(new DataCollectionItem(serverId, msg, fieldId));
317 fieldId += 10;
318 }
319 DebugPrintf(INVALID_INDEX, 4, _T("%d data collection elements received from server ") UINT64X_FMT(_T("016")), count, serverId);
320
321 /* TODO: config update */
322}
323
324/**
325 * Data collector and sender thread handles
326 */
327static THREAD s_dataCollectorThread = INVALID_THREAD_HANDLE;
328static THREAD s_dataSenderThread = INVALID_THREAD_HANDLE;
329
330/**
331 * Initialize and start local data collector
332 */
333void StartLocalDataCollector()
334{
335 /* TODO: database init and configuration load */
336
337 s_itemLock = MutexCreate();
338 s_dataCollectorThread = ThreadCreateEx(DataCollector, 0, NULL);
339 s_dataSenderThread = ThreadCreateEx(DataSender, 0, NULL);
340}
341
342/**
343 * Shutdown local data collector
344 */
345void ShutdownLocalDataCollector()
346{
347 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data collector thread termination"));
348 ThreadJoin(s_dataCollectorThread);
349
350 DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
351 s_dataSenderQueue.Put(INVALID_POINTER_VALUE);
352 ThreadJoin(s_dataSenderThread);
353
354 MutexDestroy(s_itemLock);
355}