678c2db5f90127b822626695ddbccbb34d1a994f
[public/netxms.git] / src / agent / core / push.cpp
1 /*
2 ** NetXMS multiplatform core agent
3 ** Copyright (C) 2003-2013 Victor Kirhenshtein
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 **
19 ** File: push.cpp
20 **
21 **/
22
23 #include "nxagentd.h"
24
25 /**
26 * Request ID
27 */
28 static UINT64 s_requestIdHigh = (UINT64)time(NULL) << 32;
29 static VolatileCounter s_requestIdLow = 0;
30
31 /**
32 * Push parameter's data
33 */
34 bool PushData(const TCHAR *parameter, const TCHAR *value, UINT32 objectId, time_t timestamp)
35 {
36 NXCPMessage msg;
37 bool success = false;
38
39 AgentWriteDebugLog(6, _T("PushData: \"%s\" = \"%s\""), parameter, value);
40
41 msg.setCode(CMD_PUSH_DCI_DATA);
42 msg.setField(VID_NAME, parameter);
43 msg.setField(VID_VALUE, value);
44 msg.setField(VID_OBJECT_ID, objectId);
45 msg.setFieldFromTime(VID_TIMESTAMP, timestamp);
46 msg.setField(VID_REQUEST_ID, s_requestIdHigh | (UINT64)InterlockedIncrement(&s_requestIdLow));
47
48 if (g_dwFlags & AF_SUBAGENT_LOADER)
49 {
50 success = SendMessageToMasterAgent(&msg);
51 }
52 else
53 {
54 MutexLock(g_hSessionListAccess);
55 for(DWORD i = 0; i < g_dwMaxSessions; i++)
56 if (g_pSessionList[i] != NULL)
57 if (g_pSessionList[i]->canAcceptTraps())
58 {
59 g_pSessionList[i]->sendMessage(&msg);
60 success = true;
61 }
62 MutexUnlock(g_hSessionListAccess);
63 }
64 return success;
65 }
66
67 /**
68 * Process push request
69 */
70 static void ProcessPushRequest(HPIPE hPipe)
71 {
72 TCHAR buffer[256];
73
74 AgentWriteDebugLog(5, _T("ProcessPushRequest: connection established"));
75 PipeMessageReceiver receiver(hPipe, 8192, 1048576); // 8K initial, 1M max
76 while(true)
77 {
78 MessageReceiverResult result;
79 NXCPMessage *msg = receiver.readMessage(5000, &result);
80 if (msg == NULL)
81 break;
82 AgentWriteDebugLog(6, _T("ProcessPushRequest: received message %s"), NXCPMessageCodeName(msg->getCode(), buffer));
83 if (msg->getCode() == CMD_PUSH_DCI_DATA)
84 {
85 UINT32 objectId = msg->getFieldAsUInt32(VID_OBJECT_ID);
86 UINT32 count = msg->getFieldAsUInt32(VID_NUM_ITEMS);
87 time_t timestamp = msg->getFieldAsTime(VID_TIMESTAMP);
88 UINT32 varId = VID_PUSH_DCI_DATA_BASE;
89 for(DWORD i = 0; i < count; i++)
90 {
91 TCHAR name[MAX_PARAM_NAME], value[MAX_RESULT_LENGTH];
92 msg->getFieldAsString(varId++, name, MAX_PARAM_NAME);
93 msg->getFieldAsString(varId++, value, MAX_RESULT_LENGTH);
94 PushData(name, value, objectId, timestamp);
95 }
96 }
97 else
98 {
99 }
100 delete msg;
101 }
102 AgentWriteDebugLog(5, _T("ProcessPushRequest: connection closed"));
103 }
104
105 /**
106 * Connector thread for external push command
107 */
108 #ifdef _WIN32
109
110 static THREAD_RESULT THREAD_CALL PushConnector(void *arg)
111 {
112 SECURITY_ATTRIBUTES sa;
113 PSECURITY_DESCRIPTOR sd = NULL;
114 SID_IDENTIFIER_AUTHORITY sidAuthWorld = SECURITY_WORLD_SID_AUTHORITY;
115 EXPLICIT_ACCESS ea;
116 PSID sidEveryone = NULL;
117 ACL *acl = NULL;
118 TCHAR errorText[1024];
119
120 // Create a well-known SID for the Everyone group.
121 if(!AllocateAndInitializeSid(&sidAuthWorld, 1, SECURITY_WORLD_RID, 0, 0, 0, 0, 0, 0, 0, &sidEveryone))
122 {
123 AgentWriteDebugLog(2, _T("PushConnector: AllocateAndInitializeSid failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
124 goto cleanup;
125 }
126
127 // Initialize an EXPLICIT_ACCESS structure for an ACE.
128 // The ACE will allow either Everyone or given user to access pipe
129 ZeroMemory(&ea, sizeof(EXPLICIT_ACCESS));
130 ea.grfAccessPermissions = (FILE_GENERIC_READ | FILE_GENERIC_WRITE) & ~FILE_CREATE_PIPE_INSTANCE;
131 ea.grfAccessMode = SET_ACCESS;
132 ea.grfInheritance = NO_INHERITANCE;
133 const TCHAR *user = g_config->getValue(_T("/%agent/PushUser"), _T("*"));
134 if ((user[0] == 0) || !_tcscmp(user, _T("*")))
135 {
136 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
137 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
138 ea.Trustee.ptstrName = (LPTSTR)sidEveryone;
139 }
140 else
141 {
142 ea.Trustee.TrusteeForm = TRUSTEE_IS_NAME;
143 ea.Trustee.TrusteeType = TRUSTEE_IS_USER;
144 ea.Trustee.ptstrName = (LPTSTR)user;
145 AgentWriteDebugLog(2, _T("PushConnector: will allow connections only for user %s"), user);
146 }
147
148 // Create a new ACL that contains the new ACEs.
149 if (SetEntriesInAcl(1, &ea, NULL, &acl) != ERROR_SUCCESS)
150 {
151 AgentWriteDebugLog(2, _T("PushConnector: SetEntriesInAcl failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
152 goto cleanup;
153 }
154
155 sd = (PSECURITY_DESCRIPTOR)LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH);
156 if (sd == NULL)
157 {
158 AgentWriteDebugLog(2, _T("PushConnector: LocalAlloc failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
159 goto cleanup;
160 }
161
162 if (!InitializeSecurityDescriptor(sd, SECURITY_DESCRIPTOR_REVISION))
163 {
164 AgentWriteDebugLog(2, _T("PushConnector: InitializeSecurityDescriptor failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
165 goto cleanup;
166 }
167
168 // Add the ACL to the security descriptor.
169 if (!SetSecurityDescriptorDacl(sd, TRUE, acl, FALSE))
170 {
171 AgentWriteDebugLog(2, _T("PushConnector: SetSecurityDescriptorDacl failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
172 goto cleanup;
173 }
174
175 sa.nLength = sizeof(SECURITY_ATTRIBUTES);
176 sa.bInheritHandle = FALSE;
177 sa.lpSecurityDescriptor = sd;
178 HANDLE hPipe = CreateNamedPipe(_T("\\\\.\\pipe\\nxagentd.push"), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, 1, 8192, 8192, 0, &sa);
179 if (hPipe == INVALID_HANDLE_VALUE)
180 {
181 AgentWriteDebugLog(2, _T("PushConnector: CreateNamedPipe failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
182 goto cleanup;
183 }
184
185 AgentWriteDebugLog(2, _T("PushConnector: named pipe created, waiting for connection"));
186 int connectErrors = 0;
187 while(!(g_dwFlags & AF_SHUTDOWN))
188 {
189 BOOL connected = ConnectNamedPipe(hPipe, NULL);
190 if (connected || (GetLastError() == ERROR_PIPE_CONNECTED))
191 {
192 ProcessPushRequest(hPipe);
193 DisconnectNamedPipe(hPipe);
194 connectErrors = 0;
195 }
196 else
197 {
198 AgentWriteDebugLog(2, _T("PushConnector: ConnectNamedPipe failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
199 connectErrors++;
200 if (connectErrors > 10)
201 break; // Stop this connector if ConnectNamedPipe fails instantly
202 }
203 }
204
205 cleanup:
206 if (hPipe != NULL)
207 CloseHandle(hPipe);
208
209 if (sd != NULL)
210 LocalFree(sd);
211
212 if (acl != NULL)
213 LocalFree(acl);
214
215 if (sidEveryone != NULL)
216 FreeSid(sidEveryone);
217
218 AgentWriteDebugLog(2, _T("PushConnector: listener thread stopped"));
219 return THREAD_OK;
220 }
221
222 #else
223
224 static THREAD_RESULT THREAD_CALL PushConnector(void *arg)
225 {
226 mode_t prevMask = 0;
227
228 SOCKET hPipe = socket(AF_UNIX, SOCK_STREAM, 0);
229 if (hPipe == INVALID_SOCKET)
230 {
231 AgentWriteDebugLog(2, _T("PushConnector: socket failed (%s)"), _tcserror(errno));
232 goto cleanup;
233 }
234
235 struct sockaddr_un addrLocal;
236 addrLocal.sun_family = AF_UNIX;
237 strcpy(addrLocal.sun_path, "/tmp/.nxagentd.push");
238 unlink(addrLocal.sun_path);
239 prevMask = umask(S_IWGRP | S_IWOTH);
240 if (bind(hPipe, (struct sockaddr *)&addrLocal, SUN_LEN(&addrLocal)) == -1)
241 {
242 AgentWriteDebugLog(2, _T("PushConnector: bind failed (%s)"), _tcserror(errno));
243 umask(prevMask);
244 goto cleanup;
245 }
246 umask(prevMask);
247
248 if (listen(hPipe, 5) == -1)
249 {
250 AgentWriteDebugLog(2, _T("PushConnector: listen failed (%s)"), _tcserror(errno));
251 goto cleanup;
252 }
253
254 while(!(g_dwFlags & AF_SHUTDOWN))
255 {
256 struct sockaddr_un addrRemote;
257 socklen_t size = sizeof(struct sockaddr_un);
258 SOCKET cs = accept(hPipe, (struct sockaddr *)&addrRemote, &size);
259 if (cs > 0)
260 {
261 ProcessPushRequest(cs);
262 shutdown(cs, 2);
263 close(cs);
264 }
265 else
266 {
267 AgentWriteDebugLog(2, _T("PushConnector: accept failed (%s)"), _tcserror(errno));
268 }
269 }
270
271 cleanup:
272 if (hPipe != -1)
273 close(hPipe);
274
275 AgentWriteDebugLog(2, _T("PushConnector: listener thread stopped"));
276 return THREAD_OK;
277 }
278
279 #endif
280
281 /**
282 * Start push connector
283 */
284
285 void StartPushConnector()
286 {
287 ThreadCreate(PushConnector, 0, NULL);
288 }