/*
** NetXMS multiplatform core agent
** Copyright (C) 2003-2017 Victor Kirhenshtein
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
**/
28 static UINT64 s_requestIdHigh
= (UINT64
)time(NULL
) << 32;
29 static VolatileCounter s_requestIdLow
= 0;
32 * Push parameter's data
34 bool PushData(const TCHAR
*parameter
, const TCHAR
*value
, UINT32 objectId
, time_t timestamp
)
39 AgentWriteDebugLog(6, _T("PushData: \"%s\" = \"%s\""), parameter
, value
);
41 msg
.setCode(CMD_PUSH_DCI_DATA
);
42 msg
.setField(VID_NAME
, parameter
);
43 msg
.setField(VID_VALUE
, value
);
44 msg
.setField(VID_OBJECT_ID
, objectId
);
45 msg
.setFieldFromTime(VID_TIMESTAMP
, timestamp
);
46 msg
.setField(VID_REQUEST_ID
, s_requestIdHigh
| (UINT64
)InterlockedIncrement(&s_requestIdLow
));
48 if (g_dwFlags
& AF_SUBAGENT_LOADER
)
50 success
= SendMessageToMasterAgent(&msg
);
54 MutexLock(g_hSessionListAccess
);
55 for(DWORD i
= 0; i
< g_dwMaxSessions
; i
++)
56 if (g_pSessionList
[i
] != NULL
)
57 if (g_pSessionList
[i
]->canAcceptTraps())
59 g_pSessionList
[i
]->sendMessage(&msg
);
62 MutexUnlock(g_hSessionListAccess
);
68 * Process push request
70 static void ProcessPushRequest(NamedPipe
*pipe
, void *arg
)
74 AgentWriteDebugLog(5, _T("ProcessPushRequest: connection established"));
75 PipeMessageReceiver
receiver(pipe
->handle(), 8192, 1048576); // 8K initial, 1M max
78 MessageReceiverResult result
;
79 NXCPMessage
*msg
= receiver
.readMessage(5000, &result
);
82 AgentWriteDebugLog(6, _T("ProcessPushRequest: received message %s"), NXCPMessageCodeName(msg
->getCode(), buffer
));
83 if (msg
->getCode() == CMD_PUSH_DCI_DATA
)
85 UINT32 objectId
= msg
->getFieldAsUInt32(VID_OBJECT_ID
);
86 UINT32 count
= msg
->getFieldAsUInt32(VID_NUM_ITEMS
);
87 time_t timestamp
= msg
->getFieldAsTime(VID_TIMESTAMP
);
88 UINT32 varId
= VID_PUSH_DCI_DATA_BASE
;
89 for(DWORD i
= 0; i
< count
; i
++)
91 TCHAR name
[MAX_PARAM_NAME
], value
[MAX_RESULT_LENGTH
];
92 msg
->getFieldAsString(varId
++, name
, MAX_PARAM_NAME
);
93 msg
->getFieldAsString(varId
++, value
, MAX_RESULT_LENGTH
);
94 PushData(name
, value
, objectId
, timestamp
);
99 AgentWriteDebugLog(5, _T("ProcessPushRequest: connection closed"));
103 * Pipe listener for push requests
105 static NamedPipeListener
*s_listener
;
108 * Start push connector
110 void StartPushConnector()
112 s_listener
= NamedPipeListener
::create(_T("nxagentd.push"), ProcessPushRequest
, NULL
);
113 if (s_listener
!= NULL
)