1197058189c0344f89328d01a951bdbff623133a
[public/netxms.git] / src / agent / core / comm.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003, 2004 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** $module: comm.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25
26 //
27 // Global variables
28 //
29
30 DWORD g_dwAcceptErrors = 0;
31 DWORD g_dwAcceptedConnections = 0;
32 DWORD g_dwRejectedConnections = 0;
33
34
35 //
36 // Constants
37 //
38
39 #define MAX_CLIENT_SESSIONS 32
40
41
42 //
43 // Static data
44 //
45
46 static CommSession *m_pSessionList[MAX_CLIENT_SESSIONS];
47 static MUTEX m_hSessionListAccess;
48
49
50 //
51 // Validates server's address
52 //
53
54 static BOOL IsValidServerAddr(DWORD dwAddr)
55 {
56 DWORD i;
57
58 for(i=0; i < g_dwServerCount; i++)
59 if (dwAddr == g_dwServerAddr[i])
60 return TRUE;
61 return FALSE;
62 }
63
64
65 //
66 // Register new session in list
67 //
68
69 static BOOL RegisterSession(CommSession *pSession)
70 {
71 DWORD i;
72
73 MutexLock(m_hSessionListAccess, INFINITE);
74 for(i = 0; i < MAX_CLIENT_SESSIONS; i++)
75 if (m_pSessionList[i] == NULL)
76 {
77 m_pSessionList[i] = pSession;
78 pSession->SetIndex(i);
79 MutexUnlock(m_hSessionListAccess);
80 return TRUE;
81 }
82
83 MutexUnlock(m_hSessionListAccess);
84 WriteLog(MSG_TOO_MANY_SESSIONS, EVENTLOG_WARNING_TYPE, NULL);
85 return FALSE;
86 }
87
88
89 //
90 // Unregister session
91 //
92
93 void UnregisterSession(DWORD dwIndex)
94 {
95 MutexLock(m_hSessionListAccess, INFINITE);
96 m_pSessionList[dwIndex] = NULL;
97 MutexUnlock(m_hSessionListAccess);
98 }
99
100
101 //
102 // TCP/IP Listener
103 //
104
105 THREAD_RESULT THREAD_CALL ListenerThread(void *)
106 {
107 SOCKET hSocket, hClientSocket;
108 struct sockaddr_in servAddr;
109 int iNumErrors = 0;
110 socklen_t iSize;
111 CommSession *pSession;
112 char szBuffer[256];
113
114 // Create socket
115 if ((hSocket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
116 {
117 WriteLog(MSG_SOCKET_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
118 exit(1);
119 }
120
121 // Fill in local address structure
122 memset(&servAddr, 0, sizeof(struct sockaddr_in));
123 servAddr.sin_family = AF_INET;
124 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
125 servAddr.sin_port = htons(g_wListenPort);
126
127 // Bind socket
128 if (bind(hSocket, (struct sockaddr *)&servAddr, sizeof(struct sockaddr_in)) != 0)
129 {
130 WriteLog(MSG_BIND_ERROR, EVENTLOG_ERROR_TYPE, "e", WSAGetLastError());
131 exit(1);
132 }
133
134 // Set up queue
135 listen(hSocket, SOMAXCONN);
136
137 // Create session list access mutex
138 m_hSessionListAccess = MutexCreate();
139
140 // Wait for connection requests
141 while(!(g_dwFlags & AF_SHUTDOWN))
142 {
143 iSize = sizeof(struct sockaddr_in);
144 if ((hClientSocket = accept(hSocket, (struct sockaddr *)&servAddr, &iSize)) == -1)
145 {
146 int error = WSAGetLastError();
147
148 if (error != WSAEINTR)
149 WriteLog(MSG_ACCEPT_ERROR, EVENTLOG_ERROR_TYPE, "e", error);
150 iNumErrors++;
151 g_dwAcceptErrors++;
152 if (iNumErrors > 1000)
153 {
154 WriteLog(MSG_TOO_MANY_ERRORS, EVENTLOG_WARNING_TYPE, NULL);
155 iNumErrors = 0;
156 }
157 ThreadSleepMs(500);
158 }
159
160 iNumErrors = 0; // Reset consecutive errors counter
161 DebugPrintf("Incoming connection from %s", IpToStr(ntohl(servAddr.sin_addr.s_addr), szBuffer));
162
163 if (IsValidServerAddr(servAddr.sin_addr.s_addr))
164 {
165 g_dwAcceptedConnections++;
166 DebugPrintf("Connection from %s accepted", szBuffer);
167
168 // Create new session structure and threads
169 pSession = new CommSession(hClientSocket, ntohl(servAddr.sin_addr.s_addr));
170 if (!RegisterSession(pSession))
171 {
172 delete pSession;
173 }
174 else
175 {
176 pSession->Run();
177 }
178 }
179 else // Unauthorized connection
180 {
181 g_dwRejectedConnections++;
182 shutdown(hClientSocket, 2);
183 closesocket(hClientSocket);
184 DebugPrintf("Connection from %s rejected", szBuffer);
185 }
186 }
187
188 MutexDestroy(m_hSessionListAccess);
189 return THREAD_OK;
190 }