fixed incorrect sprintf format usage; fixed access to uninitialized memory in DB...
[public/netxms.git] / src / server / libnxsrv / agent.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Server Library
4 ** Copyright (C) 2003-2017 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: agent.cpp
21 **
22 **/
23
24 #include "libnxsrv.h"
25 #include <stdarg.h>
26 #include <nxstat.h>
27
28 #ifndef _WIN32
29 #define _tell(f) lseek(f,0,SEEK_CUR)
30 #endif
31
32 /**
33 * Constants
34 */
35 #define MAX_MSG_SIZE 268435456
36
37 /**
38 * Agent connection thread pool
39 */
40 ThreadPool LIBNXSRV_EXPORTABLE *g_agentConnectionThreadPool = NULL;
41
42 /**
43 * Unique connection ID
44 */
45 static VolatileCounter s_connectionId = 0;
46
47 /**
48 * Static data
49 */
50 #ifdef _WITH_ENCRYPTION
51 static int m_iDefaultEncryptionPolicy = ENCRYPTION_ALLOWED;
52 #else
53 static int m_iDefaultEncryptionPolicy = ENCRYPTION_DISABLED;
54 #endif
55
56 /**
57 * Set default encryption policy for agent communication
58 */
59 void LIBNXSRV_EXPORTABLE SetAgentDEP(int iPolicy)
60 {
61 #ifdef _WITH_ENCRYPTION
62 m_iDefaultEncryptionPolicy = iPolicy;
63 #endif
64 }
65
66 /**
67 * Receiver thread starter
68 */
69 THREAD_RESULT THREAD_CALL AgentConnection::receiverThreadStarter(void *pArg)
70 {
71 ThreadSetName("AgentReceiver");
72 ((AgentConnection *)pArg)->receiverThread();
73 ((AgentConnection *)pArg)->decInternalRefCount();
74 return THREAD_OK;
75 }
76
77 /**
78 * Constructor for AgentConnection
79 */
80 AgentConnection::AgentConnection(const InetAddress& addr, WORD port, int authMethod, const TCHAR *secret, bool allowCompression)
81 {
82 m_internalRefCount = 1;
83 m_userRefCount = 1;
84 _sntprintf(m_debugName, 32, _T("[AC-%d]"), InterlockedIncrement(&s_connectionId));
85 m_addr = addr;
86 m_wPort = port;
87 m_iAuthMethod = authMethod;
88 if (secret != NULL)
89 {
90 #ifdef UNICODE
91 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, secret, -1, m_szSecret, MAX_SECRET_LENGTH, NULL, NULL);
92 m_szSecret[MAX_SECRET_LENGTH - 1] = 0;
93 #else
94 nx_strncpy(m_szSecret, secret, MAX_SECRET_LENGTH);
95 #endif
96 }
97 else
98 {
99 m_szSecret[0] = 0;
100 }
101 m_allowCompression = allowCompression;
102 m_channel = NULL;
103 m_tLastCommandTime = 0;
104 m_dwNumDataLines = 0;
105 m_ppDataLines = NULL;
106 m_pMsgWaitQueue = new MsgWaitQueue;
107 m_requestId = 0;
108 m_connectionTimeout = 5000; // 5 seconds
109 m_dwCommandTimeout = 5000; // Default timeout 5 seconds
110 m_isConnected = false;
111 m_mutexDataLock = MutexCreate();
112 m_mutexSocketWrite = MutexCreate();
113 m_hReceiverThread = INVALID_THREAD_HANDLE;
114 m_pCtx = NULL;
115 m_iEncryptionPolicy = m_iDefaultEncryptionPolicy;
116 m_useProxy = false;
117 m_iProxyAuth = AUTH_NONE;
118 m_wProxyPort = 4700;
119 m_dwRecvTimeout = 420000; // 7 minutes
120 m_nProtocolVersion = NXCP_VERSION;
121 m_hCurrFile = -1;
122 m_deleteFileOnDownloadFailure = true;
123 m_condFileDownload = ConditionCreate(TRUE);
124 m_fileDownloadSucceeded = false;
125 m_fileUploadInProgress = false;
126 m_sendToClientMessageCallback = NULL;
127 m_dwDownloadRequestId = 0;
128 m_downloadProgressCallback = NULL;
129 m_downloadProgressCallbackArg = NULL;
130 }
131
132 /**
133 * Destructor
134 */
135 AgentConnection::~AgentConnection()
136 {
137 debugPrintf(7, _T("AgentConnection destructor called (this=%p, thread=%p)"), this, (void *)m_hReceiverThread);
138
139 ThreadDetach(m_hReceiverThread);
140
141 lock();
142 destroyResultData();
143 unlock();
144
145 delete m_pMsgWaitQueue;
146 if (m_pCtx != NULL)
147 m_pCtx->decRefCount();
148
149 if (m_hCurrFile != -1)
150 {
151 _close(m_hCurrFile);
152 onFileDownload(false);
153 }
154 else if (m_sendToClientMessageCallback != NULL)
155 {
156 onFileDownload(false);
157 }
158
159 if (m_channel != NULL)
160 m_channel->decRefCount();
161
162 MutexDestroy(m_mutexDataLock);
163 MutexDestroy(m_mutexSocketWrite);
164 ConditionDestroy(m_condFileDownload);
165 }
166
167 /**
168 * Write debug output
169 */
170 void AgentConnection::debugPrintf(int level, const TCHAR *format, ...)
171 {
172 if (nxlog_get_debug_level() < level)
173 return;
174
175 va_list args;
176 va_start(args, format);
177
178 TCHAR buffer[8192];
179 _vsntprintf(buffer, 8192, format, args);
180 nxlog_debug(level, _T("%s %s"), m_debugName, buffer);
181
182 va_end(args);
183 }
184
185 /**
186 * Receiver thread
187 */
188 void AgentConnection::receiverThread()
189 {
190 AbstractCommChannel *channel = m_channel;
191 UINT32 msgBufferSize = 1024;
192
193 // Initialize raw message receiving function
194 NXCP_BUFFER *msgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
195 NXCPInitBuffer(msgBuffer);
196
197 // Allocate space for raw message
198 NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)malloc(msgBufferSize);
199 #ifdef _WITH_ENCRYPTION
200 BYTE *decryptionBuffer = (BYTE *)malloc(msgBufferSize);
201 #else
202 BYTE *decryptionBuffer = NULL;
203 #endif
204
205 while(true)
206 {
207 // Shrink buffer after receiving large message
208 if (msgBufferSize > 131072)
209 {
210 msgBufferSize = 131072;
211 rawMsg = (NXCP_MESSAGE *)realloc(rawMsg, msgBufferSize);
212 if (decryptionBuffer != NULL)
213 decryptionBuffer = (BYTE *)realloc(decryptionBuffer, msgBufferSize);
214 }
215
216 // Receive raw message
217 int rc = RecvNXCPMessageEx(channel, &rawMsg, msgBuffer, &msgBufferSize,
218 &m_pCtx, (decryptionBuffer != NULL) ? &decryptionBuffer : NULL,
219 m_dwRecvTimeout, MAX_MSG_SIZE);
220 if (rc <= 0)
221 {
222 if ((rc != 0) && (WSAGetLastError() != WSAESHUTDOWN))
223 debugPrintf(6, _T("AgentConnection::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), rc, WSAGetLastError());
224 else
225 debugPrintf(6, _T("AgentConnection::ReceiverThread(): communication channel shutdown"));
226 break;
227 }
228
229 // Check if we get too large message
230 if (rc == 1)
231 {
232 TCHAR buffer[64];
233 debugPrintf(6, _T("Received too large message %s (%d bytes)"), NXCPMessageCodeName(ntohs(rawMsg->code), buffer), ntohl(rawMsg->size));
234 continue;
235 }
236
237 // Check if we are unable to decrypt message
238 if (rc == 2)
239 {
240 debugPrintf(6, _T("Unable to decrypt received message"));
241 continue;
242 }
243
244 // Check for timeout
245 if (rc == 3)
246 {
247 if (m_fileUploadInProgress)
248 continue; // Receive timeout may occur when uploading large files via slow links
249 debugPrintf(6, _T("Timed out waiting for message"));
250 break;
251 }
252
253 // Check that actual received packet size is equal to encoded in packet
254 if ((int)ntohl(rawMsg->size) != rc)
255 {
256 debugPrintf(6, _T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(rawMsg->size), rc);
257 continue; // Bad packet, wait for next
258 }
259
260 if (ntohs(rawMsg->flags) & MF_BINARY)
261 {
262 // Convert message header to host format
263 rawMsg->id = ntohl(rawMsg->id);
264 rawMsg->code = ntohs(rawMsg->code);
265 rawMsg->numFields = ntohl(rawMsg->numFields);
266 if (nxlog_get_debug_level() >= 6)
267 {
268 TCHAR buffer[64];
269 debugPrintf(6, _T("Received raw message %s (%d) from agent at %s"),
270 NXCPMessageCodeName(rawMsg->code, buffer), rawMsg->id, (const TCHAR *)m_addr.toString());
271 }
272
273 if ((rawMsg->code == CMD_FILE_DATA) && (rawMsg->id == m_dwDownloadRequestId))
274 {
275 if (m_sendToClientMessageCallback != NULL)
276 {
277 rawMsg->code = ntohs(rawMsg->code);
278 rawMsg->numFields = ntohl(rawMsg->numFields);
279 m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
280
281 if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
282 {
283 m_sendToClientMessageCallback = NULL;
284 onFileDownload(true);
285 }
286 else
287 {
288 if (m_downloadProgressCallback != NULL)
289 {
290 m_downloadProgressCallback(rawMsg->size - (NXCP_HEADER_SIZE + 8), m_downloadProgressCallbackArg);
291 }
292 }
293 }
294 else
295 {
296 if (m_hCurrFile != -1)
297 {
298 if (_write(m_hCurrFile, rawMsg->fields, rawMsg->numFields) == (int)rawMsg->numFields)
299 {
300 if (ntohs(rawMsg->flags) & MF_END_OF_FILE)
301 {
302 _close(m_hCurrFile);
303 m_hCurrFile = -1;
304
305 onFileDownload(true);
306 }
307 else
308 {
309 if (m_downloadProgressCallback != NULL)
310 {
311 m_downloadProgressCallback(_tell(m_hCurrFile), m_downloadProgressCallbackArg);
312 }
313 }
314 }
315 }
316 else
317 {
318 // I/O error
319 _close(m_hCurrFile);
320 m_hCurrFile = -1;
321
322 onFileDownload(false);
323 }
324 }
325 }
326
327 if ((rawMsg->code == CMD_ABORT_FILE_TRANSFER) && (rawMsg->id == m_dwDownloadRequestId))
328 {
329 if (m_sendToClientMessageCallback != NULL)
330 {
331 rawMsg->code = ntohs(rawMsg->code);
332 rawMsg->numFields = ntohl(rawMsg->numFields);
333 m_sendToClientMessageCallback(rawMsg, m_downloadProgressCallbackArg);
334 m_sendToClientMessageCallback = NULL;
335
336 onFileDownload(false);
337 }
338 else
339 {
340 //error on agent side
341 _close(m_hCurrFile);
342 m_hCurrFile = -1;
343
344 onFileDownload(false);
345 }
346 }
347 }
348 else if (ntohs(rawMsg->flags) & MF_CONTROL)
349 {
350 // Convert message header to host format
351 rawMsg->id = ntohl(rawMsg->id);
352 rawMsg->code = ntohs(rawMsg->code);
353 rawMsg->flags = ntohs(rawMsg->flags);
354 rawMsg->numFields = ntohl(rawMsg->numFields);
355 if (nxlog_get_debug_level() >= 6)
356 {
357 TCHAR buffer[64];
358 debugPrintf(6, _T("Received control message %s from agent at %s"),
359 NXCPMessageCodeName(rawMsg->code, buffer), (const TCHAR *)m_addr.toString());
360 }
361 m_pMsgWaitQueue->put((NXCP_MESSAGE *)nx_memdup(rawMsg, ntohl(rawMsg->size)));
362 }
363 else
364 {
365 // Create message object from raw message
366 NXCPMessage *msg = NXCPMessage::deserialize(rawMsg, m_nProtocolVersion);
367 if (msg != NULL)
368 {
369 if (nxlog_get_debug_level() >= 6)
370 {
371 TCHAR buffer[64];
372 debugPrintf(6, _T("Received message %s (%d) from agent at %s"),
373 NXCPMessageCodeName(msg->getCode(), buffer), msg->getId(), (const TCHAR *)m_addr.toString());
374 }
375 switch(msg->getCode())
376 {
377 case CMD_REQUEST_COMPLETED:
378 case CMD_SESSION_KEY:
379 m_pMsgWaitQueue->put(msg);
380 break;
381 case CMD_TRAP:
382 if (g_agentConnectionThreadPool != NULL)
383 {
384 incInternalRefCount();
385 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onTrapCallback, msg);
386 }
387 else
388 {
389 delete msg;
390 }
391 break;
392 case CMD_SYSLOG_RECORDS:
393 if (g_agentConnectionThreadPool != NULL)
394 {
395 incInternalRefCount();
396 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSyslogMessageCallback, msg);
397 }
398 else
399 {
400 delete msg;
401 }
402 break;
403 case CMD_PUSH_DCI_DATA:
404 if (g_agentConnectionThreadPool != NULL)
405 {
406 incInternalRefCount();
407 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onDataPushCallback, msg);
408 }
409 else
410 {
411 delete msg;
412 }
413 break;
414 case CMD_DCI_DATA:
415 if (g_agentConnectionThreadPool != NULL)
416 {
417 incInternalRefCount();
418 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::processCollectedDataCallback, msg);
419 }
420 else
421 {
422 NXCPMessage response;
423 response.setCode(CMD_REQUEST_COMPLETED);
424 response.setId(msg->getId());
425 response.setField(VID_RCC, ERR_INTERNAL_ERROR);
426 sendMessage(&response);
427 delete msg;
428 }
429 break;
430 case CMD_FILE_MONITORING:
431 onFileMonitoringData(msg);
432 delete msg;
433 break;
434 case CMD_SNMP_TRAP:
435 if (g_agentConnectionThreadPool != NULL)
436 {
437 incInternalRefCount();
438 ThreadPoolExecute(g_agentConnectionThreadPool, this, &AgentConnection::onSnmpTrapCallback, msg);
439 }
440 else
441 {
442 delete msg;
443 }
444 break;
445 default:
446 if (processCustomMessage(msg))
447 delete msg;
448 else
449 m_pMsgWaitQueue->put(msg);
450 break;
451 }
452 }
453 else
454 {
455 debugPrintf(6, _T("RecvMsg: message deserialization error"));
456 }
457 }
458 }
459 debugPrintf(6, _T("Receiver loop terminated"));
460
461 // Close socket and mark connection as disconnected
462 lock();
463 if (m_hCurrFile != -1)
464 {
465 _close(m_hCurrFile);
466 m_hCurrFile = -1;
467 onFileDownload(false);
468 }
469 else if (m_sendToClientMessageCallback != NULL)
470 {
471 m_sendToClientMessageCallback = NULL;
472 onFileDownload(false);
473 }
474
475 debugPrintf(6, _T("Closing communication channel"));
476 channel->close();
477 channel->decRefCount();
478 if (m_pCtx != NULL)
479 {
480 m_pCtx->decRefCount();
481 m_pCtx = NULL;
482 }
483 m_isConnected = false;
484 unlock();
485
486 free(rawMsg);
487 free(msgBuffer);
488 #ifdef _WITH_ENCRYPTION
489 free(decryptionBuffer);
490 #endif
491
492 debugPrintf(6, _T("Receiver thread stopped"));
493 }
494
495 /**
496 * Create channel. Default implementation creates socket channel.
497 */
498 AbstractCommChannel *AgentConnection::createChannel()
499 {
500 // Create socket
501 SOCKET s = socket(m_useProxy ? m_proxyAddr.getFamily() : m_addr.getFamily(), SOCK_STREAM, 0);
502 if (s == INVALID_SOCKET)
503 {
504 debugPrintf(6, _T("Call to socket() failed"));
505 return NULL;
506 }
507
508 // Fill in address structure
509 SockAddrBuffer sb;
510 struct sockaddr *sa = m_useProxy ? m_proxyAddr.fillSockAddr(&sb, m_wProxyPort) : m_addr.fillSockAddr(&sb, m_wPort);
511
512 // Connect to server
513 if ((sa == NULL) || (ConnectEx(s, sa, SA_LEN(sa), m_connectionTimeout) == -1))
514 {
515 TCHAR buffer[64];
516 debugPrintf(5, _T("Cannot establish connection with agent at %s:%d"),
517 m_useProxy ? m_proxyAddr.toString(buffer) : m_addr.toString(buffer),
518 (int)(m_useProxy ? m_wProxyPort : m_wPort));
519 closesocket(s);
520 return NULL;
521 }
522
523 return new SocketCommChannel(s);
524 }
525
526 /**
527 * Acquire communication channel. Caller must call decRefCount to release channel.
528 */
529 AbstractCommChannel *AgentConnection::acquireChannel()
530 {
531 lock();
532 AbstractCommChannel *channel = m_channel;
533 if (channel != NULL)
534 channel->incRefCount();
535 unlock();
536 return channel;
537 }
538
539 /**
540 * Connect to agent
541 */
542 bool AgentConnection::connect(RSA *pServerKey, UINT32 *pdwError, UINT32 *pdwSocketError, UINT64 serverId)
543 {
544 TCHAR szBuffer[256];
545 bool success = false;
546 bool forceEncryption = false;
547 bool secondPass = false;
548 UINT32 dwError = 0;
549
550 if (pdwError != NULL)
551 *pdwError = ERR_INTERNAL_ERROR;
552
553 if (pdwSocketError != NULL)
554 *pdwSocketError = 0;
555
556 // Check if already connected
557 if (m_isConnected)
558 return false;
559
560 // Wait for receiver thread from previous connection, if any
561 ThreadJoin(m_hReceiverThread);
562 m_hReceiverThread = INVALID_THREAD_HANDLE;
563
564 // Check if we need to close existing channel
565 if (m_channel != NULL)
566 {
567 m_channel->decRefCount();
568 m_channel = NULL;
569 }
570
571 m_channel = createChannel();
572 if (m_channel == NULL)
573 {
574 debugPrintf(6, _T("Cannot create communication channel"));
575 dwError = ERR_CONNECT_FAILED;
576 goto connect_cleanup;
577 }
578
579 if (!NXCPGetPeerProtocolVersion(m_channel, &m_nProtocolVersion, m_mutexSocketWrite))
580 {
581 debugPrintf(6, _T("Protocol version negotiation failed"));
582 dwError = ERR_INTERNAL_ERROR;
583 goto connect_cleanup;
584 }
585 debugPrintf(6, _T("Using NXCP version %d"), m_nProtocolVersion);
586
587 // Start receiver thread
588 incInternalRefCount();
589 m_channel->incRefCount(); // for receiver thread
590 m_hReceiverThread = ThreadCreateEx(receiverThreadStarter, 0, this);
591
592 // Setup encryption
593 setup_encryption:
594 if ((m_iEncryptionPolicy == ENCRYPTION_PREFERRED) ||
595 (m_iEncryptionPolicy == ENCRYPTION_REQUIRED) ||
596 forceEncryption) // Agent require encryption
597 {
598 if (pServerKey != NULL)
599 {
600 dwError = setupEncryption(pServerKey);
601 if ((dwError != ERR_SUCCESS) &&
602 ((m_iEncryptionPolicy == ENCRYPTION_REQUIRED) || forceEncryption))
603 goto connect_cleanup;
604 }
605 else
606 {
607 if ((m_iEncryptionPolicy == ENCRYPTION_REQUIRED) || forceEncryption)
608 {
609 dwError = ERR_ENCRYPTION_REQUIRED;
610 goto connect_cleanup;
611 }
612 }
613 }
614
615 // Authenticate itself to agent
616 if ((dwError = authenticate(m_useProxy && !secondPass)) != ERR_SUCCESS)
617 {
618 if ((dwError == ERR_ENCRYPTION_REQUIRED) &&
619 (m_iEncryptionPolicy != ENCRYPTION_DISABLED))
620 {
621 forceEncryption = true;
622 goto setup_encryption;
623 }
624 debugPrintf(5, _T("Authentication to agent %s failed (%s)"), m_addr.toString(szBuffer),
625 AgentErrorCodeToText(dwError));
626 goto connect_cleanup;
627 }
628
629 // Test connectivity and inform agent about server capabilities
630 if ((dwError = setServerCapabilities()) != ERR_SUCCESS)
631 {
632 if ((dwError == ERR_ENCRYPTION_REQUIRED) &&
633 (m_iEncryptionPolicy != ENCRYPTION_DISABLED))
634 {
635 forceEncryption = true;
636 goto setup_encryption;
637 }
638 if (dwError != ERR_UNKNOWN_COMMAND) // Older agents may not support enable IPv6 command
639 {
640 debugPrintf(5, _T("Communication with agent %s failed (%s)"), m_addr.toString(szBuffer), AgentErrorCodeToText(dwError));
641 goto connect_cleanup;
642 }
643 }
644
645 if (m_useProxy && !secondPass)
646 {
647 dwError = setupProxyConnection();
648 if (dwError != ERR_SUCCESS)
649 goto connect_cleanup;
650 lock();
651 if (m_pCtx != NULL)
652 {
653 m_pCtx->decRefCount();
654 m_pCtx = NULL;
655 }
656 unlock();
657
658 debugPrintf(6, _T("Proxy connection established"));
659
660 // Renegotiate NXCP version with actual target agent
661 NXCP_MESSAGE msg;
662 msg.id = 0;
663 msg.numFields = 0;
664 msg.size = htonl(NXCP_HEADER_SIZE);
665 msg.code = htons(CMD_GET_NXCP_CAPS);
666 msg.flags = htons(MF_CONTROL);
667 if (m_channel->send(&msg, NXCP_HEADER_SIZE, m_mutexSocketWrite) == NXCP_HEADER_SIZE)
668 {
669 NXCP_MESSAGE *rsp = m_pMsgWaitQueue->waitForRawMessage(CMD_NXCP_CAPS, 0, m_dwCommandTimeout);
670 if (rsp != NULL)
671 {
672 m_nProtocolVersion = rsp->numFields >> 24;
673 free(rsp);
674 }
675 else
676 {
677 // assume that peer doesn't understand CMD_GET_NXCP_CAPS message
678 // and set version number to 1
679 m_nProtocolVersion = 1;
680 }
681 debugPrintf(6, _T("Using NXCP version %d after re-negotioation"), m_nProtocolVersion);
682 }
683 else
684 {
685 debugPrintf(6, _T("Protocol version re-negotiation failed - cannot send CMD_GET_NXCP_CAPS message"));
686 dwError = ERR_CONNECTION_BROKEN;
687 goto connect_cleanup;
688 }
689
690 secondPass = true;
691 forceEncryption = false;
692 goto setup_encryption;
693 }
694
695 if (serverId != 0)
696 setServerId(serverId);
697
698 success = true;
699 dwError = ERR_SUCCESS;
700
701 connect_cleanup:
702 if (!success)
703 {
704 if (pdwSocketError != NULL)
705 *pdwSocketError = (UINT32)WSAGetLastError();
706
707 lock();
708 if (m_channel != NULL)
709 m_channel->shutdown();
710 unlock();
711 ThreadJoin(m_hReceiverThread);
712 m_hReceiverThread = INVALID_THREAD_HANDLE;
713
714 lock();
715 if (m_channel != NULL)
716 {
717 m_channel->close();
718 m_channel->decRefCount();
719 m_channel = NULL;
720 }
721
722 if (m_pCtx != NULL)
723 {
724 m_pCtx->decRefCount();
725 m_pCtx = NULL;
726 }
727
728 unlock();
729 }
730 m_isConnected = success;
731 if (pdwError != NULL)
732 *pdwError = dwError;
733 return success;
734 }
735
736 /**
737 * Disconnect from agent
738 */
739 void AgentConnection::disconnect()
740 {
741 debugPrintf(6, _T("disconnect() called"));
742 lock();
743 if (m_hCurrFile != -1)
744 {
745 _close(m_hCurrFile);
746 m_hCurrFile = -1;
747 onFileDownload(false);
748 }
749 else if (m_sendToClientMessageCallback != NULL)
750 {
751 m_sendToClientMessageCallback = NULL;
752 onFileDownload(false);
753 }
754
755 if (m_channel != NULL)
756 {
757 m_channel->shutdown();
758 m_channel->decRefCount();
759 m_channel = NULL;
760 }
761 destroyResultData();
762 m_isConnected = false;
763 unlock();
764 debugPrintf(6, _T("Disconnect completed"));
765 }
766
767 /**
768 * Set authentication data
769 */
770 void AgentConnection::setAuthData(int method, const TCHAR *secret)
771 {
772 m_iAuthMethod = method;
773 #ifdef UNICODE
774 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR, secret, -1, m_szSecret, MAX_SECRET_LENGTH, NULL, NULL);
775 m_szSecret[MAX_SECRET_LENGTH - 1] = 0;
776 #else
777 nx_strncpy(m_szSecret, secret, MAX_SECRET_LENGTH);
778 #endif
779 }
780
781 /**
782 * Destroy command execuion results data
783 */
784 void AgentConnection::destroyResultData()
785 {
786 UINT32 i;
787
788 if (m_ppDataLines != NULL)
789 {
790 for(i = 0; i < m_dwNumDataLines; i++)
791 if (m_ppDataLines[i] != NULL)
792 free(m_ppDataLines[i]);
793 free(m_ppDataLines);
794 m_ppDataLines = NULL;
795 }
796 m_dwNumDataLines = 0;
797 }
798
799 /**
800 * Get interface list from agent
801 */
802 InterfaceList *AgentConnection::getInterfaceList()
803 {
804 InterfaceList *pIfList = NULL;
805 TCHAR *pChar, *pBuf;
806
807 if (getList(_T("Net.InterfaceList")) == ERR_SUCCESS)
808 {
809 pIfList = new InterfaceList(m_dwNumDataLines);
810
811 // Parse result set. Each line should have the following format:
812 // index ip_address/mask_bits iftype mac_address name
813 for(UINT32 i = 0; i < m_dwNumDataLines; i++)
814 {
815 pBuf = m_ppDataLines[i];
816 UINT32 ifIndex = 0;
817
818 // Index
819 pChar = _tcschr(pBuf, ' ');
820 if (pChar != NULL)
821 {
822 *pChar = 0;
823 ifIndex = _tcstoul(pBuf, NULL, 10);
824 pBuf = pChar + 1;
825 }
826
827 bool newInterface = false;
828 InterfaceInfo *iface = pIfList->findByIfIndex(ifIndex);
829 if (iface == NULL)
830 {
831 iface = new InterfaceInfo(ifIndex);
832 newInterface = true;
833 }
834
835 // Address and mask
836 pChar = _tcschr(pBuf, _T(' '));
837 if (pChar != NULL)
838 {
839 TCHAR *pSlash;
840 static TCHAR defaultMask[] = _T("24");
841
842 *pChar = 0;
843 pSlash = _tcschr(pBuf, _T('/'));
844 if (pSlash != NULL)
845 {
846 *pSlash = 0;
847 pSlash++;
848 }
849 else // Just a paranoia protection, should'n happen if agent working correctly
850 {
851 pSlash = defaultMask;
852 }
853 InetAddress addr = InetAddress::parse(pBuf);
854 if (addr.isValid())
855 {
856 addr.setMaskBits(_tcstol(pSlash, NULL, 10));
857 // Agent may return 0.0.0.0/0 for interfaces without IP address
858 if ((addr.getFamily() != AF_INET) || (addr.getAddressV4() != 0))
859 iface->ipAddrList.add(addr);
860 }
861 pBuf = pChar + 1;
862 }
863
864 if (newInterface)
865 {
866 // Interface type
867 pChar = _tcschr(pBuf, ' ');
868 if (pChar != NULL)
869 {
870 *pChar = 0;
871
872 TCHAR *eptr;
873 iface->type = _tcstoul(pBuf, &eptr, 10);
874
875 // newer agents can return if_type(mtu)
876 if (*eptr == _T('('))
877 {
878 pBuf = eptr + 1;
879 eptr = _tcschr(pBuf, _T(')'));
880 if (eptr != NULL)
881 {
882 *eptr = 0;
883 iface->mtu = _tcstol(pBuf, NULL, 10);
884 }
885 }
886
887 pBuf = pChar + 1;
888 }
889
890 // MAC address
891 pChar = _tcschr(pBuf, ' ');
892 if (pChar != NULL)
893 {
894 *pChar = 0;
895 StrToBin(pBuf, iface->macAddr, MAC_ADDR_LENGTH);
896 pBuf = pChar + 1;
897 }
898
899 // Name (set description to name)
900 nx_strncpy(iface->name, pBuf, MAX_DB_STRING);
901 nx_strncpy(iface->description, pBuf, MAX_DB_STRING);
902
903 pIfList->add(iface);
904 }
905 }
906
907 lock();
908 destroyResultData();
909 unlock();
910 }
911
912 return pIfList;
913 }
914
915 /**
916 * Get parameter value
917 */
918 UINT32 AgentConnection::getParameter(const TCHAR *pszParam, UINT32 dwBufSize, TCHAR *pszBuffer)
919 {
920 if (!m_isConnected)
921 return ERR_NOT_CONNECTED;
922
923 NXCPMessage msg(m_nProtocolVersion);
924 UINT32 dwRqId = generateRequestId();
925 msg.setCode(CMD_GET_PARAMETER);
926 msg.setId(dwRqId);
927 msg.setField(VID_PARAMETER, pszParam);
928
929 UINT32 dwRetCode;
930 if (sendMessage(&msg))
931 {
932 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
933 if (response != NULL)
934 {
935 dwRetCode = response->getFieldAsUInt32(VID_RCC);
936 if (dwRetCode == ERR_SUCCESS)
937 {
938 if (response->isFieldExist(VID_VALUE))
939 {
940 response->getFieldAsString(VID_VALUE, pszBuffer, dwBufSize);
941 }
942 else
943 {
944 dwRetCode = ERR_MALFORMED_RESPONSE;
945 debugPrintf(3, _T("Malformed response to CMD_GET_PARAMETER"));
946 }
947 }
948 delete response;
949 }
950 else
951 {
952 dwRetCode = ERR_REQUEST_TIMEOUT;
953 }
954 }
955 else
956 {
957 dwRetCode = ERR_CONNECTION_BROKEN;
958 }
959 return dwRetCode;
960 }
961
962 /**
963 * Get ARP cache
964 */
965 ARP_CACHE *AgentConnection::getArpCache()
966 {
967 ARP_CACHE *pArpCache = NULL;
968 TCHAR szByte[4], *pBuf, *pChar;
969 UINT32 i, j;
970
971 if (getList(_T("Net.ArpCache")) == ERR_SUCCESS)
972 {
973 // Create empty structure
974 pArpCache = (ARP_CACHE *)malloc(sizeof(ARP_CACHE));
975 pArpCache->dwNumEntries = m_dwNumDataLines;
976 pArpCache->pEntries = (ARP_ENTRY *)malloc(sizeof(ARP_ENTRY) * m_dwNumDataLines);
977 memset(pArpCache->pEntries, 0, sizeof(ARP_ENTRY) * m_dwNumDataLines);
978
979 szByte[2] = 0;
980
981 // Parse data lines
982 // Each line has form of XXXXXXXXXXXX a.b.c.d n
983 // where XXXXXXXXXXXX is a MAC address (12 hexadecimal digits)
984 // a.b.c.d is an IP address in decimal dotted notation
985 // n is an interface index
986 for(i = 0; i < m_dwNumDataLines; i++)
987 {
988 pBuf = m_ppDataLines[i];
989 if (_tcslen(pBuf) < 20) // Invalid line
990 continue;
991
992 // MAC address
993 for(j = 0; j < 6; j++)
994 {
995 memcpy(szByte, pBuf, sizeof(TCHAR) * 2);
996 pArpCache->pEntries[i].bMacAddr[j] = (BYTE)_tcstol(szByte, NULL, 16);
997 pBuf+=2;
998 }
999
1000 // IP address
1001 while(*pBuf == ' ')
1002 pBuf++;
1003 pChar = _tcschr(pBuf, _T(' '));
1004 if (pChar != NULL)
1005 *pChar = 0;
1006 pArpCache->pEntries[i].ipAddr = ntohl(_t_inet_addr(pBuf));
1007
1008 // Interface index
1009 if (pChar != NULL)
1010 pArpCache->pEntries[i].dwIndex = _tcstoul(pChar + 1, NULL, 10);
1011 }
1012
1013 lock();
1014 destroyResultData();
1015 unlock();
1016 }
1017 return pArpCache;
1018 }
1019
1020 /**
1021 * Send dummy command to agent (can be used for keepalive)
1022 */
1023 UINT32 AgentConnection::nop()
1024 {
1025 if (!m_isConnected)
1026 return ERR_CONNECTION_BROKEN;
1027
1028 NXCPMessage msg(m_nProtocolVersion);
1029 UINT32 dwRqId;
1030
1031 dwRqId = generateRequestId();
1032 msg.setCode(CMD_KEEPALIVE);
1033 msg.setId(dwRqId);
1034 if (sendMessage(&msg))
1035 return waitForRCC(dwRqId, m_dwCommandTimeout);
1036 else
1037 return ERR_CONNECTION_BROKEN;
1038 }
1039
1040 /**
1041 * inform agent about server capabilities
1042 */
1043 UINT32 AgentConnection::setServerCapabilities()
1044 {
1045 NXCPMessage msg(m_nProtocolVersion);
1046 UINT32 dwRqId = generateRequestId();
1047 msg.setCode(CMD_SET_SERVER_CAPABILITIES);
1048 msg.setField(VID_ENABLED, (INT16)1); // Enables IPv6 on pre-2.0 agents
1049 msg.setField(VID_IPV6_SUPPORT, (INT16)1);
1050 msg.setField(VID_BULK_RECONCILIATION, (INT16)1);
1051 msg.setField(VID_ENABLE_COMPRESSION, (INT16)(m_allowCompression ? 1 : 0));
1052 msg.setId(dwRqId);
1053 if (sendMessage(&msg))
1054 return waitForRCC(dwRqId, m_dwCommandTimeout);
1055 else
1056 return ERR_CONNECTION_BROKEN;
1057 }
1058
1059 /**
1060 * Set server ID
1061 */
1062 UINT32 AgentConnection::setServerId(UINT64 serverId)
1063 {
1064 NXCPMessage msg(m_nProtocolVersion);
1065 UINT32 dwRqId = generateRequestId();
1066 msg.setCode(CMD_SET_SERVER_ID);
1067 msg.setField(VID_SERVER_ID, serverId);
1068 msg.setId(dwRqId);
1069 if (sendMessage(&msg))
1070 return waitForRCC(dwRqId, m_dwCommandTimeout);
1071 else
1072 return ERR_CONNECTION_BROKEN;
1073 }
1074
1075 /**
1076 * Wait for request completion code
1077 */
1078 UINT32 AgentConnection::waitForRCC(UINT32 dwRqId, UINT32 dwTimeOut)
1079 {
1080 UINT32 rcc;
1081 NXCPMessage *response = m_pMsgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, dwTimeOut);
1082 if (response != NULL)
1083 {
1084 rcc = response->getFieldAsUInt32(VID_RCC);
1085 delete response;
1086 }
1087 else
1088 {
1089 rcc = ERR_REQUEST_TIMEOUT;
1090 }
1091 return rcc;
1092 }
1093
1094 /**
1095 * Send message to agent
1096 */
1097 bool AgentConnection::sendMessage(NXCPMessage *pMsg)
1098 {
1099 AbstractCommChannel *channel = acquireChannel();
1100 if (channel == NULL)
1101 return false;
1102
1103 if (nxlog_get_debug_level() >= 6)
1104 {
1105 TCHAR buffer[64];
1106 debugPrintf(6, _T("Sending message %s (%d) to agent at %s"),
1107 NXCPMessageCodeName(pMsg->getCode(), buffer), pMsg->getId(), (const TCHAR *)m_addr.toString());
1108 }
1109
1110 bool success;
1111 NXCP_MESSAGE *rawMsg = pMsg->serialize(m_allowCompression);
1112 NXCPEncryptionContext *pCtx = acquireEncryptionContext();
1113 if (pCtx != NULL)
1114 {
1115 NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
1116 if (pEnMsg != NULL)
1117 {
1118 success = (channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
1119 free(pEnMsg);
1120 }
1121 else
1122 {
1123 success = false;
1124 }
1125 pCtx->decRefCount();
1126 }
1127 else
1128 {
1129 success = (channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
1130 }
1131 free(rawMsg);
1132 channel->decRefCount();
1133 return success;
1134 }
1135
1136 /**
1137 * Send raw message to agent
1138 */
1139 bool AgentConnection::sendRawMessage(NXCP_MESSAGE *pMsg)
1140 {
1141 AbstractCommChannel *channel = acquireChannel();
1142 if (channel == NULL)
1143 return false;
1144
1145 bool success;
1146 NXCP_MESSAGE *rawMsg = pMsg;
1147 NXCPEncryptionContext *pCtx = acquireEncryptionContext();
1148 if (pCtx != NULL)
1149 {
1150 NXCP_ENCRYPTED_MESSAGE *pEnMsg = pCtx->encryptMessage(rawMsg);
1151 if (pEnMsg != NULL)
1152 {
1153 success = (channel->send(pEnMsg, ntohl(pEnMsg->size), m_mutexSocketWrite) == (int)ntohl(pEnMsg->size));
1154 free(pEnMsg);
1155 }
1156 else
1157 {
1158 success = false;
1159 }
1160 pCtx->decRefCount();
1161 }
1162 else
1163 {
1164 success = (channel->send(rawMsg, ntohl(rawMsg->size), m_mutexSocketWrite) == (int)ntohl(rawMsg->size));
1165 }
1166 channel->decRefCount();
1167 return success;
1168 }
1169
1170 /**
1171 * Callback for processing incoming event on separate thread
1172 */
1173 void AgentConnection::onTrapCallback(NXCPMessage *msg)
1174 {
1175 onTrap(msg);
1176 delete msg;
1177 decInternalRefCount();
1178 }
1179
1180 /**
1181 * Trap handler. Should be overriden in derived classes to implement
1182 * actual trap processing. Default implementation do nothing.
1183 */
1184 void AgentConnection::onTrap(NXCPMessage *pMsg)
1185 {
1186 }
1187
1188 /**
1189 * Callback for processing incoming syslog message on separate thread
1190 */
1191 void AgentConnection::onSyslogMessageCallback(NXCPMessage *msg)
1192 {
1193 onSyslogMessage(msg);
1194 delete msg;
1195 decInternalRefCount();
1196 }
1197
1198 /**
1199 * Syslog message handler. Should be overriden in derived classes to implement
1200 * actual message processing. Default implementation do nothing.
1201 */
1202 void AgentConnection::onSyslogMessage(NXCPMessage *pMsg)
1203 {
1204 }
1205
1206 /**
1207 * Callback for processing data push on separate thread
1208 */
1209 void AgentConnection::onDataPushCallback(NXCPMessage *msg)
1210 {
1211 onDataPush(msg);
1212 delete msg;
1213 decInternalRefCount();
1214 }
1215
1216 /**
1217 * Data push handler. Should be overriden in derived classes to implement
1218 * actual data push processing. Default implementation do nothing.
1219 */
1220 void AgentConnection::onDataPush(NXCPMessage *pMsg)
1221 {
1222 }
1223
1224 /**
1225 * Monitoring data handler. Should be overriden in derived classes to implement
1226 * actual monitoring data processing. Default implementation do nothing.
1227 */
1228 void AgentConnection::onFileMonitoringData(NXCPMessage *pMsg)
1229 {
1230 }
1231
1232 /**
1233 * Callback for processing data push on separate thread
1234 */
1235 void AgentConnection::onSnmpTrapCallback(NXCPMessage *msg)
1236 {
1237 onSnmpTrap(msg);
1238 delete msg;
1239 decInternalRefCount();
1240 }
1241
1242 /**
1243 * SNMP trap handler. Should be overriden in derived classes to implement
1244 * actual SNMP trap processing. Default implementation do nothing.
1245 */
1246 void AgentConnection::onSnmpTrap(NXCPMessage *pMsg)
1247 {
1248 }
1249
1250 /**
1251 * Custom message handler
1252 * If returns true, message considered as processed and will not be placed in wait queue
1253 */
1254 bool AgentConnection::processCustomMessage(NXCPMessage *pMsg)
1255 {
1256 return false;
1257 }
1258
1259 /**
1260 * Get list of values
1261 */
1262 UINT32 AgentConnection::getList(const TCHAR *pszParam)
1263 {
1264 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1265 UINT32 i, dwRqId, dwRetCode;
1266
1267 if (m_isConnected)
1268 {
1269 destroyResultData();
1270 dwRqId = generateRequestId();
1271 msg.setCode(CMD_GET_LIST);
1272 msg.setId(dwRqId);
1273 msg.setField(VID_PARAMETER, pszParam);
1274 if (sendMessage(&msg))
1275 {
1276 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1277 if (pResponse != NULL)
1278 {
1279 dwRetCode = pResponse->getFieldAsUInt32(VID_RCC);
1280 if (dwRetCode == ERR_SUCCESS)
1281 {
1282 m_dwNumDataLines = pResponse->getFieldAsUInt32(VID_NUM_STRINGS);
1283 m_ppDataLines = (TCHAR **)malloc(sizeof(TCHAR *) * m_dwNumDataLines);
1284 for(i = 0; i < m_dwNumDataLines; i++)
1285 m_ppDataLines[i] = pResponse->getFieldAsString(VID_ENUM_VALUE_BASE + i);
1286 }
1287 delete pResponse;
1288 }
1289 else
1290 {
1291 dwRetCode = ERR_REQUEST_TIMEOUT;
1292 }
1293 }
1294 else
1295 {
1296 dwRetCode = ERR_CONNECTION_BROKEN;
1297 }
1298 }
1299 else
1300 {
1301 dwRetCode = ERR_NOT_CONNECTED;
1302 }
1303
1304 return dwRetCode;
1305 }
1306
1307 /**
1308 * Get table
1309 */
1310 UINT32 AgentConnection::getTable(const TCHAR *pszParam, Table **table)
1311 {
1312 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1313 UINT32 dwRqId, dwRetCode;
1314
1315 *table = NULL;
1316 if (m_isConnected)
1317 {
1318 dwRqId = generateRequestId();
1319 msg.setCode(CMD_GET_TABLE);
1320 msg.setId(dwRqId);
1321 msg.setField(VID_PARAMETER, pszParam);
1322 if (sendMessage(&msg))
1323 {
1324 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1325 if (pResponse != NULL)
1326 {
1327 dwRetCode = pResponse->getFieldAsUInt32(VID_RCC);
1328 if (dwRetCode == ERR_SUCCESS)
1329 {
1330 *table = new Table(pResponse);
1331 }
1332 delete pResponse;
1333 }
1334 else
1335 {
1336 dwRetCode = ERR_REQUEST_TIMEOUT;
1337 }
1338 }
1339 else
1340 {
1341 dwRetCode = ERR_CONNECTION_BROKEN;
1342 }
1343 }
1344 else
1345 {
1346 dwRetCode = ERR_NOT_CONNECTED;
1347 }
1348
1349 return dwRetCode;
1350 }
1351
1352 /**
1353 * Authenticate to agent
1354 */
1355 UINT32 AgentConnection::authenticate(BOOL bProxyData)
1356 {
1357 NXCPMessage msg(m_nProtocolVersion);
1358 UINT32 dwRqId;
1359 BYTE hash[32];
1360 int iAuthMethod = bProxyData ? m_iProxyAuth : m_iAuthMethod;
1361 const char *pszSecret = bProxyData ? m_szProxySecret : m_szSecret;
1362 #ifdef UNICODE
1363 WCHAR szBuffer[MAX_SECRET_LENGTH];
1364 #endif
1365
1366 if (iAuthMethod == AUTH_NONE)
1367 return ERR_SUCCESS; // No authentication required
1368
1369 dwRqId = generateRequestId();
1370 msg.setCode(CMD_AUTHENTICATE);
1371 msg.setId(dwRqId);
1372 msg.setField(VID_AUTH_METHOD, (WORD)iAuthMethod);
1373 switch(iAuthMethod)
1374 {
1375 case AUTH_PLAINTEXT:
1376 #ifdef UNICODE
1377 MultiByteToWideChar(CP_ACP, MB_PRECOMPOSED, pszSecret, -1, szBuffer, MAX_SECRET_LENGTH);
1378 msg.setField(VID_SHARED_SECRET, szBuffer);
1379 #else
1380 msg.setField(VID_SHARED_SECRET, pszSecret);
1381 #endif
1382 break;
1383 case AUTH_MD5_HASH:
1384 CalculateMD5Hash((BYTE *)pszSecret, (int)strlen(pszSecret), hash);
1385 msg.setField(VID_SHARED_SECRET, hash, MD5_DIGEST_SIZE);
1386 break;
1387 case AUTH_SHA1_HASH:
1388 CalculateSHA1Hash((BYTE *)pszSecret, (int)strlen(pszSecret), hash);
1389 msg.setField(VID_SHARED_SECRET, hash, SHA1_DIGEST_SIZE);
1390 break;
1391 default:
1392 break;
1393 }
1394 if (sendMessage(&msg))
1395 return waitForRCC(dwRqId, m_dwCommandTimeout);
1396 else
1397 return ERR_CONNECTION_BROKEN;
1398 }
1399
1400 /**
1401 * Execute action on agent
1402 */
1403 UINT32 AgentConnection::execAction(const TCHAR *action, int argc, const TCHAR * const *argv,
1404 bool withOutput, void (* outputCallback)(ActionCallbackEvent, const TCHAR *, void *), void *cbData)
1405 {
1406 NXCPMessage msg(m_nProtocolVersion);
1407 UINT32 dwRqId;
1408 int i;
1409
1410 if (!m_isConnected)
1411 return ERR_NOT_CONNECTED;
1412
1413 dwRqId = generateRequestId();
1414 msg.setCode(CMD_ACTION);
1415 msg.setId(dwRqId);
1416 msg.setField(VID_ACTION_NAME, action);
1417 msg.setField(VID_RECEIVE_OUTPUT, (UINT16)(withOutput ? 1 : 0));
1418 msg.setField(VID_NUM_ARGS, (UINT32)argc);
1419 for(i = 0; i < argc; i++)
1420 msg.setField(VID_ACTION_ARG_BASE + i, argv[i]);
1421
1422 if (sendMessage(&msg))
1423 {
1424 if (withOutput)
1425 {
1426 UINT32 rcc = waitForRCC(dwRqId, m_dwCommandTimeout);
1427 if (rcc == ERR_SUCCESS)
1428 {
1429 outputCallback(ACE_CONNECTED, NULL, cbData); // Indicate successful start
1430 bool eos = false;
1431 while(!eos)
1432 {
1433 NXCPMessage *response = waitForMessage(CMD_COMMAND_OUTPUT, dwRqId, m_dwCommandTimeout * 10);
1434 if (response != NULL)
1435 {
1436 eos = response->isEndOfSequence();
1437 if (response->isFieldExist(VID_MESSAGE))
1438 {
1439 TCHAR line[4096];
1440 response->getFieldAsString(VID_MESSAGE, line, 4096);
1441 outputCallback(ACE_DATA, line, cbData);
1442 }
1443 delete response;
1444 }
1445 else
1446 {
1447 return ERR_REQUEST_TIMEOUT;
1448 }
1449 }
1450 outputCallback(ACE_DISCONNECTED, NULL, cbData);
1451 return ERR_SUCCESS;
1452 }
1453 else
1454 {
1455 return rcc;
1456 }
1457 }
1458 else
1459 {
1460 return waitForRCC(dwRqId, m_dwCommandTimeout);
1461 }
1462 }
1463 else
1464 {
1465 return ERR_CONNECTION_BROKEN;
1466 }
1467 }
1468
1469 /**
1470 * Upload file to agent
1471 */
1472 UINT32 AgentConnection::uploadFile(const TCHAR *localFile, const TCHAR *destinationFile, void (* progressCallback)(INT64, void *), void *cbArg, NXCPStreamCompressionMethod compMethod)
1473 {
1474 UINT32 dwRqId, dwResult;
1475 NXCPMessage msg(m_nProtocolVersion);
1476
1477 // Disable compression if it is disabled on connection level or if agent do not support it
1478 if (!m_allowCompression || (m_nProtocolVersion < 4))
1479 compMethod = NXCP_STREAM_COMPRESSION_NONE;
1480
1481 if (!m_isConnected)
1482 return ERR_NOT_CONNECTED;
1483
1484 dwRqId = generateRequestId();
1485 msg.setId(dwRqId);
1486
1487 time_t lastModTime = 0;
1488 NX_STAT_STRUCT st;
1489 if (CALL_STAT(localFile, &st) == 0)
1490 {
1491 lastModTime = st.st_mtime;
1492 }
1493
1494 // Use core agent if destination file name is not set and file manager subagent otherwise
1495 if ((destinationFile == NULL) || (*destinationFile == 0))
1496 {
1497 msg.setCode(CMD_TRANSFER_FILE);
1498 int i;
1499 for(i = (int)_tcslen(localFile) - 1;
1500 (i >= 0) && (localFile[i] != '\\') && (localFile[i] != '/'); i--);
1501 msg.setField(VID_FILE_NAME, &localFile[i + 1]);
1502 }
1503 else
1504 {
1505 msg.setCode(CMD_FILEMGR_UPLOAD);
1506 msg.setField(VID_FILE_NAME, destinationFile);
1507 }
1508 msg.setFieldFromTime(VID_MODIFICATION_TIME, lastModTime);
1509
1510 if (sendMessage(&msg))
1511 {
1512 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1513 }
1514 else
1515 {
1516 dwResult = ERR_CONNECTION_BROKEN;
1517 }
1518
1519 if (dwResult == ERR_SUCCESS)
1520 {
1521 AbstractCommChannel *channel = acquireChannel();
1522 if (channel != NULL)
1523 {
1524 nxlog_debug(5, _T("AgentConnection: sending file \"%s\" to agent %s compression"),
1525 localFile, (compMethod == NXCP_STREAM_COMPRESSION_NONE) ? _T("without") : _T("with"));
1526 m_fileUploadInProgress = true;
1527 NXCPEncryptionContext *ctx = acquireEncryptionContext();
1528 if (SendFileOverNXCP(channel, dwRqId, localFile, ctx, 0, progressCallback, cbArg, m_mutexSocketWrite, compMethod))
1529 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1530 else
1531 dwResult = ERR_IO_FAILURE;
1532 m_fileUploadInProgress = false;
1533 if (ctx != NULL)
1534 ctx->decRefCount();
1535 channel->decRefCount();
1536 }
1537 else
1538 {
1539 dwResult = ERR_CONNECTION_BROKEN;
1540 }
1541 }
1542
1543 return dwResult;
1544 }
1545
1546 /**
1547 * Send upgrade command
1548 */
1549 UINT32 AgentConnection::startUpgrade(const TCHAR *pszPkgName)
1550 {
1551 UINT32 dwRqId, dwResult;
1552 NXCPMessage msg(m_nProtocolVersion);
1553 int i;
1554
1555 if (!m_isConnected)
1556 return ERR_NOT_CONNECTED;
1557
1558 dwRqId = generateRequestId();
1559
1560 msg.setCode(CMD_UPGRADE_AGENT);
1561 msg.setId(dwRqId);
1562 for(i = (int)_tcslen(pszPkgName) - 1;
1563 (i >= 0) && (pszPkgName[i] != '\\') && (pszPkgName[i] != '/'); i--);
1564 msg.setField(VID_FILE_NAME, &pszPkgName[i + 1]);
1565
1566 if (sendMessage(&msg))
1567 {
1568 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1569 }
1570 else
1571 {
1572 dwResult = ERR_CONNECTION_BROKEN;
1573 }
1574
1575 return dwResult;
1576 }
1577
1578 /**
1579 * Check status of network service via agent
1580 */
1581 UINT32 AgentConnection::checkNetworkService(UINT32 *pdwStatus, const InetAddress& addr, int iServiceType,
1582 WORD wPort, WORD wProto, const TCHAR *pszRequest,
1583 const TCHAR *pszResponse, UINT32 *responseTime)
1584 {
1585 UINT32 dwRqId, dwResult;
1586 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1587 static WORD m_wDefaultPort[] = { 7, 22, 110, 25, 21, 80, 443, 23 };
1588
1589 if (!m_isConnected)
1590 return ERR_NOT_CONNECTED;
1591
1592 dwRqId = generateRequestId();
1593
1594 msg.setCode(CMD_CHECK_NETWORK_SERVICE);
1595 msg.setId(dwRqId);
1596 msg.setField(VID_IP_ADDRESS, addr);
1597 msg.setField(VID_SERVICE_TYPE, (WORD)iServiceType);
1598 msg.setField(VID_IP_PORT,
1599 (wPort != 0) ? wPort :
1600 m_wDefaultPort[((iServiceType >= NETSRV_CUSTOM) &&
1601 (iServiceType <= NETSRV_TELNET)) ? iServiceType : 0]);
1602 msg.setField(VID_IP_PROTO, (wProto != 0) ? wProto : (WORD)IPPROTO_TCP);
1603 msg.setField(VID_SERVICE_REQUEST, pszRequest);
1604 msg.setField(VID_SERVICE_RESPONSE, pszResponse);
1605
1606 if (sendMessage(&msg))
1607 {
1608 // Wait up to 90 seconds for results
1609 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, 90000);
1610 if (pResponse != NULL)
1611 {
1612 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1613 if (dwResult == ERR_SUCCESS)
1614 {
1615 *pdwStatus = pResponse->getFieldAsUInt32(VID_SERVICE_STATUS);
1616 if (responseTime != NULL)
1617 {
1618 *responseTime = pResponse->getFieldAsUInt32(VID_RESPONSE_TIME);
1619 }
1620 }
1621 delete pResponse;
1622 }
1623 else
1624 {
1625 dwResult = ERR_REQUEST_TIMEOUT;
1626 }
1627 }
1628 else
1629 {
1630 dwResult = ERR_CONNECTION_BROKEN;
1631 }
1632
1633 return dwResult;
1634 }
1635
1636 /**
1637 * Get list of supported parameters from agent
1638 */
1639 UINT32 AgentConnection::getSupportedParameters(ObjectArray<AgentParameterDefinition> **paramList, ObjectArray<AgentTableDefinition> **tableList)
1640 {
1641 UINT32 dwRqId, dwResult;
1642 NXCPMessage msg(m_nProtocolVersion), *pResponse;
1643
1644 *paramList = NULL;
1645 *tableList = NULL;
1646
1647 if (!m_isConnected)
1648 return ERR_NOT_CONNECTED;
1649
1650 dwRqId = generateRequestId();
1651
1652 msg.setCode(CMD_GET_PARAMETER_LIST);
1653 msg.setId(dwRqId);
1654
1655 if (sendMessage(&msg))
1656 {
1657 pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1658 if (pResponse != NULL)
1659 {
1660 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1661 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): RCC=%d"), dwResult);
1662 if (dwResult == ERR_SUCCESS)
1663 {
1664 UINT32 count = pResponse->getFieldAsUInt32(VID_NUM_PARAMETERS);
1665 ObjectArray<AgentParameterDefinition> *plist = new ObjectArray<AgentParameterDefinition>(count, 16, true);
1666 for(UINT32 i = 0, id = VID_PARAM_LIST_BASE; i < count; i++)
1667 {
1668 plist->add(new AgentParameterDefinition(pResponse, id));
1669 id += 3;
1670 }
1671 *paramList = plist;
1672 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): %d parameters received from agent"), count);
1673
1674 count = pResponse->getFieldAsUInt32(VID_NUM_TABLES);
1675 ObjectArray<AgentTableDefinition> *tlist = new ObjectArray<AgentTableDefinition>(count, 16, true);
1676 for(UINT32 i = 0, id = VID_TABLE_LIST_BASE; i < count; i++)
1677 {
1678 tlist->add(new AgentTableDefinition(pResponse, id));
1679 id += 3;
1680 }
1681 *tableList = tlist;
1682 DbgPrintf(6, _T("AgentConnection::getSupportedParameters(): %d tables received from agent"), count);
1683 }
1684 delete pResponse;
1685 }
1686 else
1687 {
1688 dwResult = ERR_REQUEST_TIMEOUT;
1689 }
1690 }
1691 else
1692 {
1693 dwResult = ERR_CONNECTION_BROKEN;
1694 }
1695
1696 return dwResult;
1697 }
1698
1699 /**
1700 * Setup encryption
1701 */
1702 UINT32 AgentConnection::setupEncryption(RSA *pServerKey)
1703 {
1704 #ifdef _WITH_ENCRYPTION
1705 NXCPMessage msg(m_nProtocolVersion), *pResp;
1706 UINT32 dwRqId, dwError, dwResult;
1707
1708 dwRqId = generateRequestId();
1709
1710 PrepareKeyRequestMsg(&msg, pServerKey, false);
1711 msg.setId(dwRqId);
1712 if (sendMessage(&msg))
1713 {
1714 pResp = waitForMessage(CMD_SESSION_KEY, dwRqId, m_dwCommandTimeout);
1715 if (pResp != NULL)
1716 {
1717 dwResult = SetupEncryptionContext(pResp, &m_pCtx, NULL, pServerKey, m_nProtocolVersion);
1718 switch(dwResult)
1719 {
1720 case RCC_SUCCESS:
1721 dwError = ERR_SUCCESS;
1722 break;
1723 case RCC_NO_CIPHERS:
1724 dwError = ERR_NO_CIPHERS;
1725 break;
1726 case RCC_INVALID_PUBLIC_KEY:
1727 dwError = ERR_INVALID_PUBLIC_KEY;
1728 break;
1729 case RCC_INVALID_SESSION_KEY:
1730 dwError = ERR_INVALID_SESSION_KEY;
1731 break;
1732 default:
1733 dwError = ERR_INTERNAL_ERROR;
1734 break;
1735 }
1736 delete pResp;
1737 }
1738 else
1739 {
1740 dwError = ERR_REQUEST_TIMEOUT;
1741 }
1742 }
1743 else
1744 {
1745 dwError = ERR_CONNECTION_BROKEN;
1746 }
1747
1748 return dwError;
1749 #else
1750 return ERR_NOT_IMPLEMENTED;
1751 #endif
1752 }
1753
1754 /**
1755 * Get configuration file from agent
1756 */
1757 UINT32 AgentConnection::getConfigFile(TCHAR **ppszConfig, UINT32 *pdwSize)
1758 {
1759 *ppszConfig = NULL;
1760 *pdwSize = 0;
1761
1762 if (!m_isConnected)
1763 return ERR_NOT_CONNECTED;
1764
1765 UINT32 dwResult;
1766 UINT32 dwRqId = generateRequestId();
1767
1768 NXCPMessage msg(m_nProtocolVersion);
1769 msg.setCode(CMD_GET_AGENT_CONFIG);
1770 msg.setId(dwRqId);
1771
1772 if (sendMessage(&msg))
1773 {
1774 NXCPMessage *pResponse = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
1775 if (pResponse != NULL)
1776 {
1777 dwResult = pResponse->getFieldAsUInt32(VID_RCC);
1778 if (dwResult == ERR_SUCCESS)
1779 {
1780 size_t size = pResponse->getFieldAsBinary(VID_CONFIG_FILE, NULL, 0);
1781 BYTE *utf8Text = (BYTE *)malloc(size + 1);
1782 pResponse->getFieldAsBinary(VID_CONFIG_FILE, (BYTE *)utf8Text, size);
1783
1784 // We expect text file, so replace all non-printable characters with spaces
1785 for(size_t i = 0; i < size; i++)
1786 if ((utf8Text[i] < ' ') &&
1787 (utf8Text[i] != '\t') &&
1788 (utf8Text[i] != '\r') &&
1789 (utf8Text[i] != '\n'))
1790 utf8Text[i] = ' ';
1791 utf8Text[size] = 0;
1792
1793 #ifdef UNICODE
1794 *ppszConfig = WideStringFromUTF8String((char *)utf8Text);
1795 #else
1796 *ppszConfig = MBStringFromUTF8String((char *)utf8Text);
1797 #endif
1798 free(utf8Text);
1799 *pdwSize = (UINT32)_tcslen(*ppszConfig);
1800 }
1801 delete pResponse;
1802 }
1803 else
1804 {
1805 dwResult = ERR_REQUEST_TIMEOUT;
1806 }
1807 }
1808 else
1809 {
1810 dwResult = ERR_CONNECTION_BROKEN;
1811 }
1812
1813 return dwResult;
1814 }
1815
1816 /**
1817 * Update configuration file on agent
1818 */
1819 UINT32 AgentConnection::updateConfigFile(const TCHAR *pszConfig)
1820 {
1821 UINT32 dwRqId, dwResult;
1822 NXCPMessage msg(m_nProtocolVersion);
1823 #ifdef UNICODE
1824 int nChars;
1825 BYTE *pBuffer;
1826 #endif
1827
1828 if (!m_isConnected)
1829 return ERR_NOT_CONNECTED;
1830
1831 dwRqId = generateRequestId();
1832
1833 msg.setCode(CMD_UPDATE_AGENT_CONFIG);
1834 msg.setId(dwRqId);
1835 #ifdef UNICODE
1836 nChars = (int)_tcslen(pszConfig);
1837 pBuffer = (BYTE *)malloc(nChars + 1);
1838 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR,
1839 pszConfig, nChars, (char *)pBuffer, nChars + 1, NULL, NULL);
1840 msg.setField(VID_CONFIG_FILE, pBuffer, nChars);
1841 free(pBuffer);
1842 #else
1843 msg.setField(VID_CONFIG_FILE, (BYTE *)pszConfig, (UINT32)strlen(pszConfig));
1844 #endif
1845
1846 if (sendMessage(&msg))
1847 {
1848 dwResult = waitForRCC(dwRqId, m_dwCommandTimeout);
1849 }
1850 else
1851 {
1852 dwResult = ERR_CONNECTION_BROKEN;
1853 }
1854
1855 return dwResult;
1856 }
1857
1858 /**
1859 * Get routing table from agent
1860 */
1861 ROUTING_TABLE *AgentConnection::getRoutingTable()
1862 {
1863 ROUTING_TABLE *pRT = NULL;
1864 UINT32 i, dwBits;
1865 TCHAR *pChar, *pBuf;
1866
1867 if (getList(_T("Net.IP.RoutingTable")) == ERR_SUCCESS)
1868 {
1869 pRT = (ROUTING_TABLE *)malloc(sizeof(ROUTING_TABLE));
1870 pRT->iNumEntries = m_dwNumDataLines;
1871 pRT->pRoutes = (ROUTE *)malloc(sizeof(ROUTE) * m_dwNumDataLines);
1872 memset(pRT->pRoutes, 0, sizeof(ROUTE) * m_dwNumDataLines);
1873 for(i = 0; i < m_dwNumDataLines; i++)
1874 {
1875 pBuf = m_ppDataLines[i];
1876
1877 // Destination address and mask
1878 pChar = _tcschr(pBuf, _T(' '));
1879 if (pChar != NULL)
1880 {
1881 TCHAR *pSlash;
1882 static TCHAR defaultMask[] = _T("24");
1883
1884 *pChar = 0;
1885 pSlash = _tcschr(pBuf, _T('/'));
1886 if (pSlash != NULL)
1887 {
1888 *pSlash = 0;
1889 pSlash++;
1890 }
1891 else // Just a paranoia protection, should'n happen if agent working correctly
1892 {
1893 pSlash = defaultMask;
1894 }
1895 pRT->pRoutes[i].dwDestAddr = ntohl(_t_inet_addr(pBuf));
1896 dwBits = _tcstoul(pSlash, NULL, 10);
1897 pRT->pRoutes[i].dwDestMask = (dwBits == 32) ? 0xFFFFFFFF : (~(0xFFFFFFFF >> dwBits));
1898 pBuf = pChar + 1;
1899 }
1900
1901 // Next hop address
1902 pChar = _tcschr(pBuf, _T(' '));
1903 if (pChar != NULL)
1904 {
1905 *pChar = 0;
1906 pRT->pRoutes[i].dwNextHop = ntohl(_t_inet_addr(pBuf));
1907 pBuf = pChar + 1;
1908 }
1909
1910 // Interface index
1911 pChar = _tcschr(pBuf, ' ');
1912 if (pChar != NULL)
1913 {
1914 *pChar = 0;
1915 pRT->pRoutes[i].dwIfIndex = _tcstoul(pBuf, NULL, 10);
1916 pBuf = pChar + 1;
1917 }
1918
1919 // Route type
1920 pRT->pRoutes[i].dwRouteType = _tcstoul(pBuf, NULL, 10);
1921 }
1922
1923 lock();
1924 destroyResultData();
1925 unlock();
1926 }
1927
1928 return pRT;
1929 }
1930
1931 /**
1932 * Set proxy information
1933 */
1934 void AgentConnection::setProxy(InetAddress addr, WORD wPort, int iAuthMethod, const TCHAR *pszSecret)
1935 {
1936 m_proxyAddr = addr;
1937 m_wProxyPort = wPort;
1938 m_iProxyAuth = iAuthMethod;
1939 if (pszSecret != NULL)
1940 {
1941 #ifdef UNICODE
1942 WideCharToMultiByte(CP_ACP, WC_COMPOSITECHECK | WC_DEFAULTCHAR,
1943 pszSecret, -1, m_szProxySecret, MAX_SECRET_LENGTH, NULL, NULL);
1944 #else
1945 nx_strncpy(m_szProxySecret, pszSecret, MAX_SECRET_LENGTH);
1946 #endif
1947 }
1948 else
1949 {
1950 m_szProxySecret[0] = 0;
1951 }
1952 m_useProxy = true;
1953 }
1954
1955 /**
1956 * Setup proxy connection
1957 */
1958 UINT32 AgentConnection::setupProxyConnection()
1959 {
1960 NXCPMessage msg(m_nProtocolVersion);
1961 UINT32 dwRqId;
1962
1963 dwRqId = generateRequestId();
1964 msg.setCode(CMD_SETUP_PROXY_CONNECTION);
1965 msg.setId(dwRqId);
1966 msg.setField(VID_IP_ADDRESS, m_addr.getAddressV4()); // FIXME: V6 support in proxy
1967 msg.setField(VID_AGENT_PORT, m_wPort);
1968 if (sendMessage(&msg))
1969 return waitForRCC(dwRqId, 60000); // Wait 60 seconds for remote connect
1970 else
1971 return ERR_CONNECTION_BROKEN;
1972 }
1973
1974 /**
1975 * Enable trap receiving on connection
1976 */
1977 UINT32 AgentConnection::enableTraps()
1978 {
1979 NXCPMessage msg(m_nProtocolVersion);
1980 UINT32 dwRqId;
1981
1982 dwRqId = generateRequestId();
1983 msg.setCode(CMD_ENABLE_AGENT_TRAPS);
1984 msg.setId(dwRqId);
1985 if (sendMessage(&msg))
1986 return waitForRCC(dwRqId, m_dwCommandTimeout);
1987 else
1988 return ERR_CONNECTION_BROKEN;
1989 }
1990
1991 /**
1992 * Enable trap receiving on connection
1993 */
1994 UINT32 AgentConnection::enableFileUpdates()
1995 {
1996 NXCPMessage msg(m_nProtocolVersion);
1997 UINT32 dwRqId;
1998
1999 dwRqId = generateRequestId();
2000 msg.setCode(CMD_ENABLE_FILE_UPDATES);
2001 msg.setId(dwRqId);
2002 if (sendMessage(&msg))
2003 {
2004 return waitForRCC(dwRqId, m_dwCommandTimeout);
2005 }
2006 else
2007 return ERR_CONNECTION_BROKEN;
2008 }
2009
2010 /**
2011 * Take screenshot from remote system
2012 */
2013 UINT32 AgentConnection::takeScreenshot(const TCHAR *sessionName, BYTE **data, size_t *size)
2014 {
2015 NXCPMessage msg(m_nProtocolVersion);
2016 UINT32 dwRqId;
2017
2018 dwRqId = generateRequestId();
2019 msg.setCode(CMD_TAKE_SCREENSHOT);
2020 msg.setId(dwRqId);
2021 msg.setField(VID_NAME, sessionName);
2022 if (sendMessage(&msg))
2023 {
2024 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2025 if (response != NULL)
2026 {
2027 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
2028 if (rcc == ERR_SUCCESS)
2029 {
2030 const BYTE *p = response->getBinaryFieldPtr(VID_FILE_DATA, size);
2031 if (p != NULL)
2032 {
2033 *data = (BYTE *)malloc(*size);
2034 memcpy(*data, p, *size);
2035 }
2036 else
2037 {
2038 *data = NULL;
2039 }
2040 }
2041 delete response;
2042 return rcc;
2043 }
2044 else
2045 {
2046 return ERR_REQUEST_TIMEOUT;
2047 }
2048 }
2049 else
2050 {
2051 return ERR_CONNECTION_BROKEN;
2052 }
2053 }
2054
2055 /**
2056 * Send custom request to agent
2057 */
2058 NXCPMessage *AgentConnection::customRequest(NXCPMessage *pRequest, const TCHAR *recvFile, bool append,
2059 void (*downloadProgressCallback)(size_t, void *), void (*fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
2060 {
2061 UINT32 dwRqId, rcc;
2062 NXCPMessage *msg = NULL;
2063
2064 dwRqId = generateRequestId();
2065 pRequest->setId(dwRqId);
2066 if (recvFile != NULL)
2067 {
2068 rcc = prepareFileDownload(recvFile, dwRqId, append, downloadProgressCallback, fileResendCallback,cbArg);
2069 if (rcc != ERR_SUCCESS)
2070 {
2071 // Create fake response message
2072 msg = new NXCPMessage;
2073 msg->setCode(CMD_REQUEST_COMPLETED);
2074 msg->setId(dwRqId);
2075 msg->setField(VID_RCC, rcc);
2076 }
2077 }
2078
2079 if (msg == NULL)
2080 {
2081 if (sendMessage(pRequest))
2082 {
2083 msg = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2084 if ((msg != NULL) && (recvFile != NULL))
2085 {
2086 if (msg->getFieldAsUInt32(VID_RCC) == ERR_SUCCESS)
2087 {
2088 if (ConditionWait(m_condFileDownload, 1800000)) // 30 min timeout
2089 {
2090 if (!m_fileDownloadSucceeded)
2091 {
2092 msg->setField(VID_RCC, ERR_IO_FAILURE);
2093 if (m_deleteFileOnDownloadFailure)
2094 _tremove(recvFile);
2095 }
2096 }
2097 else
2098 {
2099 msg->setField(VID_RCC, ERR_REQUEST_TIMEOUT);
2100 }
2101 }
2102 else
2103 {
2104 if (fileResendCallback != NULL)
2105 {
2106 _close(m_hCurrFile);
2107 m_hCurrFile = -1;
2108 _tremove(recvFile);
2109 }
2110 }
2111 }
2112
2113 }
2114 }
2115
2116 return msg;
2117 }
2118
2119 /**
2120 * Prepare for file download
2121 */
2122 UINT32 AgentConnection::prepareFileDownload(const TCHAR *fileName, UINT32 rqId, bool append, void (*downloadProgressCallback)(size_t, void *),
2123 void (* fileResendCallback)(NXCP_MESSAGE *, void *), void *cbArg)
2124 {
2125 if (fileResendCallback == NULL)
2126 {
2127 if (m_hCurrFile != -1)
2128 return ERR_RESOURCE_BUSY;
2129
2130 nx_strncpy(m_currentFileName, fileName, MAX_PATH);
2131 ConditionReset(m_condFileDownload);
2132 m_hCurrFile = _topen(fileName, (append ? 0 : (O_CREAT | O_TRUNC)) | O_RDWR | O_BINARY, S_IREAD | S_IWRITE);
2133 if (m_hCurrFile == -1)
2134 {
2135 DbgPrintf(4, _T("AgentConnection::PrepareFileDownload(): cannot open file %s (%s); append=%d rqId=%d"),
2136 fileName, _tcserror(errno), append, rqId);
2137 }
2138 else
2139 {
2140 if (append)
2141 lseek(m_hCurrFile, 0, SEEK_END);
2142 }
2143
2144 m_dwDownloadRequestId = rqId;
2145 m_downloadProgressCallback = downloadProgressCallback;
2146 m_downloadProgressCallbackArg = cbArg;
2147
2148 m_sendToClientMessageCallback = NULL;
2149
2150 return (m_hCurrFile != -1) ? ERR_SUCCESS : ERR_FILE_OPEN_ERROR;
2151 }
2152 else
2153 {
2154 ConditionReset(m_condFileDownload);
2155
2156 m_dwDownloadRequestId = rqId;
2157 m_downloadProgressCallback = downloadProgressCallback;
2158 m_downloadProgressCallbackArg = cbArg;
2159
2160 m_sendToClientMessageCallback = fileResendCallback;
2161
2162 return ERR_SUCCESS;
2163 }
2164 }
2165
2166 /**
2167 * File upload completion handler
2168 */
2169 void AgentConnection::onFileDownload(bool success)
2170 {
2171 if (!success && m_deleteFileOnDownloadFailure)
2172 _tremove(m_currentFileName);
2173 m_fileDownloadSucceeded = success;
2174 ConditionSet(m_condFileDownload);
2175 }
2176
2177 /**
2178 * Enable trap receiving on connection
2179 */
2180 UINT32 AgentConnection::getPolicyInventory(AgentPolicyInfo **info)
2181 {
2182 NXCPMessage msg(m_nProtocolVersion);
2183 UINT32 dwRqId, rcc;
2184
2185 *info = NULL;
2186 dwRqId = generateRequestId();
2187 msg.setCode(CMD_GET_POLICY_INVENTORY);
2188 msg.setId(dwRqId);
2189 if (sendMessage(&msg))
2190 {
2191 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, dwRqId, m_dwCommandTimeout);
2192 if (response != NULL)
2193 {
2194 rcc = response->getFieldAsUInt32(VID_RCC);
2195 if (rcc == ERR_SUCCESS)
2196 *info = new AgentPolicyInfo(response);
2197 delete response;
2198 }
2199 else
2200 {
2201 rcc = ERR_REQUEST_TIMEOUT;
2202 }
2203 }
2204 else
2205 {
2206 rcc = ERR_CONNECTION_BROKEN;
2207 }
2208 return rcc;
2209 }
2210
2211 /**
2212 * Uninstall policy by GUID
2213 */
2214 UINT32 AgentConnection::uninstallPolicy(const uuid& guid)
2215 {
2216 UINT32 rqId, rcc;
2217 NXCPMessage msg(m_nProtocolVersion);
2218
2219 rqId = generateRequestId();
2220 msg.setId(rqId);
2221 msg.setCode(CMD_UNINSTALL_AGENT_POLICY);
2222 msg.setField(VID_GUID, guid);
2223 if (sendMessage(&msg))
2224 {
2225 rcc = waitForRCC(rqId, m_dwCommandTimeout);
2226 }
2227 else
2228 {
2229 rcc = ERR_CONNECTION_BROKEN;
2230 }
2231 return rcc;
2232 }
2233
2234 /**
2235 * Acquire encryption context
2236 */
2237 NXCPEncryptionContext *AgentConnection::acquireEncryptionContext()
2238 {
2239 lock();
2240 NXCPEncryptionContext *ctx = m_pCtx;
2241 if (ctx != NULL)
2242 ctx->incRefCount();
2243 unlock();
2244 return ctx;
2245 }
2246
2247 /**
2248 * Callback for processing collected data on separate thread
2249 */
2250 void AgentConnection::processCollectedDataCallback(NXCPMessage *msg)
2251 {
2252 NXCPMessage response;
2253 response.setCode(CMD_REQUEST_COMPLETED);
2254 response.setId(msg->getId());
2255
2256 if (msg->getFieldAsBoolean(VID_BULK_RECONCILIATION))
2257 {
2258 UINT32 rcc = processBulkCollectedData(msg, &response);
2259 response.setField(VID_RCC, rcc);
2260 }
2261 else
2262 {
2263 UINT32 rcc = processCollectedData(msg);
2264 response.setField(VID_RCC, rcc);
2265 }
2266
2267 sendMessage(&response);
2268 delete msg;
2269 decInternalRefCount();
2270 }
2271
2272 /**
2273 * Process collected data information (for DCI with agent-side cache)
2274 */
2275 UINT32 AgentConnection::processCollectedData(NXCPMessage *msg)
2276 {
2277 return ERR_NOT_IMPLEMENTED;
2278 }
2279
2280 /**
2281 * Process collected data information in bulk mode (for DCI with agent-side cache)
2282 */
2283 UINT32 AgentConnection::processBulkCollectedData(NXCPMessage *request, NXCPMessage *response)
2284 {
2285 return ERR_NOT_IMPLEMENTED;
2286 }
2287
2288 /**
2289 * Create new agent parameter definition from NXCP message
2290 */
2291 AgentParameterDefinition::AgentParameterDefinition(NXCPMessage *msg, UINT32 baseId)
2292 {
2293 m_name = msg->getFieldAsString(baseId);
2294 m_description = msg->getFieldAsString(baseId + 1);
2295 m_dataType = (int)msg->getFieldAsUInt16(baseId + 2);
2296 }
2297
2298 /**
2299 * Create new agent parameter definition from another definition object
2300 */
2301 AgentParameterDefinition::AgentParameterDefinition(AgentParameterDefinition *src)
2302 {
2303 m_name = (src->m_name != NULL) ? _tcsdup(src->m_name) : NULL;
2304 m_description = (src->m_description != NULL) ? _tcsdup(src->m_description) : NULL;
2305 m_dataType = src->m_dataType;
2306 }
2307
2308 /**
2309 * Destructor for agent parameter definition
2310 */
2311 AgentParameterDefinition::~AgentParameterDefinition()
2312 {
2313 safe_free(m_name);
2314 safe_free(m_description);
2315 }
2316
2317 /**
2318 * Fill NXCP message
2319 */
2320 UINT32 AgentParameterDefinition::fillMessage(NXCPMessage *msg, UINT32 baseId)
2321 {
2322 msg->setField(baseId, m_name);
2323 msg->setField(baseId + 1, m_description);
2324 msg->setField(baseId + 2, (WORD)m_dataType);
2325 return 3;
2326 }
2327
2328 /**
2329 * Create new agent table definition from NXCP message
2330 */
2331 AgentTableDefinition::AgentTableDefinition(NXCPMessage *msg, UINT32 baseId)
2332 {
2333 m_name = msg->getFieldAsString(baseId);
2334 m_description = msg->getFieldAsString(baseId + 2);
2335
2336 TCHAR *instanceColumns = msg->getFieldAsString(baseId + 1);
2337 if (instanceColumns != NULL)
2338 {
2339 m_instanceColumns = new StringList(instanceColumns, _T("|"));
2340 free(instanceColumns);
2341 }
2342 else
2343 {
2344 m_instanceColumns = new StringList;
2345 }
2346
2347 m_columns = new ObjectArray<AgentTableColumnDefinition>(16, 16, true);
2348 }
2349
2350 /**
2351 * Create new agent table definition from another definition object
2352 */
2353 AgentTableDefinition::AgentTableDefinition(AgentTableDefinition *src)
2354 {
2355 m_name = (src->m_name != NULL) ? _tcsdup(src->m_name) : NULL;
2356 m_description = (src->m_description != NULL) ? _tcsdup(src->m_description) : NULL;
2357 m_instanceColumns = new StringList(src->m_instanceColumns);
2358 m_columns = new ObjectArray<AgentTableColumnDefinition>(16, 16, true);
2359 for(int i = 0; i < src->m_columns->size(); i++)
2360 {
2361 m_columns->add(new AgentTableColumnDefinition(src->m_columns->get(i)));
2362 }
2363 }
2364 /**
2365 * Destructor for agent table definition
2366 */
2367 AgentTableDefinition::~AgentTableDefinition()
2368 {
2369 safe_free(m_name);
2370 safe_free(m_description);
2371 delete m_instanceColumns;
2372 delete m_columns;
2373 }
2374
2375 /**
2376 * Fill NXCP message
2377 */
2378 UINT32 AgentTableDefinition::fillMessage(NXCPMessage *msg, UINT32 baseId)
2379 {
2380 msg->setField(baseId + 1, m_name);
2381 msg->setField(baseId + 2, m_description);
2382
2383 TCHAR *instanceColumns = m_instanceColumns->join(_T("|"));
2384 msg->setField(baseId + 3, instanceColumns);
2385 free(instanceColumns);
2386
2387 UINT32 varId = baseId + 4;
2388 for(int i = 0; i < m_columns->size(); i++)
2389 {
2390 msg->setField(varId++, m_columns->get(i)->m_name);
2391 msg->setField(varId++, (WORD)m_columns->get(i)->m_dataType);
2392 }
2393
2394 msg->setField(baseId, varId - baseId);
2395 return varId - baseId;
2396 }