Initial agent-side code for agent to server connections
[public/netxms.git] / src / agent / core / tunnel.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2016 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: tunnel.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Tunnel class
27 */
28 class Tunnel
29 {
30 private:
31 InetAddress m_address;
32 UINT16 m_port;
33 TCHAR m_login[MAX_OBJECT_NAME];
34 SOCKET m_socket;
35 bool m_connected;
36 UINT32 m_requestId;
37 THREAD m_recvThread;
38 MsgWaitQueue *m_queue;
39
40 Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login);
41
42 bool connectToServer();
43 bool sendMessage(const NXCPMessage *msg);
44 NXCPMessage *waitForMessage(UINT16 code, UINT32 id) { return (m_queue != NULL) ? m_queue->waitForMessage(code, id, 5000) : NULL; }
45 void recvThread();
46
47 static THREAD_RESULT THREAD_CALL recvThreadStarter(void *arg);
48
49 public:
50 ~Tunnel();
51
52 void checkConnection();
53 void disconnect();
54
55 const InetAddress& getAddress() const { return m_address; }
56 const TCHAR *getLogin() const { return m_login; }
57
58 static Tunnel *createFromConfig(TCHAR *config);
59 };
60
61 /**
62 * Tunnel constructor
63 */
64 Tunnel::Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login) : m_address(addr)
65 {
66 m_port = port;
67 nx_strncpy(m_login, login, MAX_OBJECT_NAME);
68 m_socket = INVALID_SOCKET;
69 m_connected = false;
70 m_requestId = 0;
71 m_recvThread = INVALID_THREAD_HANDLE;
72 m_queue = NULL;
73 }
74
75 /**
76 * Tunnel destructor
77 */
78 Tunnel::~Tunnel()
79 {
80 disconnect();
81 if (m_socket != INVALID_SOCKET)
82 closesocket(m_socket);
83 }
84
85 /**
86 * Force disconnect
87 */
88 void Tunnel::disconnect()
89 {
90 if (m_socket != INVALID_SOCKET)
91 shutdown(m_socket, SHUT_RDWR);
92 m_connected = false;
93 ThreadJoin(m_recvThread);
94 delete_and_null(m_queue);
95 }
96
97 /**
98 * Receiver thread starter
99 */
100 THREAD_RESULT THREAD_CALL Tunnel::recvThreadStarter(void *arg)
101 {
102 ((Tunnel *)arg)->recvThread();
103 return THREAD_OK;
104 }
105
106 /**
107 * Receiver thread
108 */
109 void Tunnel::recvThread()
110 {
111 SocketMessageReceiver receiver(m_socket, 8192, MAX_AGENT_MSG_SIZE);
112 while(m_connected)
113 {
114 MessageReceiverResult result;
115 NXCPMessage *msg = receiver.readMessage(1000, &result);
116 if (msg != NULL)
117 {
118 m_queue->put(msg);
119 }
120 else if (result != MSGRECV_TIMEOUT)
121 {
122 nxlog_debug(4, _T("Receiver thread for tunnel %s@%s stopped (%s)"), \
123 m_login, (const TCHAR *)m_address.toString(), AbstractMessageReceiver::resultToText(result));
124 break;
125 }
126 }
127 }
128
129 /**
130 * Send message
131 */
132 bool Tunnel::sendMessage(const NXCPMessage *msg)
133 {
134 if (m_socket == INVALID_SOCKET)
135 return false;
136
137 NXCP_MESSAGE *data = msg->createMessage();
138 bool success = SendEx(m_socket, data, ntohl(data->size), 0, NULL);
139 free(data);
140 return success;
141 }
142
143 /**
144 * Connect to server
145 */
146 bool Tunnel::connectToServer()
147 {
148 if (m_socket != INVALID_SOCKET)
149 closesocket(m_socket);
150
151 m_socket = socket(m_address.getFamily(), SOCK_STREAM, 0);
152 if (m_socket == INVALID_SOCKET)
153 {
154 nxlog_debug(4, _T("Cannot create socket for tunnel %s@%s: %s"), m_login, (const TCHAR *)m_address.toString(), _tcserror(WSAGetLastError()));
155 return false;
156 }
157
158 SockAddrBuffer sa;
159 m_address.fillSockAddr(&sa, m_port);
160 if (ConnectEx(m_socket, (struct sockaddr *)&sa, SA_LEN((struct sockaddr *)&sa), 5000) == -1)
161 {
162 nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: %s"), m_login, (const TCHAR *)m_address.toString(), _tcserror(WSAGetLastError()));
163 return false;
164 }
165
166 delete m_queue;
167 m_queue = new MsgWaitQueue();
168 m_recvThread = ThreadCreateEx(Tunnel::recvThreadStarter, 0, this);
169
170 m_requestId = 1;
171
172 NXCPMessage msg;
173 msg.setCode(CMD_SETUP_AGENT_TUNNEL);
174 msg.setId(m_requestId++);
175 msg.setField(VID_LOGIN_NAME, m_login);
176 msg.setField(VID_SHARED_SECRET, g_szSharedSecret);
177 sendMessage(&msg);
178
179 NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId());
180 if (response == NULL)
181 {
182 nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: request timeout"), m_login, (const TCHAR *)m_address.toString());
183 disconnect();
184 return false;
185 }
186
187 UINT32 rcc = response->getFieldAsUInt32(VID_RCC);
188 delete response;
189 if (rcc != ERR_SUCCESS)
190 {
191 nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: error %d"), m_login, (const TCHAR *)m_address.toString(), rcc);
192 disconnect();
193 return false;
194 }
195
196 m_connected = true;
197 return true;
198 }
199
200 /**
201 * Check tunnel connection and connect as needed
202 */
203 void Tunnel::checkConnection()
204 {
205 if (!m_connected)
206 {
207 if (connectToServer())
208 nxlog_debug(3, _T("Tunnel %s@%s active"), m_login, (const TCHAR *)m_address.toString());
209 }
210 else
211 {
212 NXCPMessage msg;
213 msg.setCode(CMD_KEEPALIVE);
214 msg.setId(m_requestId++);
215 if (sendMessage(&msg))
216 {
217 NXCPMessage *response = waitForMessage(CMD_KEEPALIVE, msg.getId());
218 if (response == NULL)
219 {
220 disconnect();
221 closesocket(m_socket);
222 m_socket = INVALID_SOCKET;
223 nxlog_debug(3, _T("Connection test failed for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
224 }
225 else
226 {
227 delete response;
228 }
229 }
230 else
231 {
232 disconnect();
233 closesocket(m_socket);
234 m_socket = INVALID_SOCKET;
235 nxlog_debug(3, _T("Connection test failed for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString());
236 }
237 }
238 }
239
240 /**
241 * Create tunnel object from configuration record
242 */
243 Tunnel *Tunnel::createFromConfig(TCHAR *config)
244 {
245 TCHAR *a = _tcschr(config, _T('@'));
246 if (a == NULL)
247 return NULL;
248
249 a++;
250 int port = AGENT_TUNNEL_PORT;
251 TCHAR *p = _tcschr(a, _T(':'));
252 if (p != NULL)
253 {
254 *p = 0;
255 p++;
256
257 TCHAR *eptr;
258 int port = _tcstol(p, &eptr, 10);
259 if ((port < 1) || (port > 65535))
260 return NULL;
261 }
262
263 InetAddress addr = InetAddress::resolveHostName(a);
264 if (!addr.isValidUnicast())
265 return NULL;
266
267 return new Tunnel(addr, port, config);
268 }
269
270 /**
271 * Configured tunnels
272 */
273 static ObjectArray<Tunnel> s_tunnels;
274
275 /**
276 * Parser server connection (tunnel) list
277 */
278 void ParseTunnelList(TCHAR *list)
279 {
280 TCHAR *curr, *next;
281 for(curr = next = list; curr != NULL && *curr != 0; curr = next + 1)
282 {
283 next = _tcschr(curr, _T('\n'));
284 if (next != NULL)
285 *next = 0;
286 StrStrip(curr);
287
288 Tunnel *t = Tunnel::createFromConfig(curr);
289 if (t != NULL)
290 {
291 s_tunnels.add(t);
292 nxlog_debug(1, _T("Added server tunnel %s@%s"), t->getLogin(), (const TCHAR *)t->getAddress().toString());
293 }
294 else
295 {
296 nxlog_write(MSG_INVALID_TUNNEL_CONFIG, NXLOG_ERROR, "s", curr);
297 }
298 }
299 free(list);
300 }
301
302 /**
303 * Tunnel manager
304 */
305 THREAD_RESULT THREAD_CALL TunnelManager(void *)
306 {
307 if (s_tunnels.size() == 0)
308 {
309 nxlog_debug(3, _T("No tunnels configured, tunnel manager will not start"));
310 return THREAD_OK;
311 }
312
313 nxlog_debug(3, _T("Tunnel manager started"));
314 while(!AgentSleepAndCheckForShutdown(30000))
315 {
316 for(int i = 0; i < s_tunnels.size(); i++)
317 {
318 Tunnel *t = s_tunnels.get(i);
319 t->checkConnection();
320 }
321 }
322 nxlog_debug(3, _T("Tunnel manager stopped"));
323 return THREAD_OK;
324 }