Queue class can shrink buffer; cosmetic code refactoring
author Victor Kirhenshtein <victor@netxms.org>
Fri, 5 Jun 2015 18:02:49 +0000 (21:02 +0300)
committer Victor Kirhenshtein <victor@netxms.org>
Fri, 5 Jun 2015 18:03:38 +0000 (21:03 +0300)
32 files changed:
include/nxqueue.h
src/agent/core/datacoll.cpp
src/agent/core/nxagentd.h
src/agent/core/session.cpp
src/agent/core/snmptrapproxy.cpp
src/agent/core/trap.cpp
src/libnetxms/queue.cpp
src/libnetxms/tp.cpp
src/libnxmb/dispatcher.cpp
src/libnxsl/compiler.cpp
src/server/core/datacoll.cpp
src/server/core/dbwrite.cpp
src/server/core/dcitem.cpp
src/server/core/dctarget.cpp
src/server/core/debug.cpp
src/server/core/email.cpp
src/server/core/events.cpp
src/server/core/evproc.cpp
src/server/core/main.cpp
src/server/core/mdsession.cpp
src/server/core/node.cpp
src/server/core/np.cpp
src/server/core/objects.cpp
src/server/core/package.cpp
src/server/core/poll.cpp
src/server/core/session.cpp
src/server/core/sms.cpp
src/server/core/snmptrap.cpp
src/server/core/syslogd.cpp
src/server/core/template.cpp
src/server/include/nms_core.h
tests/test-libnetxms/test-libnetxms.cpp

index 54b4c14..bbb2900 100644 (file)
@@ -38,30 +38,33 @@ class LIBNETXMS_EXPORTABLE Queue
 private:
    MUTEX m_mutexQueueAccess;
    CONDITION m_condWakeup;
-   void **m_pElements;
-   UINT32 m_dwNumElements;
-   UINT32 m_dwBufferSize;
-   UINT32 m_dwFirst;
-   UINT32 m_dwLast;
-   UINT32 m_dwBufferIncrement;
-       BOOL m_bShutdownFlag;
+   void **m_elements;
+   UINT32 m_numElements;
+   UINT32 m_bufferSize;
+   UINT32 m_initialSize;
+   UINT32 m_first;
+   UINT32 m_last;
+   UINT32 m_bufferIncrement;
+       bool m_shutdownFlag;
 
        void commonInit();
    void lock() { MutexLock(m_mutexQueueAccess); }
    void unlock() { MutexUnlock(m_mutexQueueAccess); }
+   void shrink();
 
 public:
    Queue();
-   Queue(UINT32 dwInitialSize, UINT32 dwBufferIncrement = 32);
+   Queue(UINT32 initialSize, UINT32 bufferIncrement = 32);
    ~Queue();
 
-   void Put(void *pObject);
-       void Insert(void *pObject);
-       void SetShutdownMode();
-   void *Get();
-   void *GetOrBlock();
-   UINT32 Size() { return m_dwNumElements; }
-   void Clear();
+   void put(void *object);
+       void insert(void *object);
+       void setShutdownMode();
+   void *get();
+   void *getOrBlock();
+   int size() { return (int)m_numElements; }
+   int allocated() { return (int)m_bufferSize; }
+   void clear();
        void *find(void *key, QUEUE_COMPARATOR comparator);
        bool remove(void *key, QUEUE_COMPARATOR comparator);
 };
index 17a5155..e326a56 100644 (file)
@@ -577,7 +577,7 @@ static THREAD_RESULT THREAD_CALL DataSender(void *arg)
    DebugPrintf(INVALID_INDEX, 1, _T("Data sender thread started"));
    while(true)
    {
-      DataElement *e = (DataElement *)s_dataSenderQueue.GetOrBlock();
+      DataElement *e = (DataElement *)s_dataSenderQueue.getOrBlock();
       if (e == INVALID_POINTER_VALUE)
          break;
 
@@ -721,7 +721,7 @@ static UINT32 DataCollectionRun()
 
          if (e != NULL)
          {
-            s_dataSenderQueue.Put(e);
+            s_dataSenderQueue.put(e);
          }
          else
          {
@@ -968,7 +968,7 @@ void ShutdownLocalDataCollector()
    ThreadJoin(s_dataCollectorThread);
 
    DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data sender thread termination"));
-   s_dataSenderQueue.Put(INVALID_POINTER_VALUE);
+   s_dataSenderQueue.put(INVALID_POINTER_VALUE);
    ThreadJoin(s_dataSenderThread);
 
    DebugPrintf(INVALID_INDEX, 5, _T("Waiting for data reconcillation thread termination"));
index 6ec5cfd..9e98401 100644 (file)
@@ -359,8 +359,8 @@ public:
    void run();
    void disconnect();
 
-   virtual void sendMessage(NXCPMessage *msg) { m_sendQueue->Put(msg->createMessage()); }
-   virtual void sendRawMessage(NXCP_MESSAGE *msg) { m_sendQueue->Put(nx_memdup(msg, ntohl(msg->size))); }
+   virtual void sendMessage(NXCPMessage *msg) { m_sendQueue->put(msg->createMessage()); }
+   virtual void sendRawMessage(NXCP_MESSAGE *msg) { m_sendQueue->put(nx_memdup(msg, ntohl(msg->size))); }
        virtual bool sendFile(UINT32 requestId, const TCHAR *file, long offset);
    virtual UINT32 doRequest(NXCPMessage *msg, UINT32 timeout);
    virtual UINT32 generateRequestId();
index 673ed94..d57a59a 100644 (file)
@@ -311,7 +311,7 @@ void CommSession::readThread()
             }
             else
             {
-               m_processingQueue->Put(pMsg);
+               m_processingQueue->put(pMsg);
             }
          }
       }
@@ -324,8 +324,8 @@ void CommSession::readThread()
 #endif
 
    // Notify other threads to exit
-   m_sendQueue->Put(INVALID_POINTER_VALUE);
-   m_processingQueue->Put(INVALID_POINTER_VALUE);
+   m_sendQueue->put(INVALID_POINTER_VALUE);
+   m_processingQueue->put(INVALID_POINTER_VALUE);
    if (m_hProxySocket != -1)
       shutdown(m_hProxySocket, SHUT_RDWR);
 
@@ -381,14 +381,14 @@ void CommSession::writeThread()
 
    while(1)
    {
-      pMsg = (NXCP_MESSAGE *)m_sendQueue->GetOrBlock();
+      pMsg = (NXCP_MESSAGE *)m_sendQueue->getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
       if (!sendRawMessage(pMsg, m_pCtx))
          break;
    }
-   m_sendQueue->Clear();
+   m_sendQueue->clear();
 }
 
 /**
@@ -403,7 +403,7 @@ void CommSession::processingThread()
 
    while(1)
    {
-      pMsg = (NXCPMessage *)m_processingQueue->GetOrBlock();
+      pMsg = (NXCPMessage *)m_processingQueue->getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
       dwCommand = pMsg->getCode();
@@ -950,10 +950,10 @@ UINT32 CommSession::setupProxyConnection(NXCPMessage *pRequest)
             NXCP_MESSAGE *pRawMsg;
 
             // Stop writing thread
-            m_sendQueue->Put(INVALID_POINTER_VALUE);
+            m_sendQueue->put(INVALID_POINTER_VALUE);
 
             // Wait while all queued messages will be sent
-            while(m_sendQueue->Size() > 0)
+            while(m_sendQueue->size() > 0)
                ThreadSleepMs(100);
 
             // Finish proxy connection setup
index f7ad21e..1a26886 100644 (file)
@@ -43,7 +43,7 @@ static Queue s_snmpTrapQueue;
  */
 void ShutdownSNMPTrapSender()
 {
-       s_snmpTrapQueue.SetShutdownMode();
+       s_snmpTrapQueue.setShutdownMode();
 }
 
 /**
@@ -135,7 +135,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapReceiver(void *pArg)
          message->lenght = iBytes;
          message->rawMessage = rawMessage;
          DebugPrintf(INVALID_INDEX, 6, _T("SNMPTrapReceiver: packet received from %s"), IpToStr(message->ipAddr, ipAddrStr));
-         s_snmpTrapQueue.Put(message);
+         s_snmpTrapQueue.put(message);
       }
       else
       {
@@ -158,7 +158,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapSender(void *pArg)
    while(1)
    {
       DebugPrintf(INVALID_INDEX, 8, _T("SNMPTrapSender: waiting for message"));
-      UdpMessage *pdu = (UdpMessage *)s_snmpTrapQueue.GetOrBlock();
+      UdpMessage *pdu = (UdpMessage *)s_snmpTrapQueue.getOrBlock();
       if (pdu == INVALID_POINTER_VALUE)
          break;
 
@@ -198,7 +198,7 @@ THREAD_RESULT THREAD_CALL SNMPTrapSender(void *pArg)
       if (!sent)
       {
          DebugPrintf(INVALID_INDEX, 6, _T("Cannot forward trap to server"));
-         s_snmpTrapQueue.Put(pdu);
+         s_snmpTrapQueue.put(pdu);
                        ThreadSleep(1);
       }
       else
index 35ec3c4..f28693d 100644 (file)
@@ -45,7 +45,7 @@ THREAD_RESULT THREAD_CALL TrapSender(void *pArg)
        s_trapId = (QWORD)time(NULL) << 32;
    while(1)
    {
-      pMsg = (NXCP_MESSAGE *)s_trapQueue->GetOrBlock();
+      pMsg = (NXCP_MESSAGE *)s_trapQueue->getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)
          break;
 
@@ -75,7 +75,7 @@ THREAD_RESULT THREAD_CALL TrapSender(void *pArg)
                }
                else
                {
-         s_trapQueue->Insert(pMsg);    // Re-queue trap
+         s_trapQueue->insert(pMsg);    // Re-queue trap
                        ThreadSleep(1);
                }
    }
@@ -90,7 +90,7 @@ THREAD_RESULT THREAD_CALL TrapSender(void *pArg)
  */
 void ShutdownTrapSender()
 {
-       s_trapQueue->SetShutdownMode();
+       s_trapQueue->setShutdownMode();
 }
 
 /**
@@ -120,7 +120,7 @@ void SendTrap(UINT32 dwEventCode, const TCHAR *eventName, int iNumArgs, TCHAR **
        {
                s_genTrapCount++;
                s_lastTrapTime = time(NULL);
-      s_trapQueue->Put(msg.createMessage());
+      s_trapQueue->put(msg.createMessage());
        }
 }
 
@@ -216,7 +216,7 @@ void ForwardTrap(NXCPMessage *msg)
        {
                s_genTrapCount++;
                s_lastTrapTime = time(NULL);
-      s_trapQueue->Put(msg->createMessage());
+      s_trapQueue->put(msg->createMessage());
        }
 }
 
index b589b96..817be36 100644 (file)
@@ -1,6 +1,6 @@
 /* 
 ** NetXMS - Network Management System
-** Copyright (C) 2003-2013 Victor Kirhenshtein
+** Copyright (C) 2003-2015 Victor Kirhenshtein
 **
 ** This program is free software; you can redistribute it and/or modify
 ** it under the terms of the GNU Lesser General Public License as published
 /**
  * Queue constructor
  */
-Queue::Queue(UINT32 dwInitialSize, UINT32 dwBufferIncrement)
+Queue::Queue(UINT32 initialSize, UINT32 bufferIncrement)
 {
-   m_dwBufferSize = dwInitialSize;
-   m_dwBufferIncrement = dwBufferIncrement;
+   m_initialSize = initialSize;
+   m_bufferSize = initialSize;
+   m_bufferIncrement = bufferIncrement;
        commonInit();
 }
 
@@ -38,8 +39,9 @@ Queue::Queue(UINT32 dwInitialSize, UINT32 dwBufferIncrement)
  */
 Queue::Queue()
 {
-   m_dwBufferSize = 256;
-   m_dwBufferIncrement = 32;
+   m_initialSize = 256;
+   m_bufferSize = 256;
+   m_bufferIncrement = 32;
        commonInit();
 }
 
@@ -50,11 +52,11 @@ void Queue::commonInit()
 {
    m_mutexQueueAccess = MutexCreate();
    m_condWakeup = ConditionCreate(FALSE);
-   m_dwNumElements = 0;
-   m_dwFirst = 0;
-   m_dwLast = 0;
-   m_pElements = (void **)malloc(sizeof(void *) * m_dwBufferSize);
-       m_bShutdownFlag = FALSE;
+   m_numElements = 0;
+   m_first = 0;
+   m_last = 0;
+   m_elements = (void **)malloc(sizeof(void *) * m_bufferSize);
+       m_shutdownFlag = FALSE;
 }
 
 /**
@@ -64,30 +66,30 @@ Queue::~Queue()
 {
    MutexDestroy(m_mutexQueueAccess);
    ConditionDestroy(m_condWakeup);
-   safe_free(m_pElements);
+   safe_free(m_elements);
 }
 
 /**
  * Put new element into queue
  */
-void Queue::Put(void *pElement)
+void Queue::put(void *pElement)
 {
    lock();
-   if (m_dwNumElements == m_dwBufferSize)
+   if (m_numElements == m_bufferSize)
    {
       // Extend buffer
-      m_dwBufferSize += m_dwBufferIncrement;
-      m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
+      m_bufferSize += m_bufferIncrement;
+      m_elements = (void **)realloc(m_elements, sizeof(void *) * m_bufferSize);
       
       // Move free space
-      memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
-              sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
-      m_dwFirst += m_dwBufferIncrement;
+      memmove(&m_elements[m_first + m_bufferIncrement], &m_elements[m_first],
+              sizeof(void *) * (m_bufferSize - m_first - m_bufferIncrement));
+      m_first += m_bufferIncrement;
    }
-   m_pElements[m_dwLast++] = pElement;
-   if (m_dwLast == m_dwBufferSize)
-      m_dwLast = 0;
-   m_dwNumElements++;
+   m_elements[m_last++] = pElement;
+   if (m_last == m_bufferSize)
+      m_last = 0;
+   m_numElements++;
    ConditionSet(m_condWakeup);
    unlock();
 }
@@ -95,24 +97,24 @@ void Queue::Put(void *pElement)
 /**
  * Insert new element into the beginning of a queue
  */
-void Queue::Insert(void *pElement)
+void Queue::insert(void *pElement)
 {
    lock();
-   if (m_dwNumElements == m_dwBufferSize)
+   if (m_numElements == m_bufferSize)
    {
       // Extend buffer
-      m_dwBufferSize += m_dwBufferIncrement;
-      m_pElements = (void **)realloc(m_pElements, sizeof(void *) * m_dwBufferSize);
+      m_bufferSize += m_bufferIncrement;
+      m_elements = (void **)realloc(m_elements, sizeof(void *) * m_bufferSize);
       
       // Move free space
-      memmove(&m_pElements[m_dwFirst + m_dwBufferIncrement], &m_pElements[m_dwFirst],
-              sizeof(void *) * (m_dwBufferSize - m_dwFirst - m_dwBufferIncrement));
-      m_dwFirst += m_dwBufferIncrement;
+      memmove(&m_elements[m_first + m_bufferIncrement], &m_elements[m_first],
+              sizeof(void *) * (m_bufferSize - m_first - m_bufferIncrement));
+      m_first += m_bufferIncrement;
    }
-   if (m_dwFirst == 0)
-      m_dwFirst = m_dwBufferSize;
-   m_pElements[--m_dwFirst] = pElement;
-   m_dwNumElements++;
+   if (m_first == 0)
+      m_first = m_bufferSize;
+   m_elements[--m_first] = pElement;
+   m_numElements++;
    ConditionSet(m_condWakeup);
    unlock();
 }
@@ -120,24 +122,25 @@ void Queue::Insert(void *pElement)
 /**
  * Get object from queue. Return NULL if queue is empty
  */
-void *Queue::Get()
+void *Queue::get()
 {
    void *pElement = NULL;
 
    lock();
-       if (m_bShutdownFlag)
+       if (m_shutdownFlag)
        {
                pElement = INVALID_POINTER_VALUE;
        }
        else
    {
-               while((m_dwNumElements > 0) && (pElement == NULL))
+               while((m_numElements > 0) && (pElement == NULL))
                {
-                       pElement = m_pElements[m_dwFirst++];
-                       if (m_dwFirst == m_dwBufferSize)
-                               m_dwFirst = 0;
-                       m_dwNumElements--;
+                       pElement = m_elements[m_first++];
+                       if (m_first == m_bufferSize)
+                               m_first = 0;
+                       m_numElements--;
                }
+      shrink();
    }
    unlock();
    return pElement;
@@ -146,11 +149,11 @@ void *Queue::Get()
 /**
  * Get object from queue or block if queue is empty
  */
-void *Queue::GetOrBlock()
+void *Queue::getOrBlock()
 {
    void *pElement;
 
-   pElement = Get();
+   pElement = get();
    if (pElement != NULL)
    {
       return pElement;
@@ -159,7 +162,7 @@ void *Queue::GetOrBlock()
    do
    {
       ConditionWait(m_condWakeup, INFINITE);
-      pElement = Get();
+      pElement = get();
    } while(pElement == NULL);
    return pElement;
 }
@@ -167,12 +170,13 @@ void *Queue::GetOrBlock()
 /**
  * Clear queue
  */
-void Queue::Clear()
+void Queue::clear()
 {
    lock();
-   m_dwNumElements = 0;
-   m_dwFirst = 0;
-   m_dwLast = 0;
+   m_numElements = 0;
+   m_first = 0;
+   m_last = 0;
+   shrink();
    unlock();
 }
 
@@ -180,10 +184,10 @@ void Queue::Clear()
  * Set shutdown flag
  * When this flag is set, get() always returns INVALID_POINTER_VALUE
  */
-void Queue::SetShutdownMode()
+void Queue::setShutdownMode()
 {
        lock();
-       m_bShutdownFlag = TRUE;
+       m_shutdownFlag = TRUE;
        ConditionSet(m_condWakeup);
        unlock();
 }
@@ -199,15 +203,15 @@ void *Queue::find(void *key, QUEUE_COMPARATOR comparator)
        UINT32 i, pos;
 
        lock();
-       for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
+       for(i = 0, pos = m_first; i < m_numElements; i++)
        {
-               if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
+               if ((m_elements[pos] != NULL) && comparator(key, m_elements[pos]))
                {
-                       element = m_pElements[pos];
+                       element = m_elements[pos];
                        break;
                }
                pos++;
-               if (pos == m_dwBufferSize)
+               if (pos == m_bufferSize)
                        pos = 0;
        }
        unlock();
@@ -224,18 +228,36 @@ bool Queue::remove(void *key, QUEUE_COMPARATOR comparator)
        UINT32 i, pos;
 
        lock();
-       for(i = 0, pos = m_dwFirst; i < m_dwNumElements; i++)
+       for(i = 0, pos = m_first; i < m_numElements; i++)
        {
-               if ((m_pElements[pos] != NULL) && comparator(key, m_pElements[pos]))
+               if ((m_elements[pos] != NULL) && comparator(key, m_elements[pos]))
                {
-                       m_pElements[pos] = NULL;
+                       m_elements[pos] = NULL;
                        success = true;
                        break;
                }
                pos++;
-               if (pos == m_dwBufferSize)
+               if (pos == m_bufferSize)
                        pos = 0;
        }
        unlock();
        return success;
 }
+
+/**
+ * Shrink queue buffer back to its initial size if possible.
+ *
+ * Does nothing when the buffer is already at its initial size, when more
+ * than half of the initial capacity is in use, or when the stored elements
+ * wrap around the end of the buffer. Otherwise the elements are moved to the
+ * start of the buffer and the buffer is reallocated to m_initialSize entries.
+ * Called from get() and clear() while the queue lock is held.
+ */
+void Queue::shrink()
+{
+   if ((m_bufferSize == m_initialSize) || (m_numElements > m_initialSize / 2) || ((m_numElements > 0) && (m_last < m_first)))
+      return;
+
+   // Rebase read/write positions to the start of the buffer. This has to be
+   // done for an empty queue as well: after the queue is drained by get(),
+   // m_first and m_last are equal but may point beyond m_initialSize, and
+   // leaving them there would make the next put()/insert() write outside of
+   // the reallocated buffer.
+   if (m_first > 0)
+   {
+      // Copies zero bytes when the queue is empty
+      memmove(&m_elements[0], &m_elements[m_first], sizeof(void *) * m_numElements);
+      m_last -= m_first;
+      m_first = 0;
+   }
+   m_bufferSize = m_initialSize;
+   m_elements = (void **)realloc(m_elements, m_bufferSize * sizeof(void *));
+}
index 0c8808b..91432d5 100644 (file)
@@ -83,7 +83,7 @@ static THREAD_RESULT THREAD_CALL WorkerThread(void *arg)
    Queue *q = ((ThreadPool *)arg)->queue;
    while(true)
    {
-      WorkRequest *rq = (WorkRequest *)q->GetOrBlock();
+      WorkRequest *rq = (WorkRequest *)q->getOrBlock();
       if (rq->func == NULL)
          break;   // stop indicator
       rq->func(rq->arg);
@@ -125,7 +125,7 @@ void LIBNETXMS_EXPORTABLE ThreadPoolDestroy(ThreadPool *p)
    WorkRequest rq;
    rq.func = NULL;
    for(int i = 0; i < p->curThreads; i++)
-      p->queue->Put(&rq);
+      p->queue->put(&rq);
 
    for(int i = 0; i < p->curThreads; i++)
       ThreadJoin(p->threads[i]);
@@ -159,7 +159,7 @@ void LIBNETXMS_EXPORTABLE ThreadPoolExecute(ThreadPool *p, ThreadPoolWorkerFunct
    WorkRequest *rq = (WorkRequest *)malloc(sizeof(WorkRequest));
    rq->func = f;
    rq->arg = arg;
-   p->queue->Put(rq);
+   p->queue->put(rq);
 }
 
 /**
index 7472ec9..47ff6bf 100644 (file)
@@ -55,9 +55,9 @@ NXMBDispatcher::~NXMBDispatcher()
        NXMBMessage *msg;
        int i;
 
-       while((msg = (NXMBMessage *)m_queue->Get()) != NULL)
+       while((msg = (NXMBMessage *)m_queue->get()) != NULL)
                delete msg;
-       m_queue->Put(INVALID_POINTER_VALUE);
+       m_queue->put(INVALID_POINTER_VALUE);
        ThreadJoin(m_workerThreadHandle);
 
        delete m_queue;
@@ -88,7 +88,7 @@ void NXMBDispatcher::workerThread()
 
        while(true)
        {
-               msg = (NXMBMessage *)m_queue->GetOrBlock();
+               msg = (NXMBMessage *)m_queue->getOrBlock();
                if (msg == INVALID_POINTER_VALUE)
                        break;
 
@@ -110,7 +110,7 @@ void NXMBDispatcher::workerThread()
  */
 void NXMBDispatcher::postMessage(NXMBMessage *msg)
 {
-       m_queue->Put(msg);
+       m_queue->put(msg);
 }
 
 /**
index abf16e4..ec47590 100644 (file)
@@ -139,7 +139,7 @@ void NXSL_Compiler::addBreakAddr(UINT32 dwAddr)
        pQueue = (Queue *)m_pBreakStack->peek();
        if (pQueue != NULL)
        {
-               pQueue->Put(CAST_TO_POINTER(dwAddr, void *));
+               pQueue->put(CAST_TO_POINTER(dwAddr, void *));
        }
 }
 
@@ -155,7 +155,7 @@ void NXSL_Compiler::closeBreakLevel(NXSL_Program *pScript)
        pQueue = (Queue *)m_pBreakStack->pop();
        if (pQueue != NULL)
        {
-               while((pAddr = pQueue->Get()) != NULL)
+               while((pAddr = pQueue->get()) != NULL)
                {
                        dwAddr = CAST_FROM_POINTER(pAddr, UINT32);
                        pScript->createJumpAt(dwAddr, pScript->getCodeSize());
index 34865ab..b086b10 100644 (file)
@@ -189,7 +189,7 @@ static THREAD_RESULT THREAD_CALL DataCollector(void *pArg)
    TCHAR *pBuffer = (TCHAR *)malloc(MAX_LINE_SIZE * sizeof(TCHAR));
    while(!IsShutdownInProgress())
    {
-      DCObject *pItem = (DCObject *)g_dataCollectionQueue.GetOrBlock();
+      DCObject *pItem = (DCObject *)g_dataCollectionQueue.getOrBlock();
                DataCollectionTarget *target = (DataCollectionTarget *)pItem->getTarget();
 
                if (pItem->isScheduledForDeletion())
@@ -388,15 +388,15 @@ static THREAD_RESULT THREAD_CALL StatCollector(void *pArg)
          break;      // Shutdown has arrived
 
       // Get current values
-      pollerQS[currPos] = g_dataCollectionQueue.Size();
-      dbWriterQS[currPos] = g_dbWriterQueue->Size();
-      iDataWriterQS[currPos] = g_dciDataWriterQueue->Size();
-      rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->Size();
-      dbAndIDataWriterQS[currPos] = g_dbWriterQueue->Size() + g_dciDataWriterQueue->Size() + g_dciRawDataWriterQueue->Size();
-      statusPollerQS[currPos] = g_statusPollQueue.Size();
-      configPollerQS[currPos] = g_configPollQueue.Size();
-      syslogProcessingQS[currPos] = g_syslogProcessingQueue.Size();
-      syslogWriterQS[currPos] = g_syslogWriteQueue.Size();
+      pollerQS[currPos] = g_dataCollectionQueue.size();
+      dbWriterQS[currPos] = g_dbWriterQueue->size();
+      iDataWriterQS[currPos] = g_dciDataWriterQueue->size();
+      rawDataWriterQS[currPos] = g_dciRawDataWriterQueue->size();
+      dbAndIDataWriterQS[currPos] = g_dbWriterQueue->size() + g_dciDataWriterQueue->size() + g_dciRawDataWriterQueue->size();
+      statusPollerQS[currPos] = g_statusPollQueue.size();
+      configPollerQS[currPos] = g_configPollQueue.size();
+      syslogProcessingQS[currPos] = g_syslogProcessingQueue.size();
+      syslogWriterQS[currPos] = g_syslogWriteQueue.size();
       currPos++;
       if (currPos == 12)
          currPos = 0;
@@ -435,7 +435,7 @@ THREAD_RESULT THREAD_CALL CacheLoader(void *arg)
    DbgPrintf(2, _T("DCI cache loader thread started"));
    while(true)
    {
-      DCItem *dci = (DCItem *)g_dciCacheLoaderQueue.GetOrBlock();
+      DCItem *dci = (DCItem *)g_dciCacheLoaderQueue.getOrBlock();
       if (dci == INVALID_POINTER_VALUE)
          break;
 
index 53c0507..dac909c 100644 (file)
@@ -59,7 +59,7 @@ void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query)
        rq->query = (TCHAR *)&rq->bindings[0];
        _tcscpy(rq->query, query);
        rq->bindCount = 0;
-   g_dbWriterQueue->Put(rq);
+   g_dbWriterQueue->put(rq);
        DbgPrintf(8, _T("SQL request queued: %s"), query);
 }
 
@@ -97,7 +97,7 @@ void NXCORE_EXPORTABLE QueueSQLRequest(const TCHAR *query, int bindCount, int *s
                        pos += align - pos % align;
        }
 
-   g_dbWriterQueue->Put(rq);
+   g_dbWriterQueue->put(rq);
        DbgPrintf(8, _T("SQL request queued: %s"), query);
 }
 
@@ -111,7 +111,7 @@ void QueueIDataInsert(time_t timestamp, UINT32 nodeId, UINT32 dciId, const TCHAR
        rq->nodeId = nodeId;
        rq->dciId = dciId;
        nx_strncpy(rq->value, value, MAX_RESULT_LENGTH);
-       g_dciDataWriterQueue->Put(rq);
+       g_dciDataWriterQueue->put(rq);
 }
 
 /**
@@ -124,7 +124,7 @@ void QueueRawDciDataUpdate(time_t timestamp, UINT32 dciId, const TCHAR *rawValue
        rq->dciId = dciId;
        nx_strncpy(rq->rawValue, rawValue, MAX_RESULT_LENGTH);
        nx_strncpy(rq->transformedValue, transformedValue, MAX_RESULT_LENGTH);
-       g_dciRawDataWriterQueue->Put(rq);
+       g_dciRawDataWriterQueue->put(rq);
 }
 
 /**
@@ -151,7 +151,7 @@ static THREAD_RESULT THREAD_CALL DBWriteThread(void *arg)
 
    while(1)
    {
-      DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_dbWriterQueue->GetOrBlock();
+      DELAYED_SQL_REQUEST *rq = (DELAYED_SQL_REQUEST *)g_dbWriterQueue->getOrBlock();
       if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
          break;
 
@@ -206,7 +206,7 @@ static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
 
    while(1)
    {
-               DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->GetOrBlock();
+               DELAYED_IDATA_INSERT *rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->getOrBlock();
       if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
          break;
 
@@ -238,7 +238,7 @@ static THREAD_RESULT THREAD_CALL IDataWriteThread(void *arg)
                                if (!success || (count > 1000))
                                        break;
 
-                               rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->Get();
+                               rq = (DELAYED_IDATA_INSERT *)g_dciDataWriterQueue->get();
                                if (rq == NULL)
                                        break;
                                if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
@@ -284,7 +284,7 @@ static THREAD_RESULT THREAD_CALL RawDataWriteThread(void *arg)
 
    while(1)
    {
-               DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->GetOrBlock();
+               DELAYED_RAW_DATA_UPDATE *rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->getOrBlock();
       if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
          break;
 
@@ -314,7 +314,7 @@ static THREAD_RESULT THREAD_CALL RawDataWriteThread(void *arg)
                                if (!success || (count > 1000))
                                        break;
 
-                               rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->Get();
+                               rq = (DELAYED_RAW_DATA_UPDATE *)g_dciRawDataWriterQueue->get();
                                if (rq == NULL)
                                        break;
                                if (rq == INVALID_POINTER_VALUE)   // End-of-job indicator
@@ -367,12 +367,12 @@ void StopDBWriter()
    int i;
 
    for(i = 0; i < m_numWriters; i++)
-      g_dbWriterQueue->Put(INVALID_POINTER_VALUE);
+      g_dbWriterQueue->put(INVALID_POINTER_VALUE);
    for(i = 0; i < m_numWriters; i++)
       ThreadJoin(m_hWriteThreadList[i]);
 
-       g_dciDataWriterQueue->Put(INVALID_POINTER_VALUE);
-       g_dciRawDataWriterQueue->Put(INVALID_POINTER_VALUE);
+       g_dciDataWriterQueue->put(INVALID_POINTER_VALUE);
+       g_dciRawDataWriterQueue->put(INVALID_POINTER_VALUE);
        ThreadJoin(m_hIDataWriterThread);
        ThreadJoin(m_hRawDataWriterThread);
 }
index 3dfbb21..44dc26b 100644 (file)
@@ -1041,7 +1041,7 @@ void DCItem::updateCacheSize(UINT32 dwCondId)
          m_pNode->incRefCount();
          m_requiredCacheSize = dwRequiredSize;
          m_bCacheLoaded = false;
-         g_dciCacheLoaderQueue.Put(this);
+         g_dciCacheLoaderQueue.put(this);
       }
       else
       {
index 8499594..c99228c 100644 (file)
@@ -495,7 +495,7 @@ void DataCollectionTarget::queueItemsForPolling(Queue *pPollerQueue)
       {
          object->setBusyFlag(TRUE);
          incRefCount();   // Increment reference count for each queued DCI
-         pPollerQueue->Put(object);
+         pPollerQueue->put(object);
                        DbgPrintf(8, _T("DataCollectionTarget(%s)->QueueItemsForPolling(): item %d \"%s\" added to queue"), m_name, object->getId(), object->getName());
       }
    }
index c6618f5..e431ce4 100644 (file)
@@ -131,7 +131,7 @@ void ShowServerStats(CONSOLE_CTX console)
 void ShowQueueStats(CONSOLE_CTX console, Queue *pQueue, const TCHAR *pszName)
 {
    if (pQueue != NULL)
-      ConsolePrintf(console, _T("%-32s : %d\n"), pszName, pQueue->Size());
+      ConsolePrintf(console, _T("%-32s : %d\n"), pszName, pQueue->size());
 }
 
 /**
index b78a6f1..601dd23 100644 (file)
@@ -385,7 +385,7 @@ static THREAD_RESULT THREAD_CALL MailerThread(void *pArg)
        DbgPrintf(1, _T("SMTP mailer thread started"));
    while(1)
    {
-      MAIL_ENVELOPE *pEnvelope = (MAIL_ENVELOPE *)m_pMailerQueue->GetOrBlock();
+      MAIL_ENVELOPE *pEnvelope = (MAIL_ENVELOPE *)m_pMailerQueue->getOrBlock();
       if (pEnvelope == INVALID_POINTER_VALUE)
          break;
 
@@ -404,7 +404,7 @@ static THREAD_RESULT THREAD_CALL MailerThread(void *pArg)
                        if (pEnvelope->nRetryCount > 0)
                        {
                                // Try posting again
-                               m_pMailerQueue->Put(pEnvelope);
+                               m_pMailerQueue->put(pEnvelope);
                        }
                        else
                        {
@@ -438,8 +438,8 @@ void InitMailer()
  */
 void ShutdownMailer()
 {
-   m_pMailerQueue->Clear();
-   m_pMailerQueue->Put(INVALID_POINTER_VALUE);
+   m_pMailerQueue->clear();
+   m_pMailerQueue->put(INVALID_POINTER_VALUE);
    if (m_hThread != INVALID_THREAD_HANDLE)
       ThreadJoin(m_hThread);
    delete m_pMailerQueue;
@@ -465,5 +465,5 @@ void NXCORE_EXPORTABLE PostMail(const TCHAR *pszRcpt, const TCHAR *pszSubject, c
    pEnvelope->pszText = _tcsdup(pszText);
 #endif
        pEnvelope->nRetryCount = ConfigReadInt(_T("SMTPRetryCount"), 1);
-   m_pMailerQueue->Put(pEnvelope);
+   m_pMailerQueue->put(pEnvelope);
 }
index c831893..c9e3792 100644 (file)
@@ -877,7 +877,7 @@ static BOOL RealPostEvent(Queue *queue, UINT32 eventCode, UINT32 sourceId,
          pEvent = new Event(pEventTemplate, sourceId, userTag, format, names, args);
 
          // Add new event to queue
-         queue->Put(pEvent);
+         queue->put(pEvent);
 
          bResult = TRUE;
       }
@@ -1082,10 +1082,10 @@ void NXCORE_EXPORTABLE ResendEvents(Queue *queue)
 {
    while(1)
    {
-      void *pEvent = queue->Get();
+      void *pEvent = queue->get();
       if (pEvent == NULL)
          break;
-      g_pEventQueue->Put(pEvent);
+      g_pEventQueue->put(pEvent);
    }
 }
 
index 47b82d8..9f22abb 100644 (file)
@@ -96,7 +96,7 @@ static THREAD_RESULT THREAD_CALL EventLogger(void *arg)
 {
    while(!IsShutdownInProgress())
    {
-      Event *pEvent = (Event *)s_loggerQueue->GetOrBlock();
+      Event *pEvent = (Event *)s_loggerQueue->getOrBlock();
       if (pEvent == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -153,7 +153,7 @@ static THREAD_RESULT THREAD_CALL EventLogger(void *arg)
                                        DBExecute(hStmt);
                                        DbgPrintf(8, _T("EventLogger: DBExecute: id=%d,code=%d"), (int)pEvent->getId(), (int)pEvent->getCode());
                                        delete pEvent;
-                                       pEvent = (Event *)s_loggerQueue->Get();
+                                       pEvent = (Event *)s_loggerQueue->get();
                                } while((pEvent != NULL) && (pEvent != INVALID_POINTER_VALUE));
                                DBFreeStatement(hStmt);
                        }
@@ -181,7 +181,7 @@ THREAD_RESULT THREAD_CALL EventProcessor(void *arg)
        s_threadStormDetector = ThreadCreateEx(EventStormDetector, 0, NULL);
    while(!IsShutdownInProgress())
    {
-      Event *pEvent = (Event *)g_pEventQueue->GetOrBlock();
+      Event *pEvent = (Event *)g_pEventQueue->getOrBlock();
       if (pEvent == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -232,7 +232,7 @@ THREAD_RESULT THREAD_CALL EventProcessor(void *arg)
                // Logger will destroy event object after logging
                if ((pEvent->getFlags() & EF_LOG) && (pEvent->getCode() != EVENT_DB_QUERY_FAILED))
                {
-                       s_loggerQueue->Put(pEvent);
+                       s_loggerQueue->put(pEvent);
                }
                else
       {
@@ -243,7 +243,7 @@ THREAD_RESULT THREAD_CALL EventProcessor(void *arg)
       g_totalEventsProcessed++;
    }
 
-       s_loggerQueue->Put(INVALID_POINTER_VALUE);
+       s_loggerQueue->put(INVALID_POINTER_VALUE);
        ThreadJoin(s_threadStormDetector);
        ThreadJoin(s_threadLogger);
        delete s_loggerQueue;
index 3bf8b45..6e8d1f8 100644 (file)
@@ -934,7 +934,7 @@ void NXCORE_EXPORTABLE Shutdown()
        ConditionSet(m_condShutdown);
 
    // Stop DCI cache loading thread
-   g_dciCacheLoaderQueue.SetShutdownMode();
+   g_dciCacheLoaderQueue.setShutdownMode();
 
 #if XMPP_SUPPORTED
    StopXMPPConnector();
@@ -948,8 +948,8 @@ void NXCORE_EXPORTABLE Shutdown()
 #endif
 
        // Stop event processor
-       g_pEventQueue->Clear();
-       g_pEventQueue->Put(INVALID_POINTER_VALUE);
+       g_pEventQueue->clear();
+       g_pEventQueue->put(INVALID_POINTER_VALUE);
 
        ShutdownMailer();
        ShutdownSMSSender();
index e5f768e..90d4ccb 100644 (file)
@@ -207,20 +207,20 @@ void MobileDeviceSession::readThread()
                }
                else
       {
-         m_pMessageQueue->Put(msg);
+         m_pMessageQueue->put(msg);
       }
    }
 
    // Notify other threads to exit
    NXCP_MESSAGE *rawMsg;
-       while((rawMsg = (NXCP_MESSAGE *)m_pSendQueue->Get()) != NULL)
+       while((rawMsg = (NXCP_MESSAGE *)m_pSendQueue->get()) != NULL)
                free(rawMsg);
-   m_pSendQueue->Put(INVALID_POINTER_VALUE);
+   m_pSendQueue->put(INVALID_POINTER_VALUE);
 
    NXCPMessage *msg;
-       while((msg = (NXCPMessage *)m_pMessageQueue->Get()) != NULL)
+       while((msg = (NXCPMessage *)m_pMessageQueue->get()) != NULL)
                delete msg;
-   m_pMessageQueue->Put(INVALID_POINTER_VALUE);
+   m_pMessageQueue->put(INVALID_POINTER_VALUE);
 
    // Wait for other threads to finish
    ThreadJoin(m_hWriteThread);
@@ -252,7 +252,7 @@ void MobileDeviceSession::writeThread()
 
    while(1)
    {
-      pRawMsg = (NXCP_MESSAGE *)m_pSendQueue->GetOrBlock();
+      pRawMsg = (NXCP_MESSAGE *)m_pSendQueue->getOrBlock();
       if (pRawMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
@@ -299,7 +299,7 @@ void MobileDeviceSession::processingThread()
 
    while(1)
    {
-      msg = (NXCPMessage *)m_pMessageQueue->GetOrBlock();
+      msg = (NXCPMessage *)m_pMessageQueue->getOrBlock();
       if (msg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
index 61ec882..1a9379f 100644 (file)
@@ -1380,7 +1380,7 @@ restart_agent_check:
                                        // Clear delayed event queue
                                        while(1)
                                        {
-                                               Event *pEvent = (Event *)pQueue->Get();
+                                               Event *pEvent = (Event *)pQueue->get();
                                                if (pEvent == NULL)
                                                        break;
                                                delete pEvent;
index dfc697d..53b8354 100644 (file)
@@ -700,7 +700,7 @@ THREAD_RESULT THREAD_CALL NodePoller(void *arg)
 
    while(!IsShutdownInProgress())
    {
-      pInfo = (NEW_NODE *)g_nodePollerQueue.GetOrBlock();
+      pInfo = (NEW_NODE *)g_nodePollerQueue.getOrBlock();
       if (pInfo == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator received
 
index 2b1bcbe..eeefa5d 100644 (file)
@@ -86,7 +86,7 @@ static THREAD_RESULT THREAD_CALL ApplyTemplateThread(void *pArg)
        DbgPrintf(1, _T("Apply template thread started"));
    while(1)
    {
-      TEMPLATE_UPDATE_INFO *pInfo = (TEMPLATE_UPDATE_INFO *)g_pTemplateUpdateQueue->GetOrBlock();
+      TEMPLATE_UPDATE_INFO *pInfo = (TEMPLATE_UPDATE_INFO *)g_pTemplateUpdateQueue->getOrBlock();
       if (pInfo == INVALID_POINTER_VALUE)
          break;
 
@@ -139,7 +139,7 @@ static THREAD_RESULT THREAD_CALL ApplyTemplateThread(void *pArg)
       else
       {
                        DbgPrintf(8, _T("ApplyTemplateThread: failed"));
-         g_pTemplateUpdateQueue->Put(pInfo);    // Requeue
+         g_pTemplateUpdateQueue->put(pInfo);    // Requeue
          ThreadSleepMs(500);
       }
    }
index c8c738e..33bc40c 100644 (file)
@@ -161,7 +161,7 @@ static THREAD_RESULT THREAD_CALL DeploymentThread(void *pArg)
    while(1)
    {
       // Get node object for upgrade
-      pNode = (Node *)pStartup->pQueue->Get();
+      pNode = (Node *)pStartup->pQueue->get();
       if (pNode == NULL)
          break;   // Queue is empty, exit
 
@@ -342,7 +342,7 @@ THREAD_RESULT THREAD_CALL DeploymentManager(void *pArg)
    msg.setId(pStartup->dwRqId);
    for(i = 0; i < pStartup->nodeList->size(); i++)
    {
-      pQueue->Put(pStartup->nodeList->get(i));
+      pQueue->put(pStartup->nodeList->get(i));
       msg.setField(VID_OBJECT_ID, pStartup->nodeList->get(i)->getId());
       msg.setField(VID_DEPLOYMENT_STATUS, (WORD)DEPLOYMENT_STATUS_PENDING);
       pStartup->pSession->sendMessage(&msg);
index a9c3a90..61d8fab 100644 (file)
@@ -290,7 +290,7 @@ static THREAD_RESULT THREAD_CALL StatusPoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      pObject = (NetObj *)g_statusPollQueue.GetOrBlock();
+      pObject = (NetObj *)g_statusPollQueue.getOrBlock();
       if (pObject == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -335,7 +335,7 @@ static THREAD_RESULT THREAD_CALL ConfigurationPoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_configPollQueue.GetOrBlock();
+      pNode = (Node *)g_configPollQueue.getOrBlock();
       if (pNode == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -369,7 +369,7 @@ static THREAD_RESULT THREAD_CALL InstanceDiscoveryPoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_instancePollQueue.GetOrBlock();
+      pNode = (Node *)g_instancePollQueue.getOrBlock();
       if (pNode == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -403,7 +403,7 @@ static THREAD_RESULT THREAD_CALL RoutePoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_routePollQueue.GetOrBlock();
+      pNode = (Node *)g_routePollQueue.getOrBlock();
       if (pNode == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -460,7 +460,7 @@ static void CheckPotentialNode(Node *node, const InetAddress& ipAddr, UINT32 ifI
                                           memcpy(pInfo->bMacAddr, macAddr, MAC_ADDR_LENGTH);
                                   DbgPrintf(5, _T("DiscoveryPoller(): new node queued: %s/%d"),
                                             pInfo->ipAddr.toString(buffer), pInfo->ipAddr.getMaskBits());
-               g_nodePollerQueue.Put(pInfo);
+               g_nodePollerQueue.put(pInfo);
             }
                           else
                           {
@@ -526,7 +526,7 @@ static THREAD_RESULT THREAD_CALL DiscoveryPoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      pNode = (Node *)g_discoveryPollQueue.GetOrBlock();
+      pNode = (Node *)g_discoveryPollQueue.getOrBlock();
       if (pNode == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -573,8 +573,8 @@ static THREAD_RESULT THREAD_CALL DiscoveryPoller(void *arg)
       pNode->setDiscoveryPollTimeStamp();
       pNode->decRefCount();
    }
-   g_nodePollerQueue.Clear();
-   g_nodePollerQueue.Put(INVALID_POINTER_VALUE);
+   g_nodePollerQueue.clear();
+   g_nodePollerQueue.put(INVALID_POINTER_VALUE);
    SetPollerState((long)arg, _T("finished"));
    return THREAD_OK;
 }
@@ -595,7 +595,7 @@ static THREAD_RESULT THREAD_CALL ConditionPoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      pCond = (Condition *)g_conditionPollerQueue.GetOrBlock();
+      pCond = (Condition *)g_conditionPollerQueue.getOrBlock();
       if (pCond == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -626,7 +626,7 @@ static THREAD_RESULT THREAD_CALL TopologyPoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-      Node *node = (Node *)g_topologyPollQueue.GetOrBlock();
+      Node *node = (Node *)g_topologyPollQueue.getOrBlock();
       if (node == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -657,7 +657,7 @@ static THREAD_RESULT THREAD_CALL BusinessServicePoller(void *arg)
    while(!IsShutdownInProgress())
    {
       SetPollerState((long)arg, _T("wait"));
-               BusinessService *service = (BusinessService *)g_businessServicePollerQueue.GetOrBlock();
+               BusinessService *service = (BusinessService *)g_businessServicePollerQueue.getOrBlock();
       if (service == INVALID_POINTER_VALUE)
          break;   // Shutdown indicator
 
@@ -715,7 +715,7 @@ static void CheckRange(int nType, UINT32 dwAddr1, UINT32 dwAddr2)
                                                pInfo->zoneId = 0;      /* FIXME: add correct zone ID */
                                                pInfo->ignoreFilter = FALSE;
                                                memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
-                  g_nodePollerQueue.Put(pInfo);
+                  g_nodePollerQueue.put(pInfo);
                }
             }
             else
@@ -727,7 +727,7 @@ static void CheckRange(int nType, UINT32 dwAddr1, UINT32 dwAddr2)
                                        pInfo->zoneId = 0;      /* FIXME: add correct zone ID */
                                        pInfo->ignoreFilter = FALSE;
                                        memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
-               g_nodePollerQueue.Put(pInfo);
+               g_nodePollerQueue.put(pInfo);
             }
          }
       }
@@ -794,42 +794,42 @@ static void QueueForPolling(NetObj *object, void *data)
                                        node->incRefCount();
                                        node->lockForConfigurationPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for configuration poll"), (int)node->getId(), node->getName());
-                                       g_configPollQueue.Put(node);
+                                       g_configPollQueue.put(node);
                                }
                                if (node->isReadyForInstancePoll())
                                {
                                        node->incRefCount();
                                        node->lockForInstancePoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for instance discovery poll"), (int)node->getId(), node->getName());
-                                       g_instancePollQueue.Put(node);
+                                       g_instancePollQueue.put(node);
                                }
                                if (node->isReadyForStatusPoll())
                                {
                                        node->incRefCount();
                                        node->lockForStatusPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for status poll"), (int)node->getId(), node->getName());
-                                       g_statusPollQueue.Put(node);
+                                       g_statusPollQueue.put(node);
                                }
                                if (node->isReadyForRoutePoll())
                                {
                                        node->incRefCount();
                                        node->lockForRoutePoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for routing table poll"), (int)node->getId(), node->getName());
-                                       g_routePollQueue.Put(node);
+                                       g_routePollQueue.put(node);
                                }
                                if (node->isReadyForDiscoveryPoll())
                                {
                                        node->incRefCount();
                                        node->lockForDiscoveryPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for discovery poll"), (int)node->getId(), node->getName());
-                                       g_discoveryPollQueue.Put(node);
+                                       g_discoveryPollQueue.put(node);
                                }
                                if (node->isReadyForTopologyPoll())
                                {
                                        node->incRefCount();
                                        node->lockForTopologyPoll();
                                        DbgPrintf(6, _T("Node %d \"%s\" queued for topology poll"), (int)node->getId(), node->getName());
-                                       g_topologyPollQueue.Put(node);
+                                       g_topologyPollQueue.put(node);
                                }
                        }
                        break;
@@ -840,7 +840,7 @@ static void QueueForPolling(NetObj *object, void *data)
                                {
                                        cond->lockForPoll();
                                        DbgPrintf(6, _T("Condition %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
-                                       g_conditionPollerQueue.Put(cond);
+                                       g_conditionPollerQueue.put(cond);
                                }
                        }
                        break;
@@ -852,7 +852,7 @@ static void QueueForPolling(NetObj *object, void *data)
                                        cluster->incRefCount();
                                        cluster->lockForStatusPoll();
                                        DbgPrintf(6, _T("Cluster %d \"%s\" queued for status poll"), (int)cluster->getId(), cluster->getName());
-                                       g_statusPollQueue.Put(cluster);
+                                       g_statusPollQueue.put(cluster);
                                }
                        }
                        break;
@@ -864,7 +864,7 @@ static void QueueForPolling(NetObj *object, void *data)
                                        service->incRefCount();
                                        service->lockForPolling();
                                        DbgPrintf(6, _T("Business service %d \"%s\" queued for poll"), (int)object->getId(), object->getName());
-                                       g_businessServicePollerQueue.Put(service);
+                                       g_businessServicePollerQueue.put(service);
                                }
                        }
                        break;
@@ -958,29 +958,29 @@ THREAD_RESULT THREAD_CALL PollManager(void *pArg)
    }
 
    // Send stop signal to all pollers
-   g_statusPollQueue.Clear();
-   g_statusPollQueue.SetShutdownMode();
+   g_statusPollQueue.clear();
+   g_statusPollQueue.setShutdownMode();
 
-   g_configPollQueue.Clear();
-   g_configPollQueue.SetShutdownMode();
+   g_configPollQueue.clear();
+   g_configPollQueue.setShutdownMode();
 
-   g_instancePollQueue.Clear();
-   g_instancePollQueue.SetShutdownMode();
+   g_instancePollQueue.clear();
+   g_instancePollQueue.setShutdownMode();
 
-   g_discoveryPollQueue.Clear();
-   g_discoveryPollQueue.SetShutdownMode();
+   g_discoveryPollQueue.clear();
+   g_discoveryPollQueue.setShutdownMode();
 
-   g_routePollQueue.Clear();
-   g_routePollQueue.SetShutdownMode();
+   g_routePollQueue.clear();
+   g_routePollQueue.setShutdownMode();
 
-   g_conditionPollerQueue.Clear();
-   g_conditionPollerQueue.SetShutdownMode();
+   g_conditionPollerQueue.clear();
+   g_conditionPollerQueue.setShutdownMode();
 
-   g_topologyPollQueue.Clear();
-   g_topologyPollQueue.SetShutdownMode();
+   g_topologyPollQueue.clear();
+   g_topologyPollQueue.setShutdownMode();
 
-   g_businessServicePollerQueue.Clear();
-   g_businessServicePollerQueue.SetShutdownMode();
+   g_businessServicePollerQueue.clear();
+   g_businessServicePollerQueue.setShutdownMode();
 
    DbgPrintf(1, _T("PollManager: main thread terminated"));
    return THREAD_OK;
@@ -995,7 +995,7 @@ void ResetDiscoveryPoller()
    NEW_NODE *pInfo;
 
    // Clear queues
-   while((pNode = (Node *)g_discoveryPollQueue.Get()) != NULL)
+   while((pNode = (Node *)g_discoveryPollQueue.get()) != NULL)
    {
       if (pNode != INVALID_POINTER_VALUE)
       {
@@ -1003,7 +1003,7 @@ void ResetDiscoveryPoller()
          pNode->decRefCount();
       }
    }
-   while((pInfo = (NEW_NODE *)g_nodePollerQueue.Get()) != NULL)
+   while((pInfo = (NEW_NODE *)g_nodePollerQueue.get()) != NULL)
    {
       if (pInfo != INVALID_POINTER_VALUE)
          free(pInfo);
index 4c915a0..dbc42e7 100644 (file)
@@ -528,7 +528,7 @@ void ClientSession::readThread()
                        }
                        else
          {
-            m_pMessageQueue->Put(msg);
+            m_pMessageQueue->put(msg);
          }
       }
    }
@@ -537,19 +537,19 @@ void ClientSession::readThread()
    m_dwFlags |= CSF_TERMINATED;
 
        // Finish update thread first
-   m_pUpdateQueue->Put(INVALID_POINTER_VALUE);
+   m_pUpdateQueue->put(INVALID_POINTER_VALUE);
    ThreadJoin(m_hUpdateThread);
 
    // Notify other threads to exit
    NXCP_MESSAGE *rawMsg;
-   while((rawMsg = (NXCP_MESSAGE *)m_pSendQueue->Get()) != NULL)
+   while((rawMsg = (NXCP_MESSAGE *)m_pSendQueue->get()) != NULL)
       free(rawMsg);
-   m_pSendQueue->Put(INVALID_POINTER_VALUE);
+   m_pSendQueue->put(INVALID_POINTER_VALUE);
 
    NXCPMessage *msg;
-       while((msg = (NXCPMessage *)m_pMessageQueue->Get()) != NULL)
+       while((msg = (NXCPMessage *)m_pMessageQueue->get()) != NULL)
                delete msg;
-   m_pMessageQueue->Put(INVALID_POINTER_VALUE);
+   m_pMessageQueue->put(INVALID_POINTER_VALUE);
 
    // Wait for other threads to finish
    ThreadJoin(m_hWriteThread);
@@ -603,7 +603,7 @@ void ClientSession::writeThread()
 {
    while(true)
    {
-      NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)m_pSendQueue->GetOrBlock();
+      NXCP_MESSAGE *rawMsg = (NXCP_MESSAGE *)m_pSendQueue->getOrBlock();
       if (rawMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
@@ -622,7 +622,7 @@ void ClientSession::updateThread()
 
    while(1)
    {
-      pUpdate = (UPDATE_INFO *)m_pUpdateQueue->GetOrBlock();
+      pUpdate = (UPDATE_INFO *)m_pUpdateQueue->getOrBlock();
       if (pUpdate == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
@@ -740,7 +740,7 @@ void ClientSession::processingThread()
 
    while(1)
    {
-      pMsg = (NXCPMessage *)m_pMessageQueue->GetOrBlock();
+      pMsg = (NXCPMessage *)m_pMessageQueue->getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)    // Session termination indicator
          break;
 
@@ -2597,7 +2597,7 @@ void ClientSession::onNewEvent(Event *pEvent)
          msg->setCode(CMD_EVENTLOG_RECORDS);
          pEvent->prepareMessage(msg);
          pUpdate->pData = msg;
-         m_pUpdateQueue->Put(pUpdate);
+         m_pUpdateQueue->put(pUpdate);
       }
    }
 }
@@ -2616,7 +2616,7 @@ void ClientSession::onObjectChange(NetObj *object)
          pUpdate->dwCategory = INFO_CAT_OBJECT_CHANGE;
          pUpdate->pData = object;
          object->incRefCount();
-         m_pUpdateQueue->Put(pUpdate);
+         m_pUpdateQueue->put(pUpdate);
       }
 }
 
@@ -5054,7 +5054,7 @@ void ClientSession::onAlarmUpdate(UINT32 dwCode, NXC_ALARM *pAlarm)
             pUpdate->dwCategory = INFO_CAT_ALARM;
             pUpdate->dwCode = dwCode;
             pUpdate->pData = nx_memdup(pAlarm, sizeof(NXC_ALARM));
-            m_pUpdateQueue->Put(pUpdate);
+            m_pUpdateQueue->put(pUpdate);
          }
    }
 }
@@ -5700,7 +5700,7 @@ void ClientSession::onActionDBUpdate(UINT32 dwCode, NXC_ACTION *pAction)
          pUpdate->dwCategory = INFO_CAT_ACTION;
          pUpdate->dwCode = dwCode;
          pUpdate->pData = nx_memdup(pAction, sizeof(NXC_ACTION));
-         m_pUpdateQueue->Put(pUpdate);
+         m_pUpdateQueue->put(pUpdate);
       }
    }
 }
@@ -7975,16 +7975,16 @@ void ClientSession::sendServerStats(UINT32 dwRqId)
 #endif
 
        // Queues
-       msg.setField(VID_QSIZE_CONDITION_POLLER, g_conditionPollerQueue.Size());
-       msg.setField(VID_QSIZE_CONF_POLLER, g_configPollQueue.Size());
-       msg.setField(VID_QSIZE_DCI_POLLER, g_dataCollectionQueue.Size());
-       msg.setField(VID_QSIZE_DCI_CACHE_LOADER, g_dciCacheLoaderQueue.Size());
-       msg.setField(VID_QSIZE_DBWRITER, g_dbWriterQueue->Size());
-       msg.setField(VID_QSIZE_EVENT, g_pEventQueue->Size());
-       msg.setField(VID_QSIZE_DISCOVERY, g_discoveryPollQueue.Size());
-       msg.setField(VID_QSIZE_NODE_POLLER, g_nodePollerQueue.Size());
-       msg.setField(VID_QSIZE_ROUTE_POLLER, g_routePollQueue.Size());
-       msg.setField(VID_QSIZE_STATUS_POLLER, g_statusPollQueue.Size());
+       msg.setField(VID_QSIZE_CONDITION_POLLER, g_conditionPollerQueue.size());
+       msg.setField(VID_QSIZE_CONF_POLLER, g_configPollQueue.size());
+       msg.setField(VID_QSIZE_DCI_POLLER, g_dataCollectionQueue.size());
+       msg.setField(VID_QSIZE_DCI_CACHE_LOADER, g_dciCacheLoaderQueue.size());
+       msg.setField(VID_QSIZE_DBWRITER, g_dbWriterQueue->size());
+       msg.setField(VID_QSIZE_EVENT, g_pEventQueue->size());
+       msg.setField(VID_QSIZE_DISCOVERY, g_discoveryPollQueue.size());
+       msg.setField(VID_QSIZE_NODE_POLLER, g_nodePollerQueue.size());
+       msg.setField(VID_QSIZE_ROUTE_POLLER, g_routePollQueue.size());
+       msg.setField(VID_QSIZE_STATUS_POLLER, g_statusPollQueue.size());
 
    // Send response
    sendMessage(&msg);
@@ -8318,7 +8318,7 @@ void ClientSession::onSyslogMessage(NX_SYSLOG_RECORD *pRec)
          pUpdate = (UPDATE_INFO *)malloc(sizeof(UPDATE_INFO));
          pUpdate->dwCategory = INFO_CAT_SYSLOG_MSG;
          pUpdate->pData = nx_memdup(pRec, sizeof(NX_SYSLOG_RECORD));
-         m_pUpdateQueue->Put(pUpdate);
+         m_pUpdateQueue->put(pUpdate);
       }
    }
 }
@@ -8451,7 +8451,7 @@ void ClientSession::onNewSNMPTrap(NXCPMessage *pMsg)
          pUpdate = (UPDATE_INFO *)malloc(sizeof(UPDATE_INFO));
          pUpdate->dwCategory = INFO_CAT_SNMP_TRAP;
          pUpdate->pData = new NXCPMessage(pMsg);
-         m_pUpdateQueue->Put(pUpdate);
+         m_pUpdateQueue->put(pUpdate);
       }
    }
 }
@@ -9784,7 +9784,7 @@ void ClientSession::exportConfiguration(NXCPMessage *pRequest)
                        // Close document
                        str += _T("</configuration>\n");
 
-         // Put result into message
+         // Put result into message
          msg.setField(VID_RCC, RCC_SUCCESS);
          msg.setField(VID_NXMP_CONTENT, (const TCHAR *)str);
       }
@@ -10875,7 +10875,7 @@ void ClientSession::onSituationChange(NXCPMessage *msg)
       pUpdate = (UPDATE_INFO *)malloc(sizeof(UPDATE_INFO));
       pUpdate->dwCategory = INFO_CAT_SITUATION;
       pUpdate->pData = new NXCPMessage(msg);
-      m_pUpdateQueue->Put(pUpdate);
+      m_pUpdateQueue->put(pUpdate);
    }
 }
 
@@ -10909,7 +10909,7 @@ void ClientSession::registerAgent(NXCPMessage *pRequest)
             info->ipAddr = InetAddress::createFromSockaddr(m_clientAddr);
                                info->zoneId = 0;       // Add to default zone
                                info->ignoreFilter = TRUE;              // Ignore discovery filters and add node anyway
-                               g_nodePollerQueue.Put(info);
+                               g_nodePollerQueue.put(info);
                        }
                        msg.setField(VID_RCC, RCC_SUCCESS);
                }
@@ -11288,7 +11288,7 @@ void ClientSession::cancelJob(NXCPMessage *request)
 }
 
 /**
- * Put server job on hold
+ * Put server job on hold
  */
 void ClientSession::holdJob(NXCPMessage *request)
 {
@@ -11985,7 +11985,7 @@ void ClientSession::onLibraryImageChange(uuid_t *guid, bool removed)
     info->guid = (uuid_t *)(nx_memdup(guid, UUID_LENGTH));
     info->removed = removed;
     pUpdate->pData = info;
-    m_pUpdateQueue->Put(pUpdate);
+    m_pUpdateQueue->put(pUpdate);
   }
 }
 
index 6630e88..c1d5100 100644 (file)
@@ -48,7 +48,7 @@ static THREAD_RESULT THREAD_CALL SenderThread(void *pArg)
 
    while(1)
    {
-      pMsg = (SMS *)m_pMsgQueue->GetOrBlock();
+      pMsg = (SMS *)m_pMsgQueue->getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)
          break;
 
@@ -129,8 +129,8 @@ void ShutdownSMSSender()
 {
    if (m_pMsgQueue != NULL)
    {
-      m_pMsgQueue->Clear();
-      m_pMsgQueue->Put(INVALID_POINTER_VALUE);
+      m_pMsgQueue->clear();
+      m_pMsgQueue->put(INVALID_POINTER_VALUE);
       if (m_hThread != INVALID_THREAD_HANDLE)
          ThreadJoin(m_hThread);
       delete m_pMsgQueue;
@@ -148,5 +148,5 @@ void NXCORE_EXPORTABLE PostSMS(const TCHAR *pszRcpt, const TCHAR *pszText)
        SMS *pMsg = (SMS *)malloc(sizeof(SMS));
        nx_strncpy(pMsg->szRcpt, pszRcpt, MAX_RCPT_ADDR_LEN);
        nx_strncpy(pMsg->szText, pszText, 160);
-       m_pMsgQueue->Put(pMsg);
+       m_pMsgQueue->put(pMsg);
 }
index a8b0d0e..1990fb2 100644 (file)
@@ -420,7 +420,7 @@ void ProcessTrap(SNMP_PDU *pdu, const InetAddress& srcAddr, int srcPort, SNMP_Tr
                                pInfo->zoneId = 0;      /* FIXME: add correct zone ID */
                                pInfo->ignoreFilter = FALSE;
                                memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
-            g_nodePollerQueue.Put(pInfo);
+            g_nodePollerQueue.put(pInfo);
          }
       }
       else
@@ -432,7 +432,7 @@ void ProcessTrap(SNMP_PDU *pdu, const InetAddress& srcAddr, int srcPort, SNMP_Tr
                        pInfo->zoneId = 0;      /* FIXME: add correct zone ID */
                        pInfo->ignoreFilter = FALSE;
                        memset(pInfo->bMacAddr, 0, MAC_ADDR_LENGTH);
-         g_nodePollerQueue.Put(pInfo);
+         g_nodePollerQueue.put(pInfo);
       }
    }
    else  // unknown node, discovery disabled
index b13a77e..bf7c9e8 100644 (file)
@@ -325,7 +325,7 @@ static THREAD_RESULT THREAD_CALL SyslogWriterThread(void *arg)
    DbgPrintf(1, _T("Syslog writer thread started"));
    while(true)
    {
-      NX_SYSLOG_RECORD *r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.GetOrBlock();
+      NX_SYSLOG_RECORD *r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.getOrBlock();
       if (r == INVALID_POINTER_VALUE)
          break;
 
@@ -366,7 +366,7 @@ static THREAD_RESULT THREAD_CALL SyslogWriterThread(void *arg)
          count++;
          if (count == 1000)
             break;
-         r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.Get();
+         r = (NX_SYSLOG_RECORD *)g_syslogWriteQueue.get();
          if ((r == NULL) || (r == INVALID_POINTER_VALUE))
             break;
       }
@@ -393,7 +393,7 @@ static void ProcessSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
       record.qwMsgId = s_msgId++;
       Node *node = BindMsgToNode(&record, dwSourceIP);
 
-      g_syslogWriteQueue.Put(nx_memdup(&record, sizeof(NX_SYSLOG_RECORD)));
+      g_syslogWriteQueue.put(nx_memdup(&record, sizeof(NX_SYSLOG_RECORD)));
 
       // Send message to all connected clients
       EnumerateClientSessions(BroadcastSyslogMessage, &record);
@@ -433,7 +433,7 @@ static THREAD_RESULT THREAD_CALL SyslogProcessingThread(void *pArg)
 
    while(1)
    {
-      pMsg = (QUEUED_SYSLOG_MESSAGE *)g_syslogProcessingQueue.GetOrBlock();
+      pMsg = (QUEUED_SYSLOG_MESSAGE *)g_syslogProcessingQueue.getOrBlock();
       if (pMsg == INVALID_POINTER_VALUE)
          break;
 
@@ -455,7 +455,7 @@ static void QueueSyslogMessage(char *psMsg, int nMsgLen, UINT32 dwSourceIP)
    pMsg->dwSourceIP = dwSourceIP;
    pMsg->nBytes = nMsgLen;
    pMsg->psMsg = (char *)nx_memdup(psMsg, nMsgLen + 1);
-   g_syslogProcessingQueue.Put(pMsg);
+   g_syslogProcessingQueue.put(pMsg);
 }
 
 /**
@@ -645,11 +645,11 @@ THREAD_RESULT THREAD_CALL SyslogDaemon(void *pArg)
    }
 
    // Stop processing thread
-   g_syslogProcessingQueue.Put(INVALID_POINTER_VALUE);
+   g_syslogProcessingQueue.put(INVALID_POINTER_VALUE);
    ThreadJoin(hProcessingThread);
 
    // Stop writer thread - it must be done after processing thread already finished
-   g_syslogWriteQueue.Put(INVALID_POINTER_VALUE);
+   g_syslogWriteQueue.put(INVALID_POINTER_VALUE);
    ThreadJoin(hWriterThread);
 
        delete s_parser;
index 08b38dc..6b0661e 100644 (file)
@@ -937,7 +937,7 @@ void Template::queueUpdate()
          pInfo->pTemplate = this;
          pInfo->targetId = m_pChildList[i]->getId();
          pInfo->removeDCI = false;
-         g_pTemplateUpdateQueue->Put(pInfo);
+         g_pTemplateUpdateQueue->put(pInfo);
       }
    unlockProperties();
 }
@@ -954,7 +954,7 @@ void Template::queueRemoveFromTarget(UINT32 targetId, bool removeDCI)
    pInfo->pTemplate = this;
    pInfo->targetId = targetId;
    pInfo->removeDCI = removeDCI;
-   g_pTemplateUpdateQueue->Put(pInfo);
+   g_pTemplateUpdateQueue->put(pInfo);
    unlockProperties();
 }
 
index bc2c54d..46820d9 100644 (file)
@@ -360,7 +360,7 @@ public:
 
    void run();
 
-   void postMessage(NXCPMessage *pMsg) { m_pSendQueue->Put(pMsg->createMessage()); }
+   void postMessage(NXCPMessage *pMsg) { m_pSendQueue->put(pMsg->createMessage()); }
    void sendMessage(NXCPMessage *pMsg);
 
        int getId() { return m_id; }
@@ -699,7 +699,7 @@ public:
 
    void run();
 
-   void postMessage(NXCPMessage *pMsg) { m_pSendQueue->Put(pMsg->createMessage()); }
+   void postMessage(NXCPMessage *pMsg) { m_pSendQueue->put(pMsg->createMessage()); }
    void sendMessage(NXCPMessage *pMsg);
    void sendRawMessage(NXCP_MESSAGE *pMsg);
    void sendPollerMsg(UINT32 dwRqId, const TCHAR *pszMsg);
@@ -741,7 +741,7 @@ public:
    void kill();
    void notify(UINT32 dwCode, UINT32 dwData = 0);
 
-       void queueUpdate(UPDATE_INFO *pUpdate) { m_pUpdateQueue->Put(pUpdate); }
+       void queueUpdate(UPDATE_INFO *pUpdate) { m_pUpdateQueue->put(pUpdate); }
    void onNewEvent(Event *pEvent);
    void onSyslogMessage(NX_SYSLOG_RECORD *pRec);
    void onNewSNMPTrap(NXCPMessage *pMsg);
index 45da0d4..b88ff0e 100644 (file)
@@ -1,5 +1,6 @@
 #include <nms_common.h>
 #include <nms_util.h>
+#include <nxqueue.h>
 #include <testtools.h>
 
 void TestMsgWaitQueue();
@@ -344,6 +345,42 @@ static void TestItoa()
    EndTest();
 }
 
+/**
+ * Test queue: FIFO order of put/get, buffer growth in increment-sized steps, and buffer shrink after draining
+ */
+static void TestQueue()
+{
+   Queue q(16, 16);   // initial size 16, increment 16; automatic object, so it is destroyed on return (was leaked via new)
+
+   StartTest(_T("Queue: put/get"));
+   for(int i = 0; i < 40; i++)
+      q.put(CAST_TO_POINTER(i + 1, void *));   // store i + 1 so that the first element is not a NULL pointer
+   AssertEquals(q.size(), 40);
+   AssertEquals(q.allocated(), 48);   // 40 elements need the 16-slot buffer grown to 48 (16 + 2 * 16)
+   for(int i = 0; i < 40; i++)
+   {
+      void *p = q.get();
+      AssertNotNull(p);
+      AssertEquals(CAST_FROM_POINTER(p, int), i + 1);   // elements must come out in insertion order
+   }
+   EndTest();
+
+   StartTest(_T("Queue: shrink"));
+   for(int i = 0; i < 60; i++)
+      q.put(CAST_TO_POINTER(i + 1, void *));
+   AssertEquals(q.size(), 60);
+   AssertEquals(q.allocated(), 64);
+   for(int i = 0; i < 55; i++)
+   {
+      void *p = q.get();
+      AssertNotNull(p);
+      AssertEquals(CAST_FROM_POINTER(p, int), i + 1);
+   }
+   AssertEquals(q.size(), 5);
+   AssertEquals(q.allocated(), 16);   // with 5 elements left the buffer is expected back at its initial size
+   EndTest();
+}
+
 /**
  * main()
  */
@@ -361,5 +398,6 @@ int main(int argc, char *argv[])
    TestMsgWaitQueue();
    TestInetAddress();
    TestItoa();
+   TestQueue();
    return 0;
 }