鸿蒙与Java跨平台Socket通信实战

鸿蒙与Java跨平台Socket通信实战

 目录

1.整体通信架构

2.鸿蒙 ArkTS TCP 客户端实现

2.1 完整代码

2.2 核心代码解析

3.Java 多线程 TCP 服务器实现

3.1 主服务类 Server.java

3.2 工作线程类 WorkThread.java

3.3 核心代码解析

4.运行效果展示

4.1 鸿蒙客户端界面

4.2 Java 服务端控制台

5.核心知识点总结


        本篇博客将从零实现一个鸿蒙 ArkTS TCP 客户端Java 多线程 TCP 服务器的双向聊天功能,涵盖【绑定端口→建立连接→持续收发→资源释放】全流程,代码可直接运行,适配鸿蒙 5.0 + 与 Java 8 + 环境。

1.整体通信架构

鸿蒙客户端(ArkTS)

 绑定本地端口 → 连接Java服务端 → 发送消息(带换行符做边界)

Java服务端(Java)

 监听端口(ServerSocket)→ 线程池分配工作线程 → 读取客户端消息(按行解析) → 控制台输入回发消息 → 客户端接收并展示

2.鸿蒙 ArkTS TCP 客户端实现

2.1 完整代码

import { socket } from "@kit.NetworkKit" import { BusinessError } from "@kit.BasicServicesKit" // 导入鸿蒙工具库,这里主要用其编解码能力(TextDecoder) import util from "@ohos.util" // 构建TCP套接字实例,这是整个TCP通信的核心对象,所有TCP操作都基于该实例 let tcpSocket: socket.TCPSocket = socket.constructTCPSocketInstance() @Entry @Component struct Index { // 本地绑定的端口号,默认9990 @State localPort: number = 9990 // 消息历史记录,拼接所有收发消息 @State msgHistory: scroller: Scroller = new Scroller() // 远程TCP服务器的IP地址,默认局域网IP @State serverIP: string = "192.168.247.1" // 远程TCP服务器的端口号 @State serverPort: number = 9980 // 控制“连接服务器”按钮的可用状态:绑定本地端口成功后才启用 @State visibleFlag: boolean = false // 控制“发送消息”按钮的可用状态:连接服务器成功后才启用 @State sendFlag: boolean = false // 输入框中待发送的消息内容 @State sendMsg: // 绑定本地IP和端口 async bindPort() { // 定义本地地址对象:address为0.0.0.0表示绑定本机“所有网卡”的该端口 let localAddress: socket.NetAddress = { address: "0.0.0.0", port: this.localPort } await tcpSocket.bind(localAddress).then(() => { this.msgHistory = '绑定服务成功' + "\r\n" this.visibleFlag = true }).catch((e: BusinessError) => { this.msgHistory = "绑定服务失败" + "\r\n" }) // 注册TCP的message事件监听:服务器发送消息时,触发该回调 tcpSocket.on("message", async (value) => { console.log("鸿蒙接受到服务器传递的消息") // value.message是服务器发送的“二进制缓冲区”,鸿蒙的TCPSocket接收的消息是ArrayBuffer类型,必须通过util.TextDecoder解码为字符串才能展示 let buffer = value.message // 创建UTF-8解码器(鸿蒙标准编解码API) let textDecoder = util.TextDecoder.create("UTF-8") // 将二进制缓冲区转为Uint8Array,再解码为UTF-8字符串 let str = textDecoder.decodeToString(new Uint8Array(buffer)) // 拼接消息历史:服务器消息+时间戳+换行,实现日志式展示 this.msgHistory +="服务器发送的消息为:["+this.getCurrentTimeString()+"]:"+str+"\r\n" // 滚动器自动滚到底部,显示最新的服务器消息 this.scroller.scrollEdge(Edge.Bottom) }) } // 连接远程 TCP 服务器 async connServer() { // 封装服务器地址对象:由UI输入框的serverIP/serverPort赋值 let serverAddress: socket.NetAddress = { address: this.serverIP, port: this.serverPort } // 异步连接服务器:TCP的三次握手过程,IO操作需异步处理 await tcpSocket.connect({ address: serverAddress }).then(() => { this.msgHistory = "连接服务器成功" + "\r\n" this.sendFlag = true }).catch(() => { this.msgHistory = "连接服务器失败" + "\r\n" }) } // 补零工具函数 padZero = (n: number) => n < 10 ? "0" + n : n // 获取当前时间戳 getCurrentTimeString(): string { let let date = new Date() // time = date.getHours().toString() + ":" + date.getMinutes().toString() + ":" + date.getSeconds().toString() // 加上补零处理后的时间戳 time = this.padZero(date.getHours()) + ":" + this.padZero(date.getMinutes()) + ":" + this.padZero(date.getSeconds()) return time } // 向服务器发送消息 sendMessageServer() { // TCP是面向字节流的协议,没有“消息边界”,服务器无法区分连续发送的多条消息,因此添加换行符作为消息分隔符,是 TCP 字节流通信的通用解决方案。 tcpSocket.send({ data: this.sendMsg + "\r\n" }) .then(() => { this.msgHistory += "我:[" + this.getCurrentTimeString() + "]:" + this.sendMsg + "\r\n" }).catch(() => { this.msgHistory += "我:发送失败" + "\r\n" }) } build() { Column({ space: 20 }) { Text("鸿蒙套接字通信示例").width("100%").textAlign(TextAlign.Center).fontWeight(FontWeight.Bold) // 本地端口绑定区 Flex({ direction: FlexDirection.Row, justifyContent: FlexAlign.Start, alignItems: ItemAlign.Center }) { Text("本地IP和端口:").width("30%").fontSize(12) // 数字类型输入框,绑定localPort,输入变化时更新变量 TextInput({ text: this.localPort.toString() }).type(InputType.Number).width("40%").onChange((value) => { this.localPort = Number(value) console.log("输入本地的端口为:" + this.localPort) }) Button("绑定IP和端口").onClick(() => { this.bindPort() }).width("30%").fontSize(12) } // 服务器连接区 Flex({ direction: FlexDirection.Row, justifyContent: FlexAlign.Start, alignItems: ItemAlign.Center }) { Text("服务器地址:").width("20%").fontSize(12) TextInput({ text: this.serverIP }).width("25%").onChange((value) => { this.serverIP = value console.log("输入服务器IP地址为:" + this.serverIP) }) TextInput({ text: this.serverPort.toString() }).width("25%").onChange((value) => { this.serverPort = Number(value) console.log("输入服务器PORT为:" + this.serverPort) }) // 连接按钮:仅visibleFlag为true时可用,点击触发connServer() Button("连接服务器").enabled(this.visibleFlag).width("30%").onClick(() => { this.connServer() }) } // 消息发送区 Flex({ direction: FlexDirection.Row, justifyContent: FlexAlign.Start, alignItems: ItemAlign.Center }) { TextInput({ placeholder: "请输入要发送的消息:" }).onChange((value) => { this.sendMsg = value }) // 发送按钮:仅sendFlag为true时可用,点击触发sendMessageServer() Button("发送消息").enabled(this.sendFlag).width("30%").onClick(() => { this.sendMessageServer() }) } // 消息历史展示区 Scroll(this.scroller) { Text(this.msgHistory) .textAlign(TextAlign.Start) .padding(10).width("100%") } .align(Alignment.Top) .height(300) .backgroundColor(0xeeeeee) .scrollable(ScrollDirection.Vertical) .scrollBar(BarState.On) .scrollBarWidth(20) }.width("100%").height("100%") } }

2.2 核心代码解析

(1)TCP 套接字初始化

鸿蒙提供的TCPSocket是核心通信对象,所有 TCP 操作(绑定、连接、收发)都基于该实例。

let tcpSocket: socket.TCPSocket = socket.constructTCPSocketInstance()

(2)绑定本地端口

0.0.0.0表示绑定本机所有网卡,确保局域网内其他设备可连接。绑定成功后启用【连接服务器】按钮。

let localAddress: socket.NetAddress = { address: "0.0.0.0", port: this.localPort } await tcpSocket.bind(localAddress)

(3)消息监听与解码

服务器消息是二进制ArrayBuffer,需用TextDecoder解码为 UTF-8 字符串,自动滚动到底部保证最新消息可见。

tcpSocket.on("message", async (value) => { let buffer = value.message let textDecoder = util.TextDecoder.create("UTF-8") let str = textDecoder.decodeToString(new Uint8Array(buffer)) this.msgHistory += "服务器发送的消息为:[" + this.getCurrentTimeString() + "]:" + str + "\r\n" this.scroller.scrollEdge(Edge.Bottom) })

(4)发送消息(带边界)

TCP 是字节流协议,无天然消息边界,添加\r\n作为分隔符,与 Java 服务端的readLine()完美匹配。

tcpSocket.send({ data: this.sendMsg + "\r\n" })

3.Java 多线程 TCP 服务器实现

3.1 主服务类 Server.java

package com.pp.chapter1; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.Scanner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server { // 服务器端的核心套接字对象,用于监听客户端的TCP连接请求 private ServerSocket serverSocket; // 固定线程池:管理工作线程,避免线程泛滥 private ExecutorService threadPool; // 服务器绑定端口(与鸿蒙客户端默认端口一致) private static final int SERVER_PORT = 9980; // 线程池核心线程数 private static final int THREAD_POOL_SIZE = 10; // 构造方法:初始化服务、启动监听、线程池 public Server() { try { // 初始化固定线程池 threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE); // 创建ServerSocket并绑定端口 serverSocket = new ServerSocket(SERVER_PORT); // 避免端口被占用 serverSocket.setReuseAddress(true); System.out.println("=== TCP服务器启动成功 ==="); System.out.println("监听端口:" + SERVER_PORT); System.out.println("线程池初始化完成,核心线程数:" + THREAD_POOL_SIZE); // 死循环:持续监听客户端连接 while(true) { // 阻塞方法:调用后程序会暂停执行,直到有客户端发起 TCP连接请求并完成三次握手,才会返回Socket对象并继续执行 Socket socket = serverSocket.accept(); // 获取客户端IP+端口并打印 String clientInfo = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); System.out.println("【新客户端连接】" + clientInfo); // 将通信任务提交到线程池 threadPool.execute(new WorkThread(socket)); } } catch (IOException e) { System.err.println("=== TCP服务器启动失败 ==="); System.err.println("失败原因:端口" + SERVER_PORT + "被占用/权限不足," + e.getMessage()); System.exit(1); // 启动失败直接退出程序 } } public static void main(String[] args) { new Server(); } } 

3.2 工作线程类 WorkThread.java

package com.pp.chapter1; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; public class WorkThread implements Runnable { // 与单个客户端通信的Socket对象(由Server的accept()返回) private final Socket socket; // 客户端IP+端口(用于日志打印) private String clientInfo; // 构造方法:接收客户端Socket public WorkThread(Socket socket) { this.socket = socket; this.clientInfo = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); } @Override public void run() { // try-with-resources语法:自动关闭所有实现AutoCloseable的资源 // 一次性创建流,循环复用,避免重复创建;统一指定UTF-8编码,解决跨语言乱码 try ( // 客户端消息输入流:字节流→字符流(UTF-8)→缓冲流,一次创建持续使用 BufferedReader clientReader = new BufferedReader( new InputStreamReader(socket.getInputStream(), "UTF-8") ); // 服务器向客户端输出流:字节流→打印流(UTF-8+自动刷新),无需手动flush PrintWriter serverWriter = new PrintWriter( new OutputStreamWriter(socket.getOutputStream(), "UTF-8"), true // 自动刷新 ); // 服务器控制台输入流:读取控制台回发消息,UTF-8编码 BufferedReader consoleReader = new BufferedReader( new InputStreamReader(System.in, "UTF-8") ) ) { System.out.println("【通信线程启动】" + clientInfo + ",开始监听客户端消息..."); String clientMsg; // 循环读取客户端消息 // readLine()返回null → 客户端主动断开连接(鸿蒙应用关闭/网络断开) while ((clientMsg = clientReader.readLine()) != null) { // 过滤客户端空消息(避免无效处理) if (clientMsg.trim().isEmpty()) { System.out.println("【空消息忽略】" + clientInfo + "发送了空消息"); continue; } // 打印客户端消息:线程名+客户端信息+消息 System.out.printf("[%s] 【客户端消息】%s:%s%n", Thread.currentThread().getName(), clientInfo, clientMsg); // 服务器控制台输入回发消息 System.out.print("请输入要回发给" + clientInfo + "的消息:"); String serverMsg = consoleReader.readLine(); // 过滤服务器空消息,避免发送空内容给客户端 if (serverMsg == null || serverMsg.trim().isEmpty()) { serverMsg = "【服务器】消息不能为空,已忽略"; } // 发送消息给客户端:PrintWriter开启自动刷新,直接println即可 serverWriter.println(serverMsg); System.out.printf("[%s] 【服务器回发】%s:%s%n", Thread.currentThread().getName(), clientInfo, serverMsg); } } catch (IOException e) { // 精细化异常日志:区分客户端正常断开/异常断开 if (socket.isClosed() || e.getMessage().contains("Connection reset")) { System.out.println("【客户端断开】" + clientInfo + "(正常/异常断开)"); } else { System.err.println("【通信异常】" + clientInfo + ",原因:" + e.getMessage()); } } finally { // 确保Socket关闭(即使try-with-resources出问题) try { if (socket != null && !socket.isClosed()) { socket.close(); } } catch (IOException e) { System.err.println("【Socket关闭失败】" + clientInfo + ",原因:" + e.getMessage()); } System.out.println("【通信线程销毁】" + clientInfo + ",释放所有通信资源\n"); } } } 

3.3 核心代码解析

(1)线程池管理多客户端

固定线程池避免大量客户端连接导致的线程泛滥,保证服务端稳定性,核心线程数可根据业务调整。

threadPool = Executors.newFixedThreadPool(THREAD_POOL_SIZE);

(2)try-with-resources自动释放资源

JVM 自动关闭所有实现AutoCloseable的资源,彻底解决 IO 流泄漏问题,关闭顺序与声明顺序逆序(先关输出流,再关输入流)。

try ( BufferedReader clientReader = new BufferedReader(...); PrintWriter serverWriter = new PrintWriter(...); BufferedReader consoleReader = new BufferedReader(...) ) { // 业务逻辑 }

(3)UTF-8 编码统一

所有流转换显式指定 UTF-8,与鸿蒙客户端编码一致,避免跨语言乱码。

new InputStreamReader(socket.getInputStream(), "UTF-8") new OutputStreamWriter(socket.getOutputStream(), "UTF-8")

(4)循环监听客户端消息

readLine()\r\n分割消息,返回null表示客户端断开连接,自动退出循环并释放资源。

while ((clientMsg = clientReader.readLine()) != null) { // 处理消息 }

4.运行效果展示

4.1 鸿蒙客户端界面

  • 绑定本地端口(9990)→ 连接服务器(192.168.247.1:9980)
  • 输入消息发送,服务端回发
  • 消息面板自动滚动,展示带时间戳的收发记录

4.2 Java 服务端控制台

  • 服务端启动后监听 9980 端口,线程池初始化完成
  • 客户端连接后打印 IP + 端口,分配工作线程处理通信
  • 接收客户端消息后,控制台输入回发内容,自动发送给客户端

5.核心知识点总结

跨语言通信关键:统一 UTF-8 编码 + 换行符做消息边界,保证 ArkTS 与 Java 的字节流解析一致。

TCP 服务端架构ServerSocket监听 + 线程池管理 +WorkThread处理单客户端。

资源管理:Java 用try-with-resources自动释放 IO 流,鸿蒙用异步 API 避免阻塞主线程。

UI 状态联动:鸿蒙客户端用@State变量控制按钮可用状态,实现【绑定→连接→发送】的流程化交互。

Read more

流处理、实时分析与RAG驱动的Python ETL框架:构建智能数据管道(上)

流处理、实时分析与RAG驱动的Python ETL框架:构建智能数据管道(上)

第一章:引言:数据处理的范式革命与Python的崛起 1.1 数据处理范式的演进:从批处理到实时智能 * 批处理时代(ETL 1.0):T+1模式,Hadoop/MapReduce主导,数据价值滞后,决策延迟显著。Python在脚本化、数据清洗环节崭露头角(Pandas, NumPy)。 * 流处理兴起(ETL 2.0):Kafka, Storm, Spark Streaming等推动“准实时”处理,满足监控、告警等场景。Python通过PySpark、Faust等库开始涉足流处理。 * 实时分析时代(ETL 3.0):Flink, Kafka Streams等实现毫秒级延迟,支持复杂事件处理(CEP)、实时仪表盘、在线机器学习。Python生态(Apache Beam Python

By Ne0inhk

【Python】6 种方法轻松将 Python 脚本打包成 EXE 应用

引言 Python 凭借其简洁的语法和强大的功能,在数据分析、Web 开发、自动化脚本等领域广受欢迎。它“开箱即用”的特性让开发者能够快速构建原型和应用程序。然而,对于最终用户而言,运行 Python 脚本往往意味着需要预先安装 Python 解释器及相关依赖库,这对非技术背景的用户来说无疑增加了门槛。 为了解决这一问题,将 Python 代码打包成独立的可执行文件(通常在 Windows 上是 .exe 文件)成为了一个非常实用的选择。这样,用户无需任何额外环境配置,就能像运行普通软件一样直接启动您的 Python 应用。本文将为您介绍六种主流且有效的 Python 打包工具,助您轻松实现跨平台分发。 1. PyInstaller: 最流行的选择 PyInstaller 是目前最广为人知、社区支持最广泛的 Python 打包工具之一。它能够很好地处理各种复杂的依赖关系,并支持将整个应用及其所需资源打包成一个或多个独立的可执行文件。 * 特点: * 支持 Windows,

By Ne0inhk

python:backtrader 使用指南

Backtrader 使用指南 Backtrader 是一款功能强大的 Python 量化交易回测框架,支持策略回测、实盘交易、多数据源、多时间周期等核心功能,适用于股票、期货、加密货币等各类交易品种。以下从核心概念、快速上手、进阶用法三个维度展开讲解。 一、核心概念 1. 核心组件 组件作用Cerebro主引擎,负责整合策略、数据、资金、佣金等,执行回测/交易Strategy策略类,自定义交易逻辑(开仓、平仓、止损止盈等)Data Feed数据源,支持 CSV、Yahoo Finance、Tushare 等,可自定义多时间周期数据Broker经纪商模拟,处理订单执行、佣金计算、资金管理Sizer仓位管理,控制每次交易的手数/股数Indicator技术指标,内置 MA、RSI、MACD 等百余种指标,

By Ne0inhk

【2025最新】Python量化数据接口指南:baostock 免费获取分钟级K线教程

baostock 是一个对Python量化爱好者非常友好的免费开源证券数据平台,尤其适合获取A股历史行情数据。我为你准备了这份2025年更新的baostock使用指南,希望能帮助你高效地获取数据。 1. 认识baostock Baostock(证券宝)是一个免费、开源的证券数据平台。它通过Python API提供大量准确、完整的证券历史行情数据、上市公司财务数据等,能满足量化交易投资者、数量金融爱好者、计量经济从业者的数据需求。 它的数据返回格式为pandas DataFrame类型,这对于使用pandas/NumPy/Matplotlib进行数据分析和可视化非常友好。 2. 数据范围与时间 baostock的数据覆盖范围主要包括: 数据类型 包含内容 时间范围 备注                 股票数据 日、周、月K线数据 1990-12-19至今 5、15、30、60分钟K线数据 1999-07-26至今 指数数据 综合指数,规模指数,一级行业指数,二级行业指数,策略指数,成长指数,价值指数,主题指数,基金指数,

By Ne0inhk