Fixed Windows build errors
[public/netxms.git] / src / agent / core / session.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: session.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 #ifdef _WIN32
26 #define write _write
27 #define close _close
28 #endif
29
30 /**
31 * Externals
32 */
33 void UnregisterSession(UINT32 dwIndex);
34 void ProxySNMPRequest(NXCPMessage *pRequest, NXCPMessage *pResponse);
35 UINT32 DeployPolicy(CommSession *session, NXCPMessage *request);
36 UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
37 UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
38 void ClearDataCollectionConfiguration();
39
40 /**
41 * Max message size
42 */
43 #define MAX_MSG_SIZE 4194304
44
45 /**
46 * Client communication read thread
47 */
48 THREAD_RESULT THREAD_CALL CommSession::readThreadStarter(void *pArg)
49 {
50 ((CommSession *)pArg)->readThread();
51
52 // When CommSession::ReadThread exits, all other session
53 // threads are already stopped, so we can safely destroy
54 // session object
55 UnregisterSession(((CommSession *)pArg)->getIndex());
56 ((CommSession *)pArg)->decRefCount();
57 return THREAD_OK;
58 }
59
60 /**
61 * Client communication write thread
62 */
63 THREAD_RESULT THREAD_CALL CommSession::writeThreadStarter(void *pArg)
64 {
65 ((CommSession *)pArg)->writeThread();
66 return THREAD_OK;
67 }
68
69 /**
70 * Received message processing thread
71 */
72 THREAD_RESULT THREAD_CALL CommSession::processingThreadStarter(void *pArg)
73 {
74 ((CommSession *)pArg)->processingThread();
75 return THREAD_OK;
76 }
77
78 /**
79 * Client communication write thread
80 */
81 THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
82 {
83 ((CommSession *)pArg)->proxyReadThread();
84 return THREAD_OK;
85 }
86
87 /**
88 * Client session class constructor
89 */
90 CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer)
91 {
92 m_sendQueue = new Queue;
93 m_processingQueue = new Queue;
94 m_hSocket = hSocket;
95 m_hProxySocket = INVALID_SOCKET;
96 m_dwIndex = INVALID_INDEX;
97 m_hWriteThread = INVALID_THREAD_HANDLE;
98 m_hProcessingThread = INVALID_THREAD_HANDLE;
99 m_hProxyReadThread = INVALID_THREAD_HANDLE;
100 m_serverId = 0;
101 m_serverAddr = serverAddr;
102 m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
103 m_masterServer = masterServer;
104 m_controlServer = controlServer;
105 m_proxyConnection = false;
106 m_acceptTraps = false;
107 m_acceptFileUpdates = false;
108 m_ipv6Aware = false;
109 m_hCurrFile = -1;
110 m_fileRqId = 0;
111 m_compressor = NULL;
112 m_pCtx = NULL;
113 m_ts = time(NULL);
114 m_socketWriteMutex = MutexCreate();
115 m_responseQueue = new MsgWaitQueue();
116 m_requestId = 0;
117 }
118
119 /**
120 * Destructor
121 */
122 CommSession::~CommSession()
123 {
124 shutdown(m_hSocket, SHUT_RDWR);
125 closesocket(m_hSocket);
126 if (m_hProxySocket != INVALID_SOCKET)
127 closesocket(m_hProxySocket);
128 delete m_sendQueue;
129 delete m_processingQueue;
130 if (m_hCurrFile != -1)
131 close(m_hCurrFile);
132 delete m_compressor;
133 if ((m_pCtx != NULL) && (m_pCtx != PROXY_ENCRYPTION_CTX))
134 m_pCtx->decRefCount();
135 MutexDestroy(m_socketWriteMutex);
136 delete m_responseQueue;
137 }
138
139 /**
140 * Start all threads
141 */
142 void CommSession::run()
143 {
144 m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
145 m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
146 ThreadCreate(readThreadStarter, 0, this);
147 }
148
149 /**
150 * Disconnect session
151 */
152 void CommSession::disconnect()
153 {
154 DebugPrintf(m_dwIndex, 5, _T("CommSession::disconnect()"));
155 shutdown(m_hSocket, SHUT_RDWR);
156 if (m_hProxySocket != -1)
157 shutdown(m_hProxySocket, SHUT_RDWR);
158 }
159
160 /**
161 * Reading thread
162 */
163 void CommSession::readThread()
164 {
165 SocketMessageReceiver receiver(m_hSocket, 4096, MAX_MSG_SIZE);
166 while(true)
167 {
168 if (!m_proxyConnection)
169 {
170 MessageReceiverResult result;
171 NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
172
173 // Check for decryption error
174 if (result == MSGRECV_DECRYPTION_FAILURE)
175 {
176 DebugPrintf(m_dwIndex, 4, _T("Unable to decrypt received message"));
177 continue;
178 }
179
180 // Check for timeout
181 if (result == MSGRECV_TIMEOUT)
182 {
183 if (m_ts < time(NULL) - (time_t)g_dwIdleTimeout)
184 {
185 DebugPrintf(m_dwIndex, 5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
186 break;
187 }
188 continue;
189 }
190
191 // Receive error
192 if (msg == NULL)
193 {
194 DebugPrintf(m_dwIndex, 5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
195 break;
196 }
197
198 // Update activity timestamp
199 m_ts = time(NULL);
200
201 if (g_debugLevel >= 8)
202 {
203 String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), NXCP_VERSION);
204 DebugPrintf(m_dwIndex, 8, _T("Message dump:\n%s"), (const TCHAR *)msgDump);
205 }
206
207 if (msg->isBinary())
208 {
209 TCHAR buffer[64];
210 DebugPrintf(m_dwIndex, 6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
211
212 if (msg->getCode() == CMD_FILE_DATA)
213 {
214 if ((m_hCurrFile != -1) && (m_fileRqId == msg->getId()))
215 {
216 const BYTE *data;
217 int dataSize;
218 if (msg->isCompressed())
219 {
220 BYTE *in = msg->getBinaryData();
221 if (m_compressor == NULL)
222 {
223 NXCPCompressionMethod method = (NXCPCompressionMethod)(*in);
224 m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
225 if (m_compressor == NULL)
226 {
227 DebugPrintf(m_dwIndex, 5, _T("Unable to create stream compressor for method %d"), (int)method);
228 data = NULL;
229 dataSize = -1;
230 }
231 }
232
233 if (m_compressor != NULL)
234 {
235 dataSize = (int)m_compressor->decompress(in + 4, msg->getBinaryDataSize() - 4, &data);
236 if (dataSize != (int)ntohs(*((UINT16 *)(in + 2))))
237 {
238 // decompressed block size validation failed
239 dataSize = -1;
240 }
241 }
242 }
243 else
244 {
245 data = msg->getBinaryData();
246 dataSize = (int)msg->getBinaryDataSize();
247 }
248
249 if ((dataSize >= 0) && (write(m_hCurrFile, data, dataSize) == dataSize))
250 {
251 if (msg->isEndOfFile())
252 {
253 NXCPMessage response;
254
255 close(m_hCurrFile);
256 m_hCurrFile = -1;
257 delete_and_null(m_compressor);
258
259 response.setCode(CMD_REQUEST_COMPLETED);
260 response.setId(msg->getId());
261 response.setField(VID_RCC, ERR_SUCCESS);
262 sendMessage(&response);
263 }
264 }
265 else
266 {
267 // I/O error
268 NXCPMessage response;
269
270 close(m_hCurrFile);
271 m_hCurrFile = -1;
272 delete_and_null(m_compressor);
273
274 response.setCode(CMD_REQUEST_COMPLETED);
275 response.setId(msg->getId());
276 response.setField(VID_RCC, ERR_IO_FAILURE);
277 sendMessage(&response);
278 }
279 }
280 }
281 }
282 else if (msg->isControl())
283 {
284 TCHAR buffer[64];
285 DebugPrintf(m_dwIndex, 6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
286
287 if (msg->getCode() == CMD_GET_NXCP_CAPS)
288 {
289 NXCP_MESSAGE *pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
290 pMsg->id = htonl(msg->getId());
291 pMsg->code = htons((WORD)CMD_NXCP_CAPS);
292 pMsg->flags = htons(MF_CONTROL);
293 pMsg->numFields = htonl(NXCP_VERSION << 24);
294 pMsg->size = htonl(NXCP_HEADER_SIZE);
295 sendRawMessage(pMsg, m_pCtx);
296 }
297 delete msg;
298 }
299 else
300 {
301 TCHAR buffer[64];
302 DebugPrintf(m_dwIndex, 6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
303
304 if (msg->getCode() == CMD_REQUEST_SESSION_KEY)
305 {
306 if (m_pCtx == NULL)
307 {
308 NXCPMessage *pResponse;
309 SetupEncryptionContext(msg, &m_pCtx, &pResponse, NULL, NXCP_VERSION);
310 sendMessage(pResponse);
311 delete pResponse;
312 receiver.setEncryptionContext(m_pCtx);
313 }
314 delete msg;
315 }
316 else if (msg->getCode() == CMD_REQUEST_COMPLETED)
317 {
318 m_responseQueue->put(msg);
319 }
320 else if (msg->getCode() == CMD_SETUP_PROXY_CONNECTION)
321 {
322 UINT32 rcc = setupProxyConnection(msg);
323 // When proxy session established incoming messages will
324 // not be processed locally. Acknowledgement message sent
325 // by SetupProxyConnection() in case of success.
326 if (rcc == ERR_SUCCESS)
327 {
328 m_processingQueue->put(INVALID_POINTER_VALUE);
329 }
330 else
331 {
332 NXCPMessage response;
333 response.setCode(CMD_REQUEST_COMPLETED);
334 response.setId(msg->getId());
335 response.setField(VID_RCC, rcc);
336 sendMessage(&response);
337 }
338 delete msg;
339 }
340 else
341 {
342 m_processingQueue->put(msg);
343 }
344 }
345 }
346 else // m_proxyConnection
347 {
348 fd_set rdfs;
349 struct timeval tv;
350 char buffer[32768];
351
352 FD_ZERO(&rdfs);
353 FD_SET(m_hSocket, &rdfs);
354 tv.tv_sec = g_dwIdleTimeout + 1;
355 tv.tv_usec = 0;
356 int rc = select(SELECT_NFDS(m_hSocket + 1), &rdfs, NULL, NULL, &tv);
357 if (rc <= 0)
358 break;
359 if (rc > 0)
360 {
361 // Update activity timestamp
362 m_ts = time(NULL);
363
364 rc = recv(m_hSocket, buffer, 32768, 0);
365 if (rc <= 0)
366 break;
367 SendEx(m_hProxySocket, buffer, rc, 0, NULL);
368 }
369 }
370 }
371
372 // Notify other threads to exit
373 m_sendQueue->put(INVALID_POINTER_VALUE);
374 m_processingQueue->put(INVALID_POINTER_VALUE);
375 if (m_hProxySocket != INVALID_SOCKET)
376 shutdown(m_hProxySocket, SHUT_RDWR);
377
378 // Wait for other threads to finish
379 ThreadJoin(m_hWriteThread);
380 ThreadJoin(m_hProcessingThread);
381 if (m_proxyConnection)
382 ThreadJoin(m_hProxyReadThread);
383
384 DebugPrintf(m_dwIndex, 5, _T("Session with %s closed"), (const TCHAR *)m_serverAddr.toString());
385 }
386
387 /**
388 * Send prepared raw message over the network and destroy it
389 */
390 BOOL CommSession::sendRawMessage(NXCP_MESSAGE *pMsg, NXCPEncryptionContext *pCtx)
391 {
392 BOOL bResult = TRUE;
393 TCHAR szBuffer[128];
394
395 DebugPrintf(m_dwIndex, 6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
396 if ((pCtx != NULL) && (pCtx != PROXY_ENCRYPTION_CTX))
397 {
398 NXCP_ENCRYPTED_MESSAGE *enMsg = pCtx->encryptMessage(pMsg);
399 if (enMsg != NULL)
400 {
401 if (SendEx(m_hSocket, (const char *)enMsg, ntohl(enMsg->size), 0, m_socketWriteMutex) <= 0)
402 {
403 bResult = FALSE;
404 }
405 free(enMsg);
406 }
407 }
408 else
409 {
410 if (SendEx(m_hSocket, (const char *)pMsg, ntohl(pMsg->size), 0, m_socketWriteMutex) <= 0)
411 {
412 bResult = FALSE;
413 }
414 }
415 if (!bResult)
416 DebugPrintf(m_dwIndex, 6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
417 free(pMsg);
418 return bResult;
419 }
420
421 /**
422 * Writing thread
423 */
424 void CommSession::writeThread()
425 {
426 NXCP_MESSAGE *pMsg;
427
428 while(1)
429 {
430 pMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
431 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
432 break;
433
434 if (!sendRawMessage(pMsg, m_pCtx))
435 break;
436 }
437 m_sendQueue->clear();
438 }
439
440 /**
441 * Message processing thread
442 */
443 void CommSession::processingThread()
444 {
445 NXCPMessage *pMsg;
446 NXCPMessage msg;
447 UINT32 dwCommand;
448
449 while(1)
450 {
451 pMsg = (NXCPMessage *)m_processingQueue->getOrBlock();
452 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
453 break;
454 dwCommand = pMsg->getCode();
455
456 // Prepare response message
457 msg.setCode(CMD_REQUEST_COMPLETED);
458 msg.setId(pMsg->getId());
459
460 // Check if authentication required
461 if ((!m_authenticated) && (dwCommand != CMD_AUTHENTICATE))
462 {
463 DebugPrintf(m_dwIndex, 6, _T("Authentication required"));
464 msg.setField(VID_RCC, ERR_AUTH_REQUIRED);
465 }
466 else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && (m_pCtx == NULL))
467 {
468 DebugPrintf(m_dwIndex, 6, _T("Encryption required"));
469 msg.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
470 }
471 else
472 {
473 switch(dwCommand)
474 {
475 case CMD_AUTHENTICATE:
476 authenticate(pMsg, &msg);
477 break;
478 case CMD_GET_PARAMETER:
479 getParameter(pMsg, &msg);
480 break;
481 case CMD_GET_LIST:
482 getList(pMsg, &msg);
483 break;
484 case CMD_GET_TABLE:
485 getTable(pMsg, &msg);
486 break;
487 case CMD_KEEPALIVE:
488 msg.setField(VID_RCC, ERR_SUCCESS);
489 break;
490 case CMD_ACTION:
491 action(pMsg, &msg);
492 break;
493 case CMD_TRANSFER_FILE:
494 recvFile(pMsg, &msg);
495 break;
496 case CMD_UPGRADE_AGENT:
497 msg.setField(VID_RCC, upgrade(pMsg));
498 break;
499 case CMD_GET_PARAMETER_LIST:
500 msg.setField(VID_RCC, ERR_SUCCESS);
501 GetParameterList(&msg);
502 break;
503 case CMD_GET_ENUM_LIST:
504 msg.setField(VID_RCC, ERR_SUCCESS);
505 GetEnumList(&msg);
506 break;
507 case CMD_GET_TABLE_LIST:
508 msg.setField(VID_RCC, ERR_SUCCESS);
509 GetTableList(&msg);
510 break;
511 case CMD_GET_AGENT_CONFIG:
512 getConfig(&msg);
513 break;
514 case CMD_UPDATE_AGENT_CONFIG:
515 updateConfig(pMsg, &msg);
516 break;
517 case CMD_ENABLE_AGENT_TRAPS:
518 if (m_masterServer)
519 {
520 m_acceptTraps = true;
521 msg.setField(VID_RCC, ERR_SUCCESS);
522 }
523 else
524 {
525 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
526 }
527 break;
528 case CMD_ENABLE_FILE_UPDATES:
529 if (m_masterServer)
530 {
531 m_acceptFileUpdates = true;
532 msg.setField(VID_RCC, ERR_SUCCESS);
533 }
534 else
535 {
536 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
537 }
538 break;
539 case CMD_SNMP_REQUEST:
540 if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
541 {
542 ProxySNMPRequest(pMsg, &msg);
543 }
544 else
545 {
546 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
547 }
548 break;
549 case CMD_DEPLOY_AGENT_POLICY:
550 if (m_masterServer)
551 {
552 msg.setField(VID_RCC, DeployPolicy(this, pMsg));
553 }
554 else
555 {
556 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
557 }
558 break;
559 case CMD_UNINSTALL_AGENT_POLICY:
560 if (m_masterServer)
561 {
562 msg.setField(VID_RCC, UninstallPolicy(this, pMsg));
563 }
564 else
565 {
566 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
567 }
568 break;
569 case CMD_GET_POLICY_INVENTORY:
570 if (m_masterServer)
571 {
572 msg.setField(VID_RCC, GetPolicyInventory(this, &msg));
573 }
574 else
575 {
576 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
577 }
578 break;
579 case CMD_TAKE_SCREENSHOT:
580 if (m_controlServer)
581 {
582 TCHAR sessionName[256];
583 pMsg->getFieldAsString(VID_NAME, sessionName, 256);
584 DebugPrintf(m_dwIndex, 6, _T("Take snapshot from session \"%s\""), sessionName);
585 SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
586 if (conn != NULL)
587 {
588 DebugPrintf(m_dwIndex, 6, _T("Session agent connector acquired"));
589 conn->takeScreenshot(&msg);
590 conn->decRefCount();
591 }
592 else
593 {
594 msg.setField(VID_RCC, ERR_NO_SESSION_AGENT);
595 }
596 }
597 else
598 {
599 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
600 }
601 break;
602 case CMD_ENABLE_IPV6:
603 m_ipv6Aware = pMsg->getFieldAsBoolean(VID_ENABLED);
604 msg.setField(VID_RCC, ERR_SUCCESS);
605 break;
606 case CMD_SET_SERVER_ID:
607 m_serverId = pMsg->getFieldAsUInt64(VID_SERVER_ID);
608 DebugPrintf(m_dwIndex, 1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
609 msg.setField(VID_RCC, ERR_SUCCESS);
610 break;
611 case CMD_DATA_COLLECTION_CONFIG:
612 if (m_serverId != 0)
613 {
614 ConfigureDataCollection(m_serverId, pMsg);
615 msg.setField(VID_RCC, ERR_SUCCESS);
616 }
617 else
618 {
619 DebugPrintf(m_dwIndex, 1, _T("Data collection configuration command received but server ID is not set"));
620 msg.setField(VID_RCC, ERR_SERVER_ID_UNSET);
621 }
622 break;
623 case CMD_CLEAN_AGENT_DCI_CONF:
624 if (m_masterServer)
625 {
626 ClearDataCollectionConfiguration();
627 msg.setField(VID_RCC, ERR_SUCCESS);
628 }
629 else
630 {
631 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
632 }
633 break;
634 default:
635 // Attempt to process unknown command by subagents
636 if (!ProcessCmdBySubAgent(dwCommand, pMsg, &msg, this))
637 msg.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
638 break;
639 }
640 }
641 delete pMsg;
642
643 // Send response
644 sendMessage(&msg);
645 msg.deleteAllFields();
646 }
647 }
648
649 /**
650 * Authenticate peer
651 */
652 void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
653 {
654 if (m_authenticated)
655 {
656 // Already authenticated
657 pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
658 }
659 else
660 {
661 TCHAR szSecret[MAX_SECRET_LENGTH];
662 BYTE hash[32];
663 WORD wAuthMethod;
664
665 wAuthMethod = pRequest->getFieldAsUInt16(VID_AUTH_METHOD);
666 switch(wAuthMethod)
667 {
668 case AUTH_PLAINTEXT:
669 pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
670 if (!_tcscmp(szSecret, g_szSharedSecret))
671 {
672 m_authenticated = true;
673 pMsg->setField(VID_RCC, ERR_SUCCESS);
674 }
675 else
676 {
677 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, "PLAIN");
678 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
679 }
680 break;
681 case AUTH_MD5_HASH:
682 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
683 #ifdef UNICODE
684 {
685 char sharedSecret[256];
686 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
687 sharedSecret[255] = 0;
688 CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
689 }
690 #else
691 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
692 #endif
693 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
694 {
695 m_authenticated = true;
696 pMsg->setField(VID_RCC, ERR_SUCCESS);
697 }
698 else
699 {
700 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("MD5"));
701 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
702 }
703 break;
704 case AUTH_SHA1_HASH:
705 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
706 #ifdef UNICODE
707 {
708 char sharedSecret[256];
709 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
710 sharedSecret[255] = 0;
711 CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
712 }
713 #else
714 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
715 #endif
716 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
717 {
718 m_authenticated = true;
719 pMsg->setField(VID_RCC, ERR_SUCCESS);
720 }
721 else
722 {
723 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("SHA1"));
724 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
725 }
726 break;
727 default:
728 pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
729 break;
730 }
731 }
732 }
733
734 /**
735 * Get parameter's value
736 */
737 void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
738 {
739 TCHAR szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
740 UINT32 dwErrorCode;
741
742 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
743 dwErrorCode = GetParameterValue(m_dwIndex, szParameter, szValue, this);
744 pMsg->setField(VID_RCC, dwErrorCode);
745 if (dwErrorCode == ERR_SUCCESS)
746 pMsg->setField(VID_VALUE, szValue);
747 }
748
749 /**
750 * Get list of values
751 */
752 void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
753 {
754 TCHAR szParameter[MAX_PARAM_NAME];
755 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
756
757 StringList value;
758 UINT32 dwErrorCode = GetListValue(m_dwIndex, szParameter, &value, this);
759 pMsg->setField(VID_RCC, dwErrorCode);
760 if (dwErrorCode == ERR_SUCCESS)
761 {
762 value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
763 }
764 }
765
766 /**
767 * Get table
768 */
769 void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
770 {
771 TCHAR szParameter[MAX_PARAM_NAME];
772
773 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
774
775 Table value;
776 UINT32 dwErrorCode = GetTableValue(m_dwIndex, szParameter, &value, this);
777 pMsg->setField(VID_RCC, dwErrorCode);
778 if (dwErrorCode == ERR_SUCCESS)
779 {
780 value.fillMessage(*pMsg, 0, -1); // no row limit
781 }
782 }
783
784 /**
785 * Perform action on request
786 */
787 void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
788 {
789 if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
790 {
791 // Get action name and arguments
792 TCHAR action[MAX_PARAM_NAME];
793 pRequest->getFieldAsString(VID_ACTION_NAME, action, MAX_PARAM_NAME);
794
795 int numArgs = pRequest->getFieldAsInt32(VID_NUM_ARGS);
796 StringList *args = new StringList;
797 for(int i = 0; i < numArgs; i++)
798 args->addPreallocated(pRequest->getFieldAsString(VID_ACTION_ARG_BASE + i));
799
800 // Execute action
801 if (pRequest->getFieldAsBoolean(VID_RECEIVE_OUTPUT))
802 {
803 UINT32 rcc = ExecActionWithOutput(this, pRequest->getId(), action, args);
804 pMsg->setField(VID_RCC, rcc);
805 }
806 else
807 {
808 UINT32 rcc = ExecAction(action, args, this);
809 pMsg->setField(VID_RCC, rcc);
810 delete args;
811 }
812 }
813 else
814 {
815 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
816 }
817 }
818
819 /**
820 * Prepare for receiving file
821 */
822 void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
823 {
824 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
825
826 if (m_masterServer)
827 {
828 szFileName[0] = 0;
829 pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
830 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
831 BuildFullPath(szFileName, szFullPath);
832
833 // Check if for some reason we have already opened file
834 pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId()));
835 }
836 else
837 {
838 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
839 }
840 }
841
842 /**
843 * Open file for writing
844 */
845 UINT32 CommSession::openFile(TCHAR *szFullPath, UINT32 requestId)
846 {
847 if (m_hCurrFile != -1)
848 {
849 return ERR_RESOURCE_BUSY;
850 }
851 else
852 {
853 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath);
854 m_hCurrFile = _topen(szFullPath, O_CREAT | O_TRUNC | O_WRONLY | O_BINARY, 0600);
855 if (m_hCurrFile == -1)
856 {
857 DebugPrintf(m_dwIndex, 2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
858 return ERR_IO_FAILURE;
859 }
860 else
861 {
862 m_fileRqId = requestId;
863 return ERR_SUCCESS;
864 }
865 }
866 }
867
868 /**
869 * Progress callback for file sending
870 */
871 static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
872 {
873 ((CommSession *)cbArg)->updateTimeStamp();
874 }
875
876 /**
877 * Send file to server
878 */
879 bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset)
880 {
881 return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset, SendFileProgressCallback, this, m_socketWriteMutex) ? true : false;
882 }
883
884 /**
885 * Upgrade agent from package in the file store
886 */
887 UINT32 CommSession::upgrade(NXCPMessage *pRequest)
888 {
889 if (m_masterServer)
890 {
891 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
892
893 szPkgName[0] = 0;
894 pRequest->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
895 BuildFullPath(szPkgName, szFullPath);
896
897 //Create line in registry file with upgrade file name to delete it after system start
898 Config *registry = AgentOpenRegistry();
899 registry->setValue(_T("/upgrade/file"), szFullPath);
900 AgentCloseRegistry(true);
901
902 return UpgradeAgent(szFullPath);
903 }
904 else
905 {
906 return ERR_ACCESS_DENIED;
907 }
908 }
909
910 /**
911 * Get agent's configuration file
912 */
913 void CommSession::getConfig(NXCPMessage *pMsg)
914 {
915 if (m_masterServer)
916 {
917 pMsg->setField(VID_RCC,
918 pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
919 }
920 else
921 {
922 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
923 }
924 }
925
926 /**
927 * Update agent's configuration file
928 */
929 void CommSession::updateConfig(NXCPMessage *pRequest, NXCPMessage *pMsg)
930 {
931 if (m_masterServer)
932 {
933 BYTE *pConfig;
934 int hFile;
935 UINT32 size;
936
937 if (pRequest->isFieldExist(VID_CONFIG_FILE))
938 {
939 size = pRequest->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
940 pConfig = (BYTE *)malloc(size);
941 pRequest->getFieldAsBinary(VID_CONFIG_FILE, pConfig, size);
942 hFile = _topen(g_szConfigFile, O_CREAT | O_TRUNC | O_WRONLY, 0644);
943 if (hFile != -1)
944 {
945 if (size > 0)
946 {
947 for(UINT32 i = 0; i < size - 1; i++)
948 if (pConfig[i] == 0x0D)
949 {
950 size--;
951 memmove(&pConfig[i], &pConfig[i + 1], size - i);
952 i--;
953 }
954 }
955 if (write(hFile, pConfig, size) == size)
956 pMsg->setField(VID_RCC, ERR_SUCCESS);
957 else
958 pMsg->setField(VID_RCC, ERR_IO_FAILURE);
959 close(hFile);
960 }
961 else
962 {
963 DebugPrintf(m_dwIndex, 2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
964 g_szConfigFile, _tcserror(errno));
965 pMsg->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
966 }
967 free(pConfig);
968 }
969 else
970 {
971 pMsg->setField(VID_RCC, ERR_MALFORMED_COMMAND);
972 }
973 }
974 else
975 {
976 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
977 }
978 }
979
980 /**
981 * Setup proxy connection
982 */
983 UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
984 {
985 UINT32 dwResult, dwAddr;
986 WORD wPort;
987 struct sockaddr_in sa;
988 NXCPEncryptionContext *pSavedCtx;
989 TCHAR szBuffer[32];
990
991 if (m_masterServer && (g_dwFlags & AF_ENABLE_PROXY))
992 {
993 dwAddr = pRequest->getFieldAsUInt32(VID_IP_ADDRESS);
994 wPort = pRequest->getFieldAsUInt16(VID_AGENT_PORT);
995 m_hProxySocket = socket(AF_INET, SOCK_STREAM, 0);
996 if (m_hProxySocket != INVALID_SOCKET)
997 {
998 // Fill in address structure
999 memset(&sa, 0, sizeof(sa));
1000 sa.sin_addr.s_addr = htonl(dwAddr);
1001 sa.sin_family = AF_INET;
1002 sa.sin_port = htons(wPort);
1003 if (connect(m_hProxySocket, (struct sockaddr *)&sa, sizeof(sa)) != -1)
1004 {
1005 NXCPMessage msg;
1006 NXCP_MESSAGE *pRawMsg;
1007
1008 // Stop writing thread
1009 m_sendQueue->put(INVALID_POINTER_VALUE);
1010
1011 // Wait while all queued messages will be sent
1012 while(m_sendQueue->size() > 0)
1013 ThreadSleepMs(100);
1014
1015 // Finish proxy connection setup
1016 pSavedCtx = m_pCtx;
1017 m_pCtx = PROXY_ENCRYPTION_CTX;
1018 m_proxyConnection = true;
1019 dwResult = ERR_SUCCESS;
1020 m_hProxyReadThread = ThreadCreateEx(proxyReadThreadStarter, 0, this);
1021
1022 // Send confirmation message
1023 // We cannot use sendMessage() and writing thread, because
1024 // encryption context already overriden, and writing thread
1025 // already stopped
1026 msg.setCode(CMD_REQUEST_COMPLETED);
1027 msg.setId(pRequest->getId());
1028 msg.setField(VID_RCC, RCC_SUCCESS);
1029 pRawMsg = msg.createMessage();
1030 sendRawMessage(pRawMsg, pSavedCtx);
1031 if (pSavedCtx != NULL)
1032 pSavedCtx->decRefCount();
1033
1034 DebugPrintf(m_dwIndex, 5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr, szBuffer), wPort);
1035 }
1036 else
1037 {
1038 dwResult = ERR_CONNECT_FAILED;
1039 }
1040 }
1041 else
1042 {
1043 dwResult = ERR_SOCKET_ERROR;
1044 }
1045 }
1046 else
1047 {
1048 dwResult = ERR_ACCESS_DENIED;
1049 }
1050 return dwResult;
1051 }
1052
1053 /**
1054 * Proxy reading thread
1055 */
1056 void CommSession::proxyReadThread()
1057 {
1058 fd_set rdfs;
1059 struct timeval tv;
1060 char buffer[32768];
1061 int nRet;
1062
1063 while(1)
1064 {
1065 FD_ZERO(&rdfs);
1066 FD_SET(m_hProxySocket, &rdfs);
1067 tv.tv_sec = 0;
1068 tv.tv_usec = 5000000; // Half-second timeout
1069 nRet = select(SELECT_NFDS(m_hProxySocket + 1), &rdfs, NULL, NULL, &tv);
1070 if (nRet < 0)
1071 break;
1072 if (nRet > 0)
1073 {
1074 nRet = recv(m_hProxySocket, buffer, 32768, 0);
1075 if (nRet <= 0)
1076 break;
1077 SendEx(m_hSocket, buffer, nRet, 0, m_socketWriteMutex);
1078 }
1079 }
1080 disconnect();
1081 }
1082
1083 /**
1084 * Wait for request completion
1085 */
1086 UINT32 CommSession::doRequest(NXCPMessage *msg, UINT32 timeout)
1087 {
1088 sendMessage(msg);
1089 NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1090 UINT32 rcc;
1091 if (response != NULL)
1092 {
1093 rcc = response->getFieldAsUInt32(VID_RCC);
1094 delete response;
1095 }
1096 else
1097 {
1098 rcc = ERR_REQUEST_TIMEOUT;
1099 }
1100 return rcc;
1101 }
1102
1103 /**
1104 * Generate new request ID
1105 */
1106 UINT32 CommSession::generateRequestId()
1107 {
1108 return (UINT32)InterlockedIncrement(&m_requestId);
1109 }