bc22abff10a485696283e81afe891e61114a7980
[public/netxms.git] / src / agent / core / session.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: session.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 #ifdef _WIN32
26 #define write _write
27 #define close _close
28 #endif
29
30 /**
31 * Externals
32 */
33 void UnregisterSession(UINT32 dwIndex);
34 UINT32 DeployPolicy(CommSession *session, NXCPMessage *request);
35 UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
36 UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
37 void ClearDataCollectionConfiguration();
38
39 /**
40 * Max message size
41 */
42 #define MAX_MSG_SIZE 4194304
43
44 /**
45 * SNMP proxy thread pool
46 */
47 ThreadPool *g_snmpProxyThreadPool = NULL;
48
49 /**
50 * Client communication read thread
51 */
52 THREAD_RESULT THREAD_CALL CommSession::readThreadStarter(void *pArg)
53 {
54 ((CommSession *)pArg)->readThread();
55
56 // When CommSession::ReadThread exits, all other session
57 // threads are already stopped, so we can safely destroy
58 // session object
59 UnregisterSession(((CommSession *)pArg)->getIndex());
60 ((CommSession *)pArg)->decRefCount();
61 return THREAD_OK;
62 }
63
64 /**
65 * Client communication write thread
66 */
67 THREAD_RESULT THREAD_CALL CommSession::writeThreadStarter(void *pArg)
68 {
69 ((CommSession *)pArg)->writeThread();
70 return THREAD_OK;
71 }
72
73 /**
74 * Received message processing thread
75 */
76 THREAD_RESULT THREAD_CALL CommSession::processingThreadStarter(void *pArg)
77 {
78 ((CommSession *)pArg)->processingThread();
79 return THREAD_OK;
80 }
81
82 /**
83 * Client communication write thread
84 */
85 THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
86 {
87 ((CommSession *)pArg)->proxyReadThread();
88 return THREAD_OK;
89 }
90
91 /**
92 * Client session class constructor
93 */
94 CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer)
95 {
96 m_sendQueue = new Queue;
97 m_processingQueue = new Queue;
98 m_hSocket = hSocket;
99 m_hProxySocket = INVALID_SOCKET;
100 m_dwIndex = INVALID_INDEX;
101 m_hWriteThread = INVALID_THREAD_HANDLE;
102 m_hProcessingThread = INVALID_THREAD_HANDLE;
103 m_hProxyReadThread = INVALID_THREAD_HANDLE;
104 m_serverId = 0;
105 m_serverAddr = serverAddr;
106 m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
107 m_masterServer = masterServer;
108 m_controlServer = controlServer;
109 m_proxyConnection = false;
110 m_acceptTraps = false;
111 m_acceptFileUpdates = false;
112 m_ipv6Aware = false;
113 m_bulkReconciliationSupported = false;
114 m_hCurrFile = -1;
115 m_fileRqId = 0;
116 m_compressor = NULL;
117 m_pCtx = NULL;
118 m_ts = time(NULL);
119 m_socketWriteMutex = MutexCreate();
120 m_responseQueue = new MsgWaitQueue();
121 m_requestId = 0;
122 }
123
124 /**
125 * Destructor
126 */
127 CommSession::~CommSession()
128 {
129 shutdown(m_hSocket, SHUT_RDWR);
130 closesocket(m_hSocket);
131 if (m_hProxySocket != INVALID_SOCKET)
132 closesocket(m_hProxySocket);
133
134 void *p;
135 while((p = m_sendQueue->get()) != NULL)
136 if (p != INVALID_POINTER_VALUE)
137 free(p);
138 delete m_sendQueue;
139
140 while((p = m_processingQueue->get()) != NULL)
141 if (p != INVALID_POINTER_VALUE)
142 delete (NXCPMessage *)p;
143 delete m_processingQueue;
144
145 if (m_hCurrFile != -1)
146 close(m_hCurrFile);
147 delete m_compressor;
148 if ((m_pCtx != NULL) && (m_pCtx != PROXY_ENCRYPTION_CTX))
149 m_pCtx->decRefCount();
150 MutexDestroy(m_socketWriteMutex);
151 delete m_responseQueue;
152 }
153
154 /**
155 * Start all threads
156 */
157 void CommSession::run()
158 {
159 m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
160 m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
161 ThreadCreate(readThreadStarter, 0, this);
162 }
163
164 /**
165 * Disconnect session
166 */
167 void CommSession::disconnect()
168 {
169 DebugPrintf(m_dwIndex, 5, _T("CommSession::disconnect()"));
170 shutdown(m_hSocket, SHUT_RDWR);
171 if (m_hProxySocket != -1)
172 shutdown(m_hProxySocket, SHUT_RDWR);
173 }
174
175 /**
176 * Reading thread
177 */
178 void CommSession::readThread()
179 {
180 SocketMessageReceiver receiver(m_hSocket, 4096, MAX_MSG_SIZE);
181 while(true)
182 {
183 if (!m_proxyConnection)
184 {
185 MessageReceiverResult result;
186 NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
187
188 // Check for decryption error
189 if (result == MSGRECV_DECRYPTION_FAILURE)
190 {
191 DebugPrintf(m_dwIndex, 4, _T("Unable to decrypt received message"));
192 continue;
193 }
194
195 // Check for timeout
196 if (result == MSGRECV_TIMEOUT)
197 {
198 if (m_ts < time(NULL) - (time_t)g_dwIdleTimeout)
199 {
200 DebugPrintf(m_dwIndex, 5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
201 break;
202 }
203 continue;
204 }
205
206 // Receive error
207 if (msg == NULL)
208 {
209 DebugPrintf(m_dwIndex, 5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
210 break;
211 }
212
213 // Update activity timestamp
214 m_ts = time(NULL);
215
216 if (nxlog_get_debug_level() >= 8)
217 {
218 String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), NXCP_VERSION);
219 DebugPrintf(m_dwIndex, 8, _T("Message dump:\n%s"), (const TCHAR *)msgDump);
220 }
221
222 if (msg->isBinary())
223 {
224 TCHAR buffer[64];
225 DebugPrintf(m_dwIndex, 6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
226
227 if (msg->getCode() == CMD_FILE_DATA)
228 {
229 if ((m_hCurrFile != -1) && (m_fileRqId == msg->getId()))
230 {
231 const BYTE *data;
232 int dataSize;
233 if (msg->isCompressed())
234 {
235 const BYTE *in = msg->getBinaryData();
236 if (m_compressor == NULL)
237 {
238 NXCPCompressionMethod method = (NXCPCompressionMethod)(*in);
239 m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
240 if (m_compressor == NULL)
241 {
242 DebugPrintf(m_dwIndex, 5, _T("Unable to create stream compressor for method %d"), (int)method);
243 data = NULL;
244 dataSize = -1;
245 }
246 }
247
248 if (m_compressor != NULL)
249 {
250 dataSize = (int)m_compressor->decompress(in + 4, msg->getBinaryDataSize() - 4, &data);
251 if (dataSize != (int)ntohs(*((UINT16 *)(in + 2))))
252 {
253 // decompressed block size validation failed
254 dataSize = -1;
255 }
256 }
257 }
258 else
259 {
260 data = msg->getBinaryData();
261 dataSize = (int)msg->getBinaryDataSize();
262 }
263
264 if ((dataSize >= 0) && (write(m_hCurrFile, data, dataSize) == dataSize))
265 {
266 if (msg->isEndOfFile())
267 {
268 NXCPMessage response;
269
270 close(m_hCurrFile);
271 m_hCurrFile = -1;
272 delete_and_null(m_compressor);
273
274 response.setCode(CMD_REQUEST_COMPLETED);
275 response.setId(msg->getId());
276 response.setField(VID_RCC, ERR_SUCCESS);
277 sendMessage(&response);
278 }
279 }
280 else
281 {
282 // I/O error
283 NXCPMessage response;
284
285 close(m_hCurrFile);
286 m_hCurrFile = -1;
287 delete_and_null(m_compressor);
288
289 response.setCode(CMD_REQUEST_COMPLETED);
290 response.setId(msg->getId());
291 response.setField(VID_RCC, ERR_IO_FAILURE);
292 sendMessage(&response);
293 }
294 }
295 }
296 }
297 else if (msg->isControl())
298 {
299 TCHAR buffer[64];
300 DebugPrintf(m_dwIndex, 6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
301
302 if (msg->getCode() == CMD_GET_NXCP_CAPS)
303 {
304 NXCP_MESSAGE *pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
305 pMsg->id = htonl(msg->getId());
306 pMsg->code = htons((WORD)CMD_NXCP_CAPS);
307 pMsg->flags = htons(MF_CONTROL);
308 pMsg->numFields = htonl(NXCP_VERSION << 24);
309 pMsg->size = htonl(NXCP_HEADER_SIZE);
310 sendRawMessage(pMsg, m_pCtx);
311 }
312 delete msg;
313 }
314 else
315 {
316 TCHAR buffer[64];
317 DebugPrintf(m_dwIndex, 6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
318
319 UINT32 rcc;
320 switch(msg->getCode())
321 {
322 case CMD_REQUEST_COMPLETED:
323 m_responseQueue->put(msg);
324 break;
325 case CMD_REQUEST_SESSION_KEY:
326 if (m_pCtx == NULL)
327 {
328 NXCPMessage *pResponse;
329 SetupEncryptionContext(msg, &m_pCtx, &pResponse, NULL, NXCP_VERSION);
330 sendMessage(pResponse);
331 delete pResponse;
332 receiver.setEncryptionContext(m_pCtx);
333 }
334 delete msg;
335 break;
336 case CMD_SETUP_PROXY_CONNECTION:
337 rcc = setupProxyConnection(msg);
338 // When proxy session established incoming messages will
339 // not be processed locally. Acknowledgment message sent
340 // by SetupProxyConnection() in case of success.
341 if (rcc == ERR_SUCCESS)
342 {
343 m_processingQueue->put(INVALID_POINTER_VALUE);
344 }
345 else
346 {
347 NXCPMessage response;
348 response.setCode(CMD_REQUEST_COMPLETED);
349 response.setId(msg->getId());
350 response.setField(VID_RCC, rcc);
351 sendMessage(&response);
352 }
353 delete msg;
354 break;
355 case CMD_SNMP_REQUEST:
356 if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
357 {
358 incRefCount();
359 ThreadPoolExecute(g_snmpProxyThreadPool, this, &CommSession::proxySnmpRequest, msg);
360 }
361 else
362 {
363 NXCPMessage response;
364 response.setCode(CMD_REQUEST_COMPLETED);
365 response.setId(msg->getId());
366 response.setField(VID_RCC, ERR_ACCESS_DENIED);
367 sendMessage(&response);
368 delete msg;
369 }
370 break;
371 default:
372 m_processingQueue->put(msg);
373 break;
374 }
375 }
376 }
377 else // m_proxyConnection
378 {
379 fd_set rdfs;
380 struct timeval tv;
381 char buffer[32768];
382
383 FD_ZERO(&rdfs);
384 FD_SET(m_hSocket, &rdfs);
385 tv.tv_sec = g_dwIdleTimeout + 1;
386 tv.tv_usec = 0;
387 int rc = select(SELECT_NFDS(m_hSocket + 1), &rdfs, NULL, NULL, &tv);
388 if (rc <= 0)
389 break;
390 if (rc > 0)
391 {
392 // Update activity timestamp
393 m_ts = time(NULL);
394
395 rc = recv(m_hSocket, buffer, 32768, 0);
396 if (rc <= 0)
397 break;
398 SendEx(m_hProxySocket, buffer, rc, 0, NULL);
399 }
400 }
401 }
402
403 // Notify other threads to exit
404 m_sendQueue->put(INVALID_POINTER_VALUE);
405 m_processingQueue->put(INVALID_POINTER_VALUE);
406 if (m_hProxySocket != INVALID_SOCKET)
407 shutdown(m_hProxySocket, SHUT_RDWR);
408
409 // Wait for other threads to finish
410 ThreadJoin(m_hWriteThread);
411 ThreadJoin(m_hProcessingThread);
412 if (m_proxyConnection)
413 ThreadJoin(m_hProxyReadThread);
414
415 DebugPrintf(m_dwIndex, 5, _T("Session with %s closed"), (const TCHAR *)m_serverAddr.toString());
416 }
417
418 /**
419 * Send prepared raw message over the network and destroy it
420 */
421 BOOL CommSession::sendRawMessage(NXCP_MESSAGE *pMsg, NXCPEncryptionContext *pCtx)
422 {
423 BOOL bResult = TRUE;
424 TCHAR szBuffer[128];
425
426 DebugPrintf(m_dwIndex, 6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
427 if ((pCtx != NULL) && (pCtx != PROXY_ENCRYPTION_CTX))
428 {
429 NXCP_ENCRYPTED_MESSAGE *enMsg = pCtx->encryptMessage(pMsg);
430 if (enMsg != NULL)
431 {
432 if (SendEx(m_hSocket, (const char *)enMsg, ntohl(enMsg->size), 0, m_socketWriteMutex) <= 0)
433 {
434 bResult = FALSE;
435 }
436 free(enMsg);
437 }
438 }
439 else
440 {
441 if (SendEx(m_hSocket, (const char *)pMsg, ntohl(pMsg->size), 0, m_socketWriteMutex) <= 0)
442 {
443 bResult = FALSE;
444 }
445 }
446 if (!bResult)
447 DebugPrintf(m_dwIndex, 6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
448 free(pMsg);
449 return bResult;
450 }
451
452 /**
453 * Writing thread
454 */
455 void CommSession::writeThread()
456 {
457 NXCP_MESSAGE *pMsg;
458
459 while(1)
460 {
461 pMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
462 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
463 break;
464
465 if (!sendRawMessage(pMsg, m_pCtx))
466 break;
467 }
468 }
469
470 /**
471 * Message processing thread
472 */
473 void CommSession::processingThread()
474 {
475 NXCPMessage *pMsg;
476 NXCPMessage msg;
477 UINT32 dwCommand;
478
479 while(1)
480 {
481 pMsg = (NXCPMessage *)m_processingQueue->getOrBlock();
482 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
483 break;
484 dwCommand = pMsg->getCode();
485
486 // Prepare response message
487 msg.setCode(CMD_REQUEST_COMPLETED);
488 msg.setId(pMsg->getId());
489
490 // Check if authentication required
491 if ((!m_authenticated) && (dwCommand != CMD_AUTHENTICATE))
492 {
493 DebugPrintf(m_dwIndex, 6, _T("Authentication required"));
494 msg.setField(VID_RCC, ERR_AUTH_REQUIRED);
495 }
496 else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && (m_pCtx == NULL))
497 {
498 DebugPrintf(m_dwIndex, 6, _T("Encryption required"));
499 msg.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
500 }
501 else
502 {
503 switch(dwCommand)
504 {
505 case CMD_AUTHENTICATE:
506 authenticate(pMsg, &msg);
507 break;
508 case CMD_GET_PARAMETER:
509 getParameter(pMsg, &msg);
510 break;
511 case CMD_GET_LIST:
512 getList(pMsg, &msg);
513 break;
514 case CMD_GET_TABLE:
515 getTable(pMsg, &msg);
516 break;
517 case CMD_KEEPALIVE:
518 msg.setField(VID_RCC, ERR_SUCCESS);
519 break;
520 case CMD_ACTION:
521 action(pMsg, &msg);
522 break;
523 case CMD_TRANSFER_FILE:
524 recvFile(pMsg, &msg);
525 break;
526 case CMD_UPGRADE_AGENT:
527 msg.setField(VID_RCC, upgrade(pMsg));
528 break;
529 case CMD_GET_PARAMETER_LIST:
530 msg.setField(VID_RCC, ERR_SUCCESS);
531 GetParameterList(&msg);
532 break;
533 case CMD_GET_ENUM_LIST:
534 msg.setField(VID_RCC, ERR_SUCCESS);
535 GetEnumList(&msg);
536 break;
537 case CMD_GET_TABLE_LIST:
538 msg.setField(VID_RCC, ERR_SUCCESS);
539 GetTableList(&msg);
540 break;
541 case CMD_GET_AGENT_CONFIG:
542 getConfig(&msg);
543 break;
544 case CMD_UPDATE_AGENT_CONFIG:
545 updateConfig(pMsg, &msg);
546 break;
547 case CMD_ENABLE_AGENT_TRAPS:
548 if (m_masterServer)
549 {
550 m_acceptTraps = true;
551 msg.setField(VID_RCC, ERR_SUCCESS);
552 }
553 else
554 {
555 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
556 }
557 break;
558 case CMD_ENABLE_FILE_UPDATES:
559 if (m_masterServer)
560 {
561 m_acceptFileUpdates = true;
562 msg.setField(VID_RCC, ERR_SUCCESS);
563 }
564 else
565 {
566 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
567 }
568 break;
569 case CMD_DEPLOY_AGENT_POLICY:
570 if (m_masterServer)
571 {
572 msg.setField(VID_RCC, DeployPolicy(this, pMsg));
573 }
574 else
575 {
576 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
577 }
578 break;
579 case CMD_UNINSTALL_AGENT_POLICY:
580 if (m_masterServer)
581 {
582 msg.setField(VID_RCC, UninstallPolicy(this, pMsg));
583 }
584 else
585 {
586 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
587 }
588 break;
589 case CMD_GET_POLICY_INVENTORY:
590 if (m_masterServer)
591 {
592 msg.setField(VID_RCC, GetPolicyInventory(this, &msg));
593 }
594 else
595 {
596 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
597 }
598 break;
599 case CMD_TAKE_SCREENSHOT:
600 if (m_controlServer)
601 {
602 TCHAR sessionName[256];
603 pMsg->getFieldAsString(VID_NAME, sessionName, 256);
604 DebugPrintf(m_dwIndex, 6, _T("Take snapshot from session \"%s\""), sessionName);
605 SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
606 if (conn != NULL)
607 {
608 DebugPrintf(m_dwIndex, 6, _T("Session agent connector acquired"));
609 conn->takeScreenshot(&msg);
610 conn->decRefCount();
611 }
612 else
613 {
614 msg.setField(VID_RCC, ERR_NO_SESSION_AGENT);
615 }
616 }
617 else
618 {
619 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
620 }
621 break;
622 case CMD_SET_SERVER_CAPABILITIES:
623 // Servers before 2.0 use VID_ENABLED
624 m_ipv6Aware = pMsg->isFieldExist(VID_IPV6_SUPPORT) ? pMsg->getFieldAsBoolean(VID_IPV6_SUPPORT) : pMsg->getFieldAsBoolean(VID_ENABLED);
625 m_bulkReconciliationSupported = pMsg->getFieldAsBoolean(VID_BULK_RECONCILIATION);
626 msg.setField(VID_RCC, ERR_SUCCESS);
627 break;
628 case CMD_SET_SERVER_ID:
629 m_serverId = pMsg->getFieldAsUInt64(VID_SERVER_ID);
630 DebugPrintf(m_dwIndex, 1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
631 msg.setField(VID_RCC, ERR_SUCCESS);
632 break;
633 case CMD_DATA_COLLECTION_CONFIG:
634 if (m_serverId != 0)
635 {
636 ConfigureDataCollection(m_serverId, pMsg);
637 msg.setField(VID_RCC, ERR_SUCCESS);
638 }
639 else
640 {
641 DebugPrintf(m_dwIndex, 1, _T("Data collection configuration command received but server ID is not set"));
642 msg.setField(VID_RCC, ERR_SERVER_ID_UNSET);
643 }
644 break;
645 case CMD_CLEAN_AGENT_DCI_CONF:
646 if (m_masterServer)
647 {
648 ClearDataCollectionConfiguration();
649 msg.setField(VID_RCC, ERR_SUCCESS);
650 }
651 else
652 {
653 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
654 }
655 break;
656 default:
657 // Attempt to process unknown command by subagents
658 if (!ProcessCmdBySubAgent(dwCommand, pMsg, &msg, this))
659 msg.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
660 break;
661 }
662 }
663 delete pMsg;
664
665 // Send response
666 sendMessage(&msg);
667 msg.deleteAllFields();
668 }
669 }
670
671 /**
672 * Authenticate peer
673 */
674 void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
675 {
676 if (m_authenticated)
677 {
678 // Already authenticated
679 pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
680 }
681 else
682 {
683 TCHAR szSecret[MAX_SECRET_LENGTH];
684 BYTE hash[32];
685 WORD wAuthMethod;
686
687 wAuthMethod = pRequest->getFieldAsUInt16(VID_AUTH_METHOD);
688 switch(wAuthMethod)
689 {
690 case AUTH_PLAINTEXT:
691 pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
692 if (!_tcscmp(szSecret, g_szSharedSecret))
693 {
694 m_authenticated = true;
695 pMsg->setField(VID_RCC, ERR_SUCCESS);
696 }
697 else
698 {
699 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, "PLAIN");
700 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
701 }
702 break;
703 case AUTH_MD5_HASH:
704 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
705 #ifdef UNICODE
706 {
707 char sharedSecret[256];
708 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
709 sharedSecret[255] = 0;
710 CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
711 }
712 #else
713 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
714 #endif
715 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
716 {
717 m_authenticated = true;
718 pMsg->setField(VID_RCC, ERR_SUCCESS);
719 }
720 else
721 {
722 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("MD5"));
723 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
724 }
725 break;
726 case AUTH_SHA1_HASH:
727 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
728 #ifdef UNICODE
729 {
730 char sharedSecret[256];
731 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
732 sharedSecret[255] = 0;
733 CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
734 }
735 #else
736 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
737 #endif
738 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
739 {
740 m_authenticated = true;
741 pMsg->setField(VID_RCC, ERR_SUCCESS);
742 }
743 else
744 {
745 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("SHA1"));
746 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
747 }
748 break;
749 default:
750 pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
751 break;
752 }
753 }
754 }
755
756 /**
757 * Get parameter's value
758 */
759 void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
760 {
761 TCHAR szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
762 UINT32 dwErrorCode;
763
764 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
765 dwErrorCode = GetParameterValue(m_dwIndex, szParameter, szValue, this);
766 pMsg->setField(VID_RCC, dwErrorCode);
767 if (dwErrorCode == ERR_SUCCESS)
768 pMsg->setField(VID_VALUE, szValue);
769 }
770
771 /**
772 * Get list of values
773 */
774 void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
775 {
776 TCHAR szParameter[MAX_PARAM_NAME];
777 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
778
779 StringList value;
780 UINT32 dwErrorCode = GetListValue(m_dwIndex, szParameter, &value, this);
781 pMsg->setField(VID_RCC, dwErrorCode);
782 if (dwErrorCode == ERR_SUCCESS)
783 {
784 value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
785 }
786 }
787
788 /**
789 * Get table
790 */
791 void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
792 {
793 TCHAR szParameter[MAX_PARAM_NAME];
794
795 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
796
797 Table value;
798 UINT32 dwErrorCode = GetTableValue(m_dwIndex, szParameter, &value, this);
799 pMsg->setField(VID_RCC, dwErrorCode);
800 if (dwErrorCode == ERR_SUCCESS)
801 {
802 value.fillMessage(*pMsg, 0, -1); // no row limit
803 }
804 }
805
806 /**
807 * Perform action on request
808 */
809 void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
810 {
811 if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
812 {
813 // Get action name and arguments
814 TCHAR action[MAX_PARAM_NAME];
815 pRequest->getFieldAsString(VID_ACTION_NAME, action, MAX_PARAM_NAME);
816
817 int numArgs = pRequest->getFieldAsInt32(VID_NUM_ARGS);
818 StringList *args = new StringList;
819 for(int i = 0; i < numArgs; i++)
820 args->addPreallocated(pRequest->getFieldAsString(VID_ACTION_ARG_BASE + i));
821
822 // Execute action
823 if (pRequest->getFieldAsBoolean(VID_RECEIVE_OUTPUT))
824 {
825 UINT32 rcc = ExecActionWithOutput(this, pRequest->getId(), action, args);
826 pMsg->setField(VID_RCC, rcc);
827 }
828 else
829 {
830 UINT32 rcc = ExecAction(action, args, this);
831 pMsg->setField(VID_RCC, rcc);
832 delete args;
833 }
834 }
835 else
836 {
837 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
838 }
839 }
840
841 /**
842 * Prepare for receiving file
843 */
844 void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
845 {
846 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
847
848 if (m_masterServer)
849 {
850 szFileName[0] = 0;
851 pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
852 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
853 BuildFullPath(szFileName, szFullPath);
854
855 // Check if for some reason we have already opened file
856 pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId()));
857 }
858 else
859 {
860 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
861 }
862 }
863
864 /**
865 * Open file for writing
866 */
867 UINT32 CommSession::openFile(TCHAR *szFullPath, UINT32 requestId)
868 {
869 if (m_hCurrFile != -1)
870 {
871 return ERR_RESOURCE_BUSY;
872 }
873 else
874 {
875 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath);
876 m_hCurrFile = _topen(szFullPath, O_CREAT | O_TRUNC | O_WRONLY | O_BINARY, 0600);
877 if (m_hCurrFile == -1)
878 {
879 DebugPrintf(m_dwIndex, 2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
880 return ERR_IO_FAILURE;
881 }
882 else
883 {
884 m_fileRqId = requestId;
885 return ERR_SUCCESS;
886 }
887 }
888 }
889
890 /**
891 * Progress callback for file sending
892 */
893 static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
894 {
895 ((CommSession *)cbArg)->updateTimeStamp();
896 }
897
898 /**
899 * Send file to server
900 */
901 bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset)
902 {
903 return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset, SendFileProgressCallback, this, m_socketWriteMutex) ? true : false;
904 }
905
906 /**
907 * Upgrade agent from package in the file store
908 */
909 UINT32 CommSession::upgrade(NXCPMessage *pRequest)
910 {
911 if (m_masterServer)
912 {
913 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
914
915 szPkgName[0] = 0;
916 pRequest->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
917 BuildFullPath(szPkgName, szFullPath);
918
919 //Create line in registry file with upgrade file name to delete it after system start
920 DB_HANDLE hdb = GetLocalDatabaseHandle();
921 if(hdb != NULL)
922 {
923 TCHAR upgradeFileInsert[256];
924 _sntprintf(upgradeFileInsert, 256, _T("INSERT INTO registry (attribute,value) VALUES ('upgrade.file',%s)"), szPkgName);
925 DBQuery(hdb, upgradeFileInsert);
926 }
927
928 return UpgradeAgent(szFullPath);
929 }
930 else
931 {
932 return ERR_ACCESS_DENIED;
933 }
934 }
935
936 /**
937 * Get agent's configuration file
938 */
939 void CommSession::getConfig(NXCPMessage *pMsg)
940 {
941 if (m_masterServer)
942 {
943 pMsg->setField(VID_RCC,
944 pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
945 }
946 else
947 {
948 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
949 }
950 }
951
952 /**
953 * Update agent's configuration file
954 */
955 void CommSession::updateConfig(NXCPMessage *pRequest, NXCPMessage *pMsg)
956 {
957 if (m_masterServer)
958 {
959 BYTE *pConfig;
960 int hFile;
961 UINT32 size;
962
963 if (pRequest->isFieldExist(VID_CONFIG_FILE))
964 {
965 size = pRequest->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
966 pConfig = (BYTE *)malloc(size);
967 pRequest->getFieldAsBinary(VID_CONFIG_FILE, pConfig, size);
968 hFile = _topen(g_szConfigFile, O_CREAT | O_TRUNC | O_WRONLY, 0644);
969 if (hFile != -1)
970 {
971 if (size > 0)
972 {
973 for(UINT32 i = 0; i < size - 1; i++)
974 if (pConfig[i] == 0x0D)
975 {
976 size--;
977 memmove(&pConfig[i], &pConfig[i + 1], size - i);
978 i--;
979 }
980 }
981 if (write(hFile, pConfig, size) == size)
982 pMsg->setField(VID_RCC, ERR_SUCCESS);
983 else
984 pMsg->setField(VID_RCC, ERR_IO_FAILURE);
985 close(hFile);
986 }
987 else
988 {
989 DebugPrintf(m_dwIndex, 2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
990 g_szConfigFile, _tcserror(errno));
991 pMsg->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
992 }
993 free(pConfig);
994 }
995 else
996 {
997 pMsg->setField(VID_RCC, ERR_MALFORMED_COMMAND);
998 }
999 }
1000 else
1001 {
1002 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
1003 }
1004 }
1005
1006 /**
1007 * Setup proxy connection
1008 */
1009 UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
1010 {
1011 UINT32 dwResult, dwAddr;
1012 WORD wPort;
1013 struct sockaddr_in sa;
1014 NXCPEncryptionContext *pSavedCtx;
1015 TCHAR szBuffer[32];
1016
1017 if (m_masterServer && (g_dwFlags & AF_ENABLE_PROXY))
1018 {
1019 dwAddr = pRequest->getFieldAsUInt32(VID_IP_ADDRESS);
1020 wPort = pRequest->getFieldAsUInt16(VID_AGENT_PORT);
1021 m_hProxySocket = socket(AF_INET, SOCK_STREAM, 0);
1022 if (m_hProxySocket != INVALID_SOCKET)
1023 {
1024 // Fill in address structure
1025 memset(&sa, 0, sizeof(sa));
1026 sa.sin_addr.s_addr = htonl(dwAddr);
1027 sa.sin_family = AF_INET;
1028 sa.sin_port = htons(wPort);
1029 if (connect(m_hProxySocket, (struct sockaddr *)&sa, sizeof(sa)) != -1)
1030 {
1031 NXCPMessage msg;
1032 NXCP_MESSAGE *pRawMsg;
1033
1034 // Stop writing thread
1035 m_sendQueue->put(INVALID_POINTER_VALUE);
1036
1037 // Wait while all queued messages will be sent
1038 while(m_sendQueue->size() > 0)
1039 ThreadSleepMs(100);
1040
1041 // Finish proxy connection setup
1042 pSavedCtx = m_pCtx;
1043 m_pCtx = PROXY_ENCRYPTION_CTX;
1044 m_proxyConnection = true;
1045 dwResult = ERR_SUCCESS;
1046 m_hProxyReadThread = ThreadCreateEx(proxyReadThreadStarter, 0, this);
1047
1048 // Send confirmation message
1049 // We cannot use sendMessage() and writing thread, because
1050 // encryption context already overriden, and writing thread
1051 // already stopped
1052 msg.setCode(CMD_REQUEST_COMPLETED);
1053 msg.setId(pRequest->getId());
1054 msg.setField(VID_RCC, RCC_SUCCESS);
1055 pRawMsg = msg.createMessage();
1056 sendRawMessage(pRawMsg, pSavedCtx);
1057 if (pSavedCtx != NULL)
1058 pSavedCtx->decRefCount();
1059
1060 DebugPrintf(m_dwIndex, 5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr, szBuffer), wPort);
1061 }
1062 else
1063 {
1064 dwResult = ERR_CONNECT_FAILED;
1065 }
1066 }
1067 else
1068 {
1069 dwResult = ERR_SOCKET_ERROR;
1070 }
1071 }
1072 else
1073 {
1074 dwResult = ERR_ACCESS_DENIED;
1075 }
1076 return dwResult;
1077 }
1078
1079 /**
1080 * Proxy reading thread
1081 */
1082 void CommSession::proxyReadThread()
1083 {
1084 fd_set rdfs;
1085 struct timeval tv;
1086 char buffer[32768];
1087 int nRet;
1088
1089 while(1)
1090 {
1091 FD_ZERO(&rdfs);
1092 FD_SET(m_hProxySocket, &rdfs);
1093 tv.tv_sec = 0;
1094 tv.tv_usec = 5000000; // Half-second timeout
1095 nRet = select(SELECT_NFDS(m_hProxySocket + 1), &rdfs, NULL, NULL, &tv);
1096 if (nRet < 0)
1097 break;
1098 if (nRet > 0)
1099 {
1100 nRet = recv(m_hProxySocket, buffer, 32768, 0);
1101 if (nRet <= 0)
1102 break;
1103 SendEx(m_hSocket, buffer, nRet, 0, m_socketWriteMutex);
1104 }
1105 }
1106 disconnect();
1107 }
1108
1109 /**
1110 * Wait for request completion
1111 */
1112 UINT32 CommSession::doRequest(NXCPMessage *msg, UINT32 timeout)
1113 {
1114 sendMessage(msg);
1115 NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1116 UINT32 rcc;
1117 if (response != NULL)
1118 {
1119 rcc = response->getFieldAsUInt32(VID_RCC);
1120 delete response;
1121 }
1122 else
1123 {
1124 rcc = ERR_REQUEST_TIMEOUT;
1125 }
1126 return rcc;
1127 }
1128
1129 /**
1130 * Wait for request completion
1131 */
1132 NXCPMessage *CommSession::doRequestEx(NXCPMessage *msg, UINT32 timeout)
1133 {
1134 sendMessage(msg);
1135 return m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1136 }
1137
1138 /**
1139 * Generate new request ID
1140 */
1141 UINT32 CommSession::generateRequestId()
1142 {
1143 return (UINT32)InterlockedIncrement(&m_requestId);
1144 }