All messages with codes 0x1100 - 0x11ff now forwarded to Reporting Server
[public/netxms.git] / src / server / libnxsrv / isc.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2010 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU Lesser General Public License as published by
7 ** the Free Software Foundation; either version 3 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU Lesser General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: isc.cpp
20 **
21 **/
22
23 #include "libnxsrv.h"
24
25
26 //
27 // Constants
28 //
29
30 #define RECEIVER_BUFFER_SIZE 262144
31
32
33 //
34 // Texts for ISC error codes
35 //
36
37 const TCHAR LIBNXSRV_EXPORTABLE *ISCErrorCodeToText(DWORD code)
38 {
39 static const TCHAR *errorText[] =
40 {
41 _T("Success"),
42 _T("Unknown service"),
43 _T("Request out of state"),
44 _T("Service disabled"),
45 _T("Encryption required"),
46 _T("Connection broken"),
47 _T("Already connected"),
48 _T("Socket error"),
49 _T("Connect failed"),
50 _T("Invalid or incompatible NXCP version"),
51 _T("Request timed out"),
52 _T("Command or function not implemented"),
53 _T("No suitable ciphers found"),
54 _T("Invalid public key"),
55 _T("Invalid session key"),
56 _T("Internal error"),
57 _T("Session setup failed"),
58 _T("Object not found"),
59 _T("Failed to post event")
60 };
61
62 if (code <= ISC_ERR_POST_EVENT_FAILED)
63 return errorText[code];
64 return _T("Unknown error code");
65 }
66
67
68 //
69 // Receiver thread starter
70 //
71
72 THREAD_RESULT THREAD_CALL ISC::ReceiverThreadStarter(void *arg)
73 {
74 ((ISC *)arg)->ReceiverThread();
75 return THREAD_OK;
76 }
77
78
79 //
80 // Default constructor for ISC - normally shouldn't be used
81 //
82
83 ISC::ISC()
84 {
85 m_flags = 0;
86 m_addr = inet_addr("127.0.0.1");
87 m_port = NETXMS_ISC_PORT;
88 m_socket = -1;
89 m_msgWaitQueue = new MsgWaitQueue;
90 m_requestId = 1;
91 m_hReceiverThread = INVALID_THREAD_HANDLE;
92 m_ctx = NULL;
93 m_recvTimeout = 420000; // 7 minutes
94 m_commandTimeout = 10000; // 10 seconds
95 m_mutexDataLock = MutexCreate();
96 m_socketLock = MutexCreate();
97 }
98
99
100 //
101 // Normal constructor for ISC
102 //
103
104 ISC::ISC(DWORD addr, WORD port)
105 {
106 m_flags = 0;
107 m_addr = addr;
108 m_port = port;
109 m_socket = -1;
110 m_msgWaitQueue = new MsgWaitQueue;
111 m_requestId = 1;
112 m_hReceiverThread = INVALID_THREAD_HANDLE;
113 m_ctx = NULL;
114 m_recvTimeout = 420000; // 7 minutes
115 m_commandTimeout = 10000; // 10 seconds
116 m_mutexDataLock = MutexCreate();
117 m_socketLock = MutexCreate();
118 }
119
120
121 //
122 // Destructor
123 //
124
125 ISC::~ISC()
126 {
127 // Disconnect from peer
128 Disconnect();
129
130 // Wait for receiver thread termination
131 ThreadJoin(m_hReceiverThread);
132
133 // Close socket if active
134 Lock();
135 if (m_socket != -1)
136 {
137 closesocket(m_socket);
138 m_socket = -1;
139 }
140 Unlock();
141
142 delete m_msgWaitQueue;
143 if (m_ctx != NULL)
144 m_ctx->decRefCount();
145
146 MutexDestroy(m_mutexDataLock);
147 MutexDestroy(m_socketLock);
148 }
149
150
151 //
152 // Print message. This function is virtual and can be overridden in
153 // derived classes. Default implementation will print message to stdout.
154 //
155
156 void ISC::PrintMsg(const TCHAR *format, ...)
157 {
158 va_list args;
159
160 va_start(args, format);
161 _vtprintf(format, args);
162 va_end(args);
163 _tprintf(_T("\n"));
164 }
165
166
167 //
168 // Receiver thread
169 //
170
171 void ISC::ReceiverThread()
172 {
173 CSCPMessage *pMsg;
174 CSCP_MESSAGE *pRawMsg;
175 CSCP_BUFFER *pMsgBuffer;
176 BYTE *pDecryptionBuffer = NULL;
177 int err;
178 TCHAR szBuffer[128], szIpAddr[16];
179 SOCKET nSocket;
180
181 // Initialize raw message receiving function
182 pMsgBuffer = (CSCP_BUFFER *)malloc(sizeof(CSCP_BUFFER));
183 RecvNXCPMessage(0, NULL, pMsgBuffer, 0, NULL, NULL, 0);
184
185 // Allocate space for raw message
186 pRawMsg = (CSCP_MESSAGE *)malloc(RECEIVER_BUFFER_SIZE);
187 #ifdef _WITH_ENCRYPTION
188 pDecryptionBuffer = (BYTE *)malloc(RECEIVER_BUFFER_SIZE);
189 #endif
190
191 // Message receiving loop
192 while(1)
193 {
194 // Receive raw message
195 Lock();
196 nSocket = m_socket;
197 Unlock();
198 if ((err = RecvNXCPMessage(nSocket, pRawMsg, pMsgBuffer, RECEIVER_BUFFER_SIZE,
199 &m_ctx, pDecryptionBuffer, m_recvTimeout)) <= 0)
200 {
201 PrintMsg(_T("ISC::ReceiverThread(): RecvNXCPMessage() failed: error=%d, socket_error=%d"), err, WSAGetLastError());
202 break;
203 }
204
205 // Check if we get too large message
206 if (err == 1)
207 {
208 PrintMsg(_T("Received too large message %s (%d bytes)"),
209 NXCPMessageCodeName(ntohs(pRawMsg->wCode), szBuffer),
210 ntohl(pRawMsg->dwSize));
211 continue;
212 }
213
214 // Check if we are unable to decrypt message
215 if (err == 2)
216 {
217 PrintMsg(_T("Unable to decrypt received message"));
218 continue;
219 }
220
221 // Check for timeout
222 if (err == 3)
223 {
224 PrintMsg(_T("Timed out waiting for message"));
225 break;
226 }
227
228 // Check that actual received packet size is equal to encoded in packet
229 if ((int)ntohl(pRawMsg->dwSize) != err)
230 {
231 PrintMsg(_T("RecvMsg: Bad packet length [dwSize=%d ActualSize=%d]"), ntohl(pRawMsg->dwSize), err);
232 continue; // Bad packet, wait for next
233 }
234
235 if (ntohs(pRawMsg->wFlags) & MF_BINARY)
236 {
237 // Convert message header to host format
238 pRawMsg->dwId = ntohl(pRawMsg->dwId);
239 pRawMsg->wCode = ntohs(pRawMsg->wCode);
240 pRawMsg->dwNumVars = ntohl(pRawMsg->dwNumVars);
241 DbgPrintf(6, _T("ISC: Received raw message %s from peer at %s"),
242 NXCPMessageCodeName(pRawMsg->wCode, szBuffer), IpToStr(m_addr, szIpAddr));
243 }
244 else
245 {
246 // Create message object from raw message
247 pMsg = new CSCPMessage(pRawMsg, m_protocolVersion);
248 m_msgWaitQueue->put(pMsg);
249 }
250 }
251
252 // Close socket and mark connection as disconnected
253 Lock();
254 if (err == 0)
255 shutdown(m_socket, SHUT_RDWR);
256 closesocket(m_socket);
257 m_socket = -1;
258 if (m_ctx != NULL)
259 {
260 m_ctx->decRefCount();;
261 m_ctx = NULL;
262 }
263 m_flags &= ~ISCF_IS_CONNECTED;;
264 Unlock();
265
266 free(pRawMsg);
267 free(pMsgBuffer);
268 #ifdef _WITH_ENCRYPTION
269 free(pDecryptionBuffer);
270 #endif
271 }
272
273
274 //
275 // Connect to ISC peer
276 //
277
278 DWORD ISC::Connect(DWORD service, RSA *pServerKey, BOOL requireEncryption)
279 {
280 struct sockaddr_in sa;
281 TCHAR szBuffer[256];
282 BOOL bForceEncryption = FALSE, bSecondPass = FALSE;
283 DWORD rcc = ISC_ERR_INTERNAL_ERROR;
284
285 // Check if already connected
286 if (m_flags & ISCF_IS_CONNECTED)
287 return ISC_ERR_ALREADY_CONNECTED;
288
289 if (requireEncryption)
290 m_flags |= ISCF_REQUIRE_ENCRYPTION;
291 else
292 m_flags &= ~ISCF_REQUIRE_ENCRYPTION;
293
294 // Wait for receiver thread from previous connection, if any
295 ThreadJoin(m_hReceiverThread);
296 m_hReceiverThread = INVALID_THREAD_HANDLE;
297
298 // Check if we need to close existing socket
299 if (m_socket != -1)
300 closesocket(m_socket);
301
302 // Create socket
303 m_socket = socket(AF_INET, SOCK_STREAM, 0);
304 if (m_socket == -1)
305 {
306 rcc = ISC_ERR_SOCKET_ERROR;
307 PrintMsg(_T("ISC: Call to socket() failed"));
308 goto connect_cleanup;
309 }
310
311 // Fill in address structure
312 memset(&sa, 0, sizeof(sa));
313 sa.sin_family = AF_INET;
314 sa.sin_addr.s_addr = m_addr;
315 sa.sin_port = htons(m_port);
316
317 // Connect to server
318 if (connect(m_socket, (struct sockaddr *)&sa, sizeof(sa)) == -1)
319 {
320 rcc = ISC_ERR_CONNECT_FAILED;
321 PrintMsg(_T("Cannot establish connection with ISC peer %s"),
322 IpToStr(ntohl(m_addr), szBuffer));
323 goto connect_cleanup;
324 }
325
326 // Set non-blocking mode
327 SetSocketNonBlocking(m_socket);
328
329 if (!NXCPGetPeerProtocolVersion(m_socket, &m_protocolVersion, m_socketLock))
330 {
331 rcc = ISC_ERR_INVALID_NXCP_VERSION;
332 PrintMsg(_T("Cannot detect NXCP version for ISC peer %s"),
333 IpToStr(ntohl(m_addr), szBuffer));
334 goto connect_cleanup;
335 }
336
337 if (m_protocolVersion > NXCP_VERSION)
338 {
339 rcc = ISC_ERR_INVALID_NXCP_VERSION;
340 PrintMsg(_T("ISC peer %s uses incompatible NXCP version %d"),
341 IpToStr(ntohl(m_addr), szBuffer), m_protocolVersion);
342 goto connect_cleanup;
343 }
344
345 // Start receiver thread
346 m_hReceiverThread = ThreadCreateEx(ReceiverThreadStarter, 0, this);
347
348 // Setup encryption
349 setup_encryption:
350 if (pServerKey != NULL)
351 {
352 rcc = SetupEncryption(pServerKey);
353 if ((rcc != ERR_SUCCESS) &&
354 (m_flags & ISCF_REQUIRE_ENCRYPTION))
355 {
356 PrintMsg(_T("Cannot setup encrypted channel with ISC peer %s"),
357 IpToStr(ntohl(m_addr), szBuffer));
358 goto connect_cleanup;
359 }
360 }
361 else
362 {
363 if (m_flags & ISCF_REQUIRE_ENCRYPTION)
364 {
365 rcc = ISC_ERR_ENCRYPTION_REQUIRED;
366 PrintMsg(_T("Cannot setup encrypted channel with ISC peer %s"),
367 IpToStr(ntohl(m_addr), szBuffer));
368 goto connect_cleanup;
369 }
370 }
371
372 // Test connectivity
373 m_flags |= ISCF_IS_CONNECTED;
374 if ((rcc = Nop()) != ERR_SUCCESS)
375 {
376 if (rcc == ISC_ERR_ENCRYPTION_REQUIRED)
377 {
378 m_flags |= ISCF_REQUIRE_ENCRYPTION;
379 goto setup_encryption;
380 }
381 PrintMsg(_T("Communication with ISC peer %s failed (%s)"), IpToStr(ntohl(m_addr), szBuffer),
382 ISCErrorCodeToText(rcc));
383 goto connect_cleanup;
384 }
385
386 rcc = ConnectToService(service);
387
388 connect_cleanup:
389 if (rcc != ISC_ERR_SUCCESS)
390 {
391 Lock();
392 m_flags &= ~ISCF_IS_CONNECTED;
393 if (m_socket != -1)
394 shutdown(m_socket, SHUT_RDWR);
395 Unlock();
396 ThreadJoin(m_hReceiverThread);
397 m_hReceiverThread = INVALID_THREAD_HANDLE;
398
399 Lock();
400 if (m_socket != -1)
401 {
402 closesocket(m_socket);
403 m_socket = -1;
404 }
405
406 if (m_ctx != NULL)
407 {
408 m_ctx->decRefCount();
409 m_ctx = NULL;
410 }
411
412 Unlock();
413 }
414
415 return rcc;
416 }
417
418
419 //
420 // Disconnect from ISC peer
421 //
422
423 void ISC::Disconnect()
424 {
425 Lock();
426 if (m_socket != -1)
427 {
428 shutdown(m_socket, SHUT_RDWR);
429 m_flags &= ~ISCF_IS_CONNECTED;;
430 }
431 Unlock();
432 }
433
434
435 //
436 // Send message to peer
437 //
438
439 BOOL ISC::SendMessage(CSCPMessage *pMsg)
440 {
441 CSCP_MESSAGE *pRawMsg;
442 CSCP_ENCRYPTED_MESSAGE *pEnMsg;
443 BOOL bResult;
444
445 if (!(m_flags & ISCF_IS_CONNECTED))
446 return FALSE;
447
448 if (pMsg->GetId() == 0)
449 {
450 pMsg->SetId(m_requestId++);
451 }
452
453 pRawMsg = pMsg->CreateMessage();
454 if (m_ctx != NULL)
455 {
456 pEnMsg = CSCPEncryptMessage(m_ctx, pRawMsg);
457 if (pEnMsg != NULL)
458 {
459 bResult = (SendEx(m_socket, (char *)pEnMsg, ntohl(pEnMsg->dwSize), 0, m_socketLock) == (int)ntohl(pEnMsg->dwSize));
460 free(pEnMsg);
461 }
462 else
463 {
464 bResult = FALSE;
465 }
466 }
467 else
468 {
469 bResult = (SendEx(m_socket, (char *)pRawMsg, ntohl(pRawMsg->dwSize), 0, m_socketLock) == (int)ntohl(pRawMsg->dwSize));
470 }
471 free(pRawMsg);
472 return bResult;
473 }
474
475
476 //
477 // Wait for request completion code
478 //
479
480 DWORD ISC::WaitForRCC(DWORD rqId, DWORD timeOut)
481 {
482 CSCPMessage *pMsg;
483 DWORD dwRetCode;
484
485 pMsg = m_msgWaitQueue->waitForMessage(CMD_REQUEST_COMPLETED, rqId, timeOut);
486 if (pMsg != NULL)
487 {
488 dwRetCode = pMsg->GetVariableLong(VID_RCC);
489 delete pMsg;
490 }
491 else
492 {
493 dwRetCode = ISC_ERR_REQUEST_TIMEOUT;
494 }
495 return dwRetCode;
496 }
497
498
499 //
500 // Setup encryption
501 //
502
503 DWORD ISC::SetupEncryption(RSA *pServerKey)
504 {
505 #ifdef _WITH_ENCRYPTION
506 CSCPMessage msg(m_protocolVersion), *pResp;
507 DWORD dwRqId, dwError, dwResult;
508
509 dwRqId = m_requestId++;
510
511 PrepareKeyRequestMsg(&msg, pServerKey, false);
512 msg.SetId(dwRqId);
513 if (SendMessage(&msg))
514 {
515 pResp = WaitForMessage(CMD_SESSION_KEY, dwRqId, m_commandTimeout);
516 if (pResp != NULL)
517 {
518 dwResult = SetupEncryptionContext(pResp, &m_ctx, NULL, pServerKey, m_protocolVersion);
519 switch(dwResult)
520 {
521 case RCC_SUCCESS:
522 dwError = ISC_ERR_SUCCESS;
523 break;
524 case RCC_NO_CIPHERS:
525 dwError = ISC_ERR_NO_CIPHERS;
526 break;
527 case RCC_INVALID_PUBLIC_KEY:
528 dwError = ISC_ERR_INVALID_PUBLIC_KEY;
529 break;
530 case RCC_INVALID_SESSION_KEY:
531 dwError = ISC_ERR_INVALID_SESSION_KEY;
532 break;
533 default:
534 dwError = ISC_ERR_INTERNAL_ERROR;
535 break;
536 }
537 delete pResp;
538 }
539 else
540 {
541 dwError = ISC_ERR_REQUEST_TIMEOUT;
542 }
543 }
544 else
545 {
546 dwError = ISC_ERR_CONNECTION_BROKEN;
547 }
548
549 return dwError;
550 #else
551 return ISC_ERR_NOT_IMPLEMENTED;
552 #endif
553 }
554
555
556 //
557 // Send dummy command to peer (can be used for keepalive)
558 //
559
560 DWORD ISC::Nop()
561 {
562 CSCPMessage msg(m_protocolVersion);
563 DWORD dwRqId;
564
565 dwRqId = m_requestId++;
566 msg.SetCode(CMD_KEEPALIVE);
567 msg.SetId(dwRqId);
568 if (SendMessage(&msg))
569 return WaitForRCC(dwRqId, m_commandTimeout);
570 else
571 return ISC_ERR_CONNECTION_BROKEN;
572 }
573
574
575 //
576 // Connect to requested service
577 //
578
579 DWORD ISC::ConnectToService(DWORD service)
580 {
581 CSCPMessage msg(m_protocolVersion);
582 DWORD dwRqId;
583
584 dwRqId = m_requestId++;
585 msg.SetCode(CMD_ISC_CONNECT_TO_SERVICE);
586 msg.SetId(dwRqId);
587 msg.SetVariable(VID_SERVICE_ID, service);
588 if (SendMessage(&msg))
589 return WaitForRCC(dwRqId, m_commandTimeout);
590 else
591 return ISC_ERR_CONNECTION_BROKEN;
592 }