PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据.如下图所示。

PipedInputStream和PipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东西.还有两个变量in和out。in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费.
1、创建连接与初始化
在两者的构造函数中,都相互提供了连接的构造方法,分别用于接收对方的管道实例,然后调用各自的connect()方法进行连接,如PipedInputStream:
PipedInputStream
private static final int DEFAULT_PIPE_SIZE = 1024;
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
// 省略部分构造函数
public PipedInputStream(PipedOutputStream src, int pipeSize) throws IOException {
initPipe(pipeSize);
connect(src);
}
private void initPipe(int pipeSize) { // 初始化buffer的大小,pipeSize可以通过构造方法指定,也可以使用默认的PIPE_SIZE的大小
if (pipeSize <= 0) {
throw new IllegalArgumentException("Pipe Size <= 0");
}
buffer = new byte[pipeSize]; // 初始化缓冲区的大小
}
public void connect(PipedOutputStream src) throws IOException {
src.connect(this); // 连接输入管道
}
看一下PipedOutputStream的构造函数,如下:
PipedOutputStream
private PipedInputStream sink;
public PipedOutputStream(PipedInputStream snk) throws IOException {
connect(snk); // 连接输出管理
}
public PipedOutputStream() { }
public synchronized void connect(PipedInputStream snk) throws IOException {
if (snk == null) {
throw new NullPointerException();
} else if (sink != null || snk.connected) {
throw new IOException("Already connected");
}
sink = snk;
snk.in = -1; // buffer数组中无数据
snk.out = 0; // 取出的数据为0
snk.connected = true;// 表示连接成功
}
可以看到,相互之间会调用connect()方法来连接,其效果是一样的。
2、数据的写入与读取
连接成功就,就可以进行数据的写入与读出操作了,在PipedOutputStream中的write()写法如下:
PipedOutputStream
public void write(int b) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
}
sink.receive(b); // 调用receive方法进行数据的输出
}
public void write(byte b[], int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Pipe not connected");
} else if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
sink.receive(b, off, len);
}
方法在写入到byte[]数组缓存区数据后,就会调用PipedInputStream中的receive方法。输出管道中的receive()方法如下:
PipedInputStream
protected synchronized void receive(int b) throws IOException { // 只会在PipedOutputStream类的write()中调用
checkStateForReceive();
writeSide = Thread.currentThread();
if (in == out) // in==out表示buffer数组已满
awaitSpace(); // 等待空闲空间
if (in < 0) { // 输入管道无数据
in = 0;
out = 0;
}
buffer[in++] = (byte)(b & 0xFF);
if (in >= buffer.length) {
in = 0; // 缓冲区已经满了,等待下一次从头写入
}
}
synchronized void receive(byte b[], int off, int len) throws IOException { // 将下标off开始的len个数组数据写入到输出管道中
checkStateForReceive(); // 检查管道的状态
writeSide = Thread.currentThread(); // 获取写入线程
int bytesToTransfer = len;
while (bytesToTransfer > 0) {
if (in == out) // 缓冲数组已满,只能等待
awaitSpace();
int nextTransferAmount = 0;
if (out < in) {
nextTransferAmount = buffer.length - in;
} else if (in < out) {
if (in == -1) {
in = out = 0;
nextTransferAmount = buffer.length - in;
} else {
nextTransferAmount = out - in;
}
}
if (nextTransferAmount > bytesToTransfer)
nextTransferAmount = bytesToTransfer;
assert(nextTransferAmount > 0); // nextTransferAmount<=0,则终止程序的执行
System.arraycopy(b, off, buffer, in, nextTransferAmount);// 拷贝数组中数据到缓冲区
bytesToTransfer -= nextTransferAmount;
off += nextTransferAmount;
in += nextTransferAmount;
if (in >= buffer.length) {
in = 0;
}
}
}
输入管理通过如上的对应方法接收到数据并保存到输入缓冲区后,下面就可以使用read()方法读出这些数据了。
看一下awaitSpace()方法的实现源代码:
PipedInputStream
/*
* 若写入管道的数据正好全部被读取完(例如,管道缓冲满),则执行awaitSpace()操作;
* 让读取管道的线程管道产生读取数据请求,从而才能继续的向“管道”中写入数据
*/
private void awaitSpace() throws IOException {
/*
* 如果管道中被读取的数据,等于写入管道的数据时,
* 则每隔1000ms检查“管道状态”,并唤醒管道操作:若有读取管道数据线程被阻