- 相關(guān)推薦
如何解決Java Socket通信技術(shù)收發(fā)線程互斥
Java Socket通信技術(shù)在很長的時(shí)間里都在使用,在不少的程序員眼中都有很多高的評價(jià)。那么下面我們就看看如何才能掌握這門復(fù)雜的編程語言,希望大家在今后的Java Socket通信技術(shù)使用中有所收獲。
下面就是Java Socket通信技術(shù)在解決收發(fā)線程互斥的代碼介紹。
1.package com.bill99.svr;
2.import java.io.IOException;
3.import java.io.InputStream;
4.import java.io.OutputStream;
5.import java.net.InetSocketAddress;
6.import java.net.Socket;
7.import java.net.SocketException;
8.import java.net.SocketTimeoutException;
9.import java.text.SimpleDateFormat;
10.import java.util.Date;
11.import java.util.Properties;
12.import java.util.Timer;
13.import java.util.TimerTask;
14.import java.util.concurrent.ConcurrentHashMap;
15.import java.util.concurrent.TimeUnit;
16.import java.util.concurrent.locks.Condition;
17.import java.util.concurrent.locks.ReentrantLock;
18.import org.apache.log4j.Logger;
19./**
20.*
title: socket通信包裝類
21.*
Description:
22.*
CopyRight: CopyRight (c) 2009
23.*
Company: 99bill.com
24.*
Create date: 2009-10-14
25.*author sunnylocus
26. * v0.10 2009-10-14 初類
27.* v0.11 2009-11-12 對命令收發(fā)邏輯及收發(fā)線程互斥機(jī)制進(jìn)行了優(yōu)化,
處理命令速度由原來8~16個(gè)/秒提高到25~32個(gè)/秒
28.*/ public class SocketConnection {
29.private volatile Socket socket;
30.private int timeout = 1000*10; //超時(shí)時(shí)間,初始值10秒
31.private boolean isLaunchHeartcheck = false;//是否已啟動(dòng)心跳檢測
32.private boolean isNetworkConnect = false; //網(wǎng)絡(luò)是否已連接
33.private static String host = "";
34.private static int port;
35.static InputStream inStream = null;
36.static OutputStream outStream = null;
37.private static Logger log =Logger.getLogger
(SocketConnection.class);
38.private static SocketConnection socketConnection = null;
39.private static java.util.Timer heartTimer=null;
40.//private final Map
synchronizedMap(new HashMap
41.private final ConcurrentHashMap
= new ConcurrentHashMap
42.private static Thread receiveThread = null;
43.private final ReentrantLock lock = new ReentrantLock();
44.private SocketConnection(){
45.Properties conf = new Properties();
46.try {
47.conf.load(SocketConnection.class.getResourceAsStream
("test.conf"));
48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));
49.init(conf.getProperty("ip"),Integer.valueOf
(conf.getProperty("port")));
50.} catch(IOException e) {
51.log.fatal("socket初始化異常!",e);
52.throw new RuntimeException("socket初始化異常,請檢查配置參數(shù)");
53.}
54.}
55./**
56.* 單態(tài)模式
57.*/
58.public static SocketConnection getInstance() {
59.if(socketConnection==null) {
60.synchronized(SocketConnection.class) {
61.if(socketConnection==null) {
62.socketConnection = new SocketConnection();
63.return socketConnection;
64.}
65.}
66.}
67.return socketConnection;
68.}
69.private void init(String host,int port) throws IOException {
70.InetSocketAddress addr = new InetSocketAddress(host,port);
71.socket = new Socket();
72.synchronized (this) {
73.log.info("【準(zhǔn)備與"+addr+"建立連接】");
74.socket.connect(addr, timeout);
75.log.info("【與"+addr+"連接已建立】");
76.inStream = socket.getInputStream();
77.outStream = socket.getOutputStream();
78.socket.setTcpNoDelay(true);//數(shù)據(jù)不作緩沖,立即發(fā)送
79.socket.setSoLinger(true, 0);//socket關(guān)閉時(shí),立即釋放資源
80.socket.setKeepAlive(true);
81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸
82.isNetworkConnect=true;
83.receiveThread = new Thread(new ReceiveWorker());
84.receiveThread.start();
85.SocketConnection.host=host;
86.SocketConnection.port=port;
87.if(!isLaunchHeartcheck)
88.launchHeartcheck();
89.}
90.}
91./**
92.* 心跳包檢測
93.*/
94.private void launchHeartcheck() {
95.if(socket == null)
96.throw new IllegalStateException("socket is not
established!");
97.heartTimer = new Timer();
98.isLaunchHeartcheck = true;
99.heartTimer.schedule(new TimerTask() {
100.public void run() {
101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");
102.int mstType =9999;//999-心跳包請求
103.SimpleDateFormat dateformate = new SimpleDateFormat
("yyyyMMddHHmmss");
104.String msgDateTime = dateformate.format(new Date());
105.int msgLength =38;//消息頭長度
106.String commandstr = "00" +msgLength + mstType + msgStreamNo;
107.log.info("心跳檢測包 -> IVR "+commandstr);
108.int reconnCounter = 1;
109.while(true) {
110.String responseMsg =null;
111.try {
112.responseMsg = readReqMsg(commandstr);
113.} catch (IOException e) {
114.log.error("IO流異常",e);
115.reconnCounter ++;
116.}
117.if(responseMsg!=null) {
118.log.info("心跳響應(yīng)包 <- IVR "+responseMsg);
119.reconnCounter = 1;
120.break;
121.} else {
122.reconnCounter ++;
123.}
124.if(reconnCounter >3) {//重連次數(shù)已達(dá)三次,判定網(wǎng)絡(luò)連接中斷,
重新建立連接。連接未被建立時(shí)不釋放鎖
125.reConnectToCTCC(); break;
126.}
127.}
128.}
129.},1000 * 60*1,1000*60*2);
130.}
131./**
132.* 重連與目標(biāo)IP建立重連
133.*/
134.private void reConnectToCTCC() {
135.new Thread(new Runnable(){
136.public void run(){
137.log.info("重新建立與"+host+":"+port+"的連接");
138.//清理工作,中斷計(jì)時(shí)器,中斷接收線程,恢復(fù)初始變量
139.heartTimer.cancel();
140.isLaunchHeartcheck=false;
141.isNetworkConnect = false;
142.receiveThread.interrupt();
143.try {
144.socket.close();
145.} catch (IOException e1) {log.error("重連時(shí),關(guān)閉socket連
接發(fā)生IO流異常",e1);}
146.//----------------
147.synchronized(this){
148.for(; ;){
149.try {
150.Thread.currentThread();
151.Thread.sleep(1000 * 1);
152.init(host,port);
153.this.notifyAll();
154.break ;
155.} catch (IOException e) {
156.log.error("重新建立連接未成功",e);
157.} catch (InterruptedException e){
158.log.error("重連線程中斷",e);
159.}
160.}
161.}
162.}
163.}).start();
164.}
165./**
166.* 發(fā)送命令并接受響應(yīng)
167.* @param requestMsg
168.* @return
169.* @throws SocketTimeoutException
170.* @throws IOException
171.*/
172.public String readReqMsg(String requestMsg) throws IOException {
173.if(requestMsg ==null) {
174.return null;
175.}
176.if(!isNetworkConnect) {
177.synchronized(this){
178.try {
179.this.wait(1000*5); //等待5秒,如果網(wǎng)絡(luò)還沒有恢復(fù),拋出IO流異常
180.if(!isNetworkConnect) {
181.throw new IOException("網(wǎng)絡(luò)連接中斷!");
182.}
183.} catch (InterruptedException e) {
184.log.error("發(fā)送線程中斷",e);
185.}
186.}
187.}
188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號
189.outStream = socket.getOutputStream();
190.outStream.write(requestMsg.getBytes());
191.outStream.flush();
192.Condition msglock = lock.newCondition(); //消息鎖
193.//注冊等待接收消息
194.recMsgMap.put(msgNo, msglock);
195.try {
196.lock.lock();
197.msglock.await(timeout,TimeUnit.MILLISECONDS);
198.} catch (InterruptedException e) {
199.log.error("發(fā)送線程中斷",e);
200.} finally {
201.lock.unlock();
202.}
203.Object respMsg = recMsgMap.remove(msgNo); //響應(yīng)信息
204.if(respMsg!=null &&(respMsg != msglock)) {
205.//已經(jīng)接收到消息,注銷等待,成功返回消息
206.return (String) respMsg;
207.} else {
208.log.error(msgNo+" 超時(shí),未收到響應(yīng)消息");
209.throw new SocketTimeoutException(msgNo+" 超時(shí),未收到響應(yīng)消息");
210.}
211.}
212.public void finalize() {
213.if (socket != null) {
214.try {
215.socket.close();
216.} catch (IOException e) {
217.e.printStackTrace();
218.}
219.}
220.}
221.//消息接收線程
222.private class ReceiveWorker implements Runnable {
223.String intStr= null;
224.public void run() {
225.while(!Thread.interrupted()){
226.try {
227.byte[] headBytes = new byte[4];
228.if(inStream.read(headBytes)==-1){
229.log.warn("讀到流未尾,對方已關(guān)閉流!");
230.reConnectToCTCC();//讀到流未尾,對方已關(guān)閉流
231.return;
232.}
233.byte[] tmp =new byte[4];
234.tmp = headBytes;
235.String tempStr = new String(tmp).trim();
236.if(tempStr==null || tempStr.equals("")) {
237.log.error("received message is null");
238.continue;
239.}
240.intStr = new String(tmp);
241.int totalLength =Integer.parseInt(intStr);
242.//----------------
243.byte[] msgBytes = new byte[totalLength-4];
244.inStream.read(msgBytes);
245.String resultMsg = new String(headBytes)+ new
String(msgBytes);
246.//抽出消息ID
247.String msgNo = resultMsg.substring(8, 8 + 24);
248.Condition msglock =(Condition) recMsgMap.get(msgNo);
249.if(msglock ==null) {
250.log.warn(msgNo+"序號可能已被注銷!響應(yīng)消息丟棄");
251.recMsgMap.remove(msgNo);
252.continue;
253.}
254.recMsgMap.put(msgNo, resultMsg);
255.try{
256.lock.lock();
257.msglock.signalAll();
258.}finally {
259.lock.unlock();
260.}
261.}catch(SocketException e){
262.log.error("服務(wù)端關(guān)閉socket",e);
263.reConnectToCTCC();
264.} catch(IOException e) {
265.log.error("接收線程讀取響應(yīng)數(shù)據(jù)時(shí)發(fā)生IO流異常",e);
266.} catch(NumberFormatException e){
267.log.error("收到?jīng)]良心包,String轉(zhuǎn)int異常,異常字符:"+intStr);
268.}
269.}
270.}
271.}
272.}
【如何解決Java Socket通信技術(shù)收發(fā)線程互斥】相關(guān)文章:
Java線程同步的方法10-25
java多線程面試題201710-03
2016年java多線程面試題及答案07-02
sun認(rèn)證考試輔導(dǎo):java關(guān)于多線程的部分操作07-27
PHP socket的配置08-04
超線程技術(shù)是什么意思09-09
如何編譯java程序09-28
如何讓JAVA代碼更高效07-18