nxapush can push data on behalf of other nodes
[public/netxms.git] / src / agent / core / push.cpp
CommitLineData
4cd1e46b
AK
1/*
2** NetXMS multiplatform core agent
3** Copyright (C) 2003-2013 Victor Kirhenshtein
4**
5** This program is free software; you can redistribute it and/or modify
6** it under the terms of the GNU General Public License as published by
7** the Free Software Foundation; either version 2 of the License, or
8** (at your option) any later version.
9**
10** This program is distributed in the hope that it will be useful,
11** but WITHOUT ANY WARRANTY; without even the implied warranty of
12** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13** GNU General Public License for more details.
14**
15** You should have received a copy of the GNU General Public License
16** along with this program; if not, write to the Free Software
17** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18**
19** File: push.cpp
20**
21**/
22
23#include "nxagentd.h"
24
25/**
26 * Push parameter's data
27 */
4e46505f 28bool PushData(const TCHAR *parameter, const TCHAR *value, UINT32 objectId)
4cd1e46b
AK
29{
30 CSCPMessage msg;
31 bool success = false;
32
33 AgentWriteDebugLog(6, _T("PushData: \"%s\" = \"%s\""), parameter, value);
34
35 msg.SetCode(CMD_PUSH_DCI_DATA);
36 msg.SetVariable(VID_NAME, parameter);
37 msg.SetVariable(VID_VALUE, value);
4e46505f 38 msg.SetVariable(VID_OBJECT_ID, objectId);
4cd1e46b
AK
39
40 if (g_dwFlags & AF_SUBAGENT_LOADER)
41 {
42 success = SendMessageToMasterAgent(&msg);
43 }
44 else
45 {
46 MutexLock(g_hSessionListAccess);
47 for(DWORD i = 0; i < g_dwMaxSessions; i++)
48 if (g_pSessionList[i] != NULL)
49 if (g_pSessionList[i]->canAcceptTraps())
50 {
51 g_pSessionList[i]->sendMessage(&msg);
52 success = true;
53 }
54 MutexUnlock(g_hSessionListAccess);
55 }
56 return success;
57}
58
59/**
60 * Process push request
61 */
62static void ProcessPushRequest(HPIPE hPipe)
63{
64 TCHAR buffer[256];
65
66 AgentWriteDebugLog(5, _T("ProcessPushRequest: connection established"));
67 while(true)
68 {
69 CSCPMessage *msg = ReadMessageFromPipe(hPipe, NULL);
70 if (msg == NULL)
71 break;
72 AgentWriteDebugLog(6, _T("ProcessPushRequest: received message %s"), NXCPMessageCodeName(msg->GetCode(), buffer));
73 if (msg->GetCode() == CMD_PUSH_DCI_DATA)
74 {
4e46505f
VK
75 UINT32 objectId = msg->GetVariableLong(VID_OBJECT_ID);
76 UINT32 count = msg->GetVariableLong(VID_NUM_ITEMS);
77 UINT32 varId = VID_PUSH_DCI_DATA_BASE;
4cd1e46b
AK
78 for(DWORD i = 0; i < count; i++)
79 {
80 TCHAR name[MAX_PARAM_NAME], value[MAX_RESULT_LENGTH];
81 msg->GetVariableStr(varId++, name, MAX_PARAM_NAME);
82 msg->GetVariableStr(varId++, value, MAX_RESULT_LENGTH);
4e46505f 83 PushData(name, value, objectId);
4cd1e46b
AK
84 }
85 }
86 else
87 {
88 }
89 delete msg;
90 }
91 AgentWriteDebugLog(5, _T("ProcessPushRequest: connection closed"));
92}
93
94/**
95 * Connector thread for external push command
96 */
97#ifdef _WIN32
98
99static THREAD_RESULT THREAD_CALL PushConnector(void *arg)
100{
101 SECURITY_ATTRIBUTES sa;
102 PSECURITY_DESCRIPTOR sd = NULL;
103 SID_IDENTIFIER_AUTHORITY sidAuthWorld = SECURITY_WORLD_SID_AUTHORITY;
104 EXPLICIT_ACCESS ea;
105 PSID sidEveryone = NULL;
106 ACL *acl = NULL;
107 TCHAR errorText[1024];
108
109 // Create a well-known SID for the Everyone group.
110 if(!AllocateAndInitializeSid(&sidAuthWorld, 1, SECURITY_WORLD_RID, 0, 0, 0, 0, 0, 0, 0, &sidEveryone))
111 {
112 AgentWriteDebugLog(2, _T("PushConnector: AllocateAndInitializeSid failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
113 goto cleanup;
114 }
115
116 // Initialize an EXPLICIT_ACCESS structure for an ACE.
117 // The ACE will allow either Everyone or given user to access pipe
118 ZeroMemory(&ea, sizeof(EXPLICIT_ACCESS));
119 ea.grfAccessPermissions = (FILE_GENERIC_READ | FILE_GENERIC_WRITE) & ~FILE_CREATE_PIPE_INSTANCE;
120 ea.grfAccessMode = SET_ACCESS;
121 ea.grfInheritance = NO_INHERITANCE;
122 const TCHAR *user = g_config->getValue(_T("/agent/PushUser"), _T("*"));
123 if ((user[0] == 0) || !_tcscmp(user, _T("*")))
124 {
125 ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
126 ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
127 ea.Trustee.ptstrName = (LPTSTR)sidEveryone;
128 }
129 else
130 {
131 ea.Trustee.TrusteeForm = TRUSTEE_IS_NAME;
132 ea.Trustee.TrusteeType = TRUSTEE_IS_USER;
133 ea.Trustee.ptstrName = (LPTSTR)user;
134 AgentWriteDebugLog(2, _T("PushConnector: will allow connections only for user %s"), user);
135 }
136
137 // Create a new ACL that contains the new ACEs.
138 if (SetEntriesInAcl(1, &ea, NULL, &acl) != ERROR_SUCCESS)
139 {
140 AgentWriteDebugLog(2, _T("PushConnector: SetEntriesInAcl failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
141 goto cleanup;
142 }
143
144 sd = (PSECURITY_DESCRIPTOR)LocalAlloc(LPTR, SECURITY_DESCRIPTOR_MIN_LENGTH);
145 if (sd == NULL)
146 {
147 AgentWriteDebugLog(2, _T("PushConnector: LocalAlloc failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
148 goto cleanup;
149 }
150
151 if (!InitializeSecurityDescriptor(sd, SECURITY_DESCRIPTOR_REVISION))
152 {
153 AgentWriteDebugLog(2, _T("PushConnector: InitializeSecurityDescriptor failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
154 goto cleanup;
155 }
156
157 // Add the ACL to the security descriptor.
158 if (!SetSecurityDescriptorDacl(sd, TRUE, acl, FALSE))
159 {
160 AgentWriteDebugLog(2, _T("PushConnector: SetSecurityDescriptorDacl failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
161 goto cleanup;
162 }
163
164 sa.nLength = sizeof(SECURITY_ATTRIBUTES);
165 sa.bInheritHandle = FALSE;
166 sa.lpSecurityDescriptor = sd;
167 HANDLE hPipe = CreateNamedPipe(_T("\\\\.\\pipe\\nxagentd.push"), PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE, 1, 8192, 8192, 0, &sa);
168 if (hPipe == INVALID_HANDLE_VALUE)
169 {
170 AgentWriteDebugLog(2, _T("PushConnector: CreateNamedPipe failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
171 goto cleanup;
172 }
173
174 AgentWriteDebugLog(2, _T("PushConnector: named pipe created, waiting for connection"));
175 int connectErrors = 0;
176 while(!(g_dwFlags & AF_SHUTDOWN))
177 {
178 BOOL connected = ConnectNamedPipe(hPipe, NULL);
179 if (connected || (GetLastError() == ERROR_PIPE_CONNECTED))
180 {
181 ProcessPushRequest(hPipe);
182 DisconnectNamedPipe(hPipe);
183 connectErrors = 0;
184 }
185 else
186 {
187 AgentWriteDebugLog(2, _T("PushConnector: ConnectNamedPipe failed (%s)"), GetSystemErrorText(GetLastError(), errorText, 1024));
188 connectErrors++;
189 if (connectErrors > 10)
190 break; // Stop this connector if ConnectNamedPipe fails instantly
191 }
192 }
193
194cleanup:
195 if (hPipe != NULL)
196 CloseHandle(hPipe);
197
198 if (sd != NULL)
199 LocalFree(sd);
200
201 if (acl != NULL)
202 LocalFree(acl);
203
204 if (sidEveryone != NULL)
205 FreeSid(sidEveryone);
206
207 AgentWriteDebugLog(2, _T("PushConnector: listener thread stopped"));
208 return THREAD_OK;
209}
210
211#else
212
213static THREAD_RESULT THREAD_CALL PushConnector(void *arg)
214{
215 mode_t prevMask = 0;
216
217 SOCKET hPipe = socket(AF_UNIX, SOCK_STREAM, 0);
218 if (hPipe == INVALID_SOCKET)
219 {
220 AgentWriteDebugLog(2, _T("PushConnector: socket failed (%s)"), _tcserror(errno));
221 goto cleanup;
222 }
223
224 struct sockaddr_un addrLocal;
225 addrLocal.sun_family = AF_UNIX;
226 strcpy(addrLocal.sun_path, "/tmp/.nxagentd.push");
227 unlink(addrLocal.sun_path);
228 prevMask = umask(S_IWGRP | S_IWOTH);
229 if (bind(hPipe, (struct sockaddr *)&addrLocal, SUN_LEN(&addrLocal)) == -1)
230 {
231 AgentWriteDebugLog(2, _T("PushConnector: bind failed (%s)"), _tcserror(errno));
232 umask(prevMask);
233 goto cleanup;
234 }
235 umask(prevMask);
236
237 if (listen(hPipe, 5) == -1)
238 {
239 AgentWriteDebugLog(2, _T("PushConnector: listen failed (%s)"), _tcserror(errno));
240 goto cleanup;
241 }
242
243 while(!(g_dwFlags & AF_SHUTDOWN))
244 {
245 struct sockaddr_un addrRemote;
246 socklen_t size = sizeof(struct sockaddr_un);
247 SOCKET cs = accept(hPipe, (struct sockaddr *)&addrRemote, &size);
248 if (cs > 0)
249 {
250 ProcessPushRequest(cs);
251 shutdown(cs, 2);
252 close(cs);
253 }
254 else
255 {
256 AgentWriteDebugLog(2, _T("PushConnector: accept failed (%s)"), _tcserror(errno));
257 }
258 }
259
260cleanup:
261 if (hPipe != -1)
262 close(hPipe);
263
264 AgentWriteDebugLog(2, _T("PushConnector: listener thread stopped"));
265 return THREAD_OK;
266}
267
268#endif
269
270/**
271 * Start push connector
272 */
273
274void StartPushConnector()
275{
276 ThreadCreate(PushConnector, 0, NULL);
277}