libnxmb: added simple mechanism of named calls
authorVictor Kirhenshtein <victor@netxms.org>
Thu, 30 Apr 2015 08:21:21 +0000 (11:21 +0300)
committerVictor Kirhenshtein <victor@netxms.org>
Thu, 30 Apr 2015 08:21:21 +0000 (11:21 +0300)
include/nxmbapi.h
src/libnxmb/dispatcher.cpp
src/libnxmb/message.cpp
src/libnxmb/subscriber.cpp
src/server/core/evproc.cpp

index 2168f3d..65be0d5 100644 (file)
@@ -1,7 +1,7 @@
 /* 
 ** NetXMS - Network Management System
 ** NetXMS Message Bus API
-** Copyright (C) 2009 Victor Kirhenshtein
+** Copyright (C) 2009-2015 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU General Public License as published by
 #include <nms_threads.h>
 #include <nxqueue.h>
 
-
-//
-// Message class
-//
-
+/**
+ * Message class
+ */
 class LIBNXMB_EXPORTABLE NXMBMessage
 {
 protected:
@@ -59,11 +57,9 @@ public:
        const TCHAR *getSenderId() const { return m_senderId; }
 };
 
-
-//
-// Subscriber class
-//
-
+/**
+ * Subscriber class
+ */
 class LIBNXMB_EXPORTABLE NXMBSubscriber
 {
 protected:
@@ -79,11 +75,9 @@ public:
        virtual bool isOwnedByDispatcher();
 };
 
-
-//
-// Abstract message filter class
-//
-
+/**
+ * Abstract message filter class
+ */
 class LIBNXMB_EXPORTABLE NXMBFilter
 {
 public:
@@ -94,11 +88,9 @@ public:
        virtual bool isOwnedByDispatcher();
 };
 
-
-//
-// Message filter which accept messages of specific type(s)
-//
-
+/**
+ * Message filter which accept messages of specific type(s)
+ */
 class LIBNXMB_EXPORTABLE NXMBTypeFilter : public NXMBFilter
 {
 protected:
@@ -114,16 +106,32 @@ public:
        void removeMessageType(const TCHAR *type);
 };
 
+/**
+ * Call handler
+ */
+typedef bool (* NXMBCallHandler)(const TCHAR *, const void *, void *);
+
+/**
+ * String map template for holding objects as values
+ */
+class CallHandlerMap : public StringMapBase
+{
+public:
+       CallHandlerMap() : StringMapBase(false) { }
 
-//
-// Message dispatcher class
-//
+       void set(const TCHAR *name, NXMBCallHandler handler) { setObject((TCHAR *)name, (void *)handler, false); }
+       NXMBCallHandler get(const TCHAR *name) { return (NXMBCallHandler)getObject(name); }
+};
 
+/**
+ * Message dispatcher class
+ */
 class LIBNXMB_EXPORTABLE NXMBDispatcher
 {
        friend THREAD_RESULT THREAD_CALL WorkerThreadStarter(void *);
 
 private:
+   static MUTEX m_instanceAccess;
        static NXMBDispatcher *m_instance;
 
 protected:
@@ -133,6 +141,8 @@ protected:
        NXMBFilter **m_filters;
        MUTEX m_subscriberListAccess;
        THREAD m_workerThreadHandle;
+   CallHandlerMap *m_callHandlers;
+   MUTEX m_callHandlerAccess;
 
        void workerThread();
 
@@ -141,10 +151,14 @@ public:
        virtual ~NXMBDispatcher();
 
        void postMessage(NXMBMessage *msg);
+   bool call(const TCHAR *callName, const void *input, void *output);
        
        void addSubscriber(NXMBSubscriber *subscriber, NXMBFilter *filter);
        void removeSubscriber(const TCHAR *id);
 
+   void addCallHandler(const TCHAR *callName, NXMBCallHandler handler);
+   void removeCallHandler(const TCHAR *callName);
+
        static NXMBDispatcher *getInstance();
 };
 
index d56c420..08e8edc 100644 (file)
 
 #include "libnxmb.h"
 
-
-//
-// Worker thread starter
-//
-
+/**
+ * Worker thread starter
+ */
 static THREAD_RESULT THREAD_CALL WorkerThreadStarter(void *arg)
 {
        ((NXMBDispatcher *)arg)->workerThread();
        return THREAD_OK;
 }
 
-
-//
-// Constructor
-//
-
+/**
+ * Constructor
+ */
 NXMBDispatcher::NXMBDispatcher()
 {
        m_queue = new Queue;
@@ -47,13 +43,13 @@ NXMBDispatcher::NXMBDispatcher()
        m_filters = NULL;
        m_subscriberListAccess = MutexCreate();
        m_workerThreadHandle = ThreadCreateEx(WorkerThreadStarter, 0, this);
+   m_callHandlers = new CallHandlerMap();
+   m_callHandlerAccess = MutexCreate();
 }
 
-
-//
-// Destructor
-//
-
+/**
+ * Destructor
+ */
 NXMBDispatcher::~NXMBDispatcher()
 {
        NXMBMessage *msg;
@@ -77,13 +73,14 @@ NXMBDispatcher::~NXMBDispatcher()
        }
        safe_free(m_subscribers);
        safe_free(m_filters);
-}
-
 
-//
-// Worker thread
-//
+   MutexDestroy(m_callHandlerAccess);
+   delete m_callHandlers;
+}
 
+/**
+ * Worker thread
+ */
 void NXMBDispatcher::workerThread()
 {
        NXMBMessage *msg;
@@ -108,21 +105,17 @@ void NXMBDispatcher::workerThread()
        }
 }
 
-
-//
-// Post message
-//
-
+/**
+ * Post message
+ */
 void NXMBDispatcher::postMessage(NXMBMessage *msg)
 {
        m_queue->Put(msg);
 }
 
-
-//
-// Add subscriber
-//
-
+/**
+ * Add subscriber
+ */
 void NXMBDispatcher::addSubscriber(NXMBSubscriber *subscriber, NXMBFilter *filter)
 {
        int i;
@@ -165,11 +158,9 @@ void NXMBDispatcher::addSubscriber(NXMBSubscriber *subscriber, NXMBFilter *filte
        MutexUnlock(m_subscriberListAccess);
 }
 
-
-//
-// Remove subscriber
-//
-
+/**
+ * Remove subscriber
+ */
 void NXMBDispatcher::removeSubscriber(const TCHAR *id)
 {
        int i;
@@ -194,16 +185,55 @@ void NXMBDispatcher::removeSubscriber(const TCHAR *id)
        MutexUnlock(m_subscriberListAccess);
 }
 
+/**
+ * Add call handler
+ */
+void NXMBDispatcher::addCallHandler(const TCHAR *callName, NXMBCallHandler handler)
+{
+   MutexLock(m_callHandlerAccess);
+   m_callHandlers->set(callName, handler);
+   MutexUnlock(m_callHandlerAccess);
+}
+
+/**
+ * Remove call handler
+ */
+void NXMBDispatcher::removeCallHandler(const TCHAR *callName)
+{
+   MutexLock(m_callHandlerAccess);
+   m_callHandlers->remove(callName);
+   MutexUnlock(m_callHandlerAccess);
+}
+
+/**
+ * Make a call
+ */
+bool NXMBDispatcher::call(const TCHAR *callName, const void *input, void *output)
+{
+   MutexLock(m_callHandlerAccess);
+   NXMBCallHandler handler = m_callHandlers->get(callName);
+   MutexUnlock(m_callHandlerAccess);
+   return (handler != NULL) ? handler(callName, input, output) : false;
+}
 
-//
-// Get global dispatcher instance
-//
+/**
+ * Synchronization counter
+ */
+MUTEX NXMBDispatcher::m_instanceAccess = MutexCreate();
 
+/**
+ * Global dispatcher instance
+ */
 NXMBDispatcher *NXMBDispatcher::m_instance = NULL;
 
+/**
+ * Get global dispatcher instance
+ */
 NXMBDispatcher *NXMBDispatcher::getInstance()
 {
+   MutexLock(m_instanceAccess);
        if (m_instance == NULL)
                m_instance = new NXMBDispatcher();
+   MutexUnlock(m_instanceAccess);
        return m_instance;
 }
index da32ae2..3d58064 100644 (file)
 
 #include "libnxmb.h"
 
-
-//
-// Default constructor
-//
-
+/**
+ * Default constructor
+ */
 NXMBMessage::NXMBMessage()
 {
        m_type = _tcsdup(_T("NONE"));
        m_senderId = _tcsdup(_T("UNKNOWN"));
 }
 
-
-//
-// Create message with type and sender information
-//
-
+/**
+ * Create message with type and sender information
+ */
 NXMBMessage::NXMBMessage(const TCHAR *type, const TCHAR *senderId)
 {
        m_type = _tcsdup(CHECK_NULL(type));
        m_senderId = _tcsdup(CHECK_NULL(senderId));
 }
 
-
-//
-// Destructor
-//
-
+/**
+ * Destructor
+ */
 NXMBMessage::~NXMBMessage()
 {
        safe_free(m_type);
index 3556c00..34a5f93 100644 (file)
 
 #include "libnxmb.h"
 
-
-//
-// Default constructor
-//
-
+/**
+ * Default constructor
+ */
 NXMBSubscriber::NXMBSubscriber(const TCHAR *id)
 {
        m_id = _tcsdup(CHECK_NULL(id));
 }
 
-
-//
-// Desctructor
-//
-
+/**
+ * Desctructor
+ */
 NXMBSubscriber::~NXMBSubscriber()
 {
        safe_free(m_id);
 }
 
-
-//
-// Default message handler
-//
-
+/**
+ * Default message handler
+ */
 void NXMBSubscriber::messageHandler(NXMBMessage &msg)
 {
 }
 
-
-//
-// If this method returns TRUE, dispatcher will delete subscriber object
-// on unregister or in own destructor
-//
-
+/**
+ * If this method returns TRUE, dispatcher will delete subscriber object
+ * on unregister or in own destructor
+ */
 bool NXMBSubscriber::isOwnedByDispatcher()
 {
        return true;
index f3ce6b3..47b82d8 100644 (file)
@@ -212,8 +212,8 @@ THREAD_RESULT THREAD_CALL EventProcessor(void *arg)
          NetObj *pObject = FindObjectById(pEvent->getSourceId());
          if (pObject == NULL)
             pObject = g_pEntireNet;
-                       DbgPrintf(5, _T("EVENT %d (ID:") UINT64_FMT _T(" F:0x%04X S:%d TAG:\"%s\"%s) FROM %s: %s"), pEvent->getCode(), 
-                   pEvent->getId(), pEvent->getFlags(), pEvent->getSeverity(),
+                       DbgPrintf(5, _T("EVENT %s [%d] (ID:") UINT64_FMT _T(" F:0x%04X S:%d TAG:\"%s\"%s) FROM %s: %s"), 
+                   pEvent->getName(), pEvent->getCode(), pEvent->getId(), pEvent->getFlags(), pEvent->getSeverity(),
                                                 CHECK_NULL_EX(pEvent->getUserTag()),
                    (pEvent->getRootId() == 0) ? _T("") : _T(" CORRELATED"),
                    pObject->getName(), pEvent->getMessage());