Initial agent-side code for agent to server connections
[public/netxms.git] / src / agent / core / session.cpp
CommitLineData
9fa031cd 1/*
5039dede 2** NetXMS multiplatform core agent
9d35e382 3** Copyright (C) 2003-2016 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
9fcdfda7 19** File: session.cpp
5039dede
AK
20**
21**/
22
23#include "nxagentd.h"
24
62baece1
VK
25#ifdef _WIN32
26#define write _write
27#define close _close
28#endif
29
0f506caa
VK
30/**
31 * Externals
32 */
967893bb 33void UnregisterSession(UINT32 dwIndex);
b368969c
VK
34UINT32 DeployPolicy(CommSession *session, NXCPMessage *request);
35UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
36UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
81222795 37void ClearDataCollectionConfiguration();
5039dede 38
83191808
VK
39/**
40 * SNMP proxy thread pool
41 */
42ThreadPool *g_snmpProxyThreadPool = NULL;
43
e1d760ac
VK
44/**
45 * Agent proxy statistics
46 */
47static UINT64 s_proxyConnectionRequests = 0;
48static VolatileCounter s_activeProxySessions = 0;
49
50/**
51 * Handler for agent proxy stats parameters
52 */
53LONG H_AgentProxyStats(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
54{
55 switch(*arg)
56 {
57 case 'A':
58 ret_uint(value, (UINT32)s_activeProxySessions);
59 break;
60 case 'C':
61 ret_uint64(value, s_proxyConnectionRequests);
62 break;
63 default:
64 return SYSINFO_RC_UNSUPPORTED;
65 }
66 return SYSINFO_RC_SUCCESS;
67}
68
0f506caa
VK
69/**
70 * Client communication read thread
71 */
e4a64da2 72THREAD_RESULT THREAD_CALL CommSession::readThreadStarter(void *pArg)
5039dede 73{
e4a64da2 74 ((CommSession *)pArg)->readThread();
5039dede
AK
75
76 // When CommSession::ReadThread exits, all other session
77 // threads are already stopped, so we can safely destroy
78 // session object
e4a64da2 79 UnregisterSession(((CommSession *)pArg)->getIndex());
6fbaa926 80 ((CommSession *)pArg)->decRefCount();
5039dede
AK
81 return THREAD_OK;
82}
83
1a93e64a
VK
84/**
85 * Client communication write thread
86 */
e4a64da2 87THREAD_RESULT THREAD_CALL CommSession::writeThreadStarter(void *pArg)
5039dede 88{
e4a64da2 89 ((CommSession *)pArg)->writeThread();
5039dede
AK
90 return THREAD_OK;
91}
92
1a93e64a
VK
93/**
94 * Received message processing thread
95 */
e4a64da2 96THREAD_RESULT THREAD_CALL CommSession::processingThreadStarter(void *pArg)
5039dede 97{
e4a64da2 98 ((CommSession *)pArg)->processingThread();
5039dede
AK
99 return THREAD_OK;
100}
101
1a93e64a
VK
102/**
103 * Client communication write thread
104 */
e4a64da2 105THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
5039dede 106{
e4a64da2 107 ((CommSession *)pArg)->proxyReadThread();
5039dede
AK
108 return THREAD_OK;
109}
110
1a93e64a
VK
111/**
112 * Client session class constructor
113 */
8c75ad41 114CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer)
5039dede 115{
6fbaa926
VK
116 m_sendQueue = new Queue;
117 m_processingQueue = new Queue;
5039dede 118 m_hSocket = hSocket;
81222795 119 m_hProxySocket = INVALID_SOCKET;
5039dede 120 m_dwIndex = INVALID_INDEX;
5039dede
AK
121 m_hWriteThread = INVALID_THREAD_HANDLE;
122 m_hProcessingThread = INVALID_THREAD_HANDLE;
124528cc 123 m_hProxyReadThread = INVALID_THREAD_HANDLE;
e9902466 124 m_serverId = 0;
9319c166
VK
125 m_serverAddr = serverAddr;
126 m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
127 m_masterServer = masterServer;
128 m_controlServer = controlServer;
129 m_proxyConnection = false;
130 m_acceptTraps = false;
b4cf3199 131 m_acceptData = false;
124528cc 132 m_acceptFileUpdates = false;
ea3993c8 133 m_ipv6Aware = false;
f7319eac 134 m_bulkReconciliationSupported = false;
5039dede 135 m_hCurrFile = -1;
503da871
VK
136 m_fileRqId = 0;
137 m_compressor = NULL;
5039dede
AK
138 m_pCtx = NULL;
139 m_ts = time(NULL);
ef8a3c32 140 m_socketWriteMutex = MutexCreate();
6fbaa926
VK
141 m_responseQueue = new MsgWaitQueue();
142 m_requestId = 0;
5039dede
AK
143}
144
1a93e64a
VK
145/**
146 * Destructor
147 */
5039dede
AK
148CommSession::~CommSession()
149{
e1d760ac
VK
150 if (m_proxyConnection)
151 InterlockedDecrement(&s_activeProxySessions);
152
5039dede
AK
153 shutdown(m_hSocket, SHUT_RDWR);
154 closesocket(m_hSocket);
81222795 155 if (m_hProxySocket != INVALID_SOCKET)
5039dede 156 closesocket(m_hProxySocket);
9d35e382
VK
157
158 void *p;
159 while((p = m_sendQueue->get()) != NULL)
160 if (p != INVALID_POINTER_VALUE)
161 free(p);
6fbaa926 162 delete m_sendQueue;
9d35e382
VK
163
164 while((p = m_processingQueue->get()) != NULL)
165 if (p != INVALID_POINTER_VALUE)
166 delete (NXCPMessage *)p;
6fbaa926 167 delete m_processingQueue;
9d35e382 168
5039dede 169 if (m_hCurrFile != -1)
62baece1 170 close(m_hCurrFile);
503da871 171 delete m_compressor;
ea50695d 172 if ((m_pCtx != NULL) && (m_pCtx != PROXY_ENCRYPTION_CTX))
98abc9f1 173 m_pCtx->decRefCount();
7b8b337e 174 MutexDestroy(m_socketWriteMutex);
6fbaa926 175 delete m_responseQueue;
5039dede
AK
176}
177
1a93e64a
VK
178/**
179 * Start all threads
180 */
4685a2ad 181void CommSession::run()
5039dede 182{
e4a64da2
VK
183 m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
184 m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
185 ThreadCreate(readThreadStarter, 0, this);
5039dede
AK
186}
187
1a93e64a
VK
188/**
189 * Disconnect session
190 */
4685a2ad 191void CommSession::disconnect()
5039dede 192{
bf3b7f79 193 DebugPrintf(m_dwIndex, 5, _T("CommSession::disconnect()"));
5039dede
AK
194 shutdown(m_hSocket, SHUT_RDWR);
195 if (m_hProxySocket != -1)
196 shutdown(m_hProxySocket, SHUT_RDWR);
197}
198
5891329f
VK
199/**
200 * Reading thread
201 */
4685a2ad 202void CommSession::readThread()
5039dede 203{
842378a4 204 SocketMessageReceiver receiver(m_hSocket, 4096, MAX_AGENT_MSG_SIZE);
81222795 205 while(true)
5039dede 206 {
81222795 207 if (!m_proxyConnection)
5039dede 208 {
81222795
VK
209 MessageReceiverResult result;
210 NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
5039dede 211
81222795
VK
212 // Check for decryption error
213 if (result == MSGRECV_DECRYPTION_FAILURE)
214 {
215 DebugPrintf(m_dwIndex, 4, _T("Unable to decrypt received message"));
216 continue;
217 }
5039dede 218
81222795
VK
219 // Check for timeout
220 if (result == MSGRECV_TIMEOUT)
221 {
222 if (m_ts < time(NULL) - (time_t)g_dwIdleTimeout)
223 {
224 DebugPrintf(m_dwIndex, 5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
225 break;
226 }
227 continue;
228 }
5039dede 229
81222795
VK
230 // Receive error
231 if (msg == NULL)
5e60b759 232 {
81222795 233 DebugPrintf(m_dwIndex, 5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
5e60b759
VK
234 break;
235 }
5e60b759 236
81222795
VK
237 // Update activity timestamp
238 m_ts = time(NULL);
5039dede 239
2df047f4 240 if (nxlog_get_debug_level() >= 8)
81222795
VK
241 {
242 String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), NXCP_VERSION);
243 DebugPrintf(m_dwIndex, 8, _T("Message dump:\n%s"), (const TCHAR *)msgDump);
244 }
5c44534b 245
81222795 246 if (msg->isBinary())
5039dede 247 {
81222795
VK
248 TCHAR buffer[64];
249 DebugPrintf(m_dwIndex, 6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
5039dede 250
81222795 251 if (msg->getCode() == CMD_FILE_DATA)
5039dede 252 {
81222795 253 if ((m_hCurrFile != -1) && (m_fileRqId == msg->getId()))
5039dede 254 {
503da871
VK
255 const BYTE *data;
256 int dataSize;
81222795 257 if (msg->isCompressed())
503da871 258 {
ca0a165b 259 const BYTE *in = msg->getBinaryData();
503da871
VK
260 if (m_compressor == NULL)
261 {
262 NXCPCompressionMethod method = (NXCPCompressionMethod)(*in);
263 m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
264 if (m_compressor == NULL)
265 {
266 DebugPrintf(m_dwIndex, 5, _T("Unable to create stream compressor for method %d"), (int)method);
267 data = NULL;
268 dataSize = -1;
269 }
270 }
271
272 if (m_compressor != NULL)
273 {
81222795 274 dataSize = (int)m_compressor->decompress(in + 4, msg->getBinaryDataSize() - 4, &data);
503da871
VK
275 if (dataSize != (int)ntohs(*((UINT16 *)(in + 2))))
276 {
277 // decompressed block size validation failed
278 dataSize = -1;
279 }
280 }
281 }
282 else
283 {
81222795
VK
284 data = msg->getBinaryData();
285 dataSize = (int)msg->getBinaryDataSize();
503da871
VK
286 }
287
288 if ((dataSize >= 0) && (write(m_hCurrFile, data, dataSize) == dataSize))
5039dede 289 {
81222795 290 if (msg->isEndOfFile())
5039dede 291 {
81222795 292 NXCPMessage response;
5039dede 293
62baece1 294 close(m_hCurrFile);
5039dede 295 m_hCurrFile = -1;
503da871 296 delete_and_null(m_compressor);
9fa031cd 297
81222795
VK
298 response.setCode(CMD_REQUEST_COMPLETED);
299 response.setId(msg->getId());
300 response.setField(VID_RCC, ERR_SUCCESS);
301 sendMessage(&response);
5039dede
AK
302 }
303 }
304 else
305 {
306 // I/O error
81222795 307 NXCPMessage response;
5039dede 308
62baece1 309 close(m_hCurrFile);
5039dede 310 m_hCurrFile = -1;
503da871 311 delete_and_null(m_compressor);
9fa031cd 312
81222795
VK
313 response.setCode(CMD_REQUEST_COMPLETED);
314 response.setId(msg->getId());
315 response.setField(VID_RCC, ERR_IO_FAILURE);
316 sendMessage(&response);
5039dede
AK
317 }
318 }
319 }
320 }
81222795 321 else if (msg->isControl())
5039dede 322 {
81222795
VK
323 TCHAR buffer[64];
324 DebugPrintf(m_dwIndex, 6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
5039dede 325
81222795 326 if (msg->getCode() == CMD_GET_NXCP_CAPS)
5039dede 327 {
b368969c 328 NXCP_MESSAGE *pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
81222795 329 pMsg->id = htonl(msg->getId());
b368969c
VK
330 pMsg->code = htons((WORD)CMD_NXCP_CAPS);
331 pMsg->flags = htons(MF_CONTROL);
332 pMsg->numFields = htonl(NXCP_VERSION << 24);
333 pMsg->size = htonl(NXCP_HEADER_SIZE);
e4a64da2 334 sendRawMessage(pMsg, m_pCtx);
5039dede 335 }
81222795 336 delete msg;
5039dede
AK
337 }
338 else
339 {
81222795
VK
340 TCHAR buffer[64];
341 DebugPrintf(m_dwIndex, 6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
342
83191808
VK
343 UINT32 rcc;
344 switch(msg->getCode())
5039dede 345 {
83191808
VK
346 case CMD_REQUEST_COMPLETED:
347 m_responseQueue->put(msg);
348 break;
349 case CMD_REQUEST_SESSION_KEY:
350 if (m_pCtx == NULL)
351 {
352 NXCPMessage *pResponse;
353 SetupEncryptionContext(msg, &m_pCtx, &pResponse, NULL, NXCP_VERSION);
354 sendMessage(pResponse);
355 delete pResponse;
356 receiver.setEncryptionContext(m_pCtx);
357 }
358 delete msg;
359 break;
360 case CMD_SETUP_PROXY_CONNECTION:
e1d760ac 361 s_proxyConnectionRequests++;
83191808
VK
362 rcc = setupProxyConnection(msg);
363 // When proxy session established incoming messages will
364 // not be processed locally. Acknowledgment message sent
e1d760ac 365 // by setupProxyConnection() in case of success.
83191808
VK
366 if (rcc == ERR_SUCCESS)
367 {
e1d760ac 368 InterlockedIncrement(&s_activeProxySessions);
83191808
VK
369 m_processingQueue->put(INVALID_POINTER_VALUE);
370 }
371 else
372 {
373 NXCPMessage response;
374 response.setCode(CMD_REQUEST_COMPLETED);
375 response.setId(msg->getId());
376 response.setField(VID_RCC, rcc);
377 sendMessage(&response);
378 }
379 delete msg;
380 break;
381 case CMD_SNMP_REQUEST:
382 if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
383 {
384 incRefCount();
385 ThreadPoolExecute(g_snmpProxyThreadPool, this, &CommSession::proxySnmpRequest, msg);
386 }
387 else
388 {
389 NXCPMessage response;
390 response.setCode(CMD_REQUEST_COMPLETED);
391 response.setId(msg->getId());
392 response.setField(VID_RCC, ERR_ACCESS_DENIED);
393 sendMessage(&response);
394 delete msg;
395 }
396 break;
397 default:
398 m_processingQueue->put(msg);
399 break;
5039dede
AK
400 }
401 }
402 }
81222795
VK
403 else // m_proxyConnection
404 {
405 fd_set rdfs;
406 struct timeval tv;
407 char buffer[32768];
408
409 FD_ZERO(&rdfs);
410 FD_SET(m_hSocket, &rdfs);
411 tv.tv_sec = g_dwIdleTimeout + 1;
412 tv.tv_usec = 0;
413 int rc = select(SELECT_NFDS(m_hSocket + 1), &rdfs, NULL, NULL, &tv);
414 if (rc <= 0)
415 break;
416 if (rc > 0)
417 {
418 // Update activity timestamp
419 m_ts = time(NULL);
420
421 rc = recv(m_hSocket, buffer, 32768, 0);
422 if (rc <= 0)
423 break;
424 SendEx(m_hProxySocket, buffer, rc, 0, NULL);
425 }
426 }
5039dede 427 }
5039dede
AK
428
429 // Notify other threads to exit
19dbc8ef
VK
430 m_sendQueue->put(INVALID_POINTER_VALUE);
431 m_processingQueue->put(INVALID_POINTER_VALUE);
81222795 432 if (m_hProxySocket != INVALID_SOCKET)
5039dede
AK
433 shutdown(m_hProxySocket, SHUT_RDWR);
434
435 // Wait for other threads to finish
436 ThreadJoin(m_hWriteThread);
437 ThreadJoin(m_hProcessingThread);
9319c166 438 if (m_proxyConnection)
5039dede
AK
439 ThreadJoin(m_hProxyReadThread);
440
8c75ad41 441 DebugPrintf(m_dwIndex, 5, _T("Session with %s closed"), (const TCHAR *)m_serverAddr.toString());
5039dede
AK
442}
443
5891329f
VK
444/**
445 * Send prepared raw message over the network and destroy it
446 */
b368969c 447BOOL CommSession::sendRawMessage(NXCP_MESSAGE *pMsg, NXCPEncryptionContext *pCtx)
5039dede
AK
448{
449 BOOL bResult = TRUE;
bf3b7f79 450 TCHAR szBuffer[128];
5039dede 451
b368969c 452 DebugPrintf(m_dwIndex, 6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
57963c8e 453 if (nxlog_get_debug_level() >= 8)
454 {
455 String msgDump = NXCPMessage::dump(pMsg, NXCP_VERSION);
456 DebugPrintf(m_dwIndex, 8, _T("Outgoing message dump:\n%s"), (const TCHAR *)msgDump);
457 }
5039dede
AK
458 if ((pCtx != NULL) && (pCtx != PROXY_ENCRYPTION_CTX))
459 {
b368969c 460 NXCP_ENCRYPTED_MESSAGE *enMsg = pCtx->encryptMessage(pMsg);
6be0a20b 461 if (enMsg != NULL)
5039dede 462 {
b368969c 463 if (SendEx(m_hSocket, (const char *)enMsg, ntohl(enMsg->size), 0, m_socketWriteMutex) <= 0)
5039dede
AK
464 {
465 bResult = FALSE;
466 }
6be0a20b 467 free(enMsg);
5039dede
AK
468 }
469 }
470 else
471 {
b368969c 472 if (SendEx(m_hSocket, (const char *)pMsg, ntohl(pMsg->size), 0, m_socketWriteMutex) <= 0)
5039dede
AK
473 {
474 bResult = FALSE;
475 }
5039dede
AK
476 }
477 if (!bResult)
b368969c 478 DebugPrintf(m_dwIndex, 6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
486942d7 479 free(pMsg);
5039dede
AK
480 return bResult;
481}
482
1a93e64a
VK
483/**
484 * Writing thread
485 */
ab51c4d0 486void CommSession::writeThread()
5039dede 487{
b368969c 488 NXCP_MESSAGE *pMsg;
5039dede
AK
489
490 while(1)
491 {
19dbc8ef 492 pMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
5039dede
AK
493 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
494 break;
495
e4a64da2 496 if (!sendRawMessage(pMsg, m_pCtx))
5039dede
AK
497 break;
498 }
5039dede
AK
499}
500
1a93e64a
VK
501/**
502 * Message processing thread
503 */
ab51c4d0 504void CommSession::processingThread()
5039dede 505{
b368969c 506 NXCPMessage *pMsg;
b368969c 507 NXCPMessage msg;
6ba36557 508 UINT32 dwCommand;
5039dede
AK
509
510 while(1)
511 {
19dbc8ef 512 pMsg = (NXCPMessage *)m_processingQueue->getOrBlock();
5039dede
AK
513 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
514 break;
b368969c 515 dwCommand = pMsg->getCode();
5039dede
AK
516
517 // Prepare response message
b368969c
VK
518 msg.setCode(CMD_REQUEST_COMPLETED);
519 msg.setId(pMsg->getId());
5039dede
AK
520
521 // Check if authentication required
9319c166 522 if ((!m_authenticated) && (dwCommand != CMD_AUTHENTICATE))
5039dede 523 {
74df14c2 524 DebugPrintf(m_dwIndex, 6, _T("Authentication required"));
b368969c 525 msg.setField(VID_RCC, ERR_AUTH_REQUIRED);
5039dede
AK
526 }
527 else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && (m_pCtx == NULL))
528 {
74df14c2 529 DebugPrintf(m_dwIndex, 6, _T("Encryption required"));
b368969c 530 msg.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
5039dede
AK
531 }
532 else
533 {
534 switch(dwCommand)
535 {
536 case CMD_AUTHENTICATE:
e4a64da2 537 authenticate(pMsg, &msg);
5039dede
AK
538 break;
539 case CMD_GET_PARAMETER:
e4a64da2 540 getParameter(pMsg, &msg);
5039dede
AK
541 break;
542 case CMD_GET_LIST:
e4a64da2 543 getList(pMsg, &msg);
5039dede 544 break;
4687826e
VK
545 case CMD_GET_TABLE:
546 getTable(pMsg, &msg);
547 break;
5039dede 548 case CMD_KEEPALIVE:
b368969c 549 msg.setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
550 break;
551 case CMD_ACTION:
e4a64da2 552 action(pMsg, &msg);
5039dede
AK
553 break;
554 case CMD_TRANSFER_FILE:
e4a64da2 555 recvFile(pMsg, &msg);
5039dede
AK
556 break;
557 case CMD_UPGRADE_AGENT:
b368969c 558 msg.setField(VID_RCC, upgrade(pMsg));
5039dede
AK
559 break;
560 case CMD_GET_PARAMETER_LIST:
b368969c 561 msg.setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
562 GetParameterList(&msg);
563 break;
0f506caa 564 case CMD_GET_ENUM_LIST:
b368969c 565 msg.setField(VID_RCC, ERR_SUCCESS);
0f506caa
VK
566 GetEnumList(&msg);
567 break;
568 case CMD_GET_TABLE_LIST:
b368969c 569 msg.setField(VID_RCC, ERR_SUCCESS);
0f506caa
VK
570 GetTableList(&msg);
571 break;
5039dede 572 case CMD_GET_AGENT_CONFIG:
e4a64da2 573 getConfig(&msg);
5039dede
AK
574 break;
575 case CMD_UPDATE_AGENT_CONFIG:
e4a64da2 576 updateConfig(pMsg, &msg);
5039dede 577 break;
5039dede 578 case CMD_ENABLE_AGENT_TRAPS:
9319c166 579 if (m_masterServer)
5039dede 580 {
9319c166 581 m_acceptTraps = true;
b368969c 582 msg.setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
583 }
584 else
585 {
b368969c 586 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede 587 }
e13420c1 588 break;
589 case CMD_ENABLE_FILE_UPDATES:
590 if (m_masterServer)
591 {
592 m_acceptFileUpdates = true;
593 msg.setField(VID_RCC, ERR_SUCCESS);
594 }
595 else
596 {
597 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
598 }
5039dede 599 break;
6ca3b41c 600 case CMD_DEPLOY_AGENT_POLICY:
9319c166 601 if (m_masterServer)
6ca3b41c 602 {
b368969c 603 msg.setField(VID_RCC, DeployPolicy(this, pMsg));
6ca3b41c
VK
604 }
605 else
606 {
b368969c 607 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
93599cfd
VK
608 }
609 break;
610 case CMD_UNINSTALL_AGENT_POLICY:
9319c166 611 if (m_masterServer)
93599cfd 612 {
b368969c 613 msg.setField(VID_RCC, UninstallPolicy(this, pMsg));
93599cfd
VK
614 }
615 else
616 {
b368969c 617 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
1f385e47
VK
618 }
619 break;
620 case CMD_GET_POLICY_INVENTORY:
9319c166 621 if (m_masterServer)
1f385e47 622 {
b368969c 623 msg.setField(VID_RCC, GetPolicyInventory(this, &msg));
1f385e47
VK
624 }
625 else
626 {
b368969c 627 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
628 }
629 break;
9c786c0f 630 case CMD_TAKE_SCREENSHOT:
9319c166 631 if (m_controlServer)
9c786c0f
VK
632 {
633 TCHAR sessionName[256];
b368969c 634 pMsg->getFieldAsString(VID_NAME, sessionName, 256);
9c786c0f
VK
635 DebugPrintf(m_dwIndex, 6, _T("Take snapshot from session \"%s\""), sessionName);
636 SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
637 if (conn != NULL)
638 {
639 DebugPrintf(m_dwIndex, 6, _T("Session agent connector acquired"));
640 conn->takeScreenshot(&msg);
641 conn->decRefCount();
642 }
643 else
644 {
b368969c 645 msg.setField(VID_RCC, ERR_NO_SESSION_AGENT);
9c786c0f
VK
646 }
647 }
648 else
649 {
b368969c 650 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
9c786c0f
VK
651 }
652 break;
1a5ddd2a
VK
653 case CMD_SET_SERVER_CAPABILITIES:
654 // Servers before 2.0 use VID_ENABLED
655 m_ipv6Aware = pMsg->isFieldExist(VID_IPV6_SUPPORT) ? pMsg->getFieldAsBoolean(VID_IPV6_SUPPORT) : pMsg->getFieldAsBoolean(VID_ENABLED);
656 m_bulkReconciliationSupported = pMsg->getFieldAsBoolean(VID_BULK_RECONCILIATION);
ea3993c8
VK
657 msg.setField(VID_RCC, ERR_SUCCESS);
658 break;
e9902466
VK
659 case CMD_SET_SERVER_ID:
660 m_serverId = pMsg->getFieldAsUInt64(VID_SERVER_ID);
661 DebugPrintf(m_dwIndex, 1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
662 msg.setField(VID_RCC, ERR_SUCCESS);
663 break;
87fff547
VK
664 case CMD_DATA_COLLECTION_CONFIG:
665 if (m_serverId != 0)
666 {
667 ConfigureDataCollection(m_serverId, pMsg);
d1dfba1a 668 m_acceptData = true;
87fff547
VK
669 msg.setField(VID_RCC, ERR_SUCCESS);
670 }
671 else
672 {
673 DebugPrintf(m_dwIndex, 1, _T("Data collection configuration command received but server ID is not set"));
674 msg.setField(VID_RCC, ERR_SERVER_ID_UNSET);
675 }
676 break;
4b535d78 677 case CMD_CLEAN_AGENT_DCI_CONF:
678 if (m_masterServer)
679 {
81222795 680 ClearDataCollectionConfiguration();
4b535d78 681 msg.setField(VID_RCC, ERR_SUCCESS);
682 }
683 else
684 {
685 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
686 }
687 break;
5039dede
AK
688 default:
689 // Attempt to process unknown command by subagents
7cf549ad 690 if (!ProcessCmdBySubAgent(dwCommand, pMsg, &msg, this))
b368969c 691 msg.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
5039dede
AK
692 break;
693 }
694 }
695 delete pMsg;
696
697 // Send response
e4a64da2 698 sendMessage(&msg);
b368969c 699 msg.deleteAllFields();
5039dede 700 }
5039dede
AK
701}
702
4af351c7
VK
703/**
704 * Authenticate peer
705 */
b368969c 706void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 707{
9319c166 708 if (m_authenticated)
5039dede
AK
709 {
710 // Already authenticated
b368969c 711 pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
5039dede
AK
712 }
713 else
714 {
bf3b7f79 715 TCHAR szSecret[MAX_SECRET_LENGTH];
5039dede
AK
716 BYTE hash[32];
717 WORD wAuthMethod;
718
b368969c 719 wAuthMethod = pRequest->getFieldAsUInt16(VID_AUTH_METHOD);
5039dede
AK
720 switch(wAuthMethod)
721 {
722 case AUTH_PLAINTEXT:
b368969c 723 pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
bf3b7f79 724 if (!_tcscmp(szSecret, g_szSharedSecret))
5039dede 725 {
9319c166 726 m_authenticated = true;
b368969c 727 pMsg->setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
728 }
729 else
730 {
e434ba62 731 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, "PLAIN");
b368969c 732 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
5039dede
AK
733 }
734 break;
735 case AUTH_MD5_HASH:
b368969c 736 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
bf3b7f79
VK
737#ifdef UNICODE
738 {
739 char sharedSecret[256];
740 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
741 sharedSecret[255] = 0;
742 CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
743 }
744#else
5039dede 745 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
bf3b7f79 746#endif
5039dede
AK
747 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
748 {
9319c166 749 m_authenticated = true;
b368969c 750 pMsg->setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
751 }
752 else
753 {
e434ba62 754 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("MD5"));
b368969c 755 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
5039dede
AK
756 }
757 break;
758 case AUTH_SHA1_HASH:
b368969c 759 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
bf3b7f79
VK
760#ifdef UNICODE
761 {
762 char sharedSecret[256];
763 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
764 sharedSecret[255] = 0;
765 CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
766 }
767#else
5039dede 768 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
bf3b7f79 769#endif
5039dede
AK
770 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
771 {
9319c166 772 m_authenticated = true;
b368969c 773 pMsg->setField(VID_RCC, ERR_SUCCESS);
5039dede
AK
774 }
775 else
776 {
e434ba62 777 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("SHA1"));
b368969c 778 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
5039dede
AK
779 }
780 break;
781 default:
b368969c 782 pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
5039dede
AK
783 break;
784 }
785 }
786}
787
1a93e64a
VK
788/**
789 * Get parameter's value
790 */
b368969c 791void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 792{
0df58041 793 TCHAR szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
967893bb 794 UINT32 dwErrorCode;
5039dede 795
b368969c 796 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
060c5a11 797 dwErrorCode = GetParameterValue(m_dwIndex, szParameter, szValue, this);
b368969c 798 pMsg->setField(VID_RCC, dwErrorCode);
5039dede 799 if (dwErrorCode == ERR_SUCCESS)
b368969c 800 pMsg->setField(VID_VALUE, szValue);
5039dede
AK
801}
802
0f506caa
VK
803/**
804 * Get list of values
805 */
b368969c 806void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 807{
fd1fa78b 808 TCHAR szParameter[MAX_PARAM_NAME];
b368969c 809 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
5039dede 810
6173bea8 811 StringList value;
060c5a11 812 UINT32 dwErrorCode = GetListValue(m_dwIndex, szParameter, &value, this);
b368969c 813 pMsg->setField(VID_RCC, dwErrorCode);
5039dede
AK
814 if (dwErrorCode == ERR_SUCCESS)
815 {
6fbaa926 816 value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
5039dede 817 }
5039dede
AK
818}
819
0f506caa
VK
820/**
821 * Get table
822 */
b368969c 823void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
4687826e
VK
824{
825 TCHAR szParameter[MAX_PARAM_NAME];
826
b368969c 827 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
4687826e
VK
828
829 Table value;
060c5a11 830 UINT32 dwErrorCode = GetTableValue(m_dwIndex, szParameter, &value, this);
b368969c 831 pMsg->setField(VID_RCC, dwErrorCode);
4687826e
VK
832 if (dwErrorCode == ERR_SUCCESS)
833 {
834 value.fillMessage(*pMsg, 0, -1); // no row limit
835 }
836}
837
0f506caa
VK
838/**
839 * Perform action on request
840 */
b368969c 841void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 842{
9319c166 843 if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
5039dede
AK
844 {
845 // Get action name and arguments
908d71bd 846 TCHAR action[MAX_PARAM_NAME];
b368969c 847 pRequest->getFieldAsString(VID_ACTION_NAME, action, MAX_PARAM_NAME);
489b117b 848
908d71bd
VK
849 int numArgs = pRequest->getFieldAsInt32(VID_NUM_ARGS);
850 StringList *args = new StringList;
851 for(int i = 0; i < numArgs; i++)
b368969c 852 args->addPreallocated(pRequest->getFieldAsString(VID_ACTION_ARG_BASE + i));
5039dede
AK
853
854 // Execute action
908d71bd
VK
855 if (pRequest->getFieldAsBoolean(VID_RECEIVE_OUTPUT))
856 {
b368969c
VK
857 UINT32 rcc = ExecActionWithOutput(this, pRequest->getId(), action, args);
858 pMsg->setField(VID_RCC, rcc);
908d71bd
VK
859 }
860 else
861 {
060c5a11 862 UINT32 rcc = ExecAction(action, args, this);
b368969c 863 pMsg->setField(VID_RCC, rcc);
908d71bd
VK
864 delete args;
865 }
5039dede
AK
866 }
867 else
868 {
b368969c 869 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
870 }
871}
872
d9821bd7 873/**
874 * Prepare for receiving file
875 */
b368969c 876void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 877{
7cf549ad 878 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
5039dede 879
9319c166 880 if (m_masterServer)
f0c1d2a4
AK
881 {
882 szFileName[0] = 0;
b368969c 883 pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
bf3b7f79 884 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
7cf549ad 885 BuildFullPath(szFileName, szFullPath);
f0c1d2a4
AK
886
887 // Check if for some reason we have already opened file
b368969c 888 pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId()));
f0c1d2a4
AK
889 }
890 else
891 {
b368969c 892 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
f0c1d2a4 893 }
5039dede
AK
894}
895
8581943e
VK
896/**
897 * Open file for writing
898 */
899UINT32 CommSession::openFile(TCHAR *szFullPath, UINT32 requestId)
7cf549ad 900{
901 if (m_hCurrFile != -1)
902 {
903 return ERR_RESOURCE_BUSY;
904 }
905 else
906 {
907 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath);
908 m_hCurrFile = _topen(szFullPath, O_CREAT | O_TRUNC | O_WRONLY | O_BINARY, 0600);
909 if (m_hCurrFile == -1)
910 {
911 DebugPrintf(m_dwIndex, 2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
912 return ERR_IO_FAILURE;
913 }
914 else
915 {
503da871 916 m_fileRqId = requestId;
8581943e 917 return ERR_SUCCESS;
7cf549ad 918 }
919 }
920}
921
503da871
VK
922/**
923 * Progress callback for file sending
924 */
4685a2ad
VK
925static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
926{
927 ((CommSession *)cbArg)->updateTimeStamp();
928}
929
9c786c0f
VK
930/**
931 * Send file to server
932 */
cc022855 933bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset)
4685a2ad 934{
cc022855 935 return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset, SendFileProgressCallback, this, m_socketWriteMutex) ? true : false;
4685a2ad
VK
936}
937
9c786c0f
VK
938/**
939 * Upgrade agent from package in the file store
940 */
b368969c 941UINT32 CommSession::upgrade(NXCPMessage *pRequest)
5039dede 942{
9319c166 943 if (m_masterServer)
5039dede
AK
944 {
945 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
946
947 szPkgName[0] = 0;
b368969c 948 pRequest->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
5039dede 949 BuildFullPath(szPkgName, szFullPath);
80f2318b 950
951 //Create line in registry file with upgrade file name to delete it after system start
7f6ecb6d
TD
952 DB_HANDLE hdb = GetLocalDatabaseHandle();
953 if(hdb != NULL)
954 {
955 TCHAR upgradeFileInsert[256];
956 _sntprintf(upgradeFileInsert, 256, _T("INSERT INTO registry (attribute,value) VALUES ('upgrade.file',%s)"), szPkgName);
957 DBQuery(hdb, upgradeFileInsert);
958 }
80f2318b 959
5039dede
AK
960 return UpgradeAgent(szFullPath);
961 }
962 else
963 {
964 return ERR_ACCESS_DENIED;
965 }
966}
967
5944946e
VK
968/**
969 * Get agent's configuration file
970 */
b368969c 971void CommSession::getConfig(NXCPMessage *pMsg)
5039dede 972{
9319c166 973 if (m_masterServer)
5039dede 974 {
b368969c 975 pMsg->setField(VID_RCC,
5944946e 976 pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
5039dede
AK
977 }
978 else
979 {
b368969c 980 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
981 }
982}
983
5c44534b
VK
984/**
985 * Update agent's configuration file
986 */
b368969c 987void CommSession::updateConfig(NXCPMessage *pRequest, NXCPMessage *pMsg)
5039dede 988{
9319c166 989 if (m_masterServer)
5039dede
AK
990 {
991 BYTE *pConfig;
992 int hFile;
b368969c 993 UINT32 size;
5039dede 994
5c44534b 995 if (pRequest->isFieldExist(VID_CONFIG_FILE))
5039dede 996 {
b368969c
VK
997 size = pRequest->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
998 pConfig = (BYTE *)malloc(size);
999 pRequest->getFieldAsBinary(VID_CONFIG_FILE, pConfig, size);
05e050f6 1000 hFile = _topen(g_szConfigFile, O_CREAT | O_TRUNC | O_WRONLY, 0644);
5039dede
AK
1001 if (hFile != -1)
1002 {
b368969c 1003 if (size > 0)
5039dede 1004 {
b368969c 1005 for(UINT32 i = 0; i < size - 1; i++)
05e050f6 1006 if (pConfig[i] == 0x0D)
5039dede 1007 {
b368969c
VK
1008 size--;
1009 memmove(&pConfig[i], &pConfig[i + 1], size - i);
05e050f6 1010 i--;
5039dede
AK
1011 }
1012 }
e5cceced
VK
1013 if (write(hFile, pConfig, size) == size)
1014 pMsg->setField(VID_RCC, ERR_SUCCESS);
1015 else
1016 pMsg->setField(VID_RCC, ERR_IO_FAILURE);
62baece1 1017 close(hFile);
5039dede
AK
1018 }
1019 else
1020 {
bf3b7f79
VK
1021 DebugPrintf(m_dwIndex, 2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
1022 g_szConfigFile, _tcserror(errno));
b368969c 1023 pMsg->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
5039dede
AK
1024 }
1025 free(pConfig);
1026 }
1027 else
1028 {
b368969c 1029 pMsg->setField(VID_RCC, ERR_MALFORMED_COMMAND);
5039dede
AK
1030 }
1031 }
1032 else
1033 {
b368969c 1034 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
5039dede
AK
1035 }
1036}
1037
1a93e64a
VK
1038/**
1039 * Setup proxy connection
1040 */
b368969c 1041UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
5039dede 1042{
967893bb 1043 UINT32 dwResult, dwAddr;
5039dede
AK
1044 WORD wPort;
1045 struct sockaddr_in sa;
98abc9f1 1046 NXCPEncryptionContext *pSavedCtx;
5039dede
AK
1047 TCHAR szBuffer[32];
1048
9319c166 1049 if (m_masterServer && (g_dwFlags & AF_ENABLE_PROXY))
5039dede 1050 {
b368969c
VK
1051 dwAddr = pRequest->getFieldAsUInt32(VID_IP_ADDRESS);
1052 wPort = pRequest->getFieldAsUInt16(VID_AGENT_PORT);
5039dede 1053 m_hProxySocket = socket(AF_INET, SOCK_STREAM, 0);
4c0c75c7 1054 if (m_hProxySocket != INVALID_SOCKET)
5039dede
AK
1055 {
1056 // Fill in address structure
1057 memset(&sa, 0, sizeof(sa));
1058 sa.sin_addr.s_addr = htonl(dwAddr);
1059 sa.sin_family = AF_INET;
1060 sa.sin_port = htons(wPort);
1061 if (connect(m_hProxySocket, (struct sockaddr *)&sa, sizeof(sa)) != -1)
1062 {
b368969c
VK
1063 NXCPMessage msg;
1064 NXCP_MESSAGE *pRawMsg;
5039dede
AK
1065
1066 // Stop writing thread
19dbc8ef 1067 m_sendQueue->put(INVALID_POINTER_VALUE);
5039dede
AK
1068
1069 // Wait while all queued messages will be sent
19dbc8ef 1070 while(m_sendQueue->size() > 0)
5039dede
AK
1071 ThreadSleepMs(100);
1072
1073 // Finish proxy connection setup
1074 pSavedCtx = m_pCtx;
1075 m_pCtx = PROXY_ENCRYPTION_CTX;
9319c166 1076 m_proxyConnection = true;
5039dede 1077 dwResult = ERR_SUCCESS;
e4a64da2 1078 m_hProxyReadThread = ThreadCreateEx(proxyReadThreadStarter, 0, this);
5039dede
AK
1079
1080 // Send confirmation message
e4a64da2 1081 // We cannot use sendMessage() and writing thread, because
5039dede
AK
1082 // encryption context already overriden, and writing thread
1083 // already stopped
b368969c
VK
1084 msg.setCode(CMD_REQUEST_COMPLETED);
1085 msg.setId(pRequest->getId());
1086 msg.setField(VID_RCC, RCC_SUCCESS);
5c44534b 1087 pRawMsg = msg.createMessage();
e4a64da2 1088 sendRawMessage(pRawMsg, pSavedCtx);
98abc9f1
VK
1089 if (pSavedCtx != NULL)
1090 pSavedCtx->decRefCount();
5039dede 1091
bf3b7f79 1092 DebugPrintf(m_dwIndex, 5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr, szBuffer), wPort);
5039dede
AK
1093 }
1094 else
1095 {
1096 dwResult = ERR_CONNECT_FAILED;
1097 }
1098 }
1099 else
1100 {
1101 dwResult = ERR_SOCKET_ERROR;
1102 }
1103 }
1104 else
1105 {
1106 dwResult = ERR_ACCESS_DENIED;
1107 }
1108 return dwResult;
1109}
1110
1a93e64a
VK
1111/**
1112 * Proxy reading thread
1113 */
1f385e47 1114void CommSession::proxyReadThread()
5039dede
AK
1115{
1116 fd_set rdfs;
1117 struct timeval tv;
81222795 1118 char buffer[32768];
5039dede
AK
1119 int nRet;
1120
1121 while(1)
1122 {
1123 FD_ZERO(&rdfs);
1124 FD_SET(m_hProxySocket, &rdfs);
1125 tv.tv_sec = 0;
1126 tv.tv_usec = 5000000; // Half-second timeout
1127 nRet = select(SELECT_NFDS(m_hProxySocket + 1), &rdfs, NULL, NULL, &tv);
1128 if (nRet < 0)
1129 break;
1130 if (nRet > 0)
1131 {
81222795 1132 nRet = recv(m_hProxySocket, buffer, 32768, 0);
5039dede
AK
1133 if (nRet <= 0)
1134 break;
81222795 1135 SendEx(m_hSocket, buffer, nRet, 0, m_socketWriteMutex);
5039dede
AK
1136 }
1137 }
e4a64da2 1138 disconnect();
5039dede 1139}
6fbaa926
VK
1140
1141/**
1142 * Wait for request completion
1143 */
63ff3c9d 1144UINT32 CommSession::doRequest(NXCPMessage *msg, UINT32 timeout)
6fbaa926 1145{
6fbaa926
VK
1146 sendMessage(msg);
1147 NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
63ff3c9d 1148 UINT32 rcc;
6fbaa926
VK
1149 if (response != NULL)
1150 {
63ff3c9d 1151 rcc = response->getFieldAsUInt32(VID_RCC);
6fbaa926
VK
1152 delete response;
1153 }
63ff3c9d
VK
1154 else
1155 {
1156 rcc = ERR_REQUEST_TIMEOUT;
1157 }
1158 return rcc;
6fbaa926
VK
1159}
1160
a1273b42
VK
1161/**
1162 * Wait for request completion
1163 */
1164NXCPMessage *CommSession::doRequestEx(NXCPMessage *msg, UINT32 timeout)
1165{
1166 sendMessage(msg);
1167 return m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1168}
1169
6fbaa926
VK
1170/**
1171 * Generate new request ID
1172 */
1173UINT32 CommSession::generateRequestId()
1174{
1175 return (UINT32)InterlockedIncrement(&m_requestId);
1176}