bb80e79ed41be8b1c0e5c3b65e57c6db16ee91a9
[public/netxms.git] / src / server / libnxsrv / agent.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Server Library
4 ** Copyright (C) 2003-2017 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: agent.cpp
21 **
22 **/
23
24 #include "libnxsrv.h"
25 #include <stdarg.h>
26 #include <nxstat.h>
27
28 #ifndef _WIN32
29 #define _tell(f) lseek(f,0,SEEK_CUR)
30 #endif
31
32 #define DEBUG_TAG _T("agent.conn")
33
34 /**
35 * Constants
36 */
37 #define MAX_MSG_SIZE 268435456
38
39 /**
40 * Agent connection thread pool
41 */
42 ThreadPool LIBNXSRV_EXPORTABLE *g_agentConnectionThreadPool = NULL;
43
44 /**
45 * Unique connection ID
46 */
47 static VolatileCounter s_connectionId = 0;
48
49 /**
50 * Static data
51 */
52 #ifdef _WITH_ENCRYPTION
53 static int m_iDefaultEncryptionPolicy = ENCRYPTION_ALLOWED;
54 #else
55 static int m_iDefaultEncryptionPolicy = ENCRYPTION_DISABLED;
56 #endif
57
58 /**
59 * Set default encryption policy for agent communication
60 */
61 void LIBNXSRV_EXPORTABLE SetAgentDEP(int iPolicy)
62 {
63 #ifdef _WITH_ENCRYPTION
64 m_iDefaultEncryptionPolicy = iPolicy;
65 #endif
66 }
67
68 /**
69 * Receiver thread starter
70 */
71 THREAD_RESULT THREAD_CALL AgentConnection::receiverThreadStarter(void *pArg)
72 {
73 ThreadSetName("AgentReceiver");
74 ((AgentConnection *)pArg)->receiverThread();
75 ((AgentConnection *)pArg)->decInternalRefCount();
76 return THREAD_OK;
77 }
78
79 /**
80 * Constructor for AgentConnection
81 */
82 AgentConnection::AgentConnection(const InetAddress& addr, WORD port, int authMethod, const TCHAR *secret, bool allowCompression)
83 {
84 m_internalRefCount = 1;
85 m_userRefCount = 1;
86 m_debugId = InterlockedIncrement(&s_connectionId);
87 m_addr = addr;
88 m_wPort = port;
89 m_iAuthMethod = authMethod;
90 if (secret != NULL)
91 {
92 #ifdef UNICODE
93 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, secret, -1, m_szSecret, MAX_SECRET_LENGTH, NULL, NULL);
94 m_szSecret[MAX_SECRET_LENGTH - 1] = 0;
95 #else
96 nx_strncpy(m_szSecret, secret, MAX_SECRET_LENGTH);
97 #endif
98 }
99 else
100 {
101 m_szSecret[0] = 0;
102 }
103 m_allowCompression = allowCompression;
104 m_channel = NULL;
105 m_tLastCommandTime = 0;
106 m_pMsgWaitQueue = new MsgWaitQueue;
107 m_requestId = 0;
108 m_connectionTimeout = 5000; // 5 seconds
109 m_dwCommandTimeout = 5000; // Default timeout 5 seconds
110 m_isConnected = false;
111 m_mutexDataLock = MutexCreate();
112 m_mutexSocketWrite = MutexCreate();
113 m_hReceiverThread = INVALID_THREAD_HANDLE;
114 m_pCtx = NULL;
115 m_iEncryptionPolicy = m_iDefaultEncryptionPolicy;
116 m_useProxy = false;
117 m_iProxyAuth = AUTH_NONE;
118 m_wProxyPort = 4700;
119 m_dwRecvTimeout = 420000; // 7 minutes
120 m_nProtocolVersion = NXCP_VERSION;
121 m_hCurrFile = -1;
122 m_deleteFileOnDownloadFailure = true;
123 m_condFileDownload = ConditionCreate(TRUE);
124 m_fileDownloadSucceeded = false;
125 m_fileUploadInProgress = false;
126 m_sendToClientMessageCallback = NULL;
127 m_dwDownloadRequestId = 0;
128 m_downloadProgressCallback = NULL;
129 m_downloadProgressCallbackArg = NULL;
130 }
131
132 /**
133 * Destructor
134 */
135 AgentConnection::~AgentConnection()
136 {
137 debugPrintf(7, _T("AgentConnection destructor called (this=%p, thread=%p)"), this, (void *)m_hReceiverThread);
138
139 ThreadDetach(m_hReceiverThread);
140
141 delete m_pMsgWaitQueue;
142 if (m_pCtx != NULL)
143 m_pCtx->decRefCount();
144
145 if (m_hCurrFile != -1)
146 {
147 _close(m_hCurrFile);
148 onFileDownload(false);
149 }
150 else if (m_sendToClientMessageCallback != NULL)
151 {
152 onFileDownload(false);
153 }
154
155 if (m_channel != NULL)
156 m_channel->decRefCount();
157
158 MutexDestroy(m_mutexDataLock);
159 MutexDestroy(m_mutexSocketWrite);
160 ConditionDestroy(m_condFileDownload);
161 }
162
163 /**
164 * Write debug output
165 */
166 void AgentConnection::debugPrintf(int level, const TCHAR *format, ...)
167 {
168 va_list args;
169 va_start(args, format);
170 nxlog_debug_tag_object2(DEBUG_TAG, m_debugId, level, format, args);
171 va_end(args);
172 }
173
174 /**
175 * Receiver thread
176 */
177 void AgentConnection::receiverThread()
178 {
179 AbstractCommChannel *channel = m_channel;
180 UINT32 msgBufferSize = 1024;
181
182 // Initialize raw message receiving function
183 NXCP_BUFFER *msgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
184 NXCPInitBuffer(msgBuffer);
185
186 // Allocate space for raw message
187 NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)malloc(msgBufferSize);
188 #ifdef _WITH_ENCRYPTION
189 BYTE *decryptionBuffer = (BYTE *)malloc(msgBufferSize);
190 #else
191 BYTE *decryptionBuffer = NULL;
192 #endif
193
194 while(true)
195 {
196 // Shrink buffer after receiving large message
197 if (msgBufferSize > 131072)
198 {
199 msgBufferSize = 131072;
200 rawMsg = (NXCP_MESSAGE *)realloc(rawMsg, msgBufferSize);
201 if (decryptionBuffer != NULL)
202 decryptionBuffer = (BYTE *)realloc(decryptionBuffer, msgBufferSize);
203 }
204
205 // Receive raw message
206 int rc = RecvNXCPMessageEx(channel, &rawMsg, msgBuffer, &msgBufferSize,
207 &m_pCtx, (decryptionBuffer != NULL) ? &decryptionBuffer : NULL,
208 m_dwRecvTimeout, MAX_MSG_SIZE);
209 if (rc <= 0)
210 {
211 if ((rc != 0) && (WSAGetLastError() != WSAESHUTDOWN))
212 debugPrintf(6, _T("AgentConnection::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), rc, WSAGetLastError());
213 else
214 debugPrintf(6, _T("AgentConnection::ReceiverThread(): communication channel shutdown"));
215 break;
216 }
217
218 // Check if we get too large message
219 if (rc == 1)
220 {
221 TCHAR buffer[64];
222 debugPrintf(6, _T("Received too large message %s (%d bytes)"), NXCPMessageCodeName(ntohs(rawMsg->code), buffer), ntohl(rawMsg->size));
223 continue;
224 }
225
226 // Check if we are unable to decrypt message
227 if (rc == 2)
228 {
229 debugPrintf(6, _T("Unable to decrypt received message"));
230 continue;
231 }
232
233 // Check for timeout
234 if (rc == 3)
235 {
236 if (m_fileUploadInProgress)
237 continue; // Receive timeout may occur when uploading large files via slow links
238 debugPrintf(6, _T("Timed out waiting for message"));
239 break;
240 }
241
242 // Check that actual received packet size is equal to encoded in packet
243 if ((int)ntohl(rawMsg->size) != rc)
244 {
245 debugPrintf(6, _T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(rawMsg->size), rc);
246 continue; // Bad packet, wait for next
247 }
248
249 if (ntohs(rawMsg->flags) & MF_BINARY)
250 {
251 // Convert message header to host format
252 rawMsg->id = ntohl(rawMsg->id);
253 rawMsg->code = ntohs(rawMsg->code);
254 rawMsg->numFields = ntohl(rawMsg->numFields);
255 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
256 {
257 TCHAR buffer[64];
258 debugPrintf(6, _T("Received raw message %s (%d) from agent at %s"),
259 NXCPMessageCodeName(rawMsg->code, buffer), rawMsg->id, (const TCHAR *)m_addr.toString());
260 }
261
262 if ((rawMsg->code == CMD_FILE_DATA) && (rawMsg->id == m_dwDownloadRequestId))
263 {
264 if (m_sendToClientMessageCallback != NULL)
265 {
266 rawMsg->code = ntohs(rawMsg->code);
267 rawMsg->numFields = ntohl(rawMsg->numFields);
268 m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
269
270 if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
271 {
272 m_sendToClientMessageCallback = NULL;
273 onFileDownload(true);
274 }
275 else
276 {
277 if (m_downloadProgressCallback != NULL)
278 {
279 m_downloadProgressCallback(rawMsg->size - (NXCP_HEADER_SIZE + 8), m_downloadProgressCallbackArg);
280 }
281 }
282 }
283 else
284 {
285 if (m_hCurrFile != -1)
286 {
287 if (_write(m_hCurrFile, rawMsg->fields, rawMsg->numFields) == (int)rawMsg->numFields)
288 {
289 if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
290 {
291 _close(m_hCurrFile);
292 m_hCurrFile = -1;
293
294 onFileDownload(true);
295 }
296 else
297 {
298 if (m_downloadProgressCallback != NULL)
299 {
300 m_downloadProgressCallback(_tell(m_hCurrFile), m_downloadProgressCallbackArg);
301 }
302 }
303 }
304 }
305 else
306 {
307 // I/O error
308 _close(m_hCurrFile);
309 m_hCurrFile = -1;
310
311 onFileDownload(false);
312 }
313 }
314 }
315
316 if ((rawMsg->code == CMD_ABORT_FILE_TRANSFER) && (rawMsg->id == m_dwDownloadRequestId))
317 {
318 if (m_sendToClientMessageCallback != NULL)
319 {
320 rawMsg->code = ntohs(rawMsg->code);
321 rawMsg->numFields = ntohl(rawMsg->numFields);
322 m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
323 m_sendToClientMessageCallback = NULL;
324
325 onFileDownload(false);
326 }
327 else
328 {
329 //error on agent side
330 _close(m_hCurrFile);
331 m_hCurrFile = -1;
332
333 onFileDownload(false);
334 }
335 }
336 }
337 else if (ntohs(rawMsg->flags) & MF_CONTROL)
338 {
339 // Convert message header to host format
340 rawMsg->id = ntohl(rawMsg->id);
341 rawMsg->code = ntohs(rawMsg->code);
342 rawMsg->flags = ntohs(rawMsg->flags);
343 rawMsg->numFields = ntohl(rawMsg->numFields);
344 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
345 {
346 TCHAR buffer[64];
347 debugPrintf(6, _T("Received control message %s from agent at %s"),
348 NXCPMessageCodeName(rawMsg->code, buffer), (const TCHAR *)m_addr.toString());
349 }
350 m_pMsgWaitQueue->put((NXCP_MESSAGE *)nx_memdup(rawMsg, ntohl(rawMsg->size)));
351 }
352 else
353 {
354 // Create message object from raw message
355 NXCPMessage *msg = NXCPMessage::deserialize(rawMsg, m_nProtocolVersion);
356 if (msg != NULL)
357 {
358 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
359 {
360 TCHAR buffer[64];
361 debugPrintf(6, _T("Received message %s (%d) from agent at %s"),
362 NXCPMessageCodeName(msg->getCode(), buffer), msg->getId(), (const TCHAR *)m_addr.toString());
363 }
364 switch(msg->getCode())
365 {
366 case CMD_REQUEST_COMPLETED:
367 case CMD_SESSION_KEY:
368 m_pMsgWaitQueue->put(msg);
369 break;
370 case CMD_TRAP:
371 if (g_agentConnectionThreadPool != NULL)
372 {
373 incInternalRefCount();
374 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onTrapCallback, msg);
375 }
376 else
377 {
378 delete msg;
379 }
380 break;
381 case CMD_SYSLOG_RECORDS:
382 if (g_agentConnectionThreadPool != NULL)
383 {
384 incInternalRefCount();
385 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSyslogMessageCallback, msg);
386 }
387 else
388 {
389 delete msg;
390 }
391 break;
392 case CMD_PUSH_DCI_DATA:
393 if (g_agentConnectionThreadPool != NULL)
394 {
395 incInternalRefCount();
396 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onDataPushCallback, msg);
397 }
398 else
399 {
400 delete msg;
401 }
402 break;
403 case CMD_DCI_DATA:
404 if (g_agentConnectionThreadPool != NULL)
405 {
406 incInternalRefCount();
407 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::processCollectedDataCallback, msg);
408 }
409 else
410 {
411 NXCPMessage response;
412 response.setCode(CMD_REQUEST_COMPLETED);
413 response.setId(msg->getId());
414 response.setField(VID_RCC, ERR_INTERNAL_ERROR);
415 sendMessage(&response);
416 delete msg;
417 }
418 break;
419 case CMD_FILE_MONITORING:
420 onFileMonitoringData(msg);
421 delete msg;
422 break;
423 case CMD_SNMP_TRAP:
424 if (g_agentConnectionThreadPool != NULL)
425 {
426 incInternalRefCount();
427 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSnmpTrapCallback, msg);
428 }
429 else
430 {
431 delete msg;
432 }
433 break;
434 default:
435 if (processCustomMessage(msg))
436 delete msg;
437 else
438 m_pMsgWaitQueue->put(msg);
439 break;
440 }
441 }
442 else
443 {
444 debugPrintf(6, _T("RecvMsg: message deserialization error"));
445 }
446 }
447 }
448 debugPrintf(6, _T("Receiver loop terminated"));
449
450 // Close socket and mark connection as disconnected
451 lock();
452 if (m_hCurrFile != -1)
453 {
454 _close(m_hCurrFile);
455 m_hCurrFile = -1;
456 onFileDownload(false);
457 }
458 else if (m_sendToClientMessageCallback != NULL)
459 {
460 m_sendToClientMessageCallback = NULL;
461 onFileDownload(false);
462 }
463
464 debugPrintf(6, _T("Closing communication channel"));
465 channel->close();
466 channel->decRefCount();
467 if (m_pCtx != NULL)
468 {
469 m_pCtx->decRefCount();
470 m_pCtx = NULL;
471 }
472 m_isConnected = false;
473 unlock();
474
475 free(rawMsg);
476 free(msgBuffer);
477 #ifdef _WITH_ENCRYPTION
478 free(decryptionBuffer);
479 #endif
480
481 debugPrintf(6, _T("Receiver thread stopped"));
482 }
483
484 /**
485 * Create channel. Default implementation creates socket channel.
486 */
487 AbstractCommChannel *AgentConnection::createChannel()
488 {
489 // Create socket
490 SOCKET s = socket(m_useProxy ? m_proxyAddr.getFamily() : m_addr.getFamily(), SOCK_STREAM, 0);
491 if (s == INVALID_SOCKET)
492 {
493 debugPrintf(6, _T("Call to socket() failed"));
494 return NULL;
495 }
496
497 // Fill in address structure
498 SockAddrBuffer sb;
499 struct sockaddr *sa = m_useProxy ? m_proxyAddr.fillSockAddr(&sb, m_wProxyPort) : m_addr.fillSockAddr(&sb, m_wPort);
500
501 // Connect to server
502 if ((sa == NULL) || (ConnectEx(s, sa, SA_LEN(sa), m_connectionTimeout) == -1))
503 {
504 TCHAR buffer[64];
505 debugPrintf(5, _T("Cannot establish connection with agent at %s:%d"),
506 m_useProxy ? m_proxyAddr.toString(buffer) : m_addr.toString(buffer),
507 (int)(m_useProxy ? m_wProxyPort : m_wPort));
508 closesocket(s);
509 return NULL;
510 }
511
512 return new SocketCommChannel(s);
513 }
514
515 /**
516 * Acquire communication channel. Caller must call decRefCount to release channel.
517 */
518 AbstractCommChannel *AgentConnection::acquireChannel()
519 {
520 lock();
521 AbstractCommChannel *channel = m_channel;
522 if (channel != NULL)
523 channel->incRefCount();
524 unlock();
525 return channel;
526 }
527
528 /**
529 * Connect to agent
530 */
531 bool AgentConnection::connect(RSA *pServerKey, UINT32 *pdwError, UINT32 *pdwSocketError, UINT64 serverId)
532 {
533 TCHAR szBuffer[256];
534 bool success = false;
535 bool forceEncryption = false;
536 bool secondPass = false;
537 UINT32 dwError = 0;
538
539 if (pdwError != NULL)
540 *pdwError = ERR_INTERNAL_ERROR;
541
542 if (pdwSocketError != NULL)
543 *pdwSocketError = 0;
544
545 // Check if already connected
546 if (m_isConnected)
547 return false;
548
549 // Wait for receiver thread from previous connection, if any
550 ThreadJoin(m_hReceiverThread);
551 m_hReceiverThread = INVALID_THREAD_HANDLE;
552
553 // Check if we need to close existing channel
554 if (m_channel != NULL)
555 {
556 m_channel->decRefCount();
557 m_channel = NULL;
558 }
559
560 m_channel = createChannel();
561 if (m_channel == NULL)
562 {
563 debugPrintf(6, _T("Cannot create communication channel"));
564 dwError = ERR_CONNECT_FAILED;
565 goto connect_cleanup;
566 }
567
568 if (!NXCPGetPeerProtocolVersion(m_channel, &m_nProtocolVersion, m_mutexSocketWrite))
569 {
570 debugPrintf(6, _T("Protocol version negotiation failed"));
571 dwError = ERR_INTERNAL_ERROR;
572 goto connect_cleanup;
573 }
574 debugPrintf(6, _T("Using NXCP version %d"), m_nProtocolVersion);
575
576 // Start receiver thread
577 incInternalRefCount();
578 m_channel->incRefCount(); // for receiver thread
579 m_hReceiverThread = ThreadCreateEx(receiverThreadStarter, 0, this);
580
581 // Setup encryption
582 setup_encryption:
583 if ((m_iEncryptionPolicy == ENCRYPTION_PREFERRED) ||
584 (m_iEncryptionPolicy == ENCRYPTION_REQUIRED) ||
585 forceEncryption) // Agent require encryption
586 {
587 if (pServerKey != NULL)
588 {
589 dwError = setupEncryption(pServerKey);
590 if ((dwError != ERR_SUCCESS) &&
591 ((m_iEncryptionPolicy == ENCRYPTION_REQUIRED) || forceEncryption))
592 goto connect_cleanup;
593 }
594 else
595 {
596 if ((m_iEncryptionPolicy == ENCRYPTION_REQUIRED) || forceEncryption)
597 {
598 dwError = ERR_ENCRYPTION_REQUIRED;
599 goto connect_cleanup;
600 }
601 }
602 }
603
604 // Authenticate itself to agent
605 if ((dwError = authenticate(m_useProxy && !secondPass)) != ERR_SUCCESS)
606 {
607 if ((dwError == ERR_ENCRYPTION_REQUIRED) &&
608 (m_iEncryptionPolicy != ENCRYPTION_DISABLED))
609 {
610 forceEncryption = true;
611 goto setup_encryption;
612 }
613 debugPrintf(5, _T("Authentication to agent %s failed (%s)"), m_addr.toString(szBuffer),
614 AgentErrorCodeToText(dwError));
615 goto connect_cleanup;
616 }
617
618 // Test connectivity and inform agent about server capabilities
619 if ((dwError = setServerCapabilities()) != ERR_SUCCESS)
620 {
621 if ((dwError == ERR_ENCRYPTION_REQUIRED) &&
622 (m_iEncryptionPolicy != ENCRYPTION_DISABLED))
623 {
624 forceEncryption = true;
625 goto setup_encryption;
626 }
627 if (dwError != ERR_UNKNOWN_COMMAND) // Older agents may not support enable IPv6 command
628 {
629 debugPrintf(5, _T("Communication with agent %s failed (%s)"), m_addr.toString(szBuffer), AgentErrorCodeToText(dwError));
630 goto connect_cleanup;
631 }
632 }
633
634 if (m_useProxy && !secondPass)
635 {
636 dwError = setupProxyConnection();
637 if (dwError != ERR_SUCCESS)
638 goto connect_cleanup;
639 lock();
640 if (m_pCtx != NULL)
641 {
642 m_pCtx->decRefCount();
643 m_pCtx = NULL;
644 }
645 unlock();
646
647 debugPrintf(6, _T("Proxy connection established"));
648
649 // Renegotiate NXCP version with actual target agent
650 NXCP_MESSAGE msg;
651 msg.id = 0;
652 msg.numFields = 0;
653 msg.size = htonl(NXCP_HEADER_SIZE);
654 msg.code = htons(CMD_GET_NXCP_CAPS);
655 msg.flags = htons(MF_CONTROL);
656 if (m_channel->send(&msg, NXCP_HEADER_SIZE, m_mutexSocketWrite) == NXCP_HEADER_SIZE)
657 {
658 NXCP_MESSAGE *rsp = m_pMsgWaitQueue->waitForRawMessage(CMD_NXCP_CAPS, 0, m_dwCommandTimeout);
659 if (rsp != NULL)
660 {
661 m_nProtocolVersion = rsp->numFields >> 24;
662 free(rsp);
663 }
664 else
665 {
666 // assume that peer doesn't understand CMD_GET_NXCP_CAPS message
667 // and set version number to 1
668 m_nProtocolVersion = 1;
669 }
670 debugPrintf(6, _T("Using NXCP version %d after re-negotioation"), m_nProtocolVersion);
671 }
672 else
673 {
674 debugPrintf(6, _T("Protocol version re-negotiation failed - cannot send CMD_GET_NXCP_CAPS message"));
675 dwError = ERR_CONNECTION_BROKEN;
676 goto connect_cleanup;
677 }
678
679 secondPass = true;
680 forceEncryption = false;
681 goto setup_encryption;
682 }
683
684 if (serverId != 0)
685 setServerId(serverId);
686
687 success = true;
688 dwError = ERR_SUCCESS;
689
690 connect_cleanup:
691 if (!success)
692 {
693 if (pdwSocketError != NULL)
694 *pdwSocketError = (UINT32)WSAGetLastError();
695
696 lock();
697 if (m_channel != NULL)
698 m_channel->shutdown();
699 unlock();
700 ThreadJoin(m_hReceiverThread);
701 m_hReceiverThread = INVALID_THREAD_HANDLE;
702
703 lock();
704 if (m_channel != NULL)
705 {
706 m_channel->close();
707 m_channel->decRefCount();
708 m_channel = NULL;
709 }
710
711 if (m_pCtx != NULL)
712 {
713 m_pCtx->decRefCount();
714 m_pCtx = NULL;
715 }
716
717 unlock();
718 }
719 m_isConnected = success;
720 if (pdwError != NULL)
721 *pdwError = dwError;
722 return success;
723 }
724
725 /**
726 * Disconnect from agent
727 */
728 void AgentConnection::disconnect()
729 {
730 debugPrintf(6, _T("disconnect() called"));
731 lock();
732 if (m_hCurrFile != -1)
733 {
734 _close(m_hCurrFile);
735 m_hCurrFile = -1;
736 onFileDownload(false);
737 }
738 else if (m_sendToClientMessageCallback != NULL)
739 {
740 m_sendToClientMessageCallback = NULL;
741 onFileDownload(false);
742 }
743
744 if (m_channel != NULL)
745 {
746 m_channel->shutdown();
747 m_channel->decRefCount();
748 m_channel = NULL;
749 }
750 m_isConnected = false;
751 unlock();
752 debugPrintf(6, _T("Disconnect completed"));
753 }
754
755 /**
756 * Set authentication data
757 */
758 void AgentConnection::setAuthData(int method, const TCHAR *secret)
759 {
760 m_iAuthMethod = method;
761 #ifdef UNICODE
762 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, secret, -1, m_szSecret, MAX_SECRET_LENGTH, NULL, NULL);
763 m_szSecret[MAX_SECRET_LENGTH - 1] = 0;
764 #else
765 nx_strncpy(m_szSecret, secret, MAX_SECRET_LENGTH);
766 #endif
767 }
768
769 /**
770 * Get interface list from agent
771 */
772 InterfaceList *AgentConnection::getInterfaceList()
773 {
774 StringList *data;
775 if (getList(_T("Net.InterfaceList"), &data) != ERR_SUCCESS)
776 return NULL;
777
778 InterfaceList *pIfList = new InterfaceList(data->size());
779
780 // Parse result set. Each line should have the following format:
781 // index ip_address/mask_bits iftype mac_address name
782 for(int i = 0; i < data->size(); i++)
783 {
784 TCHAR *line = _tcsdup(data->get(i));
785 TCHAR *pBuf = line;
786 UINT32 ifIndex = 0;
787
788 // Index
789 TCHAR *pChar = _tcschr(pBuf, ' ');
790 if (pChar != NULL)
791 {
792 *pChar = 0;
793 ifIndex = _tcstoul(pBuf, NULL, 10);
794 pBuf = pChar + 1;
795 }
796
797 bool newInterface = false;
798 InterfaceInfo *iface = pIfList->findByIfIndex(ifIndex);
799 if (iface == NULL)
800 {
801 iface = new InterfaceInfo(ifIndex);
802 newInterface = true;
803 }
804
805 // Address and mask
806 pChar = _tcschr(pBuf, _T(' '));
807 if (pChar != NULL)
808 {
809 TCHAR *pSlash;
810 static TCHAR defaultMask[] = _T("24");
811
812 *pChar = 0;
813 pSlash = _tcschr(pBuf, _T('/'));
814 if (pSlash != NULL)
815 {
816 *pSlash = 0;
817 pSlash++;
818 }
819 else // Just a paranoia protection, should'n happen if agent working correctly
820 {
821 pSlash = defaultMask;
822 }
823 InetAddress addr = InetAddress::parse(pBuf);
824 if (addr.isValid())
825 {
826 addr.setMaskBits(_tcstol(pSlash, NULL, 10));
827 // Agent may return 0.0.0.0/0 for interfaces without IP address
828 if ((addr.getFamily() != AF_INET) || (addr.getAddressV4() != 0))
829 iface->ipAddrList.add(addr);
830 }
831 pBuf = pChar + 1;
832 }
833
834 if (newInterface)
835 {
836 // Interface type
837 pChar = _tcschr(pBuf, ' ');
838 if (pChar != NULL)
839 {
840 *pChar = 0;
841
842 TCHAR *eptr;
843 iface->type = _tcstoul(pBuf, &eptr, 10);
844
845 // newer agents can return if_type(mtu)
846 if (*eptr == _T('('))
847 {
848 pBuf = eptr + 1;
849 eptr = _tcschr(pBuf, _T(')'));
850 if (eptr != NULL)
851 {
852 *eptr = 0;
853 iface->mtu = _tcstol(pBuf, NULL, 10);
854 }
855 }
856
857 pBuf = pChar + 1;
858 }
859
860 // MAC address
861 pChar = _tcschr(pBuf, ' ');
862 if (pChar != NULL)
863 {
864 *pChar = 0;
865 StrToBin(pBuf, iface->macAddr, MAC_ADDR_LENGTH);
866 pBuf = pChar + 1;
867 }
868
869 // Name (set description to name)
870 _tcslcpy(iface->name, pBuf, MAX_DB_STRING);
871 _tcslcpy(iface->description, pBuf, MAX_DB_STRING);
872
873 pIfList->add(iface);
874 }
875 free(line);
876 }
877
878 delete data;
879 return pIfList;
880 }
881
882 /**
883 * Get parameter value
884 */
885 UINT32 AgentConnection::getParameter(const TCHAR *pszParam, UINT32 dwBufSize, TCHAR *pszBuffer)
886 {
887 if (!m_isConnected)
888 return ERR_NOT_CONNECTED;
889
890 NXCPMessage msg(m_nProtocolVersion);
891 UINT32 dwRqId = generateRequestId();
892 msg.setCode(CMD_GET_PARAMETER);
893 msg.setId(dwRqId);
894 msg.setField(VID_PARAMETER, pszParam);
895
896 UINT32 dwRetCode;
897 if (sendMessage(&msg))
898 {
899 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
900 if (response != NULL)
901 {
902 dwRetCode = response->getFieldAsUInt32(VID_RCC);
903 if (dwRetCode == ERR_SUCCESS)
904 {
905 if (response->isFieldExist(VID_VALUE))
906 {
907 response->getFieldAsString(VID_VALUE, pszBuffer, dwBufSize);
908 }
909 else
910 {
911 dwRetCode = ERR_MALFORMED_RESPONSE;
912 debugPrintf(3, _T("Malformed response to CMD_GET_PARAMETER"));
913 }
914 }
915 delete response;
916 }
917 else
918 {
919 dwRetCode = ERR_REQUEST_TIMEOUT;
920 }
921 }
922 else
923 {
924 dwRetCode = ERR_CONNECTION_BROKEN;
925 }
926 return dwRetCode;
927 }
928
929 /**
930 * Get ARP cache
931 */
932 ARP_CACHE *AgentConnection::getArpCache()
933 {
934 StringList *data;
935 if (getList(_T("Net.ArpCache"), &data) != ERR_SUCCESS)
936 return NULL;
937
938 // Create empty structure
939 ARP_CACHE *pArpCache = (ARP_CACHE *)malloc(sizeof(ARP_CACHE));
940 pArpCache->dwNumEntries = data->size();
941 pArpCache->pEntries = (ARP_ENTRY *)calloc(data->size(), sizeof(ARP_ENTRY));
942
943 TCHAR szByte[4], *pBuf, *pChar;
944 szByte[2] = 0;
945
946 // Parse data lines
947 // Each line has form of XXXXXXXXXXXX a.b.c.d n
948 // where XXXXXXXXXXXX is a MAC address (12 hexadecimal digits)
949 // a.b.c.d is an IP address in decimal dotted notation
950 // n is an interface index
951 for(int i = 0; i < data->size(); i++)
952 {
953 TCHAR *line = _tcsdup(data->get(i));
954 pBuf = line;
955 if (_tcslen(pBuf) < 20) // Invalid line
956 continue;
957
958 // MAC address
959 for(int j = 0; j < 6; j++)
960 {
961 memcpy(szByte, pBuf, sizeof(TCHAR) * 2);
962 pArpCache->pEntries[i].bMacAddr[j] = (BYTE)_tcstol(szByte, NULL, 16);
963 pBuf += 2;
964 }
965
966 // IP address
967 while(*pBuf == ' ')
968 pBuf++;
969 pChar = _tcschr(pBuf, _T(' '));
970 if (pChar != NULL)
971 *pChar = 0;
972 pArpCache->pEntries[i].ipAddr = ntohl(_t_inet_addr(pBuf));
973
974 // Interface index
975 if (pChar != NULL)
976 pArpCache->pEntries[i].dwIndex = _tcstoul(pChar + 1, NULL, 10);
977
978 free(line);
979 }
980
981 delete data;
982 return pArpCache;
983 }
984
985 /**
986 * Send dummy command to agent (can be used for keepalive)
987 */
988 UINT32 AgentConnection::nop()
989 {
990 if (!m_isConnected)
991 return ERR_CONNECTION_BROKEN;
992
993 NXCPMessage msg(m_nProtocolVersion);
994 UINT32 dwRqId;
995
996 dwRqId = generateRequestId();
997 msg.setCode(CMD_KEEPALIVE);
998 msg.setId(dwRqId);
999 if (sendMessage(&msg))
1000 return waitForRCC(dwRqId, m_dwCommandTimeout);
1001 else
1002 return ERR_CONNECTION_BROKEN;
1003 }
1004
1005 /**
1006 * inform agent about server capabilities
1007 */
1008 UINT32 AgentConnection::setServerCapabilities()
1009 {
1010 NXCPMessage msg(m_nProtocolVersion);
1011 UINT32 dwRqId = generateRequestId();
1012 msg.setCode(CMD_SET_SERVER_CAPABILITIES);
1013 msg.setField(VID_ENABLED, (INT16)1); // Enables IPv6 on pre-2.0 agents
1014 msg.setField(VID_IPV6_SUPPORT, (INT16)1);
1015 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
1016 msg.setField(VID_ENABLE_COMPRESSION, (INT16)(m_allowCompression ? 1 : 0));
1017 msg.setId(dwRqId);
1018 if (sendMessage(&msg))
1019 return waitForRCC(dwRqId, m_dwCommandTimeout);
1020 else
1021 return ERR_CONNECTION_BROKEN;
1022 }
1023
1024 /**
1025 * Set server ID
1026 */
1027 UINT32 AgentConnection::setServerId(UINT64 serverId)
1028 {
1029 NXCPMessage msg(m_nProtocolVersion);
1030 UINT32 dwRqId = generateRequestId();
1031 msg.setCode(CMD_SET_SERVER_ID);
1032 msg.setField(VID_SERVER_ID, serverId);
1033 msg.setId(dwRqId);
1034 if (sendMessage(&msg))
1035 return waitForRCC(dwRqId, m_dwCommandTimeout);
1036 else
1037 return ERR_CONNECTION_BROKEN;
1038 }
1039
1040 /**
1041 * Wait for request completion code
1042 */
1043 UINT32 AgentConnection::waitForRCC(UINT32 dwRqId, UINT32 dwTimeOut)
1044 {
1045 UINT32 rcc;
1046 NXCPMessage *response = m_pMsgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, dwTimeOut);
1047 if (response != NULL)
1048 {
1049 rcc = response->getFieldAsUInt32(VID_RCC);
1050 delete response;
1051 }
1052 else
1053 {
1054 rcc = ERR_REQUEST_TIMEOUT;
1055 }
1056 return rcc;
1057 }
1058
1059 /**
1060 * Send message to agent
1061 */
1062 bool AgentConnection::sendMessage(NXCPMessage *pMsg)
1063 {
1064 AbstractCommChannel *channel = acquireChannel();
1065 if (channel == NULL)
1066 return false;
1067
1068 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
1069 {
1070 TCHAR buffer[64];
1071 debugPrintf(6, _T("Sending message %s (%d) to agent at %s"),
1072 NXCPMessageCodeName(pMsg->getCode(), buffer), pMsg->getId(), (const TCHAR *)m_addr.toString());
1073 }
1074
1075 bool success;
1076 NXCP_MESSAGE *rawMsg = pMsg->serialize(m_allowCompression);
1077 NXCPEncryptionContext *pCtx = acquireEncryptionContext();
1078 if (pCtx != NULL)
1079 {
1080 NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
1081 if (pEnMsg != NULL)
1082 {
1083 success = (channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
1084 free(pEnMsg);
1085 }
1086 else
1087 {
1088 success = false;
1089 }
1090 pCtx->decRefCount();
1091 }
1092 else
1093 {
1094 success = (channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
1095 }
1096 free(rawMsg);
1097 channel->decRefCount();
1098 return success;
1099 }
1100
1101 /**
1102 * Send raw message to agent
1103 */
1104 bool AgentConnection::sendRawMessage(NXCP_MESSAGE *pMsg)
1105 {
1106 AbstractCommChannel *channel = acquireChannel();
1107 if (channel == NULL)
1108 return false;
1109
1110 bool success;
1111 NXCP_MESSAGE *rawMsg = pMsg;
1112 NXCPEncryptionContext *pCtx = acquireEncryptionContext();
1113 if (pCtx != NULL)
1114 {
1115 NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
1116 if (pEnMsg != NULL)
1117 {
1118 success = (channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
1119 free(pEnMsg);
1120 }
1121 else
1122 {
1123 success = false;
1124 }
1125 pCtx->decRefCount();
1126 }
1127 else
1128 {
1129 success = (channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
1130 }
1131 channel->decRefCount();
1132 return success;
1133 }
1134
1135 /**
1136 * Callback for processing incoming event on separate thread
1137 */
1138 void AgentConnection::onTrapCallback(NXCPMessage *msg)
1139 {
1140 onTrap(msg);
1141 delete msg;
1142 decInternalRefCount();
1143 }
1144
1145 /**
1146 * Trap handler. Should be overriden in derived classes to implement
1147 * actual trap processing. Default implementation do nothing.
1148 */
1149 void AgentConnection::onTrap(NXCPMessage *pMsg)
1150 {
1151 }
1152
1153 /**
1154 * Callback for processing incoming syslog message on separate thread
1155 */
1156 void AgentConnection::onSyslogMessageCallback(NXCPMessage *msg)
1157 {
1158 onSyslogMessage(msg);
1159 delete msg;
1160 decInternalRefCount();
1161 }
1162
1163 /**
1164 * Syslog message handler. Should be overriden in derived classes to implement
1165 * actual message processing. Default implementation do nothing.
1166 */
1167 void AgentConnection::onSyslogMessage(NXCPMessage *pMsg)
1168 {
1169 }
1170
1171 /**
1172 * Callback for processing data push on separate thread
1173 */
1174 void AgentConnection::onDataPushCallback(NXCPMessage *msg)
1175 {
1176 onDataPush(msg);
1177 delete msg;
1178 decInternalRefCount();
1179 }
1180
1181 /**
1182 * Data push handler. Should be overriden in derived classes to implement
1183 * actual data push processing. Default implementation do nothing.
1184 */
1185 void AgentConnection::onDataPush(NXCPMessage *pMsg)
1186 {
1187 }
1188
1189 /**
1190 * Monitoring data handler. Should be overriden in derived classes to implement
1191 * actual monitoring data processing. Default implementation do nothing.
1192 */
1193 void AgentConnection::onFileMonitoringData(NXCPMessage *pMsg)
1194 {
1195 }
1196
1197 /**
1198 * Callback for processing data push on separate thread
1199 */
1200 void AgentConnection::onSnmpTrapCallback(NXCPMessage *msg)
1201 {
1202 onSnmpTrap(msg);
1203 delete msg;
1204 decInternalRefCount();
1205 }
1206
1207 /**
1208 * SNMP trap handler. Should be overriden in derived classes to implement
1209 * actual SNMP trap processing. Default implementation do nothing.
1210 */
1211 void AgentConnection::onSnmpTrap(NXCPMessage *pMsg)
1212 {
1213 }
1214
1215 /**
1216 * Custom message handler
1217 * If returns true, message considered as processed and will not be placed in wait queue
1218 */
1219 bool AgentConnection::processCustomMessage(NXCPMessage *pMsg)
1220 {
1221 return false;
1222 }
1223
1224 /**
1225 * Get list of values
1226 */
1227 UINT32 AgentConnection::getList(const TCHAR *param, StringList **list)
1228 {
1229 UINT32 rcc;
1230 *list = NULL;
1231 if (m_isConnected)
1232 {
1233 NXCPMessage msg(CMD_GET_LIST, generateRequestId(), m_nProtocolVersion);
1234 msg.setField(VID_PARAMETER, param);
1235 if (sendMessage(&msg))
1236 {
1237 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId(), m_dwCommandTimeout);
1238 if (response != NULL)
1239 {
1240 rcc = response->getFieldAsUInt32(VID_RCC);
1241 if (rcc == ERR_SUCCESS)
1242 {
1243 *list = new StringList();
1244 int count = response->getFieldAsInt32(VID_NUM_STRINGS);
1245 for(int i = 0; i < count; i++)
1246 (*list)->addPreallocated(response->getFieldAsString(VID_ENUM_VALUE_BASE + i));
1247 }
1248 delete response;
1249 }
1250 else
1251 {
1252 rcc = ERR_REQUEST_TIMEOUT;
1253 }
1254 }
1255 else
1256 {
1257 rcc = ERR_CONNECTION_BROKEN;
1258 }
1259 }
1260 else
1261 {
1262 rcc = ERR_NOT_CONNECTED;
1263 }
1264
1265 return rcc;
1266 }
1267
1268 /**
1269 * Get table
1270 */
1271 UINT32 AgentConnection::getTable(const TCHAR *pszParam, Table **table)
1272 {
1273 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1274 UINT32 dwRqId, dwRetCode;
1275
1276 *table = NULL;
1277 if (m_isConnected)
1278 {
1279 dwRqId = generateRequestId();
1280 msg.setCode(CMD_GET_TABLE);
1281 msg.setId(dwRqId);
1282 msg.setField(VID_PARAMETER, pszParam);
1283 if (sendMessage(&msg))
1284 {
1285 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1286 if (pResponse != NULL)
1287 {
1288 dwRetCode = pResponse->getFieldAsUInt32(VID_RCC);
1289 if (dwRetCode == ERR_SUCCESS)
1290 {
1291 *table = new Table(pResponse);
1292 }
1293 delete pResponse;
1294 }
1295 else
1296 {
1297 dwRetCode = ERR_REQUEST_TIMEOUT;
1298 }
1299 }
1300 else
1301 {
1302 dwRetCode = ERR_CONNECTION_BROKEN;
1303 }
1304 }
1305 else
1306 {
1307 dwRetCode = ERR_NOT_CONNECTED;
1308 }
1309
1310 return dwRetCode;
1311 }
1312
1313 /**
1314 * Authenticate to agent
1315 */
1316 UINT32 AgentConnection::authenticate(BOOL bProxyData)
1317 {
1318 NXCPMessage msg(m_nProtocolVersion);
1319 UINT32 dwRqId;
1320 BYTE hash[32];
1321 int iAuthMethod = bProxyData ? m_iProxyAuth : m_iAuthMethod;
1322 const char *pszSecret = bProxyData ? m_szProxySecret : m_szSecret;
1323 #ifdef UNICODE
1324 WCHAR szBuffer[MAX_SECRET_LENGTH];
1325 #endif
1326
1327 if (iAuthMethod == AUTH_NONE)
1328 return ERR_SUCCESS; // No authentication required
1329
1330 dwRqId = generateRequestId();
1331 msg.setCode(CMD_AUTHENTICATE);
1332 msg.setId(dwRqId);
1333 msg.setField(VID_AUTH_METHOD, (WORD)iAuthMethod);
1334 switch(iAuthMethod)
1335 {
1336 case AUTH_PLAINTEXT:
1337 #ifdef UNICODE
1338 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, pszSecret, -1, szBuffer, MAX_SECRET_LENGTH);
1339 msg.setField(VID_SHARED_SECRET, szBuffer);
1340 #else
1341 msg.setField(VID_SHARED_SECRET, pszSecret);
1342 #endif
1343 break;
1344 case AUTH_MD5_HASH:
1345 CalculateMD5Hash((BYTE *)pszSecret, (int)strlen(pszSecret), hash);
1346 msg.setField(VID_SHARED_SECRET, hash, MD5_DIGEST_SIZE);
1347 break;
1348 case AUTH_SHA1_HASH:
1349 CalculateSHA1Hash((BYTE *)pszSecret, (int)strlen(pszSecret), hash);
1350 msg.setField(VID_SHARED_SECRET, hash, SHA1_DIGEST_SIZE);
1351 break;
1352 default:
1353 break;
1354 }
1355 if (sendMessage(&msg))
1356 return waitForRCC(dwRqId, m_dwCommandTimeout);
1357 else
1358 return ERR_CONNECTION_BROKEN;
1359 }
1360
1361 /**
1362 * Execute action on agent
1363 */
1364 UINT32 AgentConnection::execAction(const TCHAR *action, const StringList &list,
1365 bool withOutput, void (* outputCallback)(ActionCallbackEvent, const TCHAR *, void *), void *cbData)
1366 {
1367 NXCPMessage msg(m_nProtocolVersion);
1368 UINT32 dwRqId;
1369 int i;
1370
1371 if (!m_isConnected)
1372 return ERR_NOT_CONNECTED;
1373
1374 dwRqId = generateRequestId();
1375 msg.setCode(CMD_ACTION);
1376 msg.setId(dwRqId);
1377 msg.setField(VID_ACTION_NAME, action);
1378 msg.setField(VID_RECEIVE_OUTPUT, (UINT16)(withOutput ? 1 : 0));
1379 msg.setField(VID_NUM_ARGS, (UINT32)list.size());
1380 for(i = 0; i < list.size(); i++)
1381 msg.setField(VID_ACTION_ARG_BASE + i, list.get(i));
1382
1383 if (sendMessage(&msg))
1384 {
1385 if (withOutput)
1386 {
1387 UINT32 rcc = waitForRCC(dwRqId, m_dwCommandTimeout);
1388 if (rcc == ERR_SUCCESS)
1389 {
1390 outputCallback(ACE_CONNECTED, NULL, cbData); // Indicate successful start
1391 bool eos = false;
1392 while(!eos)
1393 {
1394 NXCPMessage *response = waitForMessage(CMD_COMMAND_OUTPUT, dwRqId, m_dwCommandTimeout * 10);
1395 if (response != NULL)
1396 {
1397 eos = response->isEndOfSequence();
1398 if (response->isFieldExist(VID_MESSAGE))
1399 {
1400 TCHAR line[4096];
1401 response->getFieldAsString(VID_MESSAGE, line, 4096);
1402 outputCallback(ACE_DATA, line, cbData);
1403 }
1404 delete response;
1405 }
1406 else
1407 {
1408 return ERR_REQUEST_TIMEOUT;
1409 }
1410 }
1411 outputCallback(ACE_DISCONNECTED, NULL, cbData);
1412 return ERR_SUCCESS;
1413 }
1414 else
1415 {
1416 return rcc;
1417 }
1418 }
1419 else
1420 {
1421 return waitForRCC(dwRqId, m_dwCommandTimeout);
1422 }
1423 }
1424 else
1425 {
1426 return ERR_CONNECTION_BROKEN;
1427 }
1428 }
1429
1430 /**
1431 * Upload file to agent
1432 */
1433 UINT32 AgentConnection::uploadFile(const TCHAR *localFile, const TCHAR *destinationFile, void (* progressCallback)(INT64, void *), void *cbArg, NXCPStreamCompressionMethod compMethod)
1434 {
1435 UINT32 dwRqId, dwResult;
1436 NXCPMessage msg(m_nProtocolVersion);
1437
1438 // Disable compression if it is disabled on connection level or if agent do not support it
1439 if (!m_allowCompression || (m_nProtocolVersion < 4))
1440 compMethod = NXCP_STREAM_COMPRESSION_NONE;
1441
1442 if (!m_isConnected)
1443 return ERR_NOT_CONNECTED;
1444
1445 dwRqId = generateRequestId();
1446 msg.setId(dwRqId);
1447
1448 time_t lastModTime = 0;
1449 NX_STAT_STRUCT st;
1450 if (CALL_STAT(localFile, &st) == 0)
1451 {
1452 lastModTime = st.st_mtime;
1453 }
1454
1455 // Use core agent if destination file name is not set and file manager subagent otherwise
1456 if ((destinationFile == NULL) || (*destinationFile == 0))
1457 {
1458 msg.setCode(CMD_TRANSFER_FILE);
1459 int i;
1460 for(i = (int)_tcslen(localFile) - 1;
1461 (i >= 0) && (localFile[i] != '\\') && (localFile[i] != '/'); i--);
1462 msg.setField(VID_FILE_NAME, &localFile[i + 1]);
1463 }
1464 else
1465 {
1466 msg.setCode(CMD_FILEMGR_UPLOAD);
1467 msg.setField(VID_OVERVRITE, true);
1468 msg.setField(VID_FILE_NAME, destinationFile);
1469 }
1470 msg.setFieldFromTime(VID_MODIFICATION_TIME, lastModTime);
1471
1472 if (sendMessage(&msg))
1473 {
1474 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1475 }
1476 else
1477 {
1478 dwResult = ERR_CONNECTION_BROKEN;
1479 }
1480
1481 if (dwResult == ERR_SUCCESS)
1482 {
1483 AbstractCommChannel *channel = acquireChannel();
1484 if (channel != NULL)
1485 {
1486 debugPrintf(5, _T("Sending file \"%s\" to agent %s compression"),
1487 localFile, (compMethod == NXCP_STREAM_COMPRESSION_NONE) ? _T("without") : _T("with"));
1488 m_fileUploadInProgress = true;
1489 NXCPEncryptionContext *ctx = acquireEncryptionContext();
1490 if (SendFileOverNXCP(channel, dwRqId, localFile, ctx, 0, progressCallback, cbArg, m_mutexSocketWrite, compMethod))
1491 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1492 else
1493 dwResult = ERR_IO_FAILURE;
1494 m_fileUploadInProgress = false;
1495 if (ctx != NULL)
1496 ctx->decRefCount();
1497 channel->decRefCount();
1498 }
1499 else
1500 {
1501 dwResult = ERR_CONNECTION_BROKEN;
1502 }
1503 }
1504
1505 return dwResult;
1506 }
1507
1508 /**
1509 * Send upgrade command
1510 */
1511 UINT32 AgentConnection::startUpgrade(const TCHAR *pszPkgName)
1512 {
1513 UINT32 dwRqId, dwResult;
1514 NXCPMessage msg(m_nProtocolVersion);
1515 int i;
1516
1517 if (!m_isConnected)
1518 return ERR_NOT_CONNECTED;
1519
1520 dwRqId = generateRequestId();
1521
1522 msg.setCode(CMD_UPGRADE_AGENT);
1523 msg.setId(dwRqId);
1524 for(i = (int)_tcslen(pszPkgName) - 1;
1525 (i >= 0) && (pszPkgName[i] != '\\') && (pszPkgName[i] != '/'); i--);
1526 msg.setField(VID_FILE_NAME, &pszPkgName[i + 1]);
1527
1528 if (sendMessage(&msg))
1529 {
1530 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1531 }
1532 else
1533 {
1534 dwResult = ERR_CONNECTION_BROKEN;
1535 }
1536
1537 return dwResult;
1538 }
1539
1540 /**
1541 * Check status of network service via agent
1542 */
1543 UINT32 AgentConnection::checkNetworkService(UINT32 *pdwStatus, const InetAddress& addr, int iServiceType,
1544 WORD wPort, WORD wProto, const TCHAR *pszRequest,
1545 const TCHAR *pszResponse, UINT32 *responseTime)
1546 {
1547 UINT32 dwRqId, dwResult;
1548 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1549 static WORD m_wDefaultPort[] = { 7, 22, 110, 25, 21, 80, 443, 23 };
1550
1551 if (!m_isConnected)
1552 return ERR_NOT_CONNECTED;
1553
1554 dwRqId = generateRequestId();
1555
1556 msg.setCode(CMD_CHECK_NETWORK_SERVICE);
1557 msg.setId(dwRqId);
1558 msg.setField(VID_IP_ADDRESS, addr);
1559 msg.setField(VID_SERVICE_TYPE, (WORD)iServiceType);
1560 msg.setField(VID_IP_PORT,
1561 (wPort != 0) ? wPort :
1562 m_wDefaultPort[((iServiceType >= NETSRV_CUSTOM) &&
1563 (iServiceType <= NETSRV_TELNET)) ? iServiceType : 0]);
1564 msg.setField(VID_IP_PROTO, (wProto != 0) ? wProto : (WORD)IPPROTO_TCP);
1565 msg.setField(VID_SERVICE_REQUEST, pszRequest);
1566 msg.setField(VID_SERVICE_RESPONSE, pszResponse);
1567
1568 if (sendMessage(&msg))
1569 {
1570 // Wait up to 90 seconds for results
1571 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, 90000);
1572 if (pResponse != NULL)
1573 {
1574 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1575 if (dwResult == ERR_SUCCESS)
1576 {
1577 *pdwStatus = pResponse->getFieldAsUInt32(VID_SERVICE_STATUS);
1578 if (responseTime != NULL)
1579 {
1580 *responseTime = pResponse->getFieldAsUInt32(VID_RESPONSE_TIME);
1581 }
1582 }
1583 delete pResponse;
1584 }
1585 else
1586 {
1587 dwResult = ERR_REQUEST_TIMEOUT;
1588 }
1589 }
1590 else
1591 {
1592 dwResult = ERR_CONNECTION_BROKEN;
1593 }
1594
1595 return dwResult;
1596 }
1597
1598 /**
1599 * Get list of supported parameters from agent
1600 */
1601 UINT32 AgentConnection::getSupportedParameters(ObjectArray<AgentParameterDefinition> **paramList, ObjectArray<AgentTableDefinition> **tableList)
1602 {
1603 UINT32 dwRqId, dwResult;
1604 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1605
1606 *paramList = NULL;
1607 *tableList = NULL;
1608
1609 if (!m_isConnected)
1610 return ERR_NOT_CONNECTED;
1611
1612 dwRqId = generateRequestId();
1613
1614 msg.setCode(CMD_GET_PARAMETER_LIST);
1615 msg.setId(dwRqId);
1616
1617 if (sendMessage(&msg))
1618 {
1619 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1620 if (pResponse != NULL)
1621 {
1622 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1623 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): RCC=%d"), dwResult);
1624 if (dwResult == ERR_SUCCESS)
1625 {
1626 UINT32 count = pResponse->getFieldAsUInt32(VID_NUM_PARAMETERS);
1627 ObjectArray<AgentParameterDefinition> *plist = new ObjectArray<AgentParameterDefinition>(count, 16, true);
1628 for(UINT32 i = 0, id = VID_PARAM_LIST_BASE; i < count; i++)
1629 {
1630 plist->add(new AgentParameterDefinition(pResponse, id));
1631 id += 3;
1632 }
1633 *paramList = plist;
1634 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): %d parameters received from agent"), count);
1635
1636 count = pResponse->getFieldAsUInt32(VID_NUM_TABLES);
1637 ObjectArray<AgentTableDefinition> *tlist = new ObjectArray<AgentTableDefinition>(count, 16, true);
1638 for(UINT32 i = 0, id = VID_TABLE_LIST_BASE; i < count; i++)
1639 {
1640 tlist->add(new AgentTableDefinition(pResponse, id));
1641 id += 3;
1642 }
1643 *tableList = tlist;
1644 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): %d tables received from agent"), count);
1645 }
1646 delete pResponse;
1647 }
1648 else
1649 {
1650 dwResult = ERR_REQUEST_TIMEOUT;
1651 }
1652 }
1653 else
1654 {
1655 dwResult = ERR_CONNECTION_BROKEN;
1656 }
1657
1658 return dwResult;
1659 }
1660
1661 /**
1662 * Setup encryption
1663 */
1664 UINT32 AgentConnection::setupEncryption(RSA *pServerKey)
1665 {
1666 #ifdef _WITH_ENCRYPTION
1667 NXCPMessage msg(m_nProtocolVersion), *pResp;
1668 UINT32 dwRqId, dwError, dwResult;
1669
1670 dwRqId = generateRequestId();
1671
1672 PrepareKeyRequestMsg(&msg, pServerKey, false);
1673 msg.setId(dwRqId);
1674 if (sendMessage(&msg))
1675 {
1676 pResp = waitForMessage(CMD_SESSION_KEY, dwRqId, m_dwCommandTimeout);
1677 if (pResp != NULL)
1678 {
1679 dwResult = SetupEncryptionContext(pResp, &m_pCtx, NULL, pServerKey, m_nProtocolVersion);
1680 switch(dwResult)
1681 {
1682 case RCC_SUCCESS:
1683 dwError = ERR_SUCCESS;
1684 break;
1685 case RCC_NO_CIPHERS:
1686 dwError = ERR_NO_CIPHERS;
1687 break;
1688 case RCC_INVALID_PUBLIC_KEY:
1689 dwError = ERR_INVALID_PUBLIC_KEY;
1690 break;
1691 case RCC_INVALID_SESSION_KEY:
1692 dwError = ERR_INVALID_SESSION_KEY;
1693 break;
1694 default:
1695 dwError = ERR_INTERNAL_ERROR;
1696 break;
1697 }
1698 delete pResp;
1699 }
1700 else
1701 {
1702 dwError = ERR_REQUEST_TIMEOUT;
1703 }
1704 }
1705 else
1706 {
1707 dwError = ERR_CONNECTION_BROKEN;
1708 }
1709
1710 return dwError;
1711 #else
1712 return ERR_NOT_IMPLEMENTED;
1713 #endif
1714 }
1715
1716 /**
1717 * Get configuration file from agent
1718 */
1719 UINT32 AgentConnection::getConfigFile(TCHAR **ppszConfig, UINT32 *pdwSize)
1720 {
1721 *ppszConfig = NULL;
1722 *pdwSize = 0;
1723
1724 if (!m_isConnected)
1725 return ERR_NOT_CONNECTED;
1726
1727 UINT32 dwResult;
1728 UINT32 dwRqId = generateRequestId();
1729
1730 NXCPMessage msg(m_nProtocolVersion);
1731 msg.setCode(CMD_GET_AGENT_CONFIG);
1732 msg.setId(dwRqId);
1733
1734 if (sendMessage(&msg))
1735 {
1736 NXCPMessage *pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1737 if (pResponse != NULL)
1738 {
1739 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1740 if (dwResult == ERR_SUCCESS)
1741 {
1742 size_t size = pResponse->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
1743 BYTE *utf8Text = (BYTE *)malloc(size + 1);
1744 pResponse->getFieldAsBinary(VID_CONFIG_FILE, (BYTE *)utf8Text, size);
1745
1746 // We expect text file, so replace all non-printable characters with spaces
1747 for(size_t i = 0; i < size; i++)
1748 if ((utf8Text[i] < ' ') &&
1749 (utf8Text[i] != '\t') &&
1750 (utf8Text[i] != '\r') &&
1751 (utf8Text[i] != '\n'))
1752 utf8Text[i] = ' ';
1753 utf8Text[size] = 0;
1754
1755 #ifdef UNICODE
1756 *ppszConfig = WideStringFromUTF8String((char *)utf8Text);
1757 #else
1758 *ppszConfig = MBStringFromUTF8String((char *)utf8Text);
1759 #endif
1760 free(utf8Text);
1761 *pdwSize = (UINT32)_tcslen(*ppszConfig);
1762 }
1763 delete pResponse;
1764 }
1765 else
1766 {
1767 dwResult = ERR_REQUEST_TIMEOUT;
1768 }
1769 }
1770 else
1771 {
1772 dwResult = ERR_CONNECTION_BROKEN;
1773 }
1774
1775 return dwResult;
1776 }
1777
1778 /**
1779 * Update configuration file on agent
1780 */
1781 UINT32 AgentConnection::updateConfigFile(const TCHAR *pszConfig)
1782 {
1783 UINT32 dwRqId, dwResult;
1784 NXCPMessage msg(m_nProtocolVersion);
1785 #ifdef UNICODE
1786 int nChars;
1787 BYTE *pBuffer;
1788 #endif
1789
1790 if (!m_isConnected)
1791 return ERR_NOT_CONNECTED;
1792
1793 dwRqId = generateRequestId();
1794
1795 msg.setCode(CMD_UPDATE_AGENT_CONFIG);
1796 msg.setId(dwRqId);
1797 #ifdef UNICODE
1798 nChars = (int)_tcslen(pszConfig);
1799 pBuffer = (BYTE *)malloc(nChars + 1);
1800 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR,
1801 pszConfig, nChars, (char *)pBuffer, nChars + 1, NULL, NULL);
1802 msg.setField(VID_CONFIG_FILE, pBuffer, nChars);
1803 free(pBuffer);
1804 #else
1805 msg.setField(VID_CONFIG_FILE, (BYTE *)pszConfig, (UINT32)strlen(pszConfig));
1806 #endif
1807
1808 if (sendMessage(&msg))
1809 {
1810 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1811 }
1812 else
1813 {
1814 dwResult = ERR_CONNECTION_BROKEN;
1815 }
1816
1817 return dwResult;
1818 }
1819
1820 /**
1821 * Get routing table from agent
1822 */
1823 ROUTING_TABLE *AgentConnection::getRoutingTable()
1824 {
1825 StringList *data;
1826 if (getList(_T("Net.IP.RoutingTable"), &data) != ERR_SUCCESS)
1827 return NULL;
1828
1829 ROUTING_TABLE *pRT = (ROUTING_TABLE *)malloc(sizeof(ROUTING_TABLE));
1830 pRT->iNumEntries = data->size();
1831 pRT->pRoutes = (ROUTE *)calloc(data->size(), sizeof(ROUTE));
1832 for(int i = 0; i < data->size(); i++)
1833 {
1834 TCHAR *line = _tcsdup(data->get(i));
1835 TCHAR *pBuf = line;
1836
1837 // Destination address and mask
1838 TCHAR *pChar = _tcschr(pBuf, _T(' '));
1839 if (pChar != NULL)
1840 {
1841 TCHAR *pSlash;
1842 static TCHAR defaultMask[] = _T("24");
1843
1844 *pChar = 0;
1845 pSlash = _tcschr(pBuf, _T('/'));
1846 if (pSlash != NULL)
1847 {
1848 *pSlash = 0;
1849 pSlash++;
1850 }
1851 else // Just a paranoia protection, should'n happen if agent working correctly
1852 {
1853 pSlash = defaultMask;
1854 }
1855 pRT->pRoutes[i].dwDestAddr = ntohl(_t_inet_addr(pBuf));
1856 UINT32 dwBits = _tcstoul(pSlash, NULL, 10);
1857 pRT->pRoutes[i].dwDestMask = (dwBits == 32) ? 0xFFFFFFFF : (~(0xFFFFFFFF >> dwBits));
1858 pBuf = pChar + 1;
1859 }
1860
1861 // Next hop address
1862 pChar = _tcschr(pBuf, _T(' '));
1863 if (pChar != NULL)
1864 {
1865 *pChar = 0;
1866 pRT->pRoutes[i].dwNextHop = ntohl(_t_inet_addr(pBuf));
1867 pBuf = pChar + 1;
1868 }
1869
1870 // Interface index
1871 pChar = _tcschr(pBuf, ' ');
1872 if (pChar != NULL)
1873 {
1874 *pChar = 0;
1875 pRT->pRoutes[i].dwIfIndex = _tcstoul(pBuf, NULL, 10);
1876 pBuf = pChar + 1;
1877 }
1878
1879 // Route type
1880 pRT->pRoutes[i].dwRouteType = _tcstoul(pBuf, NULL, 10);
1881
1882 free(line);
1883 }
1884
1885 delete data;
1886 return pRT;
1887 }
1888
1889 /**
1890 * Set proxy information
1891 */
1892 void AgentConnection::setProxy(InetAddress addr, WORD wPort, int iAuthMethod, const TCHAR *pszSecret)
1893 {
1894 m_proxyAddr = addr;
1895 m_wProxyPort = wPort;
1896 m_iProxyAuth = iAuthMethod;
1897 if (pszSecret != NULL)
1898 {
1899 #ifdef UNICODE
1900 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR,
1901 pszSecret, -1, m_szProxySecret, MAX_SECRET_LENGTH, NULL, NULL);
1902 #else
1903 nx_strncpy(m_szProxySecret, pszSecret, MAX_SECRET_LENGTH);
1904 #endif
1905 }
1906 else
1907 {
1908 m_szProxySecret[0] = 0;
1909 }
1910 m_useProxy = true;
1911 }
1912
1913 /**
1914 * Setup proxy connection
1915 */
1916 UINT32 AgentConnection::setupProxyConnection()
1917 {
1918 NXCPMessage msg(m_nProtocolVersion);
1919 UINT32 dwRqId;
1920
1921 dwRqId = generateRequestId();
1922 msg.setCode(CMD_SETUP_PROXY_CONNECTION);
1923 msg.setId(dwRqId);
1924 msg.setField(VID_IP_ADDRESS, m_addr.getAddressV4()); // FIXME: V6 support in proxy
1925 msg.setField(VID_AGENT_PORT, m_wPort);
1926 if (sendMessage(&msg))
1927 return waitForRCC(dwRqId, 60000); // Wait 60 seconds for remote connect
1928 else
1929 return ERR_CONNECTION_BROKEN;
1930 }
1931
1932 /**
1933 * Enable trap receiving on connection
1934 */
1935 UINT32 AgentConnection::enableTraps()
1936 {
1937 NXCPMessage msg(m_nProtocolVersion);
1938 UINT32 dwRqId;
1939
1940 dwRqId = generateRequestId();
1941 msg.setCode(CMD_ENABLE_AGENT_TRAPS);
1942 msg.setId(dwRqId);
1943 if (sendMessage(&msg))
1944 return waitForRCC(dwRqId, m_dwCommandTimeout);
1945 else
1946 return ERR_CONNECTION_BROKEN;
1947 }
1948
1949 /**
1950 * Enable trap receiving on connection
1951 */
1952 UINT32 AgentConnection::enableFileUpdates()
1953 {
1954 NXCPMessage msg(m_nProtocolVersion);
1955 UINT32 dwRqId;
1956
1957 dwRqId = generateRequestId();
1958 msg.setCode(CMD_ENABLE_FILE_UPDATES);
1959 msg.setId(dwRqId);
1960 if (sendMessage(&msg))
1961 {
1962 return waitForRCC(dwRqId, m_dwCommandTimeout);
1963 }
1964 else
1965 return ERR_CONNECTION_BROKEN;
1966 }
1967
1968 /**
1969 * Take screenshot from remote system
1970 */
1971 UINT32 AgentConnection::takeScreenshot(const TCHAR *sessionName, BYTE **data, size_t *size)
1972 {
1973 NXCPMessage msg(m_nProtocolVersion);
1974 UINT32 dwRqId;
1975
1976 dwRqId = generateRequestId();
1977 msg.setCode(CMD_TAKE_SCREENSHOT);
1978 msg.setId(dwRqId);
1979 msg.setField(VID_NAME, sessionName);
1980 if (sendMessage(&msg))
1981 {
1982 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1983 if (response != NULL)
1984 {
1985 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
1986 if (rcc == ERR_SUCCESS)
1987 {
1988 const BYTE *p = response->getBinaryFieldPtr(VID_FILE_DATA, size);
1989 if (p != NULL)
1990 {
1991 *data = (BYTE *)malloc(*size);
1992 memcpy(*data, p, *size);
1993 }
1994 else
1995 {
1996 *data = NULL;
1997 }
1998 }
1999 delete response;
2000 return rcc;
2001 }
2002 else
2003 {
2004 return ERR_REQUEST_TIMEOUT;
2005 }
2006 }
2007 else
2008 {
2009 return ERR_CONNECTION_BROKEN;
2010 }
2011 }
2012
2013 /**
2014 * Resolve hostname by IP address in local network
2015 */
2016 TCHAR *AgentConnection::getHostByAddr(const InetAddress& ipAddr, TCHAR *buf, size_t bufLen)
2017 {
2018 NXCPMessage msg(m_nProtocolVersion);
2019 UINT32 dwRqId;
2020
2021 dwRqId = generateRequestId();
2022 msg.setCode(CMD_HOST_BY_IP);
2023 msg.setId(dwRqId);
2024 msg.setField(VID_IP_ADDRESS, ipAddr);
2025 TCHAR *result = NULL;
2026 if (sendMessage(&msg))
2027 {
2028 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2029 if (response != NULL)
2030 {
2031 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
2032 if (rcc == ERR_SUCCESS)
2033 {
2034 result = response->getFieldAsString(VID_NAME, buf, bufLen);
2035 }
2036 delete response;
2037 return result;
2038 }
2039 else
2040 {
2041 return result;
2042 }
2043 }
2044 else
2045 {
2046 return result;
2047 }
2048 }
2049
2050 /**
2051 * Send custom request to agent
2052 */
2053 NXCPMessage *AgentConnection::customRequest(NXCPMessage *pRequest, const TCHAR *recvFile, bool append,
2054 void (*downloadProgressCallback)(size_t, void *), void (*fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
2055 {
2056 UINT32 dwRqId, rcc;
2057 NXCPMessage *msg = NULL;
2058
2059 dwRqId = generateRequestId();
2060 pRequest->setId(dwRqId);
2061 if (recvFile != NULL)
2062 {
2063 rcc = prepareFileDownload(recvFile, dwRqId, append, downloadProgressCallback, fileResendCallback,cbArg);
2064 if (rcc != ERR_SUCCESS)
2065 {
2066 // Create fake response message
2067 msg = new NXCPMessage;
2068 msg->setCode(CMD_REQUEST_COMPLETED);
2069 msg->setId(dwRqId);
2070 msg->setField(VID_RCC, rcc);
2071 }
2072 }
2073
2074 if (msg == NULL)
2075 {
2076 if (sendMessage(pRequest))
2077 {
2078 msg = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2079 if ((msg != NULL) && (recvFile != NULL))
2080 {
2081 if (msg->getFieldAsUInt32(VID_RCC) == ERR_SUCCESS)
2082 {
2083 if (ConditionWait(m_condFileDownload, 1800000)) // 30 min timeout
2084 {
2085 if (!m_fileDownloadSucceeded)
2086 {
2087 msg->setField(VID_RCC, ERR_IO_FAILURE);
2088 if (m_deleteFileOnDownloadFailure)
2089 _tremove(recvFile);
2090 }
2091 }
2092 else
2093 {
2094 msg->setField(VID_RCC, ERR_REQUEST_TIMEOUT);
2095 }
2096 }
2097 else
2098 {
2099 if (fileResendCallback != NULL)
2100 {
2101 _close(m_hCurrFile);
2102 m_hCurrFile = -1;
2103 _tremove(recvFile);
2104 }
2105 }
2106 }
2107
2108 }
2109 }
2110
2111 return msg;
2112 }
2113
2114 /**
2115 * Prepare for file download
2116 */
2117 UINT32 AgentConnection::prepareFileDownload(const TCHAR *fileName, UINT32 rqId, bool append, void (*downloadProgressCallback)(size_t, void *),
2118 void (* fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
2119 {
2120 if (fileResendCallback == NULL)
2121 {
2122 if (m_hCurrFile != -1)
2123 return ERR_RESOURCE_BUSY;
2124
2125 nx_strncpy(m_currentFileName, fileName, MAX_PATH);
2126 ConditionReset(m_condFileDownload);
2127 m_hCurrFile = _topen(fileName, (append ? 0 : (O_CREAT | O_TRUNC)) | O_RDWR | O_BINARY, S_IREAD | S_IWRITE);
2128 if (m_hCurrFile == -1)
2129 {
2130 DbgPrintf(4, _T("AgentConnection::PrepareFileDownload(): cannot open file %s (%s); append=%d rqId=%d"),
2131 fileName, _tcserror(errno), append, rqId);
2132 }
2133 else
2134 {
2135 if (append)
2136 lseek(m_hCurrFile, 0, SEEK_END);
2137 }
2138
2139 m_dwDownloadRequestId = rqId;
2140 m_downloadProgressCallback = downloadProgressCallback;
2141 m_downloadProgressCallbackArg = cbArg;
2142
2143 m_sendToClientMessageCallback = NULL;
2144
2145 return (m_hCurrFile != -1) ? ERR_SUCCESS : ERR_FILE_OPEN_ERROR;
2146 }
2147 else
2148 {
2149 ConditionReset(m_condFileDownload);
2150
2151 m_dwDownloadRequestId = rqId;
2152 m_downloadProgressCallback = downloadProgressCallback;
2153 m_downloadProgressCallbackArg = cbArg;
2154
2155 m_sendToClientMessageCallback = fileResendCallback;
2156
2157 return ERR_SUCCESS;
2158 }
2159 }
2160
2161 /**
2162 * File upload completion handler
2163 */
2164 void AgentConnection::onFileDownload(bool success)
2165 {
2166 if (!success && m_deleteFileOnDownloadFailure)
2167 _tremove(m_currentFileName);
2168 m_fileDownloadSucceeded = success;
2169 ConditionSet(m_condFileDownload);
2170 }
2171
2172 /**
2173 * Enable trap receiving on connection
2174 */
2175 UINT32 AgentConnection::getPolicyInventory(AgentPolicyInfo **info)
2176 {
2177 NXCPMessage msg(m_nProtocolVersion);
2178 UINT32 dwRqId, rcc;
2179
2180 *info = NULL;
2181 dwRqId = generateRequestId();
2182 msg.setCode(CMD_GET_POLICY_INVENTORY);
2183 msg.setId(dwRqId);
2184 if (sendMessage(&msg))
2185 {
2186 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2187 if (response != NULL)
2188 {
2189 rcc = response->getFieldAsUInt32(VID_RCC);
2190 if (rcc == ERR_SUCCESS)
2191 *info = new AgentPolicyInfo(response);
2192 delete response;
2193 }
2194 else
2195 {
2196 rcc = ERR_REQUEST_TIMEOUT;
2197 }
2198 }
2199 else
2200 {
2201 rcc = ERR_CONNECTION_BROKEN;
2202 }
2203 return rcc;
2204 }
2205
2206 /**
2207 * Uninstall policy by GUID
2208 */
2209 UINT32 AgentConnection::uninstallPolicy(const uuid& guid)
2210 {
2211 UINT32 rqId, rcc;
2212 NXCPMessage msg(m_nProtocolVersion);
2213
2214 rqId = generateRequestId();
2215 msg.setId(rqId);
2216 msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
2217 msg.setField(VID_GUID, guid);
2218 if (sendMessage(&msg))
2219 {
2220 rcc = waitForRCC(rqId, m_dwCommandTimeout);
2221 }
2222 else
2223 {
2224 rcc = ERR_CONNECTION_BROKEN;
2225 }
2226 return rcc;
2227 }
2228
2229 /**
2230 * Acquire encryption context
2231 */
2232 NXCPEncryptionContext *AgentConnection::acquireEncryptionContext()
2233 {
2234 lock();
2235 NXCPEncryptionContext *ctx = m_pCtx;
2236 if (ctx != NULL)
2237 ctx->incRefCount();
2238 unlock();
2239 return ctx;
2240 }
2241
2242 /**
2243 * Callback for processing collected data on separate thread
2244 */
2245 void AgentConnection::processCollectedDataCallback(NXCPMessage *msg)
2246 {
2247 NXCPMessage response;
2248 response.setCode(CMD_REQUEST_COMPLETED);
2249 response.setId(msg->getId());
2250
2251 if (msg->getFieldAsBoolean(VID_BULK_RECONCILIATION))
2252 {
2253 UINT32 rcc = processBulkCollectedData(msg, &response);
2254 response.setField(VID_RCC, rcc);
2255 }
2256 else
2257 {
2258 UINT32 rcc = processCollectedData(msg);
2259 response.setField(VID_RCC, rcc);
2260 }
2261
2262 sendMessage(&response);
2263 delete msg;
2264 decInternalRefCount();
2265 }
2266
2267 /**
2268 * Process collected data information (for DCI with agent-side cache)
2269 */
2270 UINT32 AgentConnection::processCollectedData(NXCPMessage *msg)
2271 {
2272 return ERR_NOT_IMPLEMENTED;
2273 }
2274
2275 /**
2276 * Process collected data information in bulk mode (for DCI with agent-side cache)
2277 */
2278 UINT32 AgentConnection::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
2279 {
2280 return ERR_NOT_IMPLEMENTED;
2281 }
2282
2283 /**
2284 * Create new agent parameter definition from NXCP message
2285 */
2286 AgentParameterDefinition::AgentParameterDefinition(NXCPMessage *msg, UINT32 baseId)
2287 {
2288 m_name = msg->getFieldAsString(baseId);
2289 m_description = msg->getFieldAsString(baseId + 1);
2290 m_dataType = (int)msg->getFieldAsUInt16(baseId + 2);
2291 }
2292
2293 /**
2294 * Create new agent parameter definition from another definition object
2295 */
2296 AgentParameterDefinition::AgentParameterDefinition(AgentParameterDefinition *src)
2297 {
2298 m_name = (src->m_name != NULL) ? _tcsdup(src->m_name) : NULL;
2299 m_description = (src->m_description != NULL) ? _tcsdup(src->m_description) : NULL;
2300 m_dataType = src->m_dataType;
2301 }
2302
2303 /**
2304 * Destructor for agent parameter definition
2305 */
2306 AgentParameterDefinition::~AgentParameterDefinition()
2307 {
2308 safe_free(m_name);
2309 safe_free(m_description);
2310 }
2311
2312 /**
2313 * Fill NXCP message
2314 */
2315 UINT32 AgentParameterDefinition::fillMessage(NXCPMessage *msg, UINT32 baseId)
2316 {
2317 msg->setField(baseId, m_name);
2318 msg->setField(baseId + 1, m_description);
2319 msg->setField(baseId + 2, (WORD)m_dataType);
2320 return 3;
2321 }
2322
2323 /**
2324 * Create new agent table definition from NXCP message
2325 */
2326 AgentTableDefinition::AgentTableDefinition(NXCPMessage *msg, UINT32 baseId)
2327 {
2328 m_name = msg->getFieldAsString(baseId);
2329 m_description = msg->getFieldAsString(baseId + 2);
2330
2331 TCHAR *instanceColumns = msg->getFieldAsString(baseId + 1);
2332 if (instanceColumns != NULL)
2333 {
2334 m_instanceColumns = new StringList(instanceColumns, _T("|"));
2335 free(instanceColumns);
2336 }
2337 else
2338 {
2339 m_instanceColumns = new StringList;
2340 }
2341
2342 m_columns = new ObjectArray<AgentTableColumnDefinition>(16, 16, true);
2343 }
2344
2345 /**
2346 * Create new agent table definition from another definition object
2347 */
2348 AgentTableDefinition::AgentTableDefinition(AgentTableDefinition *src)
2349 {
2350 m_name = (src->m_name != NULL) ? _tcsdup(src->m_name) : NULL;
2351 m_description = (src->m_description != NULL) ? _tcsdup(src->m_description) : NULL;
2352 m_instanceColumns = new StringList(src->m_instanceColumns);
2353 m_columns = new ObjectArray<AgentTableColumnDefinition>(16, 16, true);
2354 for(int i = 0; i < src->m_columns->size(); i++)
2355 {
2356 m_columns->add(new AgentTableColumnDefinition(src->m_columns->get(i)));
2357 }
2358 }
2359 /**
2360 * Destructor for agent table definition
2361 */
2362 AgentTableDefinition::~AgentTableDefinition()
2363 {
2364 safe_free(m_name);
2365 safe_free(m_description);
2366 delete m_instanceColumns;
2367 delete m_columns;
2368 }
2369
2370 /**
2371 * Fill NXCP message
2372 */
2373 UINT32 AgentTableDefinition::fillMessage(NXCPMessage *msg, UINT32 baseId)
2374 {
2375 msg->setField(baseId + 1, m_name);
2376 msg->setField(baseId + 2, m_description);
2377
2378 TCHAR *instanceColumns = m_instanceColumns->join(_T("|"));
2379 msg->setField(baseId + 3, instanceColumns);
2380 free(instanceColumns);
2381
2382 UINT32 varId = baseId + 4;
2383 for(int i = 0; i < m_columns->size(); i++)
2384 {
2385 msg->setField(varId++, m_columns->get(i)->m_name);
2386 msg->setField(varId++, (WORD)m_columns->get(i)->m_dataType);
2387 }
2388
2389 msg->setField(baseId, varId - baseId);
2390 return varId - baseId;
2391 }