Do DNS resolve for node names via zone proxy. (issue #NX-1268)
[public/netxms.git] / src / server / libnxsrv / agent.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Server Library
4 ** Copyright (C) 2003-2017 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: agent.cpp
21 **
22 **/
23
24 #include "libnxsrv.h"
25 #include <stdarg.h>
26 #include <nxstat.h>
27
// Platforms other than Windows get _tell() emulated here: lseek() with a
// zero offset from SEEK_CUR reports the current position of descriptor f.
#ifndef _WIN32
#define _tell(f) lseek(f,0,SEEK_CUR)
#endif

// Tag passed to the nxlog_* debug calls made in this file
#define DEBUG_TAG _T("agent.conn")

/**
 * Constants
 */
// 268435456 bytes = 256 MiB (2^28); passed as the last argument to
// RecvNXCPMessageEx() in the receiver thread.
#define MAX_MSG_SIZE    268435456

/**
 * Agent connection thread pool. The receiver thread hands unsolicited agent
 * messages (traps, syslog records, pushed/collected data, SNMP traps) to this
 * pool; while it is NULL those messages are discarded (CMD_DCI_DATA is
 * answered with ERR_INTERNAL_ERROR instead).
 */
ThreadPool LIBNXSRV_EXPORTABLE *g_agentConnectionThreadPool = NULL;

/**
 * Unique connection ID. Incremented by every AgentConnection constructor to
 * produce the per-connection debug ID.
 */
static VolatileCounter s_connectionId = 0;

/**
 * Static data: default encryption policy copied into each new connection.
 * Can only be changed (via SetAgentDEP) when built with _WITH_ENCRYPTION.
 */
#ifdef _WITH_ENCRYPTION
static int m_iDefaultEncryptionPolicy = ENCRYPTION_ALLOWED;
#else
static int m_iDefaultEncryptionPolicy = ENCRYPTION_DISABLED;
#endif
57
58 /**
59 * Set default encryption policy for agent communication
60 */
61 void LIBNXSRV_EXPORTABLE SetAgentDEP(int iPolicy)
62 {
63 #ifdef _WITH_ENCRYPTION
64 m_iDefaultEncryptionPolicy = iPolicy;
65 #endif
66 }
67
68 /**
69 * Receiver thread starter
70 */
71 THREAD_RESULT THREAD_CALL AgentConnection::receiverThreadStarter(void *pArg)
72 {
73 ThreadSetName("AgentReceiver");
74 ((AgentConnection *)pArg)->receiverThread();
75 ((AgentConnection *)pArg)->decInternalRefCount();
76 return THREAD_OK;
77 }
78
79 /**
80 * Constructor for AgentConnection
81 */
82 AgentConnection::AgentConnection(const InetAddress& addr, WORD port, int authMethod, const TCHAR *secret, bool allowCompression)
83 {
84 m_internalRefCount = 1;
85 m_userRefCount = 1;
86 m_debugId = InterlockedIncrement(&s_connectionId);
87 m_addr = addr;
88 m_wPort = port;
89 m_iAuthMethod = authMethod;
90 if (secret != NULL)
91 {
92 #ifdef UNICODE
93 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, secret, -1, m_szSecret, MAX_SECRET_LENGTH, NULL, NULL);
94 m_szSecret[MAX_SECRET_LENGTH - 1] = 0;
95 #else
96 nx_strncpy(m_szSecret, secret, MAX_SECRET_LENGTH);
97 #endif
98 }
99 else
100 {
101 m_szSecret[0] = 0;
102 }
103 m_allowCompression = allowCompression;
104 m_channel = NULL;
105 m_tLastCommandTime = 0;
106 m_dwNumDataLines = 0;
107 m_ppDataLines = NULL;
108 m_pMsgWaitQueue = new MsgWaitQueue;
109 m_requestId = 0;
110 m_connectionTimeout = 5000; // 5 seconds
111 m_dwCommandTimeout = 5000; // Default timeout 5 seconds
112 m_isConnected = false;
113 m_mutexDataLock = MutexCreate();
114 m_mutexSocketWrite = MutexCreate();
115 m_hReceiverThread = INVALID_THREAD_HANDLE;
116 m_pCtx = NULL;
117 m_iEncryptionPolicy = m_iDefaultEncryptionPolicy;
118 m_useProxy = false;
119 m_iProxyAuth = AUTH_NONE;
120 m_wProxyPort = 4700;
121 m_dwRecvTimeout = 420000; // 7 minutes
122 m_nProtocolVersion = NXCP_VERSION;
123 m_hCurrFile = -1;
124 m_deleteFileOnDownloadFailure = true;
125 m_condFileDownload = ConditionCreate(TRUE);
126 m_fileDownloadSucceeded = false;
127 m_fileUploadInProgress = false;
128 m_sendToClientMessageCallback = NULL;
129 m_dwDownloadRequestId = 0;
130 m_downloadProgressCallback = NULL;
131 m_downloadProgressCallbackArg = NULL;
132 }
133
134 /**
135 * Destructor
136 */
137 AgentConnection::~AgentConnection()
138 {
139 debugPrintf(7, _T("AgentConnection destructor called (this=%p, thread=%p)"), this, (void *)m_hReceiverThread);
140
141 ThreadDetach(m_hReceiverThread);
142
143 lock();
144 destroyResultData();
145 unlock();
146
147 delete m_pMsgWaitQueue;
148 if (m_pCtx != NULL)
149 m_pCtx->decRefCount();
150
151 if (m_hCurrFile != -1)
152 {
153 _close(m_hCurrFile);
154 onFileDownload(false);
155 }
156 else if (m_sendToClientMessageCallback != NULL)
157 {
158 onFileDownload(false);
159 }
160
161 if (m_channel != NULL)
162 m_channel->decRefCount();
163
164 MutexDestroy(m_mutexDataLock);
165 MutexDestroy(m_mutexSocketWrite);
166 ConditionDestroy(m_condFileDownload);
167 }
168
169 /**
170 * Write debug output
171 */
172 void AgentConnection::debugPrintf(int level, const TCHAR *format, ...)
173 {
174 va_list args;
175 va_start(args, format);
176 nxlog_debug_tag_object2(DEBUG_TAG, m_debugId, level, format, args);
177 va_end(args);
178 }
179
180 /**
181 * Receiver thread
182 */
183 void AgentConnection::receiverThread()
184 {
185 AbstractCommChannel *channel = m_channel;
186 UINT32 msgBufferSize = 1024;
187
188 // Initialize raw message receiving function
189 NXCP_BUFFER *msgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
190 NXCPInitBuffer(msgBuffer);
191
192 // Allocate space for raw message
193 NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)malloc(msgBufferSize);
194 #ifdef _WITH_ENCRYPTION
195 BYTE *decryptionBuffer = (BYTE *)malloc(msgBufferSize);
196 #else
197 BYTE *decryptionBuffer = NULL;
198 #endif
199
200 while(true)
201 {
202 // Shrink buffer after receiving large message
203 if (msgBufferSize > 131072)
204 {
205 msgBufferSize = 131072;
206 rawMsg = (NXCP_MESSAGE *)realloc(rawMsg, msgBufferSize);
207 if (decryptionBuffer != NULL)
208 decryptionBuffer = (BYTE *)realloc(decryptionBuffer, msgBufferSize);
209 }
210
211 // Receive raw message
212 int rc = RecvNXCPMessageEx(channel, &rawMsg, msgBuffer, &msgBufferSize,
213 &m_pCtx, (decryptionBuffer != NULL) ? &decryptionBuffer : NULL,
214 m_dwRecvTimeout, MAX_MSG_SIZE);
215 if (rc <= 0)
216 {
217 if ((rc != 0) && (WSAGetLastError() != WSAESHUTDOWN))
218 debugPrintf(6, _T("AgentConnection::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), rc, WSAGetLastError());
219 else
220 debugPrintf(6, _T("AgentConnection::ReceiverThread(): communication channel shutdown"));
221 break;
222 }
223
224 // Check if we get too large message
225 if (rc == 1)
226 {
227 TCHAR buffer[64];
228 debugPrintf(6, _T("Received too large message %s (%d bytes)"), NXCPMessageCodeName(ntohs(rawMsg->code), buffer), ntohl(rawMsg->size));
229 continue;
230 }
231
232 // Check if we are unable to decrypt message
233 if (rc == 2)
234 {
235 debugPrintf(6, _T("Unable to decrypt received message"));
236 continue;
237 }
238
239 // Check for timeout
240 if (rc == 3)
241 {
242 if (m_fileUploadInProgress)
243 continue; // Receive timeout may occur when uploading large files via slow links
244 debugPrintf(6, _T("Timed out waiting for message"));
245 break;
246 }
247
248 // Check that actual received packet size is equal to encoded in packet
249 if ((int)ntohl(rawMsg->size) != rc)
250 {
251 debugPrintf(6, _T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(rawMsg->size), rc);
252 continue; // Bad packet, wait for next
253 }
254
255 if (ntohs(rawMsg->flags) & MF_BINARY)
256 {
257 // Convert message header to host format
258 rawMsg->id = ntohl(rawMsg->id);
259 rawMsg->code = ntohs(rawMsg->code);
260 rawMsg->numFields = ntohl(rawMsg->numFields);
261 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
262 {
263 TCHAR buffer[64];
264 debugPrintf(6, _T("Received raw message %s (%d) from agent at %s"),
265 NXCPMessageCodeName(rawMsg->code, buffer), rawMsg->id, (const TCHAR *)m_addr.toString());
266 }
267
268 if ((rawMsg->code == CMD_FILE_DATA) && (rawMsg->id == m_dwDownloadRequestId))
269 {
270 if (m_sendToClientMessageCallback != NULL)
271 {
272 rawMsg->code = ntohs(rawMsg->code);
273 rawMsg->numFields = ntohl(rawMsg->numFields);
274 m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
275
276 if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
277 {
278 m_sendToClientMessageCallback = NULL;
279 onFileDownload(true);
280 }
281 else
282 {
283 if (m_downloadProgressCallback != NULL)
284 {
285 m_downloadProgressCallback(rawMsg->size - (NXCP_HEADER_SIZE + 8), m_downloadProgressCallbackArg);
286 }
287 }
288 }
289 else
290 {
291 if (m_hCurrFile != -1)
292 {
293 if (_write(m_hCurrFile, rawMsg->fields, rawMsg->numFields) == (int)rawMsg->numFields)
294 {
295 if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
296 {
297 _close(m_hCurrFile);
298 m_hCurrFile = -1;
299
300 onFileDownload(true);
301 }
302 else
303 {
304 if (m_downloadProgressCallback != NULL)
305 {
306 m_downloadProgressCallback(_tell(m_hCurrFile), m_downloadProgressCallbackArg);
307 }
308 }
309 }
310 }
311 else
312 {
313 // I/O error
314 _close(m_hCurrFile);
315 m_hCurrFile = -1;
316
317 onFileDownload(false);
318 }
319 }
320 }
321
322 if ((rawMsg->code == CMD_ABORT_FILE_TRANSFER) && (rawMsg->id == m_dwDownloadRequestId))
323 {
324 if (m_sendToClientMessageCallback != NULL)
325 {
326 rawMsg->code = ntohs(rawMsg->code);
327 rawMsg->numFields = ntohl(rawMsg->numFields);
328 m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
329 m_sendToClientMessageCallback = NULL;
330
331 onFileDownload(false);
332 }
333 else
334 {
335 //error on agent side
336 _close(m_hCurrFile);
337 m_hCurrFile = -1;
338
339 onFileDownload(false);
340 }
341 }
342 }
343 else if (ntohs(rawMsg->flags) & MF_CONTROL)
344 {
345 // Convert message header to host format
346 rawMsg->id = ntohl(rawMsg->id);
347 rawMsg->code = ntohs(rawMsg->code);
348 rawMsg->flags = ntohs(rawMsg->flags);
349 rawMsg->numFields = ntohl(rawMsg->numFields);
350 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
351 {
352 TCHAR buffer[64];
353 debugPrintf(6, _T("Received control message %s from agent at %s"),
354 NXCPMessageCodeName(rawMsg->code, buffer), (const TCHAR *)m_addr.toString());
355 }
356 m_pMsgWaitQueue->put((NXCP_MESSAGE *)nx_memdup(rawMsg, ntohl(rawMsg->size)));
357 }
358 else
359 {
360 // Create message object from raw message
361 NXCPMessage *msg = NXCPMessage::deserialize(rawMsg, m_nProtocolVersion);
362 if (msg != NULL)
363 {
364 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
365 {
366 TCHAR buffer[64];
367 debugPrintf(6, _T("Received message %s (%d) from agent at %s"),
368 NXCPMessageCodeName(msg->getCode(), buffer), msg->getId(), (const TCHAR *)m_addr.toString());
369 }
370 switch(msg->getCode())
371 {
372 case CMD_REQUEST_COMPLETED:
373 case CMD_SESSION_KEY:
374 m_pMsgWaitQueue->put(msg);
375 break;
376 case CMD_TRAP:
377 if (g_agentConnectionThreadPool != NULL)
378 {
379 incInternalRefCount();
380 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onTrapCallback, msg);
381 }
382 else
383 {
384 delete msg;
385 }
386 break;
387 case CMD_SYSLOG_RECORDS:
388 if (g_agentConnectionThreadPool != NULL)
389 {
390 incInternalRefCount();
391 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSyslogMessageCallback, msg);
392 }
393 else
394 {
395 delete msg;
396 }
397 break;
398 case CMD_PUSH_DCI_DATA:
399 if (g_agentConnectionThreadPool != NULL)
400 {
401 incInternalRefCount();
402 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onDataPushCallback, msg);
403 }
404 else
405 {
406 delete msg;
407 }
408 break;
409 case CMD_DCI_DATA:
410 if (g_agentConnectionThreadPool != NULL)
411 {
412 incInternalRefCount();
413 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::processCollectedDataCallback, msg);
414 }
415 else
416 {
417 NXCPMessage response;
418 response.setCode(CMD_REQUEST_COMPLETED);
419 response.setId(msg->getId());
420 response.setField(VID_RCC, ERR_INTERNAL_ERROR);
421 sendMessage(&response);
422 delete msg;
423 }
424 break;
425 case CMD_FILE_MONITORING:
426 onFileMonitoringData(msg);
427 delete msg;
428 break;
429 case CMD_SNMP_TRAP:
430 if (g_agentConnectionThreadPool != NULL)
431 {
432 incInternalRefCount();
433 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSnmpTrapCallback, msg);
434 }
435 else
436 {
437 delete msg;
438 }
439 break;
440 default:
441 if (processCustomMessage(msg))
442 delete msg;
443 else
444 m_pMsgWaitQueue->put(msg);
445 break;
446 }
447 }
448 else
449 {
450 debugPrintf(6, _T("RecvMsg: message deserialization error"));
451 }
452 }
453 }
454 debugPrintf(6, _T("Receiver loop terminated"));
455
456 // Close socket and mark connection as disconnected
457 lock();
458 if (m_hCurrFile != -1)
459 {
460 _close(m_hCurrFile);
461 m_hCurrFile = -1;
462 onFileDownload(false);
463 }
464 else if (m_sendToClientMessageCallback != NULL)
465 {
466 m_sendToClientMessageCallback = NULL;
467 onFileDownload(false);
468 }
469
470 debugPrintf(6, _T("Closing communication channel"));
471 channel->close();
472 channel->decRefCount();
473 if (m_pCtx != NULL)
474 {
475 m_pCtx->decRefCount();
476 m_pCtx = NULL;
477 }
478 m_isConnected = false;
479 unlock();
480
481 free(rawMsg);
482 free(msgBuffer);
483 #ifdef _WITH_ENCRYPTION
484 free(decryptionBuffer);
485 #endif
486
487 debugPrintf(6, _T("Receiver thread stopped"));
488 }
489
490 /**
491 * Create channel. Default implementation creates socket channel.
492 */
493 AbstractCommChannel *AgentConnection::createChannel()
494 {
495 // Create socket
496 SOCKET s = socket(m_useProxy ? m_proxyAddr.getFamily() : m_addr.getFamily(), SOCK_STREAM, 0);
497 if (s == INVALID_SOCKET)
498 {
499 debugPrintf(6, _T("Call to socket() failed"));
500 return NULL;
501 }
502
503 // Fill in address structure
504 SockAddrBuffer sb;
505 struct sockaddr *sa = m_useProxy ? m_proxyAddr.fillSockAddr(&sb, m_wProxyPort) : m_addr.fillSockAddr(&sb, m_wPort);
506
507 // Connect to server
508 if ((sa == NULL) || (ConnectEx(s, sa, SA_LEN(sa), m_connectionTimeout) == -1))
509 {
510 TCHAR buffer[64];
511 debugPrintf(5, _T("Cannot establish connection with agent at %s:%d"),
512 m_useProxy ? m_proxyAddr.toString(buffer) : m_addr.toString(buffer),
513 (int)(m_useProxy ? m_wProxyPort : m_wPort));
514 closesocket(s);
515 return NULL;
516 }
517
518 return new SocketCommChannel(s);
519 }
520
521 /**
522 * Acquire communication channel. Caller must call decRefCount to release channel.
523 */
524 AbstractCommChannel *AgentConnection::acquireChannel()
525 {
526 lock();
527 AbstractCommChannel *channel = m_channel;
528 if (channel != NULL)
529 channel->incRefCount();
530 unlock();
531 return channel;
532 }
533
/**
 * Connect to agent.
 *
 * Sequence: create channel -> negotiate NXCP version -> start receiver
 * thread -> (optionally) set up encryption -> authenticate -> announce
 * server capabilities. When a proxy is configured, the first pass talks to
 * the proxy, then the proxy connection to the real agent is set up, the
 * NXCP version is re-negotiated and the encryption/authentication steps are
 * repeated ("second pass") against the target agent.
 *
 * @param pServerKey     server RSA key used for encryption setup; may be NULL,
 *                       in which case connecting fails if encryption is required
 * @param pdwError       optional; receives the resulting ERR_* code
 * @param pdwSocketError optional; receives WSAGetLastError() on failure, 0 otherwise
 * @param serverId       if non-zero, sent to the agent after successful connect
 * @return true on success; false on failure or if already connected
 */
bool AgentConnection::connect(RSA *pServerKey, UINT32 *pdwError, UINT32 *pdwSocketError, UINT64 serverId)
{
   TCHAR szBuffer[256];
   bool success = false;
   bool forceEncryption = false;   // set when the agent answers ERR_ENCRYPTION_REQUIRED
   bool secondPass = false;        // true once the proxy hop is established
   UINT32 dwError = 0;

   // Pre-set outputs so the early "already connected" return leaves them defined
   if (pdwError != NULL)
      *pdwError = ERR_INTERNAL_ERROR;

   if (pdwSocketError != NULL)
      *pdwSocketError = 0;

   // Check if already connected
   if (m_isConnected)
      return false;

   // Wait for receiver thread from previous connection, if any
   ThreadJoin(m_hReceiverThread);
   m_hReceiverThread = INVALID_THREAD_HANDLE;

   // Check if we need to close existing channel
   if (m_channel != NULL)
   {
      m_channel->decRefCount();
      m_channel = NULL;
   }

   m_channel = createChannel();
   if (m_channel == NULL)
   {
      debugPrintf(6, _T("Cannot create communication channel"));
      dwError = ERR_CONNECT_FAILED;
      goto connect_cleanup;
   }

   if (!NXCPGetPeerProtocolVersion(m_channel, &m_nProtocolVersion, m_mutexSocketWrite))
   {
      debugPrintf(6, _T("Protocol version negotiation failed"));
      dwError = ERR_INTERNAL_ERROR;
      goto connect_cleanup;
   }
   debugPrintf(6, _T("Using NXCP version %d"), m_nProtocolVersion);

   // Start receiver thread
   // (both references taken here are released by the thread itself)
   incInternalRefCount();
   m_channel->incRefCount();  // for receiver thread
   m_hReceiverThread = ThreadCreateEx(receiverThreadStarter, 0, this);

   // Setup encryption
setup_encryption:
   if ((m_iEncryptionPolicy == ENCRYPTION_PREFERRED) ||
       (m_iEncryptionPolicy == ENCRYPTION_REQUIRED) ||
       forceEncryption)    // Agent require encryption
   {
      if (pServerKey != NULL)
      {
         // A failed setup is fatal only when encryption is mandatory
         dwError = setupEncryption(pServerKey);
         if ((dwError != ERR_SUCCESS) &&
             ((m_iEncryptionPolicy == ENCRYPTION_REQUIRED) || forceEncryption))
            goto connect_cleanup;
      }
      else
      {
         if ((m_iEncryptionPolicy == ENCRYPTION_REQUIRED) || forceEncryption)
         {
            dwError = ERR_ENCRYPTION_REQUIRED;
            goto connect_cleanup;
         }
      }
   }

   // Authenticate itself to agent
   // (argument is true only while talking to the proxy on the first pass)
   if ((dwError = authenticate(m_useProxy && !secondPass)) != ERR_SUCCESS)
   {
      // Agent insists on encryption: retry with encryption forced, unless disabled locally
      if ((dwError == ERR_ENCRYPTION_REQUIRED) &&
          (m_iEncryptionPolicy != ENCRYPTION_DISABLED))
      {
         forceEncryption = true;
         goto setup_encryption;
      }
      debugPrintf(5, _T("Authentication to agent %s failed (%s)"), m_addr.toString(szBuffer),
               AgentErrorCodeToText(dwError));
      goto connect_cleanup;
   }

   // Test connectivity and inform agent about server capabilities
   if ((dwError = setServerCapabilities()) != ERR_SUCCESS)
   {
      if ((dwError == ERR_ENCRYPTION_REQUIRED) &&
          (m_iEncryptionPolicy != ENCRYPTION_DISABLED))
      {
         forceEncryption = true;
         goto setup_encryption;
      }
      if (dwError != ERR_UNKNOWN_COMMAND) // Older agents may not support enable IPv6 command
      {
         debugPrintf(5, _T("Communication with agent %s failed (%s)"), m_addr.toString(szBuffer), AgentErrorCodeToText(dwError));
         goto connect_cleanup;
      }
   }

   if (m_useProxy && !secondPass)
   {
      dwError = setupProxyConnection();
      if (dwError != ERR_SUCCESS)
         goto connect_cleanup;

      // Drop the encryption context negotiated with the proxy; a new one is
      // set up on the second pass if needed
      lock();
      if (m_pCtx != NULL)
      {
         m_pCtx->decRefCount();
         m_pCtx = NULL;
      }
      unlock();

      debugPrintf(6, _T("Proxy connection established"));

      // Renegotiate NXCP version with actual target agent
      // (header-only control message, sent unencrypted directly on the channel)
      NXCP_MESSAGE msg;
      msg.id = 0;
      msg.numFields = 0;
      msg.size = htonl(NXCP_HEADER_SIZE);
      msg.code = htons(CMD_GET_NXCP_CAPS);
      msg.flags = htons(MF_CONTROL);
      if (m_channel->send(&msg, NXCP_HEADER_SIZE, m_mutexSocketWrite) == NXCP_HEADER_SIZE)
      {
         NXCP_MESSAGE *rsp = m_pMsgWaitQueue->waitForRawMessage(CMD_NXCP_CAPS, 0, m_dwCommandTimeout);
         if (rsp != NULL)
         {
            // Version is carried in the top byte of numFields (already in
            // host byte order - converted by the receiver thread)
            m_nProtocolVersion = rsp->numFields >> 24;
            free(rsp);
         }
         else
         {
            // assume that peer doesn't understand CMD_GET_NXCP_CAPS message
            // and set version number to 1
            m_nProtocolVersion = 1;
         }
         debugPrintf(6, _T("Using NXCP version %d after re-negotioation"), m_nProtocolVersion);
      }
      else
      {
         debugPrintf(6, _T("Protocol version re-negotiation failed - cannot send CMD_GET_NXCP_CAPS message"));
         dwError = ERR_CONNECTION_BROKEN;
         goto connect_cleanup;
      }

      // Repeat encryption setup / authentication against the target agent
      secondPass = true;
      forceEncryption = false;
      goto setup_encryption;
   }

   // Result of setServerId() is not checked
   if (serverId != 0)
      setServerId(serverId);

   success = true;
   dwError = ERR_SUCCESS;

connect_cleanup:
   if (!success)
   {
      if (pdwSocketError != NULL)
         *pdwSocketError = (UINT32)WSAGetLastError();

      // Shut the channel down first so the receiver thread (if running)
      // leaves its loop, then wait for it before tearing the channel down
      lock();
      if (m_channel != NULL)
         m_channel->shutdown();
      unlock();
      ThreadJoin(m_hReceiverThread);
      m_hReceiverThread = INVALID_THREAD_HANDLE;

      lock();
      if (m_channel != NULL)
      {
         m_channel->close();
         m_channel->decRefCount();
         m_channel = NULL;
      }

      if (m_pCtx != NULL)
      {
         m_pCtx->decRefCount();
         m_pCtx = NULL;
      }

      unlock();
   }
   m_isConnected = success;
   if (pdwError != NULL)
      *pdwError = dwError;
   return success;
}
730
731 /**
732 * Disconnect from agent
733 */
734 void AgentConnection::disconnect()
735 {
736 debugPrintf(6, _T("disconnect() called"));
737 lock();
738 if (m_hCurrFile != -1)
739 {
740 _close(m_hCurrFile);
741 m_hCurrFile = -1;
742 onFileDownload(false);
743 }
744 else if (m_sendToClientMessageCallback != NULL)
745 {
746 m_sendToClientMessageCallback = NULL;
747 onFileDownload(false);
748 }
749
750 if (m_channel != NULL)
751 {
752 m_channel->shutdown();
753 m_channel->decRefCount();
754 m_channel = NULL;
755 }
756 destroyResultData();
757 m_isConnected = false;
758 unlock();
759 debugPrintf(6, _T("Disconnect completed"));
760 }
761
762 /**
763 * Set authentication data
764 */
765 void AgentConnection::setAuthData(int method, const TCHAR *secret)
766 {
767 m_iAuthMethod = method;
768 #ifdef UNICODE
769 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, secret, -1, m_szSecret, MAX_SECRET_LENGTH, NULL, NULL);
770 m_szSecret[MAX_SECRET_LENGTH - 1] = 0;
771 #else
772 nx_strncpy(m_szSecret, secret, MAX_SECRET_LENGTH);
773 #endif
774 }
775
776 /**
777 * Destroy command execuion results data
778 */
779 void AgentConnection::destroyResultData()
780 {
781 UINT32 i;
782
783 if (m_ppDataLines != NULL)
784 {
785 for(i = 0; i < m_dwNumDataLines; i++)
786 if (m_ppDataLines[i] != NULL)
787 free(m_ppDataLines[i]);
788 free(m_ppDataLines);
789 m_ppDataLines = NULL;
790 }
791 m_dwNumDataLines = 0;
792 }
793
/**
 * Get interface list from agent.
 *
 * Requests the "Net.InterfaceList" list and parses every returned line
 * (format: "index ip_address/mask_bits iftype mac_address name") in place,
 * replacing separators in the cached line buffers with terminators. Several
 * lines with the same interface index are merged: the first one creates the
 * interface entry, later ones only contribute their IP address.
 *
 * @return newly allocated interface list, or NULL if the list request failed
 */
InterfaceList *AgentConnection::getInterfaceList()
{
   InterfaceList *pIfList = NULL;
   TCHAR *pChar, *pBuf;

   if (getList(_T("Net.InterfaceList")) == ERR_SUCCESS)
   {
      pIfList = new InterfaceList(m_dwNumDataLines);

      // Parse result set. Each line should have the following format:
      // index ip_address/mask_bits iftype mac_address name
      for(UINT32 i = 0; i < m_dwNumDataLines; i++)
      {
         pBuf = m_ppDataLines[i];
         UINT32 ifIndex = 0;   // stays 0 if the line has no separator at all

         // Index
         pChar = _tcschr(pBuf, ' ');
         if (pChar != NULL)
         {
            *pChar = 0;
            ifIndex = _tcstoul(pBuf, NULL, 10);
            pBuf = pChar + 1;
         }

         // Reuse the entry created by an earlier line with the same index
         bool newInterface = false;
         InterfaceInfo *iface = pIfList->findByIfIndex(ifIndex);
         if (iface == NULL)
         {
            iface = new InterfaceInfo(ifIndex);
            newInterface = true;
         }

         // Address and mask
         pChar = _tcschr(pBuf, _T(' '));
         if (pChar != NULL)
         {
            TCHAR *pSlash;
            static TCHAR defaultMask[] = _T("24");   // used when "/mask_bits" is missing

            *pChar = 0;
            pSlash = _tcschr(pBuf, _T('/'));
            if (pSlash != NULL)
            {
               *pSlash = 0;
               pSlash++;
            }
            else     // Just a paranoia protection, shouldn't happen if agent working correctly
            {
               pSlash = defaultMask;
            }
            InetAddress addr = InetAddress::parse(pBuf);
            if (addr.isValid())
            {
               addr.setMaskBits(_tcstol(pSlash, NULL, 10));
               // Agent may return 0.0.0.0/0 for interfaces without IP address
               if ((addr.getFamily() != AF_INET) || (addr.getAddressV4() != 0))
                  iface->ipAddrList.add(addr);
            }
            pBuf = pChar + 1;
         }

         // Remaining fields are taken only from the first line of each interface
         if (newInterface)
         {
            // Interface type
            pChar = _tcschr(pBuf, ' ');
            if (pChar != NULL)
            {
               *pChar = 0;

               TCHAR *eptr;
               iface->type = _tcstoul(pBuf, &eptr, 10);

               // newer agents can return if_type(mtu)
               if (*eptr == _T('('))
               {
                  pBuf = eptr + 1;
                  eptr = _tcschr(pBuf, _T(')'));
                  if (eptr != NULL)
                  {
                     *eptr = 0;
                     iface->mtu = _tcstol(pBuf, NULL, 10);
                  }
               }

               pBuf = pChar + 1;
            }

            // MAC address
            pChar = _tcschr(pBuf, ' ');
            if (pChar != NULL)
            {
               *pChar = 0;
               StrToBin(pBuf, iface->macAddr, MAC_ADDR_LENGTH);
               pBuf = pChar + 1;
            }

            // Name (set description to name)
            nx_strncpy(iface->name, pBuf, MAX_DB_STRING);
            nx_strncpy(iface->description, pBuf, MAX_DB_STRING);

            pIfList->add(iface);
         }
      }

      // Cached lines have been consumed (and modified); release them
      lock();
      destroyResultData();
      unlock();
   }

   return pIfList;
}
909
910 /**
911 * Get parameter value
912 */
913 UINT32 AgentConnection::getParameter(const TCHAR *pszParam, UINT32 dwBufSize, TCHAR *pszBuffer)
914 {
915 if (!m_isConnected)
916 return ERR_NOT_CONNECTED;
917
918 NXCPMessage msg(m_nProtocolVersion);
919 UINT32 dwRqId = generateRequestId();
920 msg.setCode(CMD_GET_PARAMETER);
921 msg.setId(dwRqId);
922 msg.setField(VID_PARAMETER, pszParam);
923
924 UINT32 dwRetCode;
925 if (sendMessage(&msg))
926 {
927 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
928 if (response != NULL)
929 {
930 dwRetCode = response->getFieldAsUInt32(VID_RCC);
931 if (dwRetCode == ERR_SUCCESS)
932 {
933 if (response->isFieldExist(VID_VALUE))
934 {
935 response->getFieldAsString(VID_VALUE, pszBuffer, dwBufSize);
936 }
937 else
938 {
939 dwRetCode = ERR_MALFORMED_RESPONSE;
940 debugPrintf(3, _T("Malformed response to CMD_GET_PARAMETER"));
941 }
942 }
943 delete response;
944 }
945 else
946 {
947 dwRetCode = ERR_REQUEST_TIMEOUT;
948 }
949 }
950 else
951 {
952 dwRetCode = ERR_CONNECTION_BROKEN;
953 }
954 return dwRetCode;
955 }
956
957 /**
958 * Get ARP cache
959 */
960 ARP_CACHE *AgentConnection::getArpCache()
961 {
962 ARP_CACHE *pArpCache = NULL;
963 TCHAR szByte[4], *pBuf, *pChar;
964 UINT32 i, j;
965
966 if (getList(_T("Net.ArpCache")) == ERR_SUCCESS)
967 {
968 // Create empty structure
969 pArpCache = (ARP_CACHE *)malloc(sizeof(ARP_CACHE));
970 pArpCache->dwNumEntries = m_dwNumDataLines;
971 pArpCache->pEntries = (ARP_ENTRY *)malloc(sizeof(ARP_ENTRY) * m_dwNumDataLines);
972 memset(pArpCache->pEntries, 0, sizeof(ARP_ENTRY) * m_dwNumDataLines);
973
974 szByte[2] = 0;
975
976 // Parse data lines
977 // Each line has form of XXXXXXXXXXXX a.b.c.d n
978 // where XXXXXXXXXXXX is a MAC address (12 hexadecimal digits)
979 // a.b.c.d is an IP address in decimal dotted notation
980 // n is an interface index
981 for(i = 0; i < m_dwNumDataLines; i++)
982 {
983 pBuf = m_ppDataLines[i];
984 if (_tcslen(pBuf) < 20) // Invalid line
985 continue;
986
987 // MAC address
988 for(j = 0; j < 6; j++)
989 {
990 memcpy(szByte, pBuf, sizeof(TCHAR) * 2);
991 pArpCache->pEntries[i].bMacAddr[j] = (BYTE)_tcstol(szByte, NULL, 16);
992 pBuf+=2;
993 }
994
995 // IP address
996 while(*pBuf == ' ')
997 pBuf++;
998 pChar = _tcschr(pBuf, _T(' '));
999 if (pChar != NULL)
1000 *pChar = 0;
1001 pArpCache->pEntries[i].ipAddr = ntohl(_t_inet_addr(pBuf));
1002
1003 // Interface index
1004 if (pChar != NULL)
1005 pArpCache->pEntries[i].dwIndex = _tcstoul(pChar + 1, NULL, 10);
1006 }
1007
1008 lock();
1009 destroyResultData();
1010 unlock();
1011 }
1012 return pArpCache;
1013 }
1014
1015 /**
1016 * Send dummy command to agent (can be used for keepalive)
1017 */
1018 UINT32 AgentConnection::nop()
1019 {
1020 if (!m_isConnected)
1021 return ERR_CONNECTION_BROKEN;
1022
1023 NXCPMessage msg(m_nProtocolVersion);
1024 UINT32 dwRqId;
1025
1026 dwRqId = generateRequestId();
1027 msg.setCode(CMD_KEEPALIVE);
1028 msg.setId(dwRqId);
1029 if (sendMessage(&msg))
1030 return waitForRCC(dwRqId, m_dwCommandTimeout);
1031 else
1032 return ERR_CONNECTION_BROKEN;
1033 }
1034
1035 /**
1036 * inform agent about server capabilities
1037 */
1038 UINT32 AgentConnection::setServerCapabilities()
1039 {
1040 NXCPMessage msg(m_nProtocolVersion);
1041 UINT32 dwRqId = generateRequestId();
1042 msg.setCode(CMD_SET_SERVER_CAPABILITIES);
1043 msg.setField(VID_ENABLED, (INT16)1); // Enables IPv6 on pre-2.0 agents
1044 msg.setField(VID_IPV6_SUPPORT, (INT16)1);
1045 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
1046 msg.setField(VID_ENABLE_COMPRESSION, (INT16)(m_allowCompression ? 1 : 0));
1047 msg.setId(dwRqId);
1048 if (sendMessage(&msg))
1049 return waitForRCC(dwRqId, m_dwCommandTimeout);
1050 else
1051 return ERR_CONNECTION_BROKEN;
1052 }
1053
1054 /**
1055 * Set server ID
1056 */
1057 UINT32 AgentConnection::setServerId(UINT64 serverId)
1058 {
1059 NXCPMessage msg(m_nProtocolVersion);
1060 UINT32 dwRqId = generateRequestId();
1061 msg.setCode(CMD_SET_SERVER_ID);
1062 msg.setField(VID_SERVER_ID, serverId);
1063 msg.setId(dwRqId);
1064 if (sendMessage(&msg))
1065 return waitForRCC(dwRqId, m_dwCommandTimeout);
1066 else
1067 return ERR_CONNECTION_BROKEN;
1068 }
1069
1070 /**
1071 * Wait for request completion code
1072 */
1073 UINT32 AgentConnection::waitForRCC(UINT32 dwRqId, UINT32 dwTimeOut)
1074 {
1075 UINT32 rcc;
1076 NXCPMessage *response = m_pMsgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, dwTimeOut);
1077 if (response != NULL)
1078 {
1079 rcc = response->getFieldAsUInt32(VID_RCC);
1080 delete response;
1081 }
1082 else
1083 {
1084 rcc = ERR_REQUEST_TIMEOUT;
1085 }
1086 return rcc;
1087 }
1088
1089 /**
1090 * Send message to agent
1091 */
1092 bool AgentConnection::sendMessage(NXCPMessage *pMsg)
1093 {
1094 AbstractCommChannel *channel = acquireChannel();
1095 if (channel == NULL)
1096 return false;
1097
1098 if (nxlog_get_debug_level_tag_object(DEBUG_TAG, m_debugId) >= 6)
1099 {
1100 TCHAR buffer[64];
1101 debugPrintf(6, _T("Sending message %s (%d) to agent at %s"),
1102 NXCPMessageCodeName(pMsg->getCode(), buffer), pMsg->getId(), (const TCHAR *)m_addr.toString());
1103 }
1104
1105 bool success;
1106 NXCP_MESSAGE *rawMsg = pMsg->serialize(m_allowCompression);
1107 NXCPEncryptionContext *pCtx = acquireEncryptionContext();
1108 if (pCtx != NULL)
1109 {
1110 NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
1111 if (pEnMsg != NULL)
1112 {
1113 success = (channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
1114 free(pEnMsg);
1115 }
1116 else
1117 {
1118 success = false;
1119 }
1120 pCtx->decRefCount();
1121 }
1122 else
1123 {
1124 success = (channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
1125 }
1126 free(rawMsg);
1127 channel->decRefCount();
1128 return success;
1129 }
1130
1131 /**
1132 * Send raw message to agent
1133 */
1134 bool AgentConnection::sendRawMessage(NXCP_MESSAGE *pMsg)
1135 {
1136 AbstractCommChannel *channel = acquireChannel();
1137 if (channel == NULL)
1138 return false;
1139
1140 bool success;
1141 NXCP_MESSAGE *rawMsg = pMsg;
1142 NXCPEncryptionContext *pCtx = acquireEncryptionContext();
1143 if (pCtx != NULL)
1144 {
1145 NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
1146 if (pEnMsg != NULL)
1147 {
1148 success = (channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
1149 free(pEnMsg);
1150 }
1151 else
1152 {
1153 success = false;
1154 }
1155 pCtx->decRefCount();
1156 }
1157 else
1158 {
1159 success = (channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
1160 }
1161 channel->decRefCount();
1162 return success;
1163 }
1164
/**
 * Callback for processing incoming event (CMD_TRAP) on separate thread.
 * Scheduled by the receiver thread, which takes an internal reference
 * before queueing it; this method takes ownership of the message and
 * releases that reference when done.
 *
 * @param msg received message; deleted here
 */
void AgentConnection::onTrapCallback(NXCPMessage *msg)
{
   onTrap(msg);
   delete msg;
   // Must stay last: releases the reference taken when the task was queued
   decInternalRefCount();
}
1174
/**
 * Trap handler. Should be overridden in derived classes to implement
 * actual trap processing. Default implementation does nothing.
 *
 * @param pMsg received trap message (owned by the caller)
 */
void AgentConnection::onTrap(NXCPMessage *pMsg)
{
}
1182
/**
 * Callback for processing incoming syslog message (CMD_SYSLOG_RECORDS) on
 * separate thread. Scheduled by the receiver thread, which takes an
 * internal reference before queueing it; this method takes ownership of
 * the message and releases that reference when done.
 *
 * @param msg received message; deleted here
 */
void AgentConnection::onSyslogMessageCallback(NXCPMessage *msg)
{
   onSyslogMessage(msg);
   delete msg;
   // Must stay last: releases the reference taken when the task was queued
   decInternalRefCount();
}
1192
/**
 * Syslog message handler. Should be overridden in derived classes to
 * implement actual message processing. Default implementation does nothing.
 *
 * @param pMsg received syslog message (owned by the caller)
 */
void AgentConnection::onSyslogMessage(NXCPMessage *pMsg)
{
}
1200
/**
 * Callback for processing data push (CMD_PUSH_DCI_DATA) on separate thread.
 * Scheduled by the receiver thread, which takes an internal reference
 * before queueing it; this method takes ownership of the message and
 * releases that reference when done.
 *
 * @param msg received message; deleted here
 */
void AgentConnection::onDataPushCallback(NXCPMessage *msg)
{
   onDataPush(msg);
   delete msg;
   // Must stay last: releases the reference taken when the task was queued
   decInternalRefCount();
}
1210
/**
 * Data push handler. Should be overridden in derived classes to implement
 * actual data push processing. Default implementation does nothing.
 *
 * @param pMsg received data push message (owned by the caller)
 */
void AgentConnection::onDataPush(NXCPMessage *pMsg)
{
}
1218
/**
 * Monitoring data handler. Should be overridden in derived classes to
 * implement actual monitoring data processing. Default implementation does
 * nothing. Unlike the other handlers, this one is called directly from the
 * receiver thread (for CMD_FILE_MONITORING), not from the thread pool.
 *
 * @param pMsg received message (deleted by the caller after return)
 */
void AgentConnection::onFileMonitoringData(NXCPMessage *pMsg)
{
}
1226
1227 /**
1228 * Callback for processing data push on separate thread
1229 */
1230 void AgentConnection::onSnmpTrapCallback(NXCPMessage *msg)
1231 {
1232 onSnmpTrap(msg);
1233 delete msg;
1234 decInternalRefCount();
1235 }
1236
1237 /**
1238 * SNMP trap handler. Should be overriden in derived classes to implement
1239 * actual SNMP trap processing. Default implementation do nothing.
1240 */
1241 void AgentConnection::onSnmpTrap(NXCPMessage *pMsg)
1242 {
1243 }
1244
1245 /**
1246 * Custom message handler
1247 * If returns true, message considered as processed and will not be placed in wait queue
1248 */
1249 bool AgentConnection::processCustomMessage(NXCPMessage *pMsg)
1250 {
1251 return false;
1252 }
1253
1254 /**
1255 * Get list of values
1256 */
1257 UINT32 AgentConnection::getList(const TCHAR *pszParam)
1258 {
1259 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1260 UINT32 i, dwRqId, dwRetCode;
1261
1262 if (m_isConnected)
1263 {
1264 destroyResultData();
1265 dwRqId = generateRequestId();
1266 msg.setCode(CMD_GET_LIST);
1267 msg.setId(dwRqId);
1268 msg.setField(VID_PARAMETER, pszParam);
1269 if (sendMessage(&msg))
1270 {
1271 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1272 if (pResponse != NULL)
1273 {
1274 dwRetCode = pResponse->getFieldAsUInt32(VID_RCC);
1275 if (dwRetCode == ERR_SUCCESS)
1276 {
1277 m_dwNumDataLines = pResponse->getFieldAsUInt32(VID_NUM_STRINGS);
1278 m_ppDataLines = (TCHAR **)malloc(sizeof(TCHAR *) * m_dwNumDataLines);
1279 for(i = 0; i < m_dwNumDataLines; i++)
1280 m_ppDataLines[i] = pResponse->getFieldAsString(VID_ENUM_VALUE_BASE + i);
1281 }
1282 delete pResponse;
1283 }
1284 else
1285 {
1286 dwRetCode = ERR_REQUEST_TIMEOUT;
1287 }
1288 }
1289 else
1290 {
1291 dwRetCode = ERR_CONNECTION_BROKEN;
1292 }
1293 }
1294 else
1295 {
1296 dwRetCode = ERR_NOT_CONNECTED;
1297 }
1298
1299 return dwRetCode;
1300 }
1301
1302 /**
1303 * Get table
1304 */
1305 UINT32 AgentConnection::getTable(const TCHAR *pszParam, Table **table)
1306 {
1307 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1308 UINT32 dwRqId, dwRetCode;
1309
1310 *table = NULL;
1311 if (m_isConnected)
1312 {
1313 dwRqId = generateRequestId();
1314 msg.setCode(CMD_GET_TABLE);
1315 msg.setId(dwRqId);
1316 msg.setField(VID_PARAMETER, pszParam);
1317 if (sendMessage(&msg))
1318 {
1319 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1320 if (pResponse != NULL)
1321 {
1322 dwRetCode = pResponse->getFieldAsUInt32(VID_RCC);
1323 if (dwRetCode == ERR_SUCCESS)
1324 {
1325 *table = new Table(pResponse);
1326 }
1327 delete pResponse;
1328 }
1329 else
1330 {
1331 dwRetCode = ERR_REQUEST_TIMEOUT;
1332 }
1333 }
1334 else
1335 {
1336 dwRetCode = ERR_CONNECTION_BROKEN;
1337 }
1338 }
1339 else
1340 {
1341 dwRetCode = ERR_NOT_CONNECTED;
1342 }
1343
1344 return dwRetCode;
1345 }
1346
1347 /**
1348 * Authenticate to agent
1349 */
1350 UINT32 AgentConnection::authenticate(BOOL bProxyData)
1351 {
1352 NXCPMessage msg(m_nProtocolVersion);
1353 UINT32 dwRqId;
1354 BYTE hash[32];
1355 int iAuthMethod = bProxyData ? m_iProxyAuth : m_iAuthMethod;
1356 const char *pszSecret = bProxyData ? m_szProxySecret : m_szSecret;
1357 #ifdef UNICODE
1358 WCHAR szBuffer[MAX_SECRET_LENGTH];
1359 #endif
1360
1361 if (iAuthMethod == AUTH_NONE)
1362 return ERR_SUCCESS; // No authentication required
1363
1364 dwRqId = generateRequestId();
1365 msg.setCode(CMD_AUTHENTICATE);
1366 msg.setId(dwRqId);
1367 msg.setField(VID_AUTH_METHOD, (WORD)iAuthMethod);
1368 switch(iAuthMethod)
1369 {
1370 case AUTH_PLAINTEXT:
1371 #ifdef UNICODE
1372 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, pszSecret, -1, szBuffer, MAX_SECRET_LENGTH);
1373 msg.setField(VID_SHARED_SECRET, szBuffer);
1374 #else
1375 msg.setField(VID_SHARED_SECRET, pszSecret);
1376 #endif
1377 break;
1378 case AUTH_MD5_HASH:
1379 CalculateMD5Hash((BYTE *)pszSecret, (int)strlen(pszSecret), hash);
1380 msg.setField(VID_SHARED_SECRET, hash, MD5_DIGEST_SIZE);
1381 break;
1382 case AUTH_SHA1_HASH:
1383 CalculateSHA1Hash((BYTE *)pszSecret, (int)strlen(pszSecret), hash);
1384 msg.setField(VID_SHARED_SECRET, hash, SHA1_DIGEST_SIZE);
1385 break;
1386 default:
1387 break;
1388 }
1389 if (sendMessage(&msg))
1390 return waitForRCC(dwRqId, m_dwCommandTimeout);
1391 else
1392 return ERR_CONNECTION_BROKEN;
1393 }
1394
1395 /**
1396 * Execute action on agent
1397 */
1398 UINT32 AgentConnection::execAction(const TCHAR *action, const StringList &list,
1399 bool withOutput, void (* outputCallback)(ActionCallbackEvent, const TCHAR *, void *), void *cbData)
1400 {
1401 NXCPMessage msg(m_nProtocolVersion);
1402 UINT32 dwRqId;
1403 int i;
1404
1405 if (!m_isConnected)
1406 return ERR_NOT_CONNECTED;
1407
1408 dwRqId = generateRequestId();
1409 msg.setCode(CMD_ACTION);
1410 msg.setId(dwRqId);
1411 msg.setField(VID_ACTION_NAME, action);
1412 msg.setField(VID_RECEIVE_OUTPUT, (UINT16)(withOutput ? 1 : 0));
1413 msg.setField(VID_NUM_ARGS, (UINT32)list.size());
1414 for(i = 0; i < list.size(); i++)
1415 msg.setField(VID_ACTION_ARG_BASE + i, list.get(i));
1416
1417 if (sendMessage(&msg))
1418 {
1419 if (withOutput)
1420 {
1421 UINT32 rcc = waitForRCC(dwRqId, m_dwCommandTimeout);
1422 if (rcc == ERR_SUCCESS)
1423 {
1424 outputCallback(ACE_CONNECTED, NULL, cbData); // Indicate successful start
1425 bool eos = false;
1426 while(!eos)
1427 {
1428 NXCPMessage *response = waitForMessage(CMD_COMMAND_OUTPUT, dwRqId, m_dwCommandTimeout * 10);
1429 if (response != NULL)
1430 {
1431 eos = response->isEndOfSequence();
1432 if (response->isFieldExist(VID_MESSAGE))
1433 {
1434 TCHAR line[4096];
1435 response->getFieldAsString(VID_MESSAGE, line, 4096);
1436 outputCallback(ACE_DATA, line, cbData);
1437 }
1438 delete response;
1439 }
1440 else
1441 {
1442 return ERR_REQUEST_TIMEOUT;
1443 }
1444 }
1445 outputCallback(ACE_DISCONNECTED, NULL, cbData);
1446 return ERR_SUCCESS;
1447 }
1448 else
1449 {
1450 return rcc;
1451 }
1452 }
1453 else
1454 {
1455 return waitForRCC(dwRqId, m_dwCommandTimeout);
1456 }
1457 }
1458 else
1459 {
1460 return ERR_CONNECTION_BROKEN;
1461 }
1462 }
1463
1464 /**
1465 * Upload file to agent
1466 */
1467 UINT32 AgentConnection::uploadFile(const TCHAR *localFile, const TCHAR *destinationFile, void (* progressCallback)(INT64, void *), void *cbArg, NXCPStreamCompressionMethod compMethod)
1468 {
1469 UINT32 dwRqId, dwResult;
1470 NXCPMessage msg(m_nProtocolVersion);
1471
1472 // Disable compression if it is disabled on connection level or if agent do not support it
1473 if (!m_allowCompression || (m_nProtocolVersion < 4))
1474 compMethod = NXCP_STREAM_COMPRESSION_NONE;
1475
1476 if (!m_isConnected)
1477 return ERR_NOT_CONNECTED;
1478
1479 dwRqId = generateRequestId();
1480 msg.setId(dwRqId);
1481
1482 time_t lastModTime = 0;
1483 NX_STAT_STRUCT st;
1484 if (CALL_STAT(localFile, &st) == 0)
1485 {
1486 lastModTime = st.st_mtime;
1487 }
1488
1489 // Use core agent if destination file name is not set and file manager subagent otherwise
1490 if ((destinationFile == NULL) || (*destinationFile == 0))
1491 {
1492 msg.setCode(CMD_TRANSFER_FILE);
1493 int i;
1494 for(i = (int)_tcslen(localFile) - 1;
1495 (i >= 0) && (localFile[i] != '\\') && (localFile[i] != '/'); i--);
1496 msg.setField(VID_FILE_NAME, &localFile[i + 1]);
1497 }
1498 else
1499 {
1500 msg.setCode(CMD_FILEMGR_UPLOAD);
1501 msg.setField(VID_OVERVRITE, true);
1502 msg.setField(VID_FILE_NAME, destinationFile);
1503 }
1504 msg.setFieldFromTime(VID_MODIFICATION_TIME, lastModTime);
1505
1506 if (sendMessage(&msg))
1507 {
1508 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1509 }
1510 else
1511 {
1512 dwResult = ERR_CONNECTION_BROKEN;
1513 }
1514
1515 if (dwResult == ERR_SUCCESS)
1516 {
1517 AbstractCommChannel *channel = acquireChannel();
1518 if (channel != NULL)
1519 {
1520 debugPrintf(5, _T("Sending file \"%s\" to agent %s compression"),
1521 localFile, (compMethod == NXCP_STREAM_COMPRESSION_NONE) ? _T("without") : _T("with"));
1522 m_fileUploadInProgress = true;
1523 NXCPEncryptionContext *ctx = acquireEncryptionContext();
1524 if (SendFileOverNXCP(channel, dwRqId, localFile, ctx, 0, progressCallback, cbArg, m_mutexSocketWrite, compMethod))
1525 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1526 else
1527 dwResult = ERR_IO_FAILURE;
1528 m_fileUploadInProgress = false;
1529 if (ctx != NULL)
1530 ctx->decRefCount();
1531 channel->decRefCount();
1532 }
1533 else
1534 {
1535 dwResult = ERR_CONNECTION_BROKEN;
1536 }
1537 }
1538
1539 return dwResult;
1540 }
1541
1542 /**
1543 * Send upgrade command
1544 */
1545 UINT32 AgentConnection::startUpgrade(const TCHAR *pszPkgName)
1546 {
1547 UINT32 dwRqId, dwResult;
1548 NXCPMessage msg(m_nProtocolVersion);
1549 int i;
1550
1551 if (!m_isConnected)
1552 return ERR_NOT_CONNECTED;
1553
1554 dwRqId = generateRequestId();
1555
1556 msg.setCode(CMD_UPGRADE_AGENT);
1557 msg.setId(dwRqId);
1558 for(i = (int)_tcslen(pszPkgName) - 1;
1559 (i >= 0) && (pszPkgName[i] != '\\') && (pszPkgName[i] != '/'); i--);
1560 msg.setField(VID_FILE_NAME, &pszPkgName[i + 1]);
1561
1562 if (sendMessage(&msg))
1563 {
1564 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1565 }
1566 else
1567 {
1568 dwResult = ERR_CONNECTION_BROKEN;
1569 }
1570
1571 return dwResult;
1572 }
1573
1574 /**
1575 * Check status of network service via agent
1576 */
1577 UINT32 AgentConnection::checkNetworkService(UINT32 *pdwStatus, const InetAddress& addr, int iServiceType,
1578 WORD wPort, WORD wProto, const TCHAR *pszRequest,
1579 const TCHAR *pszResponse, UINT32 *responseTime)
1580 {
1581 UINT32 dwRqId, dwResult;
1582 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1583 static WORD m_wDefaultPort[] = { 7, 22, 110, 25, 21, 80, 443, 23 };
1584
1585 if (!m_isConnected)
1586 return ERR_NOT_CONNECTED;
1587
1588 dwRqId = generateRequestId();
1589
1590 msg.setCode(CMD_CHECK_NETWORK_SERVICE);
1591 msg.setId(dwRqId);
1592 msg.setField(VID_IP_ADDRESS, addr);
1593 msg.setField(VID_SERVICE_TYPE, (WORD)iServiceType);
1594 msg.setField(VID_IP_PORT,
1595 (wPort != 0) ? wPort :
1596 m_wDefaultPort[((iServiceType >= NETSRV_CUSTOM) &&
1597 (iServiceType <= NETSRV_TELNET)) ? iServiceType : 0]);
1598 msg.setField(VID_IP_PROTO, (wProto != 0) ? wProto : (WORD)IPPROTO_TCP);
1599 msg.setField(VID_SERVICE_REQUEST, pszRequest);
1600 msg.setField(VID_SERVICE_RESPONSE, pszResponse);
1601
1602 if (sendMessage(&msg))
1603 {
1604 // Wait up to 90 seconds for results
1605 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, 90000);
1606 if (pResponse != NULL)
1607 {
1608 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1609 if (dwResult == ERR_SUCCESS)
1610 {
1611 *pdwStatus = pResponse->getFieldAsUInt32(VID_SERVICE_STATUS);
1612 if (responseTime != NULL)
1613 {
1614 *responseTime = pResponse->getFieldAsUInt32(VID_RESPONSE_TIME);
1615 }
1616 }
1617 delete pResponse;
1618 }
1619 else
1620 {
1621 dwResult = ERR_REQUEST_TIMEOUT;
1622 }
1623 }
1624 else
1625 {
1626 dwResult = ERR_CONNECTION_BROKEN;
1627 }
1628
1629 return dwResult;
1630 }
1631
1632 /**
1633 * Get list of supported parameters from agent
1634 */
1635 UINT32 AgentConnection::getSupportedParameters(ObjectArray<AgentParameterDefinition> **paramList, ObjectArray<AgentTableDefinition> **tableList)
1636 {
1637 UINT32 dwRqId, dwResult;
1638 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1639
1640 *paramList = NULL;
1641 *tableList = NULL;
1642
1643 if (!m_isConnected)
1644 return ERR_NOT_CONNECTED;
1645
1646 dwRqId = generateRequestId();
1647
1648 msg.setCode(CMD_GET_PARAMETER_LIST);
1649 msg.setId(dwRqId);
1650
1651 if (sendMessage(&msg))
1652 {
1653 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1654 if (pResponse != NULL)
1655 {
1656 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1657 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): RCC=%d"), dwResult);
1658 if (dwResult == ERR_SUCCESS)
1659 {
1660 UINT32 count = pResponse->getFieldAsUInt32(VID_NUM_PARAMETERS);
1661 ObjectArray<AgentParameterDefinition> *plist = new ObjectArray<AgentParameterDefinition>(count, 16, true);
1662 for(UINT32 i = 0, id = VID_PARAM_LIST_BASE; i < count; i++)
1663 {
1664 plist->add(new AgentParameterDefinition(pResponse, id));
1665 id += 3;
1666 }
1667 *paramList = plist;
1668 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): %d parameters received from agent"), count);
1669
1670 count = pResponse->getFieldAsUInt32(VID_NUM_TABLES);
1671 ObjectArray<AgentTableDefinition> *tlist = new ObjectArray<AgentTableDefinition>(count, 16, true);
1672 for(UINT32 i = 0, id = VID_TABLE_LIST_BASE; i < count; i++)
1673 {
1674 tlist->add(new AgentTableDefinition(pResponse, id));
1675 id += 3;
1676 }
1677 *tableList = tlist;
1678 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): %d tables received from agent"), count);
1679 }
1680 delete pResponse;
1681 }
1682 else
1683 {
1684 dwResult = ERR_REQUEST_TIMEOUT;
1685 }
1686 }
1687 else
1688 {
1689 dwResult = ERR_CONNECTION_BROKEN;
1690 }
1691
1692 return dwResult;
1693 }
1694
1695 /**
1696 * Setup encryption
1697 */
1698 UINT32 AgentConnection::setupEncryption(RSA *pServerKey)
1699 {
1700 #ifdef _WITH_ENCRYPTION
1701 NXCPMessage msg(m_nProtocolVersion), *pResp;
1702 UINT32 dwRqId, dwError, dwResult;
1703
1704 dwRqId = generateRequestId();
1705
1706 PrepareKeyRequestMsg(&msg, pServerKey, false);
1707 msg.setId(dwRqId);
1708 if (sendMessage(&msg))
1709 {
1710 pResp = waitForMessage(CMD_SESSION_KEY, dwRqId, m_dwCommandTimeout);
1711 if (pResp != NULL)
1712 {
1713 dwResult = SetupEncryptionContext(pResp, &m_pCtx, NULL, pServerKey, m_nProtocolVersion);
1714 switch(dwResult)
1715 {
1716 case RCC_SUCCESS:
1717 dwError = ERR_SUCCESS;
1718 break;
1719 case RCC_NO_CIPHERS:
1720 dwError = ERR_NO_CIPHERS;
1721 break;
1722 case RCC_INVALID_PUBLIC_KEY:
1723 dwError = ERR_INVALID_PUBLIC_KEY;
1724 break;
1725 case RCC_INVALID_SESSION_KEY:
1726 dwError = ERR_INVALID_SESSION_KEY;
1727 break;
1728 default:
1729 dwError = ERR_INTERNAL_ERROR;
1730 break;
1731 }
1732 delete pResp;
1733 }
1734 else
1735 {
1736 dwError = ERR_REQUEST_TIMEOUT;
1737 }
1738 }
1739 else
1740 {
1741 dwError = ERR_CONNECTION_BROKEN;
1742 }
1743
1744 return dwError;
1745 #else
1746 return ERR_NOT_IMPLEMENTED;
1747 #endif
1748 }
1749
1750 /**
1751 * Get configuration file from agent
1752 */
1753 UINT32 AgentConnection::getConfigFile(TCHAR **ppszConfig, UINT32 *pdwSize)
1754 {
1755 *ppszConfig = NULL;
1756 *pdwSize = 0;
1757
1758 if (!m_isConnected)
1759 return ERR_NOT_CONNECTED;
1760
1761 UINT32 dwResult;
1762 UINT32 dwRqId = generateRequestId();
1763
1764 NXCPMessage msg(m_nProtocolVersion);
1765 msg.setCode(CMD_GET_AGENT_CONFIG);
1766 msg.setId(dwRqId);
1767
1768 if (sendMessage(&msg))
1769 {
1770 NXCPMessage *pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1771 if (pResponse != NULL)
1772 {
1773 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1774 if (dwResult == ERR_SUCCESS)
1775 {
1776 size_t size = pResponse->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
1777 BYTE *utf8Text = (BYTE *)malloc(size + 1);
1778 pResponse->getFieldAsBinary(VID_CONFIG_FILE, (BYTE *)utf8Text, size);
1779
1780 // We expect text file, so replace all non-printable characters with spaces
1781 for(size_t i = 0; i < size; i++)
1782 if ((utf8Text[i] < ' ') &&
1783 (utf8Text[i] != '\t') &&
1784 (utf8Text[i] != '\r') &&
1785 (utf8Text[i] != '\n'))
1786 utf8Text[i] = ' ';
1787 utf8Text[size] = 0;
1788
1789 #ifdef UNICODE
1790 *ppszConfig = WideStringFromUTF8String((char *)utf8Text);
1791 #else
1792 *ppszConfig = MBStringFromUTF8String((char *)utf8Text);
1793 #endif
1794 free(utf8Text);
1795 *pdwSize = (UINT32)_tcslen(*ppszConfig);
1796 }
1797 delete pResponse;
1798 }
1799 else
1800 {
1801 dwResult = ERR_REQUEST_TIMEOUT;
1802 }
1803 }
1804 else
1805 {
1806 dwResult = ERR_CONNECTION_BROKEN;
1807 }
1808
1809 return dwResult;
1810 }
1811
1812 /**
1813 * Update configuration file on agent
1814 */
1815 UINT32 AgentConnection::updateConfigFile(const TCHAR *pszConfig)
1816 {
1817 UINT32 dwRqId, dwResult;
1818 NXCPMessage msg(m_nProtocolVersion);
1819 #ifdef UNICODE
1820 int nChars;
1821 BYTE *pBuffer;
1822 #endif
1823
1824 if (!m_isConnected)
1825 return ERR_NOT_CONNECTED;
1826
1827 dwRqId = generateRequestId();
1828
1829 msg.setCode(CMD_UPDATE_AGENT_CONFIG);
1830 msg.setId(dwRqId);
1831 #ifdef UNICODE
1832 nChars = (int)_tcslen(pszConfig);
1833 pBuffer = (BYTE *)malloc(nChars + 1);
1834 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR,
1835 pszConfig, nChars, (char *)pBuffer, nChars + 1, NULL, NULL);
1836 msg.setField(VID_CONFIG_FILE, pBuffer, nChars);
1837 free(pBuffer);
1838 #else
1839 msg.setField(VID_CONFIG_FILE, (BYTE *)pszConfig, (UINT32)strlen(pszConfig));
1840 #endif
1841
1842 if (sendMessage(&msg))
1843 {
1844 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1845 }
1846 else
1847 {
1848 dwResult = ERR_CONNECTION_BROKEN;
1849 }
1850
1851 return dwResult;
1852 }
1853
1854 /**
1855 * Get routing table from agent
1856 */
1857 ROUTING_TABLE *AgentConnection::getRoutingTable()
1858 {
1859 ROUTING_TABLE *pRT = NULL;
1860 UINT32 i, dwBits;
1861 TCHAR *pChar, *pBuf;
1862
1863 if (getList(_T("Net.IP.RoutingTable")) == ERR_SUCCESS)
1864 {
1865 pRT = (ROUTING_TABLE *)malloc(sizeof(ROUTING_TABLE));
1866 pRT->iNumEntries = m_dwNumDataLines;
1867 pRT->pRoutes = (ROUTE *)malloc(sizeof(ROUTE) * m_dwNumDataLines);
1868 memset(pRT->pRoutes, 0, sizeof(ROUTE) * m_dwNumDataLines);
1869 for(i = 0; i < m_dwNumDataLines; i++)
1870 {
1871 pBuf = m_ppDataLines[i];
1872
1873 // Destination address and mask
1874 pChar = _tcschr(pBuf, _T(' '));
1875 if (pChar != NULL)
1876 {
1877 TCHAR *pSlash;
1878 static TCHAR defaultMask[] = _T("24");
1879
1880 *pChar = 0;
1881 pSlash = _tcschr(pBuf, _T('/'));
1882 if (pSlash != NULL)
1883 {
1884 *pSlash = 0;
1885 pSlash++;
1886 }
1887 else // Just a paranoia protection, should'n happen if agent working correctly
1888 {
1889 pSlash = defaultMask;
1890 }
1891 pRT->pRoutes[i].dwDestAddr = ntohl(_t_inet_addr(pBuf));
1892 dwBits = _tcstoul(pSlash, NULL, 10);
1893 pRT->pRoutes[i].dwDestMask = (dwBits == 32) ? 0xFFFFFFFF : (~(0xFFFFFFFF >> dwBits));
1894 pBuf = pChar + 1;
1895 }
1896
1897 // Next hop address
1898 pChar = _tcschr(pBuf, _T(' '));
1899 if (pChar != NULL)
1900 {
1901 *pChar = 0;
1902 pRT->pRoutes[i].dwNextHop = ntohl(_t_inet_addr(pBuf));
1903 pBuf = pChar + 1;
1904 }
1905
1906 // Interface index
1907 pChar = _tcschr(pBuf, ' ');
1908 if (pChar != NULL)
1909 {
1910 *pChar = 0;
1911 pRT->pRoutes[i].dwIfIndex = _tcstoul(pBuf, NULL, 10);
1912 pBuf = pChar + 1;
1913 }
1914
1915 // Route type
1916 pRT->pRoutes[i].dwRouteType = _tcstoul(pBuf, NULL, 10);
1917 }
1918
1919 lock();
1920 destroyResultData();
1921 unlock();
1922 }
1923
1924 return pRT;
1925 }
1926
1927 /**
1928 * Set proxy information
1929 */
1930 void AgentConnection::setProxy(InetAddress addr, WORD wPort, int iAuthMethod, const TCHAR *pszSecret)
1931 {
1932 m_proxyAddr = addr;
1933 m_wProxyPort = wPort;
1934 m_iProxyAuth = iAuthMethod;
1935 if (pszSecret != NULL)
1936 {
1937 #ifdef UNICODE
1938 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR,
1939 pszSecret, -1, m_szProxySecret, MAX_SECRET_LENGTH, NULL, NULL);
1940 #else
1941 nx_strncpy(m_szProxySecret, pszSecret, MAX_SECRET_LENGTH);
1942 #endif
1943 }
1944 else
1945 {
1946 m_szProxySecret[0] = 0;
1947 }
1948 m_useProxy = true;
1949 }
1950
1951 /**
1952 * Setup proxy connection
1953 */
1954 UINT32 AgentConnection::setupProxyConnection()
1955 {
1956 NXCPMessage msg(m_nProtocolVersion);
1957 UINT32 dwRqId;
1958
1959 dwRqId = generateRequestId();
1960 msg.setCode(CMD_SETUP_PROXY_CONNECTION);
1961 msg.setId(dwRqId);
1962 msg.setField(VID_IP_ADDRESS, m_addr.getAddressV4()); // FIXME: V6 support in proxy
1963 msg.setField(VID_AGENT_PORT, m_wPort);
1964 if (sendMessage(&msg))
1965 return waitForRCC(dwRqId, 60000); // Wait 60 seconds for remote connect
1966 else
1967 return ERR_CONNECTION_BROKEN;
1968 }
1969
1970 /**
1971 * Enable trap receiving on connection
1972 */
1973 UINT32 AgentConnection::enableTraps()
1974 {
1975 NXCPMessage msg(m_nProtocolVersion);
1976 UINT32 dwRqId;
1977
1978 dwRqId = generateRequestId();
1979 msg.setCode(CMD_ENABLE_AGENT_TRAPS);
1980 msg.setId(dwRqId);
1981 if (sendMessage(&msg))
1982 return waitForRCC(dwRqId, m_dwCommandTimeout);
1983 else
1984 return ERR_CONNECTION_BROKEN;
1985 }
1986
1987 /**
1988 * Enable trap receiving on connection
1989 */
1990 UINT32 AgentConnection::enableFileUpdates()
1991 {
1992 NXCPMessage msg(m_nProtocolVersion);
1993 UINT32 dwRqId;
1994
1995 dwRqId = generateRequestId();
1996 msg.setCode(CMD_ENABLE_FILE_UPDATES);
1997 msg.setId(dwRqId);
1998 if (sendMessage(&msg))
1999 {
2000 return waitForRCC(dwRqId, m_dwCommandTimeout);
2001 }
2002 else
2003 return ERR_CONNECTION_BROKEN;
2004 }
2005
2006 /**
2007 * Take screenshot from remote system
2008 */
2009 UINT32 AgentConnection::takeScreenshot(const TCHAR *sessionName, BYTE **data, size_t *size)
2010 {
2011 NXCPMessage msg(m_nProtocolVersion);
2012 UINT32 dwRqId;
2013
2014 dwRqId = generateRequestId();
2015 msg.setCode(CMD_TAKE_SCREENSHOT);
2016 msg.setId(dwRqId);
2017 msg.setField(VID_NAME, sessionName);
2018 if (sendMessage(&msg))
2019 {
2020 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2021 if (response != NULL)
2022 {
2023 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
2024 if (rcc == ERR_SUCCESS)
2025 {
2026 const BYTE *p = response->getBinaryFieldPtr(VID_FILE_DATA, size);
2027 if (p != NULL)
2028 {
2029 *data = (BYTE *)malloc(*size);
2030 memcpy(*data, p, *size);
2031 }
2032 else
2033 {
2034 *data = NULL;
2035 }
2036 }
2037 delete response;
2038 return rcc;
2039 }
2040 else
2041 {
2042 return ERR_REQUEST_TIMEOUT;
2043 }
2044 }
2045 else
2046 {
2047 return ERR_CONNECTION_BROKEN;
2048 }
2049 }
2050
2051 /**
2052 * Resolve hostname by IP address in local network
2053 */
2054 TCHAR *AgentConnection::getHostByAddr(const InetAddress& ipAddr, TCHAR *buf, size_t bufLen)
2055 {
2056 NXCPMessage msg(m_nProtocolVersion);
2057 UINT32 dwRqId;
2058
2059 dwRqId = generateRequestId();
2060 msg.setCode(CMD_HOST_BY_IP);
2061 msg.setId(dwRqId);
2062 msg.setField(VID_IP_ADDRESS, ipAddr);
2063 TCHAR *result = NULL;
2064 if (sendMessage(&msg))
2065 {
2066 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2067 if (response != NULL)
2068 {
2069 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
2070 if (rcc == ERR_SUCCESS)
2071 {
2072 result = response->getFieldAsString(VID_NAME, buf, bufLen);
2073 }
2074 delete response;
2075 return result;
2076 }
2077 else
2078 {
2079 return result;
2080 }
2081 }
2082 else
2083 {
2084 return result;
2085 }
2086 }
2087
2088 /**
2089 * Send custom request to agent
2090 */
2091 NXCPMessage *AgentConnection::customRequest(NXCPMessage *pRequest, const TCHAR *recvFile, bool append,
2092 void (*downloadProgressCallback)(size_t, void *), void (*fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
2093 {
2094 UINT32 dwRqId, rcc;
2095 NXCPMessage *msg = NULL;
2096
2097 dwRqId = generateRequestId();
2098 pRequest->setId(dwRqId);
2099 if (recvFile != NULL)
2100 {
2101 rcc = prepareFileDownload(recvFile, dwRqId, append, downloadProgressCallback, fileResendCallback,cbArg);
2102 if (rcc != ERR_SUCCESS)
2103 {
2104 // Create fake response message
2105 msg = new NXCPMessage;
2106 msg->setCode(CMD_REQUEST_COMPLETED);
2107 msg->setId(dwRqId);
2108 msg->setField(VID_RCC, rcc);
2109 }
2110 }
2111
2112 if (msg == NULL)
2113 {
2114 if (sendMessage(pRequest))
2115 {
2116 msg = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2117 if ((msg != NULL) && (recvFile != NULL))
2118 {
2119 if (msg->getFieldAsUInt32(VID_RCC) == ERR_SUCCESS)
2120 {
2121 if (ConditionWait(m_condFileDownload, 1800000)) // 30 min timeout
2122 {
2123 if (!m_fileDownloadSucceeded)
2124 {
2125 msg->setField(VID_RCC, ERR_IO_FAILURE);
2126 if (m_deleteFileOnDownloadFailure)
2127 _tremove(recvFile);
2128 }
2129 }
2130 else
2131 {
2132 msg->setField(VID_RCC, ERR_REQUEST_TIMEOUT);
2133 }
2134 }
2135 else
2136 {
2137 if (fileResendCallback != NULL)
2138 {
2139 _close(m_hCurrFile);
2140 m_hCurrFile = -1;
2141 _tremove(recvFile);
2142 }
2143 }
2144 }
2145
2146 }
2147 }
2148
2149 return msg;
2150 }
2151
2152 /**
2153 * Prepare for file download
2154 */
2155 UINT32 AgentConnection::prepareFileDownload(const TCHAR *fileName, UINT32 rqId, bool append, void (*downloadProgressCallback)(size_t, void *),
2156 void (* fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
2157 {
2158 if (fileResendCallback == NULL)
2159 {
2160 if (m_hCurrFile != -1)
2161 return ERR_RESOURCE_BUSY;
2162
2163 nx_strncpy(m_currentFileName, fileName, MAX_PATH);
2164 ConditionReset(m_condFileDownload);
2165 m_hCurrFile = _topen(fileName, (append ? 0 : (O_CREAT | O_TRUNC)) | O_RDWR | O_BINARY, S_IREAD | S_IWRITE);
2166 if (m_hCurrFile == -1)
2167 {
2168 DbgPrintf(4, _T("AgentConnection::PrepareFileDownload(): cannot open file %s (%s); append=%d rqId=%d"),
2169 fileName, _tcserror(errno), append, rqId);
2170 }
2171 else
2172 {
2173 if (append)
2174 lseek(m_hCurrFile, 0, SEEK_END);
2175 }
2176
2177 m_dwDownloadRequestId = rqId;
2178 m_downloadProgressCallback = downloadProgressCallback;
2179 m_downloadProgressCallbackArg = cbArg;
2180
2181 m_sendToClientMessageCallback = NULL;
2182
2183 return (m_hCurrFile != -1) ? ERR_SUCCESS : ERR_FILE_OPEN_ERROR;
2184 }
2185 else
2186 {
2187 ConditionReset(m_condFileDownload);
2188
2189 m_dwDownloadRequestId = rqId;
2190 m_downloadProgressCallback = downloadProgressCallback;
2191 m_downloadProgressCallbackArg = cbArg;
2192
2193 m_sendToClientMessageCallback = fileResendCallback;
2194
2195 return ERR_SUCCESS;
2196 }
2197 }
2198
2199 /**
2200 * File upload completion handler
2201 */
2202 void AgentConnection::onFileDownload(bool success)
2203 {
2204 if (!success && m_deleteFileOnDownloadFailure)
2205 _tremove(m_currentFileName);
2206 m_fileDownloadSucceeded = success;
2207 ConditionSet(m_condFileDownload);
2208 }
2209
2210 /**
2211 * Enable trap receiving on connection
2212 */
2213 UINT32 AgentConnection::getPolicyInventory(AgentPolicyInfo **info)
2214 {
2215 NXCPMessage msg(m_nProtocolVersion);
2216 UINT32 dwRqId, rcc;
2217
2218 *info = NULL;
2219 dwRqId = generateRequestId();
2220 msg.setCode(CMD_GET_POLICY_INVENTORY);
2221 msg.setId(dwRqId);
2222 if (sendMessage(&msg))
2223 {
2224 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2225 if (response != NULL)
2226 {
2227 rcc = response->getFieldAsUInt32(VID_RCC);
2228 if (rcc == ERR_SUCCESS)
2229 *info = new AgentPolicyInfo(response);
2230 delete response;
2231 }
2232 else
2233 {
2234 rcc = ERR_REQUEST_TIMEOUT;
2235 }
2236 }
2237 else
2238 {
2239 rcc = ERR_CONNECTION_BROKEN;
2240 }
2241 return rcc;
2242 }
2243
2244 /**
2245 * Uninstall policy by GUID
2246 */
2247 UINT32 AgentConnection::uninstallPolicy(const uuid& guid)
2248 {
2249 UINT32 rqId, rcc;
2250 NXCPMessage msg(m_nProtocolVersion);
2251
2252 rqId = generateRequestId();
2253 msg.setId(rqId);
2254 msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
2255 msg.setField(VID_GUID, guid);
2256 if (sendMessage(&msg))
2257 {
2258 rcc = waitForRCC(rqId, m_dwCommandTimeout);
2259 }
2260 else
2261 {
2262 rcc = ERR_CONNECTION_BROKEN;
2263 }
2264 return rcc;
2265 }
2266
2267 /**
2268 * Acquire encryption context
2269 */
2270 NXCPEncryptionContext *AgentConnection::acquireEncryptionContext()
2271 {
2272 lock();
2273 NXCPEncryptionContext *ctx = m_pCtx;
2274 if (ctx != NULL)
2275 ctx->incRefCount();
2276 unlock();
2277 return ctx;
2278 }
2279
2280 /**
2281 * Callback for processing collected data on separate thread
2282 */
2283 void AgentConnection::processCollectedDataCallback(NXCPMessage *msg)
2284 {
2285 NXCPMessage response;
2286 response.setCode(CMD_REQUEST_COMPLETED);
2287 response.setId(msg->getId());
2288
2289 if (msg->getFieldAsBoolean(VID_BULK_RECONCILIATION))
2290 {
2291 UINT32 rcc = processBulkCollectedData(msg, &response);
2292 response.setField(VID_RCC, rcc);
2293 }
2294 else
2295 {
2296 UINT32 rcc = processCollectedData(msg);
2297 response.setField(VID_RCC, rcc);
2298 }
2299
2300 sendMessage(&response);
2301 delete msg;
2302 decInternalRefCount();
2303 }
2304
2305 /**
2306 * Process collected data information (for DCI with agent-side cache)
2307 */
2308 UINT32 AgentConnection::processCollectedData(NXCPMessage *msg)
2309 {
2310 return ERR_NOT_IMPLEMENTED;
2311 }
2312
2313 /**
2314 * Process collected data information in bulk mode (for DCI with agent-side cache)
2315 */
2316 UINT32 AgentConnection::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
2317 {
2318 return ERR_NOT_IMPLEMENTED;
2319 }
2320
2321 /**
2322 * Create new agent parameter definition from NXCP message
2323 */
2324 AgentParameterDefinition::AgentParameterDefinition(NXCPMessage *msg, UINT32 baseId)
2325 {
2326 m_name = msg->getFieldAsString(baseId);
2327 m_description = msg->getFieldAsString(baseId + 1);
2328 m_dataType = (int)msg->getFieldAsUInt16(baseId + 2);
2329 }
2330
2331 /**
2332 * Create new agent parameter definition from another definition object
2333 */
2334 AgentParameterDefinition::AgentParameterDefinition(AgentParameterDefinition *src)
2335 {
2336 m_name = (src->m_name != NULL) ? _tcsdup(src->m_name) : NULL;
2337 m_description = (src->m_description != NULL) ? _tcsdup(src->m_description) : NULL;
2338 m_dataType = src->m_dataType;
2339 }
2340
2341 /**
2342 * Destructor for agent parameter definition
2343 */
2344 AgentParameterDefinition::~AgentParameterDefinition()
2345 {
2346 safe_free(m_name);
2347 safe_free(m_description);
2348 }
2349
2350 /**
2351 * Fill NXCP message
2352 */
2353 UINT32 AgentParameterDefinition::fillMessage(NXCPMessage *msg, UINT32 baseId)
2354 {
2355 msg->setField(baseId, m_name);
2356 msg->setField(baseId + 1, m_description);
2357 msg->setField(baseId + 2, (WORD)m_dataType);
2358 return 3;
2359 }
2360
2361 /**
2362 * Create new agent table definition from NXCP message
2363 */
2364 AgentTableDefinition::AgentTableDefinition(NXCPMessage *msg, UINT32 baseId)
2365 {
2366 m_name = msg->getFieldAsString(baseId);
2367 m_description = msg->getFieldAsString(baseId + 2);
2368
2369 TCHAR *instanceColumns = msg->getFieldAsString(baseId + 1);
2370 if (instanceColumns != NULL)
2371 {
2372 m_instanceColumns = new StringList(instanceColumns, _T("|"));
2373 free(instanceColumns);
2374 }
2375 else
2376 {
2377 m_instanceColumns = new StringList;
2378 }
2379
2380 m_columns = new ObjectArray<AgentTableColumnDefinition>(16, 16, true);
2381 }
2382
2383 /**
2384 * Create new agent table definition from another definition object
2385 */
2386 AgentTableDefinition::AgentTableDefinition(AgentTableDefinition *src)
2387 {
2388 m_name = (src->m_name != NULL) ? _tcsdup(src->m_name) : NULL;
2389 m_description = (src->m_description != NULL) ? _tcsdup(src->m_description) : NULL;
2390 m_instanceColumns = new StringList(src->m_instanceColumns);
2391 m_columns = new ObjectArray<AgentTableColumnDefinition>(16, 16, true);
2392 for(int i = 0; i < src->m_columns->size(); i++)
2393 {
2394 m_columns->add(new AgentTableColumnDefinition(src->m_columns->get(i)));
2395 }
2396 }
2397 /**
2398 * Destructor for agent table definition
2399 */
2400 AgentTableDefinition::~AgentTableDefinition()
2401 {
2402 safe_free(m_name);
2403 safe_free(m_description);
2404 delete m_instanceColumns;
2405 delete m_columns;
2406 }
2407
2408 /**
2409 * Fill NXCP message
2410 */
2411 UINT32 AgentTableDefinition::fillMessage(NXCPMessage *msg, UINT32 baseId)
2412 {
2413 msg->setField(baseId + 1, m_name);
2414 msg->setField(baseId + 2, m_description);
2415
2416 TCHAR *instanceColumns = m_instanceColumns->join(_T("|"));
2417 msg->setField(baseId + 3, instanceColumns);
2418 free(instanceColumns);
2419
2420 UINT32 varId = baseId + 4;
2421 for(int i = 0; i < m_columns->size(); i++)
2422 {
2423 msg->setField(varId++, m_columns->get(i)->m_name);
2424 msg->setField(varId++, (WORD)m_columns->get(i)->m_dataType);
2425 }
2426
2427 msg->setField(baseId, varId - baseId);
2428 return varId - baseId;
2429 }