- Initial DCI transformation support on server side
[public/netxms.git] / src / server / core / datacoll.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** $module: datacoll.cpp
20 **
21 **/
22
23 #include "nms_core.h"
24
25
26 //
27 // Static data
28 //
29
// Queue shared by the threads in this file: created in InitDataCollector(),
// passed to Node::QueueItemsForPolling() by ItemPoller, and read by
// DataCollector, which casts each dequeued element to DCItem *.
30 static Queue *m_pItemQueue = NULL;
31
32
33 //
34 // Data collector
35 //
36
37 static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
38 {
39 DCItem *pItem;
40 Node *pNode;
41 DWORD dwError;
42 time_t currTime;
43 char *pBuffer;
44
45 pBuffer = (char *)malloc(MAX_LINE_SIZE);
46
47 while(!ShutdownInProgress())
48 {
49 pItem = (DCItem *)m_pItemQueue->GetOrBlock();
50 pNode = pItem->RelatedNode();
51 if (pNode != NULL)
52 {
53 switch(pItem->DataSource())
54 {
55 case DS_INTERNAL: // Server internal parameters (like status)
56 dwError = pNode->GetInternalItem(pItem->Name(), MAX_LINE_SIZE, pBuffer);
57 break;
58 case DS_SNMP_AGENT:
59 dwError = pNode->GetItemFromSNMP(pItem->Name(), MAX_LINE_SIZE, pBuffer);
60 break;
61 case DS_NATIVE_AGENT:
62 dwError = pNode->GetItemFromAgent(pItem->Name(), MAX_LINE_SIZE, pBuffer);
63 break;
64 }
65
66 // Update item's last poll time
67 currTime = time(NULL);
68 pItem->SetLastPollTime(currTime);
69
70 // Transform and store received value into database or handle error
71 switch(dwError)
72 {
73 case DCE_SUCCESS:
74 pItem->NewValue(currTime, pBuffer);
75 break;
76 case DCE_COMM_ERROR:
77 break;
78 case DCE_NOT_SUPPORTED:
79 // Change item's status
80 pItem->SetStatus(ITEM_STATUS_NOT_SUPPORTED);
81 break;
82 }
83
84 // Clear busy flag so item can be polled again
85 pItem->SetBusyFlag(FALSE);
86 }
87 else /* pNode == NULL */
88 {
89 DbgPrintf(AF_DEBUG_DC, "*** DataCollector: Attempt to collect information for non-existing node.\n");
90 }
91 }
92
93 free(pBuffer);
94 DbgPrintf(AF_DEBUG_DC, "Data collector thread terminated\n");
95 return THREAD_OK;
96 }
97
98
99 //
100 // Item poller thread: check nodes' items and put into the
101 // data collector queue when data polling required
102 //
103
104 static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
105 {
106 DWORD i, dwElapsed, dwWatchdogId;
107 INT64 qwStart;
108
109 dwWatchdogId = WatchdogAddThread("Item Poller", 20);
110
111 while(!ShutdownInProgress())
112 {
113 if (SleepAndCheckForShutdown(2))
114 break; // Shutdown has arrived
115 WatchdogNotify(dwWatchdogId);
116
117 MutexLock(g_hMutexNodeIndex, INFINITE);
118 qwStart = GetCurrentTimeMs();
119 for(i = 0; i < g_dwNodeAddrIndexSize; i++)
120 ((Node *)g_pNodeIndexByAddr[i].pObject)->QueueItemsForPolling(m_pItemQueue);
121 MutexUnlock(g_hMutexNodeIndex);
122
123 dwElapsed = (DWORD)(GetCurrentTimeMs() - qwStart);
124 }
125 DbgPrintf(AF_DEBUG_DC, "Item poller thread terminated\n");
126 return THREAD_OK;
127 }
128
129
130 //
131 // Statistics collection thread
132 //
133
134 static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
135 {
136 while(!ShutdownInProgress())
137 {
138 if (SleepAndCheckForShutdown(10))
139 break; // Shutdown has arrived
140
141 if (g_dwFlags & AF_DEBUG_DC)
142 {
143 // printf("*** Poller Queue size: %d ***\n", m_pItemQueue->Size());
144 // printf("*** DB Writer Queue size: %d ***\n", g_pLazyRequestQueue->Size());
145 }
146 }
147 return THREAD_OK;
148 }
149
150
151 //
152 // Initialize data collection subsystem
153 //
154
155 BOOL InitDataCollector(void)
156 {
157 int i, iNumCollectors;
158
159 // Create collection requests queue
160 m_pItemQueue = new Queue(4096, 256);
161
162 // Start data collection threads
163 iNumCollectors = ConfigReadInt("NumberOfDataCollectors", 10);
164 for(i = 0; i < iNumCollectors; i++)
165 ThreadCreate(DataCollector, 0, NULL);
166
167 // Start item poller thread
168 ThreadCreate(ItemPoller, 0, NULL);
169
170 // Start statistics collection thread
171 ThreadCreate(StatCollector, 0, NULL);
172
173 return TRUE;
174 }