基于类成员函数回调的线程池-连接池-任务队列

在实际开发中,经常会用到线程池与任务队列,所以写了一套可复用的接口。

其中优先级队列以STL中的priority_queue为蓝本复制而来,唯一修改的地方就是添加了一个写锁:top时加锁,pop后才解锁,保证不会重复取出或丢失数据。
因为使用模板来构造,所以目前只支持类成员函数的回调。
与线程池共同使用时,外部调用时,需要new一个任务类来添加到队列中,线程池主函数会调用完成后自动delete析构。

此基于成员函数回调的线程池需要同优先级队列共同使用,原理是初始化一个初始为5个线程的线程池,并将自己的控制线程以最高优先级添加到队列中启动。控制线程的目的是当任务队列比较多时增加线程来提高效率,当任务队列少时减少线程来节约资源(减少线程的功能目前未实现)。

连接池,需要同线程池与任务队列共同使用,同样为先初始化一定数量的连接,供多线程调度使用,其中包含一个控制线程,来控制连接池中的连接数量,当取出一个连接来用时,会给取出的连接加锁,所以使用完后需要调用返回函数,来归还使用完的连接。

连接池的作用就是不必每次都创建连接使用完成后断开连接,而使用模板来构造扩展性更强,所以需要自己额外来写创建连接,与断开连接函数。

需要注意:连接池需要在线程池之前析构。

任务队列文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/*******************************************************************************
* Author : huwq
* Email : sohunjug@hotmail.com
* Last modified : 2013-11-18 10:46
* Filename : CTaskQueue.h
* Description :
* *****************************************************************************/
#ifndef __TASH_QUEUE_H__
#define __TASH_QUEUE_H__
#include <queue>
#include <vector>
#include <pthread.h>
#include <new>

namespace QUEUE
{
// Scheduling priority attached to every task (see CTaskBase::priority).
// Enumerators are numbered upwards from 1; CTaskBase::operator< compares
// these integer values, so Urgent (1) is the numerically smallest value and
// Minimum (6) the largest.
enum Priority
{
Urgent = 1,
Highest,
High,
Middle,
Low,
Minimum
};

class CTaskBase
{
public:
Priority priority;
CTaskBase()
{
is_virtual = 0;
argv = NULL;
}
virtual ~CTaskBase(){}

virtual int Start()
{
return 0;
}

bool operator () (CTaskBase* &a, CTaskBase* &b)
{
return !((*a) < (*b));
}

bool operator () (CTaskBase &a, CTaskBase &b)
{
return !(a < b);
}
bool operator < (const CTaskBase &a) const
{
return (bool)((int)priority < (int)a.priority);
}

protected:
void *argv;
int is_virtual;
};

// A task that invokes a member function of `Type` on a given object.
//
// Usage: create the task, bind it with SetFunc(), then push it onto a task
// queue. Start() performs the call `(object->*member)(argument)`.
template <class Type>
class CTaskQueue : public CTaskBase
{
public:
    // Storage for the callback. Only the member-function view (`func.func`)
    // is written and read by this class; `CallBack` is an alternative view
    // that the class itself never uses.
    union
    {
        int (Type::*func)(void*);
        int (*CallBack)(Type*, void*);
    } func;

    // Creates an unbound task (no target object, no callback).
    CTaskQueue() : ClassName(NULL)
    {
        func.func = NULL;
    }

    ~CTaskQueue()
    {
        Clear();
    }

    // Drops the binding; the task becomes unbound again. Nothing is freed:
    // neither the target object nor the argument is owned by the task.
    void Clear()
    {
        argv = NULL;
        ClassName = NULL;
        func.func = NULL;
    }

    // Binds the task.
    //   priority  - scheduling priority stored in the base class
    //   ClassName - object the member function is invoked on
    //   Func      - member function to invoke
    //   argv      - opaque argument forwarded to Func
    // Always returns 1.
    int SetFunc(Priority priority, Type* ClassName, int (Type::*Func)(void*), void* argv)
    {
        this->priority = priority;
        this->ClassName = ClassName;
        this->argv = argv;
        this->func.func = Func;
        return 1;
    }

    // Invokes the bound member function and returns its result.
    // Returns -1 without calling anything when the task is unbound; the call
    // used to dereference a null object / null member pointer in that case.
    int Start()
    {
        if (!ClassName || !func.func)
            return -1;
        return (ClassName->*(func.func))(argv);
    }

private:
    Type* ClassName;  // target object of the callback (not owned)
};

// Priority queue modelled on std::priority_queue, with a pthread read-write
// lock added so several threads can share it.
//
// Locking protocol (deliberate, see top()/pop()):
//   * top() takes the write lock and returns WITHOUT releasing it;
//   * the following pop() removes that element and releases the lock.
// A caller therefore always pairs top() with pop() on the same thread, and
// must not call empty(), size() or push() between the two. Calling pop()
// without a preceding top() unlocks a lock the caller does not hold.
//
// The queue is not copyable: copying a pthread_rwlock_t is not supported.
template<typename _Tp, typename _Sequence = std::vector<_Tp>,
         typename _Compare = std::less<typename _Sequence::value_type> >
class priority_queue
{
    pthread_rwlock_t m_mutex;

    // Declared but not defined: forbids copying (pre-C++11 idiom).
    priority_queue(const priority_queue&);
    priority_queue& operator=(const priority_queue&);

public:
    typedef typename _Sequence::value_type value_type;
    typedef typename _Sequence::reference reference;
    typedef typename _Sequence::const_reference const_reference;
    typedef typename _Sequence::size_type size_type;
    typedef _Sequence container_type;

protected:
    // Same member names as std::priority_queue.
    _Sequence c;
    _Compare comp;

public:
    // Builds a queue from an optional comparator and initial container.
    explicit priority_queue(const _Compare& compare = _Compare(), const _Sequence& sequence = _Sequence())
        : c(sequence), comp(compare)
    {
        pthread_rwlock_init(&m_mutex, NULL);
        std::make_heap(c.begin(), c.end(), comp);
    }

    // Builds a queue from `sequence` plus the elements of [first, last).
    // The libstdc++-internal range-check macro used previously was removed so
    // the header also builds with other standard libraries.
    template<typename _InputIterator>
    priority_queue(_InputIterator first, _InputIterator last, const _Compare& compare = _Compare(), const _Sequence& sequence = _Sequence())
        : c(sequence), comp(compare)
    {
        pthread_rwlock_init(&m_mutex, NULL);
        c.insert(c.end(), first, last);
        std::make_heap(c.begin(), c.end(), comp);
    }

    // Releases the lock object. No thread may be using the queue any more,
    // and no top() may be left without its matching pop().
    ~priority_queue()
    {
        pthread_rwlock_destroy(&m_mutex);
    }

    // Returns true when the queue holds no elements.
    bool empty()
    {
        pthread_rwlock_wrlock(&m_mutex);
        const bool is_empty = c.empty();
        pthread_rwlock_unlock(&m_mutex);
        return is_empty;
    }

    // Returns the number of queued elements.
    size_type size()
    {
        pthread_rwlock_wrlock(&m_mutex);
        const size_type count = c.size();
        pthread_rwlock_unlock(&m_mutex);
        return count;
    }

    // Returns the top element and KEEPS the write lock held; the lock is
    // released by the next pop(). The queue must not be empty.
    const_reference top()
    {
        pthread_rwlock_wrlock(&m_mutex);
        return c.front();
    }

    // Inserts a copy of `value`.
    void push(const value_type& value)
    {
        pthread_rwlock_wrlock(&m_mutex);
        c.push_back(value);
        std::push_heap(c.begin(), c.end(), comp);
        pthread_rwlock_unlock(&m_mutex);
    }

    // Removes the top element and releases the lock taken by top().
    // An empty queue is left untouched (popping it used to be undefined),
    // but the lock is still released.
    void pop()
    {
        if (!c.empty())
        {
            std::pop_heap(c.begin(), c.end(), comp);
            c.pop_back();
        }
        pthread_rwlock_unlock(&m_mutex);
    }
};
// Task queue type: stores CTaskBase pointers in a std::vector and uses
// CTaskBase itself (its operator()) as the ordering comparator.
typedef priority_queue<CTaskBase*, std::vector<CTaskBase*>, CTaskBase> queue_priority;

};

#endif

线程池与连接池文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
/*******************************************************************************
* Author : RKhuwq
* Email : sohunjug@hotmail.com
* Last modified : 2013-11-18 16:55
* Filename : Pool.h
* Description :
* *****************************************************************************/

#ifndef __POOL_H__
#define __POOL_H__

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <map>
#include "TaskQueue.h"

namespace POOL
{
// Shutdown policy of a pool, stored in CPoolBase::statues.
// As used by CThreadPool: with Wait, a worker that was asked to stop only
// exits once the task queue is empty, and the destructor stops the workers
// before discarding leftover tasks; with any other value (Kill) workers exit
// as soon as they are asked to, and the destructor discards queued tasks
// first.
enum PoolStatue
{
Wait = 1,
Kill
};
// Common state shared by the thread pool and the connection pool: sizing
// counters, a thread attribute object and a general-purpose mutex.
class CPoolBase
{
protected:
    int m_init;             // size requested by the creator of the pool
    int m_iNum;             // current size target; starts at 5
    int m_iCount;           // number of pooled items currently alive
    pthread_attr_t m_attr;  // attributes for threads created by the pool (joinable)

    // Adds one item (thread / connection) to the pool.
    virtual void Create() = 0;
    // Body of the pool's control task; argv is an opaque task argument.
    virtual int ControlThread(void* argv) = 0;

public:
    PoolStatue statues;              // shutdown policy
    pthread_mutex_t m_mutex;         // general-purpose pool mutex
    pthread_mutexattr_t m_mutexAttr; // attributes used to initialise m_mutex

    // num    - requested pool size
    // statue - shutdown policy
    CPoolBase(int num, PoolStatue statue) : m_init(num), m_iNum(5), m_iCount(0), statues(statue)
    {
        pthread_attr_init(&m_attr);
        pthread_attr_setdetachstate(&m_attr, PTHREAD_CREATE_JOINABLE);
        pthread_mutexattr_init(&m_mutexAttr);
        pthread_mutex_init(&m_mutex, &m_mutexAttr);
    }

    // Releases every pthread object initialised by the constructor. The
    // mutex and the thread attribute object used to be leaked. Derived
    // classes must have stopped all users of m_mutex before this runs.
    virtual ~CPoolBase()
    {
        pthread_mutex_destroy(&m_mutex);
        pthread_mutexattr_destroy(&m_mutexAttr);
        pthread_attr_destroy(&m_attr);
    }
};

class CThreadPool : public CPoolBase
{
struct pthread_statue
{
pthread_t pthread_id;
int statue;
pthread_mutex_t m_threadMutex;
pthread_cond_t m_threadCond;
};
public:
std::map<pthread_t, pthread_statue*> m_mthread;
std::map<pthread_t, pthread_statue*>::iterator m_iter;
QUEUE::queue_priority& queue;

private:
bool is_clear;
pthread_mutex_t m_mutexCount;
QUEUE::CTaskQueue<CThreadPool> *owntask;
void Create()
{
pthread_statue *new_pthread = new pthread_statue();
pthread_mutex_lock(&m_mutex);
if(pthread_create(&new_pthread->pthread_id, &m_attr, ThreadMain, this)!=0)
{
return ;
}
m_mthread.insert(std::pair<pthread_t, pthread_statue*> (new_pthread->pthread_id, new_pthread));
pthread_mutex_unlock(&m_mutex);
pthread_mutex_lock(&m_mutexCount);
m_iCount++;
pthread_mutex_unlock(&m_mutexCount);
}
int ControlThread(void* argv)
{
int in_count_add = 0;
int in_count_del = 0;
int task_count;
bool change;
CThreadPool* pclass = (CThreadPool*)argv;
pthread_statue *statue_self;
std::map<pthread_t, pthread_statue*>::iterator iter;
pthread_mutex_lock(&pclass->m_mutex);
statue_self = pclass->m_mthread.find(pthread_self())->second;
pthread_mutex_unlock(&pclass->m_mutex);
statue_self->statue = 1;
while(1)
{
while (m_iCount < m_init && m_iCount < m_iNum && ((statues == Wait && !queue.empty()) || !is_clear))
{
Create();
}
task_count = queue.size();
if (task_count)
{
pthread_mutex_lock(&m_mutex);
iter = m_mthread.begin();
while (iter != m_mthread.end() && ((statues == Wait && !queue.empty()) || !is_clear))
{
if (iter->second->statue == 2 && iter->second->pthread_id != pthread_self())
{
pthread_cond_signal (&iter->second->m_threadCond);
break;
}
iter++;
}
if (iter == m_mthread.end())
m_iNum ++;
pthread_mutex_unlock(&m_mutex);
if (task_count > m_iCount)
{
in_count_add++;
in_count_del = 0;
}
else if (task_count < m_iCount)
{
//in_count_del++;
in_count_add = 0;
}
if (in_count_add > 10)
{
in_count_add = 0;
m_iNum++;
change = true;
}
if (in_count_del > 10 && m_iNum > m_init)
{
m_iNum--;
change = true;
}
}
if (task_count == 0)
usleep(100);

if ((statue_self->statue == 0 || is_clear) && !(statues == Wait && !queue.empty()))
{
pthread_mutex_lock(&m_mutexCount);
pclass->m_iCount--;
pthread_mutex_unlock(&m_mutexCount);
break;
}
}
return 0;
}
public:
static void* ThreadMain(void *arg)
{
pthread_statue* statue_self;
CThreadPool* pclass = (CThreadPool*)arg;
QUEUE::CTaskBase* task = NULL;
pthread_mutex_lock(&pclass->m_mutex);
statue_self = pclass->m_mthread.find(pthread_self())->second;
pthread_mutex_unlock(&pclass->m_mutex);
pthread_mutex_init (&statue_self->m_threadMutex, &pclass->m_mutexAttr);
pthread_cond_init (&statue_self->m_threadCond, NULL);
statue_self->statue = 1;
//timespec time;
statue_self->pthread_id = pthread_self();
while(1)
{
if (pclass->statues != Wait || pclass->queue.empty())
{
if (statue_self->statue == 0)
{
pthread_mutex_lock(&pclass->m_mutexCount);
pclass->m_iCount--;
pthread_mutex_unlock(&pclass->m_mutexCount);
break;
}
}
task = NULL;

pthread_mutex_lock(&pclass->m_mutex);
if(pclass->queue.empty())
{
if (pthread_mutex_trylock(&statue_self->m_threadMutex) != 0)
{
pthread_mutex_unlock(&pclass->m_mutex);
continue;
}
pthread_mutex_unlock(&pclass->m_mutex);
statue_self->statue = 2;
pthread_cond_wait(&statue_self->m_threadCond, &statue_self->m_threadMutex);
pthread_mutex_unlock(&statue_self->m_threadMutex);
continue;
}

task=pclass->queue.top();
pclass->queue.pop();
pthread_mutex_unlock(&pclass->m_mutex);

if (task)
{
statue_self->statue = 1;
if (task->Start() != 0)
{
}
if (!task)
delete task;
}
}
return NULL;
}
void Stop(pthread_t pthread_id)
{
pthread_statue *temp;
pthread_mutex_lock(&m_mutex);
temp = m_mthread.find(pthread_id)->second;
pthread_mutex_unlock(&m_mutex);
if (!temp)
return;
while(pthread_kill(temp->pthread_id, 0) == 0)
{
pthread_mutex_lock(&temp->m_threadMutex);
temp->statue = 0;
pthread_mutex_unlock(&temp->m_threadMutex);
pthread_cond_signal(&temp->m_threadCond);
usleep(10000);
}
pthread_mutex_lock(&m_mutex);
temp = m_mthread.find(pthread_id)->second;
m_mthread.erase(pthread_id);
pthread_mutex_unlock(&m_mutex);
delete temp;
}
void ClearQueue()
{
while (!queue.empty())
{
delete queue.top();
queue.pop();
}
}
void Clear()
{
is_clear = true;
pthread_mutex_lock(&m_mutex);
std::map<pthread_t, pthread_statue*>::iterator iter = m_mthread.begin();
for (; iter != m_mthread.end(); iter = m_mthread.begin())
{
pthread_mutex_unlock(&m_mutex);
Stop(iter->first);
pthread_mutex_lock(&m_mutex);
}
pthread_mutex_unlock(&m_mutex);
}

public:
CThreadPool(int num, QUEUE::queue_priority& queue, PoolStatue Pool = Wait) : CPoolBase(num, Pool), queue(queue), is_clear(false)
{
pthread_mutex_init (&m_mutexCount, NULL);
owntask = new QUEUE::CTaskQueue<CThreadPool>;
owntask->SetFunc(QUEUE::Urgent, this, &CThreadPool::ControlThread, this);
queue.push(owntask);
Create();
}
~CThreadPool()
{
if (statues == Wait)
Clear();
ClearQueue();
if (statues != Wait)
Clear();
}
};

// Pool of CConnectionClass objects shared between threads.
//
// Each pooled connection is paired with a mutex; GetConnection() hands out a
// connection whose mutex it managed to lock, and BackConnection() unlocks it
// again, so every connection obtained must be returned. The connect and
// close operations are member functions of CConnectionClass supplied through
// SetFunc(), which must be called before the pool is used.
//
// Lock order used throughout: m_mapmutex (the map) before m_mutex (counters).
template<typename CConnectionClass>
class CConnectPool : public CPoolBase
{
private:
    typedef std::map<CConnectionClass*, pthread_mutex_t> ConnectionMap;

    std::map<CConnectionClass*, pthread_mutex_t> m_mapConnection;  // connection -> "in use" mutex
    typename std::map<CConnectionClass*, pthread_mutex_t>::iterator m_iter;  // kept for layout; no longer used as shared loop state
    bool is_clear;                 // set once Clear() has been requested
    typedef bool (CConnectionClass::*Connect)(const char*, const char*, const char*);
    typedef void (CConnectionClass::*Close)(void);
    Connect m_fconnect;            // how to open a connection (NULL until SetFunc)
    Close m_fclose;                // how to close a connection (NULL until SetFunc)
    char* m_sConnectParam[3];      // malloc'ed copies of the three connect arguments
    int m_iUsed;                   // connections currently handed out
    pthread_mutex_t m_mapmutex;    // guards m_mapConnection

public:
    // num - requested pool size (stored by the base class)
    CConnectPool(int num) : CPoolBase(num, Wait), is_clear(false), m_fconnect(NULL), m_fclose(NULL), m_iUsed(0)
    {
        m_sConnectParam[0] = NULL;
        m_sConnectParam[1] = NULL;
        m_sConnectParam[2] = NULL;
        pthread_mutex_init(&m_mapmutex, NULL);
    }

    // Closes all connections (waiting for outstanding ones to be returned)
    // and frees the stored connect parameters.
    ~CConnectPool()
    {
        Clear();
        free(m_sConnectParam[0]);
        free(m_sConnectParam[1]);
        free(m_sConnectParam[2]);
        pthread_mutex_destroy(&m_mapmutex);
    }

    // Closes and deletes every pooled connection. Connections that are
    // currently handed out are retried until they have been returned, so
    // this blocks while any connection is outstanding.
    //
    // The map is now only touched under m_mapmutex, which is released
    // between passes so BackConnection() can get in, and each connection's
    // mutex is unlocked and destroyed before its map entry is erased.
    void Clear()
    {
        is_clear = true;
        while (1)
        {
            pthread_mutex_lock(&m_mapmutex);
            typename ConnectionMap::iterator it = m_mapConnection.begin();
            while (it != m_mapConnection.end())
            {
                if (pthread_mutex_trylock(&it->second) != 0)
                {
                    ++it;  // still handed out; retry on the next pass
                    continue;
                }
                if (m_fclose)
                    (it->first->*(m_fclose))();
                delete it->first;
                pthread_mutex_unlock(&it->second);
                pthread_mutex_destroy(&it->second);
                m_mapConnection.erase(it++);

                pthread_mutex_lock(&m_mutex);
                m_iCount--;
                pthread_mutex_unlock(&m_mutex);
            }
            const bool finished = m_mapConnection.empty();
            pthread_mutex_unlock(&m_mapmutex);
            if (finished)
                break;
            usleep(1000);  // give holders a chance to return their connection
        }
    }

    // Registers the connect/close member functions and the three arguments
    // passed to every connect call. The strings are copied. Calling this
    // again replaces (and frees) the previous copies; NULL entries are
    // stored as NULL.
    void SetFunc(Connect connect, Close close, char* value[])
    {
        m_fconnect = connect;
        m_fclose = close;
        for (int i = 0; i < 3; ++i)
        {
            free(m_sConnectParam[i]);
            m_sConnectParam[i] = NULL;
            if (value && value[i])
            {
                m_sConnectParam[i] = (char*)malloc(strlen(value[i]) + 1);
                if (m_sConnectParam[i])
                    strcpy(m_sConnectParam[i], value[i]);
            }
        }
    }

    // Returns a connection that is not in use, creating a new one when ten
    // passes over the pool found none. The caller must hand the connection
    // back with BackConnection().
    // Returns NULL when SetFunc() has not been called yet (connections
    // cannot be created in that case).
    CConnectionClass* GetConnection()
    {
        pthread_mutex_lock(&m_mapmutex);
        if (!m_fconnect)
        {
            pthread_mutex_unlock(&m_mapmutex);
            return NULL;
        }

        int passes = 0;
        typename ConnectionMap::iterator it = m_mapConnection.begin();
        while (1)
        {
            if (it == m_mapConnection.end())
            {
                passes++;
                if (passes > 10)
                {
                    Create();
                    pthread_mutex_lock(&m_mutex);
                    m_iNum++;
                    pthread_mutex_unlock(&m_mutex);
                }
                it = m_mapConnection.begin();
            }
            if (it != m_mapConnection.end())
            {
                if (pthread_mutex_trylock(&it->second) == 0)
                {
                    pthread_mutex_lock(&m_mutex);
                    m_iUsed++;
                    pthread_mutex_unlock(&m_mutex);
                    CConnectionClass* picked = it->first;
                    pthread_mutex_unlock(&m_mapmutex);
                    return picked;
                }
                ++it;
            }
        }
    }

    // Returns a connection obtained from GetConnection() to the pool.
    // NULL and pointers that do not belong to the pool are ignored (the
    // latter used to dereference an invalid iterator).
    void BackConnection(CConnectionClass* oneConnect)
    {
        if (!oneConnect)
            return;

        pthread_mutex_lock(&m_mapmutex);
        typename ConnectionMap::iterator it = m_mapConnection.find(oneConnect);
        const bool known = (it != m_mapConnection.end());
        if (known)
            pthread_mutex_unlock(&it->second);
        pthread_mutex_unlock(&m_mapmutex);
        if (!known)
            return;

        pthread_mutex_lock(&m_mutex);
        m_iUsed--;
        pthread_mutex_unlock(&m_mutex);
    }

    // Opens one more connection and adds it to the pool. Does nothing when
    // SetFunc() has not been called. Callers inside this class hold
    // m_mapmutex.
    // NOTE(review): the result of the connect call is ignored, so a
    // connection that failed to open is still pooled.
    void Create()
    {
        if (!m_fconnect)
            return;

        pthread_mutex_lock(&m_mutex);
        CConnectionClass* oneConnect = new CConnectionClass();
        (oneConnect->*(m_fconnect))(m_sConnectParam[0], m_sConnectParam[1], m_sConnectParam[2]);
        // Initialise the mutex in place inside the map node instead of
        // copying an already initialised mutex into the map.
        pthread_mutex_init(&m_mapConnection[oneConnect], NULL);
        m_iCount++;
        pthread_mutex_unlock(&m_mutex);
    }

    // Returns the number of pooled connections (in use or not).
    int Count()
    {
        pthread_mutex_lock(&m_mapmutex);
        const int total = (int)m_mapConnection.size();
        pthread_mutex_unlock(&m_mapmutex);
        return total;
    }

    // Body of the pool's control task: keeps adding connections while the
    // pool holds no more than m_iNum of them, pausing 10 ms between passes,
    // until Clear() is requested.
    //   argv - unused
    // Always returns 0. Shrinking the pool is not implemented.
    int ControlThread(void* argv)
    {
        (void)argv;
        const int pause_us = 10000;
        while (!is_clear)
        {
            const int pooled = this->Count();
            if (pooled <= m_iNum)
            {
                pthread_mutex_lock(&m_mapmutex);
                // Re-checked under the map lock so that no connection is
                // added once Clear() has finished.
                if (!is_clear)
                    Create();
                pthread_mutex_unlock(&m_mapmutex);
            }
            // Also pause when nothing can be created yet, to avoid spinning.
            if (pooled != 0 || !m_fconnect)
                usleep(pause_us);
        }
        return 0;
    }
};

};

#endif

测试文件代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <stdio.h>
#include "Pool.h"
#include "TaskQueue.h"
#include "DBDatabase.h"
#include "DBStatement.h"

using namespace POOL;

using namespace QUEUE;
// Minimal polymorphic base for the demo: construction and destruction each
// print "B", and the default handler does nothing.
class B
{
public:
    B()
    {
        printf("B\n");
    }

    // Default handler: ignores its argument and returns 0.
    virtual int process(void* arg)
    {
        (void)arg;
        return 0;
    }

    virtual ~B()
    {
        printf("B\n");
    }
};
// Demo task target: prints the first two strings of the array it is given.
class A : public B
{
public:
    A() {}
    ~A() {}

    // arg is treated as a char* array with at least two entries; each of the
    // first two is printed followed by a space, then a newline.
    // Always returns 0.
    int process(void* arg)
    {
        char** words = (char**)arg;
        int index = 0;
        while (index < 2)
        {
            printf("%s ", words[index]);
            ++index;
        }
        printf("\n");
        return 0;
    }
};

int main(int argc, char** argv)
{
CThreadPool threadpool(2, task_priority_queue);
char conn[3][20] = {"system", "system", "tsdc"};
char *value[3];
value[0] = conn[0];
value[1] = conn[1];
value[2] = conn[2];
CConnectPool<CDBDatabase> connnectpool(5);
connnectpool.SetFunc(&CDBDatabase::Connect, &CDBDatabase::Disconnect, value);
int count = 0;
A p;
CTaskQueue<CConnectPool<CDBDatabase> >* atask = new CTaskQueue<CConnectPool<CDBDatabase> >;
char se[10] = "10000000";
char *s[2];
s[0] = se;
s[1] = NULL;
atask->SetFunc(Highest, &connnectpool, &CConnectPool<CDBDatabase>::ControlThread, s);
task_priority_queue.push(atask);
CDBDatabase *temp = connnectpool.GetConnection();
CDBDatabase *temp2 = connnectpool.GetConnection();
CDBStatement* dbstatement = new CDBStatement(temp);
CDBStatement* dbs = new CDBStatement(temp2);
dbstatement->Prepare("select city_code from bb_device_rent_info_t where user_id = 48303537");
dbs->Prepare("select user_id from bb_device_rent_info_t where user_id = 48303537");
dbstatement->Open();
dbs->Open();
if (dbstatement->Next() && dbs->Next())
{
printf("%s\n", dbstatement->Field(0).AsString());
printf("%s\n", dbs->Field(0).AsString());
}
connnectpool.BackConnection(temp);
connnectpool.BackConnection(temp2);
while(1)
{
char time[2][10];
char *ttt[3];
sprintf(time[0], "time:");
sprintf(time[1], "%d", count++);
ttt[0] = time[0];
ttt[1] = time[1];
ttt[2] = NULL;
CTaskQueue<A>* task = new CTaskQueue<A>;
task->SetFunc(Low, &p, &A::process, (void*)ttt);
task_priority_queue.push(task);
usleep(1000000);
}

A a;
CTaskQueue<A>* task = new CTaskQueue<A>;
char aaa[2][10] = {"aaa", "bbb"};
char *aa[3];
char *bb[3];
aa[0] = aaa[0];
aa[1] = aaa[1];
aa[2] = NULL;
task->SetFunc(Low, &a, &A::process, (void*)aa);
task_priority_queue.push(task);
char bbb[2][10] = {"ccc", "bbb"};
bb[0] = bbb[0];
bb[1] = bbb[1];
bb[2] = NULL;
task = new CTaskQueue<A>;
task->SetFunc(Minimum, &a, &A::process, (void*)bb);
//task_priority_queue.push(task);

connnectpool.Clear();
usleep(30000);
threadpool.Clear();

return 0;
}

github: GitLink….

可能还有bug,还有很多不足,欢迎大家帮忙测试.

文章目录