亚洲AV日韩AⅤ综合手机在线观看,激情婷婷久久综合色,欧美色五月婷婷久久,久久国产精品99久久人人澡

  • <abbr id="uk6uq"><abbr id="uk6uq"></abbr></abbr>
  • <tbody id="uk6uq"></tbody>
  • 如何解決Java Socket通信技術(shù)收發(fā)線程互斥

    時(shí)間:2024-08-28 18:14:33 SUN認(rèn)證 我要投稿
    • 相關(guān)推薦

    如何解決Java Socket通信技術(shù)收發(fā)線程互斥

      Java Socket通信技術(shù)在很長的時(shí)間里都在使用,在不少的程序員眼中都有很多高的評價(jià)。那么下面我們就看看如何才能掌握這門復(fù)雜的編程語言,希望大家在今后的Java Socket通信技術(shù)使用中有所收獲。

      下面就是Java Socket通信技術(shù)在解決收發(fā)線程互斥的代碼介紹。

      1.package com.bill99.svr;

      2.import java.io.IOException;

      3.import java.io.InputStream;

      4.import java.io.OutputStream;

      5.import java.net.InetSocketAddress;

      6.import java.net.Socket;

      7.import java.net.SocketException;

      8.import java.net.SocketTimeoutException;

      9.import java.text.SimpleDateFormat;

      10.import java.util.Date;

      11.import java.util.Properties;

      12.import java.util.Timer;

      13.import java.util.TimerTask;

      14.import java.util.concurrent.ConcurrentHashMap;

      15.import java.util.concurrent.TimeUnit;

      16.import java.util.concurrent.locks.Condition;

      17.import java.util.concurrent.locks.ReentrantLock;

      18.import org.apache.log4j.Logger;

      19./**

      20.*

    title: socket通信包裝類

     

      21.*

    Description:

     

      22.*

    CopyRight: CopyRight (c) 2009

     

      23.*

    Company: 99bill.com

     

      24.*

    Create date: 2009-10-14

     

      25.*author sunnylocus

      26. * v0.10 2009-10-14 初類

      27.* v0.11 2009-11-12 對命令收發(fā)邏輯及收發(fā)線程互斥機(jī)制進(jìn)行了優(yōu)化,

      處理命令速度由原來8~16個(gè)/秒提高到25~32個(gè)/秒

      28.*/ public class SocketConnection {

      29.private volatile Socket socket;

      30.private int timeout = 1000*10; //超時(shí)時(shí)間,初始值10秒

      31.private boolean isLaunchHeartcheck = false;//是否已啟動(dòng)心跳檢測

      32.private boolean isNetworkConnect = false; //網(wǎng)絡(luò)是否已連接

      33.private static String host = "";

      34.private static int port;

      35.static InputStream inStream = null;

      36.static OutputStream outStream = null;

      37.private static Logger log =Logger.getLogger

      (SocketConnection.class);

      38.private static SocketConnection socketConnection = null;

      39.private static java.util.Timer heartTimer=null;

      40.//private final Map recMsgMap= Collections.

      synchronizedMap(new HashMap());

      41.private final ConcurrentHashMap recMsgMap

      = new ConcurrentHashMap();

      42.private static Thread receiveThread = null;

      43.private final ReentrantLock lock = new ReentrantLock();

      44.private SocketConnection(){

      45.Properties conf = new Properties();

      46.try {

      47.conf.load(SocketConnection.class.getResourceAsStream

      ("test.conf"));

      48.this.timeout = Integer.valueOf(conf.getProperty("timeout"));

      49.init(conf.getProperty("ip"),Integer.valueOf

      (conf.getProperty("port")));

      50.} catch(IOException e) {

      51.log.fatal("socket初始化異常!",e);

      52.throw new RuntimeException("socket初始化異常,請檢查配置參數(shù)");

      53.}

      54.}

      55./**

      56.* 單態(tài)模式

      57.*/

      58.public static SocketConnection getInstance() {

      59.if(socketConnection==null) {

      60.synchronized(SocketConnection.class) {

      61.if(socketConnection==null) {

      62.socketConnection = new SocketConnection();

      63.return socketConnection;

      64.}

      65.}

      66.}

      67.return socketConnection;

      68.}

      69.private void init(String host,int port) throws IOException {

      70.InetSocketAddress addr = new InetSocketAddress(host,port);

      71.socket = new Socket();

      72.synchronized (this) {

      73.log.info("【準(zhǔn)備與"+addr+"建立連接】");

      74.socket.connect(addr, timeout);

      75.log.info("【與"+addr+"連接已建立】");

      76.inStream = socket.getInputStream();

      77.outStream = socket.getOutputStream();

      78.socket.setTcpNoDelay(true);//數(shù)據(jù)不作緩沖,立即發(fā)送

      79.socket.setSoLinger(true, 0);//socket關(guān)閉時(shí),立即釋放資源

      80.socket.setKeepAlive(true);

      81.socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸

      82.isNetworkConnect=true;

      83.receiveThread = new Thread(new ReceiveWorker());

      84.receiveThread.start();

      85.SocketConnection.host=host;

      86.SocketConnection.port=port;

      87.if(!isLaunchHeartcheck)

      88.launchHeartcheck();

      89.}

      90.}

      91./**

      92.* 心跳包檢測

      93.*/

      94.private void launchHeartcheck() {

      95.if(socket == null)

      96.throw new IllegalStateException("socket is not

      established!");

      97.heartTimer = new Timer();

      98.isLaunchHeartcheck = true;

      99.heartTimer.schedule(new TimerTask() {

      100.public void run() {

      101.String msgStreamNo = StreamNoGenerator.getStreamNo("kq");

      102.int mstType =9999;//999-心跳包請求

      103.SimpleDateFormat dateformate = new SimpleDateFormat

      ("yyyyMMddHHmmss");

      104.String msgDateTime = dateformate.format(new Date());

      105.int msgLength =38;//消息頭長度

      106.String commandstr = "00" +msgLength + mstType + msgStreamNo;

      107.log.info("心跳檢測包 -> IVR "+commandstr);

      108.int reconnCounter = 1;

      109.while(true) {

      110.String responseMsg =null;

      111.try {

      112.responseMsg = readReqMsg(commandstr);

      113.} catch (IOException e) {

      114.log.error("IO流異常",e);

      115.reconnCounter ++;

      116.}

      117.if(responseMsg!=null) {

      118.log.info("心跳響應(yīng)包 <- IVR "+responseMsg);

      119.reconnCounter = 1;

      120.break;

      121.} else {

      122.reconnCounter ++;

      123.}

      124.if(reconnCounter >3) {//重連次數(shù)已達(dá)三次,判定網(wǎng)絡(luò)連接中斷,

      重新建立連接。連接未被建立時(shí)不釋放鎖

      125.reConnectToCTCC(); break;

      126.}

      127.}

      128.}

      129.},1000 * 60*1,1000*60*2);

      130.}

      131./**

      132.* 重連與目標(biāo)IP建立重連

      133.*/

      134.private void reConnectToCTCC() {

      135.new Thread(new Runnable(){

      136.public void run(){

      137.log.info("重新建立與"+host+":"+port+"的連接");

      138.//清理工作,中斷計(jì)時(shí)器,中斷接收線程,恢復(fù)初始變量

      139.heartTimer.cancel();

      140.isLaunchHeartcheck=false;

      141.isNetworkConnect = false;

      142.receiveThread.interrupt();

      143.try {

      144.socket.close();

      145.} catch (IOException e1) {log.error("重連時(shí),關(guān)閉socket連

      接發(fā)生IO流異常",e1);}

      146.//----------------

      147.synchronized(this){

      148.for(; ;){

      149.try {

      150.Thread.currentThread();

      151.Thread.sleep(1000 * 1);

      152.init(host,port);

      153.this.notifyAll();

      154.break ;

      155.} catch (IOException e) {

      156.log.error("重新建立連接未成功",e);

      157.} catch (InterruptedException e){

      158.log.error("重連線程中斷",e);

      159.}

      160.}

      161.}

      162.}

      163.}).start();

      164.}

      165./**

      166.* 發(fā)送命令并接受響應(yīng)

      167.* @param requestMsg

      168.* @return

      169.* @throws SocketTimeoutException

      170.* @throws IOException

      171.*/

      172.public String readReqMsg(String requestMsg) throws IOException {

      173.if(requestMsg ==null) {

      174.return null;

      175.}

      176.if(!isNetworkConnect) {

      177.synchronized(this){

      178.try {

      179.this.wait(1000*5); //等待5秒,如果網(wǎng)絡(luò)還沒有恢復(fù),拋出IO流異常

      180.if(!isNetworkConnect) {

      181.throw new IOException("網(wǎng)絡(luò)連接中斷!");

      182.}

      183.} catch (InterruptedException e) {

      184.log.error("發(fā)送線程中斷",e);

      185.}

      186.}

      187.}

      188.String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號

      189.outStream = socket.getOutputStream();

      190.outStream.write(requestMsg.getBytes());

      191.outStream.flush();

      192.Condition msglock = lock.newCondition(); //消息鎖

      193.//注冊等待接收消息

      194.recMsgMap.put(msgNo, msglock);

      195.try {

      196.lock.lock();

      197.msglock.await(timeout,TimeUnit.MILLISECONDS);

      198.} catch (InterruptedException e) {

      199.log.error("發(fā)送線程中斷",e);

      200.} finally {

      201.lock.unlock();

      202.}

      203.Object respMsg = recMsgMap.remove(msgNo); //響應(yīng)信息

      204.if(respMsg!=null &&(respMsg != msglock)) {

      205.//已經(jīng)接收到消息,注銷等待,成功返回消息

      206.return (String) respMsg;

      207.} else {

      208.log.error(msgNo+" 超時(shí),未收到響應(yīng)消息");

      209.throw new SocketTimeoutException(msgNo+" 超時(shí),未收到響應(yīng)消息");

      210.}

      211.}

      212.public void finalize() {

      213.if (socket != null) {

      214.try {

      215.socket.close();

      216.} catch (IOException e) {

      217.e.printStackTrace();

      218.}

      219.}

      220.}

      221.//消息接收線程

      222.private class ReceiveWorker implements Runnable {

      223.String intStr= null;

      224.public void run() {

      225.while(!Thread.interrupted()){

      226.try {

      227.byte[] headBytes = new byte[4];

      228.if(inStream.read(headBytes)==-1){

      229.log.warn("讀到流未尾,對方已關(guān)閉流!");

      230.reConnectToCTCC();//讀到流未尾,對方已關(guān)閉流

      231.return;

      232.}

      233.byte[] tmp =new byte[4];

      234.tmp = headBytes;

      235.String tempStr = new String(tmp).trim();

      236.if(tempStr==null || tempStr.equals("")) {

      237.log.error("received message is null");

      238.continue;

      239.}

      240.intStr = new String(tmp);

      241.int totalLength =Integer.parseInt(intStr);

      242.//----------------

      243.byte[] msgBytes = new byte[totalLength-4];

      244.inStream.read(msgBytes);

      245.String resultMsg = new String(headBytes)+ new

      String(msgBytes);

      246.//抽出消息ID

      247.String msgNo = resultMsg.substring(8, 8 + 24);

      248.Condition msglock =(Condition) recMsgMap.get(msgNo);

      249.if(msglock ==null) {

      250.log.warn(msgNo+"序號可能已被注銷!響應(yīng)消息丟棄");

      251.recMsgMap.remove(msgNo);

      252.continue;

      253.}

      254.recMsgMap.put(msgNo, resultMsg);

      255.try{

      256.lock.lock();

      257.msglock.signalAll();

      258.}finally {

      259.lock.unlock();

      260.}

      261.}catch(SocketException e){

      262.log.error("服務(wù)端關(guān)閉socket",e);

      263.reConnectToCTCC();

      264.} catch(IOException e) {

      265.log.error("接收線程讀取響應(yīng)數(shù)據(jù)時(shí)發(fā)生IO流異常",e);

      266.} catch(NumberFormatException e){

      267.log.error("收到?jīng)]良心包,String轉(zhuǎn)int異常,異常字符:"+intStr);

      268.}

      269.}

      270.}

      271.}

      272.}

    【如何解決Java Socket通信技術(shù)收發(fā)線程互斥】相關(guān)文章:

    PHP中如何使用socket進(jìn)行通信08-21

    Java線程同步的方法10-25

    Java多線程的實(shí)現(xiàn)方式07-08

    java多線程面試題201710-03

    2016年java多線程面試題及答案07-02

    sun認(rèn)證考試輔導(dǎo):java關(guān)于多線程的部分操作07-27

    PHP socket的配置08-04

    超線程技術(shù)是什么意思09-09

    如何編譯java程序09-28

    如何讓JAVA代碼更高效07-18