added NXSL function AgentExecuteAction
[public/netxms.git] / src / server / libnxsrv / isc.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2014 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published by
7 ** the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: isc.cpp
20 **
21 **/
22
23 #include "libnxsrv.h"
24
/**
 * Size in bytes (256 KiB) of the raw message buffer and of the decryption
 * buffer allocated by the receiver thread
 */
#define RECEIVER_BUFFER_SIZE (256 * 1024)
29
30 /**
31 * Texts for ISC error codes
32 */
33 const TCHAR LIBNXSRV_EXPORTABLE *ISCErrorCodeToText(UINT32 code)
34 {
35 static const TCHAR *errorText[] =
36 {
37 _T("Success"),
38 _T("Unknown service"),
39 _T("Request out of state"),
40 _T("Service disabled"),
41 _T("Encryption required"),
42 _T("Connection broken"),
43 _T("Already connected"),
44 _T("Socket error"),
45 _T("Connect failed"),
46 _T("Invalid or incompatible NXCP version"),
47 _T("Request timed out"),
48 _T("Command or function not implemented"),
49 _T("No suitable ciphers found"),
50 _T("Invalid public key"),
51 _T("Invalid session key"),
52 _T("Internal error"),
53 _T("Session setup failed"),
54 _T("Object not found"),
55 _T("Failed to post event")
56 };
57
58 if (code <= ISC_ERR_POST_EVENT_FAILED)
59 return errorText[code];
60 return _T("Unknown error code");
61 }
62
63 /**
64 * Receiver thread starter
65 */
66 THREAD_RESULT THREAD_CALL ISC::receiverThreadStarter(void *arg)
67 {
68 ((ISC *)arg)->receiverThread();
69 return THREAD_OK;
70 }
71
72 /**
73 * Default constructor for ISC - normally shouldn't be used
74 */
75 ISC::ISC()
76 {
77 m_flags = 0;
78 m_addr = InetAddress::LOOPBACK;
79 m_port = NETXMS_ISC_PORT;
80 m_socket = -1;
81 m_msgWaitQueue = new MsgWaitQueue;
82 m_requestId = 1;
83 m_hReceiverThread = INVALID_THREAD_HANDLE;
84 m_ctx = NULL;
85 m_recvTimeout = 420000; // 7 minutes
86 m_commandTimeout = 10000; // 10 seconds
87 m_mutexDataLock = MutexCreate();
88 m_socketLock = MutexCreate();
89 m_protocolVersion = NXCP_VERSION;
90 }
91
92 /**
93 * Create ISC connector for give IP address and port
94 */
95 ISC::ISC(const InetAddress& addr, WORD port)
96 {
97 m_flags = 0;
98 m_addr = addr;
99 m_port = port;
100 m_socket = -1;
101 m_msgWaitQueue = new MsgWaitQueue;
102 m_requestId = 1;
103 m_hReceiverThread = INVALID_THREAD_HANDLE;
104 m_ctx = NULL;
105 m_recvTimeout = 420000; // 7 minutes
106 m_commandTimeout = 10000; // 10 seconds
107 m_mutexDataLock = MutexCreate();
108 m_socketLock = MutexCreate();
109 m_protocolVersion = NXCP_VERSION;
110 }
111
112 /**
113 * Destructor
114 */
115 ISC::~ISC()
116 {
117 // Disconnect from peer
118 disconnect();
119
120 // Wait for receiver thread termination
121 ThreadJoin(m_hReceiverThread);
122
123 // Close socket if active
124 lock();
125 if (m_socket != -1)
126 {
127 closesocket(m_socket);
128 m_socket = -1;
129 }
130 unlock();
131
132 delete m_msgWaitQueue;
133 if (m_ctx != NULL)
134 m_ctx->decRefCount();
135
136 MutexDestroy(m_mutexDataLock);
137 MutexDestroy(m_socketLock);
138 }
139
140 /**
141 * Print message. This function is virtual and can be overrided in
142 * derived classes. Default implementation will print message to stdout.
143 */
144 void ISC::printMessage(const TCHAR *format, ...)
145 {
146 va_list args;
147
148 va_start(args, format);
149 _vtprintf(format, args);
150 va_end(args);
151 _tprintf(_T("\n"));
152 }
153
154 /**
155 * Receiver thread
156 */
157 void ISC::receiverThread()
158 {
159 NXCPMessage *pMsg;
160 NXCP_MESSAGE *pRawMsg;
161 NXCP_BUFFER *pMsgBuffer;
162 BYTE *pDecryptionBuffer = NULL;
163 int err;
164 TCHAR szBuffer[128], szIpAddr[16];
165 SOCKET nSocket;
166
167 // Initialize raw message receiving function
168 pMsgBuffer = (NXCP_BUFFER *)malloc(sizeof(NXCP_BUFFER));
169 RecvNXCPMessage(0, NULL, pMsgBuffer, 0, NULL, NULL, 0);
170
171 // Allocate space for raw message
172 pRawMsg = (NXCP_MESSAGE *)malloc(RECEIVER_BUFFER_SIZE);
173 #ifdef _WITH_ENCRYPTION
174 pDecryptionBuffer = (BYTE *)malloc(RECEIVER_BUFFER_SIZE);
175 #endif
176
177 // Message receiving loop
178 while(1)
179 {
180 // Receive raw message
181 lock();
182 nSocket = m_socket;
183 unlock();
184 if ((err = RecvNXCPMessage(nSocket, pRawMsg, pMsgBuffer, RECEIVER_BUFFER_SIZE, &m_ctx, pDecryptionBuffer, m_recvTimeout)) <= 0)
185 {
186 printMessage(_T("ISC::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), err, WSAGetLastError());
187 break;
188 }
189
190 // Check if we get too large message
191 if (err == 1)
192 {
193 printMessage(_T("Received too large message %s (%d bytes)"),
194 NXCPMessageCodeName(ntohs(pRawMsg->code), szBuffer),
195 ntohl(pRawMsg->size));
196 continue;
197 }
198
199 // Check if we are unable to decrypt message
200 if (err == 2)
201 {
202 printMessage(_T("Unable to decrypt received message"));
203 continue;
204 }
205
206 // Check for timeout
207 if (err == 3)
208 {
209 printMessage(_T("Timed out waiting for message"));
210 break;
211 }
212
213 // Check that actual received packet size is equal to encoded in packet
214 if ((int)ntohl(pRawMsg->size) != err)
215 {
216 printMessage(_T("RecvMsg: Bad packet length [size=%d ActualSize=%d]"), ntohl(pRawMsg->size), err);
217 continue; // Bad packet, wait for next
218 }
219
220 if (ntohs(pRawMsg->flags) & MF_BINARY)
221 {
222 // Convert message header to host format
223 DbgPrintf(6, _T("ISC: Received raw message %s from peer at %s"),
224 NXCPMessageCodeName(ntohs(pRawMsg->code), szBuffer), m_addr.toString(szIpAddr));
225 onBinaryMessage(pRawMsg);
226 }
227 else
228 {
229 // Create message object from raw message
230 pMsg = new NXCPMessage(pRawMsg, m_protocolVersion);
231 if (onMessage(pMsg))
232 {
233 // message was consumed by handler
234 delete pMsg;
235 }
236 else
237 {
238 m_msgWaitQueue->put(pMsg);
239 }
240 }
241 }
242
243 // Close socket and mark connection as disconnected
244 lock();
245 if (err == 0)
246 shutdown(m_socket, SHUT_RDWR);
247 closesocket(m_socket);
248 m_socket = -1;
249 if (m_ctx != NULL)
250 {
251 m_ctx->decRefCount();;
252 m_ctx = NULL;
253 }
254 m_flags &= ~ISCF_IS_CONNECTED;;
255 unlock();
256
257 free(pRawMsg);
258 free(pMsgBuffer);
259 #ifdef _WITH_ENCRYPTION
260 free(pDecryptionBuffer);
261 #endif
262 }
263
264 /**
265 * Connect to ISC peer
266 */
267 UINT32 ISC::connect(UINT32 service, RSA *pServerKey, BOOL requireEncryption)
268 {
269 TCHAR szBuffer[256];
270 UINT32 rcc = ISC_ERR_INTERNAL_ERROR;
271
272 // Check if already connected
273 if (m_flags & ISCF_IS_CONNECTED)
274 return ISC_ERR_ALREADY_CONNECTED;
275
276 if (requireEncryption)
277 m_flags |= ISCF_REQUIRE_ENCRYPTION;
278 else
279 m_flags &= ~ISCF_REQUIRE_ENCRYPTION;
280
281 // Wait for receiver thread from previous connection, if any
282 ThreadJoin(m_hReceiverThread);
283 m_hReceiverThread = INVALID_THREAD_HANDLE;
284
285 // Check if we need to close existing socket
286 if (m_socket != -1)
287 closesocket(m_socket);
288
289 struct sockaddr *sa;
290
291 // Create socket
292 m_socket = socket(m_addr.getFamily(), SOCK_STREAM, 0);
293 if (m_socket == INVALID_SOCKET)
294 {
295 rcc = ISC_ERR_SOCKET_ERROR;
296 printMessage(_T("ISC: Call to socket() failed"));
297 goto connect_cleanup;
298 }
299
300 // Fill in address structure
301 SockAddrBuffer sb;
302 sa = m_addr.fillSockAddr(&sb, m_port);
303
304 // Connect to server
305 if (ConnectEx(m_socket, sa, SA_LEN(sa), 5000) == -1)
306 {
307 rcc = ISC_ERR_CONNECT_FAILED;
308 printMessage(_T("Cannot establish connection with ISC peer %s"), m_addr.toString(szBuffer));
309 goto connect_cleanup;
310 }
311
312 // Set non-blocking mode
313 SetSocketNonBlocking(m_socket);
314
315 if (!NXCPGetPeerProtocolVersion(m_socket, &m_protocolVersion, m_socketLock))
316 {
317 rcc = ISC_ERR_INVALID_NXCP_VERSION;
318 printMessage(_T("Cannot detect NXCP version for ISC peer %s"), m_addr.toString(szBuffer));
319 goto connect_cleanup;
320 }
321
322 if (m_protocolVersion > NXCP_VERSION)
323 {
324 rcc = ISC_ERR_INVALID_NXCP_VERSION;
325 printMessage(_T("ISC peer %s uses incompatible NXCP version %d"), m_addr.toString(szBuffer), m_protocolVersion);
326 goto connect_cleanup;
327 }
328
329 // Start receiver thread
330 m_hReceiverThread = ThreadCreateEx(receiverThreadStarter, 0, this);
331
332 // Setup encryption
333 setup_encryption:
334 if (pServerKey != NULL)
335 {
336 rcc = setupEncryption(pServerKey);
337 if ((rcc != ERR_SUCCESS) &&
338 (m_flags & ISCF_REQUIRE_ENCRYPTION))
339 {
340 printMessage(_T("Cannot setup encrypted channel with ISC peer %s"), m_addr.toString(szBuffer));
341 goto connect_cleanup;
342 }
343 }
344 else
345 {
346 if (m_flags & ISCF_REQUIRE_ENCRYPTION)
347 {
348 rcc = ISC_ERR_ENCRYPTION_REQUIRED;
349 printMessage(_T("Cannot setup encrypted channel with ISC peer %s"), m_addr.toString(szBuffer));
350 goto connect_cleanup;
351 }
352 }
353
354 // Test connectivity
355 m_flags |= ISCF_IS_CONNECTED;
356 if ((rcc = nop()) != ERR_SUCCESS)
357 {
358 if (rcc == ISC_ERR_ENCRYPTION_REQUIRED)
359 {
360 m_flags |= ISCF_REQUIRE_ENCRYPTION;
361 goto setup_encryption;
362 }
363 printMessage(_T("Communication with ISC peer %s failed (%s)"), m_addr.toString(szBuffer), ISCErrorCodeToText(rcc));
364 goto connect_cleanup;
365 }
366
367 rcc = connectToService(service);
368
369 connect_cleanup:
370 if (rcc != ISC_ERR_SUCCESS)
371 {
372 lock();
373 m_flags &= ~ISCF_IS_CONNECTED;
374 if (m_socket != -1)
375 shutdown(m_socket, SHUT_RDWR);
376 unlock();
377 ThreadJoin(m_hReceiverThread);
378 m_hReceiverThread = INVALID_THREAD_HANDLE;
379
380 lock();
381 if (m_socket != -1)
382 {
383 closesocket(m_socket);
384 m_socket = -1;
385 }
386
387 if (m_ctx != NULL)
388 {
389 m_ctx->decRefCount();
390 m_ctx = NULL;
391 }
392
393 unlock();
394 }
395
396 return rcc;
397 }
398
399 /**
400 * Disconnect from ISC peer
401 */
402 void ISC::disconnect()
403 {
404 lock();
405 if (m_socket != -1)
406 {
407 shutdown(m_socket, SHUT_RDWR);
408 m_flags &= ~ISCF_IS_CONNECTED;;
409 }
410 unlock();
411 }
412
413 /**
414 * Send message to peer
415 */
416 BOOL ISC::sendMessage(NXCPMessage *pMsg)
417 {
418 NXCP_MESSAGE *pRawMsg;
419 NXCP_ENCRYPTED_MESSAGE *pEnMsg;
420 BOOL bResult;
421
422 if (!(m_flags & ISCF_IS_CONNECTED))
423 return FALSE;
424
425 if (pMsg->getId() == 0)
426 {
427 pMsg->setId((UINT32)InterlockedIncrement(&m_requestId));
428 }
429
430 pRawMsg = pMsg->createMessage();
431 if (m_ctx != NULL)
432 {
433 pEnMsg = m_ctx->encryptMessage(pRawMsg);
434 if (pEnMsg != NULL)
435 {
436 bResult = (SendEx(m_socket, (char *)pEnMsg, ntohl(pEnMsg->size), 0, m_socketLock) == (int)ntohl(pEnMsg->size));
437 free(pEnMsg);
438 }
439 else
440 {
441 bResult = FALSE;
442 }
443 }
444 else
445 {
446 bResult = (SendEx(m_socket, (char *)pRawMsg, ntohl(pRawMsg->size), 0, m_socketLock) == (int)ntohl(pRawMsg->size));
447 }
448 free(pRawMsg);
449 return bResult;
450 }
451
452 /**
453 * Wait for request completion code
454 */
455 UINT32 ISC::waitForRCC(UINT32 rqId, UINT32 timeOut)
456 {
457 NXCPMessage *pMsg;
458 UINT32 dwRetCode;
459
460 pMsg = m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, rqId, timeOut);
461 if (pMsg != NULL)
462 {
463 dwRetCode = pMsg->getFieldAsUInt32(VID_RCC);
464 delete pMsg;
465 }
466 else
467 {
468 dwRetCode = ISC_ERR_REQUEST_TIMEOUT;
469 }
470 return dwRetCode;
471 }
472
473 /**
474 * Setup encryption
475 */
476 UINT32 ISC::setupEncryption(RSA *pServerKey)
477 {
478 #ifdef _WITH_ENCRYPTION
479 NXCPMessage msg(m_protocolVersion), *pResp;
480 UINT32 dwRqId, dwError, dwResult;
481
482 dwRqId = (UINT32)InterlockedIncrement(&m_requestId);
483
484 PrepareKeyRequestMsg(&msg, pServerKey, false);
485 msg.setId(dwRqId);
486 if (sendMessage(&msg))
487 {
488 pResp = waitForMessage(CMD_SESSION_KEY, dwRqId, m_commandTimeout);
489 if (pResp != NULL)
490 {
491 dwResult = SetupEncryptionContext(pResp, &m_ctx, NULL, pServerKey, m_protocolVersion);
492 switch(dwResult)
493 {
494 case RCC_SUCCESS:
495 dwError = ISC_ERR_SUCCESS;
496 break;
497 case RCC_NO_CIPHERS:
498 dwError = ISC_ERR_NO_CIPHERS;
499 break;
500 case RCC_INVALID_PUBLIC_KEY:
501 dwError = ISC_ERR_INVALID_PUBLIC_KEY;
502 break;
503 case RCC_INVALID_SESSION_KEY:
504 dwError = ISC_ERR_INVALID_SESSION_KEY;
505 break;
506 default:
507 dwError = ISC_ERR_INTERNAL_ERROR;
508 break;
509 }
510 delete pResp;
511 }
512 else
513 {
514 dwError = ISC_ERR_REQUEST_TIMEOUT;
515 }
516 }
517 else
518 {
519 dwError = ISC_ERR_CONNECTION_BROKEN;
520 }
521
522 return dwError;
523 #else
524 return ISC_ERR_NOT_IMPLEMENTED;
525 #endif
526 }
527
528 /**
529 * Send dummy command to peer (can be used for keepalive)
530 */
531 UINT32 ISC::nop()
532 {
533 NXCPMessage msg(m_protocolVersion);
534 UINT32 dwRqId;
535
536 dwRqId = (UINT32)InterlockedIncrement(&m_requestId);
537 msg.setCode(CMD_KEEPALIVE);
538 msg.setId(dwRqId);
539 if (sendMessage(&msg))
540 return waitForRCC(dwRqId, m_commandTimeout);
541 else
542 return ISC_ERR_CONNECTION_BROKEN;
543 }
544
545 /**
546 * Connect to requested service
547 */
548 UINT32 ISC::connectToService(UINT32 service)
549 {
550 NXCPMessage msg(m_protocolVersion);
551 UINT32 dwRqId;
552
553 dwRqId = (UINT32)InterlockedIncrement(&m_requestId);
554 msg.setCode(CMD_ISC_CONNECT_TO_SERVICE);
555 msg.setId(dwRqId);
556 msg.setField(VID_SERVICE_ID, service);
557 if (sendMessage(&msg))
558 return waitForRCC(dwRqId, m_commandTimeout);
559 else
560 return ISC_ERR_CONNECTION_BROKEN;
561 }
562
563 /**
564 * Binary message handler. Default implementation do nothing.
565 */
566 void ISC::onBinaryMessage(NXCP_MESSAGE *rawMsg)
567 {
568 }
569
570 /**
571 * Incoming message handler. Default implementation do nothing and return false.
572 * Should return true if message was consumed and shou8ld not be put into wait queue
573 */
574 bool ISC::onMessage(NXCPMessage *msg)
575 {
576 return false;
577 }