Added description of CSCP message "NOTIFY"
[public/netxms.git] / src / server / core / session.cpp
CommitLineData
21e4b6f0
VK
1/*
2** NetXMS - Network Management System
3** Copyright (C) 2003, 2004 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** $module: session.cpp
20**
21**/
22
23#include "nms_core.h"
24
25
26//
27// Client session class constructor
28//
29
30ClientSession::ClientSession(SOCKET hSocket)
31{
32 m_pSendQueue = new Queue;
33 m_pMessageQueue = new Queue;
62f5857f 34 m_pUpdateQueue = new Queue;
21e4b6f0
VK
35 m_hSocket = hSocket;
36 m_dwIndex = INVALID_INDEX;
37 m_iState = STATE_CONNECTED;
38 m_pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
ecb7e1ee
VK
39 m_hCondWriteThreadStopped = ConditionCreate();
40 m_hCondProcessingThreadStopped = ConditionCreate();
62f5857f
VK
41 m_hCondUpdateThreadStopped = ConditionCreate();
42 m_hMutexSendEvents = MutexCreate();
21e4b6f0
VK
43}
44
45
46//
47// Destructor
48//
49
50ClientSession::~ClientSession()
51{
52 shutdown(m_hSocket, 2);
53 closesocket(m_hSocket);
54 delete m_pSendQueue;
55 delete m_pMessageQueue;
62f5857f 56 delete m_pUpdateQueue;
21e4b6f0
VK
57 if (m_pMsgBuffer != NULL)
58 free(m_pMsgBuffer);
ecb7e1ee
VK
59 ConditionDestroy(m_hCondWriteThreadStopped);
60 ConditionDestroy(m_hCondProcessingThreadStopped);
62f5857f
VK
61 ConditionDestroy(m_hCondUpdateThreadStopped);
62 MutexDestroy(m_hMutexSendEvents);
21e4b6f0
VK
63}
64
65
66//
67// Print debug information
68//
69
70void ClientSession::DebugPrintf(char *szFormat, ...)
71{
72 if ((g_dwFlags & AF_STANDALONE) && (g_dwFlags & AF_DEBUG_CSCP))
73 {
74 va_list args;
75
76 printf("*CSCP(%d)* ", m_dwIndex);
77 va_start(args, szFormat);
78 vprintf(szFormat, args);
79 va_end(args);
80 }
81}
82
83
84//
85// Post message to send queue
86//
87
88void ClientSession::SendMessage(CSCPMessage *pMsg)
89{
90 m_pSendQueue->Put(pMsg->CreateMessage());
91}
92
93
94//
95// ReadThread()
96//
97
98void ClientSession::ReadThread(void)
99{
100 CSCP_MESSAGE *pRawMsg;
101 CSCPMessage *pMsg;
102 int iErr;
103
104 // Initialize raw message receiving function
105 RecvCSCPMessage(0, NULL, m_pMsgBuffer);
106
107 pRawMsg = (CSCP_MESSAGE *)malloc(65536);
108 while(1)
109 {
110 if ((iErr = RecvCSCPMessage(m_hSocket, pRawMsg, m_pMsgBuffer)) <= 0)
111 break;
112
113 // Check that actual received packet size is equal to encoded in packet
114 if (ntohs(pRawMsg->wSize) != iErr)
115 {
116 DebugPrintf("Actual message size doesn't match wSize value (%d,%d)\n", iErr, ntohs(pRawMsg->wSize));
117 continue; // Bad packet, wait for next
118 }
119
120 // Create message object from raw message
121 pMsg = new CSCPMessage(pRawMsg);
122 m_pMessageQueue->Put(pMsg);
123 }
124 if (iErr < 0)
125 WriteLog(MSG_SESSION_CLOSED, EVENTLOG_WARNING_TYPE, "e", WSAGetLastError());
126 free(pRawMsg);
127
128 // Notify other threads to exit
ecb7e1ee
VK
129 m_pSendQueue->Put(INVALID_POINTER_VALUE);
130 m_pMessageQueue->Put(INVALID_POINTER_VALUE);
131
132 // Wait for other threads to finish
133 ConditionWait(m_hCondWriteThreadStopped, INFINITE);
134 ConditionWait(m_hCondProcessingThreadStopped, INFINITE);
21e4b6f0
VK
135}
136
137
138//
139// WriteThread()
140//
141
142void ClientSession::WriteThread(void)
143{
144 CSCP_MESSAGE *pMsg;
145
146 while(1)
147 {
148 pMsg = (CSCP_MESSAGE *)m_pSendQueue->GetOrBlock();
ecb7e1ee 149 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
21e4b6f0
VK
150 break;
151
152 if (send(m_hSocket, (const char *)pMsg, ntohs(pMsg->wSize), 0) <= 0)
153 {
7968a52d 154 MemFree(pMsg);
21e4b6f0
VK
155 break;
156 }
7968a52d 157 MemFree(pMsg);
21e4b6f0 158 }
ecb7e1ee 159 ConditionSet(m_hCondWriteThreadStopped);
21e4b6f0
VK
160}
161
162
163//
164// Message processing thread
165//
166
167void ClientSession::ProcessingThread(void)
168{
169 CSCPMessage *pMsg, *pReply;
170
171 while(1)
172 {
173 pMsg = (CSCPMessage *)m_pMessageQueue->GetOrBlock();
ecb7e1ee 174 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
21e4b6f0
VK
175 break;
176
177 DebugPrintf("Received message with code %d\n", pMsg->GetCode());
178 if ((m_iState != STATE_AUTHENTICATED) && (pMsg->GetCode() != CMD_LOGIN))
179 {
180 delete pMsg;
181 continue;
182 }
183
184 switch(pMsg->GetCode())
185 {
186 case CMD_LOGIN:
187 if (m_iState != STATE_AUTHENTICATED)
188 {
a5f8dbb8
VK
189 char *pszLogin = pMsg->GetVariableStr(VID_LOGIN_NAME);
190 char *pszPassword = pMsg->GetVariableStr(VID_PASSWORD);
21e4b6f0
VK
191
192 if (AuthenticateUser(pszLogin, pszPassword, &m_dwUserId, &m_dwSystemAccess))
193 m_iState = STATE_AUTHENTICATED;
194
7968a52d
VK
195 MemFree(pszLogin);
196 MemFree(pszPassword);
21e4b6f0
VK
197
198 // Send reply
199 pReply = new CSCPMessage;
200 pReply->SetCode(CMD_LOGIN_RESP);
201 pReply->SetId(pMsg->GetId());
a5f8dbb8 202 pReply->SetVariable(VID_LOGIN_RESULT, (DWORD)(m_iState == STATE_AUTHENTICATED));
21e4b6f0
VK
203 SendMessage(pReply);
204 delete pReply;
205 }
206 else
207 {
208 }
209 break;
210 case CMD_GET_OBJECTS:
211 SendAllObjects();
212 break;
213 case CMD_GET_EVENTS:
214 SendAllEvents();
215 break;
216 case CMD_GET_CONFIG_VARLIST:
217 SendAllConfigVars();
218 break;
219 default:
220 break;
221 }
222 delete pMsg;
223 }
ecb7e1ee 224 ConditionSet(m_hCondProcessingThreadStopped);
21e4b6f0
VK
225}
226
227
228//
229// Send all objects to client
230//
231
232void ClientSession::SendAllObjects(void)
233{
234 DWORD i;
235 CSCPMessage msg;
236
237 // Prepare message
238 msg.SetCode(CMD_OBJECT);
239
240 // Send objects, one per message
241 ObjectsGlobalLock();
242 for(i = 0; i < g_dwIdIndexSize; i++)
243 {
244 g_pIndexById[i].pObject->CreateMessage(&msg);
245 SendMessage(&msg);
246 msg.DeleteAllVariables();
247 }
248 ObjectsGlobalUnlock();
249
250 // Send end of list notification
251 msg.SetCode(CMD_OBJECT_LIST_END);
252 SendMessage(&msg);
253}
254
255
256//
257// Send all events to client
258//
259
260void ClientSession::SendAllEvents(void)
261{
21e4b6f0 262 CSCPMessage msg;
20177e8e 263 DB_ASYNC_RESULT hResult;
9c36ef66 264 NXC_EVENT event;
21e4b6f0 265
62f5857f
VK
266 MutexLock(m_hMutexSendEvents, INFINITE);
267
21e4b6f0 268 // Retrieve events from database
20177e8e 269 hResult = DBAsyncSelect(g_hCoreDB, "SELECT event_id,timestamp,source,severity,message FROM EventLog ORDER BY timestamp");
21e4b6f0
VK
270 if (hResult != NULL)
271 {
272 // Send events, one per message
20177e8e 273 while(DBFetch(hResult))
21e4b6f0 274 {
20177e8e
VK
275 event.dwEventId = htonl(DBGetFieldAsyncULong(hResult, 0));
276 event.dwTimeStamp = htonl(DBGetFieldAsyncULong(hResult, 1));
277 event.dwSourceId = htonl(DBGetFieldAsyncULong(hResult, 2));
278 event.dwSeverity = htonl(DBGetFieldAsyncULong(hResult, 3));
279 DBGetFieldAsync(hResult, 4, event.szMessage, MAX_EVENT_MSG_LENGTH);
9c36ef66
VK
280 m_pSendQueue->Put(CreateRawCSCPMessage(CMD_EVENT, 0, sizeof(NXC_EVENT), &event, NULL));
281 }
20177e8e 282 DBFreeAsyncResult(hResult);
21e4b6f0
VK
283 }
284
285 // Send end of list notification
286 msg.SetCode(CMD_EVENT_LIST_END);
287 SendMessage(&msg);
62f5857f
VK
288
289 MutexUnlock(m_hMutexSendEvents);
21e4b6f0
VK
290}
291
292
293//
294// Send all configuration variables to client
295//
296
297void ClientSession::SendAllConfigVars(void)
298{
299 DWORD i, dwNumRecords;
300 CSCPMessage msg;
301 DB_RESULT hResult;
302
303 // Check user rights
304 if ((m_dwUserId != 0) && ((m_dwSystemAccess & SYSTEM_ACCESS_VIEW_CONFIG) == 0))
305 {
306 // Access denied
307 msg.SetCode(CMD_CONFIG_VARLIST_END);
a5f8dbb8 308 msg.SetVariable(VID_ERROR, (DWORD)1);
21e4b6f0
VK
309 SendMessage(&msg);
310 }
311 else
312 {
313 // Prepare message
314 msg.SetCode(CMD_CONFIG_VARIABLE);
315
316 // Retrieve configuration variables from database
317 hResult = DBSelect(g_hCoreDB, "SELECT name,value FROM config");
318 if (hResult != NULL)
319 {
320 // Send events, one per message
321 dwNumRecords = DBGetNumRows(hResult);
322 for(i = 0; i < dwNumRecords; i++)
323 {
a5f8dbb8
VK
324 msg.SetVariable(VID_NAME, DBGetField(hResult, i, 0));
325 msg.SetVariable(VID_VALUE, DBGetField(hResult, i, 1));
21e4b6f0
VK
326 SendMessage(&msg);
327 msg.DeleteAllVariables();
328 }
20177e8e 329 DBFreeResult(hResult);
21e4b6f0
VK
330 }
331
332 // Send end of list notification
333 msg.SetCode(CMD_CONFIG_VARLIST_END);
a5f8dbb8 334 msg.SetVariable(VID_ERROR, (DWORD)0);
21e4b6f0
VK
335 SendMessage(&msg);
336 }
337}
20177e8e
VK
338
339
340//
341// Close session forcibly
342//
343
344void ClientSession::Kill(void)
345{
346 // We shutdown socket connection, which will cause
347 // read thread to stop, and other threads will follow
348 shutdown(m_hSocket, 2);
349}
62f5857f
VK
350
351
352//
353// Handler for new events
354//
355
356void ClientSession::OnNewEvent(Event *pEvent)
357{
358 UPDATE_INFO *pUpdate;
359
360 pUpdate = (UPDATE_INFO *)malloc(sizeof(UPDATE_INFO));
361 pUpdate->dwCategory = INFO_CAT_EVENT;
362 pUpdate->pData = malloc(sizeof(NXC_EVENT));
363 pEvent->PrepareMessage((NXC_EVENT *)pUpdate->pData);
364 m_pUpdateQueue->Put(pUpdate);
365}
366
367
368//
369// Handler for object changes
370//
371
372void ClientSession::OnObjectChange(DWORD dwObjectId)
373{
374}
375
376
377//
378// Update processing thread
379//
380
381void ClientSession::UpdateThread(void)
382{
383 UPDATE_INFO *pUpdate;
384
385 while(1)
386 {
387 pUpdate = (UPDATE_INFO *)m_pUpdateQueue->GetOrBlock();
388 if (pUpdate == INVALID_POINTER_VALUE) // Session termination indicator
389 break;
390
391 switch(pUpdate->dwCategory)
392 {
393 case INFO_CAT_EVENT:
394 MutexLock(m_hMutexSendEvents, INFINITE);
395 m_pSendQueue->Put(CreateRawCSCPMessage(CMD_EVENT, 0, sizeof(NXC_EVENT), pUpdate->pData, NULL));
396 MutexUnlock(m_hMutexSendEvents);
397 free(pUpdate->pData);
398 break;
399 default:
400 break;
401 }
402
403 free(pUpdate);
404 }
405 ConditionSet(m_hCondUpdateThreadStopped);
406}