Socket 概述
套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将 I/O 插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是 IP 地址与端口的组合。socket 提供的函数是操作系统内核将“TCP/IP 协议栈 + 底层网卡”抽象出来的一个个用户友好的函数,用于操纵本地的“TCP/IP 协议栈 + 底层网卡”与远端的服务器/主机完成通信的任务。
TCP
🔸 TCP 是面向连接的通信协议
🔸 通过三次握手
建立连接,通讯完成时要拆除连接
🔸 由于 TCP 是面向连接的所以只能用于端到端的通讯
🔸 三次握手四次挥手
🔸 具有校验机制,可靠,数据传输稳定
简单的 Socket 小案例
Socket 客户端
package TcpDemo; import java.io.*; import java.net.*; import java.util.Scanner;
public class Client { private static final int REMOTE_PORT = 20000;
private static final int LOCAL_PORT = 30000;
public static void main(String[] args) throws IOException { Socket socket = creatSocket(); initSocket(socket); socket.bind(new InetSocketAddress(InetAddress.getLocalHost(), LOCAL_PORT)); socket.connect(new InetSocketAddress(InetAddress.getLocalHost(), REMOTE_PORT), 3000); System.out.println("客户端已经发起连接"); System.out.println("客户端信息:" + socket.getLocalAddress() + "port:" + socket.getLocalPort()); System.out.println("服务端信息" + socket.getInetAddress() + "port:" + socket.getPort()); try { sendMsg(socket); } catch (Exception e) { System.err.println("连接异常关闭!!!!"); e.printStackTrace(); } finally { socket.close(); } }
private static void initSocket(Socket socket) throws SocketException { socket.setSoTimeout(3000); socket.setReuseAddress(true); socket.setTcpNoDelay(false); socket.setKeepAlive(true);
socket.setSoLinger(true, 0);
socket.setOOBInline(true); socket.setReceiveBufferSize(64*1024); socket.setSendBufferSize(64*1024); socket.setPerformancePreferences(1,1,1); } @SuppressWarnings("all") private static Socket creatSocket() throws IOException {
Socket socket = new Socket(); return socket; }
private static void sendMsg(Socket socket) throws IOException { Scanner scanner = new Scanner(System.in); OutputStream socketOutputStream = socket.getOutputStream(); PrintStream printStream = new PrintStream(socketOutputStream); InputStream inputStream = socket.getInputStream(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
boolean flag = true; do { printStream.println(scanner.nextLine()); String s = bufferedReader.readLine(); if ("bye".equals(s)) { flag = false; } else { System.out.println("服务端响应:" + s); } } while (flag); bufferedReader.close(); printStream.close(); scanner.close(); } }
|
Socket 服务端
这里为了同时处理多个客户端设计成了多线程异步的模式
package TcpDemo; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.*;
public class Server {
private static final int SERVER_PORT = 20000;
public static void main(String[] args) throws IOException { ServerSocket server = creatServerSocket(); initServerSocket(server); server.bind(new InetSocketAddress(InetAddress.getLocalHost(), SERVER_PORT), 50); System.out.println("服务器准备就绪"); System.out.println("服务端信息" + server.getInetAddress() + " port:" + server.getLocalPort()); while (true) { Socket client = server.accept(); ClientHandle clientHandle = new ClientHandle(client); new Thread(clientHandle).start(); } }
private static void initServerSocket(ServerSocket server) throws SocketException { server.setReuseAddress(true); server.setReceiveBufferSize(64 * 1024); server.setPerformancePreferences(1, 1, 1); }
private static ServerSocket creatServerSocket() throws IOException { ServerSocket server = new ServerSocket(); return server; }
private static class ClientHandle implements Runnable { private Socket socket;
ClientHandle(Socket client) { this.socket = client; }
public void run() { System.out.println("新客户端连接:" + socket.getInetAddress() + "port:" + socket.getPort()); try { BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintStream printStream = new PrintStream(socket.getOutputStream()); boolean flag = true; do { String s = reader.readLine(); if ("bye".equalsIgnoreCase(s)) { flag = false; System.out.println("客户端关闭了连接"); printStream.println("bye"); } else { System.out.println(s); printStream.println("字符串长度#" + s.length()); } } while (flag); reader.close(); printStream.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
|
具体的一些常用的方法解释都在代码的注释中 参考

传输基本数据类型
上面的哪个小案例传送的都是字符串类型的数据,也许有同学会说这些基本类型不都是可以通过字符串来传吗?为什么要费那个劲去传这些基本类型?其实不然,这里假设要传送的是 int 类型的 12345678 ,如果通过 int
来传输只要在范围内都是** 4 个字节大小固定,然而通过String
传送将会是”12345678” 也就是 8 **个字节,消耗要比使用int
要大,而且长度不固定,不方便后期接受的长度判断。
int
类型
在网络上传输的都是以** Byte 为基本单位,如果要传送 int 我们就需要将 int 转换为 byte,一个 int 是 4 个字节,我们可以将其转换为一个 byte[] **数组,废话不多说,上代码
public class ByteTools { public static byte[] int2byte(int a) { return new byte[]{ (byte) (a >>> 24), (byte) (a >>> 16), (byte) (a >>> 8), (byte) (a) }; } public static int byte2int(byte[] a) { return a[3] & 0xff | (a[2] & 0xff) << 8 | (a[1] & 0xff) << 16 | (a[0] & 0xff) << 24; } }
|
为什么这样做的一些细节可以参考 这篇博客
有了这个工具类我们就可以将** int 转换为 byte[] **后进行传输,同时接收端也可以通过这个方法将数据还原。
等等🙄 ,这样一来不是所有的类型对应的都要去写个这样的转换的方法?那还是有点麻烦的,而且也没有什么技术含量,所以这样的事情** JDK **帮我们做了
ByteBuffer
**:nio 中的一个包,这里我还不太熟悉这个具体的作用,目前只知道可以用来包装 byte[]**,然后可以实现上面的类型转换
Client 发送端
byte[] buffer=new byte[256];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
byteBuffer.put((byte) 126);
byteBuffer.putInt(123);
byteBuffer.putChar('A');
byteBuffer.putLong(323333231234124321L); boolean isOk=true;
byteBuffer.put((byte) (isOk?1:0));
byteBuffer.putFloat(123.2132F);
byteBuffer.putDouble(231.1412421321);
byteBuffer.put("HelloWorld".getBytes());
socketOutputStream.write(buffer,0,byteBuffer.position());
|
需要注意的地方就是最后** write 的时候,第二个参数 len,直接传 position()**, 就可以了,不用+1,这个 position 是下一个字节位置
这里其实我看的教程这里是加 1 了的,最后接收过来的数据长度死活对不上,我开始还以为是什么内存对齐,什么乱七八糟的然后才发现是这里有问题。
Server 接收端
基本类型的读取与上面对应的** get,最后一个 String 需要注意,直接用原始的 buffer 就可以了,不需要借助 ByteBuffer**,这里同样后面不用-1
String str = new String(buffer, byteBuffer.position(), readByteCount-byteBuffer.position());
|
测试结果
服务器准备就绪 服务端信息 LAPTOP-V5R5ABUJ/192.168.25.1 port:20000 新客户端连接:/192.168.25.1port:30000 当前下标 28 接受到 Client 数据长度 (byte):38 Client 发送的数据: 126 123 A 323333231234124321 true 123.2132 231.1412421321 HelloWorld
|
UDP
🔸 UDP 是无连接的,即发送数据之前不需要建立连接,因此减少了开销和发送数据之前的时延。
🔸 UDP 使用尽最大努力交付,即不保证可靠交付,因此主机不需要维持复杂的连接状态表。
🔸 UDP 是面向报文的。发送方的 UDP 对应用程序交下来的报文,在添加首部后就向下交付 IP 层。UDP 对应用层交下来的报文,既不合并,也不拆分,而是保留这些报文的边界。因此,应用程序必须选择合适大小的报文。
🔸 UDP 没有拥塞控制,因此网络出现的拥塞不会使源主机的发送速率降低。很多的实时应用(如 IP 电话、实时视频会议等)要去源主机以恒定的速率发送数据,并且允许在网络发生拥塞时丢失一些数据,但却不允许数据有太多的时延。UDP 正好符合这种要求。
🔸 UDP 支持一对一、一对多、多对一和多对多的交互通信。
🔸 UDP 的首部开销小,只有 8 个字节,比 TCP 的 20 个字节的首部要短。
单播
消息接收者
import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketAddress;
public class UDPProvide { public static void main(String[] args) throws IOException { DatagramSocket socket=new DatagramSocket(20000); System.out.println("UDPProvide is start...."); final byte[] buf=new byte[512]; DatagramPacket udp_receive=new DatagramPacket(buf,buf.length); socket.receive(udp_receive); SocketAddress socketAddress = udp_receive.getSocketAddress(); int datalen = udp_receive.getLength(); String receive=new String(udp_receive.getData(),0,datalen); System.out.println("receive from the: "+socketAddress); System.out.println("receive data: "+ receive); byte[] bytes = ("provider receive the data success "+datalen).getBytes(); DatagramPacket udp_sendBack=new DatagramPacket(bytes,bytes.length,socketAddress); socket.send(udp_sendBack); System.out.println("UDPProvide Finished."); socket.close(); } }
|
消息发送者
import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress;
public class UDPSearch { public static void main(String[] args) throws IOException { System.out.println("UDPSearch is ready..."); DatagramSocket socket = new DatagramSocket(); byte[] buff= "hello world".getBytes(); DatagramPacket udp_send=new DatagramPacket(buff,buff.length); udp_send.setAddress(InetAddress.getLocalHost()); udp_send.setPort(20000); socket.send(udp_send); final byte[] buf=new byte[512]; DatagramPacket udp_receive=new DatagramPacket(buf,buf.length); socket.receive(udp_receive); String s = new String(udp_receive.getData(), 0, udp_receive.getLength()); System.out.println(s); System.out.println("UDPSearch is over"); socket.close(); } }
|
多播&广播
消息建造器
public class MessageCreator { private static final String SN_HEADER = "收到暗号,我是 SN:"; private static final String PORT_HEADER = "这是暗号,请回送到该端口:";
public static String buildWithPort(int port) { return PORT_HEADER + port; }
public static int parsePort(String sn) { if (sn.startsWith(PORT_HEADER)) { return Integer.parseInt(sn.substring(PORT_HEADER.length())); } return -1; }
public static String buildWithSn(String sn){ return SN_HEADER+sn; }
public static String parseSn(String sn){ if(sn.startsWith(SN_HEADER)){ return sn.substring(SN_HEADER.length()); } return null; } }
|
消息接受者
package UdpDemo2; import java.io.IOException; import java.net.*; import java.util.UUID;
public class UDPProvide { public static int PROVIDE_LISTEN_PORT = 20000;
public static void main(String[] args) throws IOException { String sn = UUID.randomUUID().toString(); Provider provider = new Provider(sn); new Thread(provider).start(); System.in.read(); provider.shutdown(); }
public static class Provider implements Runnable { public volatile boolean isDone = false; public DatagramSocket socket = null; public final String sn;
public Provider(String sn) { this.sn = sn; }
public void run() { System.out.println("UDPProvide is start...."); try { socket = new DatagramSocket(PROVIDE_LISTEN_PORT); while (!isDone) { final byte[] buf = new byte[512]; DatagramPacket udp_receive = new DatagramPacket(buf, buf.length); socket.receive(udp_receive); InetSocketAddress socketAddress = (InetSocketAddress) udp_receive.getSocketAddress(); int datalen = udp_receive.getLength(); String receive = new String(udp_receive.getData(), 0, datalen); System.out.println("receive from the: " + socketAddress); System.out.println("receive data: " + receive); int port = MessageCreator.parsePort(receive); if (port != -1) { byte[] responseBody = MessageCreator.buildWithSn(sn).getBytes(); DatagramPacket udp_sendBack = new DatagramPacket(responseBody, 0, responseBody.length, socketAddress.getAddress(), port); socket.send(udp_sendBack); } } } catch (IOException e) { } finally { closeRes(); } System.out.println("UDPProvide Finished."); }
public void shutdown() { isDone = true; closeRes(); }
private void closeRes() { if (socket != null) { socket.close(); socket = null; } } } }
|
与上面不同的是这里为了随时可以停止将其构建成了异步线程,当接受到终止信号的时候就会改变状态量,并 close 资源,然后利用异常停止线程。
消息发送者
package UdpDemo2;
import java.io.IOException; import java.net.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch;
public class UDPSearch { private static final int SEARCH_LISTEN_PORT = 30000; private static DatagramSocket socket = null; public static void main(String[] args) throws IOException, InterruptedException { System.out.println("UDPSearch is start..."); Listener listen = listen(); sendBoard(); System.in.read(); List<Device> devices = listen.closeAndGetDeviceList(); for (Device device : devices) { System.out.println(device); } }
private static Listener listen() throws InterruptedException { System.out.println("UDPSearch Listener is start"); CountDownLatch countDownLatch=new CountDownLatch(1); Listener listener=new Listener(SEARCH_LISTEN_PORT,countDownLatch); new Thread(listener).start(); countDownLatch.await(); return listener; }
public static void sendBoard() throws IOException { DatagramSocket socket = new DatagramSocket(); byte[] buff = MessageCreator.buildWithPort(SEARCH_LISTEN_PORT).getBytes(); DatagramPacket udp_send = new DatagramPacket(buff, buff.length); udp_send.setAddress(InetAddress.getByName("255.255.255.255")); udp_send.setPort(UDPProvide.PROVIDE_LISTEN_PORT); socket.send(udp_send); socket.close(); System.out.println("UDPSearch Board is over"); }
private static class Device { int port; String ip; String sn;
public Device(int port, String ip, String sn) { this.port = port; this.ip = ip; this.sn = sn; } @Override public String toString() { return "Device{" + "port=" + port + ", ip='" + ip + '\'' + ", sn='" + sn + '\'' + '}'; } }
public static class Listener implements Runnable { private final int listenPort; private final CountDownLatch countDownLatch; private final List<Device> deviceList = new ArrayList<Device>();
private boolean isDone = false;
public Listener(int listenPort, CountDownLatch countDownLatch) { this.listenPort = listenPort; this.countDownLatch = countDownLatch; }
public void run() { countDownLatch.countDown(); try { socket = new DatagramSocket(listenPort); while (!isDone) { final byte[] buf = new byte[512]; DatagramPacket udp_receive = new DatagramPacket(buf, buf.length); socket.receive(udp_receive); InetSocketAddress socketAddress = (InetSocketAddress) udp_receive.getSocketAddress(); int datalen = udp_receive.getLength(); String sn = new String(udp_receive.getData(), 0, datalen); System.out.println("back from the:" + socketAddress); System.out.println("back data:" + sn); sn=MessageCreator.parseSn(sn); if (sn != null) { deviceList.add(new Device(socketAddress.getPort(), socketAddress.getAddress().toString(), sn)); } } } catch (IOException e) { } finally { closeRes(); } System.out.println("UDPSearch Listener is Finished..."); }
public void closeRes() { if (socket != null) { socket.close(); socket = null; } }
List<Device> closeAndGetDeviceList() { isDone = true; closeRes(); return deviceList; } } }
|
这里需要注意的就是广播的地址** 255.255.255.255**
如果是在局域网内和其他机器通信需要关闭虚拟机的网卡,不然是走的虚拟机的网卡,其他机器接收不到。(我说怎么发送的** IP 不是我的本机的 IP**)
🔸UDP 是面向无连接的通讯协议,基于用户数据报的协议
🔸UDP 数据包括目的端口号和源端口号信息
🔸通讯不需要连接,所以可以实现广播发送,并不局限于端到端
🔸结构简单,无校验,速度快,容易丢包,可广播
🔸他一旦把应用程序发给网络层的数据发送出去就不保留数据备份
UDP 辅助 TCP 实现点对点传输
客户端先利用UDP
向局域网发送广播,然后对应的服务器接收到之后就会将对应的TCP
的端口回送给客户端,然后二者进行 TCP 的双向通信,代码太多这里就只放一下Server
端的Handler
package udp_tcp_concurrency.server.handle;
import udp_tcp_concurrency.utils.CloseUtils;
import java.io.*; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class ClientHandler {
private final Socket socket; private final ServerReadHandler serverReadHandler; private final ServerWriterHandler serverWriterHandle;
private final CloseNotify closeNotify;
public ClientHandler(Socket socket, CloseNotify closeNotify) throws IOException { this.socket = socket; this.serverReadHandler = new ServerReadHandler(socket.getInputStream()); this.serverWriterHandle = new ServerWriterHandler(socket.getOutputStream()); this.closeNotify = closeNotify; System.out.println("新客户端连接:" + socket.getInetAddress() + "port:" + socket.getPort()); }
public void send(String str) { serverWriterHandle.send(str); }
public void stop() { serverReadHandler.stopRead(); serverWriterHandle.stopWriter(); CloseUtils.close(socket); System.out.println("客户端已经退出"); System.out.println("address:" + socket.getInetAddress() + ",port:" + socket.getPort()); }
private void stopByMyself() { stop(); closeNotify.onSelfClosed(this); }
public void read2Print() { new Thread(serverReadHandler).start(); }
public interface CloseNotify{ void onSelfClosed(ClientHandler clientHandler); }
class ServerReadHandler implements Runnable { private boolean done = false; private final InputStream inputStream;
public ServerReadHandler(InputStream inputStream) { this.inputStream = inputStream; }
public void run() { try { BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); do { String s = reader.readLine(); if (s == null) { System.out.println("客户端已经无法发送数据"); ClientHandler.this.stopByMyself(); break; } System.out.println(s); } while (!done); } catch (IOException e) { if (!done) { System.err.println("连接异常断开"+e.getMessage()); ClientHandler.this.stopByMyself(); } } finally { CloseUtils.close(inputStream); } }
public void stopRead() { done = true; CloseUtils.close(inputStream); } }
class ServerWriterHandler { private boolean done = false; private final PrintStream printStream; private final ExecutorService executorService;
public ServerWriterHandler(OutputStream outputStream) { this.printStream = new PrintStream(outputStream); executorService = Executors.newSingleThreadExecutor(); }
public void send(String str) { executorService.submit(new WriteRunnable(str)); }
class WriteRunnable implements Runnable { private final String msg;
public WriteRunnable(String msg) { this.msg = msg; }
@Override public void run() { if(ServerWriterHandler.this.done){ return; } try { ServerWriterHandler.this.printStream.println(msg); }catch (Exception e){ System.out.println("write 异常退出:"+e.getMessage()); } } } public void stopWriter() { done = true; CloseUtils.close(printStream); executorService.shutdownNow(); } } }
|
完整代码放在 github 有一点需要注意的是这里用了一个单线程池去处理服务端发送消息的功能,这里其实用线程通信机制wait/notify
也可以做到但是相比使用线程池会复杂许多。
局域网聊天室实现
这里的聊天室,其实关键的地方就在于对客户端发送的消息交由服务端进行转发。
基于上面的进行改造
public interface ClientHandleCallBack {
void onSelfClosed(ClientHandler clientHandler);
void onNewMessageArrived(ClientHandler clientHandler,String msg); }
|
增加一个消息抵达的接口,然后为了避免阻塞交给异步的单线程池去处理
@Override public void onNewMessageArrived(ClientHandler clientHandler, String msg) { System.out.println("Receive from:"+clientHandler.getClientInfo()+" msg:"+msg); forwardThreadPool.submit(()->{ for (ClientHandler clientHandle : clientHandles) { if(clientHandle.equals(clientHandler)){ continue; } clientHandle.send(msg); } }); }
|
详细代码 见 Github
感谢鼓励