all pollers converted to single thread pool
[public/netxms.git] / include / nxcpapi.h
1 /*
2 ** NetXMS - Network Management System
3 ** NXCP API
4 ** Copyright (C) 2003-2010 Victor Kirhenshtein
5 **
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
10 **
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 **
20 ** File: nxcpapi.h
21 **
22 **/
23
24 #ifndef _nxcpapi_h_
25 #define _nxcpapi_h_
26
27 #include <nms_util.h>
28 #include <nms_threads.h>
29
30 #ifdef _WIN32
31 #include <wincrypt.h>
32 #endif
33
/**
 * Temporary buffer structure for RecvNXCPMessage() function
 *
 * A pointer to this structure is taken by both RecvNXCPMessage() and
 * RecvNXCPMessageEx() declared later in this header.
 */
typedef struct
{
   UINT32 bufferSize;               // NOTE(review): presumably number of valid bytes currently held in buffer - confirm
   UINT32 bufferPos;                // NOTE(review): presumably current read offset within buffer - confirm
   char buffer[NXCP_TEMP_BUF_SIZE]; // Fixed-size storage; NXCP_TEMP_BUF_SIZE is not defined in this header
} NXCP_BUFFER;
43
44
45 #ifdef __cplusplus
46
47 struct MessageField;
48
49 /**
50 * Parsed NXCP message
51 */
52 class LIBNETXMS_EXPORTABLE NXCPMessage
53 {
54 private:
55 UINT16 m_code;
56 UINT16 m_flags;
57 UINT32 m_id;
58 MessageField *m_fields; // Message fields
59 int m_version; // Protocol version
60 BYTE *m_data; // binary data
61 size_t m_dataSize; // binary data size
62
63 void *set(UINT32 fieldId, BYTE type, const void *value, bool isSigned = false, size_t size = 0);
64 void *get(UINT32 fieldId, BYTE requiredType, BYTE *fieldType = NULL);
65 NXCP_MESSAGE_FIELD *find(UINT32 fieldId);
66
67 public:
68 NXCPMessage(int version = NXCP_VERSION);
69 NXCPMessage(NXCPMessage *msg);
70 NXCPMessage(NXCP_MESSAGE *rawMag, int version = NXCP_VERSION);
71 ~NXCPMessage();
72
73 NXCP_MESSAGE *createMessage();
74
75 UINT16 getCode() { return m_code; }
76 void setCode(UINT16 code) { m_code = code; }
77
78 UINT32 getId() { return m_id; }
79 void setId(UINT32 id) { m_id = id; }
80
81 bool isEndOfFile() { return (m_flags & MF_END_OF_FILE) ? true : false; }
82 bool isEndOfSequence() { return (m_flags & MF_END_OF_SEQUENCE) ? true : false; }
83 bool isReverseOrder() { return (m_flags & MF_REVERSE_ORDER) ? true : false; }
84 bool isBinary() { return (m_flags & MF_BINARY) ? true : false; }
85
86 BYTE *getBinaryData() { return m_data; }
87 size_t getBinaryDataSize() { return m_dataSize; }
88
89 bool isFieldExist(UINT32 fieldId) { return find(fieldId) != NULL; }
90 int getFieldType(UINT32 fieldId);
91
92 void setField(UINT32 fieldId, INT16 value) { set(fieldId, NXCP_DT_INT16, &value, true); }
93 void setField(UINT32 fieldId, UINT16 value) { set(fieldId, NXCP_DT_INT16, &value, false); }
94 void setField(UINT32 fieldId, INT32 value) { set(fieldId, NXCP_DT_INT32, &value, true); }
95 void setField(UINT32 fieldId, UINT32 value) { set(fieldId, NXCP_DT_INT32, &value, false); }
96 void setField(UINT32 fieldId, INT64 value) { set(fieldId, NXCP_DT_INT64, &value, true); }
97 void setField(UINT32 fieldId, UINT64 value) { set(fieldId, NXCP_DT_INT64, &value, false); }
98 void setField(UINT32 fieldId, double value) { set(fieldId, NXCP_DT_FLOAT, &value); }
99 void setField(UINT32 fieldId, const TCHAR *value) { if (value != NULL) set(fieldId, NXCP_DT_STRING, value); }
100 void setField(UINT32 fieldId, const TCHAR *value, size_t maxLen) { if (value != NULL) set(fieldId, NXCP_DT_STRING, value, false, maxLen); }
101 void setField(UINT32 fieldId, BYTE *value, size_t size) { set(fieldId, NXCP_DT_BINARY, value, false, size); }
102 void setField(UINT32 fieldId, const InetAddress &value) { set(fieldId, NXCP_DT_INETADDR, (void *)&value); }
103 #ifdef UNICODE
104 void setFieldFromMBString(UINT32 fieldId, const char *value);
105 #else
106 void setFieldFromMBString(UINT32 fieldId, const char *value) { set(fieldId, NXCP_DT_STRING, value); }
107 #endif
108 void setFieldFromTime(UINT32 fieldId, time_t value) { UINT64 t = (UINT64)value; set(fieldId, NXCP_DT_INT64, &t); }
109 void setFieldFromInt32Array(UINT32 fieldId, size_t numElements, const UINT32 *elements);
110 void setFieldFromInt32Array(UINT32 fieldId, IntegerArray<UINT32> *data);
111 bool setFieldFromFile(UINT32 fieldId, const TCHAR *pszFileName);
112
113 INT16 getFieldAsInt16(UINT32 fieldId);
114 UINT16 getFieldAsUInt16(UINT32 fieldId);
115 INT32 getFieldAsInt32(UINT32 fieldId);
116 UINT32 getFieldAsUInt32(UINT32 fieldId);
117 INT64 getFieldAsInt64(UINT32 fieldId);
118 UINT64 getFieldAsUInt64(UINT32 fieldId);
119 double getFieldAsDouble(UINT32 fieldId);
120 bool getFieldAsBoolean(UINT32 fieldId);
121 time_t getFieldAsTime(UINT32 fieldId);
122 UINT32 getFieldAsInt32Array(UINT32 fieldId, UINT32 numElements, UINT32 *buffer);
123 UINT32 getFieldAsInt32Array(UINT32 fieldId, IntegerArray<UINT32> *data);
124 BYTE *getBinaryFieldPtr(UINT32 fieldId, size_t *size);
125 TCHAR *getFieldAsString(UINT32 fieldId, TCHAR *buffer = NULL, size_t bufferSize = 0);
126 char *getFieldAsMBString(UINT32 fieldId, char *buffer = NULL, size_t bufferSize = 0);
127 char *getFieldAsUtf8String(UINT32 fieldId, char *buffer = NULL, size_t bufferSize = 0);
128 UINT32 getFieldAsBinary(UINT32 fieldId, BYTE *buffer, size_t bufferSize);
129 InetAddress getFieldAsInetAddress(UINT32 fieldId);
130
131 void deleteAllFields();
132
133 void disableEncryption() { m_flags |= MF_DONT_ENCRYPT; }
134 void setEndOfSequence() { m_flags |= MF_END_OF_SEQUENCE; }
135 void setReverseOrderFlag() { m_flags |= MF_REVERSE_ORDER; }
136
137 static String dump(NXCP_MESSAGE *msg, int version);
138 };
139
/**
 * Message waiting queue element structure
 *
 * MsgWaitQueue keeps an array of these elements (its m_elements member).
 */
typedef struct
{
   void *msg;           // Pointer to message, either to NXCPMessage object or raw message
   UINT64 sequence;     // Sequence number
   UINT32 id;           // Message ID
   UINT32 ttl;          // Message time-to-live in milliseconds
   UINT16 code;         // Message code
   UINT16 isBinary;     // 1 for binary (raw) messages
} WAIT_QUEUE_ELEMENT;
152
/**
 * Max number of waiting threads in message queue
 *
 * In Windows builds this is the size of the m_wakeupEvents and m_waiters
 * arrays of MsgWaitQueue.
 */
#define MAX_MSGQUEUE_WAITERS     32
157
158 /**
159 * Message waiting queue class
160 */
161 class LIBNETXMS_EXPORTABLE MsgWaitQueue
162 {
163 private:
164 #ifdef _WIN32
165 CRITICAL_SECTION m_mutex;
166 HANDLE m_wakeupEvents[MAX_MSGQUEUE_WAITERS];
167 BYTE m_waiters[MAX_MSGQUEUE_WAITERS];
168 #else
169 pthread_mutex_t m_mutex;
170 pthread_cond_t m_wakeupCondition;
171 #endif
172 UINT32 m_holdTime;
173 int m_size;
174 int m_allocated;
175 WAIT_QUEUE_ELEMENT *m_elements;
176 UINT64 m_sequence;
177
178 void *waitForMessageInternal(UINT16 isBinary, UINT16 code, UINT32 id, UINT32 timeout);
179
180 void lock()
181 {
182 #ifdef _WIN32
183 EnterCriticalSection(&m_mutex);
184 #else
185 pthread_mutex_lock(&m_mutex);
186 #endif
187 }
188
189 void unlock()
190 {
191 #ifdef _WIN32
192 LeaveCriticalSection(&m_mutex);
193 #else
194 pthread_mutex_unlock(&m_mutex);
195 #endif
196 }
197
198 void housekeeperRun();
199
200 static MUTEX m_housekeeperLock;
201 static HashMap<UINT64, MsgWaitQueue> *m_activeQueues;
202 static CONDITION m_shutdownCondition;
203 static THREAD m_housekeeperThread;
204 static EnumerationCallbackResult houseKeeperCallback(const void *key, const void *object, void *arg);
205 static THREAD_RESULT THREAD_CALL housekeeperThread(void *);
206 static EnumerationCallbackResult diagInfoCallback(const void *key, const void *object, void *arg);
207
208 public:
209 MsgWaitQueue();
210 ~MsgWaitQueue();
211
212 void put(NXCPMessage *pMsg);
213 void put(NXCP_MESSAGE *pMsg);
214 NXCPMessage *waitForMessage(WORD wCode, UINT32 dwId, UINT32 dwTimeOut)
215 {
216 return (NXCPMessage *)waitForMessageInternal(0, wCode, dwId, dwTimeOut);
217 }
218 NXCP_MESSAGE *waitForRawMessage(WORD wCode, UINT32 dwId, UINT32 dwTimeOut)
219 {
220 return (NXCP_MESSAGE *)waitForMessageInternal(1, wCode, dwId, dwTimeOut);
221 }
222
223 void clear();
224 void setHoldTime(UINT32 holdTime) { m_holdTime = holdTime; }
225
226 static void shutdown();
227 static String getDiagInfo();
228 };
229
230 /**
231 * NXCP encryption context
232 */
233 class LIBNETXMS_EXPORTABLE NXCPEncryptionContext : public RefCountObject
234 {
235 private:
236 int m_cipher;
237 BYTE *m_sessionKey;
238 int m_keyLength;
239 BYTE m_iv[EVP_MAX_IV_LENGTH];
240 #ifdef _WITH_ENCRYPTION
241 EVP_CIPHER_CTX m_encryptor;
242 EVP_CIPHER_CTX m_decryptor;
243 MUTEX m_encryptorLock;
244 #endif
245
246 NXCPEncryptionContext();
247 bool initCipher(int cipher);
248
249 public:
250 static NXCPEncryptionContext *create(NXCPMessage *msg, RSA *privateKey);
251 static NXCPEncryptionContext *create(UINT32 ciphers);
252
253 virtual ~NXCPEncryptionContext();
254
255 NXCP_ENCRYPTED_MESSAGE *encryptMessage(NXCP_MESSAGE *msg);
256 bool decryptMessage(NXCP_ENCRYPTED_MESSAGE *msg, BYTE *decryptionBuffer);
257
258 int getCipher() { return m_cipher; }
259 BYTE *getSessionKey() { return m_sessionKey; }
260 int getKeyLength() { return m_keyLength; }
261 BYTE *getIV() { return m_iv; }
262 };
263
/**
 * Message receiver result codes
 *
 * Reported through the "result" output parameter of
 * AbstractMessageReceiver::readMessage() and convertible to text with
 * AbstractMessageReceiver::resultToText().
 */
enum MessageReceiverResult
{
   MSGRECV_SUCCESS = 0,
   MSGRECV_CLOSED = 1,
   MSGRECV_TIMEOUT = 2,
   MSGRECV_COMM_FAILURE = 3,
   MSGRECV_DECRYPTION_FAILURE = 4
};
275
276 /**
277 * Message receiver - abstract base class
278 */
279 class LIBNETXMS_EXPORTABLE AbstractMessageReceiver
280 {
281 private:
282 BYTE *m_buffer;
283 BYTE *m_decryptionBuffer;
284 NXCPEncryptionContext *m_encryptionContext;
285 size_t m_initialSize;
286 size_t m_size;
287 size_t m_maxSize;
288 size_t m_dataSize;
289 size_t m_bytesToSkip;
290
291 NXCPMessage *getMessageFromBuffer();
292
293 protected:
294 virtual int readBytes(BYTE *buffer, size_t size, UINT32 timeout) = 0;
295
296 public:
297 AbstractMessageReceiver(size_t initialSize, size_t maxSize);
298 virtual ~AbstractMessageReceiver();
299
300 void setEncryptionContext(NXCPEncryptionContext *ctx) { m_encryptionContext = ctx; }
301
302 NXCPMessage *readMessage(UINT32 timeout, MessageReceiverResult *result);
303 NXCP_MESSAGE *getRawMessageBuffer() { return (NXCP_MESSAGE *)m_buffer; }
304
305 static const TCHAR *resultToText(MessageReceiverResult result);
306 };
307
/**
 * Message receiver - socket implementation
 *
 * Implements the readBytes() hook of AbstractMessageReceiver for the SOCKET
 * handle passed to the constructor.
 */
class LIBNETXMS_EXPORTABLE SocketMessageReceiver : public AbstractMessageReceiver
{
private:
   SOCKET m_socket;

protected:
   virtual int readBytes(BYTE *buffer, size_t size, UINT32 timeout);

public:
   // initialSize and maxSize mirror the AbstractMessageReceiver constructor arguments
   SocketMessageReceiver(SOCKET socket, size_t initialSize, size_t maxSize);
   virtual ~SocketMessageReceiver();
};
323
/**
 * Message receiver - UNIX socket/named pipe implementation
 *
 * Implements the readBytes() hook of AbstractMessageReceiver for the HPIPE
 * handle passed to the constructor.
 */
class LIBNETXMS_EXPORTABLE PipeMessageReceiver : public AbstractMessageReceiver
{
private:
   HPIPE m_pipe;
#ifdef _WIN32
   HANDLE m_readEvent;  // Present in Windows builds only
#endif

protected:
   virtual int readBytes(BYTE *buffer, size_t size, UINT32 timeout);

public:
   // initialSize and maxSize mirror the AbstractMessageReceiver constructor arguments
   PipeMessageReceiver(HPIPE pipe, size_t initialSize, size_t maxSize);
   virtual ~PipeMessageReceiver();
};
342
/**
 * NXCP compression methods
 *
 * Accepted by StreamCompressor::create() and by SendFileOverNXCP(), where
 * NXCP_COMPRESSION_NONE is the default.
 */
enum NXCPCompressionMethod
{
   NXCP_COMPRESSION_NONE = 0,
   NXCP_COMPRESSION_LZ4 = 1
};
351
352 /**
353 * Abstract stream compressor
354 */
355 class LIBNETXMS_EXPORTABLE StreamCompressor
356 {
357 public:
358 virtual size_t compress(const BYTE *in, size_t inSize, BYTE *out, size_t maxOutSize) = 0;
359 virtual size_t decompress(const BYTE *in, size_t inSize, const BYTE **out) = 0;
360 virtual size_t compressBufferSize(size_t dataSize) = 0;
361
362 static StreamCompressor *create(NXCPCompressionMethod method, bool compress, size_t maxBlockSize);
363 };
364
/**
 * Dummy stream compressor
 *
 * Concrete StreamCompressor that declares no state of its own.
 * NOTE(review): presumably passes data through unchanged - confirm against
 * the implementation.
 */
class LIBNETXMS_EXPORTABLE DummyStreamCompressor : public StreamCompressor
{
public:
   virtual size_t compress(const BYTE *in, size_t inSize, BYTE *out, size_t maxOutSize);
   virtual size_t decompress(const BYTE *in, size_t inSize, const BYTE **out);
   virtual size_t compressBufferSize(size_t dataSize);
};
375
// Forward declarations only: this header uses these types solely through
// pointers (see LZ4StreamCompressor::m_stream), so no definition is needed here.
struct __LZ4_stream_t;
struct __LZ4_streamDecode_t;
378
/**
 * LZ4 stream compressor
 *
 * Concrete StreamCompressor built on LZ4 streaming state. A single object
 * holds either an encoder or a decoder pointer (they share storage in a
 * union). NOTE(review): presumably m_compress records which union member is
 * in use - confirm against the implementation.
 */
class LIBNETXMS_EXPORTABLE LZ4StreamCompressor : public StreamCompressor
{
private:
   union
   {
      __LZ4_stream_t *encoder;
      __LZ4_streamDecode_t *decoder;
   } m_stream;
   char *m_buffer;
   size_t m_maxBlockSize;
   size_t m_bufferSize;
   size_t m_bufferPos;
   bool m_compress;

public:
   // Arguments match the "compress" and "maxBlockSize" parameters of StreamCompressor::create()
   LZ4StreamCompressor(bool compress, size_t maxBlockSize);
   virtual ~LZ4StreamCompressor();

   virtual size_t compress(const BYTE *in, size_t inSize, BYTE *out, size_t maxOutSize);
   virtual size_t decompress(const BYTE *in, size_t inSize, const BYTE **out);
   virtual size_t compressBufferSize(size_t dataSize);
};
404
#if 0
// Everything between this "#if 0" and the matching "#endif" is excluded from
// compilation; the declarations below are currently not part of the API.

/**
 * NXCP message consumer interface
 */
class LIBNETXMS_EXPORTABLE MessageConsumer
{
public:
   virtual SOCKET getSocket() = 0;
   virtual void processMessage(NXCPMessage *msg) = 0;
};

/**
 * Socket receiver - manages receiving NXCP messages from multiple sockets
 */
class LIBNETXMS_EXPORTABLE SocketReceiver
{
private:
   THREAD m_thread;
   HashMap<SOCKET, MessageConsumer> *m_consumers;

   static int m_maxSocketsPerThread;
   static ObjectArray<SocketReceiver> *m_receivers;

public:
   static void start();
   static void shutdown();

   static void addConsumer(MessageConsumer *mc);
   static void removeConsumer(MessageConsumer *mc);

   static String getDiagInfo();
};
#endif
438
439 #else /* __cplusplus */
440
// Non-C++ translation units do not get the class declarations, so both names
// are declared as aliases of void and can only be used behind pointers.
typedef void NXCPMessage;
typedef void NXCPEncryptionContext;
443
444 #endif
445
446
447 //
448 // Functions
449 //
450
451 #ifdef __cplusplus
452
/**
 * Receive NXCP message into a caller-supplied message buffer.
 *
 * NOTE(review): meaning of the int return value is defined by the
 * implementation and is not visible in this header - confirm before relying
 * on specific values.
 *
 * @param hSocket socket handle
 * @param pMsg message buffer
 * @param pBuffer temporary receive buffer (see NXCP_BUFFER)
 * @param dwMaxMsgSize maximum message size (NOTE(review): presumably the capacity of pMsg - confirm)
 * @param ppCtx pointer to encryption context pointer
 * @param pDecryptionBuffer buffer used for decryption
 * @param dwTimeout timeout (units not stated in this header)
 */
int LIBNETXMS_EXPORTABLE RecvNXCPMessage(SOCKET hSocket, NXCP_MESSAGE *pMsg,
                                         NXCP_BUFFER *pBuffer, UINT32 dwMaxMsgSize,
                                         NXCPEncryptionContext **ppCtx,
                                         BYTE *pDecryptionBuffer, UINT32 dwTimeout);

/**
 * Extended variant of RecvNXCPMessage(): the message buffer, its size and the
 * decryption buffer are all passed by pointer.
 * NOTE(review): presumably so that the callee can reallocate them up to
 * maxMsgSize - confirm against the implementation.
 *
 * @param hSocket socket handle
 * @param msgBuffer pointer to message buffer pointer
 * @param nxcpBuffer temporary receive buffer (see NXCP_BUFFER)
 * @param bufferSize pointer to current message buffer size
 * @param ppCtx pointer to encryption context pointer
 * @param decryptionBuffer pointer to decryption buffer pointer
 * @param dwTimeout timeout (units not stated in this header)
 * @param maxMsgSize maximum message size
 */
int LIBNETXMS_EXPORTABLE RecvNXCPMessageEx(SOCKET hSocket, NXCP_MESSAGE **msgBuffer,
                                           NXCP_BUFFER *nxcpBuffer, UINT32 *bufferSize,
                                           NXCPEncryptionContext **ppCtx,
                                           BYTE **decryptionBuffer, UINT32 dwTimeout,
                                           UINT32 maxMsgSize);
/**
 * Create raw NXCP message from given code, ID, flags and data block.
 *
 * @param pBuffer message buffer argument (NOTE(review): presumably optional
 *                pre-allocated storage for the result - confirm)
 * @return pointer to raw message
 */
NXCP_MESSAGE LIBNETXMS_EXPORTABLE *CreateRawNXCPMessage(WORD wCode, UINT32 dwId, WORD flags,
                                                        UINT32 dwDataSize, void *pData,
                                                        NXCP_MESSAGE *pBuffer);

/**
 * Get name for given message code.
 *
 * @param wCode message code
 * @param buffer caller-supplied text buffer (required size is not stated in this header)
 */
TCHAR LIBNETXMS_EXPORTABLE *NXCPMessageCodeName(WORD wCode, TCHAR *buffer);

/**
 * Send file over NXCP.
 *
 * @param hSocket socket handle
 * @param dwId ID value (NOTE(review): presumably the message/request ID - confirm)
 * @param pszFile file name
 * @param pCtx encryption context
 * @param offset offset argument; declared as long, whose width is platform dependent
 * @param progressCallback callback taking an INT64 and a void pointer
 *                         (NOTE(review): presumably cbArg is passed as the second argument - confirm)
 * @param cbArg opaque argument for the callback
 * @param mutex mutex argument (NOTE(review): presumably serializes writes to the socket - confirm)
 * @param compressionMethod compression method; defaults to NXCP_COMPRESSION_NONE
 */
BOOL LIBNETXMS_EXPORTABLE SendFileOverNXCP(SOCKET hSocket, UINT32 dwId, const TCHAR *pszFile,
                                           NXCPEncryptionContext *pCtx, long offset,
                                           void (* progressCallback)(INT64, void *), void *cbArg,
                                           MUTEX mutex, NXCPCompressionMethod compressionMethod = NXCP_COMPRESSION_NONE);

/**
 * Get protocol version of the peer.
 *
 * @param hSocket socket handle
 * @param pnVersion receives the version
 * @param mutex mutex argument (NOTE(review): presumably serializes writes to the socket - confirm)
 */
BOOL LIBNETXMS_EXPORTABLE NXCPGetPeerProtocolVersion(SOCKET hSocket, int *pnVersion, MUTEX mutex);
471
/**
 * Initialize crypto library.
 *
 * @param dwEnabledCiphers enabled ciphers (NOTE(review): presumably a bit mask - confirm)
 * @param debugCallback callback taking an int, a format string and a va_list
 */
bool LIBNETXMS_EXPORTABLE InitCryptoLib(UINT32 dwEnabledCiphers, void (*debugCallback)(int, const TCHAR *, va_list args));

/**
 * Get supported ciphers as a UINT32 value (NOTE(review): presumably a bit mask - confirm).
 */
UINT32 LIBNETXMS_EXPORTABLE NXCPGetSupportedCiphers();

/**
 * Get supported ciphers in textual form.
 */
String LIBNETXMS_EXPORTABLE NXCPGetSupportedCiphersAsText();

/**
 * Encrypt message using given encryption context.
 * NOTE(review): ownership of the returned message is not stated in this header.
 */
NXCP_ENCRYPTED_MESSAGE LIBNETXMS_EXPORTABLE *NXCPEncryptMessage(NXCPEncryptionContext *pCtx, NXCP_MESSAGE *pMsg);

/**
 * Decrypt message using given encryption context and decryption buffer.
 */
bool LIBNETXMS_EXPORTABLE NXCPDecryptMessage(NXCPEncryptionContext *pCtx,
                                             NXCP_ENCRYPTED_MESSAGE *pMsg,
                                             BYTE *pDecryptionBuffer);

/**
 * Set up encryption context.
 *
 * @param pMsg input message
 * @param ppCtx receives pointer to encryption context
 * @param ppResponse receives pointer to response message
 * @param pPrivateKey RSA private key
 * @param nNXCPVersion NXCP protocol version
 * @return UINT32 result value (meaning not stated in this header)
 */
UINT32 LIBNETXMS_EXPORTABLE SetupEncryptionContext(NXCPMessage *pMsg,
                                                   NXCPEncryptionContext **ppCtx,
                                                   NXCPMessage **ppResponse,
                                                   RSA *pPrivateKey, int nNXCPVersion);

/**
 * Prepare key request message.
 *
 * @param pMsg message to fill
 * @param pServerKey server RSA key
 * @param useX509Format X.509 format selector
 */
void LIBNETXMS_EXPORTABLE PrepareKeyRequestMsg(NXCPMessage *pMsg, RSA *pServerKey, bool useX509Format);

/**
 * Load RSA keys from given file.
 * NOTE(review): ownership of the returned key is not stated in this header.
 */
RSA LIBNETXMS_EXPORTABLE *LoadRSAKeys(const TCHAR *pszKeyFile);
485
486 #ifdef _WIN32
/**
 * Sign message using Windows CryptoAPI certificate context (Windows builds only).
 *
 * @param pMsg message bytes
 * @param dwMsgLen message length
 * @param pCert certificate context
 * @param pBuffer output buffer
 * @param bufferSize size of output buffer
 * @param pdwSigLen receives signature length
 */
BOOL LIBNETXMS_EXPORTABLE SignMessageWithCAPI(BYTE *pMsg, UINT32 dwMsgLen, const CERT_CONTEXT *pCert,
                                              BYTE *pBuffer, size_t bufferSize, UINT32 *pdwSigLen);
489 #endif
490
491 #endif
492
493 #endif /* _nxcpapi_h_ */