Signed-off-by: bggRGjQaUbCoE <githubaccount56556@proton.me>
This commit is contained in:
bggRGjQaUbCoE
2025-08-08 22:50:55 +08:00
parent 597fca9fbf
commit bd490b87ca
5 changed files with 49 additions and 27 deletions

View File

@@ -259,6 +259,8 @@ class LiveRoomController extends GetxController {
cancelLiveTimer();
savedDanmaku?.clear();
savedDanmaku = null;
msgStream?.close();
msgStream = null;
scrollController
..removeListener(listener)
..dispose();

View File

@@ -85,9 +85,6 @@ class _LiveRoomPageState extends State<LiveRoomPage>
WidgetsBinding.instance.removeObserver(this);
ScreenBrightness.instance.resetApplicationScreenBrightness();
PlPlayerController.setPlayCallBack(null);
_liveRoomController
..msgStream?.close()
..msgStream = null;
plPlayerController
..removeStatusLister(playerListener)
..dispose();

View File

@@ -5,7 +5,9 @@ import 'dart:typed_data';
import 'package:PiliPlus/services/loggeer.dart';
import 'package:brotli/brotli.dart';
import 'package:flutter/foundation.dart' show kDebugMode;
import 'package:flutter_smart_dialog/flutter_smart_dialog.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class PackageHeader {
int totalSize;
@@ -154,7 +156,7 @@ class LiveMessageStream {
String streamToken;
int roomId, uid;
List<String> servers;
List<void Function(dynamic obj)> eventListeners = [];
final List<void Function(dynamic obj)> _eventListeners = [];
LiveMessageStream({
required this.streamToken,
required this.roomId,
@@ -162,9 +164,10 @@ class LiveMessageStream {
required this.servers,
});
WebSocket? socket;
bool _active = true;
WebSocketChannel? _channel;
StreamSubscription? _socketSubscription;
bool heartBeat = true;
Timer? _timer;
PiliLogger logger = getLogger();
final String logTag = "LiveStreamService";
@@ -190,20 +193,27 @@ class LiveMessageStream {
// final marshaledData = authPackage.marshal();
// logger.d(marshaledData);
try {
Future<WebSocket> getSocket() async {
Future<WebSocketChannel> getSocket() async {
for (final server in servers) {
try {
return WebSocket.connect(server);
final channel = WebSocketChannel.connect(Uri.parse(server));
await channel.ready;
return channel;
} catch (_) {}
}
throw Exception("all servers connect failed");
}
socket = await getSocket();
_channel = await getSocket();
if (!_active) {
if (kDebugMode) logger.i("$logTag init inactive $hashCode");
close();
return;
}
// logger
// ..d('$logTag ===> TCP连接建立')
// ..d('$logTag ===> 发送认证包');
_socketSubscription = socket?.listen(
_socketSubscription = _channel?.stream.listen(
(data) {
PackageHeader? header = PackageHeader.fromBytesData(data);
if (header != null) {
@@ -230,15 +240,15 @@ class LiveMessageStream {
}
_processingData(decompressedData);
} catch (e) {
logger.i(e);
if (kDebugMode) logger.i(e);
}
}
},
);
socket?.add(authPackage.marshal());
_channel?.sink.add(authPackage.marshal());
} catch (e) {
SmartDialog.showToast("弹幕地址链接失败");
// logger.i('$logTag ===> TCP连接失败: $e');
if (kDebugMode) logger.i('$logTag ===> TCP连接失败: $e');
}
}
@@ -251,7 +261,7 @@ class LiveMessageStream {
var msgBody = utf8.decode(
data.sublist(subHeader.headerSize, subHeader.totalSize),
);
for (var f in eventListeners) {
for (var f in _eventListeners) {
f(jsonDecode(msgBody));
}
if (subHeader.totalSize < data.length) {
@@ -259,16 +269,24 @@ class LiveMessageStream {
}
}
} catch (e) {
logger.i('ParseHeader错误: $e');
if (kDebugMode) logger.i('ParseHeader错误: $e');
}
}
Future<void> _heartBeat() async {
logger.i("$logTag 直播间信息流认证成功");
if (!_active) {
if (kDebugMode) logger.i("$logTag init heartBeat inactive $hashCode");
return;
}
if (kDebugMode) logger.i("$logTag 直播间信息流认证成功 $hashCode");
int heartBeatCount = 1;
while (heartBeat) {
await Future.delayed(const Duration(seconds: 30));
//发送心跳包
_timer ??= Timer.periodic(const Duration(seconds: 30), (timer) {
if (!_active) {
if (kDebugMode) logger.i("$logTag heartBeat inactive $hashCode");
timer.cancel();
return;
}
if (kDebugMode) logger.i("$logTag heartBeat $hashCode");
var package = HeartbeatPackage(
header: PackageHeader(
totalSize: 0,
@@ -279,20 +297,24 @@ class LiveMessageStream {
),
);
try {
socket?.add(package.marshal());
} catch (_) {}
_channel?.sink.add(package.marshal());
} catch (_) {
timer.cancel();
}
heartBeatCount++;
}
});
}
void addEventListener(void Function(dynamic) func) {
eventListeners.add(func);
_eventListeners.add(func);
}
void close() {
heartBeat = false;
eventListeners.clear();
_active = false;
if (kDebugMode) logger.i("$logTag close $hashCode");
_timer?.cancel();
_eventListeners.clear();
_socketSubscription?.cancel();
socket?.close();
_channel?.sink.close();
}
}

View File

@@ -2048,7 +2048,7 @@ packages:
source: hosted
version: "1.0.1"
web_socket_channel:
dependency: transitive
dependency: "direct main"
description:
name: web_socket_channel
sha256: d645757fb0f4773d602444000a8131ff5d48c9e47adfe9772652dd1a4f2d45c8

View File

@@ -200,6 +200,7 @@ dependencies:
url: https://github.com/bggRGjQaUbCoE/flutter_sortable_wrap.git
ref: master
crclib: ^3.0.0
web_socket_channel: ^3.0.3
vector_math: any
fixnum: any