d4835373aa7855d392177bf08ade94bb12adb043
[public/netxms.git] / src / agent / core / session.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: session.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 #ifdef _WIN32
26 #define write _write
27 #define close _close
28 #endif
29
30 /**
31 * Externals
32 */
33 void UnregisterSession(UINT32 dwIndex);
34 UINT32 DeployPolicy(CommSession *session, NXCPMessage *request);
35 UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
36 UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
37 void ClearDataCollectionConfiguration();
38
/**
 * Max message size (4194304 = 4 * 1024 * 1024).
 * Passed as the last argument to the SocketMessageReceiver created in
 * CommSession::readThread() - presumably the receiver's upper limit for a
 * single incoming message, in bytes (TODO confirm against receiver API).
 */
#define MAX_MSG_SIZE 4194304
43
/**
 * SNMP proxy thread pool.
 * CommSession::readThread() hands CMD_SNMP_REQUEST messages to this pool.
 * Only initialized to NULL in this file; the pool itself is expected to be
 * created elsewhere before SNMP proxy requests arrive.
 */
ThreadPool *g_snmpProxyThreadPool = NULL;
48
/**
 * Agent proxy statistics (reported by H_AgentProxyStats)
 */
// Number of CMD_SETUP_PROXY_CONNECTION requests seen by read threads.
// NOTE(review): incremented with plain ++ in readThread(); every session has
// its own read thread, so concurrent updates are not synchronized.
static UINT64 s_proxyConnectionRequests = 0;
// Incremented when proxy setup succeeds, decremented in ~CommSession() for
// sessions that were switched to proxy mode.
static VolatileCounter s_activeProxySessions = 0;
54
55 /**
56 * Handler for agent proxy stats parameters
57 */
58 LONG H_AgentProxyStats(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
59 {
60 switch(*arg)
61 {
62 case 'A':
63 ret_uint(value, (UINT32)s_activeProxySessions);
64 break;
65 case 'C':
66 ret_uint64(value, s_proxyConnectionRequests);
67 break;
68 default:
69 return SYSINFO_RC_UNSUPPORTED;
70 }
71 return SYSINFO_RC_SUCCESS;
72 }
73
74 /**
75 * Client communication read thread
76 */
77 THREAD_RESULT THREAD_CALL CommSession::readThreadStarter(void *pArg)
78 {
79 ((CommSession *)pArg)->readThread();
80
81 // When CommSession::ReadThread exits, all other session
82 // threads are already stopped, so we can safely destroy
83 // session object
84 UnregisterSession(((CommSession *)pArg)->getIndex());
85 ((CommSession *)pArg)->decRefCount();
86 return THREAD_OK;
87 }
88
89 /**
90 * Client communication write thread
91 */
92 THREAD_RESULT THREAD_CALL CommSession::writeThreadStarter(void *pArg)
93 {
94 ((CommSession *)pArg)->writeThread();
95 return THREAD_OK;
96 }
97
98 /**
99 * Received message processing thread
100 */
101 THREAD_RESULT THREAD_CALL CommSession::processingThreadStarter(void *pArg)
102 {
103 ((CommSession *)pArg)->processingThread();
104 return THREAD_OK;
105 }
106
107 /**
108 * Client communication write thread
109 */
110 THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
111 {
112 ((CommSession *)pArg)->proxyReadThread();
113 return THREAD_OK;
114 }
115
116 /**
117 * Client session class constructor
118 */
119 CommSession::CommSession(SOCKET hSocket, const InetAddress &serverAddr, bool masterServer, bool controlServer)
120 {
121 m_sendQueue = new Queue;
122 m_processingQueue = new Queue;
123 m_hSocket = hSocket;
124 m_hProxySocket = INVALID_SOCKET;
125 m_dwIndex = INVALID_INDEX;
126 m_hWriteThread = INVALID_THREAD_HANDLE;
127 m_hProcessingThread = INVALID_THREAD_HANDLE;
128 m_hProxyReadThread = INVALID_THREAD_HANDLE;
129 m_serverId = 0;
130 m_serverAddr = serverAddr;
131 m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
132 m_masterServer = masterServer;
133 m_controlServer = controlServer;
134 m_proxyConnection = false;
135 m_acceptTraps = false;
136 m_acceptData = false;
137 m_acceptFileUpdates = false;
138 m_ipv6Aware = false;
139 m_bulkReconciliationSupported = false;
140 m_hCurrFile = -1;
141 m_fileRqId = 0;
142 m_compressor = NULL;
143 m_pCtx = NULL;
144 m_ts = time(NULL);
145 m_socketWriteMutex = MutexCreate();
146 m_responseQueue = new MsgWaitQueue();
147 m_requestId = 0;
148 }
149
150 /**
151 * Destructor
152 */
153 CommSession::~CommSession()
154 {
155 if (m_proxyConnection)
156 InterlockedDecrement(&s_activeProxySessions);
157
158 shutdown(m_hSocket, SHUT_RDWR);
159 closesocket(m_hSocket);
160 if (m_hProxySocket != INVALID_SOCKET)
161 closesocket(m_hProxySocket);
162
163 void *p;
164 while((p = m_sendQueue->get()) != NULL)
165 if (p != INVALID_POINTER_VALUE)
166 free(p);
167 delete m_sendQueue;
168
169 while((p = m_processingQueue->get()) != NULL)
170 if (p != INVALID_POINTER_VALUE)
171 delete (NXCPMessage *)p;
172 delete m_processingQueue;
173
174 if (m_hCurrFile != -1)
175 close(m_hCurrFile);
176 delete m_compressor;
177 if ((m_pCtx != NULL) && (m_pCtx != PROXY_ENCRYPTION_CTX))
178 m_pCtx->decRefCount();
179 MutexDestroy(m_socketWriteMutex);
180 delete m_responseQueue;
181 }
182
183 /**
184 * Start all threads
185 */
186 void CommSession::run()
187 {
188 m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
189 m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
190 ThreadCreate(readThreadStarter, 0, this);
191 }
192
193 /**
194 * Disconnect session
195 */
196 void CommSession::disconnect()
197 {
198 DebugPrintf(m_dwIndex, 5, _T("CommSession::disconnect()"));
199 shutdown(m_hSocket, SHUT_RDWR);
200 if (m_hProxySocket != -1)
201 shutdown(m_hProxySocket, SHUT_RDWR);
202 }
203
204 /**
205 * Reading thread
206 */
207 void CommSession::readThread()
208 {
209 SocketMessageReceiver receiver(m_hSocket, 4096, MAX_MSG_SIZE);
210 while(true)
211 {
212 if (!m_proxyConnection)
213 {
214 MessageReceiverResult result;
215 NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
216
217 // Check for decryption error
218 if (result == MSGRECV_DECRYPTION_FAILURE)
219 {
220 DebugPrintf(m_dwIndex, 4, _T("Unable to decrypt received message"));
221 continue;
222 }
223
224 // Check for timeout
225 if (result == MSGRECV_TIMEOUT)
226 {
227 if (m_ts < time(NULL) - (time_t)g_dwIdleTimeout)
228 {
229 DebugPrintf(m_dwIndex, 5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
230 break;
231 }
232 continue;
233 }
234
235 // Receive error
236 if (msg == NULL)
237 {
238 DebugPrintf(m_dwIndex, 5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
239 break;
240 }
241
242 // Update activity timestamp
243 m_ts = time(NULL);
244
245 if (nxlog_get_debug_level() >= 8)
246 {
247 String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), NXCP_VERSION);
248 DebugPrintf(m_dwIndex, 8, _T("Message dump:\n%s"), (const TCHAR *)msgDump);
249 }
250
251 if (msg->isBinary())
252 {
253 TCHAR buffer[64];
254 DebugPrintf(m_dwIndex, 6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
255
256 if (msg->getCode() == CMD_FILE_DATA)
257 {
258 if ((m_hCurrFile != -1) && (m_fileRqId == msg->getId()))
259 {
260 const BYTE *data;
261 int dataSize;
262 if (msg->isCompressed())
263 {
264 const BYTE *in = msg->getBinaryData();
265 if (m_compressor == NULL)
266 {
267 NXCPCompressionMethod method = (NXCPCompressionMethod)(*in);
268 m_compressor = StreamCompressor::create(method, false, FILE_BUFFER_SIZE);
269 if (m_compressor == NULL)
270 {
271 DebugPrintf(m_dwIndex, 5, _T("Unable to create stream compressor for method %d"), (int)method);
272 data = NULL;
273 dataSize = -1;
274 }
275 }
276
277 if (m_compressor != NULL)
278 {
279 dataSize = (int)m_compressor->decompress(in + 4, msg->getBinaryDataSize() - 4, &data);
280 if (dataSize != (int)ntohs(*((UINT16 *)(in + 2))))
281 {
282 // decompressed block size validation failed
283 dataSize = -1;
284 }
285 }
286 }
287 else
288 {
289 data = msg->getBinaryData();
290 dataSize = (int)msg->getBinaryDataSize();
291 }
292
293 if ((dataSize >= 0) && (write(m_hCurrFile, data, dataSize) == dataSize))
294 {
295 if (msg->isEndOfFile())
296 {
297 NXCPMessage response;
298
299 close(m_hCurrFile);
300 m_hCurrFile = -1;
301 delete_and_null(m_compressor);
302
303 response.setCode(CMD_REQUEST_COMPLETED);
304 response.setId(msg->getId());
305 response.setField(VID_RCC, ERR_SUCCESS);
306 sendMessage(&response);
307 }
308 }
309 else
310 {
311 // I/O error
312 NXCPMessage response;
313
314 close(m_hCurrFile);
315 m_hCurrFile = -1;
316 delete_and_null(m_compressor);
317
318 response.setCode(CMD_REQUEST_COMPLETED);
319 response.setId(msg->getId());
320 response.setField(VID_RCC, ERR_IO_FAILURE);
321 sendMessage(&response);
322 }
323 }
324 }
325 }
326 else if (msg->isControl())
327 {
328 TCHAR buffer[64];
329 DebugPrintf(m_dwIndex, 6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
330
331 if (msg->getCode() == CMD_GET_NXCP_CAPS)
332 {
333 NXCP_MESSAGE *pMsg = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
334 pMsg->id = htonl(msg->getId());
335 pMsg->code = htons((WORD)CMD_NXCP_CAPS);
336 pMsg->flags = htons(MF_CONTROL);
337 pMsg->numFields = htonl(NXCP_VERSION << 24);
338 pMsg->size = htonl(NXCP_HEADER_SIZE);
339 sendRawMessage(pMsg, m_pCtx);
340 }
341 delete msg;
342 }
343 else
344 {
345 TCHAR buffer[64];
346 DebugPrintf(m_dwIndex, 6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
347
348 UINT32 rcc;
349 switch(msg->getCode())
350 {
351 case CMD_REQUEST_COMPLETED:
352 m_responseQueue->put(msg);
353 break;
354 case CMD_REQUEST_SESSION_KEY:
355 if (m_pCtx == NULL)
356 {
357 NXCPMessage *pResponse;
358 SetupEncryptionContext(msg, &m_pCtx, &pResponse, NULL, NXCP_VERSION);
359 sendMessage(pResponse);
360 delete pResponse;
361 receiver.setEncryptionContext(m_pCtx);
362 }
363 delete msg;
364 break;
365 case CMD_SETUP_PROXY_CONNECTION:
366 s_proxyConnectionRequests++;
367 rcc = setupProxyConnection(msg);
368 // When proxy session established incoming messages will
369 // not be processed locally. Acknowledgment message sent
370 // by setupProxyConnection() in case of success.
371 if (rcc == ERR_SUCCESS)
372 {
373 InterlockedIncrement(&s_activeProxySessions);
374 m_processingQueue->put(INVALID_POINTER_VALUE);
375 }
376 else
377 {
378 NXCPMessage response;
379 response.setCode(CMD_REQUEST_COMPLETED);
380 response.setId(msg->getId());
381 response.setField(VID_RCC, rcc);
382 sendMessage(&response);
383 }
384 delete msg;
385 break;
386 case CMD_SNMP_REQUEST:
387 if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
388 {
389 incRefCount();
390 ThreadPoolExecute(g_snmpProxyThreadPool, this, &CommSession::proxySnmpRequest, msg);
391 }
392 else
393 {
394 NXCPMessage response;
395 response.setCode(CMD_REQUEST_COMPLETED);
396 response.setId(msg->getId());
397 response.setField(VID_RCC, ERR_ACCESS_DENIED);
398 sendMessage(&response);
399 delete msg;
400 }
401 break;
402 default:
403 m_processingQueue->put(msg);
404 break;
405 }
406 }
407 }
408 else // m_proxyConnection
409 {
410 fd_set rdfs;
411 struct timeval tv;
412 char buffer[32768];
413
414 FD_ZERO(&rdfs);
415 FD_SET(m_hSocket, &rdfs);
416 tv.tv_sec = g_dwIdleTimeout + 1;
417 tv.tv_usec = 0;
418 int rc = select(SELECT_NFDS(m_hSocket + 1), &rdfs, NULL, NULL, &tv);
419 if (rc <= 0)
420 break;
421 if (rc > 0)
422 {
423 // Update activity timestamp
424 m_ts = time(NULL);
425
426 rc = recv(m_hSocket, buffer, 32768, 0);
427 if (rc <= 0)
428 break;
429 SendEx(m_hProxySocket, buffer, rc, 0, NULL);
430 }
431 }
432 }
433
434 // Notify other threads to exit
435 m_sendQueue->put(INVALID_POINTER_VALUE);
436 m_processingQueue->put(INVALID_POINTER_VALUE);
437 if (m_hProxySocket != INVALID_SOCKET)
438 shutdown(m_hProxySocket, SHUT_RDWR);
439
440 // Wait for other threads to finish
441 ThreadJoin(m_hWriteThread);
442 ThreadJoin(m_hProcessingThread);
443 if (m_proxyConnection)
444 ThreadJoin(m_hProxyReadThread);
445
446 DebugPrintf(m_dwIndex, 5, _T("Session with %s closed"), (const TCHAR *)m_serverAddr.toString());
447 }
448
449 /**
450 * Send prepared raw message over the network and destroy it
451 */
452 BOOL CommSession::sendRawMessage(NXCP_MESSAGE *pMsg, NXCPEncryptionContext *pCtx)
453 {
454 BOOL bResult = TRUE;
455 TCHAR szBuffer[128];
456
457 DebugPrintf(m_dwIndex, 6, _T("Sending message %s (size %d)"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
458 if (nxlog_get_debug_level() >= 8)
459 {
460 String msgDump = NXCPMessage::dump(pMsg, NXCP_VERSION);
461 DebugPrintf(m_dwIndex, 8, _T("Outgoing message dump:\n%s"), (const TCHAR *)msgDump);
462 }
463 if ((pCtx != NULL) && (pCtx != PROXY_ENCRYPTION_CTX))
464 {
465 NXCP_ENCRYPTED_MESSAGE *enMsg = pCtx->encryptMessage(pMsg);
466 if (enMsg != NULL)
467 {
468 if (SendEx(m_hSocket, (const char *)enMsg, ntohl(enMsg->size), 0, m_socketWriteMutex) <= 0)
469 {
470 bResult = FALSE;
471 }
472 free(enMsg);
473 }
474 }
475 else
476 {
477 if (SendEx(m_hSocket, (const char *)pMsg, ntohl(pMsg->size), 0, m_socketWriteMutex) <= 0)
478 {
479 bResult = FALSE;
480 }
481 }
482 if (!bResult)
483 DebugPrintf(m_dwIndex, 6, _T("CommSession::SendRawMessage() for %s (size %d) failed"), NXCPMessageCodeName(ntohs(pMsg->code), szBuffer), ntohl(pMsg->size));
484 free(pMsg);
485 return bResult;
486 }
487
488 /**
489 * Writing thread
490 */
491 void CommSession::writeThread()
492 {
493 NXCP_MESSAGE *pMsg;
494
495 while(1)
496 {
497 pMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
498 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
499 break;
500
501 if (!sendRawMessage(pMsg, m_pCtx))
502 break;
503 }
504 }
505
506 /**
507 * Message processing thread
508 */
509 void CommSession::processingThread()
510 {
511 NXCPMessage *pMsg;
512 NXCPMessage msg;
513 UINT32 dwCommand;
514
515 while(1)
516 {
517 pMsg = (NXCPMessage *)m_processingQueue->getOrBlock();
518 if (pMsg == INVALID_POINTER_VALUE) // Session termination indicator
519 break;
520 dwCommand = pMsg->getCode();
521
522 // Prepare response message
523 msg.setCode(CMD_REQUEST_COMPLETED);
524 msg.setId(pMsg->getId());
525
526 // Check if authentication required
527 if ((!m_authenticated) && (dwCommand != CMD_AUTHENTICATE))
528 {
529 DebugPrintf(m_dwIndex, 6, _T("Authentication required"));
530 msg.setField(VID_RCC, ERR_AUTH_REQUIRED);
531 }
532 else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && (m_pCtx == NULL))
533 {
534 DebugPrintf(m_dwIndex, 6, _T("Encryption required"));
535 msg.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
536 }
537 else
538 {
539 switch(dwCommand)
540 {
541 case CMD_AUTHENTICATE:
542 authenticate(pMsg, &msg);
543 break;
544 case CMD_GET_PARAMETER:
545 getParameter(pMsg, &msg);
546 break;
547 case CMD_GET_LIST:
548 getList(pMsg, &msg);
549 break;
550 case CMD_GET_TABLE:
551 getTable(pMsg, &msg);
552 break;
553 case CMD_KEEPALIVE:
554 msg.setField(VID_RCC, ERR_SUCCESS);
555 break;
556 case CMD_ACTION:
557 action(pMsg, &msg);
558 break;
559 case CMD_TRANSFER_FILE:
560 recvFile(pMsg, &msg);
561 break;
562 case CMD_UPGRADE_AGENT:
563 msg.setField(VID_RCC, upgrade(pMsg));
564 break;
565 case CMD_GET_PARAMETER_LIST:
566 msg.setField(VID_RCC, ERR_SUCCESS);
567 GetParameterList(&msg);
568 break;
569 case CMD_GET_ENUM_LIST:
570 msg.setField(VID_RCC, ERR_SUCCESS);
571 GetEnumList(&msg);
572 break;
573 case CMD_GET_TABLE_LIST:
574 msg.setField(VID_RCC, ERR_SUCCESS);
575 GetTableList(&msg);
576 break;
577 case CMD_GET_AGENT_CONFIG:
578 getConfig(&msg);
579 break;
580 case CMD_UPDATE_AGENT_CONFIG:
581 updateConfig(pMsg, &msg);
582 break;
583 case CMD_ENABLE_AGENT_TRAPS:
584 if (m_masterServer)
585 {
586 m_acceptTraps = true;
587 msg.setField(VID_RCC, ERR_SUCCESS);
588 }
589 else
590 {
591 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
592 }
593 break;
594 case CMD_ENABLE_FILE_UPDATES:
595 if (m_masterServer)
596 {
597 m_acceptFileUpdates = true;
598 msg.setField(VID_RCC, ERR_SUCCESS);
599 }
600 else
601 {
602 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
603 }
604 break;
605 case CMD_DEPLOY_AGENT_POLICY:
606 if (m_masterServer)
607 {
608 msg.setField(VID_RCC, DeployPolicy(this, pMsg));
609 }
610 else
611 {
612 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
613 }
614 break;
615 case CMD_UNINSTALL_AGENT_POLICY:
616 if (m_masterServer)
617 {
618 msg.setField(VID_RCC, UninstallPolicy(this, pMsg));
619 }
620 else
621 {
622 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
623 }
624 break;
625 case CMD_GET_POLICY_INVENTORY:
626 if (m_masterServer)
627 {
628 msg.setField(VID_RCC, GetPolicyInventory(this, &msg));
629 }
630 else
631 {
632 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
633 }
634 break;
635 case CMD_TAKE_SCREENSHOT:
636 if (m_controlServer)
637 {
638 TCHAR sessionName[256];
639 pMsg->getFieldAsString(VID_NAME, sessionName, 256);
640 DebugPrintf(m_dwIndex, 6, _T("Take snapshot from session \"%s\""), sessionName);
641 SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
642 if (conn != NULL)
643 {
644 DebugPrintf(m_dwIndex, 6, _T("Session agent connector acquired"));
645 conn->takeScreenshot(&msg);
646 conn->decRefCount();
647 }
648 else
649 {
650 msg.setField(VID_RCC, ERR_NO_SESSION_AGENT);
651 }
652 }
653 else
654 {
655 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
656 }
657 break;
658 case CMD_SET_SERVER_CAPABILITIES:
659 // Servers before 2.0 use VID_ENABLED
660 m_ipv6Aware = pMsg->isFieldExist(VID_IPV6_SUPPORT) ? pMsg->getFieldAsBoolean(VID_IPV6_SUPPORT) : pMsg->getFieldAsBoolean(VID_ENABLED);
661 m_bulkReconciliationSupported = pMsg->getFieldAsBoolean(VID_BULK_RECONCILIATION);
662 msg.setField(VID_RCC, ERR_SUCCESS);
663 break;
664 case CMD_SET_SERVER_ID:
665 m_serverId = pMsg->getFieldAsUInt64(VID_SERVER_ID);
666 DebugPrintf(m_dwIndex, 1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
667 msg.setField(VID_RCC, ERR_SUCCESS);
668 break;
669 case CMD_DATA_COLLECTION_CONFIG:
670 if (m_serverId != 0)
671 {
672 ConfigureDataCollection(m_serverId, pMsg);
673 m_acceptData = true;
674 msg.setField(VID_RCC, ERR_SUCCESS);
675 }
676 else
677 {
678 DebugPrintf(m_dwIndex, 1, _T("Data collection configuration command received but server ID is not set"));
679 msg.setField(VID_RCC, ERR_SERVER_ID_UNSET);
680 }
681 break;
682 case CMD_CLEAN_AGENT_DCI_CONF:
683 if (m_masterServer)
684 {
685 ClearDataCollectionConfiguration();
686 msg.setField(VID_RCC, ERR_SUCCESS);
687 }
688 else
689 {
690 msg.setField(VID_RCC, ERR_ACCESS_DENIED);
691 }
692 break;
693 default:
694 // Attempt to process unknown command by subagents
695 if (!ProcessCmdBySubAgent(dwCommand, pMsg, &msg, this))
696 msg.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
697 break;
698 }
699 }
700 delete pMsg;
701
702 // Send response
703 sendMessage(&msg);
704 msg.deleteAllFields();
705 }
706 }
707
708 /**
709 * Authenticate peer
710 */
711 void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
712 {
713 if (m_authenticated)
714 {
715 // Already authenticated
716 pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
717 }
718 else
719 {
720 TCHAR szSecret[MAX_SECRET_LENGTH];
721 BYTE hash[32];
722 WORD wAuthMethod;
723
724 wAuthMethod = pRequest->getFieldAsUInt16(VID_AUTH_METHOD);
725 switch(wAuthMethod)
726 {
727 case AUTH_PLAINTEXT:
728 pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
729 if (!_tcscmp(szSecret, g_szSharedSecret))
730 {
731 m_authenticated = true;
732 pMsg->setField(VID_RCC, ERR_SUCCESS);
733 }
734 else
735 {
736 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, "PLAIN");
737 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
738 }
739 break;
740 case AUTH_MD5_HASH:
741 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
742 #ifdef UNICODE
743 {
744 char sharedSecret[256];
745 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
746 sharedSecret[255] = 0;
747 CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
748 }
749 #else
750 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
751 #endif
752 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
753 {
754 m_authenticated = true;
755 pMsg->setField(VID_RCC, ERR_SUCCESS);
756 }
757 else
758 {
759 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("MD5"));
760 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
761 }
762 break;
763 case AUTH_SHA1_HASH:
764 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
765 #ifdef UNICODE
766 {
767 char sharedSecret[256];
768 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
769 sharedSecret[255] = 0;
770 CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
771 }
772 #else
773 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
774 #endif
775 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
776 {
777 m_authenticated = true;
778 pMsg->setField(VID_RCC, ERR_SUCCESS);
779 }
780 else
781 {
782 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("SHA1"));
783 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
784 }
785 break;
786 default:
787 pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
788 break;
789 }
790 }
791 }
792
793 /**
794 * Get parameter's value
795 */
796 void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
797 {
798 TCHAR szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
799 UINT32 dwErrorCode;
800
801 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
802 dwErrorCode = GetParameterValue(m_dwIndex, szParameter, szValue, this);
803 pMsg->setField(VID_RCC, dwErrorCode);
804 if (dwErrorCode == ERR_SUCCESS)
805 pMsg->setField(VID_VALUE, szValue);
806 }
807
808 /**
809 * Get list of values
810 */
811 void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
812 {
813 TCHAR szParameter[MAX_PARAM_NAME];
814 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
815
816 StringList value;
817 UINT32 dwErrorCode = GetListValue(m_dwIndex, szParameter, &value, this);
818 pMsg->setField(VID_RCC, dwErrorCode);
819 if (dwErrorCode == ERR_SUCCESS)
820 {
821 value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
822 }
823 }
824
825 /**
826 * Get table
827 */
828 void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
829 {
830 TCHAR szParameter[MAX_PARAM_NAME];
831
832 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
833
834 Table value;
835 UINT32 dwErrorCode = GetTableValue(m_dwIndex, szParameter, &value, this);
836 pMsg->setField(VID_RCC, dwErrorCode);
837 if (dwErrorCode == ERR_SUCCESS)
838 {
839 value.fillMessage(*pMsg, 0, -1); // no row limit
840 }
841 }
842
843 /**
844 * Perform action on request
845 */
846 void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
847 {
848 if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
849 {
850 // Get action name and arguments
851 TCHAR action[MAX_PARAM_NAME];
852 pRequest->getFieldAsString(VID_ACTION_NAME, action, MAX_PARAM_NAME);
853
854 int numArgs = pRequest->getFieldAsInt32(VID_NUM_ARGS);
855 StringList *args = new StringList;
856 for(int i = 0; i < numArgs; i++)
857 args->addPreallocated(pRequest->getFieldAsString(VID_ACTION_ARG_BASE + i));
858
859 // Execute action
860 if (pRequest->getFieldAsBoolean(VID_RECEIVE_OUTPUT))
861 {
862 UINT32 rcc = ExecActionWithOutput(this, pRequest->getId(), action, args);
863 pMsg->setField(VID_RCC, rcc);
864 }
865 else
866 {
867 UINT32 rcc = ExecAction(action, args, this);
868 pMsg->setField(VID_RCC, rcc);
869 delete args;
870 }
871 }
872 else
873 {
874 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
875 }
876 }
877
878 /**
879 * Prepare for receiving file
880 */
881 void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
882 {
883 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
884
885 if (m_masterServer)
886 {
887 szFileName[0] = 0;
888 pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
889 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
890 BuildFullPath(szFileName, szFullPath);
891
892 // Check if for some reason we have already opened file
893 pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId()));
894 }
895 else
896 {
897 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
898 }
899 }
900
901 /**
902 * Open file for writing
903 */
904 UINT32 CommSession::openFile(TCHAR *szFullPath, UINT32 requestId)
905 {
906 if (m_hCurrFile != -1)
907 {
908 return ERR_RESOURCE_BUSY;
909 }
910 else
911 {
912 DebugPrintf(m_dwIndex, 5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath);
913 m_hCurrFile = _topen(szFullPath, O_CREAT | O_TRUNC | O_WRONLY | O_BINARY, 0600);
914 if (m_hCurrFile == -1)
915 {
916 DebugPrintf(m_dwIndex, 2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
917 return ERR_IO_FAILURE;
918 }
919 else
920 {
921 m_fileRqId = requestId;
922 return ERR_SUCCESS;
923 }
924 }
925 }
926
927 /**
928 * Progress callback for file sending
929 */
930 static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
931 {
932 ((CommSession *)cbArg)->updateTimeStamp();
933 }
934
935 /**
936 * Send file to server
937 */
938 bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset)
939 {
940 return SendFileOverNXCP(m_hSocket, requestId, file, m_pCtx, offset, SendFileProgressCallback, this, m_socketWriteMutex) ? true : false;
941 }
942
943 /**
944 * Upgrade agent from package in the file store
945 */
946 UINT32 CommSession::upgrade(NXCPMessage *pRequest)
947 {
948 if (m_masterServer)
949 {
950 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
951
952 szPkgName[0] = 0;
953 pRequest->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
954 BuildFullPath(szPkgName, szFullPath);
955
956 //Create line in registry file with upgrade file name to delete it after system start
957 DB_HANDLE hdb = GetLocalDatabaseHandle();
958 if(hdb != NULL)
959 {
960 TCHAR upgradeFileInsert[256];
961 _sntprintf(upgradeFileInsert, 256, _T("INSERT INTO registry (attribute,value) VALUES ('upgrade.file',%s)"), szPkgName);
962 DBQuery(hdb, upgradeFileInsert);
963 }
964
965 return UpgradeAgent(szFullPath);
966 }
967 else
968 {
969 return ERR_ACCESS_DENIED;
970 }
971 }
972
973 /**
974 * Get agent's configuration file
975 */
976 void CommSession::getConfig(NXCPMessage *pMsg)
977 {
978 if (m_masterServer)
979 {
980 pMsg->setField(VID_RCC,
981 pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
982 }
983 else
984 {
985 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
986 }
987 }
988
989 /**
990 * Update agent's configuration file
991 */
992 void CommSession::updateConfig(NXCPMessage *pRequest, NXCPMessage *pMsg)
993 {
994 if (m_masterServer)
995 {
996 BYTE *pConfig;
997 int hFile;
998 UINT32 size;
999
1000 if (pRequest->isFieldExist(VID_CONFIG_FILE))
1001 {
1002 size = pRequest->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
1003 pConfig = (BYTE *)malloc(size);
1004 pRequest->getFieldAsBinary(VID_CONFIG_FILE, pConfig, size);
1005 hFile = _topen(g_szConfigFile, O_CREAT | O_TRUNC | O_WRONLY, 0644);
1006 if (hFile != -1)
1007 {
1008 if (size > 0)
1009 {
1010 for(UINT32 i = 0; i < size - 1; i++)
1011 if (pConfig[i] == 0x0D)
1012 {
1013 size--;
1014 memmove(&pConfig[i], &pConfig[i + 1], size - i);
1015 i--;
1016 }
1017 }
1018 if (write(hFile, pConfig, size) == size)
1019 pMsg->setField(VID_RCC, ERR_SUCCESS);
1020 else
1021 pMsg->setField(VID_RCC, ERR_IO_FAILURE);
1022 close(hFile);
1023 }
1024 else
1025 {
1026 DebugPrintf(m_dwIndex, 2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
1027 g_szConfigFile, _tcserror(errno));
1028 pMsg->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
1029 }
1030 free(pConfig);
1031 }
1032 else
1033 {
1034 pMsg->setField(VID_RCC, ERR_MALFORMED_COMMAND);
1035 }
1036 }
1037 else
1038 {
1039 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
1040 }
1041 }
1042
1043 /**
1044 * Setup proxy connection
1045 */
1046 UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
1047 {
1048 UINT32 dwResult, dwAddr;
1049 WORD wPort;
1050 struct sockaddr_in sa;
1051 NXCPEncryptionContext *pSavedCtx;
1052 TCHAR szBuffer[32];
1053
1054 if (m_masterServer && (g_dwFlags & AF_ENABLE_PROXY))
1055 {
1056 dwAddr = pRequest->getFieldAsUInt32(VID_IP_ADDRESS);
1057 wPort = pRequest->getFieldAsUInt16(VID_AGENT_PORT);
1058 m_hProxySocket = socket(AF_INET, SOCK_STREAM, 0);
1059 if (m_hProxySocket != INVALID_SOCKET)
1060 {
1061 // Fill in address structure
1062 memset(&sa, 0, sizeof(sa));
1063 sa.sin_addr.s_addr = htonl(dwAddr);
1064 sa.sin_family = AF_INET;
1065 sa.sin_port = htons(wPort);
1066 if (connect(m_hProxySocket, (struct sockaddr *)&sa, sizeof(sa)) != -1)
1067 {
1068 NXCPMessage msg;
1069 NXCP_MESSAGE *pRawMsg;
1070
1071 // Stop writing thread
1072 m_sendQueue->put(INVALID_POINTER_VALUE);
1073
1074 // Wait while all queued messages will be sent
1075 while(m_sendQueue->size() > 0)
1076 ThreadSleepMs(100);
1077
1078 // Finish proxy connection setup
1079 pSavedCtx = m_pCtx;
1080 m_pCtx = PROXY_ENCRYPTION_CTX;
1081 m_proxyConnection = true;
1082 dwResult = ERR_SUCCESS;
1083 m_hProxyReadThread = ThreadCreateEx(proxyReadThreadStarter, 0, this);
1084
1085 // Send confirmation message
1086 // We cannot use sendMessage() and writing thread, because
1087 // encryption context already overriden, and writing thread
1088 // already stopped
1089 msg.setCode(CMD_REQUEST_COMPLETED);
1090 msg.setId(pRequest->getId());
1091 msg.setField(VID_RCC, RCC_SUCCESS);
1092 pRawMsg = msg.createMessage();
1093 sendRawMessage(pRawMsg, pSavedCtx);
1094 if (pSavedCtx != NULL)
1095 pSavedCtx->decRefCount();
1096
1097 DebugPrintf(m_dwIndex, 5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr, szBuffer), wPort);
1098 }
1099 else
1100 {
1101 dwResult = ERR_CONNECT_FAILED;
1102 }
1103 }
1104 else
1105 {
1106 dwResult = ERR_SOCKET_ERROR;
1107 }
1108 }
1109 else
1110 {
1111 dwResult = ERR_ACCESS_DENIED;
1112 }
1113 return dwResult;
1114 }
1115
1116 /**
1117 * Proxy reading thread
1118 */
1119 void CommSession::proxyReadThread()
1120 {
1121 fd_set rdfs;
1122 struct timeval tv;
1123 char buffer[32768];
1124 int nRet;
1125
1126 while(1)
1127 {
1128 FD_ZERO(&rdfs);
1129 FD_SET(m_hProxySocket, &rdfs);
1130 tv.tv_sec = 0;
1131 tv.tv_usec = 5000000; // Half-second timeout
1132 nRet = select(SELECT_NFDS(m_hProxySocket + 1), &rdfs, NULL, NULL, &tv);
1133 if (nRet < 0)
1134 break;
1135 if (nRet > 0)
1136 {
1137 nRet = recv(m_hProxySocket, buffer, 32768, 0);
1138 if (nRet <= 0)
1139 break;
1140 SendEx(m_hSocket, buffer, nRet, 0, m_socketWriteMutex);
1141 }
1142 }
1143 disconnect();
1144 }
1145
1146 /**
1147 * Wait for request completion
1148 */
1149 UINT32 CommSession::doRequest(NXCPMessage *msg, UINT32 timeout)
1150 {
1151 sendMessage(msg);
1152 NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1153 UINT32 rcc;
1154 if (response != NULL)
1155 {
1156 rcc = response->getFieldAsUInt32(VID_RCC);
1157 delete response;
1158 }
1159 else
1160 {
1161 rcc = ERR_REQUEST_TIMEOUT;
1162 }
1163 return rcc;
1164 }
1165
1166 /**
1167 * Wait for request completion
1168 */
1169 NXCPMessage *CommSession::doRequestEx(NXCPMessage *msg, UINT32 timeout)
1170 {
1171 sendMessage(msg);
1172 return m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1173 }
1174
1175 /**
1176 * Generate new request ID
1177 */
1178 UINT32 CommSession::generateRequestId()
1179 {
1180 return (UINT32)InterlockedIncrement(&m_requestId);
1181 }