org.eclipse.team.core/src/org/eclipse/team/internal/core/streams/TimeoutInputStream.java

Parent Directory | Revision Log


Revision 1.8 - (download) (annotate)
Wed May 10 17:47:43 2006 UTC (3 years, 6 months ago) by mvalenta
Branch: MAIN
CVS Tags: I20060510, I200607032320, I20080819, I20091013-0800, I20091028-0800, I200609251710, I20070409, R3_2, I20070402, r33x_20070724, I20071015, I20080528, I200610021645, I20080722, I20071120, I20070709, I20071127, I20090211, I20080526, I20080923, I20080428, I20080425, I20080422, I20070202, I20091020-0800, I20071204, I20061208, I20080513, I20080514, I20080515, I20070528, I20070529, I20070522, I20070523, r34x_20080902, I20070524, I20070525, I20090128-0800, I20070115, r34x_20081015, I20090430-0408, I20080604, I20080605, I20070801, I200608211720, I20070808, r33x_20071024, I20080826, R3_3_1_1, I20091006-0800, I20090427-0800, I20060917, I20060911, I20090721-0750, I20090803-1300, I20090224-0800, r34x_20080115, I20080803, I20080804, I20080415, I20090126-0800, I200702071200, R3_4, R3_5, I200606191715, R3_3, I20080529, R3_4_1, I20080521, I20090714-0800, r34x_20080808, I20090521-1750, I20070416, I20090922-0800, I20090428, pre_R3_3, I20070924, I20071001, I20090421, r34x_20080723, I20080322, I20080207-0700, I20080514-2000a, I20070326, I20070730, I20080514-2000, I20081125, I20060714, I20070316, I20080930, I20070212, I20070219, I20070607, I20070605, I20070604, I20090120, I200605181830, I200606051140, I20061218, I20080115, v20090210-0615, I20070516, I20070515, I20070514, I20070511, I20070101, I200702081200, I20090306-1030, I200702051300, I20070430, I20090630-2000, I200606121730, I200703211300, r33x_20080128, I20070312, I20081014, r34x_20080107, I20090514-0808, I20070220, I20070226, I200703191600, I20090522-1010, I20070507, I20090113, I20090309-1300, I20080122, I20070426, I20070129, I200702081800, r33x_20070709, I20090429-0800, I20070130, I20081202, I20090210-0800, I20071023, M20060711, I20090309-1800, I20090527-0620, I20090217-0800, r33x_20070807, I20070723, I200610161750, r33x_20071129, I20081118, r35x_20090930-0800, I20090511-2000, I20080909, I20070910, I20070914, I20091026-1300, R3_4_2, I20071210, I20090323-1100, M20061114, I20070501, I20070502, I20080415-1358, 
I20090311-0800, I20060605-1430, R3_2_1, R3_2_2, I20090407-0800, I200610232323, r34x_20080827, r33x_20070730, I20090331-0800, I20090929-0800, I20090303-0800, r34x_20090115, I20090825-0800, I20080204-0800, I20080304, I20080715, I20070305, I20070303, I20070716, I20081029, I20081021, I20080820, I20070423, I20090916-0800, I20090106, R3_5_1, I20070531, I20091117-0800, I20071026, I20070122, I20081216, I20070510, I20081210, I20090508-2000, I200607101745, I20080226, I20080326, R3_3_1, R3_3_2, I20071113, v20070212, I20080915, I20080917, I20090317-1800, HEAD
Branch point for: R3_3_maintenance, R3_4_maintenance, R3_5_maintenance, R3_2_maintenance
Changes since 1.7: +1 -1 lines
Updated Copyrights
/*******************************************************************************
 * Copyright (c) 2000, 2006 IBM Corporation and others.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     IBM Corporation - initial API and implementation
 *******************************************************************************/
package org.eclipse.team.internal.core.streams;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import org.eclipse.team.internal.core.Policy;

/**
 * Wraps an input stream that blocks indefinitely to simulate timeouts on read(),
 * skip(), and close().  The resulting input stream is buffered and supports
 * retrying operations that failed due to an InterruptedIOException.
 *
 * Supports resuming partially completed operations after an InterruptedIOException
 * REGARDLESS of whether the underlying stream does unless the underlying stream itself
 * generates InterruptedIOExceptions in which case it must also support resuming.
 * Check the bytesTransferred field to determine how much of the operation completed;
 * conversely, at what point to resume.
 *
 * Implementation note: a daemon thread started by the constructor reads from the
 * underlying stream into a circular buffer; the public methods consume from that
 * buffer.  All shared state is guarded by this object's monitor, which is also
 * used for wait()/notify() signalling between the two sides.
 */
public class TimeoutInputStream extends FilterInputStream {
	// unsynchronized variables (assigned once, before the reader thread is started)
	private final long readTimeout; // read() timeout in millis
	private final long closeTimeout; // close() timeout in millis, or -1
	private final boolean growWhenFull; // if true, buffer will grow when it is full

	// requests for the thread (synchronized)
	private boolean closeRequested = false; // if true, close requested

	// responses from the thread (synchronized)
	private Thread thread;   // if null, thread has terminated
	private byte[] iobuffer; // circular buffer
	private int head = 0;    // points to first unread byte
	private int length = 0;  // number of remaining unread bytes
	private IOException ioe = null; // if non-null, contains a pending exception
	private boolean waitingForClose = false; // if true, thread is waiting for close()

	/**
	 * Creates a timeout wrapper for an input stream with a fixed-size buffer.
	 * @param in the underlying input stream
	 * @param bufferSize the buffer size in bytes; should be large enough to mitigate
	 *        Thread synchronization and context switching overhead
	 * @param readTimeout the number of milliseconds to block for a read() or skip() before
	 *        throwing an InterruptedIOException; 0 blocks indefinitely
	 * @param closeTimeout the number of milliseconds to block for a close() before throwing
	 *        an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background
	 * @throws IllegalArgumentException if bufferSize is not positive
	 */
	public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout) {
		this(in, bufferSize, readTimeout, closeTimeout, false);
	}

	/**
	 * Creates a timeout wrapper for an input stream, optionally letting the buffer grow.
	 * @param in the underlying input stream
	 * @param bufferSize the initial buffer size in bytes; must be positive
	 * @param readTimeout the number of milliseconds to block for a read() or skip() before
	 *        throwing an InterruptedIOException; 0 blocks indefinitely
	 * @param closeTimeout the number of milliseconds to block for a close() before throwing
	 *        an InterruptedIOException; 0 blocks indefinitely, -1 closes the stream in the background
	 * @param growWhenFull if true, the buffer is doubled whenever it is still full after the
	 *        reader thread has waited readTimeout milliseconds for the consumer to catch up
	 * @throws IllegalArgumentException if bufferSize is not positive
	 */
	public TimeoutInputStream(InputStream in, int bufferSize, long readTimeout, long closeTimeout, boolean growWhenFull) {
		super(in);
		if (bufferSize <= 0) {
			// an empty buffer is permanently "full", so the reader thread could never deliver data
			throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize); //$NON-NLS-1$
		}
		this.readTimeout = readTimeout;
		this.closeTimeout = closeTimeout;
		// every field must be assigned before the thread starts so that the
		// reader thread is guaranteed to observe the values
		this.growWhenFull = growWhenFull;
		this.iobuffer = new byte[bufferSize];
		thread = new Thread(new Runnable() {
			public void run() {
				runThread();
			}
		}, "TimeoutInputStream");//$NON-NLS-1$
		thread.setDaemon(true);
		thread.start();
	}

	/**
	 * Wraps the underlying stream's method.
	 * It may be important to wait for a stream to actually be closed because it
	 * holds an implicit lock on a system resource (such as a file) while it is
	 * open.  Closing a stream may take time if the underlying stream is still
	 * servicing a previous request.
	 * @throws InterruptedIOException if the timeout expired
	 * @throws IOException if an i/o error occurs
	 */
	public void close() throws IOException {
		Thread oldThread;
		synchronized (this) {
			if (thread == null) return; // reader thread already terminated
			oldThread = thread;
			closeRequested = true;
			thread.interrupt(); // wakes the reader if it is blocked in wait()
			checkError();
		}
		if (closeTimeout == -1) return; // close continues in the background
		try {
			// join outside the monitor: the reader thread needs it to finish
			oldThread.join(closeTimeout);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
		}
		synchronized (this) {
			checkError();
			if (thread != null) throw new InterruptedIOException();
		}
	}

	/**
	 * Returns the number of unread bytes in the buffer.
	 * @throws IOException if an i/o error occurs
	 */
	public synchronized int available() throws IOException {
		if (length == 0) checkError();
		return length > 0 ? length : 0;
	}

	/**
	 * Reads a byte from the stream.
	 * @return the byte read as a value in 0..255, or -1 if EOF has been reached
	 * @throws InterruptedIOException if the timeout expired and no data was received,
	 *         bytesTransferred will be zero
	 * @throws IOException if an i/o error occurs
	 */
	public synchronized int read() throws IOException {
		if (! syncFill()) return -1; // EOF reached
		int b = iobuffer[head] & 255;
		consume(1);
		notify(); // space has been freed for the reader thread
		return b;
	}

	/**
	 * Reads multiple bytes from the stream.  Returns as soon as at least one byte
	 * is available; does not wait for the full len bytes.
	 * @param buffer the destination array
	 * @param off the offset in buffer at which to store the first byte
	 * @param len the maximum number of bytes to read
	 * @return the number of bytes read, 0 if len is 0, or -1 if EOF has been reached
	 * @throws NullPointerException if buffer is null
	 * @throws IndexOutOfBoundsException if off or len is negative, or len exceeds
	 *         the space available in buffer after off
	 * @throws InterruptedIOException if the timeout expired and no data was received,
	 *         bytesTransferred will be zero
	 * @throws IOException if an i/o error occurs
	 */
	public synchronized int read(byte[] buffer, int off, int len) throws IOException {
		// validate up front so that a bad request never consumes buffered bytes
		if (buffer == null) throw new NullPointerException();
		if (off < 0 || len < 0 || len > buffer.length - off) throw new IndexOutOfBoundsException();
		if (len == 0) return 0; // nothing requested, so never block
		if (! syncFill()) return -1; // EOF reached
		int count = Math.min(len, length);
		// the unread region may wrap around the end of the circular buffer
		int firstRun = Math.min(count, iobuffer.length - head);
		System.arraycopy(iobuffer, head, buffer, off, firstRun);
		if (firstRun < count) {
			System.arraycopy(iobuffer, 0, buffer, off + firstRun, count - firstRun);
		}
		consume(count);
		notify(); // space has been freed for the reader thread
		return count;
	}

	/**
	 * Skips multiple bytes in the stream.
	 * @param count the number of bytes to skip; nothing is skipped if not positive
	 * @return the number of bytes actually skipped, which is less than count only
	 *         if EOF was reached first
	 * @throws InterruptedIOException if the timeout expired before all of the
	 *         bytes specified have been skipped, bytesTransferred may be non-zero
	 * @throws IOException if an i/o error occurs
	 */
	public synchronized long skip(long count) throws IOException {
		// a negative count would otherwise move head backwards and inflate length
		if (count <= 0) return 0;
		long amount = 0;
		try {
			do {
				if (! syncFill()) break; // EOF reached
				int skip = (int) Math.min(count - amount, length);
				consume(skip);
				amount += skip;
			} while (amount < count);
		} catch (InterruptedIOException e) {
			// bytesTransferred is an int, so saturate rather than wrap
			e.bytesTransferred = (int) Math.min(amount, Integer.MAX_VALUE);
			throw e;
		}
		notify(); // space has been freed for the reader thread
		return amount;
	}

	/**
	 * Mark is not supported by the wrapper even if the underlying stream does, returns false.
	 */
	public boolean markSupported() {
		return false;
	}

	/**
	 * Drops count bytes from the front of the circular buffer.
	 * Must be called with the monitor held and with 0 <= count <= length.
	 */
	private void consume(int count) {
		int untilWrap = iobuffer.length - head;
		// written this way so that head + count is never computed (it could overflow)
		head = (count < untilWrap) ? head + count : count - untilWrap;
		length -= count;
	}

	/**
	 * Waits for the buffer to fill if it is empty and the stream has not reached EOF.
	 * Must be called with the monitor held.
	 * @return true if bytes are available, false if EOF has been reached
	 * @throws InterruptedIOException if EOF not reached but no bytes are available
	 *         once readTimeout has elapsed or the calling thread was interrupted
	 */
	private boolean syncFill() throws IOException {
		if (length != 0) return true;
		checkError(); // check errors only after we have read all remaining bytes
		if (waitingForClose) return false;
		notify();
		final long start = System.currentTimeMillis();
		long remaining = readTimeout; // 0 means wait without a time limit
		try {
			// wait in a loop: a wake-up does not imply that data has arrived
			while (length == 0 && ioe == null && ! waitingForClose) {
				wait(remaining);
				if (readTimeout > 0) {
					remaining = readTimeout - (System.currentTimeMillis() - start);
					if (remaining <= 0) break; // timed out
				}
			}
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt(); // we weren't expecting to be interrupted
		}
		if (length != 0) return true;
		checkError(); // check errors only after we have read all remaining bytes
		if (waitingForClose) return false;
		throw new InterruptedIOException();
	}

	/**
	 * If an exception is pending, clears it and throws it.
	 * Must be called with the monitor held.
	 */
	private void checkError() throws IOException {
		if (ioe != null) {
			IOException e = ioe;
			ioe = null;
			throw e;
		}
	}

	/**
	 * Runs the thread in the background: fills the buffer until done, waits for
	 * a close request, closes the underlying stream and finally marks the thread
	 * as terminated.  Any IOException is stored for the consumer to pick up.
	 */
	private void runThread() {
		try {
			readUntilDone();
		} catch (IOException e) {
			synchronized (this) { ioe = e; }
		} finally {
			waitUntilClosed();
			try {
				in.close();
			} catch (IOException e) {
				synchronized (this) { ioe = e; }
			} finally {
				synchronized (this) {
					thread = null;
					notify();
				}
			}
		}
	}

	/**
	 * Waits until we have been requested to close the stream.
	 */
	private synchronized void waitUntilClosed() {
		waitingForClose = true;
		notify(); // let a blocked consumer see that no more data will arrive
		while (! closeRequested) {
			try {
				wait();
			} catch (InterruptedException e) {
				closeRequested = true; // alternate quit signal
			}
		}
	}

	/**
	 * Reads bytes into the buffer until EOF, closed, or error.
	 */
	private void readUntilDone() throws IOException {
		for (;;) {
			byte[] target;
			int off, len;
			synchronized (this) {
				// stop pulling data from the underlying stream once close is requested
				if (closeRequested) return; // quit signal
				while (isBufferFull()) {
					waitForRead();
					if (closeRequested) return; // quit signal
				}
				target = iobuffer;
				// append after the unread bytes, up to the array end or to head
				// when the free space has wrapped around
				int untilWrap = iobuffer.length - head;
				off = (length < untilWrap) ? head + length : length - untilWrap;
				len = ((head > off) ? head : iobuffer.length) - off;
			}
			int count;
			try {
				// the i/o operation might block without releasing the lock,
				// so we do this outside of the synchronized block
				count = in.read(target, off, len);
				if (count == -1) return; // EOF encountered
			} catch (InterruptedIOException e) {
				count = e.bytesTransferred; // keep partial transfer
			}
			synchronized (this) {
				length += count;
				notify();
			}
		}
	}

	/*
	 * Wait for a read when the buffer is full (with the implication
	 * that space will become available in the buffer after the read
	 * takes place).  An interrupt is treated as a close request.
	 */
	private synchronized void waitForRead() {
		try {
			if (growWhenFull) {
				// wait up to readTimeout millis before growing to let reads catch up
				wait(readTimeout);
			} else {
				wait();
			}
		} catch (InterruptedException e) {
			closeRequested = true; // alternate quit signal
		}
		// If the buffer is still full, give it a chance to grow
		// (pointless once a close has been requested)
		if (growWhenFull && ! closeRequested && isBufferFull()) {
			growBuffer();
		}
	}

	/**
	 * Doubles the buffer capacity, moving the unread bytes to the start of the
	 * new array.  Does nothing if doubling would overflow an int.
	 */
	private synchronized void growBuffer() {
		int newSize = 2 * iobuffer.length;
		if (newSize > iobuffer.length) {
			if (Policy.DEBUG_STREAMS) {
				System.out.println("InputStream growing to " + newSize + " bytes"); //$NON-NLS-1$ //$NON-NLS-2$
			}
			byte[] newBuffer = new byte[newSize];
			// the unread region may wrap, so copy it in up to two runs
			int firstRun = Math.min(length, iobuffer.length - head);
			System.arraycopy(iobuffer, head, newBuffer, 0, firstRun);
			System.arraycopy(iobuffer, 0, newBuffer, firstRun, length - firstRun);
			iobuffer = newBuffer;
			head = 0;
			// length instance variable was not changed by this method
		}
	}

	/**
	 * Returns true if there is no free space left in the buffer.
	 */
	private boolean isBufferFull() {
		return length == iobuffer.length;
	}
}