# This patch file was generated by NetBeans IDE # Following Index: paths are relative to: /Users/oleksiys/Projects/Grizzly/trunk/modules/grizzly/src/main/java/com/sun/grizzly/async # This patch can be applied using context Tools: Patch action on respective folder. # It uses platform neutral UTF-8 encoding and \n newlines. # Above lines and this line are ignored by the patching process. Index: AbstractAsyncQueueReader.java --- AbstractAsyncQueueReader.java Base (BASE) +++ AbstractAsyncQueueReader.java Locally Modified (Based On LOCAL) @@ -35,11 +35,8 @@ * holder. * */ -/* - * To change this template, choose Tools | Templates - * and open the template in the editor. - */ + package com.sun.grizzly.async; import com.sun.grizzly.Controller; @@ -51,6 +48,7 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; @@ -61,36 +59,34 @@ */ public abstract class AbstractAsyncQueueReader implements AsyncQueueReader { private SelectorHandler selectorHandler; - private AsyncQueue readQueue; - private ConcurrentLinkedQueue recordQueue; + private AsyncQueue readQueue; public AbstractAsyncQueueReader(SelectorHandler selectorHandler) { this.selectorHandler = selectorHandler; - readQueue = new AsyncQueue(); - recordQueue = new ConcurrentLinkedQueue(); + readQueue = new AsyncQueue(); } /** * {@inheritDoc} */ - public void read(SelectionKey key, ByteBuffer buffer, + public Future read(SelectionKey key, ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler) throws IOException { - read(key, buffer, callbackHandler, null); + return read(key, buffer, callbackHandler, null); } /** * {@inheritDoc} */ - public void read(SelectionKey key, ByteBuffer buffer, + public Future read(SelectionKey key, ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition) throws IOException { - read(key, buffer, callbackHandler, condition, null); + return read(key, buffer, callbackHandler, condition, null); } /** * {@inheritDoc} */ - public void read(SelectionKey key, ByteBuffer buffer, + public Future read(SelectionKey key, ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition, AsyncQueueDataProcessor readPostProcessor) throws IOException { @@ -100,23 +96,25 @@ "Probably key was cancelled or connection was closed?"); } + AsyncQueueFuture future = + new AsyncQueueFuture(); + SelectableChannel channel = (SelectableChannel) key.channel(); AsyncQueueEntry channelEntry = readQueue.obtainAsyncQueueEntry(channel); - ConcurrentLinkedQueue queue = channelEntry.queue; - AtomicReference currentElement = channelEntry.currentElement; + ConcurrentLinkedQueue queue = channelEntry.queue; + AtomicReference currentElement = channelEntry.currentElement; ReentrantLock lock = channelEntry.queuedActionLock; // If AsyncQueue is empty - try to read ByteBuffer here try { - AsyncReadQueueRecord record = null; + AsyncQueueReadUnit record = new AsyncQueueReadUnit(); OperationResult dstResult = channelEntry.tmpResult; boolean isDirectReadCompleted = false; if (currentElement.get() == null && // Weak comparison for null lock.tryLock()) { - record = obtainRecord(); // Strong comparison for null, because we're in locked region if (currentElement.compareAndSet(null, record)) { @@ -141,12 +139,9 @@ } if (!isDirectReadCompleted && buffer.hasRemaining()) { - if (record == null) { - record = obtainRecord(); - } + record.set(buffer, callbackHandler, condition, + readPostProcessor, future); - record.set(buffer, callbackHandler, condition, readPostProcessor); - boolean isRegisterForReading = false; // add new element to the queue, if it's not current @@ -168,7 +163,7 @@ // If buffer was read directly - set next queue element as current if (lock.isHeldByCurrentThread()) { - AsyncReadQueueRecord nextRecord = queue.poll(); + AsyncQueueReadUnit nextRecord = queue.poll(); if (nextRecord != null) { // if there is something in queue currentElement.set(nextRecord); lock.unlock(); @@ -184,9 +179,12 @@ // Notify callback handler - if (callbackHandler != null) { record.set(buffer, callbackHandler, condition, - readPostProcessor); + readPostProcessor, future); + + future.setResult(record); + + if (callbackHandler != null) { callbackHandler.onReadCompleted(key, dstResult.address, record); } @@ -194,11 +192,7 @@ if (isReregister) { registerForReading(key); } - // Release record element - if (record != null) { - recordQueue.offer(record); } - } } catch(IOException e) { onClose(channel); throw e; @@ -207,6 +201,8 @@ lock.unlock(); } } + + return future; } /** @@ -236,12 +232,12 @@ AsyncQueueEntry channelEntry = readQueue.obtainAsyncQueueEntry(channel); - ConcurrentLinkedQueue queue = channelEntry.queue; - AtomicReference currentElement = channelEntry.currentElement; + ConcurrentLinkedQueue queue = channelEntry.queue; + AtomicReference currentElement = channelEntry.currentElement; ReentrantLock lock = channelEntry.queuedActionLock; if (currentElement.get() == null) { - AsyncReadQueueRecord nextRecord = queue.peek(); + AsyncQueueReadUnit nextRecord = queue.peek(); if (nextRecord != null && lock.tryLock()) { if (!queue.isEmpty() && currentElement.compareAndSet(null, nextRecord)) { @@ -257,7 +253,7 @@ try { OperationResult dstResult = channelEntry.tmpResult; while (currentElement.get() != null) { - AsyncReadQueueRecord queueRecord = currentElement.get(); + AsyncQueueReadUnit queueRecord = currentElement.get(); ByteBuffer byteBuffer = queueRecord.byteBuffer; AsyncQueueDataProcessor readPostProcessor = queueRecord.readPostProcessor; @@ -283,16 +279,17 @@ condition.checkAsyncReadCompleted(key, dstResult.address, byteBuffer))) { currentElement.set(queue.poll()); + ((AsyncQueueFuture) queueRecord.future).setResult(queueRecord); + if (queueRecord.callbackHandler != null) { queueRecord.callbackHandler.onReadCompleted( key, dstResult.address, queueRecord); } - recordQueue.offer(queueRecord); // If last element in queue is null - we have to be careful if (currentElement.get() == null) { lock.unlock(); - AsyncReadQueueRecord nextRecord = queue.peek(); + AsyncQueueReadUnit nextRecord = queue.peek(); if (nextRecord != null && lock.tryLock()) { if (!queue.isEmpty() && currentElement.compareAndSet(null, nextRecord)) { @@ -339,13 +336,4 @@ private void registerForReading(SelectionKey key) { selectorHandler.register(key, SelectionKey.OP_READ); } - - private AsyncReadQueueRecord obtainRecord() { - AsyncReadQueueRecord record = recordQueue.poll(); - if (record == null) { - record = new AsyncReadQueueRecord(); } - - return record; - } -} Index: AbstractAsyncQueueWriter.java --- AbstractAsyncQueueWriter.java Base (BASE) +++ AbstractAsyncQueueWriter.java Locally Modified (Based On LOCAL) @@ -45,7 +45,6 @@ import com.sun.grizzly.Controller; import com.sun.grizzly.SelectorHandler; import com.sun.grizzly.async.AsyncQueue.AsyncQueueEntry; -import com.sun.grizzly.util.ByteBufferFactory; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -53,6 +52,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.WritableByteChannel; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; @@ -63,81 +63,90 @@ */ public abstract class AbstractAsyncQueueWriter implements AsyncQueueWriter { private SelectorHandler selectorHandler; - private AsyncQueue writeQueue; - private ConcurrentLinkedQueue recordQueue; + private AsyncQueue writeQueue; public AbstractAsyncQueueWriter(SelectorHandler selectorHandler) { this.selectorHandler = selectorHandler; - writeQueue = new AsyncQueue(); - recordQueue = new ConcurrentLinkedQueue(); + writeQueue = new AsyncQueue(); } /** * {@inheritDoc} */ - public void write(SelectionKey key, ByteBuffer buffer) throws IOException { - write(key, null, buffer, null); + public Future write(SelectionKey key, + ByteBuffer buffer) throws IOException { + return write(key, null, buffer, null); } /** * {@inheritDoc} */ - public void write(SelectionKey key, ByteBuffer buffer, + public Future write(SelectionKey key, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException { - write(key, null, buffer, callbackHandler, null); + return write(key, null, buffer, callbackHandler, null); } /** * {@inheritDoc} */ - public void write(SelectionKey key, ByteBuffer buffer, + public Future write(SelectionKey key, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor) throws IOException { - write(key, null, buffer, callbackHandler, writePreProcessor, false); + return write(key, null, buffer, callbackHandler, writePreProcessor, + null); } /** * {@inheritDoc} */ - public void write(SelectionKey key, ByteBuffer buffer, + public Future write(SelectionKey key, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, - AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer) + AsyncQueueDataProcessor writePreProcessor, ByteBufferCloner cloner) throws IOException { - write(key, null, buffer, callbackHandler, writePreProcessor, isCloneByteBuffer); + return write(key, null, buffer, callbackHandler, writePreProcessor, + cloner); } /** * {@inheritDoc} */ - public void write(SelectionKey key, SocketAddress dstAddress, + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer) throws IOException { - write(key, dstAddress, buffer, null); + return write(key, dstAddress, buffer, null); } /** * {@inheritDoc} */ - public void write(SelectionKey key, SocketAddress dstAddress, + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException { - write(key, dstAddress, buffer, callbackHandler, null); + return write(key, dstAddress, buffer, callbackHandler, null); } /** * {@inheritDoc} */ - public void write(SelectionKey key, SocketAddress dstAddress, + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor) throws IOException { - write(key, dstAddress, buffer, callbackHandler, writePreProcessor, false); + return write(key, dstAddress, buffer, callbackHandler, + writePreProcessor, null); } /** * {@inheritDoc} */ - public void write(SelectionKey key, SocketAddress dstAddress, + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, - AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer) + AsyncQueueDataProcessor writePreProcessor, ByteBufferCloner cloner) throws IOException { if (key == null) { @@ -145,21 +154,23 @@ "Probably key was cancelled or connection was closed?"); } + AsyncQueueFuture future = + new AsyncQueueFuture(); SelectableChannel channel = key.channel(); AsyncQueueEntry channelEntry = writeQueue.obtainAsyncQueueEntry(channel); - ConcurrentLinkedQueue queue = channelEntry.queue; - AtomicReference currentElement = channelEntry.currentElement; + AsyncQueueWriteUnit record = new AsyncQueueWriteUnit(); + + ConcurrentLinkedQueue queue = channelEntry.queue; + AtomicReference currentElement = channelEntry.currentElement; ReentrantLock lock = channelEntry.queuedActionLock; // If AsyncQueue is empty - try to write ByteBuffer here try { - AsyncWriteQueueRecord record = null; if (currentElement.get() == null && // Weak comparison for null lock.tryLock()) { - record = obtainRecord(); // Strong comparison for null, because we're in locked region if (currentElement.compareAndSet(null, record)) { OperationResult dstResult = channelEntry.tmpResult; @@ -175,22 +186,15 @@ if (buffer.hasRemaining() || (lock.isHeldByCurrentThread() && writePreProcessor != null && writePreProcessor.getInternalByteBuffer().hasRemaining())) { - if (record == null) { - record = obtainRecord(); - } // clone ByteBuffer if required - if (isCloneByteBuffer) { - int size = buffer.remaining(); - ByteBuffer newBuffer = ByteBufferFactory.allocateView( - size, buffer.isDirect()); - - newBuffer.put(buffer); - newBuffer.position(0); - buffer = newBuffer; + if (cloner != null) { + buffer = cloner.clone(buffer); + record.setCloned(true); } - record.set(buffer, callbackHandler, writePreProcessor, dstAddress); + record.set(buffer, callbackHandler, writePreProcessor, + dstAddress, cloner, future); boolean isRegisterForWriting = false; @@ -210,15 +214,18 @@ } } else { // If there are no bytes available for writing + record.set(buffer, callbackHandler, writePreProcessor, + dstAddress, cloner, future); + future.setResult(record); + // Notify callback handler if (callbackHandler != null) { - record.set(buffer, callbackHandler, writePreProcessor, dstAddress); callbackHandler.onWriteCompleted(key, record); } // If buffer was written directly - set next queue element as current if (lock.isHeldByCurrentThread()) { - AsyncWriteQueueRecord nextRecord = queue.poll(); + AsyncQueueWriteUnit nextRecord = queue.poll(); if (nextRecord != null) { // if there is something in queue currentElement.set(nextRecord); lock.unlock(); @@ -231,13 +238,13 @@ } } } - - // Release record element - if (record != null) { - recordQueue.offer(record); } - } } catch(IOException e) { + if (record.callbackHandler != null) { + record.callbackHandler.onIOException(e, key, + buffer, queue); + } + onClose(channel); throw e; } finally { @@ -245,6 +252,8 @@ lock.unlock(); } } + + return future; } /** @@ -274,12 +283,12 @@ AsyncQueueEntry channelEntry = writeQueue.obtainAsyncQueueEntry(channel); - ConcurrentLinkedQueue queue = channelEntry.queue; - AtomicReference currentElement = channelEntry.currentElement; + ConcurrentLinkedQueue queue = channelEntry.queue; + AtomicReference currentElement = channelEntry.currentElement; ReentrantLock lock = channelEntry.queuedActionLock; if (currentElement.get() == null) { - AsyncWriteQueueRecord nextRecord = queue.peek(); + AsyncQueueWriteUnit nextRecord = queue.peek(); if (nextRecord != null && lock.tryLock()) { if (!queue.isEmpty() && currentElement.compareAndSet(null, nextRecord)) { @@ -295,7 +304,7 @@ try { OperationResult dstResult = channelEntry.tmpResult; while (currentElement.get() != null) { - AsyncWriteQueueRecord queueRecord = currentElement.get(); + AsyncQueueWriteUnit queueRecord = currentElement.get(); ByteBuffer byteBuffer = queueRecord.byteBuffer; AsyncQueueDataProcessor writePreProcessor = queueRecord.writePreProcessor; @@ -305,6 +314,8 @@ writePreProcessor, dstResult); channelEntry.processedUnitsCount.addAndGet(dstResult.bytesProcessed); } catch (IOException e) { + ((AsyncQueueFuture) queueRecord.future).setException(e); + if (queueRecord.callbackHandler != null) { queueRecord.callbackHandler.onIOException(e, key, byteBuffer, queue); @@ -326,12 +337,11 @@ } currentElement.set(queue.poll()); - recordQueue.offer(queueRecord); // If last element in queue is null - we have to be careful if (currentElement.get() == null) { lock.unlock(); - AsyncWriteQueueRecord nextRecord = queue.peek(); + AsyncQueueWriteUnit nextRecord = queue.peek(); if (nextRecord != null && lock.tryLock()) { if (!queue.isEmpty() && currentElement.compareAndSet(null, nextRecord)) { @@ -405,13 +415,4 @@ protected void registerForWriting(SelectionKey key) { selectorHandler.register(key, SelectionKey.OP_WRITE); } - - private AsyncWriteQueueRecord obtainRecord() { - AsyncWriteQueueRecord record = recordQueue.poll(); - if (record == null) { - record = new AsyncWriteQueueRecord(); } - - return record; - } -} Index: AsyncQueueFuture.java --- AsyncQueueFuture.java Locally New +++ AsyncQueueFuture.java Locally New @@ -0,0 +1,120 @@ +/* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can obtain + * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html + * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. + * Sun designates this particular file as subject to the "Classpath" exception + * as provided by Sun in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the License + * Header, with the fields enclosed by brackets [] replaced by your own + * identifying information: "Portions Copyrighted [year] + * [name of copyright owner]" + * + * Contributor(s): + * + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + * + */ + +package com.sun.grizzly.async; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Async queue {@link Future} implementation + * + * @author Alexey Stashok + */ +public class AsyncQueueFuture implements Future { + private CountDownLatch latch = new CountDownLatch(1); + private E result; + private boolean isCancelled; + private Throwable exception; + + public boolean cancel(boolean mayInterruptIfRunning) { + if (isDone()) return false; + + isCancelled = true; + notifyResult(); + + return true; + } + + public boolean isCancelled() { + return isCancelled; + } + + public boolean isDone() { + return latch.getCount() < 1; + } + + public E get() throws InterruptedException, ExecutionException { + latch.await(); + if (isCancelled) { + throw new CancellationException(); + } else if (exception != null) { + throw new ExecutionException(exception); + } + + return result; + } + + public E get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + latch.await(timeout, unit); + if (isCancelled) { + throw new CancellationException(); + } else if (exception != null) { + throw new ExecutionException(exception); + } + + return result; + } + + public E getResult() { + return result; + } + + public void setResult(E result) { + this.result = result; + notifyResult(); + } + + public Throwable getException() { + return exception; + } + + public void setException(Throwable exception) { + this.exception = exception; + notifyResult(); + } + + protected void notifyResult() { + latch.countDown(); + } +} Index: AsyncQueueReadUnit.java --- AsyncQueueReadUnit.java Locally New +++ AsyncQueueReadUnit.java Locally New @@ -38,26 +38,30 @@ package com.sun.grizzly.async; import java.nio.ByteBuffer; +import java.util.concurrent.Future; /** * {@link AsyncQueue} write data unit * * @author Alexey Stashok */ -public class AsyncReadQueueRecord { - public ByteBuffer byteBuffer; - public AsyncReadCallbackHandler callbackHandler; - public AsyncReadCondition condition; - public AsyncQueueDataProcessor readPostProcessor; +public class AsyncQueueReadUnit { + protected ByteBuffer byteBuffer; + protected AsyncReadCallbackHandler callbackHandler; + protected AsyncReadCondition condition; + protected AsyncQueueDataProcessor readPostProcessor; + protected Future future; public void set(ByteBuffer byteBuffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition, - AsyncQueueDataProcessor readPostProcessor) { + AsyncQueueDataProcessor readPostProcessor, + Future future) { this.byteBuffer = byteBuffer; this.callbackHandler = callbackHandler; this.condition = condition; this.readPostProcessor = readPostProcessor; + this.future = future; } public ByteBuffer getByteBuffer() { @@ -91,4 +95,12 @@ public void setReadPostProcessor(AsyncQueueDataProcessor readPostProcessor) { this.readPostProcessor = readPostProcessor; } + + public Future getFuture() { + return future; } + + public void setFuture(Future future) { + this.future = future; + } +} Index: AsyncQueueReadable.java --- AsyncQueueReadable.java Base (BASE) +++ AsyncQueueReadable.java Locally Modified (Based On LOCAL) @@ -41,6 +41,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.Future; /** * Object, which is able to read data to the {@link ByteBuffer} @@ -69,7 +70,7 @@ * false if read operation was put to queue * @throws java.io.IOException */ - public void readFromAsyncQueue(ByteBuffer buffer, + public Future readFromAsyncQueue(ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler) throws IOException; /** @@ -95,7 +96,7 @@ * false if read operation was put to queue * @throws java.io.IOException */ - public void readFromAsyncQueue(ByteBuffer buffer, + public Future readFromAsyncQueue(ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition) throws IOException; @@ -124,7 +125,7 @@ * false if read operation was put to queue * @throws java.io.IOException */ - public void readFromAsyncQueue(ByteBuffer buffer, + public Future readFromAsyncQueue(ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition, AsyncQueueDataProcessor readPostProcessor) throws IOException; Index: AsyncQueueReader.java --- AsyncQueueReader.java Base (BASE) +++ AsyncQueueReader.java Locally Modified (Based On LOCAL) @@ -42,20 +42,21 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.util.concurrent.Future; /** * * @author Alexey Stashok */ public interface AsyncQueueReader { - public void read(SelectionKey key, ByteBuffer buffer, + public Future read(SelectionKey key, ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler) throws IOException; - public void read(SelectionKey key, ByteBuffer buffer, + public Future read(SelectionKey key, ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition) throws IOException; - public void read(SelectionKey key, ByteBuffer buffer, + public Future read(SelectionKey key, ByteBuffer buffer, AsyncReadCallbackHandler callbackHandler, AsyncReadCondition condition, AsyncQueueDataProcessor readPostProcessor) throws IOException; Index: AsyncQueueWritable.java --- AsyncQueueWritable.java Base (BASE) +++ AsyncQueueWritable.java Locally Modified (Based On LOCAL) @@ -42,6 +42,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.concurrent.Future; /** * Object, which is able to send {@link ByteBuffer} data asynchronously, @@ -64,7 +65,8 @@ * @param buffer {@link ByteBuffer} * @throws java.io.IOException */ - public void writeToAsyncQueue(ByteBuffer buffer) throws IOException; + public Future writeToAsyncQueue(ByteBuffer buffer) + throws IOException; /** * Method writes {@link ByteBuffer} using async write queue. @@ -84,7 +86,7 @@ * {@link ByteBuffer} will be completely written * @throws java.io.IOException */ - public void writeToAsyncQueue(ByteBuffer buffer, + public Future writeToAsyncQueue(ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException; /** @@ -113,7 +115,7 @@ * written on {@link SelectableChannel} * @throws java.io.IOException */ - public void writeToAsyncQueue(ByteBuffer buffer, + public Future writeToAsyncQueue(ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor) throws IOException; @@ -147,10 +149,10 @@ * {@link AsyncQueue} * @throws java.io.IOException */ - public void writeToAsyncQueue(ByteBuffer buffer, + public Future writeToAsyncQueue(ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor, - boolean isCloneByteBuffer) throws IOException; + ByteBufferCloner cloner) throws IOException; /** * Method sends {@link ByteBuffer} using async write queue. @@ -169,8 +171,8 @@ * @param buffer {@link ByteBuffer} * @throws java.io.IOException */ - public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer) - throws IOException; + public Future writeToAsyncQueue( + SocketAddress dstAddress, ByteBuffer buffer) throws IOException; /** * Method sends {@link ByteBuffer} using async write queue. @@ -193,7 +195,8 @@ * {@link ByteBuffer} will be completely written * @throws java.io.IOException */ - public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + public Future writeToAsyncQueue( + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException; /** @@ -225,7 +228,8 @@ * written on {@link SelectableChannel} * @throws java.io.IOException */ - public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + public Future writeToAsyncQueue( + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor) throws IOException; @@ -262,8 +266,9 @@ * {@link AsyncQueue} * @throws java.io.IOException */ - public void writeToAsyncQueue(SocketAddress dstAddress, ByteBuffer buffer, + public Future writeToAsyncQueue( + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor, - boolean isCloneByteBuffer) throws IOException; + ByteBufferCloner cloner) throws IOException; } Index: AsyncQueueWriteUnit.java --- AsyncQueueWriteUnit.java Locally New +++ AsyncQueueWriteUnit.java Locally New @@ -40,26 +40,34 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.util.concurrent.Future; /** * {@link AsyncQueue} write data unit * * @author Alexey Stashok */ -public class AsyncWriteQueueRecord { +public class AsyncQueueWriteUnit { protected ByteBuffer byteBuffer; protected AsyncWriteCallbackHandler callbackHandler; protected AsyncQueueDataProcessor writePreProcessor; protected SocketAddress dstAddress; + protected Future future; + protected ByteBufferCloner byteBufferCloner; + protected boolean isCloned; public void set(ByteBuffer byteBuffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor, - SocketAddress dstAddress) { + SocketAddress dstAddress, + ByteBufferCloner byteBufferCloner, + Future future) { this.byteBuffer = byteBuffer; this.callbackHandler = callbackHandler; this.writePreProcessor = writePreProcessor; this.dstAddress = dstAddress; + this.byteBufferCloner = byteBufferCloner; + this.future = future; } public ByteBuffer getByteBuffer() { @@ -93,4 +101,28 @@ public void setDstAddress(SocketAddress dstAddress) { this.dstAddress = dstAddress; } + + public ByteBufferCloner getByteBufferCloner() { + return byteBufferCloner; } + + public void setByteBufferCloner(ByteBufferCloner byteBufferCloner) { + this.byteBufferCloner = byteBufferCloner; + } + + public boolean isCloned() { + return isCloned; + } + + public void setCloned(boolean wasCloned) { + this.isCloned = wasCloned; + } + + public Future getFuture() { + return future; + } + + public void setFuture(Future future) { + this.future = future; + } +} Index: AsyncQueueWriter.java --- AsyncQueueWriter.java Base (BASE) +++ AsyncQueueWriter.java Locally Modified (Based On LOCAL) @@ -43,6 +43,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; +import java.util.concurrent.Future; /** * Common inteface to be implemented by protocol dependant asynchronous queue @@ -71,7 +72,8 @@ * @param buffer {@link ByteBuffer} * @throws java.io.IOException */ - public void write(SelectionKey key, ByteBuffer buffer) throws IOException; + public Future write(SelectionKey key, + ByteBuffer buffer) throws IOException; /** * Method writes {@link ByteBuffer} to the {@link SelectableChannel} @@ -84,8 +86,7 @@ * and {@link SelectableChannel} will be registered on * {@link SelectorHandler}, waiting for OP_WRITE event. * If an exception occurs, during direct writing - it will be propagated - * to the caller directly, otherwise, if the {@link ByteBuffer} is - * added to a writing queue - exception notification will come via + * to the caller directly and come via * AsyncWriteCallbackHandler.onIOException() * * @param key {@link SelectionKey} associated with @@ -97,7 +98,8 @@ * {@link ByteBuffer} will be completely written * @throws java.io.IOException */ - public void write(SelectionKey key, ByteBuffer buffer, + public Future write(SelectionKey key, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) throws IOException; /** @@ -111,8 +113,7 @@ * and {@link SelectableChannel} will be registered on * {@link SelectorHandler}, waiting for OP_WRITE event. * If an exception occurs, during direct writing - it will be propagated - * to the caller directly, otherwise, if the {@link ByteBuffer} is - * added to a writing queue - exception notification will come via + * to the caller directly and come via * AsyncWriteCallbackHandler.onIOException() * Before data will be written on {@link SelectableChannel}, first it * will be passed for preprocessing to AsyncQueueDataProcessor, @@ -132,8 +133,8 @@ * written on {@link SelectableChannel} * @throws java.io.IOException */ - public void write(SelectionKey key, ByteBuffer buffer, - AsyncWriteCallbackHandler callbackHandler, + public Future write(SelectionKey key, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor) throws IOException; /** @@ -147,8 +148,7 @@ * and {@link SelectableChannel} will be registered on * {@link SelectorHandler}, waiting for OP_WRITE event. * If an exception occurs, during direct writing - it will be propagated - * to the caller directly, otherwise, if the {@link ByteBuffer} is - * added to a writing queue - exception notification will come via + * to the caller directly and come via * AsyncWriteCallbackHandler.onIOException() * Before data will be written on {@link SelectableChannel}, first it * will be passed for preprocessing to AsyncQueueDataProcessor, @@ -172,9 +172,9 @@ * {@link AsyncQueue} * @throws java.io.IOException */ - public void write(SelectionKey key, ByteBuffer buffer, - AsyncWriteCallbackHandler callbackHandler, - AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer) + public Future write(SelectionKey key, + ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, + AsyncQueueDataProcessor writePreProcessor, ByteBufferCloner cloner) throws IOException; /** @@ -198,8 +198,8 @@ * @param buffer {@link ByteBuffer} * @throws java.io.IOException */ - public void write(SelectionKey key, SocketAddress dstAddress, - ByteBuffer buffer) throws IOException; + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer) throws IOException; /** * Method sends {@link ByteBuffer} to the {@link SocketAddress} @@ -212,8 +212,7 @@ * and {@link SelectableChannel} will be registered on * {@link SelectorHandler}, waiting for OP_WRITE event. * If an exception occurs, during direct writing - it will be propagated - * to the caller directly, otherwise, if the {@link ByteBuffer} is - * added to a writing queue - exception notification will come via + * to the caller directly and come via * AsyncWriteCallbackHandler.onIOException() * * @param key {@link SelectionKey} associated with @@ -226,9 +225,9 @@ * {@link ByteBuffer} will be completely written * @throws java.io.IOException */ - public void write(SelectionKey key, SocketAddress dstAddress, - ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler) - throws IOException; + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler) throws IOException; /** * Method sends {@link ByteBuffer} to the {@link SocketAddress} @@ -241,8 +240,7 @@ * and {@link SelectableChannel} will be registered on * {@link SelectorHandler}, waiting for OP_WRITE event. * If an exception occurs, during direct writing - it will be propagated - * to the caller directly, otherwise, if the {@link ByteBuffer} is - * added to a writing queue - exception notification will come via + * to the caller directly and come via * AsyncWriteCallbackHandler.onIOException() * Before data will be written on {@link SelectableChannel}, first it * will be passed for preprocessing to AsyncQueueDataProcessor, @@ -263,7 +261,8 @@ * written on {@link SelectableChannel} * @throws java.io.IOException */ - public void write(SelectionKey key, SocketAddress dstAddress, + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, AsyncQueueDataProcessor writePreProcessor) throws IOException; @@ -278,8 +277,7 @@ * and {@link SelectableChannel} will be registered on * {@link SelectorHandler}, waiting for OP_WRITE event. * If an exception occurs, during direct writing - it will be propagated - * to the caller directly, otherwise, if the {@link ByteBuffer} is - * added to a writing queue - exception notification will come via + * to the caller directly and come via * AsyncWriteCallbackHandler.onIOException() * Before data will be written on {@link SelectableChannel}, first it * will be passed for preprocessing to AsyncQueueDataProcessor, @@ -304,9 +302,11 @@ * {@link AsyncQueue} * @throws java.io.IOException */ - public void write(SelectionKey key, SocketAddress dstAddress, - ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, - AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer) + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, + AsyncWriteCallbackHandler callbackHandler, + AsyncQueueDataProcessor writePreProcessor, + ByteBufferCloner cloner) throws IOException; /** Index: AsyncReadCallbackHandler.java --- AsyncReadCallbackHandler.java Base (BASE) +++ AsyncReadCallbackHandler.java Locally Modified (Based On LOCAL) @@ -65,7 +65,7 @@ * read */ public void onReadCompleted(SelectionKey key, SocketAddress srcAddress, - AsyncReadQueueRecord readRecord); + AsyncQueueReadUnit readRecord); /** * Method will be called by {@link AsyncQueueReader}, if @@ -80,5 +80,5 @@ * some data, which was successfully read before error occured */ public void onIOException(IOException ioException, SelectionKey key, - ByteBuffer buffer, Queue remainingQueue); + ByteBuffer buffer, Queue remainingQueue); } Index: AsyncReadQueueRecord.java --- AsyncReadQueueRecord.java Base (BASE) +++ AsyncReadQueueRecord.java Locally Deleted @@ -1,94 +0,0 @@ -/* - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can obtain - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. - * Sun designates this particular file as subject to the "Classpath" exception - * as provided by Sun in the GPL Version 2 section of the License file that - * accompanied this code. If applicable, add the following below the License - * Header, with the fields enclosed by brackets [] replaced by your own - * identifying information: "Portions Copyrighted [year] - * [name of copyright owner]" - * - * Contributor(s): - * - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - * - */ -package com.sun.grizzly.async; - -import java.nio.ByteBuffer; - -/** - * {@link AsyncQueue} write data unit - * - * @author Alexey Stashok - */ -public class AsyncReadQueueRecord { - public ByteBuffer byteBuffer; - public AsyncReadCallbackHandler callbackHandler; - public AsyncReadCondition condition; - public AsyncQueueDataProcessor readPostProcessor; - - public void set(ByteBuffer byteBuffer, - AsyncReadCallbackHandler callbackHandler, - AsyncReadCondition condition, - AsyncQueueDataProcessor readPostProcessor) { - this.byteBuffer = byteBuffer; - this.callbackHandler = callbackHandler; - this.condition = condition; - this.readPostProcessor = readPostProcessor; - } - - public ByteBuffer getByteBuffer() { - return byteBuffer; - } - - public void setByteBuffer(ByteBuffer byteBuffer) { - this.byteBuffer = byteBuffer; - } - - public AsyncReadCallbackHandler getCallbackHandler() { - return callbackHandler; - } - - public void setCallbackHandler(AsyncReadCallbackHandler callbackHandler) { - this.callbackHandler = callbackHandler; - } - - public AsyncReadCondition getCondition() { - return condition; - } - - public void setCondition(AsyncReadCondition condition) { - this.condition = condition; - } - - public AsyncQueueDataProcessor getReadPostProcessor() { - return readPostProcessor; - } - - public void setReadPostProcessor(AsyncQueueDataProcessor readPostProcessor) { - this.readPostProcessor = readPostProcessor; - } -} Index: AsyncWriteCallbackHandler.java --- AsyncWriteCallbackHandler.java Base (BASE) +++ AsyncWriteCallbackHandler.java Locally Modified (Based On LOCAL) @@ -62,7 +62,7 @@ * written */ public void onWriteCompleted(SelectionKey key, - AsyncWriteQueueRecord writtenRecord); + AsyncQueueWriteUnit writtenRecord); /** * Method will be called by {@link AsyncQueueWriter}, if @@ -77,5 +77,5 @@ * moment, when exception occured */ public void onIOException(IOException ioException, SelectionKey key, - ByteBuffer buffer, Queue remainingQueue); + ByteBuffer buffer, Queue remainingQueue); } Index: AsyncWriteQueueRecord.java --- AsyncWriteQueueRecord.java Base (BASE) +++ AsyncWriteQueueRecord.java Locally Deleted @@ -1,96 +0,0 @@ -/* - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can obtain - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. - * Sun designates this particular file as subject to the "Classpath" exception - * as provided by Sun in the GPL Version 2 section of the License file that - * accompanied this code. If applicable, add the following below the License - * Header, with the fields enclosed by brackets [] replaced by your own - * identifying information: "Portions Copyrighted [year] - * [name of copyright owner]" - * - * Contributor(s): - * - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - * - */ - -package com.sun.grizzly.async; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -/** - * {@link AsyncQueue} write data unit - * - * @author Alexey Stashok - */ -public class AsyncWriteQueueRecord { - protected ByteBuffer byteBuffer; - protected AsyncWriteCallbackHandler callbackHandler; - protected AsyncQueueDataProcessor writePreProcessor; - protected SocketAddress dstAddress; - - public void set(ByteBuffer byteBuffer, - AsyncWriteCallbackHandler callbackHandler, - AsyncQueueDataProcessor writePreProcessor, - SocketAddress dstAddress) { - this.byteBuffer = byteBuffer; - this.callbackHandler = callbackHandler; - this.writePreProcessor = writePreProcessor; - this.dstAddress = dstAddress; - } - - public ByteBuffer getByteBuffer() { - return byteBuffer; - } - - public void setByteBuffer(ByteBuffer byteBuffer) { - this.byteBuffer = byteBuffer; - } - - public AsyncWriteCallbackHandler getCallbackHandler() { - return callbackHandler; - } - - public void setCallbackHandler(AsyncWriteCallbackHandler callbackHandler) { - this.callbackHandler = callbackHandler; - } - - public AsyncQueueDataProcessor getWritePreProcessor() { - return writePreProcessor; - } - - public void setWritePreProcessor(AsyncQueueDataProcessor writePreProcessor) { - this.writePreProcessor = writePreProcessor; - } - - public SocketAddress getDstAddress() { - return dstAddress; - } - - public void setDstAddress(SocketAddress dstAddress) { - this.dstAddress = dstAddress; - } -} Index: ByteBufferCloner.java --- ByteBufferCloner.java Locally New +++ ByteBufferCloner.java Locally New @@ -0,0 +1,55 @@ +/* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can obtain + * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html + * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. + * Sun designates this particular file as subject to the "Classpath" exception + * as provided by Sun in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the License + * Header, with the fields enclosed by brackets [] replaced by your own + * identifying information: "Portions Copyrighted [year] + * [name of copyright owner]" + * + * Contributor(s): + * + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + * + */ + +package com.sun.grizzly.async; + +import java.nio.ByteBuffer; + +/** + * Cloner, which will be called by {@link AsyncQueueWriter}, when ByteBuffer + * could not be written directly, and will be added to the queue. + * Cloner may create a clone of original ByteBuffer and return it to the + * {@link AsyncQueueWriter} instead of original one. + * Using ByteBufferCloner, developer has a chance to clone a ByteBuffer only in + * case, when it is really required. + * + * @author Alexey Stashok + */ +public interface ByteBufferCloner { + public ByteBuffer clone(ByteBuffer originalByteBuffer); +} Index: TCPAsyncQueueWriter.java --- TCPAsyncQueueWriter.java Base (BASE) +++ TCPAsyncQueueWriter.java Locally Modified (Based On LOCAL) @@ -44,6 +44,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.WritableByteChannel; +import java.util.concurrent.Future; /** * TCP implementation of {@link AsyncQueueWriter} @@ -60,17 +61,18 @@ * {@inheritDoc} */ @Override - public void write(SelectionKey key, SocketAddress dstAddress, + public Future write(SelectionKey key, + SocketAddress dstAddress, ByteBuffer buffer, AsyncWriteCallbackHandler callbackHandler, - AsyncQueueDataProcessor writePreProcessor, boolean isCloneByteBuffer) + AsyncQueueDataProcessor writePreProcessor, ByteBufferCloner cloner) throws IOException { if (dstAddress != null) { throw new UnsupportedOperationException("Destination address should be null for TCP!"); } - super.write(key, null, buffer, callbackHandler, - writePreProcessor, isCloneByteBuffer); + return super.write(key, null, buffer, callbackHandler, + writePreProcessor, cloner); } protected OperationResult doWrite(WritableByteChannel channel,