condTimedWait fixed
[public/netxms.git] / src / agent / core / session.cpp
CommitLineData
e9580fef
VK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003, 2004 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** $module: session.cpp
20**
21**/
22
23#include "nxagentd.h"
24
25
26//
ccdbbb52
VK
27// Externals
28//
29
// Not defined in this part of the file. ReadThreadStarter() calls it with
// the session's index right before the session object is deleted.
30void UnregisterSession(DWORD dwIndex);
31
32
33//
0a147f4b
VK
34// Constants
35//
36
// Number of bytes ReadThread() allocates for one incoming raw message; the
// same value is passed to RecvCSCPMessage() (presumably as the buffer
// capacity - confirm against its declaration).
37#define RAW_MSG_SIZE 262144
38
39
40//
ccdbbb52
VK
41// Client communication read thread
42//
43
44THREAD_RESULT THREAD_CALL CommSession::ReadThreadStarter(void *pArg)
45{
46 ((CommSession *)pArg)->ReadThread();
47
48 // When CommSession::ReadThread exits, all other session
49 // threads are already stopped, so we can safely destroy
50 // session object
51 UnregisterSession(((CommSession *)pArg)->GetIndex());
52 delete (CommSession *)pArg;
53 return THREAD_OK;
54}
55
56
57//
58// Client communication write thread
59//
60
61THREAD_RESULT THREAD_CALL CommSession::WriteThreadStarter(void *pArg)
62{
63 ((CommSession *)pArg)->WriteThread();
64 return THREAD_OK;
65}
66
67
68//
69// Received message processing thread
70//
71
72THREAD_RESULT THREAD_CALL CommSession::ProcessingThreadStarter(void *pArg)
73{
74 ((CommSession *)pArg)->ProcessingThread();
75 return THREAD_OK;
76}
77
78
79//
e9580fef
VK
80// Client session class constructor
81//
82
d096bcdd 83CommSession::CommSession(SOCKET hSocket, DWORD dwHostAddr, BOOL bInstallationServer)
e9580fef
VK
84{
85 m_pSendQueue = new Queue;
86 m_pMessageQueue = new Queue;
87 m_hSocket = hSocket;
88 m_dwIndex = INVALID_INDEX;
89 m_pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
ccdbbb52
VK
90 m_hWriteThread = INVALID_THREAD_HANDLE;
91 m_hProcessingThread = INVALID_THREAD_HANDLE;
e9580fef 92 m_dwHostAddr = dwHostAddr;
7c205c0c 93 m_bIsAuthenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? FALSE : TRUE;
d096bcdd
VK
94 m_bInstallationServer = bInstallationServer;
95 m_hCurrFile = -1;
e9580fef
VK
96}
97
98
99//
100// Destructor
101//
102
103CommSession::~CommSession()
104{
105 shutdown(m_hSocket, 2);
106 closesocket(m_hSocket);
107 delete m_pSendQueue;
108 delete m_pMessageQueue;
109 safe_free(m_pMsgBuffer);
d096bcdd
VK
110 if (m_hCurrFile != -1)
111 close(m_hCurrFile);
ccdbbb52
VK
112}
113
114
115//
116// Start all threads
117//
118
119void CommSession::Run(void)
120{
121 m_hWriteThread = ThreadCreateEx(WriteThreadStarter, 0, this);
122 m_hProcessingThread = ThreadCreateEx(ProcessingThreadStarter, 0, this);
123 ThreadCreate(ReadThreadStarter, 0, this);
e9580fef 124}
0a147f4b
VK
125
126
127//
128// Reading thread
129//
130
131void CommSession::ReadThread(void)
132{
133 CSCP_MESSAGE *pRawMsg;
134 CSCPMessage *pMsg;
135 int iErr;
d096bcdd 136 char szBuffer[256];
cb7ec554 137 WORD wFlags;
0a147f4b
VK
138
139 // Initialize raw message receiving function
140 RecvCSCPMessage(0, NULL, m_pMsgBuffer, 0);
141
142 pRawMsg = (CSCP_MESSAGE *)malloc(RAW_MSG_SIZE);
143 while(1)
144 {
145 if ((iErr = RecvCSCPMessage(m_hSocket, pRawMsg, m_pMsgBuffer, RAW_MSG_SIZE)) <= 0)
146 break;
147
148 // Check if message is too large
149 if (iErr == 1)
150 continue;
151
152 // Check that actual received packet size is equal to encoded in packet
153 if ((int)ntohl(pRawMsg->dwSize) != iErr)
154 {
901c96c7 155 DebugPrintf("Actual message size doesn't match wSize value (%d,%d)", iErr, ntohl(pRawMsg->dwSize));
0a147f4b
VK
156 continue; // Bad packet, wait for next
157 }
158
cb7ec554
VK
159 wFlags = ntohs(pRawMsg->wFlags);
160 if (wFlags & MF_BINARY)
161 {
162 // Convert message header to host format
163 pRawMsg->dwId = ntohl(pRawMsg->dwId);
164 pRawMsg->wCode = ntohs(pRawMsg->wCode);
165 pRawMsg->dwNumVars = ntohl(pRawMsg->dwNumVars);
d096bcdd
VK
166 DebugPrintf("Received raw message %s", CSCPMessageCodeName(pRawMsg->wCode, szBuffer));
167
cb7ec554
VK
168 if (pRawMsg->wCode == CMD_FILE_DATA)
169 {
d096bcdd
VK
170 if ((m_hCurrFile != -1) && (m_dwFileRqId == pRawMsg->dwId))
171 {
172 if (write(m_hCurrFile, pRawMsg->df, pRawMsg->dwNumVars) == (int)pRawMsg->dwNumVars)
173 {
174 if (wFlags & MF_EOF)
175 {
176 CSCPMessage msg;
177
178 close(m_hCurrFile);
179 m_hCurrFile = -1;
180
181 msg.SetCode(CMD_REQUEST_COMPLETED);
182 msg.SetId(pRawMsg->dwId);
183 msg.SetVariable(VID_RCC, ERR_SUCCESS);
184 SendMessage(&msg);
185 }
186 }
187 else
188 {
189 // I/O error
190 CSCPMessage msg;
191
192 close(m_hCurrFile);
193 m_hCurrFile = -1;
194
195 msg.SetCode(CMD_REQUEST_COMPLETED);
196 msg.SetId(pRawMsg->dwId);
197 msg.SetVariable(VID_RCC, ERR_IO_FAILURE);
198 SendMessage(&msg);
199 }
200 }
cb7ec554
VK
201 }
202 }
203 else
204 {
205 // Create message object from raw message
206 pMsg = new CSCPMessage(pRawMsg);
207 m_pMessageQueue->Put(pMsg);
208 }
0a147f4b
VK
209 }
210 if (iErr < 0)
211 WriteLog(MSG_SESSION_BROKEN, EVENTLOG_WARNING_TYPE, "e", WSAGetLastError());
212 free(pRawMsg);
213
214 // Notify other threads to exit
215 m_pSendQueue->Put(INVALID_POINTER_VALUE);
216 m_pMessageQueue->Put(INVALID_POINTER_VALUE);
217
218 // Wait for other threads to finish
ccdbbb52
VK
219 ThreadJoin(m_hWriteThread);
220 ThreadJoin(m_hProcessingThread);
7c205c0c
VK
221
222 DebugPrintf("Session with %s closed", IpToStr(m_dwHostAddr, szBuffer));
0a147f4b
VK
223}
224
225
226//
227// Writing thread
228//
229
230void CommSession::WriteThread(void)
231{
232 CSCP_MESSAGE *pMsg;
233 char szBuffer[128];
234
235 while(1)
236 {
237 pMsg = (CSCP_MESSAGE *)m_pSendQueue->GetOrBlock();
238 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
239 break;
240
901c96c7 241 DebugPrintf("Sending message %s", CSCPMessageCodeName(ntohs(pMsg->wCode), szBuffer));
1216cc73 242 if (SendEx(m_hSocket, (const char *)pMsg, ntohl(pMsg->dwSize), 0) <= 0)
0a147f4b 243 {
9d72bde1 244 free(pMsg);
0a147f4b
VK
245 break;
246 }
9d72bde1 247 free(pMsg);
0a147f4b 248 }
0a147f4b
VK
249}
250
251
252//
253// Message processing thread
254//
255
256void CommSession::ProcessingThread(void)
257{
258 CSCPMessage *pMsg;
259 char szBuffer[128];
7c205c0c 260 CSCPMessage msg;
084352b1 261 DWORD dwCommand;
0a147f4b
VK
262
263 while(1)
264 {
265 pMsg = (CSCPMessage *)m_pMessageQueue->GetOrBlock();
266 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
267 break;
084352b1
VK
268
269 dwCommand = pMsg->GetCode();
270 DebugPrintf("Received message %s", CSCPMessageCodeName((WORD)dwCommand, szBuffer));
7c205c0c
VK
271
272 // Prepare responce message
273 msg.SetCode(CMD_REQUEST_COMPLETED);
274 msg.SetId(pMsg->GetId());
275
276 // Check if authentication required
084352b1 277 if ((!m_bIsAuthenticated) && (dwCommand != CMD_AUTHENTICATE))
7c205c0c
VK
278 {
279 msg.SetVariable(VID_RCC, ERR_AUTH_REQUIRED);
280 }
281 else
282 {
084352b1 283 switch(dwCommand)
7c205c0c
VK
284 {
285 case CMD_AUTHENTICATE:
286 Authenticate(pMsg, &msg);
287 break;
288 case CMD_GET_PARAMETER:
289 GetParameter(pMsg, &msg);
290 break;
291 case CMD_GET_LIST:
901c96c7 292 GetList(pMsg, &msg);
7c205c0c 293 break;
6c426e3a
VK
294 case CMD_KEEPALIVE:
295 msg.SetVariable(VID_RCC, ERR_SUCCESS);
296 break;
3c774461
VK
297 case CMD_ACTION:
298 Action(pMsg, &msg);
299 break;
d096bcdd
VK
300 case CMD_TRANSFER_FILE:
301 RecvFile(pMsg, &msg);
302 break;
1980ebfc 303 case CMD_UPGRADE_AGENT:
e925a5fc 304 msg.SetVariable(VID_RCC, Upgrade(pMsg));
1980ebfc 305 break;
a72a1fb1
VK
306 case CMD_GET_PARAMETER_LIST:
307 msg.SetVariable(VID_RCC, ERR_SUCCESS);
308 GetParameterList(&msg);
309 break;
7c205c0c 310 default:
084352b1
VK
311 // Attempt to process unknown command by subagents
312 if (!ProcessCmdBySubAgent(dwCommand, pMsg, &msg))
313 msg.SetVariable(VID_RCC, ERR_UNKNOWN_COMMAND);
7c205c0c
VK
314 break;
315 }
316 }
0a147f4b 317 delete pMsg;
7c205c0c
VK
318
319 // Send responce
320 SendMessage(&msg);
321 msg.DeleteAllVariables();
0a147f4b 322 }
0a147f4b 323}
7c205c0c
VK
324
325
326//
327// Authenticate peer
328//
329
330void CommSession::Authenticate(CSCPMessage *pRequest, CSCPMessage *pMsg)
331{
332 if (m_bIsAuthenticated)
333 {
334 // Already authenticated
335 pMsg->SetVariable(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
336 }
337 else
338 {
339 char szSecret[MAX_SECRET_LENGTH];
d1d0b3be 340 BYTE hash[32];
7c205c0c
VK
341 WORD wAuthMethod;
342
343 wAuthMethod = pRequest->GetVariableShort(VID_AUTH_METHOD);
344 switch(wAuthMethod)
345 {
346 case AUTH_PLAINTEXT:
347 pRequest->GetVariableStr(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
348 if (!strcmp(szSecret, g_szSharedSecret))
349 {
350 m_bIsAuthenticated = TRUE;
351 pMsg->SetVariable(VID_RCC, ERR_SUCCESS);
352 }
353 else
354 {
355 WriteLog(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "as", m_dwHostAddr, "PLAIN");
356 pMsg->SetVariable(VID_RCC, ERR_AUTH_FAILED);
357 }
358 break;
d1d0b3be
VK
359 case AUTH_MD5_HASH:
360 pRequest->GetVariableBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
361 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
362 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
363 {
364 m_bIsAuthenticated = TRUE;
365 pMsg->SetVariable(VID_RCC, ERR_SUCCESS);
366 }
367 else
368 {
369 WriteLog(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "as", m_dwHostAddr, "MD5");
370 pMsg->SetVariable(VID_RCC, ERR_AUTH_FAILED);
371 }
372 break;
373 case AUTH_SHA1_HASH:
374 pRequest->GetVariableBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
375 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
376 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
377 {
378 m_bIsAuthenticated = TRUE;
379 pMsg->SetVariable(VID_RCC, ERR_SUCCESS);
380 }
381 else
382 {
383 WriteLog(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "as", m_dwHostAddr, "SHA1");
384 pMsg->SetVariable(VID_RCC, ERR_AUTH_FAILED);
385 }
386 break;
7c205c0c
VK
387 default:
388 pMsg->SetVariable(VID_RCC, ERR_NOT_IMPLEMENTED);
389 break;
390 }
391 }
392}
393
394
395//
396// Get parameter's value
397//
398
399void CommSession::GetParameter(CSCPMessage *pRequest, CSCPMessage *pMsg)
400{
401 char szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
402 DWORD dwErrorCode;
403
404 pRequest->GetVariableStr(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
405 dwErrorCode = GetParameterValue(szParameter, szValue);
406 pMsg->SetVariable(VID_RCC, dwErrorCode);
407 if (dwErrorCode == ERR_SUCCESS)
408 pMsg->SetVariable(VID_VALUE, szValue);
409}
901c96c7
VK
410
411
412//
413// Get list of values
414//
415
416void CommSession::GetList(CSCPMessage *pRequest, CSCPMessage *pMsg)
417{
418 char szParameter[MAX_PARAM_NAME];
419 DWORD i, dwErrorCode;
420 NETXMS_VALUES_LIST value;
421
422 pRequest->GetVariableStr(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
423 value.dwNumStrings = 0;
424 value.ppStringList = NULL;
425
426 dwErrorCode = GetEnumValue(szParameter, &value);
427 pMsg->SetVariable(VID_RCC, dwErrorCode);
428 if (dwErrorCode == ERR_SUCCESS)
429 {
430 pMsg->SetVariable(VID_NUM_STRINGS, value.dwNumStrings);
431 for(i = 0; i < value.dwNumStrings; i++)
432 pMsg->SetVariable(VID_ENUM_VALUE_BASE + i, value.ppStringList[i]);
433 }
434
435 for(i = 0; i < value.dwNumStrings; i++)
9d72bde1
VK
436 safe_free(value.ppStringList[i]);
437 safe_free(value.ppStringList);
901c96c7 438}
3c774461
VK
439
440
441//
442// Perform action on request
443//
444
445void CommSession::Action(CSCPMessage *pRequest, CSCPMessage *pMsg)
446{
447 char szAction[MAX_PARAM_NAME];
448 NETXMS_VALUES_LIST args;
449 DWORD i, dwRetCode;
450
451 // Get action name and arguments
452 pRequest->GetVariableStr(VID_ACTION_NAME, szAction, MAX_PARAM_NAME);
453 args.dwNumStrings = pRequest->GetVariableLong(VID_NUM_ARGS);
9d72bde1 454 args.ppStringList = (char **)malloc(sizeof(char *) * args.dwNumStrings);
3c774461
VK
455 for(i = 0; i < args.dwNumStrings; i++)
456 args.ppStringList[i] = pRequest->GetVariableStr(VID_ACTION_ARG_BASE + i);
457
458 // Execute action
459 dwRetCode = ExecAction(szAction, &args);
460 pMsg->SetVariable(VID_RCC, dwRetCode);
461
462 // Cleanup
463 for(i = 0; i < args.dwNumStrings; i++)
9d72bde1
VK
464 safe_free(args.ppStringList[i]);
465 safe_free(args.ppStringList);
3c774461 466}
d096bcdd
VK
467
468
469//
470// Prepare for receiving file
471//
472
473void CommSession::RecvFile(CSCPMessage *pRequest, CSCPMessage *pMsg)
474{
d096bcdd
VK
475 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
476
477 if (m_bInstallationServer)
478 {
479 szFileName[0] = 0;
480 pRequest->GetVariableStr(VID_FILE_NAME, szFileName, MAX_PATH);
481 DebugPrintf("Preparing for receiving file \"%s\"", szFileName);
e925a5fc 482 BuildFullPath(szFileName, szFullPath);
d096bcdd
VK
483
484 // Check if for some reason we have already opened file
485 if (m_hCurrFile != -1)
486 {
487 pMsg->SetVariable(VID_RCC, ERR_RESOURCE_BUSY);
488 }
489 else
490 {
491 DebugPrintf("Writing to local file \"%s\"", szFullPath);
492 m_hCurrFile = _topen(szFullPath, O_CREAT | O_TRUNC | O_WRONLY | O_BINARY, 0600);
493 if (m_hCurrFile == -1)
494 {
495 DebugPrintf("Error opening file for writing: %s", strerror(errno));
496 pMsg->SetVariable(VID_RCC, ERR_IO_FAILURE);
497 }
498 else
499 {
500 m_dwFileRqId = pRequest->GetId();
501 pMsg->SetVariable(VID_RCC, ERR_SUCCESS);
502 }
503 }
504 }
505 else
506 {
507 pMsg->SetVariable(VID_RCC, ERR_ACCESS_DENIED);
508 }
509}
e925a5fc
VK
510
511
512//
513// Upgrade agent from package in the file store
514//
515
516DWORD CommSession::Upgrade(CSCPMessage *pRequest)
517{
518 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
519
520 szPkgName[0] = 0;
521 pRequest->GetVariableStr(VID_FILE_NAME, szPkgName, MAX_PATH);
522 BuildFullPath(szPkgName, szFullPath);
523 return UpgradeAgent(szFullPath);
524}