5c5dfb1b1f50b504302886fd07e6229e7402d47b
[public/netxms.git] / src / agent / core / comm.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003, 2004 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** $module: comm.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25
26 //
27 // Global variables
28 //
29
30 DWORD g_dwAcceptErrors = 0;          // Incremented in ListenerThread each time accept() fails
31 DWORD g_dwAcceptedConnections = 0;   // Incremented in ListenerThread for each peer that passes IsValidServerAddr()
32 DWORD g_dwRejectedConnections = 0;   // Incremented in ListenerThread for each peer that fails IsValidServerAddr()
33
34
35 //
36 // Constants
37 //
38
39 #define MAX_CLIENT_SESSIONS 32       // Number of slots in m_pSessionList
40
41
42 //
43 // Static data
44 //
45
46 static CommSession *m_pSessionList[MAX_CLIENT_SESSIONS];   // Session slots; NULL marks a free slot (see RegisterSession/UnregisterSession)
47 static MUTEX m_hSessionListAccess;   // Locked around every access to m_pSessionList; created and destroyed in ListenerThread
48
49
50 //
51 // Validates server's address
52 //
53
54 static BOOL IsValidServerAddr(DWORD dwAddr, BOOL *pbInstallationServer)
55 {
56 DWORD i;
57
58 for(i=0; i < g_dwServerCount; i++)
59 if (dwAddr == g_pServerList[i].dwIpAddr)
60 {
61 *pbInstallationServer = g_pServerList[i].bInstallationServer;
62 return TRUE;
63 }
64 return FALSE;
65 }
66
67
68 //
69 // Register new session in list
70 //
71
72 static BOOL RegisterSession(CommSession *pSession)
73 {
74 DWORD i;
75
76 MutexLock(m_hSessionListAccess, INFINITE);
77 for(i = 0; i < MAX_CLIENT_SESSIONS; i++)
78 if (m_pSessionList[i] == NULL)
79 {
80 m_pSessionList[i] = pSession;
81 pSession->SetIndex(i);
82 MutexUnlock(m_hSessionListAccess);
83 return TRUE;
84 }
85
86 MutexUnlock(m_hSessionListAccess);
87 WriteLog(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
88 return FALSE;
89 }
90
91
92 //
93 // Unregister session
94 //
95
96 void UnregisterSession(DWORD dwIndex)
97 {
98 MutexLock(m_hSessionListAccess, INFINITE);
99 m_pSessionList[dwIndex] = NULL;
100 MutexUnlock(m_hSessionListAccess);
101 }
102
103
104 //
105 // TCP/IP Listener
106 //
107
108 THREAD_RESULT THREAD_CALL ListenerThread(void *)
109 {
110 SOCKET hSocket, hClientSocket;
111 struct sockaddr_in servAddr;
112 int iNumErrors = 0;
113 socklen_t iSize;
114 CommSession *pSession;
115 char szBuffer[256];
116 BOOL bInstallationServer;
117
118 // Create socket
119 if ((hSocket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
120 {
121 WriteLog(MSG_SOCKET_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
122 exit(1);
123 }
124
125 // Fill in local address structure
126 memset(&servAddr, 0, sizeof(struct sockaddr_in));
127 servAddr.sin_family = AF_INET;
128 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
129 servAddr.sin_port = htons(g_wListenPort);
130
131 // Bind socket
132 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
133 {
134 WriteLog(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
135 exit(1);
136 }
137
138 // Set up queue
139 listen(hSocket, SOMAXCONN);
140
141 // Create session list access mutex
142 m_hSessionListAccess = MutexCreate();
143
144 // Wait for connection requests
145 while(!(g_dwFlags & AF_SHUTDOWN))
146 {
147 iSize = sizeof(struct sockaddr_in);
148 if ((hClientSocket = accept(hSocket, (struct sockaddr *)&servAddr, &iSize)) == -1)
149 {
150 int error = WSAGetLastError();
151
152 if (error != WSAEINTR)
153 WriteLog(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
154 iNumErrors++;
155 g_dwAcceptErrors++;
156 if (iNumErrors > 1000)
157 {
158 WriteLog(MSG_TOO_MANY_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
159 iNumErrors = 0;
160 }
161 ThreadSleepMs(500);
162 }
163
164 iNumErrors = 0; // Reset consecutive errors counter
165 DebugPrintf("Incoming connection from %s", IpToStr(ntohl(servAddr.sin_addr.s_addr), szBuffer));
166
167 if (IsValidServerAddr(servAddr.sin_addr.s_addr, &bInstallationServer))
168 {
169 g_dwAcceptedConnections++;
170 DebugPrintf("Connection from %s accepted", szBuffer);
171
172 // Create new session structure and threads
173 pSession = new CommSession(hClientSocket, ntohl(servAddr.sin_addr.s_addr),
174 bInstallationServer);
175 if (!RegisterSession(pSession))
176 {
177 delete pSession;
178 }
179 else
180 {
181 pSession->Run();
182 }
183 }
184 else // Unauthorized connection
185 {
186 g_dwRejectedConnections++;
187 shutdown(hClientSocket, 2);
188 closesocket(hClientSocket);
189 DebugPrintf("Connection from %s rejected", szBuffer);
190 }
191 }
192
193 MutexDestroy(m_hSessionListAccess);
194 return THREAD_OK;
195 }