2 ** NetXMS - Network Management System
4 ** Copyright (C) 2003-2010 Victor Kirhenshtein
6 ** This program is free software; you can redistribute it and/or modify
7 ** it under the terms of the GNU Lesser General Public License as published by
8 ** the Free Software Foundation; either version 3 of the License, or
9 ** (at your option) any later version.
11 ** This program is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ** GNU General Public License for more details.
16 ** You should have received a copy of the GNU Lesser General Public License
17 ** along with this program; if not, write to the Free Software
18 ** Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
28 #include <nms_threads.h>
36 * Temporary buffer structure for RecvNXCPMessage() function
42 char buffer
[NXCP_TEMP_BUF_SIZE
];
53 class LIBNETXMS_EXPORTABLE NXCPMessage
59 MessageField
*m_fields
; // Message fields
60 int m_version
; // Protocol version
61 BYTE
*m_data
; // binary data
62 size_t m_dataSize
; // binary data size
64 void *set(UINT32 fieldId
, BYTE type
, const void *value
, bool isSigned
= false, size_t size
= 0);
65 void *get(UINT32 fieldId
, BYTE requiredType
, BYTE
*fieldType
= NULL
) const;
66 NXCP_MESSAGE_FIELD
*find(UINT32 fieldId
) const;
69 NXCPMessage(int version
= NXCP_VERSION
);
70 NXCPMessage(NXCPMessage
*msg
);
71 NXCPMessage(NXCP_MESSAGE
*rawMag
, int version
= NXCP_VERSION
);
74 NXCP_MESSAGE
*createMessage() const;
76 UINT16
getCode() const { return m_code
; }
77 void setCode(UINT16 code
) { m_code
= code
; }
79 UINT32
getId() const { return m_id
; }
80 void setId(UINT32 id
) { m_id
= id
; }
82 bool isEndOfFile() const { return (m_flags
& MF_END_OF_FILE
) ? true : false; }
83 bool isEndOfSequence() const { return (m_flags
& MF_END_OF_SEQUENCE
) ? true : false; }
84 bool isReverseOrder() const { return (m_flags
& MF_REVERSE_ORDER
) ? true : false; }
85 bool isBinary() const { return (m_flags
& MF_BINARY
) ? true : false; }
86 bool isControl() const { return (m_flags
& MF_CONTROL
) ? true : false; }
87 bool isCompressed() const { return (m_flags
& MF_COMPRESSED
) ? true : false; }
89 const BYTE
*getBinaryData() const { return m_data
; }
90 size_t getBinaryDataSize() const { return m_dataSize
; }
92 bool isFieldExist(UINT32 fieldId
) const { return find(fieldId
) != NULL
; }
93 int getFieldType(UINT32 fieldId
) const;
95 void setField(UINT32 fieldId
, INT16 value
) { set(fieldId
, NXCP_DT_INT16
, &value
, true); }
96 void setField(UINT32 fieldId
, UINT16 value
) { set(fieldId
, NXCP_DT_INT16
, &value
, false); }
97 void setField(UINT32 fieldId
, INT32 value
) { set(fieldId
, NXCP_DT_INT32
, &value
, true); }
98 void setField(UINT32 fieldId
, UINT32 value
) { set(fieldId
, NXCP_DT_INT32
, &value
, false); }
99 void setField(UINT32 fieldId
, INT64 value
) { set(fieldId
, NXCP_DT_INT64
, &value
, true); }
100 void setField(UINT32 fieldId
, UINT64 value
) { set(fieldId
, NXCP_DT_INT64
, &value
, false); }
101 void setField(UINT32 fieldId
, double value
) { set(fieldId
, NXCP_DT_FLOAT
, &value
); }
102 void setField(UINT32 fieldId
, bool value
) { INT16 v
= value
? 1 : 0; set(fieldId
, NXCP_DT_INT16
, &v
, true); }
103 void setField(UINT32 fieldId
, const TCHAR
*value
) { if (value
!= NULL
) set(fieldId
, NXCP_DT_STRING
, value
); }
104 void setField(UINT32 fieldId
, const TCHAR
*value
, size_t maxLen
) { if (value
!= NULL
) set(fieldId
, NXCP_DT_STRING
, value
, false, maxLen
); }
105 void setField(UINT32 fieldId
, const BYTE
*value
, size_t size
) { set(fieldId
, NXCP_DT_BINARY
, value
, false, size
); }
106 void setField(UINT32 fieldId
, const InetAddress
& value
) { set(fieldId
, NXCP_DT_INETADDR
, &value
); }
107 void setField(UINT32 fieldId
, const uuid
& value
) { set(fieldId
, NXCP_DT_BINARY
, value
.getValue(), false, UUID_LENGTH
); }
109 void setFieldFromMBString(UINT32 fieldId
, const char *value
);
111 void setFieldFromMBString(UINT32 fieldId
, const char *value
) { set(fieldId
, NXCP_DT_STRING
, value
); }
113 void setFieldFromTime(UINT32 fieldId
, time_t value
) { UINT64 t
= (UINT64
)value
; set(fieldId
, NXCP_DT_INT64
, &t
); }
114 void setFieldFromInt32Array(UINT32 fieldId
, size_t numElements
, const UINT32
*elements
);
115 void setFieldFromInt32Array(UINT32 fieldId
, IntegerArray
<UINT32
> *data
);
116 bool setFieldFromFile(UINT32 fieldId
, const TCHAR
*pszFileName
);
118 INT16
getFieldAsInt16(UINT32 fieldId
) const;
119 UINT16
getFieldAsUInt16(UINT32 fieldId
) const;
120 INT32
getFieldAsInt32(UINT32 fieldId
) const;
121 UINT32
getFieldAsUInt32(UINT32 fieldId
) const;
122 INT64
getFieldAsInt64(UINT32 fieldId
) const;
123 UINT64
getFieldAsUInt64(UINT32 fieldId
) const;
124 double getFieldAsDouble(UINT32 fieldId
) const;
125 bool getFieldAsBoolean(UINT32 fieldId
) const;
126 time_t getFieldAsTime(UINT32 fieldId
) const;
127 UINT32
getFieldAsInt32Array(UINT32 fieldId
, UINT32 numElements
, UINT32
*buffer
) const;
128 UINT32
getFieldAsInt32Array(UINT32 fieldId
, IntegerArray
<UINT32
> *data
) const;
129 const BYTE
*getBinaryFieldPtr(UINT32 fieldId
, size_t *size
) const;
130 TCHAR
*getFieldAsString(UINT32 fieldId
, TCHAR
*buffer
= NULL
, size_t bufferSize
= 0) const;
131 char *getFieldAsMBString(UINT32 fieldId
, char *buffer
= NULL
, size_t bufferSize
= 0) const;
132 char *getFieldAsUtf8String(UINT32 fieldId
, char *buffer
= NULL
, size_t bufferSize
= 0) const;
133 UINT32
getFieldAsBinary(UINT32 fieldId
, BYTE
*buffer
, size_t bufferSize
) const;
134 InetAddress
getFieldAsInetAddress(UINT32 fieldId
) const;
135 uuid
getFieldAsGUID(UINT32 fieldId
) const;
137 void deleteAllFields();
139 void disableEncryption() { m_flags
|= MF_DONT_ENCRYPT
; }
140 void setEndOfSequence() { m_flags
|= MF_END_OF_SEQUENCE
; }
141 void setReverseOrderFlag() { m_flags
|= MF_REVERSE_ORDER
; }
143 static String
dump(const NXCP_MESSAGE
*msg
, int version
);
147 * Message waiting queue element structure
151 void *msg
; // Pointer to message, either to NXCPMessage object or raw message
152 UINT64 sequence
; // Sequence number
153 UINT32 id
; // Message ID
154 UINT32 ttl
; // Message time-to-live in milliseconds
155 UINT16 code
; // Message code
156 UINT16 isBinary
; // 1 for binary (raw) messages
157 } WAIT_QUEUE_ELEMENT
;
160 * Max number of waiting threads in message queue
162 #define MAX_MSGQUEUE_WAITERS 32
165 * Message waiting queue class
167 class LIBNETXMS_EXPORTABLE MsgWaitQueue
171 CRITICAL_SECTION m_mutex
;
172 HANDLE m_wakeupEvents
[MAX_MSGQUEUE_WAITERS
];
173 BYTE m_waiters
[MAX_MSGQUEUE_WAITERS
];
174 #elif defined(_USE_GNU_PTH)
176 pth_cond_t m_wakeupCondition
;
178 pthread_mutex_t m_mutex
;
179 pthread_cond_t m_wakeupCondition
;
184 WAIT_QUEUE_ELEMENT
*m_elements
;
187 void *waitForMessageInternal(UINT16 isBinary
, UINT16 code
, UINT32 id
, UINT32 timeout
);
192 EnterCriticalSection(&m_mutex
);
193 #elif defined(_USE_GNU_PTH)
194 pth_mutex_acquire(&m_mutex
, FALSE
, NULL
);
196 pthread_mutex_lock(&m_mutex
);
203 LeaveCriticalSection(&m_mutex
);
204 #elif defined(_USE_GNU_PTH)
205 pth_mutex_release(&m_mutex
);
207 pthread_mutex_unlock(&m_mutex
);
211 void housekeeperRun();
213 static MUTEX m_housekeeperLock
;
214 static HashMap
<UINT64
, MsgWaitQueue
> *m_activeQueues
;
215 static CONDITION m_shutdownCondition
;
216 static THREAD m_housekeeperThread
;
217 static EnumerationCallbackResult
houseKeeperCallback(const void *key
, const void *object
, void *arg
);
218 static THREAD_RESULT THREAD_CALL
housekeeperThread(void *);
219 static EnumerationCallbackResult
diagInfoCallback(const void *key
, const void *object
, void *arg
);
225 void put(NXCPMessage
*pMsg
);
226 void put(NXCP_MESSAGE
*pMsg
);
227 NXCPMessage
*waitForMessage(WORD wCode
, UINT32 dwId
, UINT32 dwTimeOut
)
229 return (NXCPMessage
*)waitForMessageInternal(0, wCode
, dwId
, dwTimeOut
);
231 NXCP_MESSAGE
*waitForRawMessage(WORD wCode
, UINT32 dwId
, UINT32 dwTimeOut
)
233 return (NXCP_MESSAGE
*)waitForMessageInternal(1, wCode
, dwId
, dwTimeOut
);
237 void setHoldTime(UINT32 holdTime
) { m_holdTime
= holdTime
; }
239 static void shutdown();
240 static String
getDiagInfo();
244 * NXCP encryption context
246 class LIBNETXMS_EXPORTABLE NXCPEncryptionContext
: public RefCountObject
252 BYTE m_iv
[EVP_MAX_IV_LENGTH
];
253 #ifdef _WITH_ENCRYPTION
254 EVP_CIPHER_CTX m_encryptor
;
255 EVP_CIPHER_CTX m_decryptor
;
256 MUTEX m_encryptorLock
;
259 NXCPEncryptionContext();
260 bool initCipher(int cipher
);
263 static NXCPEncryptionContext
*create(NXCPMessage
*msg
, RSA
*privateKey
);
264 static NXCPEncryptionContext
*create(UINT32 ciphers
);
266 virtual ~NXCPEncryptionContext();
268 NXCP_ENCRYPTED_MESSAGE
*encryptMessage(NXCP_MESSAGE
*msg
);
269 bool decryptMessage(NXCP_ENCRYPTED_MESSAGE
*msg
, BYTE
*decryptionBuffer
);
271 int getCipher() { return m_cipher
; }
272 BYTE
*getSessionKey() { return m_sessionKey
; }
273 int getKeyLength() { return m_keyLength
; }
274 BYTE
*getIV() { return m_iv
; }
278 * Message receiver result codes
280 enum MessageReceiverResult
285 MSGRECV_COMM_FAILURE
= 3,
286 MSGRECV_DECRYPTION_FAILURE
= 4,
287 MSGRECV_PROTOCOL_ERROR
= 5
291 * Message receiver - abstract base class
293 class LIBNETXMS_EXPORTABLE AbstractMessageReceiver
297 BYTE
*m_decryptionBuffer
;
298 NXCPEncryptionContext
*m_encryptionContext
;
299 size_t m_initialSize
;
303 size_t m_bytesToSkip
;
305 NXCPMessage
*getMessageFromBuffer(bool *protocolError
);
308 virtual int readBytes(BYTE
*buffer
, size_t size
, UINT32 timeout
) = 0;
311 AbstractMessageReceiver(size_t initialSize
, size_t maxSize
);
312 virtual ~AbstractMessageReceiver();
314 void setEncryptionContext(NXCPEncryptionContext
*ctx
) { m_encryptionContext
= ctx
; }
316 NXCPMessage
*readMessage(UINT32 timeout
, MessageReceiverResult
*result
);
317 NXCP_MESSAGE
*getRawMessageBuffer() { return (NXCP_MESSAGE
*)m_buffer
; }
319 static const TCHAR
*resultToText(MessageReceiverResult result
);
323 * Message receiver - socket implementation
325 class LIBNETXMS_EXPORTABLE SocketMessageReceiver
: public AbstractMessageReceiver
331 virtual int readBytes(BYTE
*buffer
, size_t size
, UINT32 timeout
);
334 SocketMessageReceiver(SOCKET socket
, size_t initialSize
, size_t maxSize
);
335 virtual ~SocketMessageReceiver();
339 * Message receiver - UNIX socket/named pipe implementation
341 class LIBNETXMS_EXPORTABLE PipeMessageReceiver
: public AbstractMessageReceiver
350 virtual int readBytes(BYTE
*buffer
, size_t size
, UINT32 timeout
);
353 PipeMessageReceiver(HPIPE pipe
, size_t initialSize
, size_t maxSize
);
354 virtual ~PipeMessageReceiver();
358 * NXCP compression methods
enum NXCPCompressionMethod
{
   NXCP_COMPRESSION_NONE = 0,   // no compression
   NXCP_COMPRESSION_LZ4 = 1     // LZ4 compression
};
367 * Abstract stream compressor
369 class LIBNETXMS_EXPORTABLE StreamCompressor
372 virtual ~StreamCompressor();
374 virtual size_t compress(const BYTE
*in
, size_t inSize
, BYTE
*out
, size_t maxOutSize
) = 0;
375 virtual size_t decompress(const BYTE
*in
, size_t inSize
, const BYTE
**out
) = 0;
376 virtual size_t compressBufferSize(size_t dataSize
) = 0;
378 static StreamCompressor
*create(NXCPCompressionMethod method
, bool compress
, size_t maxBlockSize
);
382 * Dummy stream compressor
384 class LIBNETXMS_EXPORTABLE DummyStreamCompressor
: public StreamCompressor
387 virtual ~DummyStreamCompressor();
389 virtual size_t compress(const BYTE
*in
, size_t inSize
, BYTE
*out
, size_t maxOutSize
);
390 virtual size_t decompress(const BYTE
*in
, size_t inSize
, const BYTE
**out
);
391 virtual size_t compressBufferSize(size_t dataSize
);
394 struct __LZ4_stream_t
;
395 struct __LZ4_streamDecode_t
;
398 * LZ4 stream compressor
400 class LIBNETXMS_EXPORTABLE LZ4StreamCompressor
: public StreamCompressor
405 __LZ4_stream_t
*encoder
;
406 __LZ4_streamDecode_t
*decoder
;
409 size_t m_maxBlockSize
;
415 LZ4StreamCompressor(bool compress
, size_t maxBlockSize
);
416 virtual ~LZ4StreamCompressor();
418 virtual size_t compress(const BYTE
*in
, size_t inSize
, BYTE
*out
, size_t maxOutSize
);
419 virtual size_t decompress(const BYTE
*in
, size_t inSize
, const BYTE
**out
);
420 virtual size_t compressBufferSize(size_t dataSize
);
425 * NXCP message consumer interface
427 class LIBNETXMS_EXPORTABLE MessageConsumer
430 virtual SOCKET
getSocket() = 0;
431 virtual void processMessage(NXCPMessage
*msg
) = 0;
435 * Socket receiver - manages receiving NXCP messages from multiple sockets
437 class LIBNETXMS_EXPORTABLE SocketReceiver
441 HashMap
<SOCKET
, MessageConsumer
> *m_consumers
;
443 static int m_maxSocketsPerThread
;
444 static ObjectArray
<SocketReceiver
> *m_receivers
;
448 static void shutdown();
450 static void addConsumer(MessageConsumer
*mc
);
451 static void removeConsumer(MessageConsumer
*mc
);
453 static String
getDiagInfo();
457 #else /* __cplusplus */
459 typedef void NXCPMessage
;
460 typedef void NXCPEncryptionContext
;
471 int LIBNETXMS_EXPORTABLE
RecvNXCPMessage(SOCKET hSocket
, NXCP_MESSAGE
*pMsg
,
472 NXCP_BUFFER
*pBuffer
, UINT32 dwMaxMsgSize
,
473 NXCPEncryptionContext
**ppCtx
,
474 BYTE
*pDecryptionBuffer
, UINT32 dwTimeout
);
475 int LIBNETXMS_EXPORTABLE
RecvNXCPMessageEx(SOCKET hSocket
, NXCP_MESSAGE
**msgBuffer
,
476 NXCP_BUFFER
*nxcpBuffer
, UINT32
*bufferSize
,
477 NXCPEncryptionContext
**ppCtx
,
478 BYTE
**decryptionBuffer
, UINT32 dwTimeout
,
480 NXCP_MESSAGE LIBNETXMS_EXPORTABLE
*CreateRawNXCPMessage(WORD wCode
, UINT32 dwId
, WORD flags
,
481 UINT32 dwDataSize
, void *pData
,
482 NXCP_MESSAGE
*pBuffer
);
483 TCHAR LIBNETXMS_EXPORTABLE
*NXCPMessageCodeName(WORD wCode
, TCHAR
*buffer
);
484 BOOL LIBNETXMS_EXPORTABLE
SendFileOverNXCP(SOCKET hSocket
, UINT32 dwId
, const TCHAR
*pszFile
,
485 NXCPEncryptionContext
*pCtx
, long offset
,
486 void (* progressCallback
)(INT64
, void *), void *cbArg
,
487 MUTEX mutex
, NXCPCompressionMethod compressionMethod
= NXCP_COMPRESSION_NONE
);
488 BOOL LIBNETXMS_EXPORTABLE
NXCPGetPeerProtocolVersion(SOCKET hSocket
, int *pnVersion
, MUTEX mutex
);
490 bool LIBNETXMS_EXPORTABLE
InitCryptoLib(UINT32 dwEnabledCiphers
);
491 UINT32 LIBNETXMS_EXPORTABLE
NXCPGetSupportedCiphers();
492 String LIBNETXMS_EXPORTABLE
NXCPGetSupportedCiphersAsText();
493 NXCP_ENCRYPTED_MESSAGE LIBNETXMS_EXPORTABLE
*NXCPEncryptMessage(NXCPEncryptionContext
*pCtx
, NXCP_MESSAGE
*pMsg
);
494 bool LIBNETXMS_EXPORTABLE
NXCPDecryptMessage(NXCPEncryptionContext
*pCtx
,
495 NXCP_ENCRYPTED_MESSAGE
*pMsg
,
496 BYTE
*pDecryptionBuffer
);
497 UINT32 LIBNETXMS_EXPORTABLE
SetupEncryptionContext(NXCPMessage
*pMsg
,
498 NXCPEncryptionContext
**ppCtx
,
499 NXCPMessage
**ppResponse
,
500 RSA
*pPrivateKey
, int nNXCPVersion
);
501 void LIBNETXMS_EXPORTABLE
PrepareKeyRequestMsg(NXCPMessage
*pMsg
, RSA
*pServerKey
, bool useX509Format
);
502 RSA LIBNETXMS_EXPORTABLE
*LoadRSAKeys(const TCHAR
*pszKeyFile
);
505 BOOL LIBNETXMS_EXPORTABLE
SignMessageWithCAPI(BYTE
*pMsg
, UINT32 dwMsgLen
, const CERT_CONTEXT
*pCert
,
506 BYTE
*pBuffer
, size_t bufferSize
, UINT32
*pdwSigLen
);
511 #endif /* _nxcpapi_h_ */