# This patch file was generated by NetBeans IDE
# Following Index: paths are relative to: /Users/oleksiys/Projects/Grizzly/trunk/modules/http
# This patch can be applied using context Tools: Patch action on respective folder.
# It uses platform neutral UTF-8 encoding and \n newlines.
# Above lines and this line are ignored by the patching process.
Index: src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java
--- src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java Base (BASE)
+++ src/main/java/com/sun/grizzly/http/DefaultProcessorTask.java Locally Modified (Based On LOCAL)
@@ -403,6 +403,11 @@
protected boolean disableUploadTimeout = true;
+ /**
+ * Flag, which indicates if async HTTP write is enabled
+ */
+ protected boolean isAsyncHttpWriteEnabled;
+
// ----------------------------------------------------- Constructor ---- //
public DefaultProcessorTask(){
@@ -449,6 +454,7 @@
maxHttpHeaderSize,
bufferResponse);
}
+
request.setInputBuffer(inputBuffer);
response.setOutputBuffer(outputBuffer);
@@ -508,7 +514,11 @@
inputStream = (InputReader)input;
SocketChannelOutputBuffer channelOutputBuffer =
((SocketChannelOutputBuffer)outputBuffer);
- channelOutputBuffer.setChannel((SocketChannel)key.channel());
+ channelOutputBuffer.setAsyncHttpWriteEnabled(
+ isAsyncHttpWriteEnabled);
+ channelOutputBuffer.setAsyncQueueWriter(
+ selectorThread.getSelectorHandler().getAsyncQueueWriter());
+ channelOutputBuffer.setSelectionKey(key);
response.setChannel((SocketChannel)key.channel());
}
configPreProcess();
@@ -1613,8 +1623,25 @@
return uploadTimeout;
}
+ /**
+ * Is async HTTP write enabled.
+ * @return true, if async HTTP write enabled, or false
+ * otherwise.
+ */
+ public boolean isAsyncHttpWriteEnabled() {
+ return isAsyncHttpWriteEnabled;
+ }
/**
+ * Set if async HTTP write enabled.
+ * @param isAsyncHttpWriteEnabled true, if async HTTP write
+ * enabled, or false otherwise.
+ */
+ public void setAsyncHttpWriteEnabled(boolean isAsyncHttpWriteEnabled) {
+ this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled;
+ }
+
+ /**
* Register a new RequestProcessor instance.
*/
private void registerMonitoring(){
Index: src/main/java/com/sun/grizzly/http/SelectorThread.java
--- src/main/java/com/sun/grizzly/http/SelectorThread.java Base (BASE)
+++ src/main/java/com/sun/grizzly/http/SelectorThread.java Locally Modified (Based On LOCAL)
@@ -195,6 +195,12 @@
protected int maxHttpHeaderSize = Constants.DEFAULT_HEADER_SIZE;
+ /**
+ * Is Async HTTP write enabled.
+ */
+ protected boolean isAsyncHttpWriteEnabled;
+
+
protected int maxPostSize = 2 * 1024 * 1024;
@@ -964,7 +970,7 @@
}
- protected ProcessorTask configureProcessorTask(DefaultProcessorTask task){
+ protected ProcessorTask configureProcessorTask(DefaultProcessorTask task) {
task.setAdapter(adapter);
task.setMaxHttpHeaderSize(maxHttpHeaderSize);
task.setBufferSize(requestBufferSize);
@@ -975,6 +981,7 @@
task.setMaxPostSize(maxPostSize);
task.setTimeout(uploadTimeout);
task.setDisableUploadTimeout(disableUploadTimeout);
+ task.setAsyncHttpWriteEnabled(isAsyncHttpWriteEnabled);
if ( keepAliveCounter.dropConnection() ) {
task.setDropConnection(true);
@@ -1681,8 +1688,25 @@
this.maxHttpHeaderSize = maxHttpHeaderSize;
}
+ /**
+ * Is async HTTP write enabled
+ * @return true, if async HTTP write enabled,
+ * or false otherwise.
+ */
+ public boolean isAsyncHttpWriteEnabled() {
+ return isAsyncHttpWriteEnabled;
+ }
/**
+ * Set if async HTTP write enabled
+ * @param isAsyncHttpWriteEnabled true, if async HTTP
+ * write enabled, or false otherwise.
+ */
+ public void setAsyncHttpWriteEnabled(boolean isAsyncHttpWriteEnabled) {
+ this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled;
+ }
+
+ /**
* The minimun threads created at startup.
*/
public void setMinThreads(int minWorkerThreads){
@@ -1719,6 +1743,10 @@
}
+ public TCPSelectorHandler getSelectorHandler() {
+ return selectorHandler;
+ }
+
public Controller getController() {
return controller;
}
Index: src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java
--- src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java Base (BASE)
+++ src/main/java/com/sun/grizzly/http/SelectorThreadConfig.java Locally Modified (Based On LOCAL)
@@ -139,6 +139,12 @@
"com.sun.grizzly.useFileCache";
+ private final static String IS_ASYNC_HTTP_WRITE =
+ "com.sun.grizzly.http.asyncwrite.enabled";
+
+ private final static String ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE =
+ "com.sun.grizzly.http.asyncwrite.maxBufferPoolSize";
+
/**
* The string manager for this package.
*/
@@ -311,16 +317,25 @@
}
}
-
if (System.getProperty(USE_FILE_CACHE)!= null){
selectorThread.setFileCacheIsEnabled(
Boolean.valueOf(
System.getProperty(USE_FILE_CACHE)).booleanValue());
selectorThread.setLargeFileCacheEnabled(selectorThread.isFileCacheEnabled());
}
+
+ if (System.getProperty(IS_ASYNC_HTTP_WRITE)!= null) {
+ selectorThread.setAsyncHttpWriteEnabled(
+ Boolean.getBoolean(IS_ASYNC_HTTP_WRITE));
}
+ if (System.getProperty(ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE) != null) {
+ SocketChannelOutputBuffer.setMaxBufferPoolSize(Integer.getInteger(
+ ASYNC_HTTP_WRITE_MAX_BUFFER_POOL_SIZE, -1));
+ }
+ }
+
/**
* Configure properties on {@link SelectorThread}
*/
Index: src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java
--- src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java Base (BASE)
+++ src/main/java/com/sun/grizzly/http/SocketChannelOutputBuffer.java Locally Modified (Based On LOCAL)
@@ -38,6 +38,10 @@
package com.sun.grizzly.http;
+import com.sun.grizzly.async.AsyncQueueWriteUnit;
+import com.sun.grizzly.async.AsyncQueueWriter;
+import com.sun.grizzly.async.AsyncWriteCallbackHandler;
+import com.sun.grizzly.async.ByteBufferCloner;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -45,7 +49,14 @@
import com.sun.grizzly.util.OutputWriter;
import com.sun.grizzly.tcp.Response;
import com.sun.grizzly.tcp.http11.InternalOutputBuffer;
+import com.sun.grizzly.util.ByteBufferFactory;
import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Future;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Output buffer.
@@ -54,16 +65,60 @@
*
* @author Jean-Francois Arcand
* @author Scott Oaks
+ * @author Alexey Stashok
*/
-public class SocketChannelOutputBuffer extends InternalOutputBuffer{
+public class SocketChannelOutputBuffer extends InternalOutputBuffer {
+ private static Logger logger = SelectorThread.logger();
+ private static final int DEFAULT_BUFFER_POOL_SIZE = 16384;
+
+ private static int maxBufferPoolSize = DEFAULT_BUFFER_POOL_SIZE;
+
/**
+ * ByteBuffer pool to be used with async write
+ */
+ private static Queue bufferPool =
+ new ArrayBlockingQueue(maxBufferPoolSize);
+
+ /**
+ * {@link ByteBufferCloner} implementation, which is called by Grizzly
+ * framework at the time, when asynchronous write queue can not write
+ * the buffer direcly on socket and instead will put it in queue.
+ * This implementation tries to get temporary ByteBuffer from the pool,
+ * if no ByteBuffer is available - then new one will be created.
+ */
+ private static final ByteBufferCloner asyncHttpByteBufferCloner =
+ new ByteBufferClonerImpl();
+
+ /**
+ * {@link AsyncWriteCallbackHandler} implementation, which is responsible
+ * for returning cloned ByteBuffers to the pool
+ */
+ private static final AsyncWriteCallbackHandler asyncHttpWriteCallbackHandler =
+ new AsyncWriteCallbackHandlerImpl();
+
+ /**
* Underlying output channel.
*/
protected Channel channel;
+ /**
+ * Underlying selection key of the output channel.
+ */
+ protected SelectionKey selectionKey;
/**
+ * Flag, which indicates if async HTTP write is enabled
+ */
+ protected boolean isAsyncHttpWriteEnabled;
+
+ /**
+ * Asynchronous queue writer, which will be used if asyncHttp mode
+ * is enabled
+ */
+ protected AsyncQueueWriter asyncQueueWriter;
+
+ /**
* Underlying ByteByteBuffer
*/
protected ByteBuffer outputByteBuffer;
@@ -132,6 +187,66 @@
return channel;
}
+
+ /**
+ * Gets the underlying selection key of the output channel.
+ * @return the underlying selection key of the output channel.
+ */
+ public SelectionKey getSelectionKey() {
+ return selectionKey;
+ }
+
+
+ /**
+ * Sets the underlying selection key of the output channel.
+ * @param selectionKey the underlying selection key of the output channel.
+ */
+ public void setSelectionKey(SelectionKey selectionKey) {
+ this.selectionKey = selectionKey;
+ channel = selectionKey.channel();
+ }
+
+ /**
+ * Is async HTTP write enabled.
+ * @return true, if async HTTP write enabled, or false
+ * otherwise.
+ */
+ public boolean isAsyncHttpWriteEnabled() {
+ return isAsyncHttpWriteEnabled;
+ }
+
+ /**
+ * Set if async HTTP write enabled.
+ * @param isAsyncHttpWriteEnabled true, if async HTTP write
+ * enabled, or false otherwise.
+ */
+ public void setAsyncHttpWriteEnabled(boolean isAsyncHttpWriteEnabled) {
+ this.isAsyncHttpWriteEnabled = isAsyncHttpWriteEnabled;
+ }
+
+ /**
+ * Gets the asynchronous queue writer, which will be used if asyncHttp mode
+ * is enabled
+ *
+ * @return The asynchronous queue writer, which will be used if asyncHttp
+ * mode is enabled
+ */
+ protected AsyncQueueWriter getAsyncQueueWriter() {
+ return asyncQueueWriter;
+ }
+
+ /**
+ * Sets the asynchronous queue writer, which will be used if asyncHttp mode
+ * is enabled
+ *
+ * @param asyncQueueWriter The asynchronous queue writer, which will be
+ * used if asyncHttp mode is enabled
+ */
+ protected void setAsyncQueueWriter(AsyncQueueWriter asyncQueueWriter) {
+ this.asyncQueueWriter = asyncQueueWriter;
+ }
+
+
// --------------------------------------------------------- Public Methods
/**
@@ -180,10 +295,28 @@
* Flush the buffer by looping until the {@link ByteBuffer} is empty
* @param bb the ByteBuffer to write.
*/
- public void flushChannel(ByteBuffer bb) throws IOException{
- OutputWriter.flushChannel(((SocketChannel)channel), bb);
+ public void flushChannel(ByteBuffer bb) throws IOException {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("flushChannel isAsyncHttpWriteEnabled=" +
+ isAsyncHttpWriteEnabled + " bb=" + bb);
+ }
+
+ if (!isAsyncHttpWriteEnabled) {
+ OutputWriter.flushChannel(((SocketChannel) channel), bb);
bb.clear();
+ } else if (asyncQueueWriter != null) {
+ Future future = asyncQueueWriter.write(selectionKey, bb,
+ asyncHttpWriteCallbackHandler, null,
+ asyncHttpByteBufferCloner);
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("async flushChannel isDone=" + future.isDone());
}
+ bb.clear();
+ } else {
+ SelectorThread.logger().log(Level.WARNING,
+ "HTTP async write is enabled, but AsyncWriter is null.");
+ }
+ }
/**
@@ -270,6 +403,102 @@
}
+ /**
+ * {@link AsyncWriteCallbackHandler} implementation, which is responsible
+ * for returning cloned ByteBuffers to the pool
+ */
+ private static final class AsyncWriteCallbackHandlerImpl implements
+ AsyncWriteCallbackHandler {
+ public void onWriteCompleted(SelectionKey key,
+ AsyncQueueWriteUnit writtenRecord) {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("onWriteCompleted isCloned=" +
+ writtenRecord.isCloned());
+ }
+
+ if (writtenRecord.isCloned()) {
+ returnBuffer(writtenRecord.getByteBuffer());
+ }
+ }
+
+ public void onIOException(IOException ioException, SelectionKey key,
+ ByteBuffer buffer, Queue remainingQueue) {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("onIOException key=" + key +
+ " exception=" + ioException);
+ }
+ returnBuffer(buffer);
+
+ for(AsyncQueueWriteUnit unit : remainingQueue) {
+ returnBuffer(unit.getByteBuffer());
+ }
+ }
+
+ private boolean returnBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ int size = buffer.remaining();
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("return buffer buffer=" + buffer + " maxSize=" +
+ maxBufferedBytes);
+ }
+
+ if (size <= maxBufferedBytes) {
+ boolean wasReturned = bufferPool.offer(buffer);
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("return buffer to pool. result=" + wasReturned);
+ }
+
+ return wasReturned;
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * {@link ByteBufferCloner} implementation, which is called by Grizzly
+ * framework at the time, when asynchronous write queue can not write
+ * the buffer direcly on socket and instead will put it in queue.
+ * This implementation tries to get temporary ByteBuffer from the pool,
+ * if no ByteBuffer is available - then new one will be created.
+ */
+ private static final class ByteBufferClonerImpl
+ implements ByteBufferCloner {
+
+ public ByteBuffer clone(ByteBuffer originalByteBuffer) {
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("clone buffer=" + originalByteBuffer +
+ " maxBufferedBytes=" + maxBufferedBytes);
+ }
+
+ int size = originalByteBuffer.remaining();
+
+ ByteBuffer clone = null;
+
+ if (size <= maxBufferedBytes) {
+ clone = bufferPool.poll();
+ }
+
+ if (logger.isLoggable(Level.FINEST)) {
+ logger.finest("clone buffer from pool=" + clone);
+ }
+
+ if (clone == null || clone.remaining() < size) {
+ int allocateSize = Math.max(size, maxBufferedBytes / 2);
+ clone = createByteBuffer(allocateSize,
+ originalByteBuffer.isDirect());
+ }
+
+ clone.put(originalByteBuffer);
+ clone.flip();
+ return clone;
+ }
+ }
+
+ private static ByteBuffer createByteBuffer(int size, boolean isDirect) {
+ return ByteBufferFactory.allocateView(size, isDirect);
+ }
+
public static int getMaxBufferedBytes() {
return maxBufferedBytes;
}
@@ -278,4 +507,20 @@
public static void setMaxBufferedBytes(int aMaxBufferedBytes) {
maxBufferedBytes = aMaxBufferedBytes;
}
+
+
+ public static void setMaxBufferPoolSize(int size) {
+ int poolSize = (size >= 0) ? size : DEFAULT_BUFFER_POOL_SIZE;
+
+ if (maxBufferPoolSize == poolSize) return;
+
+ maxBufferPoolSize = poolSize;
+
+ bufferPool = new ArrayBlockingQueue(maxBufferPoolSize);
}
+
+
+ public static int getMaxBufferPoolSize() {
+ return maxBufferPoolSize;
+ }
+}
Index: src/test/java/com/sun/grizzly/http/ArpSSLTest.java
--- src/test/java/com/sun/grizzly/http/ArpSSLTest.java Base (BASE)
+++ src/test/java/com/sun/grizzly/http/ArpSSLTest.java Locally Modified (Based On LOCAL)
@@ -37,7 +37,6 @@
*/
package com.sun.grizzly.http;
-import com.sun.grizzly.arp.AsyncTask;
import com.sun.grizzly.arp.AsyncHandler;
import com.sun.grizzly.arp.AsyncFilter;
import com.sun.grizzly.arp.AsyncExecutor;
Index: src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java
--- src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java Locally New
+++ src/test/java/com/sun/grizzly/http/AsyncHTTPResponseTest.java Locally New
@@ -0,0 +1,132 @@
+/*
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
+ *
+ * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
+ *
+ * The contents of this file are subject to the terms of either the GNU
+ * General Public License Version 2 only ("GPL") or the Common Development
+ * and Distribution License("CDDL") (collectively, the "License"). You
+ * may not use this file except in compliance with the License. You can obtain
+ * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
+ * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
+ * language governing permissions and limitations under the License.
+ *
+ * When distributing the software, include this License Header Notice in each
+ * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
+ * Sun designates this particular file as subject to the "Classpath" exception
+ * as provided by Sun in the GPL Version 2 section of the License file that
+ * accompanied this code. If applicable, add the following below the License
+ * Header, with the fields enclosed by brackets [] replaced by your own
+ * identifying information: "Portions Copyrighted [year]
+ * [name of copyright owner]"
+ *
+ * Contributor(s):
+ *
+ * If you wish your version of this file to be governed by only the CDDL or
+ * only the GPL Version 2, indicate your decision by adding "[Contributor]
+ * elects to include this software in this distribution under the [CDDL or GPL
+ * Version 2] license." If you don't indicate a single choice of license, a
+ * recipient has the option to distribute your version of this file under
+ * either the CDDL, the GPL Version 2 or to extend the choice of license to
+ * its licensees as provided above. However, if you add GPL Version 2 code
+ * and therefore, elected the GPL Version 2 license, then the option applies
+ * only if the new code is made subject to such option by the copyright
+ * holder.
+ *
+ */
+
+package com.sun.grizzly.http;
+
+import com.sun.grizzly.tcp.Adapter;
+import com.sun.grizzly.tcp.Request;
+import com.sun.grizzly.tcp.Response;
+import com.sun.grizzly.tcp.StaticResourcesAdapter;
+import com.sun.grizzly.util.buf.ByteChunk;
+import com.sun.grizzly.util.buf.CharChunk;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import junit.framework.TestCase;
+
+/**
+ *
+ * @author Alexey Stashok
+ */
+public class AsyncHTTPResponseTest extends TestCase {
+ public static final int PORT = 18890;
+
+ private static final byte[] abcd = new byte[] {'a', 'b', 'c', 'd'};
+
+ public void testSimpleAsyncResponse() throws Exception {
+ int responseLength = 1024 * 1024;
+
+ SelectorThread selectorThread = new SelectorThread();
+ try {
+ selectorThread.setPort(PORT);
+ selectorThread.setAsyncHttpWriteEnabled(true);
+ selectorThread.setAdapter(
+ new BigResponseAdapter(responseLength));
+ selectorThread.listen();
+
+ HttpURLConnection connection = (HttpURLConnection)
+ new URL("http://localhost:" + PORT).openConnection();
+
+ int code = connection.getResponseCode();
+ assertEquals(code, 200);
+
+ int length = connection.getContentLength();
+ assertEquals(length, responseLength);
+
+ byte[] content = new byte[length];
+
+ int readBytes;
+ int offset = 0;
+ do {
+ readBytes = connection.getInputStream().read(content, offset,
+ length - offset);
+ offset += readBytes;
+ } while(readBytes != -1 && offset < length);
+
+ assertEquals(offset, length);
+
+ checkResult(content);
+ } finally {
+ selectorThread.stopEndpoint();
+ }
+ }
+
+ public static class BigResponseAdapter implements Adapter {
+ private int length;
+
+ public BigResponseAdapter(int length) {
+ this.length = length;
+ }
+
+ public void service(Request req, Response res) throws Exception {
+ ByteChunk chunk = new ByteChunk(length);
+ byte[] content = new byte[length];
+ for (int i=0; i