added tags to tunnel debug; debug tag length increased
[public/netxms.git] / src / server / core / tunnel.cpp
1 /*
2 ** NetXMS - Network Management System
3 ** Copyright (C) 2003-2017 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: tunnel.cpp
20 **/
21
22 #include "nxcore.h"
23 #include <socket_listener.h>
24 #include <agent_tunnel.h>
25
26 #define MAX_MSG_SIZE 268435456
27
28 #define DEBUG_TAG _T("agent.tunnel")
29
30 /**
31 * Tunnel registration
32 */
33 static RefCountHashMap<UINT32, AgentTunnel> s_boundTunnels(true);
34 static ObjectRefArray<AgentTunnel> s_unboundTunnels(16, 16);
35 static Mutex s_tunnelListLock;
36
37 /**
38 * Register tunnel
39 */
40 static void RegisterTunnel(AgentTunnel *tunnel)
41 {
42 tunnel->incRefCount();
43 s_tunnelListLock.lock();
44 if (tunnel->isBound())
45 {
46 s_boundTunnels.set(tunnel->getNodeId(), tunnel);
47 tunnel->decRefCount(); // set already increased ref count
48 }
49 else
50 {
51 s_unboundTunnels.add(tunnel);
52 }
53 s_tunnelListLock.unlock();
54 }
55
56 /**
57 * Unregister tunnel
58 */
59 static void UnregisterTunnel(AgentTunnel *tunnel)
60 {
61 tunnel->debugPrintf(4, _T("Tunnel unregistered"));
62 s_tunnelListLock.lock();
63 if (tunnel->isBound())
64 {
65 // Check that current tunnel for node is tunnel being unregistered
66 // New tunnel could be established while old one still finishing
67 // outstanding requests
68 if (s_boundTunnels.peek(tunnel->getNodeId()) == tunnel)
69 s_boundTunnels.remove(tunnel->getNodeId());
70 }
71 else
72 {
73 s_unboundTunnels.remove(tunnel);
74 tunnel->decRefCount();
75 }
76 s_tunnelListLock.unlock();
77 }
78
79 /**
80 * Get tunnel for node. Caller must decrease reference counter on tunnel.
81 */
82 AgentTunnel *GetTunnelForNode(UINT32 nodeId)
83 {
84 s_tunnelListLock.lock();
85 AgentTunnel *t = s_boundTunnels.get(nodeId);
86 s_tunnelListLock.unlock();
87 return t;
88 }
89
90 /**
91 * Bind agent tunnel
92 */
93 UINT32 BindAgentTunnel(UINT32 tunnelId, UINT32 nodeId)
94 {
95 AgentTunnel *tunnel = NULL;
96 s_tunnelListLock.lock();
97 for(int i = 0; i < s_unboundTunnels.size(); i++)
98 {
99 if (s_unboundTunnels.get(i)->getId() == tunnelId)
100 {
101 tunnel = s_unboundTunnels.get(i);
102 tunnel->incRefCount();
103 break;
104 }
105 }
106 s_tunnelListLock.unlock();
107
108 if (tunnel == NULL)
109 {
110 nxlog_debug_tag(DEBUG_TAG, 4, _T("BindAgentTunnel: unbound tunnel with ID %d not found"), tunnelId);
111 return RCC_INVALID_TUNNEL_ID;
112 }
113
114 UINT32 rcc = tunnel->bind(nodeId);
115 tunnel->decRefCount();
116 return rcc;
117 }
118
119 /**
120 * Bind agent tunnel from node
121 */
122 UINT32 UnbindAgentTunnel(UINT32 nodeId)
123 {
124 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
125 if (node == NULL)
126 return RCC_INVALID_OBJECT_ID;
127
128 if (node->getTunnelId().isNull())
129 return RCC_SUCCESS; // tunnel is not set
130
131 node->setTunnelId(uuid::NULL_UUID);
132
133 AgentTunnel *tunnel = GetTunnelForNode(nodeId);
134 if (tunnel != NULL)
135 {
136 nxlog_debug_tag(DEBUG_TAG, 4, _T("UnbindAgentTunnel(%s): shutting down existing tunnel"), node->getName());
137 tunnel->shutdown();
138 tunnel->decRefCount();
139 }
140
141 return RCC_SUCCESS;
142 }
143
144 /**
145 * Get list of agent tunnels into NXCP message
146 */
147 void GetAgentTunnels(NXCPMessage *msg)
148 {
149 s_tunnelListLock.lock();
150 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
151
152 for(int i = 0; i < s_unboundTunnels.size(); i++)
153 {
154 s_unboundTunnels.get(i)->fillMessage(msg, fieldId);
155 fieldId += 64;
156 }
157
158 Iterator<AgentTunnel> *it = s_boundTunnels.iterator();
159 while(it->hasNext())
160 {
161 it->next()->fillMessage(msg, fieldId);
162 fieldId += 64;
163 }
164 delete it;
165
166 msg->setField(VID_NUM_ELEMENTS, (UINT32)(s_unboundTunnels.size() + s_boundTunnels.size()));
167 s_tunnelListLock.unlock();
168 }
169
170 /**
171 * Show tunnels in console
172 */
173 void ShowAgentTunnels(CONSOLE_CTX console)
174 {
175 s_tunnelListLock.lock();
176
177 ConsolePrintf(console,
178 _T("\n\x1b[1mBOUND TUNNELS\x1b[0m\n")
179 _T("ID | Node ID | Peer IP Address | System Name | Platform Name | Agent Version\n")
180 _T("-----+---------+--------------------------+--------------------------+------------------+------------------------\n"));
181 Iterator<AgentTunnel> *it = s_boundTunnels.iterator();
182 while(it->hasNext())
183 {
184 AgentTunnel *t = it->next();
185 TCHAR ipAddrBuffer[64];
186 ConsolePrintf(console, _T("%4d | %7u | %-24s | %-24s | %-16s | %s\n"), t->getId(), t->getNodeId(), t->getAddress().toString(ipAddrBuffer), t->getSystemName(), t->getPlatformName(), t->getAgentVersion());
187 }
188 delete it;
189
190 ConsolePrintf(console,
191 _T("\n\x1b[1mUNBOUND TUNNELS\x1b[0m\n")
192 _T("ID | Peer IP Address | System Name | Platform Name | Agent Version\n")
193 _T("-----+--------------------------+--------------------------+------------------+------------------------\n"));
194 for(int i = 0; i < s_unboundTunnels.size(); i++)
195 {
196 const AgentTunnel *t = s_unboundTunnels.get(i);
197 TCHAR ipAddrBuffer[64];
198 ConsolePrintf(console, _T("%4d | %-24s | %-24s | %-16s | %s\n"), t->getId(), t->getAddress().toString(ipAddrBuffer), t->getSystemName(), t->getPlatformName(), t->getAgentVersion());
199 }
200
201 s_tunnelListLock.unlock();
202 }
203
204 /**
205 * Next free tunnel ID
206 */
207 static VolatileCounter s_nextTunnelId = 0;
208
209 /**
210 * Agent tunnel constructor
211 */
212 AgentTunnel::AgentTunnel(SSL_CTX *context, SSL *ssl, SOCKET sock, const InetAddress& addr, UINT32 nodeId, UINT32 zoneUIN) : RefCountObject(), m_channels(true)
213 {
214 m_id = InterlockedIncrement(&s_nextTunnelId);
215 m_address = addr;
216 m_socket = sock;
217 m_context = context;
218 m_ssl = ssl;
219 m_sslLock = MutexCreate();
220 m_requestId = 0;
221 m_nodeId = nodeId;
222 m_zoneUIN = zoneUIN;
223 m_state = AGENT_TUNNEL_INIT;
224 m_systemName = NULL;
225 m_platformName = NULL;
226 m_systemInfo = NULL;
227 m_agentVersion = NULL;
228 m_bindRequestId = 0;
229 m_channelLock = MutexCreate();
230 }
231
232 /**
233 * Agent tunnel destructor
234 */
235 AgentTunnel::~AgentTunnel()
236 {
237 m_channels.clear();
238 shutdown();
239 SSL_CTX_free(m_context);
240 SSL_free(m_ssl);
241 MutexDestroy(m_sslLock);
242 closesocket(m_socket);
243 free(m_systemName);
244 free(m_platformName);
245 free(m_systemInfo);
246 free(m_agentVersion);
247 MutexDestroy(m_channelLock);
248 debugPrintf(4, _T("Tunnel destroyed"));
249 }
250
251 /**
252 * Debug output
253 */
254 void AgentTunnel::debugPrintf(int level, const TCHAR *format, ...)
255 {
256 va_list args;
257 va_start(args, format);
258 nxlog_debug_tag_object2(DEBUG_TAG, m_id, level, format, args);
259 va_end(args);
260 }
261
262 /**
263 * Tunnel receiver thread
264 */
265 void AgentTunnel::recvThread()
266 {
267 TlsMessageReceiver receiver(m_socket, m_ssl, m_sslLock, 4096, MAX_MSG_SIZE);
268 while(true)
269 {
270 MessageReceiverResult result;
271 NXCPMessage *msg = receiver.readMessage(60000, &result);
272 if (result != MSGRECV_SUCCESS)
273 {
274 if (result == MSGRECV_CLOSED)
275 debugPrintf(4, _T("Tunnel closed by peer"));
276 else
277 debugPrintf(4, _T("Communication error (%s)"), AbstractMessageReceiver::resultToText(result));
278 break;
279 }
280
281 if (nxlog_get_debug_level() >= 6)
282 {
283 TCHAR buffer[64];
284 debugPrintf(6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
285 }
286
287 switch(msg->getCode())
288 {
289 case CMD_KEEPALIVE:
290 {
291 NXCPMessage response(CMD_KEEPALIVE, msg->getId());
292 sendMessage(&response);
293 }
294 break;
295 case CMD_SETUP_AGENT_TUNNEL:
296 setup(msg);
297 break;
298 case CMD_REQUEST_CERTIFICATE:
299 processCertificateRequest(msg);
300 break;
301 case CMD_CHANNEL_DATA:
302 if (msg->isBinary())
303 {
304 MutexLock(m_channelLock);
305 AgentTunnelCommChannel *channel = m_channels.get(msg->getId());
306 MutexUnlock(m_channelLock);
307 if (channel != NULL)
308 {
309 channel->putData(msg->getBinaryData(), msg->getBinaryDataSize());
310 channel->decRefCount();
311 }
312 else
313 {
314 debugPrintf(6, _T("Received channel data for non-existing channel %u"), msg->getId());
315 }
316 }
317 break;
318 case CMD_CLOSE_CHANNEL: // channel close notification
319 processChannelClose(msg->getFieldAsUInt32(VID_CHANNEL_ID));
320 break;
321 default:
322 m_queue.put(msg);
323 msg = NULL; // prevent message deletion
324 break;
325 }
326 delete msg;
327 }
328
329 UnregisterTunnel(this);
330 m_state = AGENT_TUNNEL_SHUTDOWN;
331
332 // shutdown all channels
333 MutexLock(m_channelLock);
334 Iterator<AgentTunnelCommChannel> *it = m_channels.iterator();
335 while(it->hasNext())
336 it->next()->shutdown();
337 delete it;
338 m_channels.clear();
339 MutexUnlock(m_channelLock);
340
341 debugPrintf(4, _T("Receiver thread stopped"));
342 }
343
344 /**
345 * Tunnel receiver thread starter
346 */
347 THREAD_RESULT THREAD_CALL AgentTunnel::recvThreadStarter(void *arg)
348 {
349 ThreadSetName("TunnelReceiver");
350 ((AgentTunnel *)arg)->recvThread();
351 ((AgentTunnel *)arg)->decRefCount();
352 return THREAD_OK;
353 }
354
355 /**
356 * Write to SSL
357 */
358 int AgentTunnel::sslWrite(const void *data, size_t size)
359 {
360 bool canRetry;
361 int bytes;
362 MutexLock(m_sslLock);
363 do
364 {
365 canRetry = false;
366 bytes = SSL_write(m_ssl, data, (int)size);
367 if (bytes <= 0)
368 {
369 int err = SSL_get_error(m_ssl, bytes);
370 if ((err == SSL_ERROR_WANT_READ) || (err == SSL_ERROR_WANT_WRITE))
371 {
372 SocketPoller sp(true);
373 sp.add(m_socket);
374 if (sp.poll(5000) > 0)
375 canRetry = true;
376 }
377 else
378 {
379 debugPrintf(7, _T("SSL_write error (bytes=%d ssl_err=%d errno=%d)"), bytes, err, errno);
380 if (err == SSL_ERROR_SSL)
381 LogOpenSSLErrorStack(7);
382 }
383 }
384 }
385 while(canRetry);
386 MutexUnlock(m_sslLock);
387 return bytes;
388 }
389
390 /**
391 * Send message on tunnel
392 */
393 bool AgentTunnel::sendMessage(NXCPMessage *msg)
394 {
395 if (m_state == AGENT_TUNNEL_SHUTDOWN)
396 return false;
397
398 if (nxlog_get_debug_level() >= 6)
399 {
400 TCHAR buffer[64];
401 debugPrintf(6, _T("Sending message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
402 }
403 NXCP_MESSAGE *data = msg->serialize(true);
404 bool success = (sslWrite(data, ntohl(data->size)) == ntohl(data->size));
405 free(data);
406 return success;
407 }
408
409 /**
410 * Start tunnel
411 */
412 void AgentTunnel::start()
413 {
414 debugPrintf(4, _T("Tunnel started"));
415 incRefCount();
416 ThreadCreate(AgentTunnel::recvThreadStarter, 0, this);
417 }
418
419 /**
420 * Shutdown tunnel
421 */
422 void AgentTunnel::shutdown()
423 {
424 if (m_socket != INVALID_SOCKET)
425 ::shutdown(m_socket, SHUT_RDWR);
426 m_state = AGENT_TUNNEL_SHUTDOWN;
427 debugPrintf(4, _T("Tunnel shutdown"));
428 }
429
430 /**
431 * Process setup request
432 */
433 void AgentTunnel::setup(const NXCPMessage *request)
434 {
435 NXCPMessage response;
436 response.setCode(CMD_REQUEST_COMPLETED);
437 response.setId(request->getId());
438
439 if (m_state == AGENT_TUNNEL_INIT)
440 {
441 m_systemName = request->getFieldAsString(VID_SYS_NAME);
442 m_systemInfo = request->getFieldAsString(VID_SYS_DESCRIPTION);
443 m_platformName = request->getFieldAsString(VID_PLATFORM_NAME);
444 m_agentVersion = request->getFieldAsString(VID_AGENT_VERSION);
445
446 m_state = (m_nodeId != 0) ? AGENT_TUNNEL_BOUND : AGENT_TUNNEL_UNBOUND;
447 response.setField(VID_RCC, ERR_SUCCESS);
448 response.setField(VID_IS_ACTIVE, m_state == AGENT_TUNNEL_BOUND);
449
450 // For bound tunnels zone UIN taken from node object
451 if (m_state != AGENT_TUNNEL_BOUND)
452 m_zoneUIN = request->getFieldAsUInt32(VID_ZONE_UIN);
453
454 debugPrintf(3, _T("%s tunnel initialized"), (m_state == AGENT_TUNNEL_BOUND) ? _T("Bound") : _T("Unbound"));
455 debugPrintf(5, _T(" System name: %s"), m_systemName);
456 debugPrintf(5, _T(" System information: %s"), m_systemInfo);
457 debugPrintf(5, _T(" Platform name: %s"), m_platformName);
458 debugPrintf(5, _T(" Agent version: %s"), m_agentVersion);
459 debugPrintf(5, _T(" Zone UIN: %u"), m_zoneUIN);
460 }
461 else
462 {
463 response.setField(VID_RCC, ERR_OUT_OF_STATE_REQUEST);
464 }
465
466 sendMessage(&response);
467 }
468
469 /**
470 * Bind tunnel to node
471 */
472 UINT32 AgentTunnel::bind(UINT32 nodeId)
473 {
474 if ((m_state != AGENT_TUNNEL_UNBOUND) || (m_bindRequestId != 0))
475 return RCC_OUT_OF_STATE_REQUEST;
476
477 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
478 if (node == NULL)
479 return RCC_INVALID_OBJECT_ID;
480
481 NXCPMessage msg;
482 msg.setCode(CMD_BIND_AGENT_TUNNEL);
483 msg.setId(InterlockedIncrement(&m_requestId));
484 msg.setField(VID_SERVER_ID, g_serverId);
485 msg.setField(VID_GUID, node->getGuid());
486 m_guid = uuid::generate();
487 msg.setField(VID_TUNNEL_GUID, m_guid);
488
489 TCHAR buffer[256];
490 if (GetServerCertificateCountry(buffer, 256))
491 msg.setField(VID_COUNTRY, buffer);
492 if (GetServerCertificateOrganization(buffer, 256))
493 msg.setField(VID_ORGANIZATION, buffer);
494
495 m_bindRequestId = msg.getId();
496 m_bindGuid = node->getGuid();
497 sendMessage(&msg);
498
499 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
500 if (response == NULL)
501 return RCC_TIMEOUT;
502
503 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
504 delete response;
505 if (rcc == ERR_SUCCESS)
506 {
507 debugPrintf(4, _T("Bind successful, resetting tunnel"));
508 node->setNewTunnelBindFlag();
509 msg.setCode(CMD_RESET_TUNNEL);
510 msg.setId(InterlockedIncrement(&m_requestId));
511 sendMessage(&msg);
512 }
513 else
514 {
515 debugPrintf(4, _T("Bind failed: agent error %d (%s)"), rcc, AgentErrorCodeToText(rcc));
516 }
517 return AgentErrorToRCC(rcc);
518 }
519
520 /**
521 * Process certificate request
522 */
523 void AgentTunnel::processCertificateRequest(NXCPMessage *request)
524 {
525 NXCPMessage response(CMD_NEW_CERTIFICATE, request->getId());
526
527 if ((request->getId() == m_bindRequestId) && (m_bindRequestId != 0) && (m_state == AGENT_TUNNEL_UNBOUND))
528 {
529 size_t certRequestLen;
530 const BYTE *certRequestData = request->getBinaryFieldPtr(VID_CERTIFICATE, &certRequestLen);
531 if (certRequestData != NULL)
532 {
533 X509_REQ *certRequest = d2i_X509_REQ(NULL, &certRequestData, (long)certRequestLen);
534 if (certRequest != NULL)
535 {
536 char *ou = m_bindGuid.toString().getUTF8String();
537 char *cn = m_guid.toString().getUTF8String();
538 X509 *cert = IssueCertificate(certRequest, ou, cn, 365);
539 free(ou);
540 free(cn);
541 if (cert != NULL)
542 {
543 BYTE *buffer = NULL;
544 int len = i2d_X509(cert, &buffer);
545 if (len > 0)
546 {
547 response.setField(VID_RCC, ERR_SUCCESS);
548 response.setField(VID_CERTIFICATE, buffer, len);
549 OPENSSL_free(buffer);
550 debugPrintf(4, _T("Certificate issued"));
551
552 Node *node = (Node *)FindObjectByGUID(m_bindGuid, OBJECT_NODE);
553 if (node != NULL)
554 {
555 node->setTunnelId(m_guid);
556 }
557 }
558 else
559 {
560 debugPrintf(4, _T("Cannot encode certificate"));
561 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
562 }
563 X509_free(cert);
564 }
565 else
566 {
567 debugPrintf(4, _T("Cannot issue certificate"));
568 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
569 }
570 }
571 else
572 {
573 debugPrintf(4, _T("Cannot decode certificate request data"));
574 response.setField(VID_RCC, ERR_BAD_ARGUMENTS);
575 }
576 }
577 else
578 {
579 debugPrintf(4, _T("Missing certificate request data"));
580 response.setField(VID_RCC, ERR_BAD_ARGUMENTS);
581 }
582 }
583 else
584 {
585 response.setField(VID_RCC, ERR_OUT_OF_STATE_REQUEST);
586 }
587
588 sendMessage(&response);
589 }
590
591 /**
592 * Create channel
593 */
594 AgentTunnelCommChannel *AgentTunnel::createChannel()
595 {
596 NXCPMessage request(CMD_CREATE_CHANNEL, InterlockedIncrement(&m_requestId));
597 sendMessage(&request);
598 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, request.getId());
599 if (response == NULL)
600 {
601 debugPrintf(4, _T("createChannel: request timeout"));
602 return NULL;
603 }
604
605 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
606 if (rcc != ERR_SUCCESS)
607 {
608 delete response;
609 debugPrintf(4, _T("createChannel: agent error %d (%s)"), rcc, AgentErrorCodeToText(rcc));
610 return NULL;
611 }
612
613 AgentTunnelCommChannel *channel = new AgentTunnelCommChannel(this, response->getFieldAsUInt32(VID_CHANNEL_ID));
614 delete response;
615 MutexLock(m_channelLock);
616 m_channels.set(channel->getId(), channel);
617 MutexUnlock(m_channelLock);
618 debugPrintf(4, _T("createChannel: new channel created (ID=%d)"), channel->getId());
619 return channel;
620 }
621
622 /**
623 * Process channel close notification from agent
624 */
625 void AgentTunnel::processChannelClose(UINT32 channelId)
626 {
627 debugPrintf(4, _T("processChannelClose: notification of channel %d closure"), channelId);
628
629 MutexLock(m_channelLock);
630 AgentTunnelCommChannel *ch = m_channels.get(channelId);
631 MutexUnlock(m_channelLock);
632 if (ch != NULL)
633 {
634 ch->shutdown();
635 ch->decRefCount();
636 }
637 }
638
639 /**
640 * Close channel
641 */
642 void AgentTunnel::closeChannel(AgentTunnelCommChannel *channel)
643 {
644 if (m_state == AGENT_TUNNEL_SHUTDOWN)
645 return;
646
647 debugPrintf(4, _T("closeChannel: request to close channel %d"), channel->getId());
648
649 MutexLock(m_channelLock);
650 m_channels.remove(channel->getId());
651 MutexUnlock(m_channelLock);
652
653 // Inform agent that channel is closing
654 NXCPMessage msg(CMD_CLOSE_CHANNEL, InterlockedIncrement(&m_requestId));
655 msg.setField(VID_CHANNEL_ID, channel->getId());
656 sendMessage(&msg);
657 }
658
659 /**
660 * Send channel data
661 */
662 int AgentTunnel::sendChannelData(UINT32 id, const void *data, size_t len)
663 {
664 NXCP_MESSAGE *msg = CreateRawNXCPMessage(CMD_CHANNEL_DATA, id, 0, data, len, NULL, false);
665 int rc = sslWrite(msg, ntohl(msg->size));
666 if (rc == ntohl(msg->size))
667 rc = (int)len; // adjust number of bytes to exclude tunnel overhead
668 free(msg);
669 return rc;
670 }
671
672 /**
673 * Fill NXCP message with tunnel data
674 */
675 void AgentTunnel::fillMessage(NXCPMessage *msg, UINT32 baseId) const
676 {
677 msg->setField(baseId, m_id);
678 msg->setField(baseId + 1, m_guid);
679 msg->setField(baseId + 2, m_nodeId);
680 msg->setField(baseId + 3, m_address);
681 msg->setField(baseId + 4, m_systemName);
682 msg->setField(baseId + 5, m_systemInfo);
683 msg->setField(baseId + 6, m_platformName);
684 msg->setField(baseId + 7, m_agentVersion);
685 MutexLock(m_channelLock);
686 msg->setField(baseId + 8, m_channels.size());
687 MutexUnlock(m_channelLock);
688 msg->setField(baseId + 9, m_zoneUIN);
689 }
690
691 /**
692 * Channel constructor
693 */
694 AgentTunnelCommChannel::AgentTunnelCommChannel(AgentTunnel *tunnel, UINT32 id) : m_buffer(65536, 65536)
695 {
696 tunnel->incRefCount();
697 m_tunnel = tunnel;
698 m_id = id;
699 m_active = true;
700 #ifdef _WIN32
701 InitializeCriticalSectionAndSpinCount(&m_bufferLock, 4000);
702 m_dataCondition = CreateEvent(NULL, TRUE, FALSE, NULL);
703 #else
704 pthread_mutex_init(&m_bufferLock, NULL);
705 pthread_cond_init(&m_dataCondition, NULL);
706 #endif
707 }
708
709 /**
710 * Channel destructor
711 */
712 AgentTunnelCommChannel::~AgentTunnelCommChannel()
713 {
714 m_tunnel->decRefCount();
715 #ifdef _WIN32
716 DeleteCriticalSection(&m_bufferLock);
717 CloseHandle(m_dataCondition);
718 #else
719 pthread_mutex_destroy(&m_bufferLock);
720 pthread_cond_destroy(&m_dataCondition);
721 #endif
722 }
723
724 /**
725 * Send data
726 */
727 int AgentTunnelCommChannel::send(const void *data, size_t size, MUTEX mutex)
728 {
729 return m_active ? m_tunnel->sendChannelData(m_id, data, size) : -1;
730 }
731
732 /**
733 * Receive data
734 */
735 int AgentTunnelCommChannel::recv(void *buffer, size_t size, UINT32 timeout)
736 {
737 if (!m_active)
738 return 0;
739
740 #ifdef _WIN32
741 EnterCriticalSection(&m_bufferLock);
742 if (m_buffer.isEmpty())
743 {
744 retry_wait:
745 LeaveCriticalSection(&m_bufferLock);
746 if (WaitForSingleObject(m_dataCondition, timeout) == WAIT_TIMEOUT)
747 return -2;
748
749 if (!m_active)
750 return 0; // closed while waiting
751
752 EnterCriticalSection(&m_bufferLock);
753 if (m_buffer.isEmpty())
754 {
755 ResetEvent(m_dataCondition);
756 goto retry_wait;
757 }
758 }
759 #else
760 pthread_mutex_lock(&m_bufferLock);
761 if (m_buffer.isEmpty())
762 {
763 #if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
764 struct timespec ts;
765 ts.tv_sec = timeout / 1000;
766 ts.tv_nsec = (timeout % 1000) * 1000000;
767 int rc = pthread_cond_reltimedwait_np(&m_dataCondition, &m_bufferLock, &ts);
768 #else
769 struct timeval now;
770 struct timespec ts;
771 gettimeofday(&now, NULL);
772 ts.tv_sec = now.tv_sec + (timeout / 1000);
773 now.tv_usec += (timeout % 1000) * 1000;
774 ts.tv_sec += now.tv_usec / 1000000;
775 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
776 int rc = pthread_cond_timedwait(&m_dataCondition, &m_bufferLock, &ts);
777 #endif
778 if (rc != 0)
779 {
780 pthread_mutex_unlock(&m_bufferLock);
781 return -2; // timeout
782 }
783
784 if (!m_active) // closed while waiting
785 {
786 pthread_mutex_unlock(&m_bufferLock);
787 return 0;
788 }
789 }
790 #endif
791
792 size_t bytes = m_buffer.read((BYTE *)buffer, size);
793 #ifdef _WIN32
794 if (m_buffer.isEmpty())
795 ResetEvent(m_dataCondition);
796 LeaveCriticalSection(&m_bufferLock);
797 #else
798 pthread_mutex_unlock(&m_bufferLock);
799 #endif
800 return (int)bytes;
801 }
802
803 /**
804 * Poll for data
805 */
806 int AgentTunnelCommChannel::poll(UINT32 timeout, bool write)
807 {
808 if (write)
809 return 1;
810
811 if (!m_active)
812 return -1;
813
814 int rc = 0;
815
816 #ifdef _WIN32
817 EnterCriticalSection(&m_bufferLock);
818 if (m_buffer.isEmpty())
819 {
820 retry_wait:
821 LeaveCriticalSection(&m_bufferLock);
822 if (WaitForSingleObject(m_dataCondition, timeout) == WAIT_TIMEOUT)
823 return 0;
824
825 if (!m_active)
826 return -1;
827
828 EnterCriticalSection(&m_bufferLock);
829 if (m_buffer.isEmpty())
830 {
831 ResetEvent(m_dataCondition);
832 goto retry_wait;
833 }
834 }
835 LeaveCriticalSection(&m_bufferLock);
836 #else
837 pthread_mutex_lock(&m_bufferLock);
838 if (m_buffer.isEmpty())
839 {
840 #if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
841 struct timespec ts;
842 ts.tv_sec = timeout / 1000;
843 ts.tv_nsec = (timeout % 1000) * 1000000;
844 rc = pthread_cond_reltimedwait_np(&m_dataCondition, &m_bufferLock, &ts);
845 #else
846 struct timeval now;
847 struct timespec ts;
848 gettimeofday(&now, NULL);
849 ts.tv_sec = now.tv_sec + (timeout / 1000);
850 now.tv_usec += (timeout % 1000) * 1000;
851 ts.tv_sec += now.tv_usec / 1000000;
852 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
853 rc = pthread_cond_timedwait(&m_dataCondition, &m_bufferLock, &ts);
854 #endif
855 }
856 pthread_mutex_unlock(&m_bufferLock);
857 #endif
858
859 return (rc == 0) ? 1 : 0;
860 }
861
862 /**
863 * Shutdown channel
864 */
865 int AgentTunnelCommChannel::shutdown()
866 {
867 m_active = false;
868 #ifdef _WIN32
869 EnterCriticalSection(&m_bufferLock);
870 SetEvent(m_dataCondition);
871 LeaveCriticalSection(&m_bufferLock);
872 #else
873 pthread_cond_broadcast(&m_dataCondition);
874 #endif
875 return 0;
876 }
877
878 /**
879 * Close channel
880 */
881 void AgentTunnelCommChannel::close()
882 {
883 m_active = false;
884 #ifdef _WIN32
885 EnterCriticalSection(&m_bufferLock);
886 SetEvent(m_dataCondition);
887 LeaveCriticalSection(&m_bufferLock);
888 #else
889 pthread_cond_broadcast(&m_dataCondition);
890 #endif
891 m_tunnel->closeChannel(this);
892 }
893
894 /**
895 * Put data into buffer
896 */
897 void AgentTunnelCommChannel::putData(const BYTE *data, size_t size)
898 {
899 #ifdef _WIN32
900 EnterCriticalSection(&m_bufferLock);
901 #else
902 pthread_mutex_lock(&m_bufferLock);
903 #endif
904 m_buffer.write(data, size);
905 #ifdef _WIN32
906 SetEvent(m_dataCondition);
907 LeaveCriticalSection(&m_bufferLock);
908 #else
909 pthread_cond_broadcast(&m_dataCondition);
910 pthread_mutex_unlock(&m_bufferLock);
911 #endif
912 }
913
914 /**
915 * Incoming connection data
916 */
917 struct ConnectionRequest
918 {
919 SOCKET sock;
920 InetAddress addr;
921 };
922
923 /**
924 * Setup tunnel
925 */
926 static void SetupTunnel(void *arg)
927 {
928 ConnectionRequest *request = (ConnectionRequest *)arg;
929
930 SSL_CTX *context = NULL;
931 SSL *ssl = NULL;
932 AgentTunnel *tunnel = NULL;
933 int rc;
934 UINT32 nodeId = 0;
935 UINT32 zoneUIN = 0;
936 X509 *cert = NULL;
937
938 // Setup secure connection
939 const SSL_METHOD *method = SSLv23_method();
940 if (method == NULL)
941 {
942 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot obtain TLS method"), (const TCHAR *)request->addr.toString());
943 goto failure;
944 }
945
946 context = SSL_CTX_new((SSL_METHOD *)method);
947 if (context == NULL)
948 {
949 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot create TLS context"), (const TCHAR *)request->addr.toString());
950 goto failure;
951 }
952 #ifdef SSL_OP_NO_COMPRESSION
953 SSL_CTX_set_options(context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION);
954 #else
955 SSL_CTX_set_options(context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
956 #endif
957 if (!SetupServerTlsContext(context))
958 {
959 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot configure TLS context"), (const TCHAR *)request->addr.toString());
960 goto failure;
961 }
962
963 ssl = SSL_new(context);
964 if (ssl == NULL)
965 {
966 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot create SSL object"), (const TCHAR *)request->addr.toString());
967 goto failure;
968 }
969
970 SSL_set_accept_state(ssl);
971 SSL_set_fd(ssl, (int)request->sock);
972
973 retry:
974 rc = SSL_do_handshake(ssl);
975 if (rc != 1)
976 {
977 int sslErr = SSL_get_error(ssl, rc);
978 if (sslErr == SSL_ERROR_WANT_READ)
979 {
980 SocketPoller poller;
981 poller.add(request->sock);
982 if (poller.poll(5000) > 0)
983 goto retry;
984 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): TLS handshake failed (timeout)"), (const TCHAR *)request->addr.toString());
985 }
986 else
987 {
988 char buffer[128];
989 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): TLS handshake failed (%hs)"),
990 (const TCHAR *)request->addr.toString(), ERR_error_string(sslErr, buffer));
991 }
992 goto failure;
993 }
994
995 cert = SSL_get_peer_certificate(ssl);
996 if (cert != NULL)
997 {
998 if (ValidateAgentCertificate(cert))
999 {
1000 TCHAR ou[256], cn[256];
1001 if (GetCertificateOU(cert, ou, 256) && GetCertificateCN(cert, cn, 256))
1002 {
1003 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): certificate OU=%s CN=%s"), (const TCHAR *)request->addr.toString(), ou, cn);
1004 uuid nodeGuid = uuid::parse(ou);
1005 uuid tunnelGuid = uuid::parse(cn);
1006 if (!nodeGuid.isNull() && !tunnelGuid.isNull())
1007 {
1008 Node *node = (Node *)FindObjectByGUID(nodeGuid, OBJECT_NODE);
1009 if (node != NULL)
1010 {
1011 if (tunnelGuid.equals(node->getTunnelId()))
1012 {
1013 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Tunnel attached to node %s [%d]"), (const TCHAR *)request->addr.toString(), node->getName(), node->getId());
1014 if (node->getRuntimeFlags() & NDF_NEW_TUNNEL_BIND)
1015 {
1016 node->removeNewTunnelBindFlag();
1017 node->setRecheckCapsFlag();
1018 node->forceConfigurationPoll();
1019 }
1020 nodeId = node->getId();
1021 zoneUIN = node->getZoneUIN();
1022 }
1023 else
1024 {
1025 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Tunnel ID %s is not valid for node %s [%d]"),
1026 (const TCHAR *)request->addr.toString(), (const TCHAR *)tunnelGuid.toString(),
1027 node->getName(), node->getId());
1028 }
1029 }
1030 else
1031 {
1032 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Node with GUID %s not found"), (const TCHAR *)request->addr.toString(), (const TCHAR *)nodeGuid.toString());
1033 }
1034 }
1035 else
1036 {
1037 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Certificate OU or CN is not a valid GUID"), (const TCHAR *)request->addr.toString());
1038 }
1039 }
1040 else
1041 {
1042 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Cannot get certificate OU and CN"), (const TCHAR *)request->addr.toString());
1043 }
1044 }
1045 else
1046 {
1047 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Agent certificate validation failed"), (const TCHAR *)request->addr.toString());
1048 }
1049 X509_free(cert);
1050 }
1051 else
1052 {
1053 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Agent certificate not provided"), (const TCHAR *)request->addr.toString());
1054 }
1055
1056 tunnel = new AgentTunnel(context, ssl, request->sock, request->addr, nodeId, zoneUIN);
1057 RegisterTunnel(tunnel);
1058 tunnel->start();
1059 tunnel->decRefCount();
1060
1061 delete request;
1062 return;
1063
1064 failure:
1065 if (ssl != NULL)
1066 SSL_free(ssl);
1067 if (context != NULL)
1068 SSL_CTX_free(context);
1069 shutdown(request->sock, SHUT_RDWR);
1070 closesocket(request->sock);
1071 delete request;
1072 }
1073
1074 /**
1075 * Tunnel listener lock
1076 */
1077 static Mutex s_tunnelListenerLock;
1078
1079 /**
1080 * Client listener class
1081 */
1082 class TunnelListener : public SocketListener
1083 {
1084 protected:
1085 virtual ConnectionProcessingResult processConnection(SOCKET s, const InetAddress& peer);
1086 virtual bool isStopConditionReached();
1087
1088 public:
1089 TunnelListener(UINT16 port) : SocketListener(port) { setName(_T("AgentTunnels")); }
1090 };
1091
1092 /**
1093 * Listener stop condition
1094 */
1095 bool TunnelListener::isStopConditionReached()
1096 {
1097 return IsShutdownInProgress();
1098 }
1099
1100 /**
1101 * Process incoming connection
1102 */
1103 ConnectionProcessingResult TunnelListener::processConnection(SOCKET s, const InetAddress& peer)
1104 {
1105 ConnectionRequest *request = new ConnectionRequest();
1106 request->sock = s;
1107 request->addr = peer;
1108 ThreadPoolExecute(g_mainThreadPool, SetupTunnel, request);
1109 return CPR_BACKGROUND;
1110 }
1111
1112 /**
1113 * Tunnel listener
1114 */
1115 THREAD_RESULT THREAD_CALL TunnelListenerThread(void *arg)
1116 {
1117 ThreadSetName("TunnelListener");
1118 s_tunnelListenerLock.lock();
1119 UINT16 listenPort = (UINT16)ConfigReadULong(_T("AgentTunnelListenPort"), 4703);
1120 TunnelListener listener(listenPort);
1121 listener.setListenAddress(g_szListenAddress);
1122 if (!listener.initialize())
1123 {
1124 s_tunnelListenerLock.unlock();
1125 return THREAD_OK;
1126 }
1127
1128 listener.mainLoop();
1129 listener.shutdown();
1130
1131 nxlog_debug_tag(DEBUG_TAG, 1, _T("Tunnel listener thread terminated"));
1132 s_tunnelListenerLock.unlock();
1133 return THREAD_OK;
1134 }
1135
1136 /**
1137 * Close all active agent tunnels
1138 */
1139 void CloseAgentTunnels()
1140 {
1141 nxlog_debug_tag(DEBUG_TAG, 2, _T("Closing active agent tunnels..."));
1142
1143 // Wait for listener thread
1144 s_tunnelListenerLock.lock();
1145 s_tunnelListenerLock.unlock();
1146
1147 s_tunnelListLock.lock();
1148 Iterator<AgentTunnel> *it = s_boundTunnels.iterator();
1149 while(it->hasNext())
1150 {
1151 AgentTunnel *t = it->next();
1152 t->shutdown();
1153 }
1154 delete it;
1155 for(int i = 0; i < s_unboundTunnels.size(); i++)
1156 s_unboundTunnels.get(i)->shutdown();
1157 s_tunnelListLock.unlock();
1158
1159 bool wait = true;
1160 while(wait)
1161 {
1162 ThreadSleepMs(500);
1163 s_tunnelListLock.lock();
1164 if ((s_boundTunnels.size() == 0) && (s_unboundTunnels.size() == 0))
1165 wait = false;
1166 s_tunnelListLock.unlock();
1167 }
1168
1169 nxlog_debug_tag(DEBUG_TAG, 2, _T("All agent tunnels unregistered"));
1170 }