added tags to tunnel debug; debug tag length increased
[public/netxms.git] / src / server / core / tunnel.cpp
CommitLineData
bf306af3
VK
1/*
2** NetXMS - Network Management System
3** Copyright (C) 2003-2017 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: tunnel.cpp
20**/
21
22#include "nxcore.h"
f0718805 23#include <socket_listener.h>
cce82c3a
VK
24#include <agent_tunnel.h>
25
26#define MAX_MSG_SIZE 268435456
27
42c338ac
VK
28#define DEBUG_TAG _T("agent.tunnel")
29
cce82c3a 30/**
bffa20b1
VK
31 * Tunnel registration
32 */
98f0ac42 33static RefCountHashMap<UINT32, AgentTunnel> s_boundTunnels(true);
9c144154 34static ObjectRefArray<AgentTunnel> s_unboundTunnels(16, 16);
bffa20b1
VK
35static Mutex s_tunnelListLock;
36
37/**
bffa20b1
VK
38 * Register tunnel
39 */
40static void RegisterTunnel(AgentTunnel *tunnel)
41{
bffa20b1
VK
42 tunnel->incRefCount();
43 s_tunnelListLock.lock();
44 if (tunnel->isBound())
45 {
bffa20b1 46 s_boundTunnels.set(tunnel->getNodeId(), tunnel);
98f0ac42 47 tunnel->decRefCount(); // set already increased ref count
bffa20b1
VK
48 }
49 else
50 {
51 s_unboundTunnels.add(tunnel);
52 }
53 s_tunnelListLock.unlock();
bffa20b1
VK
54}
55
56/**
57 * Unregister tunnel
58 */
59static void UnregisterTunnel(AgentTunnel *tunnel)
60{
b59a78b0 61 tunnel->debugPrintf(4, _T("Tunnel unregistered"));
bffa20b1
VK
62 s_tunnelListLock.lock();
63 if (tunnel->isBound())
64 {
5c25a3d2
VK
65 // Check that current tunnel for node is tunnel being unregistered
66 // New tunnel could be established while old one still finishing
67 // outstanding requests
68 if (s_boundTunnels.peek(tunnel->getNodeId()) == tunnel)
69 s_boundTunnels.remove(tunnel->getNodeId());
bffa20b1
VK
70 }
71 else
72 {
73 s_unboundTunnels.remove(tunnel);
b59a78b0 74 tunnel->decRefCount();
bffa20b1
VK
75 }
76 s_tunnelListLock.unlock();
bffa20b1
VK
77}
78
79/**
2f6c6597
VK
80 * Get tunnel for node. Caller must decrease reference counter on tunnel.
81 */
82AgentTunnel *GetTunnelForNode(UINT32 nodeId)
83{
84 s_tunnelListLock.lock();
85 AgentTunnel *t = s_boundTunnels.get(nodeId);
2f6c6597
VK
86 s_tunnelListLock.unlock();
87 return t;
88}
89
90/**
3b89a4c1
VK
91 * Bind agent tunnel
92 */
93UINT32 BindAgentTunnel(UINT32 tunnelId, UINT32 nodeId)
94{
95 AgentTunnel *tunnel = NULL;
96 s_tunnelListLock.lock();
97 for(int i = 0; i < s_unboundTunnels.size(); i++)
98 {
9c144154 99 if (s_unboundTunnels.get(i)->getId() == tunnelId)
3b89a4c1 100 {
9c144154 101 tunnel = s_unboundTunnels.get(i);
3b89a4c1
VK
102 tunnel->incRefCount();
103 break;
104 }
105 }
106 s_tunnelListLock.unlock();
107
108 if (tunnel == NULL)
109 {
42c338ac 110 nxlog_debug_tag(DEBUG_TAG, 4, _T("BindAgentTunnel: unbound tunnel with ID %d not found"), tunnelId);
3b89a4c1
VK
111 return RCC_INVALID_TUNNEL_ID;
112 }
113
114 UINT32 rcc = tunnel->bind(nodeId);
115 tunnel->decRefCount();
116 return rcc;
117}
118
119/**
0837f989
VK
120 * Bind agent tunnel from node
121 */
122UINT32 UnbindAgentTunnel(UINT32 nodeId)
123{
c48843a5 124 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
0837f989
VK
125 if (node == NULL)
126 return RCC_INVALID_OBJECT_ID;
127
128 if (node->getTunnelId().isNull())
129 return RCC_SUCCESS; // tunnel is not set
130
131 node->setTunnelId(uuid::NULL_UUID);
132
133 AgentTunnel *tunnel = GetTunnelForNode(nodeId);
134 if (tunnel != NULL)
135 {
42c338ac 136 nxlog_debug_tag(DEBUG_TAG, 4, _T("UnbindAgentTunnel(%s): shutting down existing tunnel"), node->getName());
0837f989
VK
137 tunnel->shutdown();
138 tunnel->decRefCount();
139 }
140
141 return RCC_SUCCESS;
142}
143
144/**
3c463101 145 * Get list of agent tunnels into NXCP message
3b89a4c1 146 */
3c463101 147void GetAgentTunnels(NXCPMessage *msg)
3b89a4c1
VK
148{
149 s_tunnelListLock.lock();
150 UINT32 fieldId = VID_ELEMENT_LIST_BASE;
3c463101 151
3b89a4c1
VK
152 for(int i = 0; i < s_unboundTunnels.size(); i++)
153 {
9c144154 154 s_unboundTunnels.get(i)->fillMessage(msg, fieldId);
3c463101 155 fieldId += 64;
3b89a4c1 156 }
3c463101
VK
157
158 Iterator<AgentTunnel> *it = s_boundTunnels.iterator();
159 while(it->hasNext())
160 {
161 it->next()->fillMessage(msg, fieldId);
162 fieldId += 64;
163 }
164 delete it;
165
166 msg->setField(VID_NUM_ELEMENTS, (UINT32)(s_unboundTunnels.size() + s_boundTunnels.size()));
3b89a4c1
VK
167 s_tunnelListLock.unlock();
168}
169
170/**
bffa20b1
VK
171 * Show tunnels in console
172 */
173void ShowAgentTunnels(CONSOLE_CTX console)
174{
175 s_tunnelListLock.lock();
176
177 ConsolePrintf(console,
178 _T("\n\x1b[1mBOUND TUNNELS\x1b[0m\n")
179 _T("ID | Node ID | Peer IP Address | System Name | Platform Name | Agent Version\n")
180 _T("-----+---------+--------------------------+--------------------------+------------------+------------------------\n"));
181 Iterator<AgentTunnel> *it = s_boundTunnels.iterator();
182 while(it->hasNext())
183 {
184 AgentTunnel *t = it->next();
185 TCHAR ipAddrBuffer[64];
186 ConsolePrintf(console, _T("%4d | %7u | %-24s | %-24s | %-16s | %s\n"), t->getId(), t->getNodeId(), t->getAddress().toString(ipAddrBuffer), t->getSystemName(), t->getPlatformName(), t->getAgentVersion());
187 }
188 delete it;
189
190 ConsolePrintf(console,
191 _T("\n\x1b[1mUNBOUND TUNNELS\x1b[0m\n")
192 _T("ID | Peer IP Address | System Name | Platform Name | Agent Version\n")
193 _T("-----+--------------------------+--------------------------+------------------+------------------------\n"));
194 for(int i = 0; i < s_unboundTunnels.size(); i++)
195 {
9c144154 196 const AgentTunnel *t = s_unboundTunnels.get(i);
bffa20b1
VK
197 TCHAR ipAddrBuffer[64];
198 ConsolePrintf(console, _T("%4d | %-24s | %-24s | %-16s | %s\n"), t->getId(), t->getAddress().toString(ipAddrBuffer), t->getSystemName(), t->getPlatformName(), t->getAgentVersion());
199 }
200
201 s_tunnelListLock.unlock();
202}
203
204/**
cce82c3a
VK
205 * Next free tunnel ID
206 */
207static VolatileCounter s_nextTunnelId = 0;
208
209/**
210 * Agent tunnel constructor
211 */
a191c634 212AgentTunnel::AgentTunnel(SSL_CTX *context, SSL *ssl, SOCKET sock, const InetAddress& addr, UINT32 nodeId, UINT32 zoneUIN) : RefCountObject(), m_channels(true)
cce82c3a
VK
213{
214 m_id = InterlockedIncrement(&s_nextTunnelId);
215 m_address = addr;
216 m_socket = sock;
217 m_context = context;
218 m_ssl = ssl;
219 m_sslLock = MutexCreate();
3b89a4c1 220 m_requestId = 0;
cce82c3a 221 m_nodeId = nodeId;
a191c634 222 m_zoneUIN = zoneUIN;
cce82c3a
VK
223 m_state = AGENT_TUNNEL_INIT;
224 m_systemName = NULL;
225 m_platformName = NULL;
226 m_systemInfo = NULL;
227 m_agentVersion = NULL;
3b89a4c1 228 m_bindRequestId = 0;
98f0ac42 229 m_channelLock = MutexCreate();
cce82c3a
VK
230}
231
232/**
233 * Agent tunnel destructor
234 */
235AgentTunnel::~AgentTunnel()
236{
98f0ac42 237 m_channels.clear();
0837f989 238 shutdown();
cce82c3a
VK
239 SSL_CTX_free(m_context);
240 SSL_free(m_ssl);
241 MutexDestroy(m_sslLock);
242 closesocket(m_socket);
243 free(m_systemName);
244 free(m_platformName);
245 free(m_systemInfo);
246 free(m_agentVersion);
98f0ac42 247 MutexDestroy(m_channelLock);
cce82c3a
VK
248 debugPrintf(4, _T("Tunnel destroyed"));
249}
250
251/**
252 * Debug output
253 */
254void AgentTunnel::debugPrintf(int level, const TCHAR *format, ...)
255{
256 va_list args;
257 va_start(args, format);
42c338ac 258 nxlog_debug_tag_object2(DEBUG_TAG, m_id, level, format, args);
cce82c3a 259 va_end(args);
cce82c3a
VK
260}
261
262/**
263 * Tunnel receiver thread
264 */
265void AgentTunnel::recvThread()
266{
267 TlsMessageReceiver receiver(m_socket, m_ssl, m_sslLock, 4096, MAX_MSG_SIZE);
268 while(true)
269 {
270 MessageReceiverResult result;
271 NXCPMessage *msg = receiver.readMessage(60000, &result);
272 if (result != MSGRECV_SUCCESS)
273 {
274 if (result == MSGRECV_CLOSED)
275 debugPrintf(4, _T("Tunnel closed by peer"));
276 else
277 debugPrintf(4, _T("Communication error (%s)"), AbstractMessageReceiver::resultToText(result));
278 break;
279 }
280
281 if (nxlog_get_debug_level() >= 6)
282 {
283 TCHAR buffer[64];
284 debugPrintf(6, _T("Received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
285 }
286
287 switch(msg->getCode())
288 {
289 case CMD_KEEPALIVE:
19bfbf41 290 {
3b89a4c1 291 NXCPMessage response(CMD_KEEPALIVE, msg->getId());
19bfbf41
VK
292 sendMessage(&response);
293 }
cce82c3a
VK
294 break;
295 case CMD_SETUP_AGENT_TUNNEL:
296 setup(msg);
297 break;
3b89a4c1
VK
298 case CMD_REQUEST_CERTIFICATE:
299 processCertificateRequest(msg);
300 break;
2f6c6597
VK
301 case CMD_CHANNEL_DATA:
302 if (msg->isBinary())
303 {
98f0ac42
VK
304 MutexLock(m_channelLock);
305 AgentTunnelCommChannel *channel = m_channels.get(msg->getId());
306 MutexUnlock(m_channelLock);
307 if (channel != NULL)
308 {
309 channel->putData(msg->getBinaryData(), msg->getBinaryDataSize());
310 channel->decRefCount();
311 }
312 else
313 {
314 debugPrintf(6, _T("Received channel data for non-existing channel %u"), msg->getId());
315 }
2f6c6597
VK
316 }
317 break;
9af93576
VK
318 case CMD_CLOSE_CHANNEL: // channel close notification
319 processChannelClose(msg->getFieldAsUInt32(VID_CHANNEL_ID));
320 break;
19bfbf41 321 default:
3b89a4c1
VK
322 m_queue.put(msg);
323 msg = NULL; // prevent message deletion
19bfbf41 324 break;
cce82c3a 325 }
19bfbf41 326 delete msg;
cce82c3a 327 }
2852b150 328
bffa20b1 329 UnregisterTunnel(this);
2852b150 330 m_state = AGENT_TUNNEL_SHUTDOWN;
9af93576
VK
331
332 // shutdown all channels
333 MutexLock(m_channelLock);
334 Iterator<AgentTunnelCommChannel> *it = m_channels.iterator();
335 while(it->hasNext())
336 it->next()->shutdown();
337 delete it;
2852b150 338 m_channels.clear();
9af93576
VK
339 MutexUnlock(m_channelLock);
340
9b7ff3f5 341 debugPrintf(4, _T("Receiver thread stopped"));
cce82c3a
VK
342}
343
344/**
345 * Tunnel receiver thread starter
346 */
347THREAD_RESULT THREAD_CALL AgentTunnel::recvThreadStarter(void *arg)
348{
930a2a62 349 ThreadSetName("TunnelReceiver");
cce82c3a 350 ((AgentTunnel *)arg)->recvThread();
9b7ff3f5 351 ((AgentTunnel *)arg)->decRefCount();
cce82c3a
VK
352 return THREAD_OK;
353}
354
355/**
d0739aa1
VK
356 * Write to SSL
357 */
358int AgentTunnel::sslWrite(const void *data, size_t size)
359{
360 bool canRetry;
361 int bytes;
362 MutexLock(m_sslLock);
363 do
364 {
365 canRetry = false;
366 bytes = SSL_write(m_ssl, data, (int)size);
367 if (bytes <= 0)
368 {
369 int err = SSL_get_error(m_ssl, bytes);
370 if ((err == SSL_ERROR_WANT_READ) || (err == SSL_ERROR_WANT_WRITE))
371 {
372 SocketPoller sp(true);
373 sp.add(m_socket);
374 if (sp.poll(5000) > 0)
375 canRetry = true;
376 }
377 else
378 {
379 debugPrintf(7, _T("SSL_write error (bytes=%d ssl_err=%d errno=%d)"), bytes, err, errno);
380 if (err == SSL_ERROR_SSL)
381 LogOpenSSLErrorStack(7);
382 }
383 }
384 }
385 while(canRetry);
386 MutexUnlock(m_sslLock);
387 return bytes;
388}
389
390/**
cce82c3a
VK
391 * Send message on tunnel
392 */
393bool AgentTunnel::sendMessage(NXCPMessage *msg)
394{
2852b150
VK
395 if (m_state == AGENT_TUNNEL_SHUTDOWN)
396 return false;
397
19bfbf41
VK
398 if (nxlog_get_debug_level() >= 6)
399 {
400 TCHAR buffer[64];
401 debugPrintf(6, _T("Sending message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
402 }
198745dd 403 NXCP_MESSAGE *data = msg->serialize(true);
d0739aa1 404 bool success = (sslWrite(data, ntohl(data->size)) == ntohl(data->size));
cce82c3a
VK
405 free(data);
406 return success;
407}
408
409/**
2852b150 410 * Start tunnel
cce82c3a
VK
411 */
412void AgentTunnel::start()
413{
cce82c3a 414 debugPrintf(4, _T("Tunnel started"));
9b7ff3f5 415 incRefCount();
b59a78b0 416 ThreadCreate(AgentTunnel::recvThreadStarter, 0, this);
cce82c3a
VK
417}
418
419/**
0837f989
VK
420 * Shutdown tunnel
421 */
422void AgentTunnel::shutdown()
423{
424 if (m_socket != INVALID_SOCKET)
425 ::shutdown(m_socket, SHUT_RDWR);
2852b150
VK
426 m_state = AGENT_TUNNEL_SHUTDOWN;
427 debugPrintf(4, _T("Tunnel shutdown"));
0837f989
VK
428}
429
430/**
cce82c3a
VK
431 * Process setup request
432 */
433void AgentTunnel::setup(const NXCPMessage *request)
434{
435 NXCPMessage response;
436 response.setCode(CMD_REQUEST_COMPLETED);
437 response.setId(request->getId());
438
439 if (m_state == AGENT_TUNNEL_INIT)
440 {
441 m_systemName = request->getFieldAsString(VID_SYS_NAME);
442 m_systemInfo = request->getFieldAsString(VID_SYS_DESCRIPTION);
443 m_platformName = request->getFieldAsString(VID_PLATFORM_NAME);
444 m_agentVersion = request->getFieldAsString(VID_AGENT_VERSION);
445
446 m_state = (m_nodeId != 0) ? AGENT_TUNNEL_BOUND : AGENT_TUNNEL_UNBOUND;
447 response.setField(VID_RCC, ERR_SUCCESS);
448 response.setField(VID_IS_ACTIVE, m_state == AGENT_TUNNEL_BOUND);
19bfbf41 449
a191c634
VK
450 // For bound tunnels zone UIN taken from node object
451 if (m_state != AGENT_TUNNEL_BOUND)
452 m_zoneUIN = request->getFieldAsUInt32(VID_ZONE_UIN);
453
19bfbf41
VK
454 debugPrintf(3, _T("%s tunnel initialized"), (m_state == AGENT_TUNNEL_BOUND) ? _T("Bound") : _T("Unbound"));
455 debugPrintf(5, _T(" System name: %s"), m_systemName);
456 debugPrintf(5, _T(" System information: %s"), m_systemInfo);
457 debugPrintf(5, _T(" Platform name: %s"), m_platformName);
458 debugPrintf(5, _T(" Agent version: %s"), m_agentVersion);
a191c634 459 debugPrintf(5, _T(" Zone UIN: %u"), m_zoneUIN);
cce82c3a
VK
460 }
461 else
462 {
463 response.setField(VID_RCC, ERR_OUT_OF_STATE_REQUEST);
464 }
465
466 sendMessage(&response);
467}
468
469/**
3b89a4c1
VK
470 * Bind tunnel to node
471 */
472UINT32 AgentTunnel::bind(UINT32 nodeId)
473{
474 if ((m_state != AGENT_TUNNEL_UNBOUND) || (m_bindRequestId != 0))
475 return RCC_OUT_OF_STATE_REQUEST;
476
477 Node *node = (Node *)FindObjectById(nodeId, OBJECT_NODE);
478 if (node == NULL)
479 return RCC_INVALID_OBJECT_ID;
480
481 NXCPMessage msg;
482 msg.setCode(CMD_BIND_AGENT_TUNNEL);
483 msg.setId(InterlockedIncrement(&m_requestId));
484 msg.setField(VID_SERVER_ID, g_serverId);
485 msg.setField(VID_GUID, node->getGuid());
0837f989
VK
486 m_guid = uuid::generate();
487 msg.setField(VID_TUNNEL_GUID, m_guid);
3b89a4c1 488
c48843a5
VK
489 TCHAR buffer[256];
490 if (GetServerCertificateCountry(buffer, 256))
491 msg.setField(VID_COUNTRY, buffer);
492 if (GetServerCertificateOrganization(buffer, 256))
493 msg.setField(VID_ORGANIZATION, buffer);
494
3b89a4c1
VK
495 m_bindRequestId = msg.getId();
496 m_bindGuid = node->getGuid();
497 sendMessage(&msg);
498
499 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
500 if (response == NULL)
501 return RCC_TIMEOUT;
502
503 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
504 delete response;
2803eaa4
VK
505 if (rcc == ERR_SUCCESS)
506 {
507 debugPrintf(4, _T("Bind successful, resetting tunnel"));
5b0d07be 508 node->setNewTunnelBindFlag();
2803eaa4
VK
509 msg.setCode(CMD_RESET_TUNNEL);
510 msg.setId(InterlockedIncrement(&m_requestId));
511 sendMessage(&msg);
512 }
513 else
514 {
3b89a4c1 515 debugPrintf(4, _T("Bind failed: agent error %d (%s)"), rcc, AgentErrorCodeToText(rcc));
2803eaa4 516 }
3b89a4c1
VK
517 return AgentErrorToRCC(rcc);
518}
519
520/**
521 * Process certificate request
522 */
523void AgentTunnel::processCertificateRequest(NXCPMessage *request)
524{
525 NXCPMessage response(CMD_NEW_CERTIFICATE, request->getId());
526
527 if ((request->getId() == m_bindRequestId) && (m_bindRequestId != 0) && (m_state == AGENT_TUNNEL_UNBOUND))
528 {
529 size_t certRequestLen;
530 const BYTE *certRequestData = request->getBinaryFieldPtr(VID_CERTIFICATE, &certRequestLen);
531 if (certRequestData != NULL)
532 {
efe30b82 533 X509_REQ *certRequest = d2i_X509_REQ(NULL, &certRequestData, (long)certRequestLen);
3b89a4c1
VK
534 if (certRequest != NULL)
535 {
c48843a5
VK
536 char *ou = m_bindGuid.toString().getUTF8String();
537 char *cn = m_guid.toString().getUTF8String();
538 X509 *cert = IssueCertificate(certRequest, ou, cn, 365);
539 free(ou);
3b89a4c1
VK
540 free(cn);
541 if (cert != NULL)
542 {
543 BYTE *buffer = NULL;
544 int len = i2d_X509(cert, &buffer);
545 if (len > 0)
546 {
547 response.setField(VID_RCC, ERR_SUCCESS);
548 response.setField(VID_CERTIFICATE, buffer, len);
549 OPENSSL_free(buffer);
550 debugPrintf(4, _T("Certificate issued"));
0837f989 551
c48843a5 552 Node *node = (Node *)FindObjectByGUID(m_bindGuid, OBJECT_NODE);
0837f989
VK
553 if (node != NULL)
554 {
555 node->setTunnelId(m_guid);
556 }
3b89a4c1
VK
557 }
558 else
559 {
560 debugPrintf(4, _T("Cannot encode certificate"));
561 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
562 }
563 X509_free(cert);
564 }
565 else
566 {
567 debugPrintf(4, _T("Cannot issue certificate"));
568 response.setField(VID_RCC, ERR_ENCRYPTION_ERROR);
569 }
570 }
571 else
572 {
573 debugPrintf(4, _T("Cannot decode certificate request data"));
574 response.setField(VID_RCC, ERR_BAD_ARGUMENTS);
575 }
576 }
577 else
578 {
579 debugPrintf(4, _T("Missing certificate request data"));
580 response.setField(VID_RCC, ERR_BAD_ARGUMENTS);
581 }
582 }
583 else
584 {
585 response.setField(VID_RCC, ERR_OUT_OF_STATE_REQUEST);
586 }
587
588 sendMessage(&response);
589}
590
591/**
2f6c6597
VK
592 * Create channel
593 */
594AgentTunnelCommChannel *AgentTunnel::createChannel()
595{
98f0ac42
VK
596 NXCPMessage request(CMD_CREATE_CHANNEL, InterlockedIncrement(&m_requestId));
597 sendMessage(&request);
598 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, request.getId());
599 if (response == NULL)
600 {
601 debugPrintf(4, _T("createChannel: request timeout"));
602 return NULL;
603 }
604
605 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
606 if (rcc != ERR_SUCCESS)
607 {
608 delete response;
609 debugPrintf(4, _T("createChannel: agent error %d (%s)"), rcc, AgentErrorCodeToText(rcc));
610 return NULL;
611 }
612
613 AgentTunnelCommChannel *channel = new AgentTunnelCommChannel(this, response->getFieldAsUInt32(VID_CHANNEL_ID));
7ca6e370 614 delete response;
98f0ac42
VK
615 MutexLock(m_channelLock);
616 m_channels.set(channel->getId(), channel);
617 MutexUnlock(m_channelLock);
618 debugPrintf(4, _T("createChannel: new channel created (ID=%d)"), channel->getId());
619 return channel;
2f6c6597
VK
620}
621
622/**
9af93576
VK
623 * Process channel close notification from agent
624 */
625void AgentTunnel::processChannelClose(UINT32 channelId)
626{
627 debugPrintf(4, _T("processChannelClose: notification of channel %d closure"), channelId);
628
629 MutexLock(m_channelLock);
630 AgentTunnelCommChannel *ch = m_channels.get(channelId);
631 MutexUnlock(m_channelLock);
632 if (ch != NULL)
633 {
634 ch->shutdown();
635 ch->decRefCount();
636 }
637}
638
639/**
2f6c6597
VK
640 * Close channel
641 */
642void AgentTunnel::closeChannel(AgentTunnelCommChannel *channel)
643{
2852b150
VK
644 if (m_state == AGENT_TUNNEL_SHUTDOWN)
645 return;
646
62432449
VK
647 debugPrintf(4, _T("closeChannel: request to close channel %d"), channel->getId());
648
98f0ac42
VK
649 MutexLock(m_channelLock);
650 m_channels.remove(channel->getId());
651 MutexUnlock(m_channelLock);
2f6c6597 652
98f0ac42
VK
653 // Inform agent that channel is closing
654 NXCPMessage msg(CMD_CLOSE_CHANNEL, InterlockedIncrement(&m_requestId));
655 msg.setField(VID_CHANNEL_ID, channel->getId());
656 sendMessage(&msg);
2f6c6597
VK
657}
658
659/**
660 * Send channel data
661 */
662int AgentTunnel::sendChannelData(UINT32 id, const void *data, size_t len)
663{
664 NXCP_MESSAGE *msg = CreateRawNXCPMessage(CMD_CHANNEL_DATA, id, 0, data, len, NULL, false);
d0739aa1 665 int rc = sslWrite(msg, ntohl(msg->size));
98f0ac42
VK
666 if (rc == ntohl(msg->size))
667 rc = (int)len; // adjust number of bytes to exclude tunnel overhead
2f6c6597
VK
668 free(msg);
669 return rc;
670}
671
672/**
3b89a4c1
VK
673 * Fill NXCP message with tunnel data
674 */
675void AgentTunnel::fillMessage(NXCPMessage *msg, UINT32 baseId) const
676{
677 msg->setField(baseId, m_id);
c733fcc4
VK
678 msg->setField(baseId + 1, m_guid);
679 msg->setField(baseId + 2, m_nodeId);
680 msg->setField(baseId + 3, m_address);
681 msg->setField(baseId + 4, m_systemName);
682 msg->setField(baseId + 5, m_systemInfo);
683 msg->setField(baseId + 6, m_platformName);
684 msg->setField(baseId + 7, m_agentVersion);
685 MutexLock(m_channelLock);
686 msg->setField(baseId + 8, m_channels.size());
687 MutexUnlock(m_channelLock);
a191c634 688 msg->setField(baseId + 9, m_zoneUIN);
3b89a4c1
VK
689}
690
691/**
2f6c6597
VK
692 * Channel constructor
693 */
b0039c52 694AgentTunnelCommChannel::AgentTunnelCommChannel(AgentTunnel *tunnel, UINT32 id) : m_buffer(65536, 65536)
2f6c6597 695{
2852b150 696 tunnel->incRefCount();
2f6c6597
VK
697 m_tunnel = tunnel;
698 m_id = id;
699 m_active = true;
35a9f4a3 700#ifdef _WIN32
c9d5f2cc
VK
701 InitializeCriticalSectionAndSpinCount(&m_bufferLock, 4000);
702 m_dataCondition = CreateEvent(NULL, TRUE, FALSE, NULL);
35a9f4a3
VK
703#else
704 pthread_mutex_init(&m_bufferLock, NULL);
705 pthread_cond_init(&m_dataCondition, NULL);
706#endif
2f6c6597
VK
707}
708
709/**
710 * Channel destructor
711 */
712AgentTunnelCommChannel::~AgentTunnelCommChannel()
713{
2852b150 714 m_tunnel->decRefCount();
35a9f4a3 715#ifdef _WIN32
c9d5f2cc
VK
716 DeleteCriticalSection(&m_bufferLock);
717 CloseHandle(m_dataCondition);
35a9f4a3
VK
718#else
719 pthread_mutex_destroy(&m_bufferLock);
720 pthread_cond_destroy(&m_dataCondition);
721#endif
2f6c6597
VK
722}
723
724/**
725 * Send data
726 */
727int AgentTunnelCommChannel::send(const void *data, size_t size, MUTEX mutex)
728{
729 return m_active ? m_tunnel->sendChannelData(m_id, data, size) : -1;
730}
731
732/**
733 * Receive data
734 */
735int AgentTunnelCommChannel::recv(void *buffer, size_t size, UINT32 timeout)
736{
737 if (!m_active)
738 return 0;
739
35a9f4a3 740#ifdef _WIN32
c9d5f2cc 741 EnterCriticalSection(&m_bufferLock);
b0039c52 742 if (m_buffer.isEmpty())
c9d5f2cc
VK
743 {
744retry_wait:
745 LeaveCriticalSection(&m_bufferLock);
746 if (WaitForSingleObject(m_dataCondition, timeout) == WAIT_TIMEOUT)
747 return -2;
2f6c6597 748
c9d5f2cc
VK
749 if (!m_active)
750 return 0; // closed while waiting
d0739aa1 751
c9d5f2cc 752 EnterCriticalSection(&m_bufferLock);
b0039c52 753 if (m_buffer.isEmpty())
c9d5f2cc
VK
754 {
755 ResetEvent(m_dataCondition);
756 goto retry_wait;
757 }
758 }
35a9f4a3
VK
759#else
760 pthread_mutex_lock(&m_bufferLock);
b0039c52 761 if (m_buffer.isEmpty())
35a9f4a3
VK
762 {
763#if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
764 struct timespec ts;
765 ts.tv_sec = timeout / 1000;
766 ts.tv_nsec = (timeout % 1000) * 1000000;
b4a14814 767 int rc = pthread_cond_reltimedwait_np(&m_dataCondition, &m_bufferLock, &ts);
35a9f4a3
VK
768#else
769 struct timeval now;
770 struct timespec ts;
771 gettimeofday(&now, NULL);
772 ts.tv_sec = now.tv_sec + (timeout / 1000);
773 now.tv_usec += (timeout % 1000) * 1000;
774 ts.tv_sec += now.tv_usec / 1000000;
775 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
776 int rc = pthread_cond_timedwait(&m_dataCondition, &m_bufferLock, &ts);
777#endif
778 if (rc != 0)
779 {
780 pthread_mutex_unlock(&m_bufferLock);
781 return -2; // timeout
782 }
783
784 if (!m_active) // closed while waiting
785 {
786 pthread_mutex_unlock(&m_bufferLock);
787 return 0;
788 }
789 }
790#endif
791
b0039c52 792 size_t bytes = m_buffer.read((BYTE *)buffer, size);
35a9f4a3 793#ifdef _WIN32
b0039c52 794 if (m_buffer.isEmpty())
c9d5f2cc 795 ResetEvent(m_dataCondition);
c9d5f2cc 796 LeaveCriticalSection(&m_bufferLock);
35a9f4a3
VK
797#else
798 pthread_mutex_unlock(&m_bufferLock);
799#endif
2f6c6597
VK
800 return (int)bytes;
801}
802
803/**
804 * Poll for data
805 */
806int AgentTunnelCommChannel::poll(UINT32 timeout, bool write)
807{
808 if (write)
809 return 1;
810
811 if (!m_active)
812 return -1;
813
c9d5f2cc
VK
814 int rc = 0;
815
35a9f4a3 816#ifdef _WIN32
c9d5f2cc 817 EnterCriticalSection(&m_bufferLock);
b0039c52 818 if (m_buffer.isEmpty())
c9d5f2cc
VK
819 {
820retry_wait:
821 LeaveCriticalSection(&m_bufferLock);
822 if (WaitForSingleObject(m_dataCondition, timeout) == WAIT_TIMEOUT)
823 return 0;
824
825 if (!m_active)
826 return -1;
827
828 EnterCriticalSection(&m_bufferLock);
b0039c52 829 if (m_buffer.isEmpty())
c9d5f2cc
VK
830 {
831 ResetEvent(m_dataCondition);
832 goto retry_wait;
833 }
834 }
835 LeaveCriticalSection(&m_bufferLock);
35a9f4a3 836#else
35a9f4a3 837 pthread_mutex_lock(&m_bufferLock);
b0039c52 838 if (m_buffer.isEmpty())
35a9f4a3
VK
839 {
840#if HAVE_PTHREAD_COND_RELTIMEDWAIT_NP
841 struct timespec ts;
842 ts.tv_sec = timeout / 1000;
843 ts.tv_nsec = (timeout % 1000) * 1000000;
b4a14814 844 rc = pthread_cond_reltimedwait_np(&m_dataCondition, &m_bufferLock, &ts);
35a9f4a3
VK
845#else
846 struct timeval now;
847 struct timespec ts;
848 gettimeofday(&now, NULL);
849 ts.tv_sec = now.tv_sec + (timeout / 1000);
850 now.tv_usec += (timeout % 1000) * 1000;
851 ts.tv_sec += now.tv_usec / 1000000;
852 ts.tv_nsec = (now.tv_usec % 1000000) * 1000;
853 rc = pthread_cond_timedwait(&m_dataCondition, &m_bufferLock, &ts);
854#endif
855 }
856 pthread_mutex_unlock(&m_bufferLock);
35a9f4a3 857#endif
c9d5f2cc
VK
858
859 return (rc == 0) ? 1 : 0;
2f6c6597
VK
860}
861
862/**
863 * Shutdown channel
864 */
865int AgentTunnelCommChannel::shutdown()
866{
867 m_active = false;
35a9f4a3 868#ifdef _WIN32
c9d5f2cc
VK
869 EnterCriticalSection(&m_bufferLock);
870 SetEvent(m_dataCondition);
871 LeaveCriticalSection(&m_bufferLock);
35a9f4a3
VK
872#else
873 pthread_cond_broadcast(&m_dataCondition);
874#endif
2f6c6597
VK
875 return 0;
876}
877
878/**
879 * Close channel
880 */
881void AgentTunnelCommChannel::close()
882{
883 m_active = false;
35a9f4a3 884#ifdef _WIN32
c9d5f2cc
VK
885 EnterCriticalSection(&m_bufferLock);
886 SetEvent(m_dataCondition);
887 LeaveCriticalSection(&m_bufferLock);
35a9f4a3
VK
888#else
889 pthread_cond_broadcast(&m_dataCondition);
890#endif
2f6c6597
VK
891 m_tunnel->closeChannel(this);
892}
893
894/**
98f0ac42
VK
895 * Put data into buffer
896 */
897void AgentTunnelCommChannel::putData(const BYTE *data, size_t size)
898{
35a9f4a3 899#ifdef _WIN32
c9d5f2cc 900 EnterCriticalSection(&m_bufferLock);
35a9f4a3
VK
901#else
902 pthread_mutex_lock(&m_bufferLock);
903#endif
b0039c52 904 m_buffer.write(data, size);
35a9f4a3 905#ifdef _WIN32
c9d5f2cc
VK
906 SetEvent(m_dataCondition);
907 LeaveCriticalSection(&m_bufferLock);
35a9f4a3
VK
908#else
909 pthread_cond_broadcast(&m_dataCondition);
910 pthread_mutex_unlock(&m_bufferLock);
911#endif
98f0ac42
VK
912}
913
914/**
cce82c3a
VK
915 * Incoming connection data
916 */
917struct ConnectionRequest
918{
919 SOCKET sock;
920 InetAddress addr;
921};
922
923/**
924 * Setup tunnel
925 */
926static void SetupTunnel(void *arg)
927{
928 ConnectionRequest *request = (ConnectionRequest *)arg;
929
930 SSL_CTX *context = NULL;
931 SSL *ssl = NULL;
932 AgentTunnel *tunnel = NULL;
933 int rc;
934 UINT32 nodeId = 0;
a191c634 935 UINT32 zoneUIN = 0;
cce82c3a
VK
936 X509 *cert = NULL;
937
938 // Setup secure connection
939 const SSL_METHOD *method = SSLv23_method();
940 if (method == NULL)
941 {
42c338ac 942 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot obtain TLS method"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
943 goto failure;
944 }
945
da90891a 946 context = SSL_CTX_new((SSL_METHOD *)method);
cce82c3a
VK
947 if (context == NULL)
948 {
42c338ac 949 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot create TLS context"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
950 goto failure;
951 }
da90891a 952#ifdef SSL_OP_NO_COMPRESSION
cce82c3a 953 SSL_CTX_set_options(context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_COMPRESSION);
da90891a
VK
954#else
955 SSL_CTX_set_options(context, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3);
956#endif
cce82c3a
VK
957 if (!SetupServerTlsContext(context))
958 {
42c338ac 959 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot configure TLS context"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
960 goto failure;
961 }
962
963 ssl = SSL_new(context);
964 if (ssl == NULL)
965 {
42c338ac 966 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): cannot create SSL object"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
967 goto failure;
968 }
969
970 SSL_set_accept_state(ssl);
efe30b82 971 SSL_set_fd(ssl, (int)request->sock);
cce82c3a
VK
972
973retry:
974 rc = SSL_do_handshake(ssl);
975 if (rc != 1)
976 {
977 int sslErr = SSL_get_error(ssl, rc);
978 if (sslErr == SSL_ERROR_WANT_READ)
979 {
980 SocketPoller poller;
981 poller.add(request->sock);
982 if (poller.poll(5000) > 0)
983 goto retry;
42c338ac 984 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): TLS handshake failed (timeout)"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
985 }
986 else
987 {
988 char buffer[128];
42c338ac 989 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): TLS handshake failed (%hs)"),
cce82c3a
VK
990 (const TCHAR *)request->addr.toString(), ERR_error_string(sslErr, buffer));
991 }
992 goto failure;
993 }
994
995 cert = SSL_get_peer_certificate(ssl);
996 if (cert != NULL)
997 {
998 if (ValidateAgentCertificate(cert))
999 {
c48843a5
VK
1000 TCHAR ou[256], cn[256];
1001 if (GetCertificateOU(cert, ou, 256) && GetCertificateCN(cert, cn, 256))
cce82c3a 1002 {
42c338ac 1003 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): certificate OU=%s CN=%s"), (const TCHAR *)request->addr.toString(), ou, cn);
c48843a5
VK
1004 uuid nodeGuid = uuid::parse(ou);
1005 uuid tunnelGuid = uuid::parse(cn);
1006 if (!nodeGuid.isNull() && !tunnelGuid.isNull())
cce82c3a 1007 {
0837f989 1008 Node *node = (Node *)FindObjectByGUID(nodeGuid, OBJECT_NODE);
cce82c3a
VK
1009 if (node != NULL)
1010 {
0837f989
VK
1011 if (tunnelGuid.equals(node->getTunnelId()))
1012 {
42c338ac 1013 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Tunnel attached to node %s [%d]"), (const TCHAR *)request->addr.toString(), node->getName(), node->getId());
5b0d07be
EJ
1014 if (node->getRuntimeFlags() & NDF_NEW_TUNNEL_BIND)
1015 {
1016 node->removeNewTunnelBindFlag();
1017 node->setRecheckCapsFlag();
1018 node->forceConfigurationPoll();
1019 }
0837f989 1020 nodeId = node->getId();
a191c634 1021 zoneUIN = node->getZoneUIN();
0837f989
VK
1022 }
1023 else
1024 {
42c338ac 1025 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Tunnel ID %s is not valid for node %s [%d]"),
0837f989
VK
1026 (const TCHAR *)request->addr.toString(), (const TCHAR *)tunnelGuid.toString(),
1027 node->getName(), node->getId());
1028 }
cce82c3a
VK
1029 }
1030 else
1031 {
42c338ac 1032 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Node with GUID %s not found"), (const TCHAR *)request->addr.toString(), (const TCHAR *)nodeGuid.toString());
cce82c3a
VK
1033 }
1034 }
1035 else
1036 {
42c338ac 1037 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Certificate OU or CN is not a valid GUID"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
1038 }
1039 }
1040 else
1041 {
42c338ac 1042 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Cannot get certificate OU and CN"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
1043 }
1044 }
1045 else
1046 {
42c338ac 1047 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Agent certificate validation failed"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
1048 }
1049 X509_free(cert);
1050 }
1051 else
1052 {
42c338ac 1053 nxlog_debug_tag(DEBUG_TAG, 4, _T("SetupTunnel(%s): Agent certificate not provided"), (const TCHAR *)request->addr.toString());
cce82c3a
VK
1054 }
1055
a191c634 1056 tunnel = new AgentTunnel(context, ssl, request->sock, request->addr, nodeId, zoneUIN);
bffa20b1 1057 RegisterTunnel(tunnel);
cce82c3a
VK
1058 tunnel->start();
1059 tunnel->decRefCount();
1060
1061 delete request;
1062 return;
1063
1064failure:
1065 if (ssl != NULL)
1066 SSL_free(ssl);
1067 if (context != NULL)
1068 SSL_CTX_free(context);
1069 shutdown(request->sock, SHUT_RDWR);
1070 closesocket(request->sock);
1071 delete request;
1072}
bf306af3
VK
1073
1074/**
b1caf010
VK
1075 * Tunnel listener lock
1076 */
1077static Mutex s_tunnelListenerLock;
1078
1079/**
f0718805 1080 * Client listener class
bf306af3 1081 */
f0718805 1082class TunnelListener : public SocketListener
bf306af3 1083{
f0718805
VK
1084protected:
1085 virtual ConnectionProcessingResult processConnection(SOCKET s, const InetAddress& peer);
1086 virtual bool isStopConditionReached();
bf306af3 1087
f0718805
VK
1088public:
1089 TunnelListener(UINT16 port) : SocketListener(port) { setName(_T("AgentTunnels")); }
1090};
bf306af3 1091
f0718805
VK
1092/**
1093 * Listener stop condition
1094 */
1095bool TunnelListener::isStopConditionReached()
1096{
1097 return IsShutdownInProgress();
1098}
bf306af3 1099
f0718805
VK
1100/**
1101 * Process incoming connection
1102 */
1103ConnectionProcessingResult TunnelListener::processConnection(SOCKET s, const InetAddress& peer)
1104{
1105 ConnectionRequest *request = new ConnectionRequest();
1106 request->sock = s;
1107 request->addr = peer;
1108 ThreadPoolExecute(g_mainThreadPool, SetupTunnel, request);
1109 return CPR_BACKGROUND;
1110}
bf306af3 1111
f0718805
VK
1112/**
1113 * Tunnel listener
1114 */
1115THREAD_RESULT THREAD_CALL TunnelListenerThread(void *arg)
1116{
930a2a62 1117 ThreadSetName("TunnelListener");
f0718805
VK
1118 s_tunnelListenerLock.lock();
1119 UINT16 listenPort = (UINT16)ConfigReadULong(_T("AgentTunnelListenPort"), 4703);
1120 TunnelListener listener(listenPort);
1121 listener.setListenAddress(g_szListenAddress);
1122 if (!listener.initialize())
bf306af3 1123 {
f0718805 1124 s_tunnelListenerLock.unlock();
bf306af3
VK
1125 return THREAD_OK;
1126 }
1127
f0718805
VK
1128 listener.mainLoop();
1129 listener.shutdown();
bf306af3 1130
42c338ac 1131 nxlog_debug_tag(DEBUG_TAG, 1, _T("Tunnel listener thread terminated"));
b1caf010 1132 s_tunnelListenerLock.unlock();
bf306af3
VK
1133 return THREAD_OK;
1134}
1135
1136/**
1137 * Close all active agent tunnels
1138 */
1139void CloseAgentTunnels()
1140{
42c338ac 1141 nxlog_debug_tag(DEBUG_TAG, 2, _T("Closing active agent tunnels..."));
b1caf010
VK
1142
1143 // Wait for listener thread
1144 s_tunnelListenerLock.lock();
1145 s_tunnelListenerLock.unlock();
1146
1147 s_tunnelListLock.lock();
1148 Iterator<AgentTunnel> *it = s_boundTunnels.iterator();
1149 while(it->hasNext())
1150 {
1151 AgentTunnel *t = it->next();
1152 t->shutdown();
1153 }
7ca6e370 1154 delete it;
b1caf010 1155 for(int i = 0; i < s_unboundTunnels.size(); i++)
9c144154 1156 s_unboundTunnels.get(i)->shutdown();
b1caf010
VK
1157 s_tunnelListLock.unlock();
1158
1159 bool wait = true;
1160 while(wait)
1161 {
1162 ThreadSleepMs(500);
1163 s_tunnelListLock.lock();
1164 if ((s_boundTunnels.size() == 0) && (s_unboundTunnels.size() == 0))
1165 wait = false;
1166 s_tunnelListLock.unlock();
1167 }
1168
42c338ac 1169 nxlog_debug_tag(DEBUG_TAG, 2, _T("All agent tunnels unregistered"));
bf306af3 1170}