Fixed Windows build errors
[public/netxms.git] / src / agent / core / session.cpp
CommitLineData
9fa031cd 1/*
5039dede 2** NetXMS multiplatform core agent
908d71bd 3** Copyright (C) 2003-2014 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
9fcdfda7 19** File: session.cpp
5039dede
AK
20**
21**/
22
23#include "nxagentd.h"
24
62baece1
VK
25#ifdef _WIN32
26#define write _write
27#define close _close
28#endif
29
0f506caa
VK
30/**
31 * Externals
32 */
967893bb 33void UnregisterSession(UINT32 dwIndex);
b368969c
VK
34void ProxySNMPRequest(NXCPMessage *pRequest, NXCPMessage *pResponse);
35UINT32 DeployPolicy(CommSession *session, NXCPMessage *request);
36UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
37UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
81222795 38void ClearDataCollectionConfiguration();
5039dede 39
0f506caa 40/**
81222795 41 * Max message size
0f506caa 42 */
81222795 43#define MAX_MSG_SIZE 4194304
5039dede 44
0f506caa
VK
45/**
46 * Client communication read thread
47 */
e4a64da2 48THREAD_RESULT THREAD_CALL CommSession::readThreadStarter(void *pArg)
5039dede 49{
e4a64da2 50 ((CommSession *)pArg)->readThread();
5039dede
AK
51
52 // When CommSession::ReadThread exits, all other session
53 // threads are already stopped, so we can safely destroy
54 // session object
e4a64da2 55 UnregisterSession(((CommSession *)pArg)->getIndex());
6fbaa926 56 ((CommSession *)pArg)->decRefCount();
5039dede
AK
57 return THREAD_OK;
58}
59
1a93e64a
VK
60/**
61 * Client communication write thread
62 */
e4a64da2 63THREAD_RESULT THREAD_CALL CommSession::writeThreadStarter(void *pArg)
5039dede 64{
e4a64da2 65 ((CommSession *)pArg)->writeThread();
5039dede
AK
66 return THREAD_OK;
67}
68
1a93e64a
VK
69/**
70 * Received message processing thread
71 */
e4a64da2 72THREAD_RESULT THREAD_CALL CommSession::processingThreadStarter(void *pArg)
5039dede 73{
e4a64da2 74 ((CommSession *)pArg)->processingThread();
5039dede
AK
75 return THREAD_OK;
76}
77
1a93e64a
VK
78/**
79 * Client communication write thread
80 */
e4a64da2 81THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
5039dede 82{
e4a64da2 83 ((CommSession *)pArg)->proxyReadThread();
5039dede
AK
84 return THREAD_OK;
85}
86
1a93e64a
VK
87/**
88 * Client session class constructor
89 */
8c75ad41 90CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer)
5039dede 91{
6fbaa926
VK
92 m_sendQueue = new Queue;
93 m_processingQueue = new Queue;
5039dede 94 m_hSocket = hSocket;
81222795 95 m_hProxySocket = INVALID_SOCKET;
5039dede 96 m_dwIndex = INVALID_INDEX;
5039dede
AK
97 m_hWriteThread = INVALID_THREAD_HANDLE;
98 m_hProcessingThread = INVALID_THREAD_HANDLE;
124528cc 99 m_hProxyReadThread = INVALID_THREAD_HANDLE;
e9902466 100 m_serverId = 0;
9319c166
VK
101 m_serverAddr = serverAddr;
102 m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
103 m_masterServer = masterServer;
104 m_controlServer = controlServer;
105 m_proxyConnection = false;
106 m_acceptTraps = false;
124528cc 107 m_acceptFileUpdates = false;
ea3993c8 108 m_ipv6Aware = false;
5039dede 109 m_hCurrFile = -1;
503da871
VK
110 m_fileRqId = 0;
111 m_compressor = NULL;
5039dede
AK
112 m_pCtx = NULL;
113 m_ts = time(NULL);
ef8a3c32 114 m_socketWriteMutex = MutexCreate();
6fbaa926
VK
115 m_responseQueue = new MsgWaitQueue();
116 m_requestId = 0;
5039dede
AK
117}
118
1a93e64a
VK
119/**
120 * Destructor
121 */
5039dede
AK
122CommSession::~CommSession()
123{
124 shutdown(m_hSocket, SHUT_RDWR);
125 closesocket(m_hSocket);
81222795 126 if (m_hProxySocket != INVALID_SOCKET)
5039dede 127 closesocket(m_hProxySocket);
6fbaa926
VK
128 delete m_sendQueue;
129 delete m_processingQueue;
5039dede 130 if (m_hCurrFile != -1)
62baece1 131 close(m_hCurrFile);
503da871 132 delete m_compressor;
ea50695d 133 if ((m_pCtx != NULL) && (m_pCtx != PROXY_ENCRYPTION_CTX))
98abc9f1 134 m_pCtx->decRefCount();
7b8b337e 135 MutexDestroy(m_socketWriteMutex);
6fbaa926 136 delete m_responseQueue;
5039dede
AK
137}
138
1a93e64a
VK
139/**
140 * Start all threads
141 */
4685a2ad 142void CommSession::run()
5039dede 143{
e4a64da2
VK
144 m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
145 m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
146 ThreadCreate(readThreadStarter, 0, this);
5039dede
AK
147}
148
1a93e64a
VK
149/**
150 * Disconnect session
151 */
4685a2ad 152void CommSession::disconnect()
5039dede 153{
bf3b7f79 154 DebugPrintf(m_dwIndex, 5, _T("CommSession::disconnect()"));
5039dede
AK
155 shutdown(m_hSocket, SHUT_RDWR);
156 if (m_hProxySocket != -1)
157 shutdown(m_hProxySocket, SHUT_RDWR);
158}
159
5891329f
VK
160/**
161 * Reading thread
162 */
4685a2ad 163void CommSession::readThread()
5039dede 164{
81222795
VK
165 SocketMessageReceiver receiver(m_hSocket, 4096, MAX_MSG_SIZE);
166 while(true)
5039dede 167 {
81222795 168 if (!m_proxyConnection)
5039dede 169 {
81222795
VK
170 MessageReceiverResult result;
171 NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
5039dede 172
81222795
VK
173 // Check for decryption error
174 if (result == MSGRECV_DECRYPTION_FAILURE)
175 {
176 DebugPrintf(m_dwIndex, 4, _T("Unable to decrypt received message"));
177 continue;
178 }
5039dede 179
81222795
VK
180 // Check for timeout
181 if (result == MSGRECV_TIMEOUT)
182 {
183 if (m_ts < time(NULL) - (time_t)g_dwIdleTimeout)
184 {
185 DebugPrintf(m_dwIndex, 5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
186 break;
187 }
188 continue;
189 }
5039dede 190
81222795
VK
191 // Receive error
192 if (msg == NULL)
5e60b759 193 {
81222795 194 DebugPrintf(m_dwIndex, 5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
5e60b759
VK
195 break;
196 }
5e60b759 197
81222795
VK
198 // Update activity timestamp
199 m_ts = time(NULL);
5039dede 200
81222795
VK
201 if (g_debugLevel >= 8)
202 {
203 String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), NXCP_VERSION);
204 DebugPrintf(m_dwIndex, 8, _T("Message dump:\n%s"), (const TCHAR *)msgDump);
205 }
5c44534b 206
81222795 207 if (msg->isBinary())
5039dede 208 {
81222795
VK
209 TCHAR buffer[64];
210 DebugPrintf(m_dwIndex, 6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
5039dede 211
81222795 212 if (msg->getCode() == CMD_FILE_DATA)
5039dede 213 {
81222795 214 if ((m_hCurrFile != -1) && (m_fileRqId == msg->getId()))
5039dede 215 {
503da871
VK
216 const BYTE *data;
217 int dataSize;
81222795 218 if (msg->isCompressed())
503da871 219 {
81222795 220 BYTE *in = msg->getBinaryData();
503da871
VK
221 if (m_compressor == NULL)
222 {
223 NXCPCompressionMethod method = (NXCPCompressionMethod)(*in);
224 m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
225 if (m_compressor == NULL)
226 {
227 DebugPrintf(m_dwIndex, 5, _T("Unable to create stream compressor for method %d"), (int)method);
228 data = NULL;
229 dataSize = -1;
230 }
231 }
232
233 if (m_compressor != NULL)
234 {
81222795 235 dataSize = (int)m_compressor->decompress(in + 4, msg->getBinaryDataSize() - 4, &data);
503da871
VK
236 if (dataSize != (int)ntohs(*((UINT16 *)(in + 2))))
237 {
238 // decompressed block size validation failed
239 dataSize = -1;
240 }
241 }
242 }
243 else
244 {
81222795
VK
245 data = msg->getBinaryData();
246 dataSize = (int)msg->getBinaryDataSize();
503da871
VK
247 }
248
249 if ((dataSize >= 0) && (write(m_hCurrFile, data, dataSize) == dataSize))
5039dede 250 {
81222795 251 if (msg->isEndOfFile())
5039dede 252 {
81222795 253 NXCPMessage response;
5039dede 254
62baece1 255 close(m_hCurrFile);
5039dede 256 m_hCurrFile = -1;
503da871 257 delete_and_null(m_compressor);
9fa031cd 258
81222795
VK
259 response.setCode(CMD_REQUEST_COMPLETED);
260 response.setId(msg->getId());
261 response.setField(VID_RCC, ERR_SUCCESS);
262 sendMessage(&response);
5039dede
AK
263 }
264 }
265 else
266 {
267 // I/O error
81222795 268 NXCPMessage response;
5039dede 269
62baece1 270 close(m_hCurrFile);
5039dede 271 m_hCurrFile = -1;
503da871 272 delete_and_null(m_compressor);
9fa031cd 273
81222795
VK
274 response.setCode(CMD_REQUEST_COMPLETED);
275 response.setId(msg->getId());
276 response.setField(VID_RCC, ERR_IO_FAILURE);
277 sendMessage(&response);
5039dede
AK
278 }
279 }
280 }
281 }
81222795 282 else if (msg->isControl())
5039dede 283 {
81222795
VK
284 TCHAR buffer[64];
285 DebugPrintf(m_dwIndex, 6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
5039dede 286
81222795 287 if (msg->getCode() == CMD_GET_NXCP_CAPS)
5039dede 288 {
b368969c 289 NXCP_MESSAGE *pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
81222795 290 pMsg->id = htonl(msg->getId());
b368969c
VK
291 pMsg->code = htons((WORD)CMD_NXCP_CAPS);
292 pMsg->flags = htons(MF_CONTROL);
293 pMsg->numFields = htonl(NXCP_VERSION << 24);
294 pMsg->size = htonl(NXCP_HEADER_SIZE);
e4a64da2 295 sendRawMessage(pMsg, m_pCtx);
5039dede 296 }
81222795 297 delete msg;
5039dede
AK
298 }
299 else
300 {
81222795
VK
301 TCHAR buffer[64];
302 DebugPrintf(m_dwIndex, 6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
303
304 if (msg->getCode() == CMD_REQUEST_SESSION_KEY)
5039dede 305 {
5039dede
AK
306 if (m_pCtx == NULL)
307 {
b368969c 308 NXCPMessage *pResponse;
81222795 309 SetupEncryptionContext(msg, &m_pCtx, &pResponse, NULL, NXCP_VERSION);
e4a64da2 310 sendMessage(pResponse);
5039dede 311 delete pResponse;
81222795 312 receiver.setEncryptionContext(m_pCtx);
5039dede 313 }
81222795
VK
314 delete msg;
315 }
316 else if (msg->getCode() == CMD_REQUEST_COMPLETED)
317 {
318 m_responseQueue->put(msg);
5039dede 319 }
81222795 320 else if (msg->getCode() == CMD_SETUP_PROXY_CONNECTION)
6fbaa926 321 {
4a8dec30 322 UINT32 rcc = setupProxyConnection(msg);
81222795
VK
323 // When proxy session established incoming messages will
324 // not be processed locally. Acknowledgement message sent
325 // by SetupProxyConnection() in case of success.
326 if (rcc == ERR_SUCCESS)
327 {
328 m_processingQueue->put(INVALID_POINTER_VALUE);
329 }
330 else
331 {
332 NXCPMessage response;
333 response.setCode(CMD_REQUEST_COMPLETED);
334 response.setId(msg->getId());
335 response.setField(VID_RCC, rcc);
336 sendMessage(&response);
337 }
338 delete msg;
6fbaa926 339 }
5039dede
AK
340 else
341 {
81222795 342 m_processingQueue->put(msg);
5039dede
AK
343 }
344 }
345 }
81222795
VK
346 else // m_proxyConnection
347 {
348 fd_set rdfs;
349 struct timeval tv;
350 char buffer[32768];
351
352 FD_ZERO(&rdfs);
353 FD_SET(m_hSocket, &rdfs);
354 tv.tv_sec = g_dwIdleTimeout + 1;
355 tv.tv_usec = 0;
356 int rc = select(SELECT_NFDS(m_hSocket + 1), &rdfs, NULL, NULL, &tv);
357 if (rc <= 0)
358 break;
359 if (rc > 0)
360 {
361 // Update activity timestamp
362 m_ts = time(NULL);
363
364 rc = recv(m_hSocket, buffer, 32768, 0);
365 if (rc <= 0)
366 break;
367 SendEx(m_hProxySocket, buffer, rc, 0, NULL);
368 }
369 }
5039dede 370 }
5039dede
AK
371
372 // Notify other threads to exit
19dbc8ef
VK
373 m_sendQueue->put(INVALID_POINTER_VALUE);
374 m_processingQueue->put(INVALID_POINTER_VALUE);
81222795 375 if (m_hProxySocket != INVALID_SOCKET)
5039dede
AK
376 shutdown(m_hProxySocket, SHUT_RDWR);
377
378 // Wait for other threads to finish
379 ThreadJoin(m_hWriteThread);
380 ThreadJoin(m_hProcessingThread);
9319c166 381 if (m_proxyConnection)
5039dede
AK
382 ThreadJoin(m_hProxyReadThread);
383
8c75ad41 384 DebugPrintf(m_dwIndex, 5, _T("Session with %s closed"), (const TCHAR *)m_serverAddr.toString());
5039dede
AK
385}
386
5891329f
VK
387/**
388 * Send prepared raw message over the network and destroy it
389 */
b368969c 390BOOL CommSession::sendRawMessage(NXCP_MESSAGE *pMsg, NXCPEncryptionContext *pCtx)
5039dede
AK
391{
392 BOOL bResult = TRUE;
bf3b7f79 393 TCHAR szBuffer[128];
5039dede 394
b368969c 395 DebugPrintf(m_dwIndex, 6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
5039dede
AK
396 if ((pCtx != NULL) && (pCtx != PROXY_ENCRYPTION_CTX))
397 {
b368969c 398 NXCP_ENCRYPTED_MESSAGE *enMsg = pCtx->encryptMessage(pMsg);
6be0a20b 399 if (enMsg != NULL)
5039dede 400 {
b368969c 401 if (SendEx(m_hSocket, (const char *)enMsg, ntohl(enMsg->size), 0, m_socketWriteMutex) <= 0)
5039dede
AK
402 {
403 bResult = FALSE;
404 }
6be0a20b 405 free(enMsg);
5039dede
AK
406 }
407 }
408 else
409 {
b368969c 410 if (SendEx(m_hSocket, (const char *)pMsg, ntohl(pMsg->size), 0, m_socketWriteMutex) <= 0)
5039dede
AK
411 {
412 bResult = FALSE;
413 }
5039dede
AK
414 }
415 if (!bResult)
b368969c 416 DebugPrintf(m_dwIndex, 6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
486942d7 417 free(pMsg);
5039dede
AK
418 return bResult;
419}
420
1a93e64a
VK
421/**
422 * Writing thread
423 */
ab51c4d0 424void CommSession::writeThread()
5039dede 425{
b368969c 426 NXCP_MESSAGE *pMsg;
5039dede
AK
427
428 while(1)
429 {
19dbc8ef 430 pMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
5039dede
AK
431 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
432 break;
433
e4a64da2 434 if (!sendRawMessage(pMsg, m_pCtx))
5039dede
AK
435 break;
436 }
19dbc8ef 437 m_sendQueue->clear();
5039dede
AK
438}
439
1a93e64a
VK
440/**
441 * Message processing thread
442 */
ab51c4d0 443void CommSession::processingThread()
5039dede 444{
b368969c 445 NXCPMessage *pMsg;
b368969c 446 NXCPMessage msg;
6ba36557 447 UINT32 dwCommand;
5039dede
AK
448
449 while(1)
450 {
19dbc8ef 451 pMsg = (NXCPMessage *)m_processingQueue->getOrBlock();
5039dede
AK
452 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
453 break;
b368969c 454 dwCommand = pMsg->getCode();
5039dede
AK
455
456 // Prepare response message
b368969c
VK
457 msg.setCode(CMD_REQUEST_COMPLETED);
458 msg.setId(pMsg->getId());
5039dede
AK
459
460 // Check if authentication required
9319c166 461 if ((!m_authenticated) && (dwCommand != CMD_AUTHENTICATE))
5039dede 462 {
74df14c2 463 DebugPrintf(m_dwIndex, 6, _T("Authentication required"));
b368969c 464 msg.setField(VID_RCC, ERR_AUTH_REQUIRED);
5039dede
AK
465 }
466 else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && (m_pCtx == NULL))
467 {
74df14c2 468 DebugPrintf(m_dwIndex, 6, _T("Encryption required"));
b368969c 469 msg.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
5039dede
AK
470 }
471 else
472 {
473 switch(dwCommand)
474 {
475 case CMD_AUTHENTICATE:
e4a64da2 476 authenticate(pMsg, &msg);
5039dede
AK
477 break;
478 case CMD_GET_PARAMETER:
e4a64da2 479 getParameter(pMsg, &msg);
5039dede
AK
480 break;
481 case CMD_GET_LIST:
e4a64da2 482 getList(pMsg, &msg);
5039dede 483 break;
4687826e
VK
484 case CMD_GET_TABLE:
485 getTable(pMsg, &msg);
486 break;
5039dede 487 case CMD_KEEPALIVE:
b368969c 488 msg.setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
489 break;
490 case CMD_ACTION:
e4a64da2 491 action(pMsg, &msg);
5039dede
AK
492 break;
493 case CMD_TRANSFER_FILE:
e4a64da2 494 recvFile(pMsg, &msg);
5039dede
AK
495 break;
496 case CMD_UPGRADE_AGENT:
b368969c 497 msg.setField(VID_RCC, upgrade(pMsg));
5039dede
AK
498 break;
499 case CMD_GET_PARAMETER_LIST:
b368969c 500 msg.setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
501 GetParameterList(&msg);
502 break;
0f506caa 503 case CMD_GET_ENUM_LIST:
b368969c 504 msg.setField(VID_RCC, ERR_SUCCESS);
0f506caa
VK
505 GetEnumList(&msg);
506 break;
507 case CMD_GET_TABLE_LIST:
b368969c 508 msg.setField(VID_RCC, ERR_SUCCESS);
0f506caa
VK
509 GetTableList(&msg);
510 break;
5039dede 511 case CMD_GET_AGENT_CONFIG:
e4a64da2 512 getConfig(&msg);
5039dede
AK
513 break;
514 case CMD_UPDATE_AGENT_CONFIG:
e4a64da2 515 updateConfig(pMsg, &msg);
5039dede 516 break;
5039dede 517 case CMD_ENABLE_AGENT_TRAPS:
9319c166 518 if (m_masterServer)
5039dede 519 {
9319c166 520 m_acceptTraps = true;
b368969c 521 msg.setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
522 }
523 else
524 {
b368969c 525 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede 526 }
e13420c1 527 break;
528 case CMD_ENABLE_FILE_UPDATES:
529 if (m_masterServer)
530 {
531 m_acceptFileUpdates = true;
532 msg.setField(VID_RCC, ERR_SUCCESS);
533 }
534 else
535 {
536 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
537 }
5039dede
AK
538 break;
539 case CMD_SNMP_REQUEST:
9319c166 540 if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
5039dede
AK
541 {
542 ProxySNMPRequest(pMsg, &msg);
543 }
544 else
545 {
b368969c 546 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
6ca3b41c
VK
547 }
548 break;
549 case CMD_DEPLOY_AGENT_POLICY:
9319c166 550 if (m_masterServer)
6ca3b41c 551 {
b368969c 552 msg.setField(VID_RCC, DeployPolicy(this, pMsg));
6ca3b41c
VK
553 }
554 else
555 {
b368969c 556 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
93599cfd
VK
557 }
558 break;
559 case CMD_UNINSTALL_AGENT_POLICY:
9319c166 560 if (m_masterServer)
93599cfd 561 {
b368969c 562 msg.setField(VID_RCC, UninstallPolicy(this, pMsg));
93599cfd
VK
563 }
564 else
565 {
b368969c 566 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
1f385e47
VK
567 }
568 break;
569 case CMD_GET_POLICY_INVENTORY:
9319c166 570 if (m_masterServer)
1f385e47 571 {
b368969c 572 msg.setField(VID_RCC, GetPolicyInventory(this, &msg));
1f385e47
VK
573 }
574 else
575 {
b368969c 576 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
577 }
578 break;
9c786c0f 579 case CMD_TAKE_SCREENSHOT:
9319c166 580 if (m_controlServer)
9c786c0f
VK
581 {
582 TCHAR sessionName[256];
b368969c 583 pMsg->getFieldAsString(VID_NAME, sessionName, 256);
9c786c0f
VK
584 DebugPrintf(m_dwIndex, 6, _T("Take snapshot from session \"%s\""), sessionName);
585 SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
586 if (conn != NULL)
587 {
588 DebugPrintf(m_dwIndex, 6, _T("Session agent connector acquired"));
589 conn->takeScreenshot(&msg);
590 conn->decRefCount();
591 }
592 else
593 {
b368969c 594 msg.setField(VID_RCC, ERR_NO_SESSION_AGENT);
9c786c0f
VK
595 }
596 }
597 else
598 {
b368969c 599 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
9c786c0f
VK
600 }
601 break;
ea3993c8
VK
602 case CMD_ENABLE_IPV6:
603 m_ipv6Aware = pMsg->getFieldAsBoolean(VID_ENABLED);
604 msg.setField(VID_RCC, ERR_SUCCESS);
605 break;
e9902466
VK
606 case CMD_SET_SERVER_ID:
607 m_serverId = pMsg->getFieldAsUInt64(VID_SERVER_ID);
608 DebugPrintf(m_dwIndex, 1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
609 msg.setField(VID_RCC, ERR_SUCCESS);
610 break;
87fff547
VK
611 case CMD_DATA_COLLECTION_CONFIG:
612 if (m_serverId != 0)
613 {
614 ConfigureDataCollection(m_serverId, pMsg);
615 msg.setField(VID_RCC, ERR_SUCCESS);
616 }
617 else
618 {
619 DebugPrintf(m_dwIndex, 1, _T("Data collection configuration command received but server ID is not set"));
620 msg.setField(VID_RCC, ERR_SERVER_ID_UNSET);
621 }
622 break;
4b535d78 623 case CMD_CLEAN_AGENT_DCI_CONF:
624 if (m_masterServer)
625 {
81222795 626 ClearDataCollectionConfiguration();
4b535d78 627 msg.setField(VID_RCC, ERR_SUCCESS);
628 }
629 else
630 {
631 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
632 }
633 break;
5039dede
AK
634 default:
635 // Attempt to process unknown command by subagents
7cf549ad 636 if (!ProcessCmdBySubAgent(dwCommand, pMsg, &msg, this))
b368969c 637 msg.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
5039dede
AK
638 break;
639 }
640 }
641 delete pMsg;
642
643 // Send response
e4a64da2 644 sendMessage(&msg);
b368969c 645 msg.deleteAllFields();
5039dede 646 }
5039dede
AK
647}
648
4af351c7
VK
649/**
650 * Authenticate peer
651 */
b368969c 652void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 653{
9319c166 654 if (m_authenticated)
5039dede
AK
655 {
656 // Already authenticated
b368969c 657 pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
5039dede
AK
658 }
659 else
660 {
bf3b7f79 661 TCHAR szSecret[MAX_SECRET_LENGTH];
5039dede
AK
662 BYTE hash[32];
663 WORD wAuthMethod;
664
b368969c 665 wAuthMethod = pRequest->getFieldAsUInt16(VID_AUTH_METHOD);
5039dede
AK
666 switch(wAuthMethod)
667 {
668 case AUTH_PLAINTEXT:
b368969c 669 pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
bf3b7f79 670 if (!_tcscmp(szSecret, g_szSharedSecret))
5039dede 671 {
9319c166 672 m_authenticated = true;
b368969c 673 pMsg->setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
674 }
675 else
676 {
e434ba62 677 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, "PLAIN");
b368969c 678 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
5039dede
AK
679 }
680 break;
681 case AUTH_MD5_HASH:
b368969c 682 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
bf3b7f79
VK
683#ifdef UNICODE
684 {
685 char sharedSecret[256];
686 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
687 sharedSecret[255] = 0;
688 CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
689 }
690#else
5039dede 691 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
bf3b7f79 692#endif
5039dede
AK
693 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
694 {
9319c166 695 m_authenticated = true;
b368969c 696 pMsg->setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
697 }
698 else
699 {
e434ba62 700 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("MD5"));
b368969c 701 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
5039dede
AK
702 }
703 break;
704 case AUTH_SHA1_HASH:
b368969c 705 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
bf3b7f79
VK
706#ifdef UNICODE
707 {
708 char sharedSecret[256];
709 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
710 sharedSecret[255] = 0;
711 CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
712 }
713#else
5039dede 714 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
bf3b7f79 715#endif
5039dede
AK
716 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
717 {
9319c166 718 m_authenticated = true;
b368969c 719 pMsg->setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
720 }
721 else
722 {
e434ba62 723 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("SHA1"));
b368969c 724 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
5039dede
AK
725 }
726 break;
727 default:
b368969c 728 pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
5039dede
AK
729 break;
730 }
731 }
732}
733
1a93e64a
VK
734/**
735 * Get parameter's value
736 */
b368969c 737void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 738{
0df58041 739 TCHAR szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
967893bb 740 UINT32 dwErrorCode;
5039dede 741
b368969c 742 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
060c5a11 743 dwErrorCode = GetParameterValue(m_dwIndex, szParameter, szValue, this);
b368969c 744 pMsg->setField(VID_RCC, dwErrorCode);
5039dede 745 if (dwErrorCode == ERR_SUCCESS)
b368969c 746 pMsg->setField(VID_VALUE, szValue);
5039dede
AK
747}
748
0f506caa
VK
749/**
750 * Get list of values
751 */
b368969c 752void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 753{
fd1fa78b 754 TCHAR szParameter[MAX_PARAM_NAME];
b368969c 755 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
5039dede 756
6173bea8 757 StringList value;
060c5a11 758 UINT32 dwErrorCode = GetListValue(m_dwIndex, szParameter, &value, this);
b368969c 759 pMsg->setField(VID_RCC, dwErrorCode);
5039dede
AK
760 if (dwErrorCode == ERR_SUCCESS)
761 {
6fbaa926 762 value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
5039dede 763 }
5039dede
AK
764}
765
0f506caa
VK
766/**
767 * Get table
768 */
b368969c 769void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
4687826e
VK
770{
771 TCHAR szParameter[MAX_PARAM_NAME];
772
b368969c 773 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
4687826e
VK
774
775 Table value;
060c5a11 776 UINT32 dwErrorCode = GetTableValue(m_dwIndex, szParameter, &value, this);
b368969c 777 pMsg->setField(VID_RCC, dwErrorCode);
4687826e
VK
778 if (dwErrorCode == ERR_SUCCESS)
779 {
780 value.fillMessage(*pMsg, 0, -1); // no row limit
781 }
782}
783
0f506caa
VK
784/**
785 * Perform action on request
786 */
b368969c 787void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 788{
9319c166 789 if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
5039dede
AK
790 {
791 // Get action name and arguments
908d71bd 792 TCHAR action[MAX_PARAM_NAME];
b368969c 793 pRequest->getFieldAsString(VID_ACTION_NAME, action, MAX_PARAM_NAME);
489b117b 794
908d71bd
VK
795 int numArgs = pRequest->getFieldAsInt32(VID_NUM_ARGS);
796 StringList *args = new StringList;
797 for(int i = 0; i < numArgs; i++)
b368969c 798 args->addPreallocated(pRequest->getFieldAsString(VID_ACTION_ARG_BASE + i));
5039dede
AK
799
800 // Execute action
908d71bd
VK
801 if (pRequest->getFieldAsBoolean(VID_RECEIVE_OUTPUT))
802 {
b368969c
VK
803 UINT32 rcc = ExecActionWithOutput(this, pRequest->getId(), action, args);
804 pMsg->setField(VID_RCC, rcc);
908d71bd
VK
805 }
806 else
807 {
060c5a11 808 UINT32 rcc = ExecAction(action, args, this);
b368969c 809 pMsg->setField(VID_RCC, rcc);
908d71bd
VK
810 delete args;
811 }
5039dede
AK
812 }
813 else
814 {
b368969c 815 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
816 }
817}
818
d9821bd7 819/**
820 * Prepare for receiving file
821 */
b368969c 822void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 823{
7cf549ad 824 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
5039dede 825
9319c166 826 if (m_masterServer)
f0c1d2a4
AK
827 {
828 szFileName[0] = 0;
b368969c 829 pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
bf3b7f79 830 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
7cf549ad 831 BuildFullPath(szFileName, szFullPath);
f0c1d2a4
AK
832
833 // Check if for some reason we have already opened file
b368969c 834 pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId()));
f0c1d2a4
AK
835 }
836 else
837 {
b368969c 838 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
f0c1d2a4 839 }
5039dede
AK
840}
841
8581943e
VK
842/**
843 * Open file for writing
844 */
845UINT32 CommSession::openFile(TCHAR *szFullPath, UINT32 requestId)
7cf549ad 846{
847 if (m_hCurrFile != -1)
848 {
849 return ERR_RESOURCE_BUSY;
850 }
851 else
852 {
853 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath);
854 m_hCurrFile = _topen(szFullPath, O_CREAT | O_TRUNC | O_WRONLY | O_BINARY, 0600);
855 if (m_hCurrFile == -1)
856 {
857 DebugPrintf(m_dwIndex, 2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
858 return ERR_IO_FAILURE;
859 }
860 else
861 {
503da871 862 m_fileRqId = requestId;
8581943e 863 return ERR_SUCCESS;
7cf549ad 864 }
865 }
866}
867
503da871
VK
868/**
869 * Progress callback for file sending
870 */
4685a2ad
VK
871static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
872{
873 ((CommSession *)cbArg)->updateTimeStamp();
874}
875
9c786c0f
VK
876/**
877 * Send file to server
878 */
cc022855 879bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset)
4685a2ad 880{
cc022855 881 return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset, SendFileProgressCallback, this, m_socketWriteMutex) ? true : false;
4685a2ad
VK
882}
883
9c786c0f
VK
884/**
885 * Upgrade agent from package in the file store
886 */
b368969c 887UINT32 CommSession::upgrade(NXCPMessage *pRequest)
5039dede 888{
9319c166 889 if (m_masterServer)
5039dede
AK
890 {
891 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
892
893 szPkgName[0] = 0;
b368969c 894 pRequest->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
5039dede 895 BuildFullPath(szPkgName, szFullPath);
80f2318b 896
897 //Create line in registry file with upgrade file name to delete it after system start
359784de 898 Config *registry = AgentOpenRegistry();
80f2318b 899 registry->setValue(_T("/upgrade/file"), szFullPath);
359784de 900 AgentCloseRegistry(true);
80f2318b 901
5039dede
AK
902 return UpgradeAgent(szFullPath);
903 }
904 else
905 {
906 return ERR_ACCESS_DENIED;
907 }
908}
909
5944946e
VK
910/**
911 * Get agent's configuration file
912 */
b368969c 913void CommSession::getConfig(NXCPMessage *pMsg)
5039dede 914{
9319c166 915 if (m_masterServer)
5039dede 916 {
b368969c 917 pMsg->setField(VID_RCC,
5944946e 918 pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
5039dede
AK
919 }
920 else
921 {
b368969c 922 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
923 }
924}
925
5c44534b
VK
926/**
927 * Update agent's configuration file
928 */
b368969c 929void CommSession::updateConfig(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 930{
9319c166 931 if (m_masterServer)
5039dede
AK
932 {
933 BYTE *pConfig;
934 int hFile;
b368969c 935 UINT32 size;
5039dede 936
5c44534b 937 if (pRequest->isFieldExist(VID_CONFIG_FILE))
5039dede 938 {
b368969c
VK
939 size = pRequest->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
940 pConfig = (BYTE *)malloc(size);
941 pRequest->getFieldAsBinary(VID_CONFIG_FILE, pConfig, size);
05e050f6 942 hFile = _topen(g_szConfigFile, O_CREAT | O_TRUNC | O_WRONLY, 0644);
5039dede
AK
943 if (hFile != -1)
944 {
b368969c 945 if (size > 0)
5039dede 946 {
b368969c 947 for(UINT32 i = 0; i < size - 1; i++)
05e050f6 948 if (pConfig[i] == 0x0D)
5039dede 949 {
b368969c
VK
950 size--;
951 memmove(&pConfig[i], &pConfig[i + 1], size - i);
05e050f6 952 i--;
5039dede
AK
953 }
954 }
e5cceced
VK
955 if (write(hFile, pConfig, size) == size)
956 pMsg->setField(VID_RCC, ERR_SUCCESS);
957 else
958 pMsg->setField(VID_RCC, ERR_IO_FAILURE);
62baece1 959 close(hFile);
5039dede
AK
960 }
961 else
962 {
bf3b7f79
VK
963 DebugPrintf(m_dwIndex, 2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
964 g_szConfigFile, _tcserror(errno));
b368969c 965 pMsg->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
5039dede
AK
966 }
967 free(pConfig);
968 }
969 else
970 {
b368969c 971 pMsg->setField(VID_RCC, ERR_MALFORMED_COMMAND);
5039dede
AK
972 }
973 }
974 else
975 {
b368969c 976 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
977 }
978}
979
1a93e64a
VK
980/**
981 * Setup proxy connection
982 */
b368969c 983UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
5039dede 984{
967893bb 985 UINT32 dwResult, dwAddr;
5039dede
AK
986 WORD wPort;
987 struct sockaddr_in sa;
98abc9f1 988 NXCPEncryptionContext *pSavedCtx;
5039dede
AK
989 TCHAR szBuffer[32];
990
9319c166 991 if (m_masterServer && (g_dwFlags & AF_ENABLE_PROXY))
5039dede 992 {
b368969c
VK
993 dwAddr = pRequest->getFieldAsUInt32(VID_IP_ADDRESS);
994 wPort = pRequest->getFieldAsUInt16(VID_AGENT_PORT);
5039dede 995 m_hProxySocket = socket(AF_INET, SOCK_STREAM, 0);
4c0c75c7 996 if (m_hProxySocket != INVALID_SOCKET)
5039dede
AK
997 {
998 // Fill in address structure
999 memset(&sa, 0, sizeof(sa));
1000 sa.sin_addr.s_addr = htonl(dwAddr);
1001 sa.sin_family = AF_INET;
1002 sa.sin_port = htons(wPort);
1003 if (connect(m_hProxySocket, (struct sockaddr *)&sa, sizeof(sa)) != -1)
1004 {
b368969c
VK
1005 NXCPMessage msg;
1006 NXCP_MESSAGE *pRawMsg;
5039dede
AK
1007
1008 // Stop writing thread
19dbc8ef 1009 m_sendQueue->put(INVALID_POINTER_VALUE);
5039dede
AK
1010
1011 // Wait while all queued messages will be sent
19dbc8ef 1012 while(m_sendQueue->size() > 0)
5039dede
AK
1013 ThreadSleepMs(100);
1014
1015 // Finish proxy connection setup
1016 pSavedCtx = m_pCtx;
1017 m_pCtx = PROXY_ENCRYPTION_CTX;
9319c166 1018 m_proxyConnection = true;
5039dede 1019 dwResult = ERR_SUCCESS;
e4a64da2 1020 m_hProxyReadThread = ThreadCreateEx(proxyReadThreadStarter, 0, this);
5039dede
AK
1021
1022 // Send confirmation message
e4a64da2 1023 // We cannot use sendMessage() and writing thread, because
5039dede
AK
1024 // encryption context already overriden, and writing thread
1025 // already stopped
b368969c
VK
1026 msg.setCode(CMD_REQUEST_COMPLETED);
1027 msg.setId(pRequest->getId());
1028 msg.setField(VID_RCC, RCC_SUCCESS);
5c44534b 1029 pRawMsg = msg.createMessage();
e4a64da2 1030 sendRawMessage(pRawMsg, pSavedCtx);
98abc9f1
VK
1031 if (pSavedCtx != NULL)
1032 pSavedCtx->decRefCount();
5039dede 1033
bf3b7f79 1034 DebugPrintf(m_dwIndex, 5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr, szBuffer), wPort);
5039dede
AK
1035 }
1036 else
1037 {
1038 dwResult = ERR_CONNECT_FAILED;
1039 }
1040 }
1041 else
1042 {
1043 dwResult = ERR_SOCKET_ERROR;
1044 }
1045 }
1046 else
1047 {
1048 dwResult = ERR_ACCESS_DENIED;
1049 }
1050 return dwResult;
1051}
1052
1a93e64a
VK
1053/**
1054 * Proxy reading thread
1055 */
1f385e47 1056void CommSession::proxyReadThread()
5039dede
AK
1057{
1058 fd_set rdfs;
1059 struct timeval tv;
81222795 1060 char buffer[32768];
5039dede
AK
1061 int nRet;
1062
1063 while(1)
1064 {
1065 FD_ZERO(&rdfs);
1066 FD_SET(m_hProxySocket, &rdfs);
1067 tv.tv_sec = 0;
1068 tv.tv_usec = 5000000; // Half-second timeout
1069 nRet = select(SELECT_NFDS(m_hProxySocket + 1), &rdfs, NULL, NULL, &tv);
1070 if (nRet < 0)
1071 break;
1072 if (nRet > 0)
1073 {
81222795 1074 nRet = recv(m_hProxySocket, buffer, 32768, 0);
5039dede
AK
1075 if (nRet <= 0)
1076 break;
81222795 1077 SendEx(m_hSocket, buffer, nRet, 0, m_socketWriteMutex);
5039dede
AK
1078 }
1079 }
e4a64da2 1080 disconnect();
5039dede 1081}
6fbaa926
VK
1082
1083/**
1084 * Wait for request completion
1085 */
63ff3c9d 1086UINT32 CommSession::doRequest(NXCPMessage *msg, UINT32 timeout)
6fbaa926 1087{
6fbaa926
VK
1088 sendMessage(msg);
1089 NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
63ff3c9d 1090 UINT32 rcc;
6fbaa926
VK
1091 if (response != NULL)
1092 {
63ff3c9d 1093 rcc = response->getFieldAsUInt32(VID_RCC);
6fbaa926
VK
1094 delete response;
1095 }
63ff3c9d
VK
1096 else
1097 {
1098 rcc = ERR_REQUEST_TIMEOUT;
1099 }
1100 return rcc;
6fbaa926
VK
1101}
1102
1103/**
1104 * Generate new request ID
1105 */
1106UINT32 CommSession::generateRequestId()
1107{
1108 return (UINT32)InterlockedIncrement(&m_requestId);
1109}