Skip to content

Commit 8345831

Browse files
author
caozhiyi
committedJun 9, 2018
change memory charge
1 parent b79b34d commit 8345831

8 files changed

+153
-105
lines changed
 

‎CppNet.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,14 @@ void ReadFunc(CMemSharePtr<CEventHandler>& event, int error) {
2929

3030
event->_buffer->Clear();
3131
if (error != EVENT_ERROR_CLOSED) {
32-
event->_client_socket.Lock()->SyncWrite("aaaaa21231231", strlen("aaaaa21231231"), write_back);
3332
event->_client_socket.Lock()->SyncRead(read_back);
33+
event->_client_socket.Lock()->SyncWrite("aaaaa21231231", strlen("aaaaa21231231"), write_back);
3434
} else {
3535
if (client_map.size() < 10) {
3636
int a = 0;
3737
a++;
3838
}
39+
std::unique_lock<std::mutex> lock(__mutex);
3940
client_map.erase(event->_client_socket.Lock()->GetSocket());
4041
}
4142
}
@@ -44,7 +45,7 @@ void AcceptFunc(CMemSharePtr<CAcceptEventHandler>& event, int error) {
4445
client_map[event->_client_socket->GetSocket()] = event->_client_socket;
4546
//std::cout << "AcceptFunc" << std::endl;
4647
//std::cout << "client address :" << event->_client_socket->GetAddress() << std::endl << std::endl;
47-
48+
std::unique_lock<std::mutex> lock(__mutex);
4849
event->_client_socket->SyncRead(read_back);
4950
}
5051
#include "Log.h"
@@ -61,7 +62,7 @@ int main() {
6162

6263
std::vector<std::thread> thread_vec;
6364

64-
for (int i = 0; i < 1; i++) {
65+
for (int i = 0; i < 16; i++) {
6566
thread_vec.push_back(std::thread(std::bind(&CEventActions::ProcessEvent, event_actions)));
6667
}
6768

@@ -70,7 +71,7 @@ int main() {
7071
sock.Listen(10);
7172
sock.SyncAccept(accept_func, read_back);
7273

73-
for (int i = 0; i < 1; i++) {
74+
for (int i = 0; i < 16; i++) {
7475
thread_vec[i].join();
7576
}
7677
DeallocSocket();

‎CppNet.sln

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio 14
44
VisualStudioVersion = 14.0.25420.1
55
MinimumVisualStudioVersion = 10.0.40219.1
6-
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "CppNet", "CppNet\CppNet.vcxproj", "{E98183C7-10C3-4B1A-B8FB-D60BB6217962}"
6+
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "CppNet", "CppNet.vcxproj", "{E98183C7-10C3-4B1A-B8FB-D60BB6217962}"
77
EndProject
88
Global
99
GlobalSection(SolutionConfigurationPlatforms) = preSolution

‎CppNet.vcxproj.filters

+4-4
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
<Filter>net\OS</Filter>
5353
</ClCompile>
5454
<ClCompile Include="SocketBase.cpp">
55-
<Filter>net</Filter>
55+
<Filter>net\OS</Filter>
5656
</ClCompile>
5757
</ItemGroup>
5858
<ItemGroup>
@@ -107,9 +107,6 @@
107107
<ClInclude Include="EventHandler.h">
108108
<Filter>net</Filter>
109109
</ClInclude>
110-
<ClInclude Include="WinExpendFunc.h">
111-
<Filter>net\netio</Filter>
112-
</ClInclude>
113110
<ClInclude Include="Socket.h">
114111
<Filter>net</Filter>
115112
</ClInclude>
@@ -119,5 +116,8 @@
119116
<ClInclude Include="SocketBase.h">
120117
<Filter>net</Filter>
121118
</ClInclude>
119+
<ClInclude Include="WinExpendFunc.h">
120+
<Filter>net\OS</Filter>
121+
</ClInclude>
122122
</ItemGroup>
123123
</Project>

‎EventHandler.h

+6-4
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
#define INVALID_TIMER -1
1212

1313
enum EVENT_FLAG {
14-
EVENT_READ = 0x0001, //read event
15-
EVENT_WRITE = 0x0002, //write event
16-
EVENT_ACCEPT = 0x0004, //accept event
17-
EVENT_TIMER = 0x0008 //timer event
14+
EVENT_READ = 0x0001, //read event
15+
EVENT_WRITE = 0x0002, //write event
16+
EVENT_ACCEPT = 0x0004, //accept event
17+
EVENT_TIMER = 0x0008, //timer event
18+
EVENT_CONNECT = 0x0010, //connect event
19+
EVENT_DISCONNECT = 0x0020 //disconnect event
1820
};
1921

2022
enum EVENT_ERROR {

‎IOCP.cpp

+62-58
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ bool CIOCP::Dealloc() {
4141

4242
bool CIOCP::AddTimerEvent(unsigned int interval, int event_flag, CMemSharePtr<CEventHandler>& event) {
4343
_timer.AddTimer(interval, event_flag, event);
44-
event->_event_flag_set |= EVENT_TIMER;
4544
return true;
4645
}
4746

@@ -55,7 +54,6 @@ bool CIOCP::AddSendEvent(CMemSharePtr<CEventHandler>& event) {
5554
}
5655
}
5756
((EventOverlapped*)event->_data)->_event = &event;
58-
event->_event_flag_set |= EVENT_WRITE;
5957
socket_ptr->SetInActions(true);
6058
return _PostSend(event);
6159
}
@@ -74,7 +72,6 @@ bool CIOCP::AddRecvEvent(CMemSharePtr<CEventHandler>& event) {
7472
}
7573
((EventOverlapped*)event->_data)->_event = &event;
7674
socket_ptr->SetInActions(true);
77-
event->_event_flag_set |= EVENT_READ;
7875
return _PostRecv(event);
7976
}
8077
LOG_WARN("read event is already distroyed!");
@@ -88,10 +85,8 @@ bool CIOCP::AddAcceptEvent(CMemSharePtr<CAcceptEventHandler>& event) {
8885
return false;
8986
}
9087
}
91-
9288
((EventOverlapped*)event->_data)->_event = &event;
9389
event->_accept_socket->SetInActions(true);
94-
event->_event_flag_set |= EVENT_ACCEPT;
9590
return _PostAccept(event);
9691
}
9792

@@ -117,65 +112,27 @@ void CIOCP::ProcessEvent() {
117112
int res = GetQueuedCompletionStatus(_iocp_handler, &bytes_transfered, PULONG_PTR(&socket_context),
118113
&over_lapped, wait_time);
119114

120-
if (!res) {
121-
DWORD dwErr = GetLastError();
122-
//timer out event
123-
if (WAIT_TIMEOUT == GetLastError()) {
124-
if (!timer_vec.empty()) {
125-
for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) {
126-
if (iter->_event_flag & EVENT_READ) {
127-
auto socket_ptr = iter->_event->_client_socket.Lock();
128-
if (socket_ptr) {
129-
socket_ptr->_Recv(iter->_event);
130-
}
131-
132-
} else if (iter->_event_flag & EVENT_WRITE) {
133-
auto socket_ptr = iter->_event->_client_socket.Lock();
134-
if (socket_ptr) {
135-
socket_ptr->_Send(iter->_event);
136-
}
137-
}
138-
}
139-
timer_vec.clear();
140-
}
141-
142-
//ERROR_NETNAME_DELETED is abnormal terminal, should notify the upper level
143-
} else if (ERROR_NETNAME_DELETED !=GetLastError()) {
144-
LOG_ERROR("IOCP GetQueuedCompletionStatus return error : %d", GetLastError());
145-
continue;
146-
}
147-
}
115+
DWORD dw_err = 0;
116+
if (res) {
117+
dw_err = NO_ERROR;
148118

149-
if (!over_lapped) {
150-
continue;
119+
} else {
120+
dw_err = GetLastError();
151121
}
122+
if (dw_err == WAIT_TIMEOUT) {
123+
if (!timer_vec.empty()) {
124+
_DoTimeoutEvent(timer_vec);
125+
}
152126

153-
//new event happening
154-
socket_context = CONTAINING_RECORD(over_lapped, EventOverlapped, _overlapped);
155-
if (socket_context->_event_flag_set & EVENT_ACCEPT) {
156-
CMemSharePtr<CAcceptEventHandler>* event = (CMemSharePtr<CAcceptEventHandler>*)socket_context->_event;
157-
if (event) {
158-
(*event)->_client_socket->_read_event->_off_set = bytes_transfered;
159-
(*event)->_accept_socket->_Accept((*event));
127+
} else if (ERROR_NETNAME_DELETED == dw_err || NO_ERROR == dw_err || ERROR_IO_PENDING == dw_err) {
128+
if (over_lapped) {
129+
socket_context = CONTAINING_RECORD(over_lapped, EventOverlapped, _overlapped);
130+
_DoEvent(socket_context, bytes_transfered);
160131
}
161132

162133
} else {
163-
CMemSharePtr<CEventHandler>* event = (CMemSharePtr<CEventHandler>*)socket_context->_event;
164-
if (event) {
165-
(*event)->_off_set = bytes_transfered;
166-
if ((*event)->_event_flag_set & EVENT_READ) {
167-
auto socket_ptr = (*event)->_client_socket.Lock();
168-
if (socket_ptr) {
169-
socket_ptr->_Recv((*event));
170-
}
171-
172-
} else if ((*event)->_event_flag_set & EVENT_WRITE) {
173-
auto socket_ptr = (*event)->_client_socket.Lock();
174-
if (socket_ptr) {
175-
socket_ptr->_Send((*event));
176-
}
177-
}
178-
}
134+
LOG_ERROR("IOCP GetQueuedCompletionStatus return error : %d", dw_err);
135+
continue;
179136
}
180137
}
181138
}
@@ -237,4 +194,51 @@ bool CIOCP::_PostSend(CMemSharePtr<CEventHandler>& event) {
237194
return false;
238195
}
239196
return true;
197+
}
198+
199+
void CIOCP::_DoTimeoutEvent(std::vector<TimerEvent>& timer_vec) {
200+
for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) {
201+
if (iter->_event_flag & EVENT_READ) {
202+
auto socket_ptr = iter->_event->_client_socket.Lock();
203+
if (socket_ptr) {
204+
socket_ptr->_Recv(iter->_event);
205+
}
206+
207+
}
208+
else if (iter->_event_flag & EVENT_WRITE) {
209+
auto socket_ptr = iter->_event->_client_socket.Lock();
210+
if (socket_ptr) {
211+
socket_ptr->_Send(iter->_event);
212+
}
213+
}
214+
}
215+
timer_vec.clear();
216+
}
217+
218+
void CIOCP::_DoEvent(EventOverlapped *socket_context, int bytes) {
219+
if (socket_context->_event_flag_set & EVENT_ACCEPT) {
220+
CMemSharePtr<CAcceptEventHandler>* event = (CMemSharePtr<CAcceptEventHandler>*)socket_context->_event;
221+
if (event) {
222+
(*event)->_client_socket->_read_event->_off_set = bytes;
223+
(*event)->_accept_socket->_Accept((*event));
224+
}
225+
226+
} else {
227+
CMemSharePtr<CEventHandler>* event = (CMemSharePtr<CEventHandler>*)socket_context->_event;
228+
if (event) {
229+
(*event)->_off_set = bytes;
230+
if (socket_context->_event_flag_set & EVENT_READ) {
231+
auto socket_ptr = (*event)->_client_socket.Lock();
232+
if (socket_ptr) {
233+
socket_ptr->_Recv((*event));
234+
}
235+
236+
} else if ((*event)->_event_flag_set & EVENT_WRITE) {
237+
auto socket_ptr = (*event)->_client_socket.Lock();
238+
if (socket_ptr) {
239+
socket_ptr->_Send((*event));
240+
}
241+
}
242+
}
243+
}
240244
}

‎IOCP.h

+2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class CIOCP: public CEventActions
5959
bool _PostAccept(CMemSharePtr<CAcceptEventHandler>& event);
6060
bool _PostSend(CMemSharePtr<CEventHandler>& event);
6161

62+
void _DoTimeoutEvent(std::vector<TimerEvent>& timer_vec);
63+
void _DoEvent(EventOverlapped *socket_context, int bytes);
6264
private:
6365
HANDLE _iocp_handler;
6466
};

‎Socket.cpp

+67-31
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,21 @@
77
#include "EventActions.h"
88
#include "Socket.h"
99

10-
CSocket::CSocket(std::shared_ptr<CEventActions>& event_actions) : CSocketBase(event_actions) {
10+
CSocket::CSocket(std::shared_ptr<CEventActions>& event_actions) : CSocketBase(event_actions), _post_event_num(0){
1111
_read_event = MakeNewSharedPtr<CEventHandler>(_pool.get());
1212
_write_event = MakeNewSharedPtr<CEventHandler>(_pool.get());
1313
}
1414

1515
CSocket::~CSocket() {
1616
if (_read_event && _read_event->_data) {
17-
OVERLAPPED* lapped = &((EventOverlapped*)_read_event->_data)->_overlapped;
1817
EventOverlapped* temp = (EventOverlapped*)_read_event->_data;
1918
_pool->PoolDelete<EventOverlapped>(temp);
2019
_read_event->_data = nullptr;
21-
lapped = nullptr;
2220
}
2321
if (_write_event && _write_event->_data) {
24-
OVERLAPPED* lapped = &((EventOverlapped*)_write_event->_data)->_overlapped;
2522
EventOverlapped* temp = (EventOverlapped*)_write_event->_data;
2623
_pool->PoolDelete<EventOverlapped>(temp);
2724
_write_event->_data = nullptr;
28-
lapped = nullptr;
2925
}
3026
}
3127

@@ -45,7 +41,10 @@ void CSocket::SyncRead(const std::function<void(CMemSharePtr<CEventHandler>&, in
4541
}
4642

4743
if (_event_actions) {
48-
_event_actions->AddRecvEvent(_read_event);
44+
_read_event->_event_flag_set |= EVENT_READ;
45+
if (_event_actions->AddRecvEvent(_read_event)) {
46+
_post_event_num++;
47+
}
4948
}
5049
}
5150

@@ -69,7 +68,26 @@ void CSocket::SyncWrite(char* src, int len, const std::function<void(CMemSharePt
6968
_write_event->_client_socket = _read_event->_client_socket;
7069
}
7170
if (_event_actions) {
72-
_event_actions->AddSendEvent(_write_event);
71+
_write_event->_event_flag_set |= EVENT_WRITE;
72+
if (_event_actions->AddSendEvent(_write_event)) {
73+
_post_event_num++;
74+
}
75+
}
76+
}
77+
78+
void CSocket::SyncConnect(const std::string& ip, short port, const std::function<void(CMemSharePtr<CEventHandler>&, int err)>& const) {
79+
if (ip.length() > 16) {
80+
LOG_ERROR("a wrong ip!");
81+
return;
82+
}
83+
84+
SOCKADDR_IN addr;
85+
addr.sin_family = AF_INET;
86+
addr.sin_port = htons(port);
87+
addr.sin_addr.S_un.S_addr = inet_addr(ip.c_str());
88+
strcpy(_ip, ip.c_str());
89+
if (__ConnectEx) {
90+
//__ConnectEx(_sock, (sockaddr*)&addr, sizeof(addr), nullptr, 0, nullptr, 0, );
7391
}
7492
}
7593

@@ -90,11 +108,16 @@ void CSocket::SyncRead(unsigned int interval, const std::function<void(CMemShare
90108
}
91109

92110
if (_event_actions) {
93-
_event_actions->AddRecvEvent(_read_event);
111+
_read_event->_event_flag_set |= EVENT_READ;
112+
if (_event_actions->AddRecvEvent(_read_event)) {
113+
_post_event_num++;
114+
}
94115
}
95116

96117
if (_event_actions) {
118+
_read_event->_event_flag_set |= EVENT_TIMER;
97119
_event_actions->AddTimerEvent(interval, EVENT_READ, _read_event);
120+
_post_event_num++;
98121
}
99122
}
100123

@@ -118,11 +141,16 @@ void CSocket::SyncWrite(unsigned int interval, char* src, int len, const std::fu
118141
_write_event->_buffer->Write(src, len);
119142

120143
if (_event_actions) {
121-
_event_actions->AddSendEvent(_write_event);
144+
_write_event->_event_flag_set |= EVENT_WRITE;
145+
if (_event_actions->AddSendEvent(_write_event)) {
146+
_post_event_num++;
147+
}
122148
}
123149

124150
if (_event_actions) {
151+
_write_event->_event_flag_set |= EVENT_TIMER;
125152
_event_actions->AddTimerEvent(interval, EVENT_WRITE, _write_event);
153+
_post_event_num++;
126154
}
127155
}
128156

@@ -154,38 +182,46 @@ bool operator!=(const CSocketBase& s1, const CSocketBase& s2) {
154182
void CSocket::_Recv(CMemSharePtr<CEventHandler>& event) {
155183
EventOverlapped* context = (EventOverlapped*)event->_data;
156184

157-
int error = EVENT_ERROR_NO;
158-
if (event->_off_set) {
159-
event->_buffer->Write(context->_wsa_buf.buf, event->_off_set);
160-
161-
} else {
162-
error = EVENT_ERROR_CLOSED;
163-
context->_event = nullptr;
164-
}
165-
185+
_post_event_num--;
186+
int err = -1;
166187
if (event->_timer_out) {
167-
error = EVENT_ERROR_TIMEOUT;
188+
err = EVENT_ERROR_TIMEOUT;
189+
190+
} else if (!event->_off_set) {
191+
if (_post_event_num == 0) {
192+
err = EVENT_ERROR_CLOSED;
193+
}
194+
195+
} else {
196+
err = EVENT_ERROR_NO;
197+
event->_buffer->Write(context->_wsa_buf.buf, event->_off_set);
168198
}
169-
event->_event_flag_set = 0;
170-
if (event->_call_back) {
171-
event->_call_back(event, error);
199+
if (event->_call_back && err > -1) {
200+
event->_call_back(event, err);
201+
event->_event_flag_set = 0;
172202
}
173203
}
174204

175205
void CSocket::_Send(CMemSharePtr<CEventHandler>& event) {
176206
EventOverlapped* context = (EventOverlapped*)event->_data;
177207

178-
int error = EVENT_ERROR_NO;
179-
if (!event->_off_set) {
180-
error = EVENT_ERROR_CLOSED;
181-
}
182-
208+
_post_event_num--;
209+
int err = -1;
183210
if (event->_timer_out) {
184-
error = EVENT_ERROR_TIMEOUT;
211+
err = EVENT_ERROR_TIMEOUT;
212+
213+
} else if (!event->_off_set) {
214+
if (_post_event_num == 0) {
215+
err = EVENT_ERROR_CLOSED;
216+
}
217+
218+
} else {
219+
err = EVENT_ERROR_NO;
185220
}
186-
event->_event_flag_set = 0;
187-
if (event->_call_back) {
188-
event->_call_back(event, error);
221+
222+
if (event->_call_back && err > -1) {
223+
event->_call_back(event, err);
224+
event->_event_flag_set = 0;
189225
}
190226
}
191227

‎Socket.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ class CSocket : public CSocketBase {
1717
CSocket(std::shared_ptr<CEventActions>& event_actions);
1818
~CSocket();
1919

20-
void SyncRead(const std::function<void(CMemSharePtr<CEventHandler>&, int error)>& call_back = nullptr);
21-
void SyncWrite(char* src, int len, const std::function<void(CMemSharePtr<CEventHandler>&, int error)>& call_back = nullptr);
20+
void SyncRead(const std::function<void(CMemSharePtr<CEventHandler>&, int err)>& call_back = nullptr);
21+
void SyncWrite(char* src, int len, const std::function<void(CMemSharePtr<CEventHandler>&, int err)>& call_back = nullptr);
2222

23-
void SyncRead(unsigned int interval, const std::function<void(CMemSharePtr<CEventHandler>&, int error)>& call_back = nullptr);
23+
void SyncRead(unsigned int interval, const std::function<void(CMemSharePtr<CEventHandler>&, int err)>& call_back = nullptr);
2424
void SyncWrite(unsigned int interval, char* src, int len, const std::function<void(CMemSharePtr<CEventHandler>&, int error)>& call_back = nullptr);
2525

26+
void SyncConnect(const std::string& ip, short port, const std::function<void(CMemSharePtr<CEventHandler>&, int err)>& const = nullptr);
27+
2628
void SetReadCallBack(const std::function<void(CMemSharePtr<CEventHandler>&, int error)>& call_back);
2729
void SetWriteCallBack(const std::function<void(CMemSharePtr<CEventHandler>&, int error)>& call_back);
2830

@@ -38,6 +40,7 @@ class CSocket : public CSocketBase {
3840
public:
3941
CMemSharePtr<CEventHandler> _read_event;
4042
CMemSharePtr<CEventHandler> _write_event;
43+
unsigned int _post_event_num;
4144
};
4245

4346
#endif

0 commit comments

Comments
 (0)
Please sign in to comment.