blob: 9615b5230aa95760cdb68911f6fbf75d23c7314f
1 | #define LOG_TAG "CMsgQueueThread" |
2 | |
3 | #include "CMsgQueue.h" |
4 | #include <CTvLog.h> |
5 | #include <utils/Timers.h> |
6 | |
7 | CMessage::CMessage() |
8 | { |
9 | mDelayMs = 0; |
10 | mWhenMs = 0; |
11 | } |
12 | |
13 | CMessage::~CMessage() |
14 | { |
15 | } |
16 | |
17 | CMsgQueueThread::CMsgQueueThread() |
18 | { |
19 | } |
20 | |
21 | CMsgQueueThread::~CMsgQueueThread() |
22 | { |
23 | //请求退出处理线程,并阻塞 |
24 | requestExitAndWait(); |
25 | } |
26 | |
27 | nsecs_t CMsgQueueThread::getNowMs() |
28 | { |
29 | return systemTime(SYSTEM_TIME_MONOTONIC) / 1000000; |
30 | } |
31 | |
32 | void CMsgQueueThread::sendMsg(CMessage &msg) |
33 | { |
34 | CMutex::Autolock _l(mLockQueue); |
35 | msg.mWhenMs = getNowMs() + msg.mDelayMs;// |
36 | int i = 0; |
37 | while (i < (int)m_v_msg.size() && m_v_msg[i].mWhenMs <= msg.mWhenMs) i++; //find the index that will insert(i) |
38 | m_v_msg.insertAt(msg, i);//insert at index i |
39 | CMessage msg1 = m_v_msg[0]; |
40 | LOGD("sendmsg now = %lld msg[0] when = %lld", getNowMs(), msg1.mWhenMs); |
41 | // |
42 | //if(i == 0)// is empty or new whenMS is at index 0, low all ms in list. so ,need to wakeup loop, to get new delay time. |
43 | mGetMsgCondition.signal(); |
44 | } |
45 | |
46 | //有个缺陷,只能根据消息类型移除,同类型消息会全部移除,但足够用了 |
47 | void CMsgQueueThread::removeMsg(CMessage &msg) |
48 | { |
49 | CMutex::Autolock _l(mLockQueue); |
50 | int beforeSize = (int)m_v_msg.size(); |
51 | for (int i = 0; i < (int)m_v_msg.size(); i++) { |
52 | const CMessage &_msg = m_v_msg.itemAt(i); |
53 | if (_msg.mType == msg.mType) { |
54 | m_v_msg.removeAt(i); |
55 | } |
56 | } |
57 | //some msg removeed |
58 | if (beforeSize > (int)m_v_msg.size()) |
59 | mGetMsgCondition.signal(); |
60 | } |
61 | |
62 | void CMsgQueueThread::clearMsg() |
63 | { |
64 | CMutex::Autolock _l(mLockQueue); |
65 | m_v_msg.clear(); |
66 | } |
67 | |
68 | int CMsgQueueThread::startMsgQueue() |
69 | { |
70 | CMutex::Autolock _l(mLockQueue); |
71 | this->run(); |
72 | return 0; |
73 | } |
74 | |
75 | bool CMsgQueueThread::threadLoop() |
76 | { |
77 | int sleeptime = 100;//ms |
78 | while (!exitPending()) { //requietexit() or requietexitWait() not call |
79 | mLockQueue.lock(); |
80 | while (m_v_msg.size() == 0) { //msg queue is empty |
81 | mGetMsgCondition.wait(mLockQueue);//first unlock,when return,lock again,so need,call unlock |
82 | } |
83 | mLockQueue.unlock(); |
84 | //get delay time |
85 | CMessage msg; |
86 | nsecs_t delayMs = 0, nowMS = 0; |
87 | do { //wait ,until , the lowest time msg's whentime is low nowtime, to go on |
88 | if (m_v_msg.size() <= 0) { |
89 | LOGD("msg size is 0, break"); |
90 | break; |
91 | } |
92 | mLockQueue.lock();//get msg ,first lock. |
93 | msg = m_v_msg[0];//get first |
94 | mLockQueue.unlock(); |
95 | |
96 | delayMs = msg.mWhenMs - getNowMs(); |
97 | LOGD("threadLoop now = %lld mswhen = %lld delayMs = %lld msg type = %d", getNowMs(), msg.mWhenMs, delayMs, msg.mType); |
98 | if (delayMs > 0) { |
99 | mLockQueue.lock();//get msg ,first lock. |
100 | int ret = mGetMsgCondition.waitRelative(mLockQueue, delayMs); |
101 | mLockQueue.unlock(); |
102 | LOGD("msg queue wait ret = %d", ret); |
103 | } else { |
104 | break; |
105 | } |
106 | } while (true); //msg[0], timeout |
107 | |
108 | if (m_v_msg.size() > 0) { |
109 | mLockQueue.lock();// |
110 | msg = m_v_msg[0]; |
111 | m_v_msg.removeAt(0); |
112 | mLockQueue.unlock();// |
113 | //handle it |
114 | handleMessage(msg); |
115 | } |
116 | |
117 | //usleep(sleeptime * 1000); |
118 | } |
119 | //exit |
120 | //return true, run again, return false,not run. |
121 | return false; |
122 | } |
123 |