931d7346adbff3eb6222f10e49d11b7028555a0e
[public/netxms.git] / src / agent / core / tunnel.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: tunnel.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
/**
 * Check if server address is valid.
 * Declared here only; the implementation lives outside of this file.
 * NOTE(review): the two output flags presumably report master/control
 * access level for the given address - confirm at the definition.
 */
bool IsValidServerAddress(const InetAddress &addr, bool *pbMasterServer, bool *pbControlServer);

/**
 * Register session.
 * Declared here only; the implementation lives outside of this file.
 * This file treats a false return value as "session cannot be registered".
 */
bool RegisterSession(CommSession *session);

class Tunnel;

/**
 * Unique channel ID.
 * Atomically incremented each time a TunnelCommChannel object is constructed,
 * so the first channel gets ID 1.
 */
static VolatileCounter s_nextChannelId = 0;
41
/**
 * Tunnel communication channel
 *
 * Communication channel multiplexed over a Tunnel. send() forwards data to
 * the owning tunnel; inbound data is stored in an internal ring buffer by
 * putData() and handed out by recv()/poll().
 */
class TunnelCommChannel : public AbstractCommChannel
{
private:
   Tunnel *m_tunnel;          // tunnel this channel belongs to (not deleted by the channel)
   UINT32 m_id;               // channel ID obtained from s_nextChannelId
   bool m_active;             // set to false by shutdown(); inactive channel refuses I/O
   VolatileCounter m_closed;  // incremented by close() so that only the first call does the work
   RingBuffer m_buffer;       // received data waiting to be read by recv()
#ifdef _WIN32
   CRITICAL_SECTION m_bufferLock;   // protects m_buffer
   HANDLE m_dataCondition;          // manual-reset event, set by putData() and shutdown()
#else
   pthread_mutex_t m_bufferLock;    // protects m_buffer
   pthread_cond_t m_dataCondition;  // broadcast by putData() and shutdown()
#endif

protected:
   // NOTE(review): destructor is protected, so object is presumably released
   // via reference counting (decRefCount) - confirm in AbstractCommChannel
   virtual ~TunnelCommChannel();

public:
   TunnelCommChannel(Tunnel *tunnel);

   // AbstractCommChannel interface
   virtual int send(const void *data, size_t size, MUTEX mutex = INVALID_MUTEX_HANDLE);
   virtual int recv(void *buffer, size_t size, UINT32 timeout = INFINITE);
   virtual int poll(UINT32 timeout, bool write = false);
   virtual int shutdown();
   virtual void close();

   // Get channel ID
   UINT32 getId() const { return m_id; }

   // Store data received from the tunnel and wake up readers
   void putData(const BYTE *data, size_t size);
};
77
/**
 * Tunnel class
 *
 * Represents one agent-initiated TLS connection to a server. Owns the socket,
 * the OpenSSL context/session, the receiver thread and the set of channels
 * multiplexed over the connection.
 */
class Tunnel
{
private:
   TCHAR *m_hostname;         // server host name (private copy, released in destructor)
   UINT16 m_port;             // server TCP port
   InetAddress m_address;     // server address resolved by connectToServer()
   SOCKET m_socket;           // connection socket or INVALID_SOCKET
   SSL_CTX *m_context;        // OpenSSL context of current connection
   SSL *m_ssl;                // OpenSSL session of current connection
   MUTEX m_sslLock;           // serializes access to m_ssl
   bool m_connected;          // true while tunnel is established
   bool m_reset;              // set by receiver thread when tunnel has to be re-established
   VolatileCounter m_requestId;  // source of request IDs for outgoing messages
   THREAD m_recvThread;       // receiver thread handle
   MsgWaitQueue *m_queue;     // queue for messages not handled directly by receiver thread
   TCHAR m_debugId[64];       // prefix used by debugPrintf ("TUN-<hostname>")
   RefCountHashMap<UINT32, TunnelCommChannel> m_channels;   // open channels by ID
   MUTEX m_channelLock;       // protects m_channels

   // Objects are created only through createFromConfig()
   Tunnel(const TCHAR *hostname, UINT16 port);

   bool connectToServer();
   int sslWrite(const void *data, size_t size);
   bool sendMessage(const NXCPMessage *msg);
   // Wait up to 5000 ms for message with given code and ID; returns NULL on timeout or if there is no queue
   NXCPMessage *waitForMessage(UINT16 code, UINT32 id) { return (m_queue != NULL) ? m_queue->waitForMessage(code, id, 5000) : NULL; }

   // Handlers for messages received from server
   void processBindRequest(NXCPMessage *request);
   void processChannelCloseRequest(NXCPMessage *request);
   void createSession(NXCPMessage *request);

   // Certificate management
   X509_REQ *createCertificateRequest(const char *country, const char *org, const char *cn, EVP_PKEY **pkey);
   bool saveCertificate(X509 *cert, EVP_PKEY *key);
   void loadCertificate();

   // Receiver thread body and its static entry point
   void recvThread();
   static THREAD_RESULT THREAD_CALL recvThreadStarter(void *arg);

public:
   ~Tunnel();

   // Periodic check: (re)connect if needed, otherwise send keepalive
   void checkConnection();
   // Stop receiver thread and close all channels
   void disconnect();

   // Channel management
   TunnelCommChannel *createChannel();
   void closeChannel(TunnelCommChannel *channel);
   int sendChannelData(UINT32 id, const void *data, size_t len);

   const TCHAR *getHostname() const { return m_hostname; }

   // Write debug message prefixed with tunnel's debug ID
   void debugPrintf(int level, const TCHAR *format, ...);

   // Create tunnel from "host[:port]" configuration record; note that config is modified in place
   static Tunnel *createFromConfig(TCHAR *config);
};
134
135 /**
136 * Tunnel constructor
137 */
138 Tunnel::Tunnel(const TCHAR *hostname, UINT16 port) : m_channels(true)
139 {
140 m_hostname = _tcsdup(hostname);
141 m_port = port;
142 m_socket = INVALID_SOCKET;
143 m_context = NULL;
144 m_ssl = NULL;
145 m_sslLock = MutexCreate();
146 m_connected = false;
147 m_reset = false;
148 m_requestId = 0;
149 m_recvThread = INVALID_THREAD_HANDLE;
150 m_queue = NULL;
151 _sntprintf(m_debugId, 64, _T("TUN-%s"), m_hostname);
152 m_channelLock = MutexCreate();
153 }
154
155 /**
156 * Tunnel destructor
157 */
158 Tunnel::~Tunnel()
159 {
160 disconnect();
161 if (m_socket != INVALID_SOCKET)
162 closesocket(m_socket);
163 if (m_ssl != NULL)
164 SSL_free(m_ssl);
165 if (m_context != NULL)
166 SSL_CTX_free(m_context);
167 MutexDestroy(m_sslLock);
168 MutexDestroy(m_channelLock);
169 free(m_hostname);
170 }
171
172 /**
173 * Debug output
174 */
175 void Tunnel::debugPrintf(int level, const TCHAR *format, ...)
176 {
177 va_list args;
178 va_start(args, format);
179 TCHAR buffer[8192];
180 _vsntprintf(buffer, 8192, format, args);
181 va_end(args);
182 nxlog_debug(level, _T("[%s] %s"), m_debugId, buffer);
183 }
184
185 /**
186 * Force disconnect
187 */
188 void Tunnel::disconnect()
189 {
190 if (m_socket != INVALID_SOCKET)
191 shutdown(m_socket, SHUT_RDWR);
192 m_connected = false;
193 ThreadJoin(m_recvThread);
194 delete_and_null(m_queue);
195
196 Array channels(g_dwMaxSessions, 16, false);
197 MutexLock(m_channelLock);
198 Iterator<TunnelCommChannel> *it = m_channels.iterator();
199 while(it->hasNext())
200 {
201 TunnelCommChannel *c = it->next();
202 channels.add(c);
203 c->incRefCount();
204 }
205 delete it;
206 MutexUnlock(m_channelLock);
207
208 for(int i = 0; i < channels.size(); i++)
209 {
210 AbstractCommChannel *c = (AbstractCommChannel *)channels.get(i);
211 c->close();
212 c->decRefCount();
213 }
214 }
215
216 /**
217 * Receiver thread starter
218 */
219 THREAD_RESULT THREAD_CALL Tunnel::recvThreadStarter(void *arg)
220 {
221 ((Tunnel *)arg)->recvThread();
222 return THREAD_OK;
223 }
224
225 /**
226 * Receiver thread
227 */
228 void Tunnel::recvThread()
229 {
230 TlsMessageReceiver receiver(m_socket, m_ssl, m_sslLock, 8192, MAX_AGENT_MSG_SIZE);
231 while(m_connected)
232 {
233 MessageReceiverResult result;
234 NXCPMessage *msg = receiver.readMessage(1000, &result);
235 if (msg != NULL)
236 {
237 if (nxlog_get_debug_level() >= 6)
238 {
239 TCHAR buffer[64];
240 debugPrintf(6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
241 }
242
243 if (msg->getCode() == CMD_RESET_TUNNEL)
244 {
245 m_reset = true;
246 debugPrintf(4, _T("Receiver thread stopped (tunnel reset)"));
247 break;
248 }
249
250 switch(msg->getCode())
251 {
252 case CMD_BIND_AGENT_TUNNEL:
253 ThreadPoolExecute(g_commThreadPool, this, &Tunnel::processBindRequest, msg);
254 msg = NULL; // prevent message deletion
255 break;
256 case CMD_CREATE_CHANNEL:
257 createSession(msg);
258 break;
259 case CMD_CHANNEL_DATA:
260 if (msg->isBinary())
261 {
262 MutexLock(m_channelLock);
263 TunnelCommChannel *channel = m_channels.get(msg->getId());
264 MutexUnlock(m_channelLock);
265 if (channel != NULL)
266 {
267 channel->putData(msg->getBinaryData(), msg->getBinaryDataSize());
268 channel->decRefCount();
269 }
270 }
271 break;
272 case CMD_CLOSE_CHANNEL:
273 processChannelCloseRequest(msg);
274 break;
275 default:
276 m_queue->put(msg);
277 msg = NULL; // prevent message deletion
278 break;
279 }
280 delete msg;
281 }
282 else if (result != MSGRECV_TIMEOUT)
283 {
284 debugPrintf(4, _T("Receiver thread stopped (%s)"), AbstractMessageReceiver::resultToText(result));
285 m_reset = true;
286 break;
287 }
288 }
289 }
290
291 /**
292 * Write to SSL
293 */
294 int Tunnel::sslWrite(const void *data, size_t size)
295 {
296 if (!m_connected || m_reset)
297 return -1;
298
299 bool canRetry;
300 int bytes;
301 MutexLock(m_sslLock);
302 do
303 {
304 canRetry = false;
305 bytes = SSL_write(m_ssl, data, (int)size);
306 if (bytes <= 0)
307 {
308 int err = SSL_get_error(m_ssl, bytes);
309 if ((err == SSL_ERROR_WANT_READ) || (err == SSL_ERROR_WANT_WRITE))
310 {
311 SocketPoller sp(true);
312 sp.add(m_socket);
313 if (sp.poll(5000) > 0)
314 canRetry = true;
315 }
316 else
317 {
318 debugPrintf(7, _T("SSL_write error (bytes=%d ssl_err=%d errno=%d)"), bytes, err, errno);
319 if (err == SSL_ERROR_SSL)
320 LogOpenSSLErrorStack(7);
321 }
322 }
323 }
324 while(canRetry);
325 MutexUnlock(m_sslLock);
326 return bytes;
327 }
328
329 /**
330 * Send message
331 */
332 bool Tunnel::sendMessage(const NXCPMessage *msg)
333 {
334 if (!m_connected || m_reset)
335 return false;
336
337 if (nxlog_get_debug_level() >= 6)
338 {
339 TCHAR buffer[64];
340 debugPrintf(6, _T("Sending message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
341 }
342 NXCP_MESSAGE *data = msg->serialize(false);
343 bool success = (sslWrite(data, ntohl(data->size)) == ntohl(data->size));
344 free(data);
345 return success;
346 }
347
348 /**
349 * Load certificate for this tunnel
350 */
351 void Tunnel::loadCertificate()
352 {
353 BYTE addressHash[SHA1_DIGEST_SIZE];
354 #ifdef UNICODE
355 char *un = MBStringFromWideString(m_hostname);
356 CalculateSHA1Hash((BYTE *)un, strlen(un), addressHash);
357 free(un);
358 #else
359 CalculateSHA1Hash((BYTE *)m_hostname, strlen(m_hostname), addressHash);
360 #endif
361
362 TCHAR prefix[48];
363 BinToStr(addressHash, SHA1_DIGEST_SIZE, prefix);
364
365 TCHAR name[MAX_PATH];
366 _sntprintf(name, MAX_PATH, _T("%s%s.crt"), g_certificateDirectory, prefix);
367 FILE *f = _tfopen(name, _T("r"));
368 if (f == NULL)
369 {
370 debugPrintf(4, _T("Cannot open file \"%s\" (%s)"), name, _tcserror(errno));
371 if (errno == ENOENT)
372 {
373 // Try fallback file
374 m_address.buildHashKey(addressHash);
375 BinToStr(addressHash, 18, prefix);
376 _sntprintf(name, MAX_PATH, _T("%s%s.crt"), g_certificateDirectory, prefix);
377
378 f = _tfopen(name, _T("r"));
379 if (f == NULL)
380 {
381 debugPrintf(4, _T("Cannot open file \"%s\" (%s)"), name, _tcserror(errno));
382 return;
383 }
384 }
385 else
386 {
387 return;
388 }
389 }
390
391 X509 *cert = PEM_read_X509(f, NULL, NULL, NULL);
392 fclose(f);
393
394 if (cert == NULL)
395 {
396 debugPrintf(4, _T("Cannot load certificate from file \"%s\""), name);
397 return;
398 }
399
400 _sntprintf(name, MAX_PATH, _T("%s%s.key"), g_certificateDirectory, prefix);
401 f = _tfopen(name, _T("r"));
402 if (f == NULL)
403 {
404 debugPrintf(4, _T("Cannot open file \"%s\" (%s)"), name, _tcserror(errno));
405 X509_free(cert);
406 return;
407 }
408
409 EVP_PKEY *key = PEM_read_PrivateKey(f, NULL, NULL, (void *)"nxagentd");
410 fclose(f);
411
412 if (key == NULL)
413 {
414 debugPrintf(4, _T("Cannot load private key from file \"%s\""), name);
415 X509_free(cert);
416 return;
417 }
418
419 if (SSL_CTX_use_certificate(m_context, cert) == 1)
420 {
421 if (SSL_CTX_use_PrivateKey(m_context, key) == 1)
422 {
423 debugPrintf(4, _T("Certificate and private key loaded"));
424 }
425 else
426 {
427 debugPrintf(4, _T("Cannot set private key"));
428 }
429 }
430 else
431 {
432 debugPrintf(4, _T("Cannot set certificate"));
433 }
434
435 X509_free(cert);
436 EVP_PKEY_free(key);
437 }
438
439 /**
440 * Connect to server
441 */
442 bool Tunnel::connectToServer()
443 {
444 // Cleanup from previous connection attempt
445 if (m_socket != INVALID_SOCKET)
446 closesocket(m_socket);
447 if (m_ssl != NULL)
448 SSL_free(m_ssl);
449 if (m_context != NULL)
450 SSL_CTX_free(m_context);
451
452 m_socket = INVALID_SOCKET;
453 m_context = NULL;
454 m_ssl = NULL;
455
456 m_address = InetAddress::resolveHostName(m_hostname);
457 if (!m_address.isValidUnicast())
458 {
459 debugPrintf(4, _T("Server address cannot be resolved or is not valid"));
460 return false;
461 }
462
463 // Create socket and connect
464 m_socket = socket(m_address.getFamily(), SOCK_STREAM, 0);
465 if (m_socket == INVALID_SOCKET)
466 {
467 debugPrintf(4, _T("Cannot create socket (%s)"), _tcserror(WSAGetLastError()));
468 return false;
469 }
470
471 SockAddrBuffer sa;
472 m_address.fillSockAddr(&sa, m_port);
473 if (ConnectEx(m_socket, (struct sockaddr *)&sa, SA_LEN((struct sockaddr *)&sa), 5000) == -1)
474 {
475 debugPrintf(4, _T("Cannot establish connection (%s)"), _tcserror(WSAGetLastError()));
476 return false;
477 }
478
479 // Setup secure connection
480 const SSL_METHOD *method = SSLv23_method();
481 if (method == NULL)
482 {
483 debugPrintf(4, _T("Cannot obtain TLS method"));
484 return false;
485 }
486
487 m_context = SSL_CTX_new((SSL_METHOD *)method);
488 if (m_context == NULL)
489 {
490 debugPrintf(4, _T("Cannot create TLS context"));
491 return false;
492 }
493 #ifdef SSL_OP_NO_COMPRESSION
494 SSL_CTX_set_options(m_context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION);
495 #else
496 SSL_CTX_set_options(m_context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
497 #endif
498 loadCertificate();
499
500 m_ssl = SSL_new(m_context);
501 if (m_ssl == NULL)
502 {
503 debugPrintf(4, _T("Cannot create SSL object"));
504 return false;
505 }
506
507 SSL_set_connect_state(m_ssl);
508 SSL_set_fd(m_ssl, (int)m_socket);
509
510 while(true)
511 {
512 int rc = SSL_do_handshake(m_ssl);
513 if (rc != 1)
514 {
515 int sslErr = SSL_get_error(m_ssl, rc);
516 if (sslErr == SSL_ERROR_WANT_READ)
517 {
518 SocketPoller poller;
519 poller.add(m_socket);
520 if (poller.poll(5000) > 0)
521 continue;
522 debugPrintf(4, _T("TLS handshake failed (timeout)"));
523 return false;
524 }
525 else
526 {
527 char buffer[128];
528 debugPrintf(4, _T("TLS handshake failed (%hs)"), ERR_error_string(sslErr, buffer));
529 return false;
530 }
531 }
532 break;
533 }
534
535 // Check server certificate
536 X509 *cert = SSL_get_peer_certificate(m_ssl);
537 if (cert == NULL)
538 {
539 debugPrintf(4, _T("Server certificate not provided"));
540 return false;
541 }
542
543 char *subj = X509_NAME_oneline(X509_get_subject_name(cert), NULL ,0);
544 char *issuer = X509_NAME_oneline(X509_get_issuer_name(cert), NULL ,0);
545 debugPrintf(4, _T("Server certificate subject is %hs"), subj);
546 debugPrintf(4, _T("Server certificate issuer is %hs"), issuer);
547 OPENSSL_free(subj);
548 OPENSSL_free(issuer);
549
550 X509_free(cert);
551
552 // Setup receiver
553 delete m_queue;
554 m_queue = new MsgWaitQueue();
555 m_connected = true;
556 m_recvThread = ThreadCreateEx(Tunnel::recvThreadStarter, 0, this);
557
558 m_requestId = 0;
559
560 // Do handshake
561 NXCPMessage msg;
562 msg.setCode(CMD_SETUP_AGENT_TUNNEL);
563 msg.setId(InterlockedIncrement(&m_requestId));
564 msg.setField(VID_AGENT_VERSION, NETXMS_BUILD_TAG);
565 msg.setField(VID_SYS_NAME, g_systemName);
566 msg.setField(VID_ZONE_UIN, g_zoneUIN);
567
568 TCHAR fqdn[256];
569 if (GetLocalHostName(fqdn, 256, true))
570 msg.setField(VID_HOSTNAME, fqdn);
571
572 VirtualSession session(0);
573 TCHAR buffer[MAX_RESULT_LENGTH];
574 if (GetParameterValue(_T("System.PlatformName"), buffer, &session) == ERR_SUCCESS)
575 msg.setField(VID_PLATFORM_NAME, buffer);
576 if (GetParameterValue(_T("System.UName"), buffer, &session) == ERR_SUCCESS)
577 msg.setField(VID_SYS_DESCRIPTION, buffer);
578
579 sendMessage(&msg);
580
581 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
582 if (response == NULL)
583 {
584 debugPrintf(4, _T("Cannot configure tunnel (request timeout)"));
585 disconnect();
586 return false;
587 }
588
589 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
590 delete response;
591 if (rcc != ERR_SUCCESS)
592 {
593 debugPrintf(4, _T("Cannot configure tunnel (error %d)"), rcc);
594 disconnect();
595 return false;
596 }
597
598 return true;
599 }
600
601 /**
602 * Check tunnel connection and connect as needed
603 */
604 void Tunnel::checkConnection()
605 {
606 if (m_reset)
607 {
608 m_reset = false;
609 disconnect();
610 closesocket(m_socket);
611 m_socket = INVALID_SOCKET;
612 debugPrintf(3, _T("Resetting tunnel"));
613 if (connectToServer())
614 debugPrintf(3, _T("Tunnel is active"));
615 }
616 else if (!m_connected)
617 {
618 if (connectToServer())
619 debugPrintf(3, _T("Tunnel is active"));
620 }
621 else
622 {
623 NXCPMessage msg;
624 msg.setCode(CMD_KEEPALIVE);
625 msg.setId(InterlockedIncrement(&m_requestId));
626 if (sendMessage(&msg))
627 {
628 NXCPMessage *response = waitForMessage(CMD_KEEPALIVE, msg.getId());
629 if (response == NULL)
630 {
631 disconnect();
632 closesocket(m_socket);
633 m_socket = INVALID_SOCKET;
634 debugPrintf(3, _T("Connection test failed"));
635 }
636 else
637 {
638 delete response;
639 }
640 }
641 else
642 {
643 disconnect();
644 closesocket(m_socket);
645 m_socket = INVALID_SOCKET;
646 debugPrintf(3, _T("Connection test failed"));
647 }
648 }
649 }
650
651 /**
652 * Create certificate request
653 */
654 X509_REQ *Tunnel::createCertificateRequest(const char *country, const char *org, const char *cn, EVP_PKEY **pkey)
655 {
656 RSA *key = RSA_new();
657 if (key == NULL)
658 {
659 debugPrintf(4, _T("call to RSA_new() failed"));
660 return NULL;
661 }
662
663 BIGNUM *bn = BN_new();
664 if (bn == NULL)
665 {
666 debugPrintf(4, _T("call to BN_new() failed"));
667 RSA_free(key);
668 return NULL;
669 }
670
671 BN_set_word(bn, RSA_F4);
672 if (RSA_generate_key_ex(key, NETXMS_RSA_KEYLEN, bn, NULL) == -1)
673 {
674 debugPrintf(4, _T("call to RSA_generate_key_ex() failed"));
675 RSA_free(key);
676 BN_free(bn);
677 return NULL;
678 }
679 BN_free(bn);
680
681 X509_REQ *req = X509_REQ_new();
682 if (req != NULL)
683 {
684 X509_REQ_set_version(req, 1);
685 X509_NAME *subject = X509_REQ_get_subject_name(req);
686 if (subject != NULL)
687 {
688 if (country != NULL)
689 X509_NAME_add_entry_by_txt(subject, "C", MBSTRING_UTF8, (const BYTE *)country, -1, -1, 0);
690 X509_NAME_add_entry_by_txt(subject, "O", MBSTRING_UTF8, (const BYTE *)((org != NULL) ? org : "netxms.org"), -1, -1, 0);
691 X509_NAME_add_entry_by_txt(subject, "CN", MBSTRING_UTF8, (const BYTE *)cn, -1, -1, 0);
692
693 EVP_PKEY *ekey = EVP_PKEY_new();
694 if (ekey != NULL)
695 {
696 EVP_PKEY_assign_RSA(ekey, key);
697 key = NULL; // will be freed by EVP_PKEY_free
698 X509_REQ_set_pubkey(req, ekey);
699 if (X509_REQ_sign(req, ekey, EVP_sha256()) > 0)
700 {
701 *pkey = ekey;
702 }
703 else
704 {
705 debugPrintf(4, _T("call to X509_REQ_sign() failed"));
706 X509_REQ_free(req);
707 req = NULL;
708 EVP_PKEY_free(ekey);
709 }
710 }
711 else
712 {
713 debugPrintf(4, _T("call to EVP_PKEY_new() failed"));
714 X509_REQ_free(req);
715 req = NULL;
716 }
717 }
718 else
719 {
720 debugPrintf(4, _T("call to X509_REQ_get_subject_name() failed"));
721 X509_REQ_free(req);
722 req = NULL;
723 }
724 }
725 else
726 {
727 debugPrintf(4, _T("call to X509_REQ_new() failed"));
728 }
729
730 if (key != NULL)
731 RSA_free(key);
732 return req;
733 }
734
735 /**
736 * Save certificate
737 */
738 bool Tunnel::saveCertificate(X509 *cert, EVP_PKEY *key)
739 {
740 BYTE addressHash[SHA1_DIGEST_SIZE];
741 #ifdef UNICODE
742 char *un = MBStringFromWideString(m_hostname);
743 CalculateSHA1Hash((BYTE *)un, strlen(un), addressHash);
744 free(un);
745 #else
746 CalculateSHA1Hash((BYTE *)m_hostname, strlen(m_hostname), addressHash);
747 #endif
748
749 TCHAR prefix[48];
750 BinToStr(addressHash, SHA1_DIGEST_SIZE, prefix);
751
752 TCHAR name[MAX_PATH];
753 _sntprintf(name, MAX_PATH, _T("%s%s.crt"), g_certificateDirectory, prefix);
754 FILE *f = _tfopen(name, _T("w"));
755 if (f == NULL)
756 {
757 debugPrintf(4, _T("Cannot open file \"%s\" (%s)"), name, _tcserror(errno));
758 return false;
759 }
760 int rc = PEM_write_X509(f, cert);
761 fclose(f);
762 if (rc != 1)
763 {
764 debugPrintf(4, _T("PEM_write_X509(\"%s\") failed"), name);
765 return false;
766 }
767
768 _sntprintf(name, MAX_PATH, _T("%s%s.key"), g_certificateDirectory, prefix);
769 f = _tfopen(name, _T("w"));
770 if (f == NULL)
771 {
772 debugPrintf(4, _T("Cannot open file \"%s\" (%s)"), name, _tcserror(errno));
773 return false;
774 }
775 rc = PEM_write_PrivateKey(f, key, EVP_des_ede3_cbc(), NULL, 0, 0, (void *)"nxagentd");
776 fclose(f);
777 if (rc != 1)
778 {
779 debugPrintf(4, _T("PEM_write_PrivateKey(\"%s\") failed"), name);
780 return false;
781 }
782
783 debugPrintf(4, _T("Certificate and private key saved"));
784 return true;
785 }
786
787 /**
788 * Process tunnel bind request
789 */
790 void Tunnel::processBindRequest(NXCPMessage *request)
791 {
792 NXCPMessage response(CMD_REQUEST_COMPLETED, request->getId());
793
794 uuid guid = request->getFieldAsGUID(VID_GUID);
795 char *cn = guid.toString().getUTF8String();
796
797 char *country = request->getFieldAsUtf8String(VID_COUNTRY);
798 char *org = request->getFieldAsUtf8String(VID_ORGANIZATION);
799
800 EVP_PKEY *key = NULL;
801 X509_REQ *req = createCertificateRequest(country, org, cn, &key);
802 free(cn);
803
804 if (req != NULL)
805 {
806 BYTE *buffer = NULL;
807 int len = i2d_X509_REQ(req, &buffer);
808 if (len > 0)
809 {
810 NXCPMessage certRequest(CMD_REQUEST_CERTIFICATE, request->getId());
811 certRequest.setField(VID_CERTIFICATE, buffer, len);
812 sendMessage(&certRequest);
813 OPENSSL_free(buffer);
814
815 NXCPMessage *certResponse = waitForMessage(CMD_NEW_CERTIFICATE, request->getId());
816 if (certResponse != NULL)
817 {
818 UINT32 rcc = certResponse->getFieldAsUInt32(VID_RCC);
819 if (rcc == ERR_SUCCESS)
820 {
821 size_t certLen;
822 const BYTE *certData = certResponse->getBinaryFieldPtr(VID_CERTIFICATE, &certLen);
823 if (certData != NULL)
824 {
825 X509 *cert = d2i_X509(NULL, &certData, (long)certLen);
826 if (cert != NULL)
827 {
828 if (saveCertificate(cert, key))
829 {
830 response.setField(VID_RCC, ERR_SUCCESS);
831 }
832 else
833 {
834 response.setField(VID_RCC, ERR_IO_FAILURE);
835 }
836 X509_free(cert);
837 }
838 else
839 {
840 debugPrintf(4, _T("certificate data is invalid"));
841 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
842 }
843 }
844 else
845 {
846 debugPrintf(4, _T("certificate data missing in server response"));
847 response.setField(VID_RCC, ERR_INTERNAL_ERROR);
848 }
849 }
850 else
851 {
852 debugPrintf(4, _T("certificate request failed (%d)"), rcc);
853 response.setField(VID_RCC, rcc);
854 }
855 delete certResponse;
856 }
857 else
858 {
859 debugPrintf(4, _T("timeout waiting for certificate request completion"));
860 response.setField(VID_RCC, ERR_REQUEST_TIMEOUT);
861 }
862 }
863 else
864 {
865 debugPrintf(4, _T("call to i2d_X509_REQ() failed"));
866 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
867 }
868 X509_REQ_free(req);
869 EVP_PKEY_free(key);
870 }
871 else
872 {
873 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
874 }
875
876 sendMessage(&response);
877 delete request;
878 }
879
880 /**
881 * Create new session
882 */
883 void Tunnel::createSession(NXCPMessage *request)
884 {
885 NXCPMessage response(CMD_REQUEST_COMPLETED, request->getId());
886
887 // Assume that peer always have minimal access, so don't check return value
888 bool masterServer, controlServer;
889 IsValidServerAddress(m_address, &masterServer, &controlServer);
890
891 TunnelCommChannel *channel = createChannel();
892 if (channel != NULL)
893 {
894 CommSession *session = new CommSession(channel, m_address, masterServer, controlServer);
895 if (RegisterSession(session))
896 {
897 response.setField(VID_RCC, ERR_SUCCESS);
898 response.setField(VID_CHANNEL_ID, channel->getId());
899 debugPrintf(9, _T("New session registered"));
900 session->run();
901 }
902 else
903 {
904 delete session;
905 response.setField(VID_RCC, ERR_OUT_OF_RESOURCES);
906 }
907 channel->decRefCount();
908 }
909 else
910 {
911 response.setField(VID_RCC, ERR_OUT_OF_RESOURCES);
912 }
913
914 sendMessage(&response);
915 }
916
917 /**
918 * Create channel
919 */
920 TunnelCommChannel *Tunnel::createChannel()
921 {
922 TunnelCommChannel *channel = NULL;
923 MutexLock(m_channelLock);
924 if (m_channels.size() < (int)g_dwMaxSessions)
925 {
926 channel = new TunnelCommChannel(this);
927 m_channels.set(channel->getId(), channel);
928 debugPrintf(5, _T("New channel created (ID=%d)"), channel->getId());
929 }
930 MutexUnlock(m_channelLock);
931 return channel;
932 }
933
934 /**
935 * Process server's channel close request
936 */
937 void Tunnel::processChannelCloseRequest(NXCPMessage *request)
938 {
939 UINT32 id = request->getFieldAsUInt32(VID_CHANNEL_ID);
940 debugPrintf(5, _T("Close request for channel %d"), id);
941 MutexLock(m_channelLock);
942 TunnelCommChannel *channel = m_channels.get(id);
943 MutexUnlock(m_channelLock);
944 if (channel != NULL)
945 {
946 channel->close();
947 channel->decRefCount();
948 }
949 }
950
951 /**
952 * Close channel
953 */
954 void Tunnel::closeChannel(TunnelCommChannel *channel)
955 {
956 UINT32 id = 0;
957 MutexLock(m_channelLock);
958 if (m_channels.contains(channel->getId()))
959 {
960 id = channel->getId();
961 debugPrintf(5, _T("Channel %d closed"), id);
962 m_channels.remove(id);
963 }
964 MutexUnlock(m_channelLock);
965
966 if (id != 0)
967 {
968 NXCPMessage msg(CMD_CLOSE_CHANNEL, 0);
969 msg.setField(VID_CHANNEL_ID, id);
970 sendMessage(&msg);
971 }
972 }
973
974 /**
975 * Send channel data
976 */
977 int Tunnel::sendChannelData(UINT32 id, const void *data, size_t len)
978 {
979 NXCP_MESSAGE *msg = CreateRawNXCPMessage(CMD_CHANNEL_DATA, id, 0, data, len, NULL, false);
980 int rc = sslWrite(msg, ntohl(msg->size));
981 if (rc == ntohl(msg->size))
982 rc = (int)len; // adjust number of bytes to exclude tunnel overhead
983 free(msg);
984 return rc;
985 }
986
987 /**
988 * Create tunnel object from configuration record
989 */
990 Tunnel *Tunnel::createFromConfig(TCHAR *config)
991 {
992 int port = AGENT_TUNNEL_PORT;
993 TCHAR *p = _tcschr(config, _T(':'));
994 if (p != NULL)
995 {
996 *p = 0;
997 p++;
998
999 TCHAR *eptr;
1000 int port = _tcstol(p, &eptr, 10);
1001 if ((port < 1) || (port > 65535))
1002 return NULL;
1003 }
1004 return new Tunnel(config, port);
1005 }
1006
1007 /**
1008 * Channel constructor
1009 */
1010 TunnelCommChannel::TunnelCommChannel(Tunnel *tunnel) : AbstractCommChannel(), m_buffer(32768, 32768)
1011 {
1012 m_id = InterlockedIncrement(&s_nextChannelId);
1013 m_tunnel = tunnel;
1014 m_active = true;
1015 m_closed = 0;
1016 #ifdef _WIN32
1017 InitializeCriticalSectionAndSpinCount(&m_bufferLock, 4000);
1018 m_dataCondition = CreateEvent(NULL, TRUE, FALSE, NULL);
1019 #else
1020 pthread_mutex_init(&m_bufferLock, NULL);
1021 pthread_cond_init(&m_dataCondition, NULL);
1022 #endif
1023 }
1024
1025 /**
1026 * Channel destructor
1027 */
1028 TunnelCommChannel::~TunnelCommChannel()
1029 {
1030 #ifdef _WIN32
1031 DeleteCriticalSection(&m_bufferLock);
1032 CloseHandle(m_dataCondition);
1033 #else
1034 pthread_mutex_destroy(&m_bufferLock);
1035 pthread_cond_destroy(&m_dataCondition);
1036 #endif
1037 }
1038
1039 /**
1040 * Send data
1041 */
1042 int TunnelCommChannel::send(const void *data, size_t size, MUTEX mutex)
1043 {
1044 return m_active ? m_tunnel->sendChannelData(m_id, data, size) : -1;
1045 }
1046
1047 /**
1048 * Receive data
1049 */
1050 int TunnelCommChannel::recv(void *buffer, size_t size, UINT32 timeout)
1051 {
1052 if (!m_active)
1053 return 0;
1054
1055 #ifdef _WIN32
1056 EnterCriticalSection(&m_bufferLock);
1057 if (m_buffer.isEmpty())
1058 {
1059 retry_wait:
1060 LeaveCriticalSection(&m_bufferLock);
1061 if (WaitForSingleObject(m_dataCondition, timeout) == WAIT_TIMEOUT)
1062 return -2;
1063
1064 if (!m_active)
1065 return 0; // closed while waiting
1066
1067 EnterCriticalSection(&m_bufferLock);
1068 if (m_buffer.isEmpty())
1069 {
1070 ResetEvent(m_dataCondition);
1071 goto retry_wait;
1072 }
1073 }
1074 #else
1075 pthread_mutex_lock(&m_bufferLock);
1076 if (m_buffer.isEmpty())
1077 {
1078 #if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
1079 struct timespec ts;
1080 ts.tv_sec = timeout / 1000;
1081 ts.tv_nsec = (timeout % 1000) * 1000000;
1082 int rc = pthread_cond_reltimedwait_np(&m_dataCondition, &m_bufferLock, &ts);
1083 #else
1084 struct timeval now;
1085 struct timespec ts;
1086 gettimeofday(&now, NULL);
1087 ts.tv_sec = now.tv_sec + (timeout / 1000);
1088 now.tv_usec += (timeout % 1000) * 1000;
1089 ts.tv_sec += now.tv_usec / 1000000;
1090 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
1091 int rc = pthread_cond_timedwait(&m_dataCondition, &m_bufferLock, &ts);
1092 #endif
1093 if (rc != 0)
1094 {
1095 pthread_mutex_unlock(&m_bufferLock);
1096 return -2; // timeout
1097 }
1098
1099 if (!m_active) // closed while waiting
1100 {
1101 pthread_mutex_unlock(&m_bufferLock);
1102 return 0;
1103 }
1104 }
1105 #endif
1106
1107 size_t bytes = m_buffer.read((BYTE *)buffer, size);
1108 #ifdef _WIN32
1109 if (m_buffer.isEmpty())
1110 ResetEvent(m_dataCondition);
1111 LeaveCriticalSection(&m_bufferLock);
1112 #else
1113 pthread_mutex_unlock(&m_bufferLock);
1114 #endif
1115 return (int)bytes;
1116 }
1117
/**
 * Poll for data
 *
 * Waits until channel has data to read.
 *
 * @param timeout wait timeout in milliseconds
 * @param write if true, caller is polling for ability to write; channel
 *              always reports ready in that case
 * @return 1 if channel is ready, 0 on timeout, -1 if channel is not active
 */
int TunnelCommChannel::poll(UINT32 timeout, bool write)
{
   // Writes are never blocked at channel level
   if (write)
      return 1;

   if (!m_active)
      return -1;

   int rc = 0;

#ifdef _WIN32
   EnterCriticalSection(&m_bufferLock);
   if (m_buffer.isEmpty())
   {
retry_wait:
      // Wait on event with lock released
      LeaveCriticalSection(&m_bufferLock);
      if (WaitForSingleObject(m_dataCondition, timeout) == WAIT_TIMEOUT)
         return 0;

      // Event is also set by shutdown()
      if (!m_active)
         return -1;

      EnterCriticalSection(&m_bufferLock);
      if (m_buffer.isEmpty())
      {
         // Event was signalled but buffer is empty (data already consumed) - reset event and wait again
         ResetEvent(m_dataCondition);
         goto retry_wait;
      }
   }
   LeaveCriticalSection(&m_bufferLock);
#else
   pthread_mutex_lock(&m_bufferLock);
   if (m_buffer.isEmpty())
   {
#if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
      // This call takes relative wait time
      struct timespec ts;
      ts.tv_sec = timeout / 1000;
      ts.tv_nsec = (timeout % 1000) * 1000000;
      rc = pthread_cond_reltimedwait_np(&m_dataCondition, &m_bufferLock, &ts);
#else
      // pthread_cond_timedwait takes absolute time: current time + timeout,
      // with microsecond overflow carried into seconds
      struct timeval now;
      struct timespec ts;
      gettimeofday(&now, NULL);
      ts.tv_sec = now.tv_sec + (timeout / 1000);
      now.tv_usec += (timeout % 1000) * 1000;
      ts.tv_sec += now.tv_usec / 1000000;
      ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
      rc = pthread_cond_timedwait(&m_dataCondition, &m_bufferLock, &ts);
#endif
   }
   pthread_mutex_unlock(&m_bufferLock);
#endif

   // rc stays 0 if buffer was not empty or wait was completed by signal;
   // any non-zero wait result is reported as timeout.
   // NOTE(review): on non-Windows platforms wakeup caused by shutdown() is
   // also reported as "ready" here - m_active is not re-checked after wait.
   return (rc == 0) ? 1 : 0;
}
1176
1177 /**
1178 * Shutdown channel
1179 */
1180 int TunnelCommChannel::shutdown()
1181 {
1182 m_active = false;
1183 #ifdef _WIN32
1184 EnterCriticalSection(&m_bufferLock);
1185 SetEvent(m_dataCondition);
1186 LeaveCriticalSection(&m_bufferLock);
1187 #else
1188 pthread_cond_broadcast(&m_dataCondition);
1189 #endif
1190 return 0;
1191 }
1192
1193 /**
1194 * Close channel
1195 */
1196 void TunnelCommChannel::close()
1197 {
1198 if (InterlockedIncrement(&m_closed) > 1)
1199 return; // already closed or close in progress
1200 shutdown();
1201 m_tunnel->closeChannel(this);
1202 }
1203
1204 /**
1205 * Put data into buffer
1206 */
1207 void TunnelCommChannel::putData(const BYTE *data, size_t size)
1208 {
1209 #ifdef _WIN32
1210 EnterCriticalSection(&m_bufferLock);
1211 #else
1212 pthread_mutex_lock(&m_bufferLock);
1213 #endif
1214 m_buffer.write(data, size);
1215 #ifdef _WIN32
1216 SetEvent(m_dataCondition);
1217 LeaveCriticalSection(&m_bufferLock);
1218 #else
1219 pthread_cond_broadcast(&m_dataCondition);
1220 pthread_mutex_unlock(&m_bufferLock);
1221 #endif
1222 }
1223
/**
 * Configured tunnels.
 * Filled by ParseTunnelList() and walked by TunnelManager().
 */
static ObjectArray<Tunnel> s_tunnels;
1228
1229 /**
1230 * Parser server connection (tunnel) list
1231 */
1232 void ParseTunnelList(TCHAR *list)
1233 {
1234 TCHAR *curr, *next;
1235 for(curr = next = list; curr != NULL && *curr != 0; curr = next + 1)
1236 {
1237 next = _tcschr(curr, _T('\n'));
1238 if (next != NULL)
1239 *next = 0;
1240 StrStrip(curr);
1241
1242 Tunnel *t = Tunnel::createFromConfig(curr);
1243 if (t != NULL)
1244 {
1245 s_tunnels.add(t);
1246 nxlog_debug(1, _T("Added server tunnel %s"), t->getHostname());
1247 }
1248 else
1249 {
1250 nxlog_write(MSG_INVALID_TUNNEL_CONFIG, NXLOG_ERROR, "s", curr);
1251 }
1252 }
1253 free(list);
1254 }
1255
1256 /**
1257 * Tunnel manager
1258 */
1259 THREAD_RESULT THREAD_CALL TunnelManager(void *)
1260 {
1261 if (s_tunnels.size() == 0)
1262 {
1263 nxlog_debug(3, _T("No tunnels configured, tunnel manager will not start"));
1264 return THREAD_OK;
1265 }
1266
1267 g_tunnelKeepaliveInterval *= 1000; // convert to milliseconds
1268 nxlog_debug(3, _T("Tunnel manager started"));
1269 do
1270 {
1271 for(int i = 0; i < s_tunnels.size(); i++)
1272 {
1273 Tunnel *t = s_tunnels.get(i);
1274 t->checkConnection();
1275 }
1276 }
1277 while(!AgentSleepAndCheckForShutdown(g_tunnelKeepaliveInterval));
1278 nxlog_debug(3, _T("Tunnel manager stopped"));
1279 return THREAD_OK;
1280 }