mod: live schedule

Closes #581

Signed-off-by: bggRGjQaUbCoE <githubaccount56556@proton.me>
This commit is contained in:
bggRGjQaUbCoE
2025-04-02 18:03:47 +08:00
parent d1a6798f2e
commit 86abf006d0
8 changed files with 378 additions and 293 deletions

View File

@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
@@ -156,12 +157,15 @@ class LiveMessageStream {
int roomId, uid;
List<String> servers;
List<void Function(dynamic obj)> eventListeners = [];
LiveMessageStream(
{required this.streamToken,
required this.roomId,
required this.uid,
required this.servers});
LiveMessageStream({
required this.streamToken,
required this.roomId,
required this.uid,
required this.servers,
});
WebSocket? socket;
StreamSubscription? _socketSubscription;
bool heartBeat = true;
PiliLogger logger = getLogger();
final String logTag = "LiveStreamService";
@@ -202,38 +206,39 @@ class LiveMessageStream {
socket = await getSocket();
// logger.d('$logTag ===> TCP连接建立');
socket?.add(authPackage.marshal());
// logger.d('$logTag ===> 发送认证包');
await for (var data in socket!) {
PackageHeader? header = PackageHeader.fromBytesData(data);
if (header != null) {
List<int> decompressedData = [];
//心跳包回复不用处理
if (header.operationCode == 3) continue;
if (header.operationCode == 8) {
_heartBeat();
}
try {
switch (header.protocolVer) {
case 0:
case 1:
_processingData(data);
continue;
case 2:
decompressedData = ZLibDecoder().convert(data.sublist(0x10));
break;
case 3:
decompressedData =
const BrotliDecoder().convert(data.sublist(0x10));
//debugPrint('Body: ${utf8.decode()}');
_socketSubscription = socket?.listen(
(data) {
PackageHeader? header = PackageHeader.fromBytesData(data);
if (header != null) {
List<int> decompressedData = [];
//心跳包回复不用处理
if (header.operationCode == 3) return;
if (header.operationCode == 8) {
_heartBeat();
}
try {
switch (header.protocolVer) {
case 0:
case 1:
_processingData(data);
return;
case 2:
decompressedData = ZLibDecoder().convert(data.sublist(0x10));
break;
case 3:
decompressedData =
const BrotliDecoder().convert(data.sublist(0x10));
//debugPrint('Body: ${utf8.decode()}');
}
_processingData(decompressedData);
} catch (e) {
logger.i(e);
}
_processingData(decompressedData);
} catch (e) {
logger.i(e);
}
}
}
socket?.close();
},
);
socket?.add(authPackage.marshal());
} catch (e) {
SmartDialog.showToast("弹幕地址链接失败");
// logger.i('$logTag ===> TCP连接失败: $e');
@@ -266,12 +271,14 @@ class LiveMessageStream {
await Future.delayed(const Duration(seconds: 30));
//发送心跳包
var package = HeartbeatPackage(
header: PackageHeader(
totalSize: 0,
headerSize: 0,
protocolVer: 1,
operationCode: 2,
seq: heartBeatCount));
header: PackageHeader(
totalSize: 0,
headerSize: 0,
protocolVer: 1,
operationCode: 2,
seq: heartBeatCount,
),
);
try {
socket?.add(package.marshal());
} catch (_) {}
@@ -283,8 +290,10 @@ class LiveMessageStream {
eventListeners.add(func);
}
void close() {
socket?.close();
void close() async {
heartBeat = false;
eventListeners.clear();
_socketSubscription?.cancel();
socket?.close();
}
}