+ sockets reuse (*nix only)
[public/netxms.git] / src / agent / core / comm.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003, 2004 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** $module: comm.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25
26 //
27 // Global variables
28 //
29
30 DWORD g_dwAcceptErrors = 0;
31 DWORD g_dwAcceptedConnections = 0;
32 DWORD g_dwRejectedConnections = 0;
33
34
35 //
36 // Constants
37 //
38
39 #define MAX_CLIENT_SESSIONS 32
40
41
42 //
43 // Static data
44 //
45
46 static CommSession *m_pSessionList[MAX_CLIENT_SESSIONS];
47 static MUTEX m_hSessionListAccess;
48
49
50 //
51 // Validates server's address
52 //
53
54 static BOOL IsValidServerAddr(DWORD dwAddr, BOOL *pbInstallationServer)
55 {
56 DWORD i;
57
58 for(i=0; i < g_dwServerCount; i++)
59 if (dwAddr == g_pServerList[i].dwIpAddr)
60 {
61 *pbInstallationServer = g_pServerList[i].bInstallationServer;
62 return TRUE;
63 }
64 return FALSE;
65 }
66
67
68 //
69 // Register new session in list
70 //
71
72 static BOOL RegisterSession(CommSession *pSession)
73 {
74 DWORD i;
75
76 MutexLock(m_hSessionListAccess, INFINITE);
77 for(i = 0; i < MAX_CLIENT_SESSIONS; i++)
78 if (m_pSessionList[i] == NULL)
79 {
80 m_pSessionList[i] = pSession;
81 pSession->SetIndex(i);
82 MutexUnlock(m_hSessionListAccess);
83 return TRUE;
84 }
85
86 MutexUnlock(m_hSessionListAccess);
87 WriteLog(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
88 return FALSE;
89 }
90
91
92 //
93 // Unregister session
94 //
95
96 void UnregisterSession(DWORD dwIndex)
97 {
98 MutexLock(m_hSessionListAccess, INFINITE);
99 m_pSessionList[dwIndex] = NULL;
100 MutexUnlock(m_hSessionListAccess);
101 }
102
103
104 //
105 // TCP/IP Listener
106 //
107
108 THREAD_RESULT THREAD_CALL ListenerThread(void *)
109 {
110 SOCKET hSocket, hClientSocket;
111 struct sockaddr_in servAddr;
112 int iNumErrors = 0;
113 socklen_t iSize;
114 CommSession *pSession;
115 char szBuffer[256];
116 BOOL bInstallationServer;
117
118 // Create socket
119 if ((hSocket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
120 {
121 WriteLog(MSG_SOCKET_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
122 exit(1);
123 }
124
125 SetSocketReuseFlag(hSocket);
126
127 // Fill in local address structure
128 memset(&servAddr, 0, sizeof(struct sockaddr_in));
129 servAddr.sin_family = AF_INET;
130 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
131 servAddr.sin_port = htons(g_wListenPort);
132
133 // Bind socket
134 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
135 {
136 WriteLog(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
137 exit(1);
138 }
139
140 // Set up queue
141 listen(hSocket, SOMAXCONN);
142
143 // Create session list access mutex
144 m_hSessionListAccess = MutexCreate();
145
146 // Wait for connection requests
147 while(!(g_dwFlags & AF_SHUTDOWN))
148 {
149 iSize = sizeof(struct sockaddr_in);
150 if ((hClientSocket = accept(hSocket, (struct sockaddr *)&servAddr, &iSize)) == -1)
151 {
152 int error = WSAGetLastError();
153
154 if (error != WSAEINTR)
155 WriteLog(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
156 iNumErrors++;
157 g_dwAcceptErrors++;
158 if (iNumErrors > 1000)
159 {
160 WriteLog(MSG_TOO_MANY_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
161 iNumErrors = 0;
162 }
163 ThreadSleepMs(500);
164 }
165
166 iNumErrors = 0; // Reset consecutive errors counter
167 DebugPrintf("Incoming connection from %s", IpToStr(ntohl(servAddr.sin_addr.s_addr), szBuffer));
168
169 if (IsValidServerAddr(servAddr.sin_addr.s_addr, &bInstallationServer))
170 {
171 g_dwAcceptedConnections++;
172 DebugPrintf("Connection from %s accepted", szBuffer);
173
174 // Create new session structure and threads
175 pSession = new CommSession(hClientSocket, ntohl(servAddr.sin_addr.s_addr),
176 bInstallationServer);
177 if (!RegisterSession(pSession))
178 {
179 delete pSession;
180 }
181 else
182 {
183 pSession->Run();
184 }
185 }
186 else // Unauthorized connection
187 {
188 g_dwRejectedConnections++;
189 shutdown(hClientSocket, 2);
190 closesocket(hClientSocket);
191 DebugPrintf("Connection from %s rejected", szBuffer);
192 }
193 }
194
195 MutexDestroy(m_hSessionListAccess);
196 return THREAD_OK;
197 }