blob: 81c9aecff41a36bf812c14432016892c0727a8e9
1 | #include "CMsgQueue.h" |
2 | #include <CTvLog.h> |
3 | #include <utils/Timers.h> |
4 | |
5 | #define LOG_TAG "CMsgQueueThread" |
6 | |
7 | CMessage::CMessage() |
8 | { |
9 | mDelayMs = 0; |
10 | mWhenMs = 0; |
11 | } |
12 | |
13 | CMessage::~CMessage() |
14 | { |
15 | |
16 | } |
17 | |
18 | CMsgQueueThread::CMsgQueueThread() |
19 | { |
20 | } |
21 | |
22 | CMsgQueueThread::~CMsgQueueThread() |
23 | { |
24 | //请求退出处理线程,并阻塞 |
25 | requestExitAndWait(); |
26 | } |
27 | |
28 | nsecs_t CMsgQueueThread::getNowMs() |
29 | { |
30 | return systemTime(SYSTEM_TIME_MONOTONIC) / 1000000; |
31 | } |
32 | void CMsgQueueThread::sendMsg(CMessage &msg) |
33 | { |
34 | CMutex::Autolock _l(mLockQueue); |
35 | msg.mWhenMs = getNowMs() + msg.mDelayMs;// |
36 | int i = 0; |
37 | while (i < m_v_msg.size() && m_v_msg[i].mWhenMs <= msg.mWhenMs) i++; //find the index that will insert(i) |
38 | m_v_msg.insertAt(msg, i);//insert at index i |
39 | CMessage msg1 = m_v_msg[0]; |
40 | LOGD("sendmsg now = %lld msg[0] when = %lld", getNowMs(), msg1.mWhenMs); |
41 | // |
42 | //if(i == 0)// is empty or new whenMS is at index 0, low all ms in list. so ,need to wakeup loop, to get new delay time. |
43 | mGetMsgCondition.signal(); |
44 | } |
45 | //有个缺陷,只能根据消息类型移除,同类型消息会全部移除,但足够用了 |
46 | void CMsgQueueThread::removeMsg(CMessage &msg) |
47 | { |
48 | CMutex::Autolock _l(mLockQueue); |
49 | int beforeSize = m_v_msg.size(); |
50 | for (int i = 0; i < m_v_msg.size(); i++) { |
51 | const CMessage &_msg = m_v_msg.itemAt(i); |
52 | if (_msg.mType == msg.mType) { |
53 | m_v_msg.removeAt(i); |
54 | } |
55 | } |
56 | //some msg removeed |
57 | if (beforeSize > m_v_msg.size()) |
58 | mGetMsgCondition.signal(); |
59 | } |
60 | |
61 | void CMsgQueueThread::clearMsg() |
62 | { |
63 | CMutex::Autolock _l(mLockQueue); |
64 | m_v_msg.clear(); |
65 | } |
66 | |
67 | int CMsgQueueThread::startMsgQueue() |
68 | { |
69 | CMutex::Autolock _l(mLockQueue); |
70 | this->run(); |
71 | return 0; |
72 | } |
73 | |
74 | bool CMsgQueueThread::threadLoop() |
75 | { |
76 | int sleeptime = 100;//ms |
77 | while (!exitPending()) { //requietexit() or requietexitWait() not call |
78 | mLockQueue.lock(); |
79 | while (m_v_msg.size() == 0) { //msg queue is empty |
80 | mGetMsgCondition.wait(mLockQueue);//first unlock,when return,lock again,so need,call unlock |
81 | } |
82 | mLockQueue.unlock(); |
83 | //get delay time |
84 | CMessage msg; |
85 | nsecs_t delayMs = 0, nowMS = 0; |
86 | do { //wait ,until , the lowest time msg's whentime is low nowtime, to go on |
87 | if (m_v_msg.size() <= 0) { |
88 | LOGD("msg size is 0, break"); |
89 | break; |
90 | } |
91 | mLockQueue.lock();//get msg ,first lock. |
92 | msg = m_v_msg[0];//get first |
93 | mLockQueue.unlock(); |
94 | |
95 | delayMs = msg.mWhenMs - getNowMs(); |
96 | LOGD("threadLoop now = %lld mswhen = %lld delayMs = %lld msg type = %d", getNowMs(), msg.mWhenMs, delayMs, msg.mType); |
97 | if (delayMs > 0) { |
98 | mLockQueue.lock();//get msg ,first lock. |
99 | int ret = mGetMsgCondition.waitRelative(mLockQueue, delayMs); |
100 | mLockQueue.unlock(); |
101 | LOGD("msg queue wait ret = %d", ret); |
102 | } else { |
103 | break; |
104 | } |
105 | } while (true); //msg[0], timeout |
106 | |
107 | if (m_v_msg.size() > 0) { |
108 | mLockQueue.lock();// |
109 | msg = m_v_msg[0]; |
110 | m_v_msg.removeAt(0); |
111 | mLockQueue.unlock();// |
112 | //handle it |
113 | handleMessage(msg); |
114 | } |
115 | |
116 | //usleep(sleeptime * 1000); |
117 | } |
118 | //exit |
119 | //return true, run again, return false,not run. |
120 | return false; |
121 | } |
122 |