基于类成员函数回调的线程池-连接池-任务队列

在现实生活中,经常会用到线程池与队列的应用,所以写了一套可复利用接口。

其中优先级队列的蓝本从stl中复制出来的,唯一修改的地方就是添加了一个写锁,当top时加锁,pop后才解锁,保证真正不重复取出或丢失数据。
因为使用模板来构造,所以目前只支持类成员函数的回调。
与线程池共同使用时,外部调用时,需要new一个任务类来添加到队列中,线程池主函数会调用完成后自动delete析构。

此基于成员函数回调的线程池需要同优先级队列共同使用,原理是初始化一个初始为5个线程的线程池,并将自己的控制线程添加到最高优先级启动。控制线程的目的是当任务队列比较多时增加线程来增加效率,当任务队列少时减速线程来节约资源(目前未实现)

连接池,需要同线程池与任务队列共同使用,同样为先初始化一定数量的连接,供多线程调度使用,其中包含一个控制线程,来控制连接池中的连接数量,当取出一个连接来用时,会给取出的连接加锁,所以使用完后需要调用返回函数,来归还使用完的连接。

连接池的作用就是不必每次都创建连接使用完成后断开连接,而使用模板来构造扩展性更强,所以需要自己额外来写创建连接,与断开连接函数。

需要注意连接池需要在线程之前析构。

任务队列文件代码:

/*******************************************************************************
 * Author : huwq
 * Email : sohunjug@hotmail.com
 * Last modified : 2013-11-18 10:46
 * Filename : CTaskQueue.h
 * Description : 
 * *****************************************************************************/
#ifndef __TASH_QUEUE_H__
#define __TASH_QUEUE_H__
#include <queue>
#include <vector>
#include <pthread.h>
#include <new>

namespace QUEUE
{
    enum Priority
    {
        Urgent = 1,
        Highest,
        High,
        Middle,
        Low,
        Minimum
    };

    class CTaskBase
    {
        public:
            Priority priority;
            CTaskBase()
            {
                is_virtual = 0;
                argv = NULL;
            }
            virtual ~CTaskBase(){}

            virtual int Start()
            {
                return 0;
            }

            bool operator () (CTaskBase* &a, CTaskBase* &b) 
            {
                return !((*a) < (*b));
            }

            bool operator () (CTaskBase &a, CTaskBase &b) 
            {
                return !(a < b);
            }
            bool operator < (const CTaskBase &a) const
            {
                return (bool)((int)priority < (int)a.priority);
            }

        protected:
            void *argv;
            int  is_virtual;
    };

    template <class Type>
        class CTaskQueue : public CTaskBase
    {
        public:
            union
            {
                int (Type::*func)(void*);
                int (*CallBack)(Type*, void*);
            }func;
            CTaskQueue()
            {
                ClassName = NULL;
                func.func = NULL;
            }
            ~CTaskQueue()
            {
                Clear();
            }

            void Clear()
            {
                argv = NULL;
                ClassName = NULL;
                func.func = NULL;
            }

            int SetFunc(Priority priority, Type* ClassName, int (Type::*Func)(void*), void* argv)
            {
                this->priority = priority;
                this->ClassName = ClassName;
                this->argv = argv;
                this->func.func = Func;
                return 1;
            }
            int Start()
            {
                //return func.CallBack(ClassName, argv);
                return (ClassName->*(func.func))(argv);
            }

        private:
            Type* ClassName;

    };

    template<typename _Tp, typename _Sequence = std::vector<_Tp>,
        typename _Compare  = std::less<typename _Sequence::value_type> >
            class priority_queue
            {
                // concept requirements
                pthread_rwlock_t m_mutex;

                public:
                typedef typename _Sequence::value_type                value_type;
                typedef typename _Sequence::reference                 reference;
                typedef typename _Sequence::const_reference           const_reference;
                typedef typename _Sequence::size_type                 size_type;
                typedef          _Sequence                            container_type;

                protected:
                //  See queue::c for notes on these names.
                _Sequence  c;
                _Compare   comp;

                public:
                explicit priority_queue(const _Compare& __x = _Compare(), const _Sequence& __s = _Sequence())
                    : c(__s), comp(__x)
                { 
                    pthread_rwlock_init(&m_mutex, NULL);
                    std::make_heap(c.begin(), c.end(), comp); 
                }

                template<typename _InputIterator>
                    priority_queue(_InputIterator __first, _InputIterator __last, const _Compare& __x = _Compare(), const _Sequence& __s = _Sequence())
                    : c(__s), comp(__x)
                    {
                        pthread_rwlock_init(&m_mutex, NULL);
                        __glibcxx_requires_valid_range(__first, __last);
                        c.insert(c.end(), __first, __last);
                        std::make_heap(c.begin(), c.end(), comp);
                    }

                bool empty() 
                { 
                    pthread_rwlock_wrlock(&m_mutex);
                    bool e = c.empty(); 
                    pthread_rwlock_unlock(&m_mutex);
                    return e;
                }

                size_type size() 
                { 
                    pthread_rwlock_wrlock(&m_mutex);
                    size_type _size = c.size(); 
                    pthread_rwlock_unlock(&m_mutex);
                    return _size;
                }

                const_reference top() 
                {
                    pthread_rwlock_wrlock(&m_mutex);
                    const_reference reference_value = c.front();
                    return reference_value;
                }

                void push(const value_type& __x)
                {
                    pthread_rwlock_wrlock(&m_mutex);
                    c.push_back(__x);
                    std::push_heap(c.begin(), c.end(), comp);
                    pthread_rwlock_unlock(&m_mutex);
                }
                void pop()
                {
                    std::pop_heap(c.begin(), c.end(), comp);
                    c.pop_back();
                    pthread_rwlock_unlock(&m_mutex);
                }

            };
    typedef priority_queue<CTaskBase*, std::vector<CTaskBase*>, CTaskBase> queue_priority;

};

#endif

线程池与连接池文件代码:

/*******************************************************************************
 * Author : RKhuwq
 * Email : sohunjug@hotmail.com
 * Last modified : 2013-11-18 16:55
 * Filename : Pool.h
 * Description : 
 * *****************************************************************************/

#ifndef __POOL_H__
#define __POOL_H__

#include <pthread.h>
#include <map>
#include <stdio.h>
#include "TaskQueue.h"
#include <signal.h>

namespace POOL
{
    enum PoolStatue
    {
        Wait = 1,
        Kill
    };
    class CPoolBase
    {
        protected:
            int m_init;
            int m_iNum;
            int m_iCount;
            pthread_attr_t m_attr;
            virtual void  Create() = 0;
            virtual int   ControlThread(void* argv) = 0;
        public:
            PoolStatue statues;
            pthread_mutex_t m_mutex;
            pthread_mutexattr_t  m_mutexAttr;
            CPoolBase(int num, PoolStatue statue) : m_init(num), m_iNum(5), m_iCount(0), statues(statue)
        {
            pthread_attr_init(&m_attr);
            pthread_attr_setdetachstate(&m_attr,PTHREAD_CREATE_JOINABLE);
            pthread_mutexattr_init(&m_mutexAttr);
            pthread_mutex_init(&m_mutex, &m_mutexAttr);
        }
            virtual ~CPoolBase()
            {
                pthread_mutexattr_destroy(&m_mutexAttr);
            };
    };

    class CThreadPool : public CPoolBase
    {
        struct pthread_statue
        {
            pthread_t pthread_id;
            int statue;
            pthread_mutex_t m_threadMutex;
            pthread_cond_t  m_threadCond;
        };
        public:
        std::map<pthread_t, pthread_statue*> m_mthread;
        std::map<pthread_t, pthread_statue*>::iterator m_iter;
        QUEUE::queue_priority& queue;

        private:
        bool is_clear;
        pthread_mutex_t m_mutexCount;
        QUEUE::CTaskQueue<CThreadPool> *owntask;
        void Create()
        {
            pthread_statue *new_pthread = new pthread_statue();
            pthread_mutex_lock(&m_mutex);
            if(pthread_create(&new_pthread->pthread_id, &m_attr, ThreadMain, this)!=0)
            {
                return ;
            }
            m_mthread.insert(std::pair<pthread_t, pthread_statue*> (new_pthread->pthread_id, new_pthread));
            pthread_mutex_unlock(&m_mutex);
            pthread_mutex_lock(&m_mutexCount);
            m_iCount++;
            pthread_mutex_unlock(&m_mutexCount);
        }
        int   ControlThread(void* argv)
        {
            int in_count_add = 0;
            int in_count_del = 0;
            int task_count;
            bool change;
            CThreadPool* pclass = (CThreadPool*)argv;
            pthread_statue *statue_self;
            std::map<pthread_t, pthread_statue*>::iterator iter;
            pthread_mutex_lock(&pclass->m_mutex);
            statue_self = pclass->m_mthread.find(pthread_self())->second;
            pthread_mutex_unlock(&pclass->m_mutex);
            statue_self->statue = 1;
            while(1)
            {
                while (m_iCount < m_init && m_iCount < m_iNum && ((statues == Wait && !queue.empty()) || !is_clear))
                {
                    Create();
                }
                task_count = queue.size();
                if (task_count)
                {
                    pthread_mutex_lock(&m_mutex);
                    iter = m_mthread.begin();
                    while (iter != m_mthread.end() && ((statues == Wait && !queue.empty()) || !is_clear))
                    {
                        if (iter->second->statue == 2 && iter->second->pthread_id != pthread_self())
                        {
                            pthread_cond_signal (&iter->second->m_threadCond);
                            break;
                        }
                        iter++;
                    }
                    if (iter == m_mthread.end())
                        m_iNum ++;
                    pthread_mutex_unlock(&m_mutex);
                    if (task_count > m_iCount)
                    {
                        in_count_add++;
                        in_count_del = 0;
                    }
                    else if (task_count < m_iCount)
                    {
                        //in_count_del++;
                        in_count_add = 0;
                    }
                    if (in_count_add > 10)
                    {
                        in_count_add = 0;
                        m_iNum++;
                        change = true;
                    }
                    if (in_count_del > 10 && m_iNum > m_init)
                    {
                        m_iNum--;
                        change = true;
                    }
                }
                if (task_count == 0)
                    usleep(100);

                if ((statue_self->statue == 0 || is_clear) && !(statues == Wait && !queue.empty()))
                {
                    pthread_mutex_lock(&m_mutexCount);
                    pclass->m_iCount--;
                    pthread_mutex_unlock(&m_mutexCount);
                    break;
                }
            }
            return 0;
        } 
        public:
        static void* ThreadMain(void *arg)
        {
            pthread_statue* statue_self;
            CThreadPool* pclass = (CThreadPool*)arg;
            QUEUE::CTaskBase* task = NULL;
            pthread_mutex_lock(&pclass->m_mutex);
            statue_self = pclass->m_mthread.find(pthread_self())->second;
            pthread_mutex_unlock(&pclass->m_mutex);
            pthread_mutex_init (&statue_self->m_threadMutex, &pclass->m_mutexAttr);
            pthread_cond_init (&statue_self->m_threadCond, NULL);
            statue_self->statue = 1;
            //timespec time;
            statue_self->pthread_id = pthread_self();
            while(1)
            { 
                if (pclass->statues != Wait || pclass->queue.empty())
                {
                    if (statue_self->statue == 0)
                    {
                        pthread_mutex_lock(&pclass->m_mutexCount);
                        pclass->m_iCount--;
                        pthread_mutex_unlock(&pclass->m_mutexCount);
                        break;
                    }
                }
                task = NULL;

                pthread_mutex_lock(&pclass->m_mutex);
                if(pclass->queue.empty())
                {
                    if (pthread_mutex_trylock(&statue_self->m_threadMutex) != 0)
                    {
                        pthread_mutex_unlock(&pclass->m_mutex);
                        continue;
                    }
                    pthread_mutex_unlock(&pclass->m_mutex);
                    statue_self->statue = 2;
                    pthread_cond_wait(&statue_self->m_threadCond, &statue_self->m_threadMutex);
                    pthread_mutex_unlock(&statue_self->m_threadMutex);
                    continue;
                }

                task=pclass->queue.top();
                pclass->queue.pop();
                pthread_mutex_unlock(&pclass->m_mutex);

                if (task)
                {
                    statue_self->statue = 1;
                    if (task->Start() != 0)
                    {
                    }
                    if (!task)
                        delete task;
                }
            }
            return NULL;
        }
        void Stop(pthread_t pthread_id)
        {
            pthread_statue *temp;
            pthread_mutex_lock(&m_mutex);
            temp = m_mthread.find(pthread_id)->second;
            pthread_mutex_unlock(&m_mutex);
            if (!temp)
                return;
            while(pthread_kill(temp->pthread_id, 0) == 0)
            {
                pthread_mutex_lock(&temp->m_threadMutex);
                temp->statue = 0;
                pthread_mutex_unlock(&temp->m_threadMutex);
                pthread_cond_signal(&temp->m_threadCond);
                usleep(10000);
            }
            pthread_mutex_lock(&m_mutex);
            temp = m_mthread.find(pthread_id)->second;
            m_mthread.erase(pthread_id);
            pthread_mutex_unlock(&m_mutex);
            delete temp;
        }
        void ClearQueue() 
        { 
            while (!queue.empty())
            {
                delete queue.top();
                queue.pop();
            }
        }
        void Clear()
        {
            is_clear = true;
            pthread_mutex_lock(&m_mutex); 
            std::map<pthread_t, pthread_statue*>::iterator iter = m_mthread.begin();
            for (; iter != m_mthread.end(); iter = m_mthread.begin())
            {
                pthread_mutex_unlock(&m_mutex); 
                Stop(iter->first);
                pthread_mutex_lock(&m_mutex); 
            }
            pthread_mutex_unlock(&m_mutex); 
        }

        public:
        CThreadPool(int num, QUEUE::queue_priority& queue, PoolStatue Pool = Wait) : CPoolBase(num, Pool), queue(queue), is_clear(false) 
        { 
            pthread_mutex_init (&m_mutexCount, NULL);
            owntask = new QUEUE::CTaskQueue<CThreadPool>;
            owntask->SetFunc(QUEUE::Urgent, this, &CThreadPool::ControlThread, this);
            queue.push(owntask);
            Create(); 
        }
        ~CThreadPool()
        { 
            if (statues == Wait)
                Clear(); 
            ClearQueue(); 
            if (statues != Wait)
                Clear();
        }
    };

    template<typename CConnectionClass>
        class CConnectPool : public CPoolBase
    {
        private:
            std::map<CConnectionClass*, pthread_mutex_t> m_mapConnection;
            typename std::map<CConnectionClass*, pthread_mutex_t>::iterator m_iter;
            bool is_clear;
            typedef bool (CConnectionClass::*Connect)(const char*, const char*, const char*);
            typedef void (CConnectionClass::*Close)(void);
            Connect m_fconnect;
            Close m_fclose;
            char* m_sConnectParam[3];
            int m_iUsed;
            pthread_mutex_t m_mapmutex;
        public:
            CConnectPool(int num) : CPoolBase(num, Wait), m_iUsed(0)
        {
            is_clear = false;
            m_mapConnection.clear();
            m_sConnectParam[0] = NULL;
            m_sConnectParam[1] = NULL;
            m_sConnectParam[2] = NULL;
            pthread_mutex_init(&m_mapmutex, NULL);
        }
            ~CConnectPool()
            {
                Clear();
                free(m_sConnectParam[0]);
                free(m_sConnectParam[1]);
                free(m_sConnectParam[2]);
            }

            void Clear()
            {
                is_clear = true;
                m_iter = m_mapConnection.begin();
                while (m_iter != m_mapConnection.end())
                {
                    if (pthread_mutex_trylock(&m_iter->second) == 0)
                    {
                        (m_iter->first->*(m_fclose))();
                        delete m_iter->first;
                        m_mapConnection.erase(m_iter);
                        m_iter = m_mapConnection.begin();
                        pthread_mutex_lock(&m_mutex);
                        m_iCount --;
                        pthread_mutex_unlock(&m_mutex);
                    }
                    else m_iter++;
                    if (m_iter == m_mapConnection.end())
                        m_iter = m_mapConnection.begin();
                }
                m_mapConnection.clear();
            }

            void SetFunc(Connect connect, Close close, char* value[])
            {
                m_fconnect = connect;
                m_fclose = close;
                m_sConnectParam[0] = (char*)malloc(strlen(value[0]) + 1);
                m_sConnectParam[1] = (char*)malloc(strlen(value[1]) + 1);
                m_sConnectParam[2] = (char*)malloc(strlen(value[2]) + 1);
                strcpy(m_sConnectParam[0], value[0]);
                strcpy(m_sConnectParam[1], value[1]);
                strcpy(m_sConnectParam[2], value[2]);
            }

            CConnectionClass* GetConnection()
            {
                pthread_mutex_lock(&m_mapmutex);
                int count = 0;
                m_iter = m_mapConnection.begin();
                while (1)
                {
                    if (m_iter == m_mapConnection.end())
                    {
                        count ++;
                        m_iter = m_mapConnection.begin();
                        if (count > 10)
                        {
                            Create();
                            pthread_mutex_lock(&m_mutex);
                            m_iNum++;
                            pthread_mutex_unlock(&m_mutex);
                            m_iter = m_mapConnection.begin();
                        }
                    }
                    if (m_iter != m_mapConnection.end())
                        if (pthread_mutex_trylock(&m_iter->second) == 0)
                        {
                            pthread_mutex_lock(&m_mutex);
                            m_iUsed++;
                            pthread_mutex_unlock(&m_mutex);
                            pthread_mutex_unlock(&m_mapmutex);
                            return m_iter->first;
                        }
                        else m_iter++;
                }
                pthread_mutex_unlock(&m_mapmutex);
                return NULL;
            }

            void BackConnection(CConnectionClass* oneConnect)
            {
                if (!oneConnect)
                    return;
                pthread_mutex_lock(&m_mapmutex);
                pthread_mutex_unlock(&m_mapConnection.find(oneConnect)->second);
                pthread_mutex_unlock(&m_mapmutex);
                pthread_mutex_lock(&m_mutex);
                m_iUsed--;
                pthread_mutex_unlock(&m_mutex);
            }

            void Create()
            {
                pthread_mutex_t rwlock;
                pthread_mutex_init(&rwlock, NULL);
                pthread_mutex_lock(&m_mutex);
                CConnectionClass* oneConnect = new CConnectionClass();
                (oneConnect->*(m_fconnect))(m_sConnectParam[0], m_sConnectParam[1], m_sConnectParam[2]);
                m_mapConnection.insert(std::pair<CConnectionClass*, pthread_mutex_t> (oneConnect, rwlock));
                m_iCount ++;
                pthread_mutex_unlock(&m_mutex);
            }

            int  Count()
            {
                return m_mapConnection.size();
            }

            int  ControlThread(void* argv)
            {
                int in_count_add = 0;
                int in_count_del = 0;
                int task_count;
                int second = 10000;
                bool change;
                while(1)
                {
                    task_count = this->Count();
                    if (task_count <= m_iNum)
                    {
                        pthread_mutex_lock(&m_mapmutex);
                        Create();
                        pthread_mutex_unlock(&m_mapmutex);
                    }
                    if (m_iUsed > 0)
                        if (task_count)
                        {
                            if (task_count > m_iUsed)
                                in_count_del++;
                            if (in_count_del > 10)
                                change = true;
                        }
                    if (task_count != 0)
                        usleep(second);

                    if (is_clear)
                        break;
                }
                return 0;
            }
    };

};

#endif

测试文件代码:

#include <stdio.h>
#include "Pool.h"
#include "TaskQueue.h"
#include "DBDatabase.h"
#include "DBStatement.h"

using namespace POOL;

using namespace QUEUE;
class B
{
    public:
        B(){printf("B\n");}
        virtual int process(void* arg){return 0;};
        virtual ~B(){printf("B\n");}
};
class A : public B
{
    public:
        A(){}
        ~A(){}
        int process(void* arg)
        {
            char** argv = (char**)arg;
            for (int iLoop = 0; iLoop < 2; iLoop++)
            {
                printf("%s ", argv[iLoop]);
            }
            printf("\n");
            return 0;
        }
};

int main(int argc, char** argv)
{
    CThreadPool threadpool(2, task_priority_queue);
    char conn[3][20] = {"system", "system", "tsdc"};
    char *value[3];
    value[0] = conn[0];
    value[1] = conn[1];
    value[2] = conn[2];
    CConnectPool<CDBDatabase> connnectpool(5);
    connnectpool.SetFunc(&CDBDatabase::Connect, &CDBDatabase::Disconnect, value);
    int count = 0;
    A p;
    CTaskQueue<CConnectPool<CDBDatabase> >* atask = new CTaskQueue<CConnectPool<CDBDatabase> >;
    char se[10] = "10000000";
    char *s[2];
    s[0] = se;
    s[1] = NULL;
    atask->SetFunc(Highest, &connnectpool, &CConnectPool<CDBDatabase>::ControlThread, s);
    task_priority_queue.push(atask);
    CDBDatabase *temp = connnectpool.GetConnection();
    CDBDatabase *temp2 = connnectpool.GetConnection();
    CDBStatement* dbstatement = new CDBStatement(temp);
    CDBStatement* dbs = new CDBStatement(temp2);
    dbstatement->Prepare("select city_code from bb_device_rent_info_t where user_id = 48303537");
    dbs->Prepare("select user_id from bb_device_rent_info_t where user_id = 48303537");
    dbstatement->Open();
    dbs->Open();
    if (dbstatement->Next() && dbs->Next())
    {
        printf("%s\n", dbstatement->Field(0).AsString());
        printf("%s\n", dbs->Field(0).AsString());
    }
    connnectpool.BackConnection(temp);
    connnectpool.BackConnection(temp2);
    while(1)
    {
    char time[2][10];
    char *ttt[3];
    sprintf(time[0], "time:");
    sprintf(time[1], "%d", count++);
    ttt[0] = time[0];
    ttt[1] = time[1];
    ttt[2] = NULL;
    CTaskQueue<A>* task = new CTaskQueue<A>;
    task->SetFunc(Low, &p, &A::process, (void*)ttt);
    task_priority_queue.push(task);
    usleep(1000000);
    }
    
    A a;
    CTaskQueue<A>* task = new CTaskQueue<A>;
    char aaa[2][10] = {"aaa", "bbb"};
    char *aa[3];
    char *bb[3];
    aa[0] = aaa[0];
    aa[1] = aaa[1];
    aa[2] = NULL;
    task->SetFunc(Low, &a, &A::process, (void*)aa);
    task_priority_queue.push(task);
    char bbb[2][10] = {"ccc", "bbb"};
    bb[0] = bbb[0];
    bb[1] = bbb[1];
    bb[2] = NULL;
    task = new CTaskQueue<A>;
    task->SetFunc(Minimum, &a, &A::process, (void*)bb);
    //task_priority_queue.push(task);
    
    connnectpool.Clear();
    usleep(30000);
    threadpool.Clear();

    return 0;
}

github: GitLink….

可能还有bug,还有很多不足,欢迎大家帮忙测试.

文章目录

最近访客