# This patch file was generated by NetBeans IDE # Following Index: paths are relative to: /Users/oleksiys/Projects/Grizzly/trunk/modules/http # This patch can be applied using context Tools: Patch action on respective folder. # It uses platform neutral UTF-8 encoding and \n newlines. # Above lines and this line are ignored by the patching process. Index: src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java --- src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java Base (BASE) +++ src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java Locally Modified (Based On LOCAL) @@ -403,6 +403,11 @@ protected boolean disableUploadTimeout = true; + /** + * Flag, which indicates if async HTTP write is enabled + */ + protected boolean isAsyncHttpWriteEnabled; + // ----------------------------------------------------- Constructor ---- // public DefaultProcessorTask(){ @@ -449,6 +454,7 @@ maxHttpHeaderSize, bufferResponse); } + request.setInputBuffer(inputBuffer); response.setOutputBuffer(outputBuffer); @@ -508,7 +514,11 @@ inputStream = (InputReader)input; SocketChannelOutputBuffer channelOutputBuffer = ((SocketChannelOutputBuffer)outputBuffer); - channelOutputBuffer.setChannel((SocketChannel)key.channel()); + channelOutputBuffer.setAsyncHttpWriteEnabled( + isAsyncHttpWriteEnabled); + channelOutputBuffer.setAsyncQueueWriter( + selectorThread.getSelectorHandler().getAsyncQueueWriter()); + channelOutputBuffer.setSelectionKey(key); response.setChannel((SocketChannel)key.channel()); } configPreProcess(); @@ -1613,8 +1623,25 @@ return uploadTimeout; } + /** + * Is async HTTP write enabled. + * @return true, if async HTTP write enabled, or false + * otherwise. + */ + public boolean isAsyncHttpWriteEnabled() { + return isAsyncHttpWriteEnabled; + } /** + * Set if async HTTP write enabled. + * @param isAsyncHttpWriteEnabled true, if async HTTP write + * enabled, or false otherwise. + */ + public void setAsyncHttpWriteEnabled(boolean isAsyncHttpWriteEnabled) { + this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled; + } + + /** * Register a new RequestProcessor instance. */ private void registerMonitoring(){ Index: src/main/java/com/sun/grizzly/http/SelectorThread.java --- src/main/java/com/sun/grizzly/http/SelectorThread.java Base (BASE) +++ src/main/java/com/sun/grizzly/http/SelectorThread.java Locally Modified (Based On LOCAL) @@ -195,6 +195,12 @@ protected int maxHttpHeaderSize = Constants.DEFAULT_HEADER_SIZE; + /** + * Is Async HTTP write enabled. + */ + protected boolean isAsyncHttpWriteEnabled; + + protected int maxPostSize = 2 * 1024 * 1024; @@ -964,7 +970,7 @@ } - protected ProcessorTask configureProcessorTask(DefaultProcessorTask task){ + protected ProcessorTask configureProcessorTask(DefaultProcessorTask task) { task.setAdapter(adapter); task.setMaxHttpHeaderSize(maxHttpHeaderSize); task.setBufferSize(requestBufferSize); @@ -975,6 +981,7 @@ task.setMaxPostSize(maxPostSize); task.setTimeout(uploadTimeout); task.setDisableUploadTimeout(disableUploadTimeout); + task.setAsyncHttpWriteEnabled(isAsyncHttpWriteEnabled); if ( keepAliveCounter.dropConnection() ) { task.setDropConnection(true); @@ -1681,8 +1688,25 @@ this.maxHttpHeaderSize = maxHttpHeaderSize; } + /** + * Is async HTTP write enabled + * @return true, if async HTTP write enabled, + * or false otherwise. + */ + public boolean isAsyncHttpWriteEnabled() { + return isAsyncHttpWriteEnabled; + } /** + * Set if async HTTP write enabled + * @param isAsyncHttpWriteEnabled true, if async HTTP + * write enabled, or false otherwise. + */ + public void setAsyncHttpWriteEnabled(boolean isAsyncHttpWriteEnabled) { + this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled; + } + + /** * The minimun threads created at startup. */ public void setMinThreads(int minWorkerThreads){ @@ -1719,6 +1743,10 @@ } + public TCPSelectorHandler getSelectorHandler() { + return selectorHandler; + } + public Controller getController() { return controller; } Index: src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java --- src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java Base (BASE) +++ src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java Locally Modified (Based On LOCAL) @@ -139,6 +139,12 @@ "com.sun.grizzly.useFileCache"; + private final static String IS_ASYNC_HTTP_WRITE = + "com.sun.grizzly.http.asyncwrite.enabled"; + + private final static String ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE = + "com.sun.grizzly.http.asyncwrite.maxBufferPoolSize"; + /** * The string manager for this package. */ @@ -311,16 +317,25 @@ } } - if (System.getProperty(USE_FILE_CACHE)!= null){ selectorThread.setFileCacheIsEnabled( Boolean.valueOf( System.getProperty(USE_FILE_CACHE)).booleanValue()); selectorThread.setLargeFileCacheEnabled(selectorThread.isFileCacheEnabled()); } + + if (System.getProperty(IS_ASYNC_HTTP_WRITE)!= null) { + selectorThread.setAsyncHttpWriteEnabled( + Boolean.getBoolean(IS_ASYNC_HTTP_WRITE)); } + if (System.getProperty(ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE) != null) { + SocketChannelOutputBuffer.setMaxBufferPoolSize(Integer.getInteger( + ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE, -1)); + } + } + /** * Configure properties on {@link SelectorThread} */ Index: src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java --- src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java Base (BASE) +++ src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java Locally Modified (Based On LOCAL) @@ -38,6 +38,10 @@ package com.sun.grizzly.http; +import com.sun.grizzly.async.AsyncQueueWriteUnit; +import com.sun.grizzly.async.AsyncQueueWriter; +import com.sun.grizzly.async.AsyncWriteCallbackHandler; +import com.sun.grizzly.async.ByteBufferCloner; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -45,7 +49,14 @@ import com.sun.grizzly.util.OutputWriter; import com.sun.grizzly.tcp.Response; import com.sun.grizzly.tcp.http11.InternalOutputBuffer; +import com.sun.grizzly.util.ByteBufferFactory; import java.nio.channels.Channel; +import java.nio.channels.SelectionKey; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Future; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Output buffer. @@ -54,16 +65,60 @@ * * @author Jean-Francois Arcand * @author Scott Oaks + * @author Alexey Stashok */ -public class SocketChannelOutputBuffer extends InternalOutputBuffer{ +public class SocketChannelOutputBuffer extends InternalOutputBuffer { + private static Logger logger = SelectorThread.logger(); + private static final int DEFAULT_BUFFER_POOL_SIZE = 16384; + + private static int maxBufferPoolSize = DEFAULT_BUFFER_POOL_SIZE; + /** + * ByteBuffer pool to be used with async write + */ + private static Queue bufferPool = + new ArrayBlockingQueue(maxBufferPoolSize); + + /** + * {@link ByteBufferCloner} implementation, which is called by Grizzly + * framework at the time, when asynchronous write queue can not write + * the buffer direcly on socket and instead will put it in queue. + * This implementation tries to get temporary ByteBuffer from the pool, + * if no ByteBuffer is available - then new one will be created. + */ + private static final ByteBufferCloner asyncHttpByteBufferCloner = + new ByteBufferClonerImpl(); + + /** + * {@link AsyncWriteCallbackHandler} implementation, which is responsible + * for returning cloned ByteBuffers to the pool + */ + private static final AsyncWriteCallbackHandler asyncHttpWriteCallbackHandler = + new AsyncWriteCallbackHandlerImpl(); + + /** * Underlying output channel. */ protected Channel channel; + /** + * Underlying selection key of the output channel. + */ + protected SelectionKey selectionKey; /** + * Flag, which indicates if async HTTP write is enabled + */ + protected boolean isAsyncHttpWriteEnabled; + + /** + * Asynchronous queue writer, which will be used if asyncHttp mode + * is enabled + */ + protected AsyncQueueWriter asyncQueueWriter; + + /** * Underlying ByteByteBuffer */ protected ByteBuffer outputByteBuffer; @@ -132,6 +187,66 @@ return channel; } + + /** + * Gets the underlying selection key of the output channel. + * @return the underlying selection key of the output channel. + */ + public SelectionKey getSelectionKey() { + return selectionKey; + } + + + /** + * Sets the underlying selection key of the output channel. + * @param selectionKey the underlying selection key of the output channel. + */ + public void setSelectionKey(SelectionKey selectionKey) { + this.selectionKey = selectionKey; + channel = selectionKey.channel(); + } + + /** + * Is async HTTP write enabled. + * @return true, if async HTTP write enabled, or false + * otherwise. + */ + public boolean isAsyncHttpWriteEnabled() { + return isAsyncHttpWriteEnabled; + } + + /** + * Set if async HTTP write enabled. + * @param isAsyncHttpWriteEnabled true, if async HTTP write + * enabled, or false otherwise. + */ + public void setAsyncHttpWriteEnabled(boolean isAsyncHttpWriteEnabled) { + this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled; + } + + /** + * Gets the asynchronous queue writer, which will be used if asyncHttp mode + * is enabled + * + * @return The asynchronous queue writer, which will be used if asyncHttp + * mode is enabled + */ + protected AsyncQueueWriter getAsyncQueueWriter() { + return asyncQueueWriter; + } + + /** + * Sets the asynchronous queue writer, which will be used if asyncHttp mode + * is enabled + * + * @param asyncQueueWriter The asynchronous queue writer, which will be + * used if asyncHttp mode is enabled + */ + protected void setAsyncQueueWriter(AsyncQueueWriter asyncQueueWriter) { + this.asyncQueueWriter = asyncQueueWriter; + } + + // --------------------------------------------------------- Public Methods /** @@ -180,10 +295,28 @@ * Flush the buffer by looping until the {@link ByteBuffer} is empty * @param bb the ByteBuffer to write. */ - public void flushChannel(ByteBuffer bb) throws IOException{ - OutputWriter.flushChannel(((SocketChannel)channel), bb); + public void flushChannel(ByteBuffer bb) throws IOException { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("flushChannel isAsyncHttpWriteEnabled=" + + isAsyncHttpWriteEnabled + " bb=" + bb); + } + + if (!isAsyncHttpWriteEnabled) { + OutputWriter.flushChannel(((SocketChannel) channel), bb); bb.clear(); + } else if (asyncQueueWriter != null) { + Future future = asyncQueueWriter.write(selectionKey, bb, + asyncHttpWriteCallbackHandler, null, + asyncHttpByteBufferCloner); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("async flushChannel isDone=" + future.isDone()); } + bb.clear(); + } else { + SelectorThread.logger().log(Level.WARNING, + "HTTP async write is enabled, but AsyncWriter is null."); + } + } /** @@ -270,6 +403,102 @@ } + /** + * {@link AsyncWriteCallbackHandler} implementation, which is responsible + * for returning cloned ByteBuffers to the pool + */ + private static final class AsyncWriteCallbackHandlerImpl implements + AsyncWriteCallbackHandler { + public void onWriteCompleted(SelectionKey key, + AsyncQueueWriteUnit writtenRecord) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("onWriteCompleted isCloned=" + + writtenRecord.isCloned()); + } + + if (writtenRecord.isCloned()) { + returnBuffer(writtenRecord.getByteBuffer()); + } + } + + public void onIOException(IOException ioException, SelectionKey key, + ByteBuffer buffer, Queue remainingQueue) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("onIOException key=" + key + + " exception=" + ioException); + } + returnBuffer(buffer); + + for(AsyncQueueWriteUnit unit : remainingQueue) { + returnBuffer(unit.getByteBuffer()); + } + } + + private boolean returnBuffer(ByteBuffer buffer) { + buffer.clear(); + int size = buffer.remaining(); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("return buffer buffer=" + buffer + " maxSize=" + + maxBufferedBytes); + } + + if (size <= maxBufferedBytes) { + boolean wasReturned = bufferPool.offer(buffer); + if (logger.isLoggable(Level.FINEST)) { + logger.finest("return buffer to pool. result=" + wasReturned); + } + + return wasReturned; + } + + return false; + } + } + + /** + * {@link ByteBufferCloner} implementation, which is called by Grizzly + * framework at the time, when asynchronous write queue can not write + * the buffer direcly on socket and instead will put it in queue. + * This implementation tries to get temporary ByteBuffer from the pool, + * if no ByteBuffer is available - then new one will be created. + */ + private static final class ByteBufferClonerImpl + implements ByteBufferCloner { + + public ByteBuffer clone(ByteBuffer originalByteBuffer) { + if (logger.isLoggable(Level.FINEST)) { + logger.finest("clone buffer=" + originalByteBuffer + + " maxBufferedBytes=" + maxBufferedBytes); + } + + int size = originalByteBuffer.remaining(); + + ByteBuffer clone = null; + + if (size <= maxBufferedBytes) { + clone = bufferPool.poll(); + } + + if (logger.isLoggable(Level.FINEST)) { + logger.finest("clone buffer from pool=" + clone); + } + + if (clone == null || clone.remaining() < size) { + int allocateSize = Math.max(size, maxBufferedBytes / 2); + clone = createByteBuffer(allocateSize, + originalByteBuffer.isDirect()); + } + + clone.put(originalByteBuffer); + clone.flip(); + return clone; + } + } + + private static ByteBuffer createByteBuffer(int size, boolean isDirect) { + return ByteBufferFactory.allocateView(size, isDirect); + } + public static int getMaxBufferedBytes() { return maxBufferedBytes; } @@ -278,4 +507,20 @@ public static void setMaxBufferedBytes(int aMaxBufferedBytes) { maxBufferedBytes = aMaxBufferedBytes; } + + + public static void setMaxBufferPoolSize(int size) { + int poolSize = (size >= 0) ? size : DEFAULT_BUFFER_POOL_SIZE; + + if (maxBufferPoolSize == poolSize) return; + + maxBufferPoolSize = poolSize; + + bufferPool = new ArrayBlockingQueue(maxBufferPoolSize); } + + + public static int getMaxBufferPoolSize() { + return maxBufferPoolSize; + } +} Index: src/test/java/com/sun/grizzly/http/ArpSSLTest.java --- src/test/java/com/sun/grizzly/http/ArpSSLTest.java Base (BASE) +++ src/test/java/com/sun/grizzly/http/ArpSSLTest.java Locally Modified (Based On LOCAL) @@ -37,7 +37,6 @@ */ package com.sun.grizzly.http; -import com.sun.grizzly.arp.AsyncTask; import com.sun.grizzly.arp.AsyncHandler; import com.sun.grizzly.arp.AsyncFilter; import com.sun.grizzly.arp.AsyncExecutor; Index: src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java --- src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java Locally New +++ src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java Locally New @@ -0,0 +1,132 @@ +/* + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. + * + * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. + * + * The contents of this file are subject to the terms of either the GNU + * General Public License Version 2 only ("GPL") or the Common Development + * and Distribution License("CDDL") (collectively, the "License"). You + * may not use this file except in compliance with the License. You can obtain + * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html + * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific + * language governing permissions and limitations under the License. + * + * When distributing the software, include this License Header Notice in each + * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. + * Sun designates this particular file as subject to the "Classpath" exception + * as provided by Sun in the GPL Version 2 section of the License file that + * accompanied this code. If applicable, add the following below the License + * Header, with the fields enclosed by brackets [] replaced by your own + * identifying information: "Portions Copyrighted [year] + * [name of copyright owner]" + * + * Contributor(s): + * + * If you wish your version of this file to be governed by only the CDDL or + * only the GPL Version 2, indicate your decision by adding "[Contributor] + * elects to include this software in this distribution under the [CDDL or GPL + * Version 2] license." If you don't indicate a single choice of license, a + * recipient has the option to distribute your version of this file under + * either the CDDL, the GPL Version 2 or to extend the choice of license to + * its licensees as provided above. However, if you add GPL Version 2 code + * and therefore, elected the GPL Version 2 license, then the option applies + * only if the new code is made subject to such option by the copyright + * holder. + * + */ + +package com.sun.grizzly.http; + +import com.sun.grizzly.tcp.Adapter; +import com.sun.grizzly.tcp.Request; +import com.sun.grizzly.tcp.Response; +import com.sun.grizzly.tcp.StaticResourcesAdapter; +import com.sun.grizzly.util.buf.ByteChunk; +import com.sun.grizzly.util.buf.CharChunk; +import java.net.HttpURLConnection; +import java.net.URL; +import junit.framework.TestCase; + +/** + * + * @author Alexey Stashok + */ +public class AsyncHTTPResponseTest extends TestCase { + public static final int PORT = 18890; + + private static final byte[] abcd = new byte[] {'a', 'b', 'c', 'd'}; + + public void testSimpleAsyncResponse() throws Exception { + int responseLength = 1024 * 1024; + + SelectorThread selectorThread = new SelectorThread(); + try { + selectorThread.setPort(PORT); + selectorThread.setAsyncHttpWriteEnabled(true); + selectorThread.setAdapter( + new BigResponseAdapter(responseLength)); + selectorThread.listen(); + + HttpURLConnection connection = (HttpURLConnection) + new URL("http://localhost:" + PORT).openConnection(); + + int code = connection.getResponseCode(); + assertEquals(code, 200); + + int length = connection.getContentLength(); + assertEquals(length, responseLength); + + byte[] content = new byte[length]; + + int readBytes; + int offset = 0; + do { + readBytes = connection.getInputStream().read(content, offset, + length - offset); + offset += readBytes; + } while(readBytes != -1 && offset < length); + + assertEquals(offset, length); + + checkResult(content); + } finally { + selectorThread.stopEndpoint(); + } + } + + public static class BigResponseAdapter implements Adapter { + private int length; + + public BigResponseAdapter(int length) { + this.length = length; + } + + public void service(Request req, Response res) throws Exception { + ByteChunk chunk = new ByteChunk(length); + byte[] content = new byte[length]; + for (int i=0; i