added NXSL function AgentExecuteAction
[public/netxms.git] / src / server / libnxsrv / isc.cpp
CommitLineData
5039dede
AK
1/*
2** NetXMS - Network Management System
96fb011c 3** Copyright (C) 2003-2014 Victor Kirhenshtein
5039dede
AK
4**
5** This program is free software; you can redistribute it and/or modify
0702ed69
VK
6** it under the terms of the GNU Lesser General Public License as published by
7** the Free Software Foundation; either version 3 of the License, or
5039dede
AK
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
0702ed69 15** You should have received a copy of the GNU Lesser General Public License
5039dede
AK
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: isc.cpp
20**
21**/
22
23#include "libnxsrv.h"
24
bc7767c4
VK
25/**
26 * Default receiver buffer size
27 */
5039dede
AK
28#define RECEIVER_BUFFER_SIZE 262144
29
bc7767c4
VK
30/**
31 * Texts for ISC error codes
32 */
967893bb 33const TCHAR LIBNXSRV_EXPORTABLE *ISCErrorCodeToText(UINT32 code)
5039dede
AK
34{
35 static const TCHAR *errorText[] =
36 {
37 _T("Success"),
38 _T("Unknown service"),
39 _T("Request out of state"),
40 _T("Service disabled"),
41 _T("Encryption required"),
42 _T("Connection broken"),
43 _T("Already connected"),
44 _T("Socket error"),
45 _T("Connect failed"),
46 _T("Invalid or incompatible NXCP version"),
47 _T("Request timed out"),
48 _T("Command or function not implemented"),
49 _T("No suitable ciphers found"),
50 _T("Invalid public key"),
51 _T("Invalid session key"),
52 _T("Internal error"),
53 _T("Session setup failed"),
54 _T("Object not found"),
55 _T("Failed to post event")
56 };
57
58 if (code <= ISC_ERR_POST_EVENT_FAILED)
59 return errorText[code];
60 return _T("Unknown error code");
61}
62
bc7767c4
VK
63/**
64 * Receiver thread starter
65 */
66THREAD_RESULT THREAD_CALL ISC::receiverThreadStarter(void *arg)
5039dede 67{
bc7767c4 68 ((ISC *)arg)->receiverThread();
5039dede
AK
69 return THREAD_OK;
70}
71
bc7767c4
VK
72/**
73 * Default constructor for ISC - normally shouldn't be used
74 */
5039dede
AK
75ISC::ISC()
76{
77 m_flags = 0;
9d061c7b 78 m_addr = InetAddress::LOOPBACK;
5039dede
AK
79 m_port = NETXMS_ISC_PORT;
80 m_socket = -1;
81 m_msgWaitQueue = new MsgWaitQueue;
82 m_requestId = 1;
83 m_hReceiverThread = INVALID_THREAD_HANDLE;
84 m_ctx = NULL;
85 m_recvTimeout = 420000; // 7 minutes
86 m_commandTimeout = 10000; // 10 seconds
87 m_mutexDataLock = MutexCreate();
7b8b337e 88 m_socketLock = MutexCreate();
1e84fc1d 89 m_protocolVersion = NXCP_VERSION;
5039dede
AK
90}
91
bc7767c4
VK
92/**
93 * Create ISC connector for give IP address and port
94 */
9d061c7b 95ISC::ISC(const InetAddress& addr, WORD port)
5039dede
AK
96{
97 m_flags = 0;
98 m_addr = addr;
99 m_port = port;
100 m_socket = -1;
101 m_msgWaitQueue = new MsgWaitQueue;
102 m_requestId = 1;
103 m_hReceiverThread = INVALID_THREAD_HANDLE;
104 m_ctx = NULL;
105 m_recvTimeout = 420000; // 7 minutes
106 m_commandTimeout = 10000; // 10 seconds
107 m_mutexDataLock = MutexCreate();
7b8b337e 108 m_socketLock = MutexCreate();
1e84fc1d 109 m_protocolVersion = NXCP_VERSION;
5039dede
AK
110}
111
bc7767c4
VK
112/**
113 * Destructor
114 */
5039dede
AK
115ISC::~ISC()
116{
117 // Disconnect from peer
bc7767c4 118 disconnect();
5039dede
AK
119
120 // Wait for receiver thread termination
121 ThreadJoin(m_hReceiverThread);
122
123 // Close socket if active
796baedf 124 lock();
5039dede
AK
125 if (m_socket != -1)
126 {
127 closesocket(m_socket);
128 m_socket = -1;
129 }
796baedf 130 unlock();
5039dede
AK
131
132 delete m_msgWaitQueue;
98abc9f1
VK
133 if (m_ctx != NULL)
134 m_ctx->decRefCount();
5039dede
AK
135
136 MutexDestroy(m_mutexDataLock);
7b8b337e 137 MutexDestroy(m_socketLock);
5039dede
AK
138}
139
b16935d5
VK
140/**
141 * Print message. This function is virtual and can be overrided in
142 * derived classes. Default implementation will print message to stdout.
143 */
796baedf 144void ISC::printMessage(const TCHAR *format, ...)
5039dede
AK
145{
146 va_list args;
147
148 va_start(args, format);
149 _vtprintf(format, args);
150 va_end(args);
151 _tprintf(_T("\n"));
152}
153
bc7767c4
VK
154/**
155 * Receiver thread
156 */
157void ISC::receiverThread()
5039dede 158{
b368969c
VK
159 NXCPMessage *pMsg;
160 NXCP_MESSAGE *pRawMsg;
161 NXCP_BUFFER *pMsgBuffer;
5039dede
AK
162 BYTE *pDecryptionBuffer = NULL;
163 int err;
164 TCHAR szBuffer[128], szIpAddr[16];
165 SOCKET nSocket;
166
167 // Initialize raw message receiving function
b368969c 168 pMsgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
5039dede
AK
169 RecvNXCPMessage(0, NULL, pMsgBuffer, 0, NULL, NULL, 0);
170
171 // Allocate space for raw message
b368969c 172 pRawMsg = (NXCP_MESSAGE *)malloc(RECEIVER_BUFFER_SIZE);
5039dede
AK
173#ifdef _WITH_ENCRYPTION
174 pDecryptionBuffer = (BYTE *)malloc(RECEIVER_BUFFER_SIZE);
175#endif
176
177 // Message receiving loop
178 while(1)
179 {
180 // Receive raw message
796baedf 181 lock();
5039dede 182 nSocket = m_socket;
796baedf 183 unlock();
96fb011c 184 if ((err = RecvNXCPMessage(nSocket, pRawMsg, pMsgBuffer, RECEIVER_BUFFER_SIZE, &m_ctx, pDecryptionBuffer, m_recvTimeout)) <= 0)
5039dede 185 {
796baedf 186 printMessage(_T("ISC::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), err, WSAGetLastError());
5039dede
AK
187 break;
188 }
189
190 // Check if we get too large message
191 if (err == 1)
192 {
796baedf 193 printMessage(_T("Received too large message %s (%d bytes)"),
b368969c
VK
194 NXCPMessageCodeName(ntohs(pRawMsg->code), szBuffer),
195 ntohl(pRawMsg->size));
5039dede
AK
196 continue;
197 }
198
199 // Check if we are unable to decrypt message
200 if (err == 2)
201 {
796baedf 202 printMessage(_T("Unable to decrypt received message"));
5039dede
AK
203 continue;
204 }
205
206 // Check for timeout
207 if (err == 3)
208 {
796baedf 209 printMessage(_T("Timed out waiting for message"));
5039dede
AK
210 break;
211 }
212
213 // Check that actual received packet size is equal to encoded in packet
b368969c 214 if ((int)ntohl(pRawMsg->size) != err)
5039dede 215 {
b368969c 216 printMessage(_T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(pRawMsg->size), err);
5039dede
AK
217 continue; // Bad packet, wait for next
218 }
219
b368969c 220 if (ntohs(pRawMsg->flags) & MF_BINARY)
5039dede
AK
221 {
222 // Convert message header to host format
08b214c6 223 DbgPrintf(6, _T("ISC: Received raw message %s from peer at %s"),
9d061c7b 224 NXCPMessageCodeName(ntohs(pRawMsg->code), szBuffer), m_addr.toString(szIpAddr));
bc7767c4 225 onBinaryMessage(pRawMsg);
5039dede
AK
226 }
227 else
228 {
229 // Create message object from raw message
b368969c 230 pMsg = new NXCPMessage(pRawMsg, m_protocolVersion);
96fb011c
VK
231 if (onMessage(pMsg))
232 {
233 // message was consumed by handler
234 delete pMsg;
235 }
236 else
237 {
238 m_msgWaitQueue->put(pMsg);
239 }
5039dede
AK
240 }
241 }
242
243 // Close socket and mark connection as disconnected
796baedf 244 lock();
5039dede
AK
245 if (err == 0)
246 shutdown(m_socket, SHUT_RDWR);
247 closesocket(m_socket);
248 m_socket = -1;
98abc9f1
VK
249 if (m_ctx != NULL)
250 {
251 m_ctx->decRefCount();;
252 m_ctx = NULL;
253 }
5039dede 254 m_flags &= ~ISCF_IS_CONNECTED;;
796baedf 255 unlock();
5039dede
AK
256
257 free(pRawMsg);
258 free(pMsgBuffer);
259#ifdef _WITH_ENCRYPTION
260 free(pDecryptionBuffer);
261#endif
262}
263
bc7767c4
VK
264/**
265 * Connect to ISC peer
266 */
267UINT32 ISC::connect(UINT32 service, RSA *pServerKey, BOOL requireEncryption)
5039dede 268{
5039dede 269 TCHAR szBuffer[256];
967893bb 270 UINT32 rcc = ISC_ERR_INTERNAL_ERROR;
5039dede
AK
271
272 // Check if already connected
273 if (m_flags & ISCF_IS_CONNECTED)
274 return ISC_ERR_ALREADY_CONNECTED;
275
276 if (requireEncryption)
277 m_flags |= ISCF_REQUIRE_ENCRYPTION;
278 else
279 m_flags &= ~ISCF_REQUIRE_ENCRYPTION;
280
281 // Wait for receiver thread from previous connection, if any
282 ThreadJoin(m_hReceiverThread);
283 m_hReceiverThread = INVALID_THREAD_HANDLE;
284
285 // Check if we need to close existing socket
286 if (m_socket != -1)
287 closesocket(m_socket);
288
9d061c7b
VK
289 struct sockaddr *sa;
290
5039dede 291 // Create socket
9d061c7b 292 m_socket = socket(m_addr.getFamily(), SOCK_STREAM, 0);
4c0c75c7 293 if (m_socket == INVALID_SOCKET)
5039dede
AK
294 {
295 rcc = ISC_ERR_SOCKET_ERROR;
796baedf 296 printMessage(_T("ISC: Call to socket() failed"));
5039dede
AK
297 goto connect_cleanup;
298 }
299
300 // Fill in address structure
9d061c7b
VK
301 SockAddrBuffer sb;
302 sa = m_addr.fillSockAddr(&sb, m_port);
5039dede
AK
303
304 // Connect to server
9d061c7b 305 if (ConnectEx(m_socket, sa, SA_LEN(sa), 5000) == -1)
5039dede
AK
306 {
307 rcc = ISC_ERR_CONNECT_FAILED;
9d061c7b 308 printMessage(_T("Cannot establish connection with ISC peer %s"), m_addr.toString(szBuffer));
5039dede
AK
309 goto connect_cleanup;
310 }
311
312 // Set non-blocking mode
313 SetSocketNonBlocking(m_socket);
314
7b8b337e 315 if (!NXCPGetPeerProtocolVersion(m_socket, &m_protocolVersion, m_socketLock))
5039dede
AK
316 {
317 rcc = ISC_ERR_INVALID_NXCP_VERSION;
9d061c7b 318 printMessage(_T("Cannot detect NXCP version for ISC peer %s"), m_addr.toString(szBuffer));
5039dede
AK
319 goto connect_cleanup;
320 }
321
322 if (m_protocolVersion > NXCP_VERSION)
323 {
324 rcc = ISC_ERR_INVALID_NXCP_VERSION;
9d061c7b 325 printMessage(_T("ISC peer %s uses incompatible NXCP version %d"), m_addr.toString(szBuffer), m_protocolVersion);
5039dede
AK
326 goto connect_cleanup;
327 }
328
329 // Start receiver thread
bc7767c4 330 m_hReceiverThread = ThreadCreateEx(receiverThreadStarter, 0, this);
5039dede
AK
331
332 // Setup encryption
333setup_encryption:
334 if (pServerKey != NULL)
335 {
bc7767c4 336 rcc = setupEncryption(pServerKey);
5039dede
AK
337 if ((rcc != ERR_SUCCESS) &&
338 (m_flags & ISCF_REQUIRE_ENCRYPTION))
339 {
9d061c7b 340 printMessage(_T("Cannot setup encrypted channel with ISC peer %s"), m_addr.toString(szBuffer));
5039dede
AK
341 goto connect_cleanup;
342 }
343 }
344 else
345 {
346 if (m_flags & ISCF_REQUIRE_ENCRYPTION)
347 {
348 rcc = ISC_ERR_ENCRYPTION_REQUIRED;
9d061c7b 349 printMessage(_T("Cannot setup encrypted channel with ISC peer %s"), m_addr.toString(szBuffer));
5039dede
AK
350 goto connect_cleanup;
351 }
352 }
353
354 // Test connectivity
355 m_flags |= ISCF_IS_CONNECTED;
bc7767c4 356 if ((rcc = nop()) != ERR_SUCCESS)
5039dede
AK
357 {
358 if (rcc == ISC_ERR_ENCRYPTION_REQUIRED)
359 {
360 m_flags |= ISCF_REQUIRE_ENCRYPTION;
361 goto setup_encryption;
362 }
9d061c7b 363 printMessage(_T("Communication with ISC peer %s failed (%s)"), m_addr.toString(szBuffer), ISCErrorCodeToText(rcc));
5039dede
AK
364 goto connect_cleanup;
365 }
366
bc7767c4 367 rcc = connectToService(service);
5039dede
AK
368
369connect_cleanup:
370 if (rcc != ISC_ERR_SUCCESS)
371 {
796baedf 372 lock();
5039dede
AK
373 m_flags &= ~ISCF_IS_CONNECTED;
374 if (m_socket != -1)
375 shutdown(m_socket, SHUT_RDWR);
796baedf 376 unlock();
5039dede
AK
377 ThreadJoin(m_hReceiverThread);
378 m_hReceiverThread = INVALID_THREAD_HANDLE;
379
796baedf 380 lock();
5039dede
AK
381 if (m_socket != -1)
382 {
383 closesocket(m_socket);
384 m_socket = -1;
385 }
386
98abc9f1
VK
387 if (m_ctx != NULL)
388 {
389 m_ctx->decRefCount();
390 m_ctx = NULL;
391 }
5039dede 392
796baedf 393 unlock();
5039dede
AK
394 }
395
396 return rcc;
397}
398
bc7767c4
VK
399/**
400 * Disconnect from ISC peer
401 */
402void ISC::disconnect()
5039dede 403{
796baedf 404 lock();
5039dede
AK
405 if (m_socket != -1)
406 {
407 shutdown(m_socket, SHUT_RDWR);
408 m_flags &= ~ISCF_IS_CONNECTED;;
409 }
796baedf 410 unlock();
5039dede
AK
411}
412
bc7767c4
VK
413/**
414 * Send message to peer
415 */
b368969c 416BOOL ISC::sendMessage(NXCPMessage *pMsg)
5039dede 417{
b368969c 418 NXCP_MESSAGE *pRawMsg;
6be0a20b 419 NXCP_ENCRYPTED_MESSAGE *pEnMsg;
5039dede
AK
420 BOOL bResult;
421
422 if (!(m_flags & ISCF_IS_CONNECTED))
423 return FALSE;
424
b368969c 425 if (pMsg->getId() == 0)
d75003e1 426 {
b368969c 427 pMsg->setId((UINT32)InterlockedIncrement(&m_requestId));
d75003e1
AK
428 }
429
5c44534b 430 pRawMsg = pMsg->createMessage();
5039dede
AK
431 if (m_ctx != NULL)
432 {
b368969c 433 pEnMsg = m_ctx->encryptMessage(pRawMsg);
5039dede
AK
434 if (pEnMsg != NULL)
435 {
b368969c 436 bResult = (SendEx(m_socket, (char *)pEnMsg, ntohl(pEnMsg->size), 0, m_socketLock) == (int)ntohl(pEnMsg->size));
5039dede
AK
437 free(pEnMsg);
438 }
439 else
440 {
441 bResult = FALSE;
442 }
443 }
444 else
445 {
b368969c 446 bResult = (SendEx(m_socket, (char *)pRawMsg, ntohl(pRawMsg->size), 0, m_socketLock) == (int)ntohl(pRawMsg->size));
5039dede
AK
447 }
448 free(pRawMsg);
449 return bResult;
450}
451
bc7767c4
VK
452/**
453 * Wait for request completion code
454 */
455UINT32 ISC::waitForRCC(UINT32 rqId, UINT32 timeOut)
5039dede 456{
b368969c 457 NXCPMessage *pMsg;
967893bb 458 UINT32 dwRetCode;
5039dede 459
c17f6cbc 460 pMsg = m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, rqId, timeOut);
5039dede
AK
461 if (pMsg != NULL)
462 {
b368969c 463 dwRetCode = pMsg->getFieldAsUInt32(VID_RCC);
5039dede
AK
464 delete pMsg;
465 }
466 else
467 {
468 dwRetCode = ISC_ERR_REQUEST_TIMEOUT;
469 }
470 return dwRetCode;
471}
472
bc7767c4
VK
473/**
474 * Setup encryption
475 */
476UINT32 ISC::setupEncryption(RSA *pServerKey)
5039dede
AK
477{
478#ifdef _WITH_ENCRYPTION
b368969c 479 NXCPMessage msg(m_protocolVersion), *pResp;
967893bb 480 UINT32 dwRqId, dwError, dwResult;
5039dede 481
bc7767c4 482 dwRqId = (UINT32)InterlockedIncrement(&m_requestId);
5039dede 483
9e9d631e 484 PrepareKeyRequestMsg(&msg, pServerKey, false);
b368969c 485 msg.setId(dwRqId);
bc7767c4 486 if (sendMessage(&msg))
5039dede 487 {
bc7767c4 488 pResp = waitForMessage(CMD_SESSION_KEY, dwRqId, m_commandTimeout);
5039dede
AK
489 if (pResp != NULL)
490 {
491 dwResult = SetupEncryptionContext(pResp, &m_ctx, NULL, pServerKey, m_protocolVersion);
492 switch(dwResult)
493 {
494 case RCC_SUCCESS:
495 dwError = ISC_ERR_SUCCESS;
496 break;
497 case RCC_NO_CIPHERS:
498 dwError = ISC_ERR_NO_CIPHERS;
499 break;
500 case RCC_INVALID_PUBLIC_KEY:
501 dwError = ISC_ERR_INVALID_PUBLIC_KEY;
502 break;
503 case RCC_INVALID_SESSION_KEY:
504 dwError = ISC_ERR_INVALID_SESSION_KEY;
505 break;
506 default:
507 dwError = ISC_ERR_INTERNAL_ERROR;
508 break;
509 }
510 delete pResp;
511 }
512 else
513 {
514 dwError = ISC_ERR_REQUEST_TIMEOUT;
515 }
516 }
517 else
518 {
519 dwError = ISC_ERR_CONNECTION_BROKEN;
520 }
521
522 return dwError;
523#else
524 return ISC_ERR_NOT_IMPLEMENTED;
525#endif
526}
527
bc7767c4
VK
528/**
529 * Send dummy command to peer (can be used for keepalive)
530 */
531UINT32 ISC::nop()
5039dede 532{
b368969c 533 NXCPMessage msg(m_protocolVersion);
967893bb 534 UINT32 dwRqId;
5039dede 535
bc7767c4 536 dwRqId = (UINT32)InterlockedIncrement(&m_requestId);
b368969c
VK
537 msg.setCode(CMD_KEEPALIVE);
538 msg.setId(dwRqId);
bc7767c4
VK
539 if (sendMessage(&msg))
540 return waitForRCC(dwRqId, m_commandTimeout);
5039dede
AK
541 else
542 return ISC_ERR_CONNECTION_BROKEN;
543}
544
bc7767c4
VK
545/**
546 * Connect to requested service
547 */
548UINT32 ISC::connectToService(UINT32 service)
5039dede 549{
b368969c 550 NXCPMessage msg(m_protocolVersion);
967893bb 551 UINT32 dwRqId;
5039dede 552
bc7767c4 553 dwRqId = (UINT32)InterlockedIncrement(&m_requestId);
b368969c
VK
554 msg.setCode(CMD_ISC_CONNECT_TO_SERVICE);
555 msg.setId(dwRqId);
556 msg.setField(VID_SERVICE_ID, service);
bc7767c4
VK
557 if (sendMessage(&msg))
558 return waitForRCC(dwRqId, m_commandTimeout);
5039dede
AK
559 else
560 return ISC_ERR_CONNECTION_BROKEN;
561}
bc7767c4
VK
562
563/**
564 * Binary message handler. Default implementation do nothing.
565 */
b368969c 566void ISC::onBinaryMessage(NXCP_MESSAGE *rawMsg)
bc7767c4
VK
567{
568}
96fb011c
VK
569
570/**
571 * Incoming message handler. Default implementation do nothing and return false.
572 * Should return true if message was consumed and shou8ld not be put into wait queue
573 */
b368969c 574bool ISC::onMessage(NXCPMessage *msg)
96fb011c
VK
575{
576 return false;
577}