Java 7之传统I/O - PipedInputStream和PipedOutputStream(二)

2014-11-24 02:33:23 · 作者: · 浏览: 1
塞,则唤醒该线程 */ while (in == out) { checkStateForReceive(); /* full: kick any waiting readers */ notifyAll(); try { wait(1000); } catch (InterruptedException ex) { throw new java.io.InterruptedIOException(); } } }


PipedInputStream类中的read()方法源代码如下:

PipedInputStream

    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
            throw new IOException("Pipe closed");
        } else if (writeSide != null && !writeSide.isAlive()  && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();  // 获取读取线程
        int trials = 2;
        while (in < 0) {
            if (closedByWriter) {  // 如果in<0(表示管道中无数据)且closedByWriter为true(表示输入管道已经关闭)则直接返回-1
                return -1;
            }
            if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
                throw new IOException("Pipe broken");
            }
            notifyAll();
            try {
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
        }
        int ret = buffer[out++] & 0xFF;
        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            /* now empty */
            in = -1;
        }
        return ret;
    }

    public synchronized int read(byte b[], int off, int len)  throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }
        /* possibly wait on the first character */
        int c = read();
        if (c < 0) {
            return -1;
        }
        b[off] = (byte) c;
        int rlen = 1;
        while ((in >= 0) && (len > 1)) {
            int available;
            if (in > out) {
                available = Math.min((buffer.length - out), (in - out));
            } else {
                available = buffer.length - out;
            }
            // A byte is read beforehand outside the loop
            if (available > (len - 1)) {
                available = len - 1;
            }
            System.arraycopy(buffer, out, b, off + rlen, available);
            out += available;
            rlen += available;
            len -= available;

            if (out >= buffer.length) {
                out = 0;
            }
            if (in == out) {
                in = -1;
            }
        }
        return rlen;
    }




3、刷新与关闭管道


来看一下管道输出流中的刷新和关闭方法,源代码如下:

PipedOutputStream

    // 刷回管道输出流
    public synchronized void flush() throws IOException {
        if (sink != null) {
            synchronized (sink) {
            	/*
            	 * 调用管道输入流的notifyAll(),通知管道输入流放弃对当前资源的占有,
            	 * 让其它的等待线程(等待读取管道输出流的线程)读取管道输出流的值。
            	 */
                sink.notifyAll(); 
            }
        }
    }
    // 关闭管道输出流
    public void close()  throws IOException {
        if (sink != null) {
            sink.receivedLast();// 通知管道输入流,输出管理已经关闭
        }
    }

看一下receivedLast()方法,如下:

PipedInputStream

 synchronized void receivedLast() {
        closedByWriter = true;  // 输出管道标志为true,表示关闭
        notifyAll();            // 唤醒所有的等待线程
 }
通知所有的等待线程,最后的数据已经全部到达。

PipedInputStream
public void close()  throws IOException {  // 关闭管道输出流
        closedByReader = true;
        synchronized (this) {
            in = -1;                      // 清空缓冲区数据 
        }
    }


下面来具体举一个例子,如下:

public class test04 {
    public static void main(String [] args) {  
        Sender sender = new Sender();  
        Receiver receiver = new Receiver();  
          
        PipedOutputStream outStream = sender.getOutStream();  
        PipedInputStream inStream = receiver.getInStream();  
        try {  
            //inStream.connect(outStream); // 与下一句一样  
            outStream.connect(inStream);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
        sender.start();  
        receiver.start();  
    }  
}  
  
class Sender extends Thread {  
    private PipedOutputStream outStream = new PipedOutputStream();  
    public PipedOutputStream getOutStream() {  
        return outStream;  
    }  
    public void run() {  
        String info = "hello, receiver";  
        try {  
            outStream.write(info.getBytes());  
            outStream.close();  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
}  
  
class Receiver extends Thread {  
    private PipedInputStream inStream = new PipedInputStream();  
    public PipedInputStream getInStream() {  
        return inStream;  
    }  
    public void run() {  
        byte[] buf = new byte[1024];  
        try {  
            int len = inStream.read(buf);  
            System.out.println("receive message from sender : " + new String(buf, 0, len));  
            inStream.close();  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }     
}  

最后运行后输出的结果如下:receive message from sender : hello, receiver




参考文献:

1、http://www.cnblogs.com/lich/archive/2011/12/11/2283928.html

2、ht