import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:typed_data'; import 'package:PiliPlus/pages/mine/controller.dart'; import 'package:PiliPlus/services/loggeer.dart'; import 'package:brotli/brotli.dart'; import 'package:flutter_smart_dialog/flutter_smart_dialog.dart'; class PackageHeader { int totalSize; int headerSize; int protocolVer; int operationCode; int seq; @override String toString() { return 'PackageHeader{totalSize: $totalSize, headerSize: $headerSize, protocolVer: $protocolVer, operationCode: $operationCode, seq: $seq}'; } PackageHeader({ required this.totalSize, required this.headerSize, required this.protocolVer, required this.operationCode, required this.seq, }); Uint8List toBytes() { final buffer = BytesBuilder(); buffer.add(_int32ToBytes(totalSize)); buffer.add(_int16ToBytes(headerSize)); buffer.add(_int16ToBytes(protocolVer)); buffer.add(_int32ToBytes(operationCode)); buffer.add(_int32ToBytes(seq)); return buffer.toBytes(); } List _int32ToBytes(int value) { final bytes = ByteData(4); bytes.setInt32(0, value, Endian.big); return bytes.buffer.asUint8List(); } List _int16ToBytes(int value) { final bytes = ByteData(2); bytes.setInt16(0, value, Endian.big); return bytes.buffer.asUint8List(); } static PackageHeader? fromBytesData(Uint8List data) { if (data.length < 10) { getLogger().i('数据不足以解析PackageHeader'); return null; } final byteData = ByteData.sublistView(data); int totalSize = byteData.getUint32(0, Endian.big); int headerSize = byteData.getUint16(4, Endian.big); int protocolVer = byteData.getUint16(6, Endian.big); int operationCode = byteData.getUint32(8, Endian.big); int seq = byteData.getUint32(12, Endian.big); return PackageHeader( totalSize: totalSize, headerSize: headerSize, protocolVer: protocolVer, operationCode: operationCode, seq: seq, ); } } abstract class Message { String toJsonStr(); int getMessageSize(); } class AuthMessage implements Message { int roomid; int uid; int protover; String platform; int type; String key; AuthMessage({ required this.roomid, required this.uid, required this.protover, required this.platform, required this.type, required this.key, }); @override String toJsonStr() { final message = { 'roomid': roomid, 'uid': uid, 'protover': protover, 'platform': platform, 'type': type, 'key': key, }; return jsonEncode(message); } @override int getMessageSize() { return utf8.encode(toJsonStr()).length; } } abstract class AbstractPackage { PackageHeader header; T body; Uint8List marshal(); AbstractPackage({required this.header, required this.body}); } //认证包 class AuthPackage extends AbstractPackage { AuthPackage({required super.header, required super.body}); @override Uint8List marshal() { int size = body.getMessageSize(); header.headerSize = 0x10; // 固定大小 size += header.headerSize; header.totalSize = size; final buffer = BytesBuilder(); buffer.add(header.toBytes()); buffer.add(utf8.encode(body.toJsonStr())); return buffer.toBytes(); } } //心跳包 class HeartbeatPackage extends AbstractPackage { HeartbeatPackage({required super.header, super.body}); @override Uint8List marshal() { final buffer = BytesBuilder(); header.headerSize = 0x10; header.totalSize = 0x10; buffer.add(header.toBytes()); return buffer.toBytes(); } } class LiveMessageStream { String streamToken; int roomId, uid; List servers; List eventListeners = []; LiveMessageStream({ required this.streamToken, required this.roomId, required this.uid, required this.servers, }); WebSocket? socket; StreamSubscription? _socketSubscription; bool heartBeat = true; PiliLogger logger = getLogger(); final String logTag = "LiveStreamService"; void init() async { final authPackage = AuthPackage( header: PackageHeader( totalSize: 0, headerSize: 0, protocolVer: 1, operationCode: 7, seq: 1, ), body: AuthMessage( roomid: roomId, uid: uid, protover: 3, platform: 'web', type: 2, key: streamToken, ), ); // final marshaledData = authPackage.marshal(); // logger.d(marshaledData); try { Future getSocket() async { for (final server in servers) { try { return await WebSocket.connect( server, headers: MineController.anonymity.value ? {'cookie': ''} : null, ); } catch (_) {} } throw Exception("all servers connect failed"); } socket = await getSocket(); // logger.d('$logTag ===> TCP连接建立'); // logger.d('$logTag ===> 发送认证包'); _socketSubscription = socket?.listen( (data) { PackageHeader? header = PackageHeader.fromBytesData(data); if (header != null) { List decompressedData = []; //心跳包回复不用处理 if (header.operationCode == 3) return; if (header.operationCode == 8) { _heartBeat(); } try { switch (header.protocolVer) { case 0: case 1: _processingData(data); return; case 2: decompressedData = ZLibDecoder().convert(data.sublist(0x10)); break; case 3: decompressedData = const BrotliDecoder().convert(data.sublist(0x10)); //debugPrint('Body: ${utf8.decode()}'); } _processingData(decompressedData); } catch (e) { logger.i(e); } } }, ); socket?.add(authPackage.marshal()); } catch (e) { SmartDialog.showToast("弹幕地址链接失败"); // logger.i('$logTag ===> TCP连接失败: $e'); } } void _processingData(List data) { try { PackageHeader? subHeader = PackageHeader.fromBytesData(Uint8List.fromList(data)); if (subHeader != null) { var msgBody = utf8 .decode(data.sublist(subHeader.headerSize, subHeader.totalSize)); for (var f in eventListeners) { f(jsonDecode(msgBody)); } if (subHeader.totalSize < data.length) { _processingData(data.sublist(subHeader.totalSize)); } } } catch (e) { logger.i('ParseHeader错误: $e'); } } void _heartBeat() async { logger.i("$logTag 直播间信息流认证成功"); int heartBeatCount = 1; while (heartBeat) { await Future.delayed(const Duration(seconds: 30)); //发送心跳包 var package = HeartbeatPackage( header: PackageHeader( totalSize: 0, headerSize: 0, protocolVer: 1, operationCode: 2, seq: heartBeatCount, ), ); try { socket?.add(package.marshal()); } catch (_) {} heartBeatCount++; } } void addEventListener(void Function(dynamic) func) { eventListeners.add(func); } void close() async { heartBeat = false; eventListeners.clear(); _socketSubscription?.cancel(); socket?.close(); } }