aaef2dca5f33e9f5395132ec1bfc6ebd40c7122f
[public/netxms.git] / src / agent / core / session.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: session.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Externals
27 */
28 void UnregisterSession(UINT32 index, UINT32 id);
29 UINT32 DeployPolicy(CommSession *session, NXCPMessage *request);
30 UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request);
31 UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg);
32 void ClearDataCollectionConfiguration();
33
34 /**
35 * SNMP proxy thread pool
36 */
37 ThreadPool *g_snmpProxyThreadPool = NULL;
38
39 /**
40 * Communication request processing thread pool
41 */
42 ThreadPool *g_commThreadPool = NULL;
43
44 /**
45 * Next free session ID
46 */
47 static VolatileCounter s_sessionId = 0;
48
49 /**
50 * Agent proxy statistics
51 */
52 static UINT64 s_proxyConnectionRequests = 0;
53 static VolatileCounter s_activeProxySessions = 0;
54
55 /**
56 * Handler for agent proxy stats parameters
57 */
58 LONG H_AgentProxyStats(const TCHAR *cmd, const TCHAR *arg, TCHAR *value, AbstractCommSession *session)
59 {
60 switch(*arg)
61 {
62 case 'A':
63 ret_uint(value, (UINT32)s_activeProxySessions);
64 break;
65 case 'C':
66 ret_uint64(value, s_proxyConnectionRequests);
67 break;
68 default:
69 return SYSINFO_RC_UNSUPPORTED;
70 }
71 return SYSINFO_RC_SUCCESS;
72 }
73
74 /**
75 * Client communication read thread
76 */
77 THREAD_RESULT THREAD_CALL CommSession::readThreadStarter(void *pArg)
78 {
79 ((CommSession *)pArg)->readThread();
80 UnregisterSession(((CommSession *)pArg)->getIndex(), ((CommSession *)pArg)->getId());
81 ((CommSession *)pArg)->debugPrintf(6, _T("Receiver thread stopped"));
82 ((CommSession *)pArg)->decRefCount();
83 return THREAD_OK;
84 }
85
86 /**
87 * Client communication write thread
88 */
89 THREAD_RESULT THREAD_CALL CommSession::writeThreadStarter(void *pArg)
90 {
91 ((CommSession *)pArg)->writeThread();
92 return THREAD_OK;
93 }
94
95 /**
96 * Received message processing thread
97 */
98 THREAD_RESULT THREAD_CALL CommSession::processingThreadStarter(void *pArg)
99 {
100 ((CommSession *)pArg)->processingThread();
101 return THREAD_OK;
102 }
103
104 /**
105 * Client communication write thread
106 */
107 THREAD_RESULT THREAD_CALL CommSession::proxyReadThreadStarter(void *pArg)
108 {
109 ((CommSession *)pArg)->proxyReadThread();
110 return THREAD_OK;
111 }
112
113 /**
114 * Client session class constructor
115 */
116 CommSession::CommSession(AbstractCommChannel *channel, const InetAddress &serverAddr, bool masterServer, bool controlServer) : m_downloadFileMap(true)
117 {
118 m_id = InterlockedIncrement(&s_sessionId);
119 m_index = INVALID_INDEX;
120 m_sendQueue = new Queue;
121 m_processingQueue = new Queue;
122 m_channel = channel;
123 m_channel->incRefCount();
124 m_hProxySocket = INVALID_SOCKET;
125 m_hWriteThread = INVALID_THREAD_HANDLE;
126 m_hProcessingThread = INVALID_THREAD_HANDLE;
127 m_hProxyReadThread = INVALID_THREAD_HANDLE;
128 m_serverId = 0;
129 m_serverAddr = serverAddr;
130 m_authenticated = (g_dwFlags & AF_REQUIRE_AUTH) ? false : true;
131 m_masterServer = masterServer;
132 m_controlServer = controlServer;
133 m_proxyConnection = false;
134 m_acceptTraps = false;
135 m_acceptData = false;
136 m_acceptFileUpdates = false;
137 m_ipv6Aware = false;
138 m_bulkReconciliationSupported = false;
139 m_disconnected = false;
140 m_allowCompression = false;
141 m_pCtx = NULL;
142 m_ts = time(NULL);
143 m_socketWriteMutex = MutexCreate();
144 m_responseQueue = new MsgWaitQueue();
145 m_requestId = 0;
146 }
147
148 /**
149 * Destructor
150 */
151 CommSession::~CommSession()
152 {
153 if (m_proxyConnection)
154 InterlockedDecrement(&s_activeProxySessions);
155
156 m_channel->shutdown();
157 m_channel->close();
158 m_channel->decRefCount();
159 if (m_hProxySocket != INVALID_SOCKET)
160 closesocket(m_hProxySocket);
161
162 void *p;
163 while((p = m_sendQueue->get()) != NULL)
164 if (p != INVALID_POINTER_VALUE)
165 free(p);
166 delete m_sendQueue;
167
168 while((p = m_processingQueue->get()) != NULL)
169 if (p != INVALID_POINTER_VALUE)
170 delete (NXCPMessage *)p;
171 delete m_processingQueue;
172 if ((m_pCtx != NULL) && (m_pCtx != PROXY_ENCRYPTION_CTX))
173 m_pCtx->decRefCount();
174 MutexDestroy(m_socketWriteMutex);
175 delete m_responseQueue;
176 }
177
178 /**
179 * Debug print in session context
180 */
181 void CommSession::debugPrintf(int level, const TCHAR *format, ...)
182 {
183 if (level > nxlog_get_debug_level())
184 return;
185
186 va_list args;
187 TCHAR buffer[8192];
188
189 va_start(args, format);
190 _vsntprintf(buffer, 8192, format, args);
191 va_end(args);
192
193 nxlog_write(MSG_DEBUG_SESSION, EVENTLOG_DEBUG_TYPE, "dds", m_index, m_id, buffer);
194 }
195
196 /**
197 * Start all threads
198 */
199 void CommSession::run()
200 {
201 m_hWriteThread = ThreadCreateEx(writeThreadStarter, 0, this);
202 m_hProcessingThread = ThreadCreateEx(processingThreadStarter, 0, this);
203 ThreadCreate(readThreadStarter, 0, this);
204 }
205
206 /**
207 * Disconnect session
208 */
209 void CommSession::disconnect()
210 {
211 debugPrintf(5, _T("CommSession::disconnect()"));
212 m_channel->shutdown();
213 if (m_hProxySocket != -1)
214 shutdown(m_hProxySocket, SHUT_RDWR);
215 m_disconnected = true;
216 }
217
218 /**
219 * Reading thread
220 */
221 void CommSession::readThread()
222 {
223 CommChannelMessageReceiver receiver(m_channel, 4096, MAX_AGENT_MSG_SIZE);
224 while(!m_disconnected)
225 {
226 if (!m_proxyConnection)
227 {
228 MessageReceiverResult result;
229 NXCPMessage *msg = receiver.readMessage((g_dwIdleTimeout + 1) * 1000, &result);
230
231 // Check for decryption error
232 if (result == MSGRECV_DECRYPTION_FAILURE)
233 {
234 debugPrintf(4, _T("Unable to decrypt received message"));
235 continue;
236 }
237
238 // Check for timeout
239 if (result == MSGRECV_TIMEOUT)
240 {
241 if (m_ts < time(NULL) - (time_t)g_dwIdleTimeout)
242 {
243 debugPrintf(5, _T("Session disconnected by timeout (last activity timestamp is %d)"), (int)m_ts);
244 break;
245 }
246 continue;
247 }
248
249 // Receive error
250 if (msg == NULL)
251 {
252 if (result == MSGRECV_CLOSED)
253 debugPrintf(5, _T("Communication channel closed by peer"));
254 else
255 debugPrintf(5, _T("Message receiving error (%s)"), AbstractMessageReceiver::resultToText(result));
256 break;
257 }
258
259 // Update activity timestamp
260 m_ts = time(NULL);
261
262 if (nxlog_get_debug_level() >= 8)
263 {
264 String msgDump = NXCPMessage::dump(receiver.getRawMessageBuffer(), NXCP_VERSION);
265 debugPrintf(8, _T("Message dump:\n%s"), (const TCHAR *)msgDump);
266 }
267
268 if (msg->isBinary())
269 {
270 TCHAR buffer[64];
271 debugPrintf(6, _T("Received raw message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
272
273 if (msg->getCode() == CMD_FILE_DATA)
274 {
275 DownloadFileInfo *dInfo = m_downloadFileMap.get(msg->getId());
276 if (dInfo != NULL)
277 {
278 if (dInfo->write(msg->getBinaryData(), msg->getBinaryDataSize(), msg->isCompressedStream()))
279 {
280 if (msg->isEndOfFile())
281 {
282 NXCPMessage response;
283
284 dInfo->close(true);
285 m_downloadFileMap.remove(msg->getId());
286
287 response.setCode(CMD_REQUEST_COMPLETED);
288 response.setId(msg->getId());
289 response.setField(VID_RCC, ERR_SUCCESS);
290 sendMessage(&response);
291 }
292 }
293 else
294 {
295 // I/O error
296 NXCPMessage response;
297
298 dInfo->close(false);
299 m_downloadFileMap.remove(msg->getId());
300
301 response.setCode(CMD_REQUEST_COMPLETED);
302 response.setId(msg->getId());
303 response.setField(VID_RCC, ERR_IO_FAILURE);
304 sendMessage(&response);
305 }
306 }
307 }
308 }
309 else if (msg->isControl())
310 {
311 TCHAR buffer[64];
312 debugPrintf(6, _T("Received control message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
313
314 if (msg->getCode() == CMD_GET_NXCP_CAPS)
315 {
316 NXCP_MESSAGE *response = (NXCP_MESSAGE *)malloc(NXCP_HEADER_SIZE);
317 response->id = htonl(msg->getId());
318 response->code = htons((WORD)CMD_NXCP_CAPS);
319 response->flags = htons(MF_CONTROL);
320 response->numFields = htonl(NXCP_VERSION << 24);
321 response->size = htonl(NXCP_HEADER_SIZE);
322 sendRawMessage(response, m_pCtx);
323 }
324 delete msg;
325 }
326 else
327 {
328 TCHAR buffer[64];
329 debugPrintf(6, _T("Received message %s (%d)"), NXCPMessageCodeName(msg->getCode(), buffer), msg->getId());
330
331 UINT32 rcc;
332 switch(msg->getCode())
333 {
334 case CMD_REQUEST_COMPLETED:
335 m_responseQueue->put(msg);
336 break;
337 case CMD_REQUEST_SESSION_KEY:
338 if (m_pCtx == NULL)
339 {
340 NXCPMessage *pResponse;
341 SetupEncryptionContext(msg, &m_pCtx, &pResponse, NULL, NXCP_VERSION);
342 sendMessage(pResponse);
343 delete pResponse;
344 receiver.setEncryptionContext(m_pCtx);
345 }
346 delete msg;
347 break;
348 case CMD_SETUP_PROXY_CONNECTION:
349 s_proxyConnectionRequests++;
350 rcc = setupProxyConnection(msg);
351 // When proxy session established incoming messages will
352 // not be processed locally. Acknowledgment message sent
353 // by setupProxyConnection() in case of success.
354 if (rcc == ERR_SUCCESS)
355 {
356 InterlockedIncrement(&s_activeProxySessions);
357 m_processingQueue->put(INVALID_POINTER_VALUE);
358 }
359 else
360 {
361 NXCPMessage response;
362 response.setCode(CMD_REQUEST_COMPLETED);
363 response.setId(msg->getId());
364 response.setField(VID_RCC, rcc);
365 sendMessage(&response);
366 }
367 delete msg;
368 break;
369 case CMD_SNMP_REQUEST:
370 if (m_masterServer && (g_dwFlags & AF_ENABLE_SNMP_PROXY))
371 {
372 incRefCount();
373 ThreadPoolExecute(g_snmpProxyThreadPool, this, &CommSession::proxySnmpRequest, msg);
374 }
375 else
376 {
377 NXCPMessage response;
378 response.setCode(CMD_REQUEST_COMPLETED);
379 response.setId(msg->getId());
380 response.setField(VID_RCC, ERR_ACCESS_DENIED);
381 sendMessage(&response);
382 delete msg;
383 }
384 break;
385 default:
386 m_processingQueue->put(msg);
387 break;
388 }
389 }
390 }
391 else // m_proxyConnection
392 {
393 int rc = m_channel->poll((g_dwIdleTimeout + 1) * 1000);
394 if (rc <= 0)
395 break;
396 if (rc > 0)
397 {
398 // Update activity timestamp
399 m_ts = time(NULL);
400
401 char buffer[32768];
402 rc = m_channel->recv(buffer, 32768);
403 if (rc <= 0)
404 break;
405 SendEx(m_hProxySocket, buffer, rc, 0, NULL);
406 }
407 }
408 }
409
410 // Notify other threads to exit
411 m_sendQueue->put(INVALID_POINTER_VALUE);
412 m_processingQueue->put(INVALID_POINTER_VALUE);
413 if (m_hProxySocket != INVALID_SOCKET)
414 shutdown(m_hProxySocket, SHUT_RDWR);
415
416 // Wait for other threads to finish
417 ThreadJoin(m_hWriteThread);
418 ThreadJoin(m_hProcessingThread);
419 if (m_proxyConnection)
420 ThreadJoin(m_hProxyReadThread);
421
422 debugPrintf(5, _T("Session with %s closed"), (const TCHAR *)m_serverAddr.toString());
423 }
424
425 /**
426 * Send prepared raw message over the network and destroy it
427 */
428 bool CommSession::sendRawMessage(NXCP_MESSAGE *msg, NXCPEncryptionContext *ctx)
429 {
430 if (m_disconnected)
431 {
432 free(msg);
433 debugPrintf(6, _T("Aborting sendRawMessage call because session is disconnected"));
434 return false;
435 }
436
437 bool success = true;
438 if (nxlog_get_debug_level() >= 6)
439 {
440 TCHAR buffer[128];
441 debugPrintf(6, _T("Sending message %s (ID %d; size %d; %s)"), NXCPMessageCodeName(ntohs(msg->code), buffer),
442 ntohl(msg->id), ntohl(msg->size),
443 ntohs(msg->flags) & MF_COMPRESSED ? _T("compressed") : _T("uncompressed"));
444 if (nxlog_get_debug_level() >= 8)
445 {
446 String msgDump = NXCPMessage::dump(msg, NXCP_VERSION);
447 debugPrintf(8, _T("Outgoing message dump:\n%s"), (const TCHAR *)msgDump);
448 }
449 }
450
451 if ((ctx != NULL) && (ctx != PROXY_ENCRYPTION_CTX))
452 {
453 NXCP_ENCRYPTED_MESSAGE *enMsg = ctx->encryptMessage(msg);
454 if (enMsg != NULL)
455 {
456 if (m_channel->send(enMsg, ntohl(enMsg->size), m_socketWriteMutex) <= 0)
457 {
458 success = false;
459 }
460 free(enMsg);
461 }
462 }
463 else
464 {
465 if (m_channel->send(msg, ntohl(msg->size), m_socketWriteMutex) <= 0)
466 {
467 success = false;
468 }
469 }
470
471 if (!success)
472 {
473 TCHAR buffer[128];
474 debugPrintf(6, _T("CommSession::SendRawMessage() for %s (size %d) failed (error %d: %s)"),
475 NXCPMessageCodeName(ntohs(msg->code), buffer), ntohl(msg->size), WSAGetLastError(), _tcserror(WSAGetLastError()));
476 }
477 free(msg);
478 return success;
479 }
480
481 /**
482 * Send message directly to socket
483 */
484 bool CommSession::sendMessage(NXCPMessage *msg)
485 {
486 if (m_disconnected)
487 return false;
488
489 return sendRawMessage(msg->serialize(m_allowCompression), m_pCtx);
490 }
491
492 /**
493 * Send raw message directly to socket
494 */
495 bool CommSession::sendRawMessage(NXCP_MESSAGE *msg)
496 {
497 return sendRawMessage((NXCP_MESSAGE *)nx_memdup(msg, ntohl(msg->size)), m_pCtx);
498 }
499
500 /**
501 * Writing thread
502 */
503 void CommSession::writeThread()
504 {
505 while(true)
506 {
507 NXCP_MESSAGE *msg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
508 if (msg == INVALID_POINTER_VALUE) // Session termination indicator
509 break;
510
511 if (!sendRawMessage(msg, m_pCtx))
512 break;
513 }
514 debugPrintf(6, _T("writer thread stopped"));
515 }
516
517 /**
518 * Message processing thread
519 */
520 void CommSession::processingThread()
521 {
522 NXCPMessage response;
523 while(true)
524 {
525 NXCPMessage *request = (NXCPMessage *)m_processingQueue->getOrBlock();
526 if (request == INVALID_POINTER_VALUE) // Session termination indicator
527 break;
528 UINT32 command = request->getCode();
529
530 // Prepare response message
531 response.setCode(CMD_REQUEST_COMPLETED);
532 response.setId(request->getId());
533
534 // Check if authentication required
535 if ((!m_authenticated) && (command != CMD_AUTHENTICATE))
536 {
537 debugPrintf(6, _T("Authentication required"));
538 response.setField(VID_RCC, ERR_AUTH_REQUIRED);
539 }
540 else if ((g_dwFlags & AF_REQUIRE_ENCRYPTION) && (m_pCtx == NULL))
541 {
542 debugPrintf(6, _T("Encryption required"));
543 response.setField(VID_RCC, ERR_ENCRYPTION_REQUIRED);
544 }
545 else
546 {
547 switch(command)
548 {
549 case CMD_AUTHENTICATE:
550 authenticate(request, &response);
551 break;
552 case CMD_GET_PARAMETER:
553 getParameter(request, &response);
554 break;
555 case CMD_GET_LIST:
556 getList(request, &response);
557 break;
558 case CMD_GET_TABLE:
559 getTable(request, &response);
560 break;
561 case CMD_KEEPALIVE:
562 response.setField(VID_RCC, ERR_SUCCESS);
563 break;
564 case CMD_ACTION:
565 action(request, &response);
566 break;
567 case CMD_TRANSFER_FILE:
568 recvFile(request, &response);
569 break;
570 case CMD_UPGRADE_AGENT:
571 response.setField(VID_RCC, upgrade(request));
572 break;
573 case CMD_GET_PARAMETER_LIST:
574 response.setField(VID_RCC, ERR_SUCCESS);
575 GetParameterList(&response);
576 break;
577 case CMD_GET_ENUM_LIST:
578 response.setField(VID_RCC, ERR_SUCCESS);
579 GetEnumList(&response);
580 break;
581 case CMD_GET_TABLE_LIST:
582 response.setField(VID_RCC, ERR_SUCCESS);
583 GetTableList(&response);
584 break;
585 case CMD_GET_AGENT_CONFIG:
586 getConfig(&response);
587 break;
588 case CMD_UPDATE_AGENT_CONFIG:
589 updateConfig(request, &response);
590 break;
591 case CMD_ENABLE_AGENT_TRAPS:
592 m_acceptTraps = true;
593 response.setField(VID_RCC, ERR_SUCCESS);
594 break;
595 case CMD_ENABLE_FILE_UPDATES:
596 if (m_masterServer)
597 {
598 m_acceptFileUpdates = true;
599 response.setField(VID_RCC, ERR_SUCCESS);
600 }
601 else
602 {
603 response.setField(VID_RCC, ERR_ACCESS_DENIED);
604 }
605 break;
606 case CMD_DEPLOY_AGENT_POLICY:
607 if (m_masterServer)
608 {
609 response.setField(VID_RCC, DeployPolicy(this, request));
610 }
611 else
612 {
613 response.setField(VID_RCC, ERR_ACCESS_DENIED);
614 }
615 break;
616 case CMD_UNINSTALL_AGENT_POLICY:
617 if (m_masterServer)
618 {
619 response.setField(VID_RCC, UninstallPolicy(this, request));
620 }
621 else
622 {
623 response.setField(VID_RCC, ERR_ACCESS_DENIED);
624 }
625 break;
626 case CMD_GET_POLICY_INVENTORY:
627 if (m_masterServer)
628 {
629 response.setField(VID_RCC, GetPolicyInventory(this, &response));
630 }
631 else
632 {
633 response.setField(VID_RCC, ERR_ACCESS_DENIED);
634 }
635 break;
636 case CMD_TAKE_SCREENSHOT:
637 if (m_controlServer)
638 {
639 TCHAR sessionName[256];
640 request->getFieldAsString(VID_NAME, sessionName, 256);
641 debugPrintf(6, _T("Take screenshot from session \"%s\""), sessionName);
642 SessionAgentConnector *conn = AcquireSessionAgentConnector(sessionName);
643 if (conn != NULL)
644 {
645 debugPrintf(6, _T("Session agent connector acquired"));
646 conn->takeScreenshot(&response);
647 conn->decRefCount();
648 }
649 else
650 {
651 response.setField(VID_RCC, ERR_NO_SESSION_AGENT);
652 }
653 }
654 else
655 {
656 response.setField(VID_RCC, ERR_ACCESS_DENIED);
657 }
658 break;
659 case CMD_SET_SERVER_CAPABILITIES:
660 // Servers before 2.0 use VID_ENABLED
661 m_ipv6Aware = request->isFieldExist(VID_IPV6_SUPPORT) ? request->getFieldAsBoolean(VID_IPV6_SUPPORT) : request->getFieldAsBoolean(VID_ENABLED);
662 m_bulkReconciliationSupported = request->getFieldAsBoolean(VID_BULK_RECONCILIATION);
663 m_allowCompression = request->getFieldAsBoolean(VID_ENABLE_COMPRESSION);
664 response.setField(VID_RCC, ERR_SUCCESS);
665 debugPrintf(1, _T("Server capabilities: IPv6: %s; bulk reconciliation: %s; compression: %s"),
666 m_ipv6Aware ? _T("yes") : _T("no"),
667 m_bulkReconciliationSupported ? _T("yes") : _T("no"),
668 m_allowCompression ? _T("yes") : _T("no"));
669 break;
670 case CMD_SET_SERVER_ID:
671 m_serverId = request->getFieldAsUInt64(VID_SERVER_ID);
672 debugPrintf(1, _T("Server ID set to ") UINT64X_FMT(_T("016")), m_serverId);
673 response.setField(VID_RCC, ERR_SUCCESS);
674 break;
675 case CMD_DATA_COLLECTION_CONFIG:
676 if (m_serverId != 0)
677 {
678 ConfigureDataCollection(m_serverId, request);
679 m_acceptData = true;
680 response.setField(VID_RCC, ERR_SUCCESS);
681 }
682 else
683 {
684 debugPrintf(1, _T("Data collection configuration command received but server ID is not set"));
685 response.setField(VID_RCC, ERR_SERVER_ID_UNSET);
686 }
687 break;
688 case CMD_CLEAN_AGENT_DCI_CONF:
689 if (m_masterServer)
690 {
691 ClearDataCollectionConfiguration();
692 response.setField(VID_RCC, ERR_SUCCESS);
693 }
694 else
695 {
696 response.setField(VID_RCC, ERR_ACCESS_DENIED);
697 }
698 break;
699 default:
700 // Attempt to process unknown command by subagents
701 if (!ProcessCmdBySubAgent(command, request, &response, this))
702 response.setField(VID_RCC, ERR_UNKNOWN_COMMAND);
703 break;
704 }
705 }
706 delete request;
707
708 // Send response
709 sendMessage(&response);
710 response.deleteAllFields();
711 }
712 }
713
714 /**
715 * Authenticate peer
716 */
717 void CommSession::authenticate(NXCPMessage *pRequest, NXCPMessage *pMsg)
718 {
719 if (m_authenticated)
720 {
721 // Already authenticated
722 pMsg->setField(VID_RCC, (g_dwFlags & AF_REQUIRE_AUTH) ? ERR_ALREADY_AUTHENTICATED : ERR_AUTH_NOT_REQUIRED);
723 }
724 else
725 {
726 TCHAR szSecret[MAX_SECRET_LENGTH];
727 BYTE hash[32];
728 WORD wAuthMethod;
729
730 wAuthMethod = pRequest->getFieldAsUInt16(VID_AUTH_METHOD);
731 switch(wAuthMethod)
732 {
733 case AUTH_PLAINTEXT:
734 pRequest->getFieldAsString(VID_SHARED_SECRET, szSecret, MAX_SECRET_LENGTH);
735 if (!_tcscmp(szSecret, g_szSharedSecret))
736 {
737 m_authenticated = true;
738 pMsg->setField(VID_RCC, ERR_SUCCESS);
739 }
740 else
741 {
742 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("PLAIN"));
743 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
744 }
745 break;
746 case AUTH_MD5_HASH:
747 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, MD5_DIGEST_SIZE);
748 #ifdef UNICODE
749 {
750 char sharedSecret[256];
751 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
752 sharedSecret[255] = 0;
753 CalculateMD5Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
754 }
755 #else
756 CalculateMD5Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
757 #endif
758 if (!memcmp(szSecret, hash, MD5_DIGEST_SIZE))
759 {
760 m_authenticated = true;
761 pMsg->setField(VID_RCC, ERR_SUCCESS);
762 }
763 else
764 {
765 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("MD5"));
766 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
767 }
768 break;
769 case AUTH_SHA1_HASH:
770 pRequest->getFieldAsBinary(VID_SHARED_SECRET, (BYTE *)szSecret, SHA1_DIGEST_SIZE);
771 #ifdef UNICODE
772 {
773 char sharedSecret[256];
774 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, g_szSharedSecret, -1, sharedSecret, 256, NULL, NULL);
775 sharedSecret[255] = 0;
776 CalculateSHA1Hash((BYTE *)sharedSecret, strlen(sharedSecret), hash);
777 }
778 #else
779 CalculateSHA1Hash((BYTE *)g_szSharedSecret, strlen(g_szSharedSecret), hash);
780 #endif
781 if (!memcmp(szSecret, hash, SHA1_DIGEST_SIZE))
782 {
783 m_authenticated = true;
784 pMsg->setField(VID_RCC, ERR_SUCCESS);
785 }
786 else
787 {
788 nxlog_write(MSG_AUTH_FAILED, EVENTLOG_WARNING_TYPE, "Is", &m_serverAddr, _T("SHA1"));
789 pMsg->setField(VID_RCC, ERR_AUTH_FAILED);
790 }
791 break;
792 default:
793 pMsg->setField(VID_RCC, ERR_NOT_IMPLEMENTED);
794 break;
795 }
796 }
797 }
798
799 /**
800 * Get parameter's value
801 */
802 void CommSession::getParameter(NXCPMessage *pRequest, NXCPMessage *pMsg)
803 {
804 TCHAR szParameter[MAX_PARAM_NAME], szValue[MAX_RESULT_LENGTH];
805 UINT32 dwErrorCode;
806
807 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
808 dwErrorCode = GetParameterValue(szParameter, szValue, this);
809 pMsg->setField(VID_RCC, dwErrorCode);
810 if (dwErrorCode == ERR_SUCCESS)
811 pMsg->setField(VID_VALUE, szValue);
812 }
813
814 /**
815 * Get list of values
816 */
817 void CommSession::getList(NXCPMessage *pRequest, NXCPMessage *pMsg)
818 {
819 TCHAR szParameter[MAX_PARAM_NAME];
820 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
821
822 StringList value;
823 UINT32 dwErrorCode = GetListValue(szParameter, &value, this);
824 pMsg->setField(VID_RCC, dwErrorCode);
825 if (dwErrorCode == ERR_SUCCESS)
826 {
827 value.fillMessage(pMsg, VID_ENUM_VALUE_BASE, VID_NUM_STRINGS);
828 }
829 }
830
831 /**
832 * Get table
833 */
834 void CommSession::getTable(NXCPMessage *pRequest, NXCPMessage *pMsg)
835 {
836 TCHAR szParameter[MAX_PARAM_NAME];
837
838 pRequest->getFieldAsString(VID_PARAMETER, szParameter, MAX_PARAM_NAME);
839
840 Table value;
841 UINT32 dwErrorCode = GetTableValue(szParameter, &value, this);
842 pMsg->setField(VID_RCC, dwErrorCode);
843 if (dwErrorCode == ERR_SUCCESS)
844 {
845 value.fillMessage(*pMsg, 0, -1); // no row limit
846 }
847 }
848
849 /**
850 * Perform action on request
851 */
852 void CommSession::action(NXCPMessage *pRequest, NXCPMessage *pMsg)
853 {
854 if ((g_dwFlags & AF_ENABLE_ACTIONS) && m_controlServer)
855 {
856 // Get action name and arguments
857 TCHAR action[MAX_PARAM_NAME];
858 pRequest->getFieldAsString(VID_ACTION_NAME, action, MAX_PARAM_NAME);
859
860 int numArgs = pRequest->getFieldAsInt32(VID_NUM_ARGS);
861 StringList *args = new StringList;
862 for(int i = 0; i < numArgs; i++)
863 args->addPreallocated(pRequest->getFieldAsString(VID_ACTION_ARG_BASE + i));
864
865 // Execute action
866 if (pRequest->getFieldAsBoolean(VID_RECEIVE_OUTPUT))
867 {
868 UINT32 rcc = ExecActionWithOutput(this, pRequest->getId(), action, args);
869 pMsg->setField(VID_RCC, rcc);
870 }
871 else
872 {
873 UINT32 rcc = ExecAction(action, args, this);
874 pMsg->setField(VID_RCC, rcc);
875 delete args;
876 }
877 }
878 else
879 {
880 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
881 }
882 }
883
884 /**
885 * Prepare for receiving file
886 */
887 void CommSession::recvFile(NXCPMessage *pRequest, NXCPMessage *pMsg)
888 {
889 TCHAR szFileName[MAX_PATH], szFullPath[MAX_PATH];
890
891 if (m_masterServer)
892 {
893 szFileName[0] = 0;
894 pRequest->getFieldAsString(VID_FILE_NAME, szFileName, MAX_PATH);
895 debugPrintf(5, _T("CommSession::recvFile(): Preparing for receiving file \"%s\""), szFileName);
896 BuildFullPath(szFileName, szFullPath);
897
898 // Check if for some reason we have already opened file
899 pMsg->setField(VID_RCC, openFile(szFullPath, pRequest->getId(), pRequest->getFieldAsTime(VID_MODIFICATION_TIME)));
900 }
901 else
902 {
903 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
904 }
905 }
906
907 /**
908 * Open file for writing
909 */
910 UINT32 CommSession::openFile(TCHAR *szFullPath, UINT32 requestId, time_t fileModTime)
911 {
912 DownloadFileInfo *fInfo = new DownloadFileInfo(szFullPath, fileModTime);
913 debugPrintf(5, _T("CommSession::recvFile(): Writing to local file \"%s\""), szFullPath);
914
915 if (!fInfo->open())
916 {
917 delete fInfo;
918 debugPrintf(2, _T("CommSession::recvFile(): Error opening file \"%s\" for writing (%s)"), szFullPath, _tcserror(errno));
919 return ERR_IO_FAILURE;
920 }
921 else
922 {
923 m_downloadFileMap.set(requestId, fInfo);
924 return ERR_SUCCESS;
925 }
926 }
927
928 /**
929 * Progress callback for file sending
930 */
931 static void SendFileProgressCallback(INT64 bytesTransferred, void *cbArg)
932 {
933 ((CommSession *)cbArg)->updateTimeStamp();
934 }
935
936 /**
937 * Send file to server
938 */
939 bool CommSession::sendFile(UINT32 requestId, const TCHAR *file, long offset, bool allowCompression)
940 {
941 if (m_disconnected)
942 return false;
943 return SendFileOverNXCP(m_channel, requestId, file, m_pCtx, offset,
944 SendFileProgressCallback, this, m_socketWriteMutex, allowCompression ? NXCP_STREAM_COMPRESSION_DEFLATE : NXCP_STREAM_COMPRESSION_NONE);
945 }
946
947 /**
948 * Upgrade agent from package in the file store
949 */
950 UINT32 CommSession::upgrade(NXCPMessage *request)
951 {
952 if (m_masterServer)
953 {
954 TCHAR szPkgName[MAX_PATH], szFullPath[MAX_PATH];
955
956 szPkgName[0] = 0;
957 request->getFieldAsString(VID_FILE_NAME, szPkgName, MAX_PATH);
958 BuildFullPath(szPkgName, szFullPath);
959
960 //Create line in registry file with upgrade file name to delete it after system start
961 DB_HANDLE hdb = GetLocalDatabaseHandle();
962 if(hdb != NULL)
963 {
964 TCHAR upgradeFileInsert[256];
965 _sntprintf(upgradeFileInsert, 256, _T("INSERT INTO registry (attribute,value) VALUES ('upgrade.file',%s)"), (const TCHAR *)DBPrepareString(hdb, szPkgName));
966 DBQuery(hdb, upgradeFileInsert);
967 }
968
969 return UpgradeAgent(szFullPath);
970 }
971 else
972 {
973 return ERR_ACCESS_DENIED;
974 }
975 }
976
977 /**
978 * Get agent's configuration file
979 */
980 void CommSession::getConfig(NXCPMessage *pMsg)
981 {
982 if (m_masterServer)
983 {
984 pMsg->setField(VID_RCC,
985 pMsg->setFieldFromFile(VID_CONFIG_FILE, g_szConfigFile) ? ERR_SUCCESS : ERR_IO_FAILURE);
986 }
987 else
988 {
989 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
990 }
991 }
992
993 /**
994 * Update agent's configuration file
995 */
996 void CommSession::updateConfig(NXCPMessage *pRequest, NXCPMessage *pMsg)
997 {
998 if (m_masterServer)
999 {
1000 BYTE *pConfig;
1001 int hFile;
1002 UINT32 size;
1003
1004 if (pRequest->isFieldExist(VID_CONFIG_FILE))
1005 {
1006 size = pRequest->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
1007 pConfig = (BYTE *)malloc(size);
1008 pRequest->getFieldAsBinary(VID_CONFIG_FILE, pConfig, size);
1009 hFile = _topen(g_szConfigFile, O_CREAT | O_TRUNC | O_WRONLY, 0644);
1010 if (hFile != -1)
1011 {
1012 if (size > 0)
1013 {
1014 for(UINT32 i = 0; i < size - 1; i++)
1015 if (pConfig[i] == 0x0D)
1016 {
1017 size--;
1018 memmove(&pConfig[i], &pConfig[i + 1], size - i);
1019 i--;
1020 }
1021 }
1022 if (_write(hFile, pConfig, size) == size)
1023 pMsg->setField(VID_RCC, ERR_SUCCESS);
1024 else
1025 pMsg->setField(VID_RCC, ERR_IO_FAILURE);
1026 _close(hFile);
1027 }
1028 else
1029 {
1030 debugPrintf(2, _T("CommSession::updateConfig(): Error opening file \"%s\" for writing (%s)"),
1031 g_szConfigFile, _tcserror(errno));
1032 pMsg->setField(VID_RCC, ERR_FILE_OPEN_ERROR);
1033 }
1034 free(pConfig);
1035 }
1036 else
1037 {
1038 pMsg->setField(VID_RCC, ERR_MALFORMED_COMMAND);
1039 }
1040 }
1041 else
1042 {
1043 pMsg->setField(VID_RCC, ERR_ACCESS_DENIED);
1044 }
1045 }
1046
1047 /**
1048 * Setup proxy connection
1049 */
1050 UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
1051 {
1052 UINT32 dwResult, dwAddr;
1053 WORD wPort;
1054 struct sockaddr_in sa;
1055 NXCPEncryptionContext *pSavedCtx;
1056 TCHAR szBuffer[32];
1057
1058 if (m_masterServer && (g_dwFlags & AF_ENABLE_PROXY))
1059 {
1060 dwAddr = pRequest->getFieldAsUInt32(VID_IP_ADDRESS);
1061 wPort = pRequest->getFieldAsUInt16(VID_AGENT_PORT);
1062 m_hProxySocket = socket(AF_INET, SOCK_STREAM, 0);
1063 if (m_hProxySocket != INVALID_SOCKET)
1064 {
1065 // Fill in address structure
1066 memset(&sa, 0, sizeof(sa));
1067 sa.sin_addr.s_addr = htonl(dwAddr);
1068 sa.sin_family = AF_INET;
1069 sa.sin_port = htons(wPort);
1070 if (connect(m_hProxySocket, (struct sockaddr *)&sa, sizeof(sa)) != -1)
1071 {
1072 NXCPMessage msg;
1073 NXCP_MESSAGE *pRawMsg;
1074
1075 // Stop writing thread
1076 m_sendQueue->put(INVALID_POINTER_VALUE);
1077
1078 // Wait while all queued messages will be sent
1079 while(m_sendQueue->size() > 0)
1080 ThreadSleepMs(100);
1081
1082 // Finish proxy connection setup
1083 pSavedCtx = m_pCtx;
1084 m_pCtx = PROXY_ENCRYPTION_CTX;
1085 m_proxyConnection = true;
1086 dwResult = ERR_SUCCESS;
1087 m_hProxyReadThread = ThreadCreateEx(proxyReadThreadStarter, 0, this);
1088
1089 // Send confirmation message
1090 // We cannot use sendMessage() and writing thread, because
1091 // encryption context already overriden, and writing thread
1092 // already stopped
1093 msg.setCode(CMD_REQUEST_COMPLETED);
1094 msg.setId(pRequest->getId());
1095 msg.setField(VID_RCC, RCC_SUCCESS);
1096 pRawMsg = msg.serialize();
1097 sendRawMessage(pRawMsg, pSavedCtx);
1098 if (pSavedCtx != NULL)
1099 pSavedCtx->decRefCount();
1100
1101 debugPrintf(5, _T("Established proxy connection to %s:%d"), IpToStr(dwAddr, szBuffer), wPort);
1102 }
1103 else
1104 {
1105 dwResult = ERR_CONNECT_FAILED;
1106 }
1107 }
1108 else
1109 {
1110 dwResult = ERR_SOCKET_ERROR;
1111 }
1112 }
1113 else
1114 {
1115 dwResult = ERR_ACCESS_DENIED;
1116 }
1117 return dwResult;
1118 }
1119
1120 /**
1121 * Proxy reading thread
1122 */
1123 void CommSession::proxyReadThread()
1124 {
1125 SocketPoller sp;
1126 while(true)
1127 {
1128 sp.reset();
1129 sp.add(m_hProxySocket);
1130 int rc = sp.poll(500); // Half-second timeout
1131 if (rc < 0)
1132 break;
1133 if (rc > 0)
1134 {
1135 char buffer[32768];
1136 rc = recv(m_hProxySocket, buffer, 32768, 0);
1137 if (rc <= 0)
1138 break;
1139 m_channel->send(buffer, rc, m_socketWriteMutex);
1140 }
1141 }
1142 disconnect();
1143 }
1144
1145 /**
1146 * Wait for request completion
1147 */
1148 UINT32 CommSession::doRequest(NXCPMessage *msg, UINT32 timeout)
1149 {
1150 if (!sendMessage(msg))
1151 return ERR_CONNECTION_BROKEN;
1152
1153 NXCPMessage *response = m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1154 UINT32 rcc;
1155 if (response != NULL)
1156 {
1157 rcc = response->getFieldAsUInt32(VID_RCC);
1158 delete response;
1159 }
1160 else
1161 {
1162 rcc = ERR_REQUEST_TIMEOUT;
1163 }
1164 return rcc;
1165 }
1166
1167 /**
1168 * Wait for request completion
1169 */
1170 NXCPMessage *CommSession::doRequestEx(NXCPMessage *msg, UINT32 timeout)
1171 {
1172 if (!sendMessage(msg))
1173 return NULL;
1174 return m_responseQueue->waitForMessage(CMD_REQUEST_COMPLETED, msg->getId(), timeout);
1175 }
1176
1177 /**
1178 * Generate new request ID
1179 */
1180 UINT32 CommSession::generateRequestId()
1181 {
1182 return (UINT32)InterlockedIncrement(&m_requestId);
1183 }
1184
1185 /**
1186 * Virtual session constructor
1187 */
1188 VirtualSession::VirtualSession(UINT64 serverId)
1189 {
1190 m_id = InterlockedIncrement(&s_sessionId);
1191 m_serverId = serverId;
1192 }
1193
1194 /**
1195 * Debug print in virtual session context
1196 */
1197 void VirtualSession::debugPrintf(int level, const TCHAR *format, ...)
1198 {
1199 if (level > nxlog_get_debug_level())
1200 return;
1201
1202 va_list args;
1203 TCHAR buffer[8192];
1204
1205 va_start(args, format);
1206 _vsntprintf(buffer, 8192, format, args);
1207 va_end(args);
1208
1209 nxlog_write(MSG_DEBUG_VSESSION, EVENTLOG_DEBUG_TYPE, "ds", m_id, buffer);
1210 }