From 842378a4794cd0a138247e62537a116ba6b209c8 Mon Sep 17 00:00:00 2001 From: Victor Kirhenshtein Date: Thu, 8 Sep 2016 12:16:53 +0300 Subject: [PATCH] Initial agent-side code for agent to server connections --- include/nms_agent.h | 1 + include/nms_cscp.h | 2 +- src/agent/core/Makefile.am | 2 +- src/agent/core/messages.mc | 6 + src/agent/core/nxagentd.cpp | 15 +- src/agent/core/nxagentd.h | 2 + src/agent/core/session.cpp | 7 +- src/agent/core/tunnel.cpp | 324 ++++++++++++++++++++++++++++++++++++ src/libnetxms/nxcp.cpp | 2 +- 9 files changed, 351 insertions(+), 10 deletions(-) create mode 100644 src/agent/core/tunnel.cpp diff --git a/include/nms_agent.h b/include/nms_agent.h index dabb17b42..e050fccd6 100644 --- a/include/nms_agent.h +++ b/include/nms_agent.h @@ -57,6 +57,7 @@ * Constants */ #define AGENT_LISTEN_PORT 4700 +#define AGENT_TUNNEL_PORT 4703 #define AGENT_PROTOCOL_VERSION 2 #define MAX_RESULT_LENGTH 256 #define MAX_CMD_LEN 256 diff --git a/include/nms_cscp.h b/include/nms_cscp.h index 04c07210d..377218f24 100644 --- a/include/nms_cscp.h +++ b/include/nms_cscp.h @@ -318,7 +318,7 @@ typedef struct #define CMD_CREATE_ACTION 0x0051 #define CMD_DELETE_ACTION 0x0052 #define CMD_ACTION_DATA 0x0053 -//UNUSED: #define CMD_GET_CONTAINER_CAT_LIST 0x0054 +#define CMD_SETUP_AGENT_TUNNEL 0x0054 //UNUSED: #define CMD_CONTAINER_CAT_DATA 0x0055 //UNUSED: #define CMD_DELETE_CONTAINER_CAT 0x0056 //UNUSED: #define CMD_CREATE_CONTAINER_CAT 0x0057 diff --git a/src/agent/core/Makefile.am b/src/agent/core/Makefile.am index 12262e263..8463bd340 100644 --- a/src/agent/core/Makefile.am +++ b/src/agent/core/Makefile.am @@ -6,7 +6,7 @@ nxagentd_SOURCES = messages.c actions.cpp appagent.cpp comm.cpp config.cpp \ nxagentd.cpp policy.cpp push.cpp register.cpp sa.cpp \ sd.cpp session.cpp snmpproxy.cpp snmptrapproxy.cpp \ static_subagents.cpp subagent.cpp sysinfo.cpp syslog.cpp \ - tools.cpp trap.cpp upgrade.cpp watchdog.cpp + tools.cpp trap.cpp tunnel.cpp upgrade.cpp watchdog.cpp if USE_INTERNAL_EXPAT nxagentd_LDADD = ../../appagent/libappagent.la ../libnxagent/libnxagent.la @top_srcdir@/src/db/libnxdb/libnxdb.la @top_srcdir@/src/libnetxms/libnetxms.la @top_srcdir@/src/snmp/libnxsnmp/libnxsnmp.la @top_srcdir@/src/libexpat/libexpat/libnxexpat.la @SUBAGENT_LIBS@ else diff --git a/src/agent/core/messages.mc b/src/agent/core/messages.mc index 6b443c235..87ced8e3f 100644 --- a/src/agent/core/messages.mc +++ b/src/agent/core/messages.mc @@ -326,4 +326,10 @@ Language=English Data collection database schema upgrade failed . +MessageId= +SymbolicName=MSG_INVALID_TUNNEL_CONFIG +Language=English +Invalid server connection configuration record "%1" +. + ;#endif diff --git a/src/agent/core/nxagentd.cpp b/src/agent/core/nxagentd.cpp index f58b9e4c7..44a5b990d 100644 --- a/src/agent/core/nxagentd.cpp +++ b/src/agent/core/nxagentd.cpp @@ -65,6 +65,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapReceiver(void *); THREAD_RESULT THREAD_CALL SNMPTrapSender(void *); THREAD_RESULT THREAD_CALL SyslogReceiver(void *); THREAD_RESULT THREAD_CALL SyslogSender(void *); +THREAD_RESULT THREAD_CALL TunnelManager(void *); void ShutdownTrapSender(); void ShutdownSNMPTrapSender(); @@ -85,6 +86,8 @@ BOOL RegisterOnServer(const TCHAR *pszServer); void UpdatePolicyInventory(); +void ParseTunnelList(TCHAR *list); + #if !defined(_WIN32) void InitStaticSubagents(); #endif @@ -203,6 +206,7 @@ static TCHAR *m_pszShExtParamList = NULL; static TCHAR *m_pszParamProviderList = NULL; static TCHAR *m_pszExtSubagentList = NULL; static TCHAR *m_pszAppAgentList = NULL; +static TCHAR *s_serverConnectionList = NULL; static UINT32 s_enabledCiphers = 0xFFFF; static THREAD s_sessionWatchdogThread = INVALID_THREAD_HANDLE; static THREAD s_listenerThread = INVALID_THREAD_HANDLE; @@ -212,6 +216,7 @@ static THREAD s_snmpTrapSenderThread = INVALID_THREAD_HANDLE; static THREAD s_syslogReceiverThread = INVALID_THREAD_HANDLE; static THREAD s_syslogSenderThread = INVALID_THREAD_HANDLE; static THREAD s_masterAgentListenerThread = INVALID_THREAD_HANDLE; +static THREAD s_tunnelManagerThread = INVALID_THREAD_HANDLE; static TCHAR s_processToWaitFor[MAX_PATH] = _T(""); static TCHAR s_dumpDir[MAX_PATH] = _T("C:\\"); static UINT64 s_maxLogSize = 16384 * 1024; @@ -282,6 +287,7 @@ static NX_CFG_TEMPLATE m_cfgTemplate[] = { _T("PlatformSuffix"), CT_STRING, 0, 0, MAX_PSUFFIX_LENGTH, 0, g_szPlatformSuffix, NULL }, { _T("RequireAuthentication"), CT_BOOLEAN, 0, 0, AF_REQUIRE_AUTH, 0, &g_dwFlags, NULL }, { _T("RequireEncryption"), CT_BOOLEAN, 0, 0, AF_REQUIRE_ENCRYPTION, 0, &g_dwFlags, NULL }, + { _T("ServerConnection"), CT_STRING_LIST, '\n', 0, 0, 0, &s_serverConnectionList, NULL }, { _T("Servers"), CT_STRING_LIST, ',', 0, 0, 0, &m_pszServerList, NULL }, { _T("SessionIdleTimeout"), CT_LONG, 0, 0, 0, 0, &g_dwIdleTimeout, NULL }, { _T("SessionAgentPort"), CT_WORD, 0, 0, 0, 0, &g_sessionAgentPort, NULL }, @@ -805,6 +811,10 @@ BOOL Initialize() if (!InitParameterList()) return FALSE; + // Parse outgoing server connection (tunnel) list + if (s_serverConnectionList != NULL) + ParseTunnelList(s_serverConnectionList); + // Parse server lists if (m_pszMasterServerList != NULL) ParseServerList(m_pszMasterServerList, true, true); @@ -1030,7 +1040,9 @@ BOOL Initialize() { RegisterOnServer(g_szRegistrar); } - } + + s_tunnelManagerThread = ThreadCreateEx(TunnelManager, 0, NULL); + } #if defined(_WIN32) s_shutdownCondition = ConditionCreate(TRUE); @@ -1082,6 +1094,7 @@ void Shutdown() ShutdownTrapSender(); ThreadJoin(s_sessionWatchdogThread); ThreadJoin(s_listenerThread); + ThreadJoin(s_tunnelManagerThread); } ThreadJoin(s_eventSenderThread); if (g_dwFlags & AF_ENABLE_SNMP_TRAP_PROXY) diff --git a/src/agent/core/nxagentd.h b/src/agent/core/nxagentd.h index e5f3977dd..0e897c7fd 100644 --- a/src/agent/core/nxagentd.h +++ b/src/agent/core/nxagentd.h @@ -100,6 +100,8 @@ #define MAX_PSUFFIX_LENGTH 32 #define MAX_SERVERS 32 #define MAX_ESA_USER_NAME 64 +#define MAX_AGENT_MSG_SIZE 4194304 + #define AF_DAEMON 0x00000001 #define AF_USE_SYSLOG 0x00000002 diff --git a/src/agent/core/session.cpp b/src/agent/core/session.cpp index d4835373a..5f5df2bd2 100644 --- a/src/agent/core/session.cpp +++ b/src/agent/core/session.cpp @@ -36,11 +36,6 @@ UINT32 UninstallPolicy(CommSession *session, NXCPMessage *request); UINT32 GetPolicyInventory(CommSession *session, NXCPMessage *msg); void ClearDataCollectionConfiguration(); -/** - * Max message size - */ -#define MAX_MSG_SIZE 4194304 - /** * SNMP proxy thread pool */ @@ -206,7 +201,7 @@ void CommSession::disconnect() */ void CommSession::readThread() { - SocketMessageReceiver receiver(m_hSocket, 4096, MAX_MSG_SIZE); + SocketMessageReceiver receiver(m_hSocket, 4096, MAX_AGENT_MSG_SIZE); while(true) { if (!m_proxyConnection) diff --git a/src/agent/core/tunnel.cpp b/src/agent/core/tunnel.cpp new file mode 100644 index 000000000..af65ade0e --- /dev/null +++ b/src/agent/core/tunnel.cpp @@ -0,0 +1,324 @@ +/* +** NetXMS multiplatform core agent +** Copyright (C) 2003-2016 Victor Kirhenshtein +** +** This program is free software; you can redistribute it and/or modify +** it under the terms of the GNU General Public License as published by +** the Free Software Foundation; either version 2 of the License, or +** (at your option) any later version. +** +** This program is distributed in the hope that it will be useful, +** but WITHOUT ANY WARRANTY; without even the implied warranty of +** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +** GNU General Public License for more details. +** +** You should have received a copy of the GNU General Public License +** along with this program; if not, write to the Free Software +** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +** +** File: tunnel.cpp +** +**/ + +#include "nxagentd.h" + +/** + * Tunnel class + */ +class Tunnel +{ +private: + InetAddress m_address; + UINT16 m_port; + TCHAR m_login[MAX_OBJECT_NAME]; + SOCKET m_socket; + bool m_connected; + UINT32 m_requestId; + THREAD m_recvThread; + MsgWaitQueue *m_queue; + + Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login); + + bool connectToServer(); + bool sendMessage(const NXCPMessage *msg); + NXCPMessage *waitForMessage(UINT16 code, UINT32 id) { return (m_queue != NULL) ? m_queue->waitForMessage(code, id, 5000) : NULL; } + void recvThread(); + + static THREAD_RESULT THREAD_CALL recvThreadStarter(void *arg); + +public: + ~Tunnel(); + + void checkConnection(); + void disconnect(); + + const InetAddress& getAddress() const { return m_address; } + const TCHAR *getLogin() const { return m_login; } + + static Tunnel *createFromConfig(TCHAR *config); +}; + +/** + * Tunnel constructor + */ +Tunnel::Tunnel(const InetAddress& addr, UINT16 port, const TCHAR *login) : m_address(addr) +{ + m_port = port; + nx_strncpy(m_login, login, MAX_OBJECT_NAME); + m_socket = INVALID_SOCKET; + m_connected = false; + m_requestId = 0; + m_recvThread = INVALID_THREAD_HANDLE; + m_queue = NULL; +} + +/** + * Tunnel destructor + */ +Tunnel::~Tunnel() +{ + disconnect(); + if (m_socket != INVALID_SOCKET) + closesocket(m_socket); +} + +/** + * Force disconnect + */ +void Tunnel::disconnect() +{ + if (m_socket != INVALID_SOCKET) + shutdown(m_socket, SHUT_RDWR); + m_connected = false; + ThreadJoin(m_recvThread); + delete_and_null(m_queue); +} + +/** + * Receiver thread starter + */ +THREAD_RESULT THREAD_CALL Tunnel::recvThreadStarter(void *arg) +{ + ((Tunnel *)arg)->recvThread(); + return THREAD_OK; +} + +/** + * Receiver thread + */ +void Tunnel::recvThread() +{ + SocketMessageReceiver receiver(m_socket, 8192, MAX_AGENT_MSG_SIZE); + while(m_connected) + { + MessageReceiverResult result; + NXCPMessage *msg = receiver.readMessage(1000, &result); + if (msg != NULL) + { + m_queue->put(msg); + } + else if (result != MSGRECV_TIMEOUT) + { + nxlog_debug(4, _T("Receiver thread for tunnel %s@%s stopped (%s)"), \ + m_login, (const TCHAR *)m_address.toString(), AbstractMessageReceiver::resultToText(result)); + break; + } + } +} + +/** + * Send message + */ +bool Tunnel::sendMessage(const NXCPMessage *msg) +{ + if (m_socket == INVALID_SOCKET) + return false; + + NXCP_MESSAGE *data = msg->createMessage(); + bool success = SendEx(m_socket, data, ntohl(data->size), 0, NULL); + free(data); + return success; +} + +/** + * Connect to server + */ +bool Tunnel::connectToServer() +{ + if (m_socket != INVALID_SOCKET) + closesocket(m_socket); + + m_socket = socket(m_address.getFamily(), SOCK_STREAM, 0); + if (m_socket == INVALID_SOCKET) + { + nxlog_debug(4, _T("Cannot create socket for tunnel %s@%s: %s"), m_login, (const TCHAR *)m_address.toString(), _tcserror(WSAGetLastError())); + return false; + } + + SockAddrBuffer sa; + m_address.fillSockAddr(&sa, m_port); + if (ConnectEx(m_socket, (struct sockaddr *)&sa, SA_LEN((struct sockaddr *)&sa), 5000) == -1) + { + nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: %s"), m_login, (const TCHAR *)m_address.toString(), _tcserror(WSAGetLastError())); + return false; + } + + delete m_queue; + m_queue = new MsgWaitQueue(); + m_recvThread = ThreadCreateEx(Tunnel::recvThreadStarter, 0, this); + + m_requestId = 1; + + NXCPMessage msg; + msg.setCode(CMD_SETUP_AGENT_TUNNEL); + msg.setId(m_requestId++); + msg.setField(VID_LOGIN_NAME, m_login); + msg.setField(VID_SHARED_SECRET, g_szSharedSecret); + sendMessage(&msg); + + NXCPMessage *response = waitForMessage(CMD_REQUEST_COMPLETED, msg.getId()); + if (response == NULL) + { + nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: request timeout"), m_login, (const TCHAR *)m_address.toString()); + disconnect(); + return false; + } + + UINT32 rcc = response->getFieldAsUInt32(VID_RCC); + delete response; + if (rcc != ERR_SUCCESS) + { + nxlog_debug(4, _T("Cannot establish connection for tunnel %s@%s: error %d"), m_login, (const TCHAR *)m_address.toString(), rcc); + disconnect(); + return false; + } + + m_connected = true; + return true; +} + +/** + * Check tunnel connection and connect as needed + */ +void Tunnel::checkConnection() +{ + if (!m_connected) + { + if (connectToServer()) + nxlog_debug(3, _T("Tunnel %s@%s active"), m_login, (const TCHAR *)m_address.toString()); + } + else + { + NXCPMessage msg; + msg.setCode(CMD_KEEPALIVE); + msg.setId(m_requestId++); + if (sendMessage(&msg)) + { + NXCPMessage *response = waitForMessage(CMD_KEEPALIVE, msg.getId()); + if (response == NULL) + { + disconnect(); + closesocket(m_socket); + m_socket = INVALID_SOCKET; + nxlog_debug(3, _T("Connection test failed for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString()); + } + else + { + delete response; + } + } + else + { + disconnect(); + closesocket(m_socket); + m_socket = INVALID_SOCKET; + nxlog_debug(3, _T("Connection test failed for tunnel %s@%s"), m_login, (const TCHAR *)m_address.toString()); + } + } +} + +/** + * Create tunnel object from configuration record + */ +Tunnel *Tunnel::createFromConfig(TCHAR *config) +{ + TCHAR *a = _tcschr(config, _T('@')); + if (a == NULL) + return NULL; + + a++; + int port = AGENT_TUNNEL_PORT; + TCHAR *p = _tcschr(a, _T(':')); + if (p != NULL) + { + *p = 0; + p++; + + TCHAR *eptr; + int port = _tcstol(p, &eptr, 10); + if ((port < 1) || (port > 65535)) + return NULL; + } + + InetAddress addr = InetAddress::resolveHostName(a); + if (!addr.isValidUnicast()) + return NULL; + + return new Tunnel(addr, port, config); +} + +/** + * Configured tunnels + */ +static ObjectArray s_tunnels; + +/** + * Parser server connection (tunnel) list + */ +void ParseTunnelList(TCHAR *list) +{ + TCHAR *curr, *next; + for(curr = next = list; curr != NULL && *curr != 0; curr = next + 1) + { + next = _tcschr(curr, _T('\n')); + if (next != NULL) + *next = 0; + StrStrip(curr); + + Tunnel *t = Tunnel::createFromConfig(curr); + if (t != NULL) + { + s_tunnels.add(t); + nxlog_debug(1, _T("Added server tunnel %s@%s"), t->getLogin(), (const TCHAR *)t->getAddress().toString()); + } + else + { + nxlog_write(MSG_INVALID_TUNNEL_CONFIG, NXLOG_ERROR, "s", curr); + } + } + free(list); +} + +/** + * Tunnel manager + */ +THREAD_RESULT THREAD_CALL TunnelManager(void *) +{ + if (s_tunnels.size() == 0) + { + nxlog_debug(3, _T("No tunnels configured, tunnel manager will not start")); + return THREAD_OK; + } + + nxlog_debug(3, _T("Tunnel manager started")); + while(!AgentSleepAndCheckForShutdown(30000)) + { + for(int i = 0; i < s_tunnels.size(); i++) + { + Tunnel *t = s_tunnels.get(i); + t->checkConnection(); + } + } + nxlog_debug(3, _T("Tunnel manager stopped")); + return THREAD_OK; +} diff --git a/src/libnetxms/nxcp.cpp b/src/libnetxms/nxcp.cpp index 1537d26e8..614ce418a 100644 --- a/src/libnetxms/nxcp.cpp +++ b/src/libnetxms/nxcp.cpp @@ -119,7 +119,7 @@ TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD code, TCHAR *pszBuffer) _T("CMD_CREATE_ACTION"), _T("CMD_DELETE_ACTION"), _T("CMD_ACTION_DATA"), - _T(""), + _T("CMD_SETUP_AGENT_TUNNEL"), _T(""), _T(""), _T(""), -- 2.20.1