8
8
9
9
#import " LFStreamRtmpSocket.h"
10
10
#import " rtmp.h"
11
+ #import " YYDispatchQueuePool.h"
11
12
12
13
static const NSInteger RetryTimesBreaken = 20 ;// /< 重连1分钟 3秒一次 一共20次
13
14
static const NSInteger RetryTimesMargin = 3 ;
14
15
16
+ static dispatch_queue_t YYRtmpSendQueue () {
17
+ YYDispatchQueuePool *pool = [[YYDispatchQueuePool alloc ] initWithName: @" com.youku.laifeng.rtmpsendQueue" queueCount: 1 qos: NSQualityOfServiceDefault ];
18
+ dispatch_queue_t queue = [pool queue ];
19
+ return queue;
20
+ }
21
+ #define RTMP_RECEIVE_TIMEOUT 2
15
22
#define DATA_ITEMS_MAX_COUNT 100
16
23
#define RTMP_DATA_RESERVE_SIZE 400
17
24
#define RTMP_HEAD_SIZE (sizeof (RTMPPacket)+RTMP_MAX_HEADER_SIZE)
@@ -46,7 +53,6 @@ @interface LFStreamRtmpSocket ()<LFStreamingBufferDelegate>
46
53
@property (nonatomic , weak ) id <LFStreamSocketDelegate> delegate;
47
54
@property (nonatomic , strong ) LFLiveStreamInfo *stream;
48
55
@property (nonatomic , strong ) LFStreamingBuffer *buffer;
49
- @property (nonatomic , strong ) dispatch_queue_t socketQueue;
50
56
@property (nonatomic , strong ) LFLiveDebug *debugInfo;
51
57
// 错误信息
52
58
@property (nonatomic , assign ) RTMPError error;
@@ -83,32 +89,41 @@ - (nullable instancetype)initWithStream:(nullable LFLiveStreamInfo*)stream video
83
89
}
84
90
85
91
- (void ) start {
86
- dispatch_async (self.socketQueue , ^{
87
- if (!_stream) return ;
88
- if (_isConnecting) return ;
89
- if (_rtmp != NULL ) return ;
90
- self.debugInfo .streamId = self.stream .streamId ;
91
- self.debugInfo .uploadUrl = self.stream .url ;
92
- self.debugInfo .isRtmp = YES ;
93
- [self clean ];
94
- [self RTMP264_Connect: (char *)[_stream.url cStringUsingEncoding: NSASCIIStringEncoding]];
92
+ dispatch_async (YYRtmpSendQueue (), ^{
93
+ [self _start ];
95
94
});
96
95
}
97
96
97
+ - (void )_start {
98
+ if (!_stream) return ;
99
+ if (_isConnecting) return ;
100
+ if (_rtmp != NULL ) return ;
101
+ self.debugInfo .streamId = self.stream .streamId ;
102
+ self.debugInfo .uploadUrl = self.stream .url ;
103
+ self.debugInfo .isRtmp = YES ;
104
+ [self clean ];
105
+ [self RTMP264_Connect: (char *)[_stream.url cStringUsingEncoding: NSASCIIStringEncoding]];
106
+ }
107
+
98
108
- (void ) stop {
99
- dispatch_async (self.socketQueue , ^{
100
- if (_rtmp != NULL ){
101
- PILI_RTMP_Close (_rtmp, &_error);
102
- PILI_RTMP_Free (_rtmp);
103
- _rtmp = NULL ;
104
- }
109
+ dispatch_async (YYRtmpSendQueue (), ^{
110
+ [self _stop ];
105
111
});
106
112
}
107
113
114
+ - (void )_stop {
115
+ if (self.delegate && [self .delegate respondsToSelector: @selector (socketStatus:status: )]){
116
+ [self .delegate socketStatus: self status: LFLiveStop];
117
+ }
118
+ if (_rtmp != NULL ){
119
+ PILI_RTMP_Close (_rtmp, &_error);
120
+ PILI_RTMP_Free (_rtmp);
121
+ _rtmp = NULL ;
122
+ }
123
+ }
124
+
108
125
- (void ) sendFrame : (LFFrame*)frame {
109
- __weak typeof (self) _self = self;
110
- dispatch_async (self.socketQueue , ^{
111
- __strong typeof (_self) self = _self;
126
+ dispatch_async (YYRtmpSendQueue (), ^{
112
127
if (!frame) return ;
113
128
[self .buffer appendObject: frame];
114
129
[self sendFrame ];
@@ -209,6 +224,7 @@ -(NSInteger) RTMP264_Connect:(char *)push_url{
209
224
_rtmp->m_connCallback = ConnectionTimeCallback;
210
225
_rtmp->m_userData = (__bridge void *)self;
211
226
_rtmp->m_msgCounter = 1 ;
227
+ _rtmp->Link .timeout = RTMP_RECEIVE_TIMEOUT;
212
228
// 设置可写,即发布流,这个函数必须在连接前使用,否则无效
213
229
PILI_RTMP_EnableWrite (_rtmp);
214
230
@@ -398,7 +414,10 @@ - (NSInteger)RtmpPacketSend:(PILI_RTMPPacket*)packet{
398
414
int success = PILI_RTMP_SendPacket (_rtmp,packet,0 ,&_error);
399
415
if (success){
400
416
self.isSending = NO ;
401
- [self sendFrame ];
417
+ dispatch_after (dispatch_time (DISPATCH_TIME_NOW, (int64_t )(0.1 * NSEC_PER_SEC)), dispatch_get_main_queue (), ^{
418
+ [self sendFrame ];
419
+ });
420
+
402
421
}
403
422
return success;
404
423
}
@@ -438,30 +457,24 @@ - (void)sendAudio:(LFFrame*)frame {
438
457
439
458
// 断线重连
440
459
-(void ) reconnect {
441
- _isReconnecting = NO ;
442
- if (_isConnected) return ;
443
- if (_rtmp){
444
- PILI_RTMP_ReconnectStream (_rtmp, 0 , &_error);
445
- }else {
446
- [self stop ];
447
- [self start ];
448
- }
460
+ dispatch_async (YYRtmpSendQueue (), ^{
461
+ _isReconnecting = NO ;
462
+ if (_isConnected) return ;
463
+
464
+ [self _stop ];
465
+ [self _start ];
466
+ });
449
467
}
450
468
451
469
#pragma mark -- CallBack
452
470
void RTMPErrorCallback (RTMPError *error, void *userData){
453
471
LFStreamRtmpSocket *socket = (__bridge LFStreamRtmpSocket*)userData;
454
- if (error->code == RTMPErrorSocketClosedByPeer){
455
- [socket stop ];
456
- if (socket.delegate && [socket.delegate respondsToSelector: @selector (socketStatus:status: )]){
457
- [socket.delegate socketStatus: socket status: LFLiveError];
458
- }
459
- }else {
472
+ if (error->code < 0 ){
460
473
if (socket.retryTimes4netWorkBreaken ++ < socket.reconnectCount && !socket.isReconnecting ){
461
474
socket.isConnected = NO ;
462
475
socket.isConnecting = NO ;
463
476
socket.isReconnecting = YES ;
464
- dispatch_after (dispatch_time (DISPATCH_TIME_NOW, (int64_t )(socket.reconnectInterval * NSEC_PER_SEC)), socket. socketQueue , ^{
477
+ dispatch_after (dispatch_time (DISPATCH_TIME_NOW, (int64_t )(socket.reconnectInterval * NSEC_PER_SEC)), dispatch_get_main_queue () , ^{
465
478
[socket reconnect ];
466
479
});
467
480
}else if (socket.retryTimes4netWorkBreaken >= socket.reconnectCount ){
@@ -480,12 +493,6 @@ void ConnectionTimeCallback(PILI_CONNECTION_TIME* conn_time, void *userData){
480
493
}
481
494
482
495
#pragma mark -- Getter Setter
483
- - (dispatch_queue_t )socketQueue {
484
- if (!_socketQueue){
485
- _socketQueue = dispatch_queue_create (" com.youku.LaiFeng.live.socketQueue" , NULL );
486
- }
487
- return _socketQueue;
488
- }
489
496
490
497
- (LFStreamingBuffer*)buffer {
491
498
if (!_buffer){
0 commit comments