- Initial DCI transformation support on server side
[public/netxms.git] / src / server / core / datacoll.cpp
CommitLineData
1275c750
VK
1/*
2** NetXMS - Network Management System
3** Copyright (C) 2003 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** $module: datacoll.cpp
20**
21**/
22
23#include "nms_core.h"
24
25
26//
27// Static data
28//
29
30static Queue *m_pItemQueue = NULL;
1275c750
VK
31
32
33//
34// Data collector
35//
36
ccdbbb52 37static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
1275c750 38{
5add75d8 39 DCItem *pItem;
129f8f7b 40 Node *pNode;
1275c750 41 DWORD dwError;
129f8f7b 42 time_t currTime;
333ece94 43 char *pBuffer;
1275c750
VK
44
45 pBuffer = (char *)malloc(MAX_LINE_SIZE);
46
067f2689 47 while(!ShutdownInProgress())
1275c750 48 {
5add75d8
VK
49 pItem = (DCItem *)m_pItemQueue->GetOrBlock();
50 pNode = pItem->RelatedNode();
129f8f7b 51 if (pNode != NULL)
1275c750 52 {
5add75d8 53 switch(pItem->DataSource())
1275c750 54 {
8e99117a 55 case DS_INTERNAL: // Server internal parameters (like status)
5add75d8 56 dwError = pNode->GetInternalItem(pItem->Name(), MAX_LINE_SIZE, pBuffer);
8e99117a 57 break;
1275c750 58 case DS_SNMP_AGENT:
5add75d8 59 dwError = pNode->GetItemFromSNMP(pItem->Name(), MAX_LINE_SIZE, pBuffer);
1275c750
VK
60 break;
61 case DS_NATIVE_AGENT:
5add75d8 62 dwError = pNode->GetItemFromAgent(pItem->Name(), MAX_LINE_SIZE, pBuffer);
1275c750
VK
63 break;
64 }
65
129f8f7b
VK
66 // Update item's last poll time
67 currTime = time(NULL);
5add75d8 68 pItem->SetLastPollTime(currTime);
129f8f7b 69
333ece94 70 // Transform and store received value into database or handle error
1275c750
VK
71 switch(dwError)
72 {
73 case DCE_SUCCESS:
333ece94 74 pItem->NewValue(currTime, pBuffer);
1275c750
VK
75 break;
76 case DCE_COMM_ERROR:
77 break;
78 case DCE_NOT_SUPPORTED:
79 // Change item's status
5add75d8 80 pItem->SetStatus(ITEM_STATUS_NOT_SUPPORTED);
1275c750
VK
81 break;
82 }
5add75d8
VK
83
84 // Clear busy flag so item can be polled again
85 pItem->SetBusyFlag(FALSE);
1275c750 86 }
129f8f7b
VK
87 else /* pNode == NULL */
88 {
3e4e127f 89 DbgPrintf(AF_DEBUG_DC, "*** DataCollector: Attempt to collect information for non-existing node.\n");
129f8f7b 90 }
1275c750 91 }
e0d3b217
VK
92
93 free(pBuffer);
a7b85168 94 DbgPrintf(AF_DEBUG_DC, "Data collector thread terminated\n");
ccdbbb52 95 return THREAD_OK;
1275c750
VK
96}
97
98
99//
067f2689
VK
100// Item poller thread: check nodes' items and put into the
101// data collector queue when data polling required
1275c750
VK
102//
103
ccdbbb52 104static THREAD_RESULT THREAD_CALL ItemPoller(void *pArg)
1275c750 105{
b7271ccb
VK
106 DWORD i, dwElapsed, dwWatchdogId;
107 INT64 qwStart;
1275c750 108
a97797f8 109 dwWatchdogId = WatchdogAddThread("Item Poller", 20);
1275c750 110
067f2689
VK
111 while(!ShutdownInProgress())
112 {
113 if (SleepAndCheckForShutdown(2))
114 break; // Shutdown has arrived
115 WatchdogNotify(dwWatchdogId);
116
117 MutexLock(g_hMutexNodeIndex, INFINITE);
b7271ccb 118 qwStart = GetCurrentTimeMs();
067f2689
VK
119 for(i = 0; i < g_dwNodeAddrIndexSize; i++)
120 ((Node *)g_pNodeIndexByAddr[i].pObject)->QueueItemsForPolling(m_pItemQueue);
121 MutexUnlock(g_hMutexNodeIndex);
122
b7271ccb 123 dwElapsed = (DWORD)(GetCurrentTimeMs() - qwStart);
067f2689 124 }
a7b85168 125 DbgPrintf(AF_DEBUG_DC, "Item poller thread terminated\n");
ccdbbb52 126 return THREAD_OK;
067f2689 127}
1275c750 128
1275c750 129
067f2689
VK
130//
131// Statistics collection thread
132//
133
ccdbbb52 134static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
067f2689
VK
135{
136 while(!ShutdownInProgress())
137 {
138 if (SleepAndCheckForShutdown(10))
139 break; // Shutdown has arrived
140
3e4e127f
VK
141 if (g_dwFlags & AF_DEBUG_DC)
142 {
a7b85168
VK
143// printf("*** Poller Queue size: %d ***\n", m_pItemQueue->Size());
144// printf("*** DB Writer Queue size: %d ***\n", g_pLazyRequestQueue->Size());
3e4e127f 145 }
067f2689 146 }
ccdbbb52 147 return THREAD_OK;
1275c750
VK
148}
149
150
151//
067f2689 152// Initialize data collection subsystem
1275c750
VK
153//
154
067f2689 155BOOL InitDataCollector(void)
1275c750 156{
067f2689 157 int i, iNumCollectors;
1275c750 158
067f2689
VK
159 // Create collection requests queue
160 m_pItemQueue = new Queue(4096, 256);
1275c750 161
067f2689
VK
162 // Start data collection threads
163 iNumCollectors = ConfigReadInt("NumberOfDataCollectors", 10);
164 for(i = 0; i < iNumCollectors; i++)
165 ThreadCreate(DataCollector, 0, NULL);
1275c750 166
067f2689
VK
167 // Start item poller thread
168 ThreadCreate(ItemPoller, 0, NULL);
169
170 // Start statistics collection thread
171 ThreadCreate(StatCollector, 0, NULL);
172
173 return TRUE;
1275c750 174}