Property changes on: .
___________________________________________________________________
Added: svn:ignore
+ target
..merge.diff
Property changes on: contribs/bundles
___________________________________________________________________
Added: svn:ignore
+ target
Property changes on: contribs/bundles/grizzly-httpservice-bundle
___________________________________________________________________
Added: svn:ignore
+ target
Index: modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java
===================================================================
--- modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java (revision 2805)
+++ modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java (working copy)
@@ -44,6 +44,7 @@
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import com.sun.grizzly.http.SelectorThread;
+import com.sun.grizzly.util.WorkerThreadImpl;
import java.io.IOException;
import java.util.logging.Level;
@@ -81,7 +82,7 @@
}
});
- new Thread() {
+ new WorkerThreadImpl(new Runnable() {
@Override
public void run() {
try {
@@ -89,7 +90,7 @@
} catch (Exception ex) {
}
}
- }.start();
+ }).start();
try {
latch.await();
Index: modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java
===================================================================
--- modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java (revision 2805)
+++ modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java (working copy)
@@ -46,6 +46,7 @@
import com.sun.grizzly.tcp.http11.GrizzlyAdapter;
import com.sun.grizzly.tcp.http11.GrizzlyRequest;
import com.sun.grizzly.tcp.http11.GrizzlyResponse;
+import com.sun.grizzly.util.WorkerThreadImpl;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -1006,7 +1007,7 @@
t.printStackTrace();
}
- new Thread(){
+ new WorkerThreadImpl(new Runnable(){
@Override
public void run(){
try {
@@ -1022,8 +1023,8 @@
res.resume();
}
}
- }.start();;
- }
+ }).start();
+ }
});
try {
Index: modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java
===================================================================
--- modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java (revision 2805)
+++ modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java (working copy)
@@ -440,7 +440,22 @@
// being time.
protected boolean handleKeepAliveBlockingThread = false;
+ /**
+ * false prevents the selectionkey from being re registered after async is done.
+ * if false then the socket lifecycle ending needs to be handled by you
+ */
+ protected boolean reRegisterSelectionKey = true;
+ /**
+ * true if asyncprotocolfilter should cancel the selectionkey
+ */
+ protected boolean aptCancelKey;
+
+ /**
+ * used by asyncprotocolfilter
+ */
+ private final TaskEvent event = new TaskEvent(this);
+
// ----------------------------------------------------- Constructor ---- //
public ProcessorTask(){
@@ -523,8 +538,10 @@
process(inputStream,
outputStream);
} catch(Throwable ex){
- logger.log(Level.FINE,
+ if (logger.isLoggable(Level.FINE)){
+ logger.log(Level.FINE,
sm.getString("processorTask.errorProcessingRequest"), ex);
+ }
} finally {
terminateProcess();
}
@@ -658,11 +675,7 @@
if (response.isSuspended()){
WorkerThread wt = (WorkerThread)Thread.currentThread();
wt.getAttachment().setAttribute("suspend",Boolean.TRUE);
-
- ((SelectorThreadKeyHandler) selectorHandler.
- getSelectionKeyHandler()).resetExpiration();
- key.attach(response.getResponseAttachment());
-
+ key.attach(response.getResponseAttachment());
return;
}
@@ -674,8 +687,10 @@
adapter.afterService(request,response);
} catch (Exception ex) {
error = true;
- logger.log(Level.FINEST,
- sm.getString("processorTask.errorFinishingRequest"), ex);
+ if (logger.isLoggable(Level.FINEST)){
+ logger.log(Level.FINEST,
+ sm.getString("processorTask.errorFinishingRequest"), ex);
+ }
}
// Finish the handling of the request
@@ -773,7 +788,7 @@
WorkerThread workerThread = (WorkerThread)Thread.currentThread();
KeepAliveThreadAttachment k =
(KeepAliveThreadAttachment) workerThread.getAttachment();
- k.setActiveThreadTimeout(transactionTimeout);
+ k.setIdleTimeoutDelay(transactionTimeout);
inputBuffer.parseHeaders();
@@ -850,20 +865,10 @@
// control how Grizzly ARP extension handle their asynchronous
// behavior, we must make sure we are never called twice.
if (asyncSemaphore.tryAcquire(0, TimeUnit.SECONDS)) {
- // Nobody is listening, avoid extra operation.
- if (getTaskListener() == null){
- return;
- }
-
- TaskEvent event = new TaskEvent();
- if (error) {
- event.setStatus(TaskEvent.ERROR);
- } else {
- event.setStatus(TaskEvent.COMPLETED);
- }
- event.attach(this);
- getTaskListener().taskEvent(event);
- event.attach(null);
+ if (getTaskListener() != null){
+ event.setStatus(error?TaskEvent.ERROR:TaskEvent.COMPLETED);
+ getTaskListener().taskEvent(event);
+ }
}
} catch (InterruptedException ex) {
if (logger.isLoggable(Level.WARNING)){
@@ -929,10 +934,11 @@
try {
outputBuffer.commit();
} catch (IOException ex) {
- logger.log(Level.FINEST,
+ if (logger.isLoggable(Level.FINEST)){
+ logger.log(Level.FINEST,
sm.getString("processorTask.nonBlockingError"), ex);
- // Set error flag
error = true;
+ }
}
} else if (actionCode == ActionCode.ACTION_ACK) {
@@ -951,7 +957,6 @@
try {
outputBuffer.sendAck();
} catch (IOException e) {
- // Set error flag
error = true;
}
}
@@ -965,10 +970,11 @@
try {
outputBuffer.endRequest();
} catch (IOException e) {
- logger.log(Level.FINEST,
+ if (logger.isLoggable(Level.FINEST)){
+ logger.log(Level.FINEST,
sm.getString("processorTask.nonBlockingError"), e);
- // Set error flag
- error = true;
+ error = true;
+ }
}
} else if (actionCode == ActionCode.ACTION_RESET) {
@@ -1094,7 +1100,7 @@
}
} catch (Exception e) {
logger.log(Level.WARNING,
- sm.getString("processorTask.exceptionSSLcert"),e);
+ sm.getString("processorTask.exceptionSSLcert"),e);
}
}
} else if ( actionCode == ActionCode.ACTION_POST_REQUEST ) {
@@ -1103,18 +1109,15 @@
try{
handler.handle(request,Interceptor.RESPONSE_PROCEEDED);
} catch(IOException ex){
- logger.log(Level.FINEST,
- "Handler exception",ex);
+ logger.log(Level.FINEST,"Handler exception",ex);
}
}
} else if ( actionCode == ActionCode.CANCEL_SUSPENDED_RESPONSE ) {
key.attach(null);
} else if ( actionCode == ActionCode.RESET_SUSPEND_TIMEOUT ) {
- if (key.attachment() instanceof Response.ResponseAttachment){
- Response.ResponseAttachment ra = ((Response.ResponseAttachment)key.attachment());
- if (ra != null){
- ra.resetTimeout();
- }
+ Object attachment = key.attachment();
+ if (attachment instanceof Response.ResponseAttachment){
+ ((Response.ResponseAttachment)attachment).resetTimeout();
}
} else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH ) {
if (key != null) {
@@ -1913,6 +1916,8 @@
setTaskListener(null);
socket = null;
dropConnection = false;
+ reRegisterSelectionKey = true;
+ aptCancelKey = false;
key = null;
}
@@ -2301,5 +2306,40 @@
public void setUseChunking(boolean useChunking) {
this.useChunking = useChunking;
}
+
+ /**
+ * true if SelectionKey should be reregistered with Selector after async is done
+ * default is true.
+ * if false then the socket lifecycle ending needs to be handled by you
+ * @param reRegisterSelectionKey
+ */
+ public void setReRegisterSelectionKey(boolean reRegisterSelectionKey) {
+ this.reRegisterSelectionKey = reRegisterSelectionKey;
+ }
+
+ /**
+ * true if SelectionKey should be reregistered with Selector after async is done.
+ * default is true.
+ * if false then the socket lifecycle ending needs to be handled by you
+ * @return
+ */
+ public boolean getReRegisterSelectionKey() {
+ return reRegisterSelectionKey;
+ }
+
+
+ /**
+ * true if asyncprotocolfilter should cancel the selectionkey
+ */
+ public void setAptCancelKey(boolean aptCancelKey) {
+ this.aptCancelKey = aptCancelKey;
+ }
+
+ /**
+ * true if asyncprotocolfilter should cancel the selectionkey
+ */
+ public boolean getAptCancelKey() {
+ return aptCancelKey;
+ }
}
Index: modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java
===================================================================
--- modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java (revision 2805)
+++ modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java (working copy)
@@ -39,6 +39,9 @@
package com.sun.grizzly.http;
import com.sun.grizzly.util.ThreadAttachment;
+import java.nio.channels.SelectionKey;
+import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* Add keep alive counting mechanism to the {@link ThreadAttachement}.
@@ -46,8 +49,9 @@
* @author Jeanfrancois Arcand
*/
public class KeepAliveThreadAttachment extends ThreadAttachment{
- private int keepAliveCount = 0;
-
+ protected final static Logger logger = SelectorThread.logger();
+
+ private int keepAliveCount;
/**
* The stats object used to gather statistics.
*/
@@ -88,4 +92,22 @@
keepAliveCount = 0;
}
+ @Override
+ public void release(SelectionKey selectionKey) {
+ super.release(selectionKey);
+ resetKeepAliveCount();
+ }
+
+
+ @Override
+ public boolean timedOut(SelectionKey selectionKey) {
+ Thread t = activeThread();
+ if (t != null) {
+ if (logger.isLoggable(Level.WARNING)) {
+ logger.log(Level.WARNING, "Interrupting idle Thread: " + t.getName());
+ }
+ t.interrupt();
+ }
+ return true;
+ }
}
Index: modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java
===================================================================
--- modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (revision 2805)
+++ modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (working copy)
@@ -38,13 +38,11 @@
package com.sun.grizzly.http;
import com.sun.grizzly.DefaultSelectionKeyHandler;
-import com.sun.grizzly.tcp.Response;
-import com.sun.grizzly.tcp.Response.ResponseAttachment;
+import com.sun.grizzly.SelectionKeyHandler;
import com.sun.grizzly.util.Copyable;
import com.sun.grizzly.util.SelectionKeyAttachment;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
-import java.util.logging.Level;
/**
* Default HTTP {@link SelectionKeyHandler} implementation
@@ -71,6 +69,19 @@
}
@Override
+ public void cancel(SelectionKey key) {
+ if (key != null) {
+ if (selectorThread.getThreadPool() instanceof StatsThreadPool) {
+ if (selectorThread.isMonitoringEnabled() &&
+ ((StatsThreadPool) selectorThread.getThreadPool()).getStatistic().decrementOpenConnectionsCount(key.channel())) {
+ selectorThread.getRequestGroupInfo().decreaseCountOpenConnections();
+ }
+ }
+ super.cancel(key);
+ }
+ }
+
+ @Override
public void doRegisterKey(SelectionKey key, int ops, long currentTime) {
Object attachment = key.attachment();
if (attachment instanceof KeepAliveThreadAttachment) {
@@ -85,40 +96,12 @@
}
key.interestOps(key.interestOps() | ops);
}
-
- @Override
- public void cancel(SelectionKey key) {
- if (key == null) {
- return;
- }
- if (selectorThread.getThreadPool() instanceof StatsThreadPool) {
- if (selectorThread.isMonitoringEnabled() &&
- ((StatsThreadPool) selectorThread.getThreadPool()).getStatistic().decrementOpenConnectionsCount(key.channel())) {
- selectorThread.getRequestGroupInfo().decreaseCountOpenConnections();
- }
- }
-
- Object attachment = key.attachment();
- if (attachment instanceof KeepAliveThreadAttachment) {
- KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment;
- k.resetKeepAliveCount();
- }
- super.cancel(key);
- }
-
+
/**
- * Reset the expiration time
- */
- public void resetExpiration() {
- nextKeysExpiration = 0;
- }
-
- /**
* {@inheritDoc}
*/
@Override
public void expire(Iterator iterator) {
- //must check for timeout, attachments can have such interest
final long currentTime = System.currentTimeMillis();
if (currentTime < nextKeysExpiration) {
return;
@@ -129,51 +112,21 @@
if (!key.isValid()) {
continue;
}
+
Object attachment = key.attachment();
if (attachment != null) {
long expire = getExpirationStamp(attachment);
- if (expire == SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
- continue;
- }
- long idleLimit, activeThreadTimeout;
- if (attachment instanceof KeepAliveThreadAttachment) {
- activeThreadTimeout = ((KeepAliveThreadAttachment) attachment).getActiveThreadTimeout();
+ if (expire != SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
+ long idleLimit = getIdleLimit(attachment);
- if (activeThreadTimeout != SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
- idleLimit = activeThreadTimeout;
- } else {
- idleLimit = ((SelectionKeyAttachment) attachment).getIdleTimeoutDelay();
- if (idleLimit == SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
- //this is true when attachment class dont have idletimeoutdelay configured.
- idleLimit = timeout;
- }
- }
- } else {
- idleLimit = timeout;
+ if (idleLimit != -1 && currentTime - expire >= idleLimit &&
+ (!(attachment instanceof SelectionKeyAttachment) ||
+ ((SelectionKeyAttachment)attachment).timedOut(key))){
+ // selectorHandler.addPendingKeyCancel(key);
+ cancel(key);
+ }
}
- if (idleLimit == -1) {
- continue;
- }
-
- if (currentTime - expire >= idleLimit) {
- if (attachment instanceof Response.ResponseAttachment) {
- ((ResponseAttachment) attachment).timeout();
- key.attach(null);
- continue;
- }
-
- if (attachment instanceof KeepAliveThreadAttachment) {
- KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment;
- if (k.activeThread() != null) {
- if (logger.isLoggable(Level.WARNING)) {
- logger.log(Level.WARNING, "Interrupting idle Thread: " + k.activeThread().getName());
- }
- k.activeThread().interrupt();
- }
- }
- cancel(key);
- }
}
}
}
@@ -184,14 +137,26 @@
*
* @param {@link SelectionKey}
*/
- private long getExpirationStamp(Object attachment) {
+ protected long getExpirationStamp(Object attachment) {
if (attachment instanceof Long) {
return (Long) attachment;
- } else if (attachment instanceof SelectionKeyAttachment) {
+ }
+ if (attachment instanceof SelectionKeyAttachment) {
return ((SelectionKeyAttachment) attachment).getTimeout();
- } else if (attachment instanceof Response.ResponseAttachment) {
- return ((Response.ResponseAttachment) attachment).getExpirationTime() - timeout;
}
return SelectionKeyAttachment.UNLIMITED_TIMEOUT;
}
+
+ /**
+ * returns idle limit
+ */
+ private long getIdleLimit(Object attachment){
+ if (attachment instanceof SelectionKeyAttachment){
+ long idleLimit = ((SelectionKeyAttachment) attachment).getIdleTimeoutDelay();
+ if (idleLimit != SelectionKeyAttachment.UNLIMITED_TIMEOUT) {
+ return idleLimit;
+ }
+ }
+ return timeout;
+ }
}
Index: modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java
===================================================================
--- modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java (revision 2805)
+++ modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java (working copy)
@@ -110,11 +110,12 @@
*/
public boolean execute(Context ctx) throws IOException{
HttpWorkerThread workerThread = ((HttpWorkerThread)Thread.currentThread());
+
+ SelectionKey key = ctx.getSelectionKey();
+
+ setSelectionKeyTimeout(key, Long.MAX_VALUE);
- setSelectionKeyTimeout(ctx.getSelectionKey(), Long.MAX_VALUE);
-
- StreamAlgorithm streamAlgorithm =
- workerThread.getStreamAlgorithm();
+ StreamAlgorithm streamAlgorithm = workerThread.getStreamAlgorithm();
if (streamAlgorithm == null){
try{
streamAlgorithm = (StreamAlgorithm)algorithmClass
@@ -142,8 +143,7 @@
inputStream = createByteBufferInputStream();
}
configureByteBufferInputStream(inputStream, ctx, workerThread);
-
- SelectionKey key = ctx.getSelectionKey();
+
SocketChannel socketChannel = (SocketChannel) key.channel();
streamAlgorithm.setChannel(socketChannel);
@@ -160,17 +160,14 @@
ctx.setKeyRegistrationState(Context.KeyRegistrationState.NONE);
if (streamAlgorithm.parse(byteBuffer)){
- ProcessorTask processor =
- selectorThread.getProcessorTask();
+ ProcessorTask processor = selectorThread.getProcessorTask();
configureProcessorTask(processor, ctx, workerThread,
- streamAlgorithm.getHandler(), inputStream);
-
+ streamAlgorithm.getHandler(), inputStream);
try{
selectorThread.getAsyncHandler().handle(processor);
} catch (Throwable ex){
logger.log(Level.INFO,"Processor exception",ex);
- ctx.setKeyRegistrationState(
- Context.KeyRegistrationState.CANCEL);
+ ctx.setKeyRegistrationState(Context.KeyRegistrationState.CANCEL);
return false;
}
}
@@ -196,18 +193,24 @@
InputReader is = (InputReader) processor.getInputStream();
is.getByteBuffer().clear();
- byteBufferStreams.offer(is);
-
+ byteBufferStreams.offer(is);
+
SelectorThread selectorThread = processor.getSelectorThread();
- if (processor.isKeepAlive() && !processor.isError()){
- setSelectionKeyTimeout(processor.getSelectionKey(), Long.MIN_VALUE);
-
- selectorThread.registerKey(processor.getSelectionKey());
- } else {
- selectorThread.cancelKey(processor.getSelectionKey());
+ boolean cancelkey = processor.getAptCancelKey() || processor.isError()
+ || !processor.isKeepAlive();
+ try{
+ if (!cancelkey){
+ if (processor.getReRegisterSelectionKey()){
+ setSelectionKeyTimeout(processor.getSelectionKey(), Long.MIN_VALUE);
+ selectorThread.registerKey(processor.getSelectionKey());
+ }
+ }else{
+ selectorThread.cancelKey(processor.getSelectionKey());
+ }
+ }finally{
+ processor.recycle();
+ selectorThread.returnTask(processor);
}
- processor.recycle();
- selectorThread.returnTask(processor);
}
}
Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java
===================================================================
--- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java (revision 2805)
+++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java (working copy)
@@ -74,13 +74,11 @@
((CometHandler)cometHandler).onInterrupt((CometEvent)cometEvent);
break;
case CometEvent.NOTIFY:
- ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent);
- break;
case CometEvent.READ:
- ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent);
- break;
case CometEvent.WRITE:
+ synchronized(cometHandler){
((CometHandler)cometHandler).onEvent((CometEvent)cometEvent);
+ }
break;
case CometEvent.INITIALIZE:
((CometHandler)cometHandler).onInitialize((CometEvent)cometEvent);
@@ -92,7 +90,7 @@
throw new IllegalStateException();
}
} catch (IOException ex){
- Controller.logger().log(Level.WARNING,"",ex);
+ Controller.logger().log(Level.FINE,"",ex);
}
}
}
Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java
===================================================================
--- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (revision 2805)
+++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (working copy)
@@ -38,10 +38,7 @@
package com.sun.enterprise.web.connector.grizzly.comet;
-import com.sun.grizzly.comet.CometTask;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.logging.Level;
/**
* The main object used by {@link CometHandler}.
@@ -56,7 +53,7 @@
* @author Jeanfrancois Arcand
* @deprecated use {@link CometContext}
*/
-public class CometContext extends com.sun.grizzly.comet.CometContext{
+public class CometContext extends com.sun.grizzly.comet.CometContext{
private final CometEvent eventInitialize;
@@ -67,8 +64,8 @@
*/
public CometContext(String contextPath, int continuationType) {
super(contextPath, continuationType);
- this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this);
- this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this);
+ this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this);
+ this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this);
}
@@ -124,13 +121,8 @@
* {@inheritDoc}
*/
@Override
- public void notify(final E attachment) throws IOException {
- CometEvent event = new CometEvent(CometEvent.NOTIFY,this);
- event.attach(attachment);
- Iterator iterator = handlers.keySet().iterator();
- notificationHandler.setBlockingNotification(blockingNotification);
- notificationHandler.notify((com.sun.grizzly.comet.CometEvent)event,iterator);
- resetSuspendIdleTimeout();
+ public void notify(final Object attachment) throws IOException {
+ super.notify(attachment);
}
@@ -146,14 +138,14 @@
* {@inheritDoc}
*/
@Override
- public void notify(final E attachment,final int eventType,final int cometHandlerID)
+ public void notify(final Object attachment,final int eventType,final int cometHandlerID)
throws IOException{
CometHandler cometHandler = getCometHandler(cometHandlerID);
if (cometHandler == null){
throw new IllegalStateException(INVALID_COMET_HANDLER);
}
- CometEvent event = new CometEvent(eventType,this);
+ CometEvent event = new CometEvent(eventType,this);
event.attach(attachment);
notificationHandler.setBlockingNotification(blockingNotification);
@@ -173,30 +165,5 @@
protected void initialize(com.sun.grizzly.comet.CometHandler handler) throws IOException {
((com.sun.enterprise.web.connector.grizzly.comet.CometHandler)handler).onInitialize(eventInitialize);
}
-
- /**
- * Interrupt a {@link CometHandler} by invoking {@link CometHandler#onInterrupt}
- */
- @Override
- protected boolean interrupt(CometTask task,boolean removecomethandler, boolean resume) {
- boolean status = true;
- try{
- if (removecomethandler){
- status = (handlers.remove(task.getCometHandler()) != null);
- if (status && resume){
- ((com.sun.enterprise.web.connector.grizzly.comet.CometHandler)
- task.getCometHandler()).onInterrupt(eventInterrupt);
- }else{
- logger.finer(ALREADY_REMOVED);
- }
- }
- } catch (Throwable ex){
- status = false;
- logger.log(Level.FINE,"Unable to interrupt",ex);
- }finally{
- activeTasks.remove(task);
- return status;
- }
- }
}
-
+
Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java
===================================================================
--- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java (revision 2805)
+++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java (working copy)
@@ -87,36 +87,10 @@
*/
@Override
public CometContext register(String topic, int type){
- // Double checked locking used used to prevent the otherwise static/global
- // locking, cause example code does heavy usage of register calls
- // for existing topics from http get calls etc.
- CometContext cometContext = (CometContext)activeContexts.get(topic);
- if (cometContext == null){
- synchronized(activeContexts){
- cometContext = (CometContext)activeContexts.get(topic);
- if (cometContext == null){
- cometContext = (CometContext)cometContextCache.poll();
- if (cometContext != null)
- cometContext.setTopic(topic);
- if (cometContext == null){
- cometContext = new CometContext(topic, type);
- NotificationHandler notificationHandler
- = new DefaultNotificationHandler();
- cometContext.setNotificationHandler(notificationHandler);
- if (notificationHandler != null && (notificationHandler
- instanceof DefaultNotificationHandler)){
- ((DefaultNotificationHandler)notificationHandler)
- .setThreadPool(threadPool);
- }
- }
- activeContexts.put(topic,cometContext);
- }
- }
- }
- return cometContext;
+ return (CometContext) super.register(topic, type);
}
+
-
/**
* {@inheritDoc}
*/
Index: modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java
===================================================================
--- modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (revision 2805)
+++ modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (working copy)
@@ -57,11 +57,13 @@
import com.sun.grizzly.tcp.http11.InternalOutputBuffer;
import com.sun.grizzly.tcp.http11.filters.VoidOutputFilter;
import com.sun.grizzly.util.LoggerUtils;
+import com.sun.grizzly.util.SelectionKeyAttachment;
+import com.sun.grizzly.util.WorkerThreadImpl;
import com.sun.grizzly.util.buf.ByteChunk;
import com.sun.grizzly.util.http.MimeHeaders;
import java.io.IOException;
+import java.nio.channels.SelectionKey;
import java.util.Locale;
-
import java.nio.channels.SocketChannel;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
@@ -892,21 +894,20 @@
}
- public static class ResponseAttachment{
-
- private A attachment;
+ public static class ResponseAttachment extends SelectionKeyAttachment
+ implements Runnable {
+
private CompletionHandler super A> completionHandler;
- private Long timeout;
- private Long expiration;
- private Response response;
+ private final A attachment;
+ private final long idletimeoutdelay;
+ private final Response response;
- public ResponseAttachment(Long timeout,A attachment,
+ public ResponseAttachment(long idletimeoutdelay,A attachment,
CompletionHandler super A> completionHandler, Response response){
- this.timeout = timeout;
+ this.idletimeoutdelay = idletimeoutdelay;
this.attachment = attachment;
this.completionHandler = completionHandler;
- this.response = response;
-
+ this.response = response;
resetTimeout();
}
@@ -914,21 +915,20 @@
return attachment;
}
-
public CompletionHandler super A> getCompletionHandler() {
return completionHandler;
}
public void resetTimeout(){
- expiration = System.currentTimeMillis() + timeout;
+ timeout = System.currentTimeMillis();
}
-
-
- public Long getExpirationTime() {
- return expiration;
+
+ @Override
+ public long getIdleTimeoutDelay() {
+ return idletimeoutdelay;
}
+
-
public void resume(){
completionHandler.resumed(attachment);
try{
@@ -939,11 +939,21 @@
LoggerUtils.getLogger().log(Level.FINEST,"resume",ex);
}
}
-
-
- public void timeout(){
+
+ @Override
+ public boolean timedOut(SelectionKey Key) {
+ Key.attach(null);
+ run();
+ //((WorkerThreadImpl)Thread.currentThread()).
+ // getPendingIOhandler().addPendingIO(this);
+ return false;
+ }
+
+ @Override
+ public void run() {
timeout(true);
}
+
public void timeout(boolean forceClose){
// If the buffers are empty, commit the response header
Index: modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java
===================================================================
--- modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java (revision 2805)
+++ modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java (working copy)
@@ -49,7 +49,7 @@
public abstract class SelectionKeyAttachment {
public static final long UNLIMITED_TIMEOUT = Long.MIN_VALUE;
- private long timeout = UNLIMITED_TIMEOUT;
+ protected long timeout = UNLIMITED_TIMEOUT;
public static Object getAttachment(SelectionKey key) {
Object attachment = key.attachment();
@@ -60,25 +60,45 @@
return attachment;
}
- public long getTimeout() {
- return timeout;
- }
-
/**
* returns the idle timeout delay.
* default it returns Long.MIN_VALUE , meaning null.
* -1 means no timeout.
- * subclass need to implement it.
+ * subclass need to override it.
* @return
*/
public long getIdleTimeoutDelay(){
return UNLIMITED_TIMEOUT;
}
+ /**
+ * subclass need to override this method for it to work.
+ * Long.MIN_VALUE means null , and default value will be used.
+ * -1 means no timeout.
+ * @param idletimeoutdelay
+ */
+ public void setIdleTimeoutDelay(long idletimeoutdelay){
+ throw new IllegalStateException("setIdleTimeoutDelay not implemented in subclass");
+ }
+
+
+ public long getTimeout() {
+ return timeout;
+ }
+
public void setTimeout(long timeout) {
this.timeout = timeout;
}
+ /**
+ * called when idle timeout detected.
+ * return true if key should be canceled.
+ */
+ public boolean timedOut(SelectionKey Key){
+ return true;
+ }
+
+
public void release(SelectionKey selectionKey) {
timeout = UNLIMITED_TIMEOUT;
}
Index: modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java
===================================================================
--- modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (revision 2805)
+++ modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (working copy)
@@ -53,7 +53,7 @@
*/
public class ThreadAttachment extends SelectionKeyActionAttachment
implements AttributeHolder {
-
+
/**
* The maximum time this object can be associated with an active {@link Thread}
*/
@@ -310,18 +310,19 @@
* Set the time, in milliseconds, this object can be attached to a {@link Thread}
* @param the time, in milliseconds, this object can be attached to a {@link Thread}
*/
- public void setActiveThreadTimeout(long activeThreadTimeout){
+ @Override
+ public void setIdleTimeoutDelay(long activeThreadTimeout) {
this.activeThreadTimeout = activeThreadTimeout;
-
- // As soon as we get invoked we grab the Thread
activeThread= Thread.currentThread();
}
-
+
+
/**
* Return the time, in milliseconds, this object can be attached to a {@link Thread}
* @return the time, in milliseconds, this object can be attached to a {@link Thread}
- */
- public long getActiveThreadTimeout(){
+ */
+ @Override
+ public long getIdleTimeoutDelay() {
return activeThreadTimeout;
}
}
Index: modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java
===================================================================
--- modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java (revision 2805)
+++ modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java (working copy)
@@ -37,6 +37,7 @@
*/
package com.sun.grizzly.util;
+import com.sun.grizzly.tcp.PendingIOhandler;
import java.util.concurrent.Callable;
import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType;
import com.sun.grizzly.util.ThreadAttachment.Mode;
@@ -79,8 +80,14 @@
* The size of the ByteBuffer attached to this object.
*/
private int initialByteBufferSize;
+
+
+ /**
+ * used by selectionkey attachments to enqueue io events that will be executed in
+ * selectorhandler.postselect by worker threads instead of the selector thread.
+ */
+ private PendingIOhandler pendingIOhandler;
-
/**
* Create a Thread that will synchronizes/block on
* {@link DefaultThreadPool} instance.
@@ -90,7 +97,14 @@
public WorkerThreadImpl(ThreadGroup threadGroup, Runnable runnable){
this(threadGroup, runnable, DEFAULT_BYTE_BUFFER_SIZE);
}
-
+
+ public WorkerThreadImpl(Runnable runnable){
+ this(null, "workerthread", runnable, 0);
+ }
+
+ public WorkerThreadImpl(String name, Runnable runnable){
+ this(null, name, runnable, 0);
+ }
/**
* Create a Thread that will synchronizes/block on
* {@link DefaultThreadPool} instance.
@@ -278,6 +292,22 @@
}
+ /**
+ * used by selectionkey attachments to enqueue io events that will be executed in
+ * selectorhandler.postselect by worker threads instead of the selector thread.
+ */
+ public PendingIOhandler getPendingIOhandler() {
+ return pendingIOhandler;
+ }
+
+ /**
+ * used by selectionkey attachments to enqueue io events that will be executed in
+ * selectorhandler.postselect by worker threads instead of the selector thread.
+ */
+ public void setPendingIOhandler(PendingIOhandler pendingIOhandler) {
+ this.pendingIOhandler = pendingIOhandler;
+ }
+
@Override
protected void reset() {
if (threadAttachment != null) {
Index: modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java
===================================================================
--- modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java (revision 2805)
+++ modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java (working copy)
@@ -40,9 +40,7 @@
import com.sun.grizzly.arp.AsyncHandler;
import com.sun.grizzly.arp.DefaultAsyncHandler;
import com.sun.grizzly.http.SelectorThread;
-import com.sun.grizzly.http.StatsThreadPool;
import java.io.BufferedInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
@@ -51,7 +49,6 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
@@ -62,13 +59,13 @@
* @author Gustav Trede
*/
public class CometUnitTest extends TestCase {
- private final int port = 19000;
+ private final int port = 19100;
private SocketAddress connectadr;
- private final int socketreusedelayMilliSec = 0;
+ private final int socketreusedelayMilliSec = 40;
private static volatile boolean status;
private static volatile boolean testisdone;
private SelectorThread st;
- private final String context = "/cometText";
+ private final String context = "/cometTextn";
private final byte joinmessage = 126;
private final byte[] connectstr=
("POST /index.html/comet HTTP/1.1\r\n"+
@@ -81,9 +78,16 @@
}
@Override
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (st != null)
+ st.stopEndpoint();
+ }
+
+ @Override
protected void setUp() throws Exception {
- super.setUp();
- init(false);
+ super.setUp();
+ init(false);
}
protected void init(boolean useconcurrentcomethandler) throws Exception{
@@ -92,7 +96,7 @@
System.err.println("JVM: "+rmx.getVmVendor()+" "+rmx.getVmName()+" "+rmx.getVmVersion()+" params: "+rmx.getInputArguments());
st = new SelectorThread();
st.setPort(port);
- st.setDisplayConfiguration(true);
+ st.setDisplayConfiguration(false);
st.setAdapter(new CometTestAdapter(context,useconcurrentcomethandler,-1));
st.setEnableAsyncExecution(true);
AsyncHandler asyncHandler = new DefaultAsyncHandler();
@@ -100,10 +104,7 @@
st.setAsyncHandler(asyncHandler);
st.setTcpNoDelay(true);
st.setLinger(-1);
- /*st.setThreadPool( new StatsThreadPool(16,
- 32, 50,
- StatsThreadPool.DEFAULT_IDLE_THREAD_KEEPALIVE_TIMEOUT,
- TimeUnit.MILLISECONDS));*/
+
try {
st.listen();
} catch (Exception ex) {
@@ -113,44 +114,51 @@
}
- @Override
- protected void tearDown() throws Exception {
- super.tearDown();
- if (st != null)
- st.stopEndpoint();
- }
-
/* public void testLongPollingSocketReuse() throws Exception{
doActualLogic(true,false,40,20);
}*/
- /* public void testLongPollingNewSocket() throws Exception{
- doActualLogic(false,false,64,5);
- }
-*/
+ /* public void testLongPollingNewSocket() throws Exception{
+ doActualLogic(false,false,6500,64);
+ }*/
- public void testStreaming2() throws Exception{
- doActualLogic(false,true,10,4);
+ public void testStreaming1() throws Throwable{
+ //doActualLogic(false,true,15,1,false);
}
+ /* public void testStreaming2() throws Throwable{
+ doActualLogic(false,true,21,4, false);
+ }
+
+ public void testStreaming3() throws Throwable{
+ doActualLogic(false,true,21,64, false);
+ }*/
+
+ /* public void testStreaming5() throws Throwable{
+ doActualLogic(false,true, 15, 256);
+ }*/
+
protected void doActualLogic(final boolean socketreuse,final boolean streaming,
- final int secondspertest,final int threadcount) throws Exception{
- System.err.println(streaming?"STREAMING-":"LONGPOLLING-"+(socketreuse?"SOCKETREUSE":"NEWSOCKET")+" client threads: "+threadcount);
+ final int secondspertest,final int threadcount, boolean spreadnotify) throws Throwable{
+ System.err.println((streaming?"STREAMING-":"LONGPOLLING-")+(socketreuse?"SOCKETREUSE":"NEWSOCKET")+" client threads: "+threadcount+" spreadNotifyToManyThreads: "+spreadnotify);
//int cpus = Runtime.getRuntime().availableProcessors();
+ ((DefaultNotificationHandler)CometTestAdapter.cometContext.notificationHandler).
+ setSpreadNotifyToManyToThreads(spreadnotify);
testisdone = false;
msgc.set(0);
- CometTestAdapter.usetreaming = streaming;
- status = true;
+ CometTestAdapter.usetreaming = streaming;
final CountDownLatch threadsaredone = new CountDownLatch(threadcount);
try{
+ status = true;
for (int i=0;i2300){
+ if (deltatime>4500){
t1 = t2;
int currenttotalmsg = msgc.get();
System.err.println(
- " events/sec : "+((currenttotalmsg-oldtotal)*1000/deltatime)+
+ " K events/sec : "+((currenttotalmsg-oldtotal+500)/deltatime)+
" comethandlers: "+CometTestAdapter.cometContext.handlers.size()+
- " cometWorkqueue: "+cometexecutor.getQueue().size()
+ " workqueue: "+queuesize+
+ " broadcastsper: "+eventbroadcasts
);
oldtotal = currenttotalmsg;
}
- int queuesize = cometexecutor.getQueue().size();
- if (queuesize < 10000){
- eventbroadcasts = (eventbroadcasts*5)/4;
+
+ if (streaming){
+
+ /*if (queuesize < (spreadnotify?threadcount:1)*300 ){
+ eventbroadcasts = (eventbroadcasts*5)/4;
+ }*/
+ if (queuesize < (spreadnotify?threadcount:1)*100){
+ for (int i=0;i 0){
Thread.sleep(socketreusedelayMilliSec);
}
@@ -287,11 +311,12 @@
private Socket newSocket(int timeout) throws Exception{
Socket socket = new Socket();
- socket.setReuseAddress(false);
- socket.setReceiveBufferSize(8192);
- socket.setSendBufferSize(1024);
+ socket.setReuseAddress(true);
+ //socket.setReceiveBufferSize(2048);
+ //socket.setSendBufferSize(512);
socket.setSoLinger(false, 0);
socket.setSoTimeout(timeout);
+ socket.setTcpNoDelay(true);
socket.connect(connectadr);
return socket;
}
Index: modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java
===================================================================
--- modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java (revision 2805)
+++ modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java (working copy)
@@ -71,31 +71,34 @@
private int PORT = 18890;
final CometContext test = CometEngine.getEngine().register("GrizzlyAdapter");
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ test.setBlockingNotification(false);
+ }
@Override
protected void tearDown() throws Exception {
super.tearDown();
- test.activeTasks.clear();
test.handlers.clear();
- stopGrizzlyWebServer();
-
+ stopGrizzlyWebServer();
}
public void testOnInterruptExpirationDelay() throws Exception {
- System.out.println("testOnInterruptExpirationDelay - will wait 5 seconds");
- final int delay = 5000;
+ System.out.println("testOnInterruptExpirationDelay - will wait 2 seconds");
+ final int delay = 2000;
test.setExpirationDelay(delay);
newGWS(PORT+=1);
String alias = "/OnInterrupt";
addAdapter(alias, false);
gws.start();
- HttpURLConnection conn = getConnection(alias);
+ HttpURLConnection conn = getConnection(alias,delay+4000);
long t1 = System.currentTimeMillis();
assertEquals(conn.getHeaderField(onInitialize), onInitialize);
assertEquals(conn.getHeaderField(onInterrupt), onInterrupt);
long delta = System.currentTimeMillis() - t1;
- assertTrue("comet idletimeout was too fast,"+delta+"ms",delta > delay-200);
+ assertTrue("comet idletimeout was too fast,"+delta+"ms",delta > delay-250);
assertTrue("comet idletimeout was too late,"+delta+"ms",delta < delay+3000);
}
@@ -109,7 +112,7 @@
Socket s = new Socket("localhost", PORT);
s.setSoLinger(false, 0);
- s.setSoTimeout(5 * 1000);
+ s.setSoTimeout(1 * 1000);
OutputStream os = s.getOutputStream();
String a = "GET " + alias + " HTTP/1.1\n"+"Host: localhost:" + PORT + "\n\n";
System.out.println(" "+a);
@@ -120,66 +123,75 @@
fail("client socket read did not read timeout");
} catch (SocketTimeoutException ex) {
s.close();
- Thread.sleep(3 * 1000);
+ Thread.sleep(500);
assertEquals(onInterrupt, ga.c.wasInterrupt);
}
+ }
+
+ public void testOnTerminate() throws IOException {
+ System.out.println("testOnTerminate ");
+ test.setExpirationDelay(-1);
+ newGWS(PORT+=3);
+ String alias = "/OnTerminate";
+ addAdapter(alias,true);
+ gws.start();
+ for (int i=0;i<10;i++){
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(200);
+ test.notify(onTerminate, CometEvent.TERMINATE);
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ fail("exception:"+ex.getMessage());
+ }
+ }
+ }.start();
+ HttpURLConnection conn = getConnection(alias,1000);
+ assertEquals(conn.getHeaderField(onInitialize) , onInitialize);
+ assertEquals(conn.getHeaderField(onTerminate), onTerminate);
+ conn.disconnect();
+ }
}
-
- public void testOnEvent() throws IOException {
- System.out.println("testOnEvent - will wait 5 seconds");
- newGWS(PORT+=3);
+
+ public void testOnEvent() throws Exception {
+ System.out.println("testOnEvent ");
+ newGWS(PORT+=4);
String alias = "/OnEvent";
addAdapter(alias, true);
test.setExpirationDelay(-1);
gws.start();
- new Thread() {
-
- @Override
- public void run() {
- try {
- Thread.sleep(5 * 1000);
- test.notify(onEvent);
- } catch (Throwable ex) {
- Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex);
- fail("sleep/notify exception:"+ex.getMessage());
+ int iter = 10;
+ while(iter-->0){
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(150);
+ test.notify(onEvent);
+ } catch (Throwable ex) {
+ Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex);
+ fail("sleep/notify exception:"+ex.getMessage());
+ }
}
- }
- }.start();
-
- HttpURLConnection conn = getConnection(alias);
- assertEquals(conn.getHeaderField(onInitialize), onInitialize);
- assertEquals(conn.getHeaderField(onEvent), onEvent);
+ }.start();
+ HttpURLConnection conn = getConnection(alias,1000);
+ assertEquals(conn.getHeaderField(onInitialize), onInitialize);
+ assertEquals(conn.getHeaderField(onEvent), onEvent);
+ conn.disconnect();
+ }
}
- public void testOnTerminate() throws IOException {
- System.out.println("testOnTerminate - will wait 5 seconds");
- test.setExpirationDelay(-1);
- newGWS(PORT+=4);
- String alias = "/OnTerminate";
- addAdapter(alias, false);
- gws.start();
- new Thread() {
- @Override
- public void run() {
- try {
- Thread.sleep(5 * 1000);
- test.notify(onTerminate, CometEvent.TERMINATE);
- } catch (Exception ex) {
- fail("exception:"+ex.getMessage());
- Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex);
- }
- }
- }.start();
- HttpURLConnection conn = getConnection(alias);
- assertEquals(conn.getHeaderField(onInitialize) , onInitialize);
- assertEquals(conn.getHeaderField(onTerminate), onTerminate);
- }
+ private HttpURLConnection getConnection(String alias) throws IOException {
+ return getConnection(alias, 40*1000);
+ }
- private HttpURLConnection getConnection(String alias) throws IOException {
+ private HttpURLConnection getConnection(String alias, int readtimeout) throws IOException {
URL url = new URL("http", "localhost", PORT, alias);
HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
- urlConn.setConnectTimeout(10*1000);
- urlConn.setReadTimeout(40*1000);
+ urlConn.setConnectTimeout(5*1000);
+ urlConn.setReadTimeout(readtimeout);
urlConn.connect();
return urlConn;
}
@@ -260,7 +272,6 @@
response.addHeader(onTerminate, event.attachment().toString());
response.getWriter().print(onTerminate);
- event.getCometContext().resumeCometHandler(this);
}
public void onInterrupt(CometEvent event) throws IOException {
Index: modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java (working copy)
@@ -40,34 +40,36 @@
import com.sun.grizzly.Controller;
import com.sun.grizzly.comet.CometEvent;
import com.sun.grizzly.comet.CometHandler;
-import java.io.Closeable;
+import com.sun.grizzly.comet.DefaultNotificationHandler;
import java.io.IOException;
import java.util.LinkedList;
import java.util.logging.Logger;
/**
- *
* we queue events in each comethandler to lower the probability
* that slow or massive IO for one comethandler severly delays events to others.
*
+ * only streaming mode can benefit from buffering messages like this.
* only 1 thread at a time is allowed to do IO,
* other threads put events in the queue and return to the thread pool.
*
* a thread initially calls enqueueEvent and stay there until there are no more
- * events in the queue, calling the onEVent method in unsynchronized context for each Event.
+ * events in the queue, calling the onEVent method in synchronized context for each Event.
*
* on IOE in onEvent we terminate.
* we have a limit, to keep memory usage under control.
*
* if queue limit is reached onQueueFull is called, and then we terminate.
*
- * default implementation of onInterrupt and onTerminate performs a .close() if attachment instanceof Closeable
<
*
* whats not optimal is that a worker thread is sticky to the client depending
* uppon available events in the handlers local queue,
* that can in theory allow a few clients to block all threads for extended time.
- * The improvement is that only 1 thread is tied up to a client instead of several
- * being blocked by synchronized.
+ * that effect can make this implementation unusable depending on the scenario,
+ * its not a perfect design be any means.
+ *
+ * The potential improvement is that only 1 worker thread is tied up to a client instead of several
+ * being blocked by synchronized io wait for one comethandler .
*
* @author Gustav Trede
*/
@@ -78,12 +80,8 @@
/**
* used for preventing othe worker threads from the executor event queue from adding events
* to the comethandlers local queue or starting IO logic after shuttdown.
- *
- * {@link DefaultNotificationHandler} sets shuttingdown = true when needed.
- * this way we dont need subclasses to remember to do super calls in the onXX methods.
- * todo: CometEvent.INTERRUPT should do cometHandler.shuttingdown = true; ?
*/
- protected volatile boolean shuttingdown;
+ private boolean shuttingdown;
/**
* max number of events to locally queue for this comethandler.
@@ -128,11 +126,8 @@
* further events in the internal queue.
*/
public void EnQueueEvent(CometEvent event){
- if (shuttingdown)
- return;
synchronized(messageQueue){
if (!isreadyforwork){
- // to prevent add of event when we are shutdown
if (!shuttingdown && queuesize < messageQueueLimit){
messageQueue.add(event);
queuesize++;
@@ -149,12 +144,19 @@
return;
}
try{
- onEvent(event);
- } catch (Throwable ex) {
+ //move synchronized outside the while loop ?
+ synchronized(this){
+ onEvent(event);
+ }
+ } catch (IOException ex) {
shuttingdown = true;
- event.getCometContext().resumeCometHandler(this);
- return;
+ }finally{
+ if (shuttingdown){
+ event.getCometContext().resumeCometHandler(this);
+ return;
+ }
}
+
synchronized(messageQueue){
if (queuesize == messageQueueLimit){
queuefull = true;
@@ -175,7 +177,7 @@
}
/**
- * called in unsynchronized context, not blocking other threads
+ * called in synchronized context.
* when the comethandler's local event queue is full.
* default impl resumes the comethandler
* @param event {@link CometEvent}
@@ -183,14 +185,6 @@
public void onQueueFull(CometEvent event){
event.getCometContext().resumeCometHandler(this);
}
-
- /**
- * prevents further event handling in the enQueue method.
- * existing queued events will be discarded.
- */
- public void shutdownQueue() {
- shuttingdown = true;
- }
/**
* returns the attachment
@@ -229,14 +223,14 @@
}
/**
- * closes the connection if attachment instanceof Closable.
+ *
*/
protected void terminate(){
- if (attachment() instanceof Closeable){
+ /* if (attachment() instanceof Closeable){
try {
((Closeable) attachment()).close();
} catch (IOException ex) { }
- }
+ }*/
}
}
\ No newline at end of file
Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java (working copy)
@@ -76,7 +76,11 @@
* everytime a {@link CometContext#notify} is invoked. The {@link CometEvent}
* will contains the message that can be pushed back to the remote client,
* cached or ignored. This method can also be used to resume a connection
- * once a notified by invoking {@link CometContext#resumeCometHandler}.
+ * once a notified by invoking {@link CometContext#resumeCometHandler}.
+ * its not optimal to flush outputstream in this method for long polling,
+ * flush is performed in each CometContext.resume call.
+ * flushing multiple times can fragment the data into several tcp packets,
+ * that leads to extra IO and overhead in general due to client ack for each packet etc.
*/
public void onEvent(CometEvent event) throws IOException;
Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java (working copy)
@@ -1,376 +0,0 @@
-/*
- *
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
- *
- * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
- *
- * The contents of this file are subject to the terms of either the GNU
- * General Public License Version 2 only ("GPL") or the Common Development
- * and Distribution License("CDDL") (collectively, the "License"). You
- * may not use this file except in compliance with the License. You can obtain
- * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
- * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
- * language governing permissions and limitations under the License.
- *
- * When distributing the software, include this License Header Notice in each
- * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
- * Sun designates this particular file as subject to the "Classpath" exception
- * as provided by Sun in the GPL Version 2 section of the License file that
- * accompanied this code. If applicable, add the following below the License
- * Header, with the fields enclosed by brackets [] replaced by your own
- * identifying information: "Portions Copyrighted [year]
- * [name of copyright owner]"
- *
- * Contributor(s):
- *
- * If you wish your version of this file to be governed by only the CDDL or
- * only the GPL Version 2, indicate your decision by adding "[Contributor]
- * elects to include this software in this distribution under the [CDDL or GPL
- * Version 2] license." If you don't indicate a single choice of license, a
- * recipient has the option to distribute your version of this file under
- * either the CDDL, the GPL Version 2 or to extend the choice of license to
- * its licensees as provided above. However, if you add GPL Version 2 code
- * and therefore, elected the GPL Version 2 license, then the option applies
- * only if the new code is made subject to such option by the copyright
- * holder.
- *
- */
-
-package com.sun.grizzly.comet;
-
-import com.sun.grizzly.http.SelectorThread;
-import com.sun.grizzly.util.LinkedTransferQueue;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * NIO {@link Selector} allowing {@link CometHandler} to receive
- * non-blocking requests bytes during request polling.
- *
- * TODO: investigate if its possible to move this functionality to grizzly main
- * selector inorder to lower the extra overhead this 2nd selector is.
- *
- * @author Jeanfrancois Arcand
- * @author Gustav Trede
- */
-public class CometSelector {
-
- /**
- * The {@link CometEngine} singleton
- */
- protected final CometEngine cometEngine;
-
-
- /**
- * The {@link Selector}
- */
- private Selector selector;
-
- /**
- * timestamp when last expireIdleKeys() performed its check
- */
- private long lastIdleCheck;
-
- /**
- * Logger.
- */
- private final Logger logger = SelectorThread.logger();
-
- /**
- *
- */
- private final ByteBuffer dumybuffer = ByteBuffer.allocate(1);
-
- /**
- * The list of {@link SelectionKey} to register with the
- * {@link Selector}
- * TODO: replace with LinkedTransferQueue
- */
- private final LinkedTransferQueue keysToRegister
- = new LinkedTransferQueue();
-
-
- /**
- * New {@link CometSelector}
- * @param cometEngine The {@link CometEngine} singleton
- */
- public CometSelector(CometEngine cometEngine) {
- this.cometEngine = cometEngine;
- }
-
-
- /**
- * Start the {@link Selector} running on its
- * Thread.
- */
- public void start() throws InterruptedException{
- final CountDownLatch isStartedLatch = new CountDownLatch(1);
- new Thread("CometSelector"){{
- setDaemon(true);
- }
-
- @Override
- public void run(){
- try{
- selector = Selector.open();
- } catch(IOException ex){
- // Most probably a fd leak.
- logger.log(Level.SEVERE,"CometSelector.open()",ex);
- return;
- }
- isStartedLatch.countDown();
-
- doSelection();
- }
- }.start();
- isStartedLatch.await();
- }
-
- /**
- * the selection logic
- */
- private void doSelection(){
- while (true){
- int selectorState = 0;
- try{
- try{
- selectorState = selector.select(1000);
- } catch (CancelledKeyException ex){
- if (logger.isLoggable(Level.FINEST)){
- logger.log(Level.FINEST,"CometSelector.open()",ex);
- }
- }
-
- handleSelectedKeys();
- expireIdleKeys();
- registerNewKeys();
-
- } catch (Throwable t){
- handleException(t,null);
- }finally{
- if (selectorState <= 0){ //todo why is this needed ?
- selector.selectedKeys().clear();
- }
- }
- }
- }
-
- /**
- * handle the selected keys
- */
- private void handleSelectedKeys(){
- for (SelectionKey cometKey:selector.selectedKeys()) {
- try{
- if (cometKey.isValid()) {
- CometTask cometTask = (CometTask)cometKey.attachment();
- boolean asyncExec = cometTask.isComethandlerisAsyncregistered();
- if (asyncExec){
- cometTask.setComethandlerisAsyncregistered(false);
- if (cometKey.isReadable()){
- cometKey.interestOps(cometKey.interestOps() & (~SelectionKey.OP_READ));
- cometTask.upcoming_op = CometTask.OP_EVENT.READ;
- }
-
- if (cometKey.isWritable()){
- cometKey.interestOps(cometKey.interestOps() & (~SelectionKey.OP_WRITE));
- cometTask.upcoming_op = CometTask.OP_EVENT.WRITE;
- }
- }
- if (cometTask.getSelectionKey().attachment() == null){
- if (cometTask.cometHandlerNotResumed()){
- if (asyncExec){
- cometTask.execute();
- }else{
- checkIfclientClosedConnection(cometKey);
- }
- }
- } else {
- // logger.warning("cometselector comettask.mainkey has an attachment. ");
- cancelKey(cometKey,false,true, true);
- }
- } else {
- //logger.warning("cometselector select detected invalid cometKey.");
- cancelKey(cometKey,false,true,true);
- }
- }catch(Throwable t){
- handleException(t, cometKey);
- }
- }
- // one shot clear is alot faster then removing each element one by one.
- selector.selectedKeys().clear();
- }
-
- /**
- *
- * @param cometKey
- */
- private void checkIfclientClosedConnection(SelectionKey cometKey) {
- boolean connectionclosed = true;
- try {
- SocketChannel socketChannel = (SocketChannel)cometKey.channel();
- dumybuffer.clear();
- connectionclosed = socketChannel.read(dumybuffer) == -1;
- } catch (Throwable ex) {
- // null we dont want cancelkey to happen here, cause it does not cancel mainKey
- handleException(ex, null);
- }
- finally{
- if (connectionclosed)
- cancelKey(cometKey, true, true, true);
- }
- }
-
- /**
- * perform the registration of new keys.
- * The mainKey is the SelectionKey returned by the
- * Selector used in the SelectorThread class.
- */
- private void registerNewKeys(){
- SelectionKey cometKey = null;
- CometTask cometTask;
- while ((cometTask = keysToRegister.poll()) != null ){
- try{
- SelectionKey mainKey = cometTask.getSelectionKey();
- SocketChannel channel = (SocketChannel)mainKey.channel();
- if (mainKey.isValid() && channel.isOpen()) {
- cometKey = channel.register(selector,SelectionKey.OP_READ);
- cometTask.setCometKey(cometKey);
- cometKey.attach(cometTask);
- cometTask.getCometContext().addActiveCometTask(cometTask);
- cometTask.getCometContext().
- addActiveHandler(cometTask.getCometHandler(), cometKey);
- cometKey = null;
- }
- }catch(Throwable t){
- handleException(t, cometKey);
- }
- }
- }
-
- /**
- * Expires registered {@link SelectionKey}. If a
- * {@link SelectionKey} is expired, the request will be resumed and the
- * HTTP request will complete,
- */
- private void expireIdleKeys(){
- if (selector.keys().isEmpty()){
- return;
- }
-
- final long current = System.currentTimeMillis();
- if (current - lastIdleCheck < 1000){
- return;
- }
-
- lastIdleCheck = current;
- for (SelectionKey cometKey:selector.keys()){
- try{
- CometTask cometTask = (CometTask)cometKey.attachment();
- if (cometTask == null)
- continue;
- if (cometTask.hasExpired(current)){
- cancelKey(cometKey,false,true, true);
- continue;
- }
- /**
- * The connection has been resumed since the timeout is
- * re-attached to the SelectionKey so cancel the Comet key.
- */
- if (cometTask.getSelectionKey().attachment() instanceof Long){
- cometKey.attach(null);
- cometKey.cancel();
- }
- }catch(Throwable t){
- handleException(t, cometKey);
- }
- }
- }
-
- /**
- * handle exceptions for selection logic
- * @param t
- * @param key
- */
- private void handleException(Throwable t, SelectionKey key){
- try{
- cancelKey(key,false,true, true);
- } catch (Throwable t2){
- logger.log(Level.SEVERE,"CometSelector",t2);
- }
- if (logger.isLoggable(Level.FINEST)){
- logger.log(Level.FINEST,"CometSelector",t);
- }
- }
-
-
- /**
- * Cancel the {@link SelectionKey} associated with a suspended connection.
- */
- protected boolean cancelKey(SelectionKey cometKey, boolean cancelMainKey,
- boolean removeCometHandler, boolean notifyInterrupt){
- if (cometKey == null){ //cometcontext.resume can give a null cometkey
- return false;
- }
- boolean status = true;
- CometTask cometTask = null;
- // attach is only atomic since dolphin b06 , hence we must synchronize
- // until we can require dolphin
- // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6436220
- synchronized(cometKey){
- cometTask = (CometTask) cometKey.attach(null);
- if (cometTask != null){
- //synchronizes internally on itself and canceledkeyset,
- //we want hotspot to use lock coarsening.
- cometKey.cancel();
- }
- }
- status = cometTask != null;
- if (status){
- status = cometTask.getCometContext().interrupt(cometTask,
- removeCometHandler, notifyInterrupt);
- cometEngine.flushPostExecute(cometTask.getAsyncProcessorTask());
-
- if (cancelMainKey){
- cometTask.getSelectorThread().cancelKey(cometTask.getSelectionKey());
- }
- }
- return status;
- }
-
- /**
- * Register the {@link SelectionKey} to the {@link Selector}. We
- * cannot register the {@link SelectionKey} directy on the
- * {@link Selector} because there is a deadlock in the VM (see bug XXX).
- */
- public void registerKey(CometTask cometTask){
- if (cometTask.getSelectionKey().isValid() && selector != null){
- cometTask.setExpireTime(System.currentTimeMillis());
- keysToRegister.offer(cometTask);
- selector.wakeup();
- }
- }
-
-
- /**
- * Wakes up the {@link Selector}
- */
- public void wakeup(){
- selector.wakeup();
- }
-
- /**
- * Return the SelectionKey associated with this channel.
- */
- public SelectionKey cometKeyFor(SelectableChannel channel){
- return channel.keyFor(selector);
- }
-
-}
Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java (working copy)
@@ -38,13 +38,15 @@
package com.sun.grizzly.comet;
+import java.io.Serializable;
+
/**
* Simple event class used to pass information between {@link CometHandler}
* and the Comet implementation.
*
* @author Jeanfrancois Arcand
*/
-public class CometEvent {
+public class CometEvent implements Serializable{
/**
@@ -98,7 +100,7 @@
/**
* The CometContext from where this instance was fired.
*/
- private CometContext cometContext;
+ private transient CometContext cometContext;
/**
@@ -116,6 +118,12 @@
this.type = type;
this.cometContext = context;
}
+
+ public CometEvent(int type, CometContext cometContext, E attachment) {
+ this.type = type;
+ this.attachment = attachment;
+ this.cometContext = cometContext;
+ }
/**
* Return the type of this object.
Index: modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java (working copy)
@@ -48,7 +48,7 @@
/**
* Default Notificationhandler that uses a thread pool dedicated to the CometEngine
- * to execute the notification process.
+ * to execute the notification process.
*
* @author Jeanfrancois Arcand
* @author Gustav Trede
@@ -75,7 +75,7 @@
/**
* only used if blockingnotification == false and threadpool != null
*/
- private boolean spreadNotifyToManyToThreads = true;
+ private boolean spreadNotifyToManyToThreads = false;
public DefaultNotificationHandler() {
}
@@ -104,8 +104,21 @@
public void setBlockingNotification(boolean blockingNotification) {
this.blockingNotification = blockingNotification;
}
+
+ /**
+ * if true a notify to Iterator will be spread into one runnable task for
+ * each comethandler.
+ * if false , all comethandlers notify will be executed in 1 Runnable, after each other,
+ *
+ * @param spreadNotifyToManyToThreads
+ */
+ public void setSpreadNotifyToManyToThreads(boolean spreadNotifyToManyToThreads) {
+ this.spreadNotifyToManyToThreads = spreadNotifyToManyToThreads;
+ }
-
+
+
+
/**
* Notify all {@link CometHandler}.
* @param cometEvent the CometEvent used to notify CometHandler
@@ -162,49 +175,36 @@
protected void notify0(CometEvent cometEvent,CometHandler cometHandler) {
try{
switch (cometEvent.getType()) {
- case CometEvent.INTERRUPT:
- if (cometHandler instanceof DefaultConcurrentCometHandler){
- ((DefaultConcurrentCometHandler)cometHandler).shutdownQueue();
- //todo how do we synchronize ?, the defaultConcurrentcomethandler can do that, but we dont know if other implementations do
- cometHandler.onInterrupt(cometEvent);
- }else
- synchronized(cometHandler){
- cometHandler.onInterrupt(cometEvent);
- }
- break;
+ case CometEvent.INTERRUPT:
+ cometHandler.onInterrupt(cometEvent); break;
case CometEvent.NOTIFY:
case CometEvent.READ:
case CometEvent.WRITE:
- if (cometHandler instanceof DefaultConcurrentCometHandler)
+ if (cometHandler instanceof DefaultConcurrentCometHandler){
((DefaultConcurrentCometHandler)cometHandler).EnQueueEvent(cometEvent);
- else
- if (cometEvent.getCometContext().isActive(cometHandler))
+ break;
+ }
+ if (cometEvent.getCometContext().isActive(cometHandler)){
synchronized(cometHandler){
cometHandler.onEvent(cometEvent);
}
+ }
break;
case CometEvent.INITIALIZE:
- cometHandler.onInitialize(cometEvent);
- break;
+ cometHandler.onInitialize(cometEvent); break;
case CometEvent.TERMINATE:
- if (cometHandler instanceof DefaultConcurrentCometHandler){
- ((DefaultConcurrentCometHandler)cometHandler).shutdownQueue();
- cometHandler.onTerminate(cometEvent); //todo how do we synchronize ?, the defaultConcurrentcomethandler can do that, but we dont know if other implementations do
- }else
- synchronized(cometHandler){
- cometHandler.onTerminate(cometEvent);
- }
- break;
+ synchronized(cometHandler){
+ cometHandler.onTerminate(cometEvent); break;
+ }
default:
throw ISEempty;
}
} catch (Throwable ex) {
try {
- cometEvent.getCometContext().resumeCometHandler(cometHandler, true);
+ cometEvent.getCometContext().resumeCometHandler(cometHandler);
} catch (Throwable t) {
logger.log(Level.FINE, "Resume phase failed: ", t);
}
- //todo cant log this at WARNING level.. its normal to have alot of failed notifications. imagine several K users in the real world..
logger.log(Level.FINE, "Notification failed: ", ex);
}
}
Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java (working copy)
@@ -1,9 +1,9 @@
/*
- *
+ *
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
- *
+ *
* Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved.
- *
+ *
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
@@ -11,7 +11,7 @@
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
- *
+ *
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
@@ -20,9 +20,9 @@
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
- *
+ *
* Contributor(s):
- *
+ *
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
@@ -44,127 +44,179 @@
import com.sun.grizzly.ProtocolChain;
import com.sun.grizzly.arp.AsyncProcessorTask;
import com.sun.grizzly.http.SelectorThread;
-import com.sun.grizzly.util.InputReader;
-import com.sun.grizzly.http.TaskBase;
+import com.sun.grizzly.http.Task;
+import com.sun.grizzly.util.SelectedKeyAttachmentLogic;
import com.sun.grizzly.util.WorkerThread;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.logging.Level;
+import java.util.logging.Logger;
/**
* A {@link Task} implementation that allow Grizzly ARP to invokeCometHandler
- * {@link CometHandler} when new data (bytes) are available from the
+ * {@link CometHandler} when new data (bytes) are available from the
* {@link CometSelector}.
*
* @author Jeanfrancois Arcand
+ * @author Gustav Trede
*/
-public class CometTask extends TaskBase{
+public class CometTask extends SelectedKeyAttachmentLogic implements Runnable{
-
- public enum OP_EVENT { READ, WRITE }
-
-
+ private static final Logger logger = SelectorThread.logger();
+
/**
- * The current non blocking operation.
+ * The {@link CometContext} associated with this instance.
*/
- protected OP_EVENT upcoming_op = OP_EVENT.READ;
-
-
+ protected final CometContext cometContext;
+
/**
- * The {@link CometContext} associated with this instance.
+ * The {@link CometHandler} associated with this task.
*/
- private CometContext cometContext;
-
-
+ protected final CometHandler cometHandler;
+
/**
- * The {@link CometSelector} .
+ * The {@link AsyncProcessorTask}
*/
- private CometSelector cometSelector;
+ private AsyncProcessorTask asyncProcessorTask;
-
/**
- * The time in milliseconds before this object was registered the
- * {@link SelectionKey} on the {@link CometSelector}
+ * true if comethandler is registered for async IO in cometcontext.
+ * used to optmize:
+ * dont give simple read == -1 operations to thread pool
*/
- private long expireTime ;
+ protected volatile boolean cometHandlerIsAsyncRegistered;
-
/**
- * used by cometselector to optmize:
- * dont give simple read == -1 operations to thread pool
+ * The current non blocking operation.
*/
- private volatile boolean comethandlerisAsyncregistered;
-
+ protected boolean upcoming_op_isread;
+
/**
- * The InputStream used to read bytes from the {@link CometSelector}
+ * true if run() should call cometcontext.interrupt0
*/
- private InputReader cometInputStream;
-
-
+ protected boolean callInterrupt;
+
/**
- * The CometSelector registered key.
+ * true if interrupt should flushAPT
*/
- private SelectionKey cometKey;
+ protected boolean interruptFlushAPT;
+
/**
- * The {@link AsyncProcessorTask}
+ * New {@link CometTask}.
*/
- private AsyncProcessorTask asyncProcessorTask;
+ public CometTask(CometContext cometContext, CometHandler cometHandler) {
+ this.cometContext = cometContext;
+ this.cometHandler = cometHandler;
+ }
/**
- * The {@link CometEvent} associated with this task.
+ * performs doTask() or cometContext.interrupt0
*/
- private CometEvent event;
-
+ public void run(){
+ if (callInterrupt){
+ cometContext.interrupt0(this, true, interruptFlushAPT, true);
+ }else{
+ try{
+ doTask();
+ } catch (IOException ex){
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
/**
- * The {@link CometHandler} associated with this task.
+ * {@inheritDoc}
*/
- private CometHandler cometHandler;
+ @Override
+ public long getIdleTimeoutDelay() {
+ return cometContext.getExpirationDelay();
+ }
/**
- * The CometWriter associated with this task.
+ * this should never be called for for comet, due to we are nulling the attachment
+ * and completely overriding the selector.select logic.
+ * called by grizzly when the selectionkey is canceled and its socket closed.
+ *
+ * @param selectionKey
*/
- private CometWriter writer;
-
-
+ @Override
+ public void release(SelectionKey selectionKey) {
+ //logger.warning("cometTask.release() : isactive: "+cometContext.isActive(cometHandler)+" attachment:"+selectionKey.attachment());
+ //cometContext.interrupt(this, true, false,false, true);
+ }
+
/**
- * The CometReader associated with this task.
+ * {@inheritDoc}
*/
- private CometReader reader;
-
-
+ @Override
+ public boolean timedOut(SelectionKey key){
+ //System.err.println("cometTask.timedout() : isactive: "+cometContext.isActive(cometHandler)+" attachment:"+key.attachment());
+ cometContext.interrupt(this, true, true, true, true);
+ return false;
+ }
+
/**
- * true if the CometHandler has been registered for OP_READ
- * events.
- * false by default. java lang specification states that.
+ * {@inheritDoc}
*/
- private boolean asyncReadSupported ;
-
-
+ @Override
+ public void handleSelectedKey(SelectionKey selectionKey) {
+ if (!selectionKey.isValid()){
+ cometContext.interrupt(this, true, false,true, true);
+ return;
+ }
+ if (cometHandlerIsAsyncRegistered){
+ if (selectionKey.isReadable()){
+ selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ));
+ upcoming_op_isread = true;
+ }
+ if (selectionKey.isWritable()){
+ selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_WRITE));
+ upcoming_op_isread = false;
+ }
+ asyncProcessorTask.getThreadPool().execute(this);
+ }
+ else{
+ checkIfClientClosedConnection(selectionKey);
+ }
+ }
+
/**
- * Is this Task suspended.
+ * checks if client has closed the connection.
+ * the check is done by trying to read 1 byte that is trown away.
+ * only used for non async registered comethandler.
+ * @param mainKey
*/
- private boolean isSuspended = false;
-
- /**
- * New {@link CometTask}.
- */
- public CometTask() {
+ private void checkIfClientClosedConnection(SelectionKey mainKey) {
+ boolean connectionclosed = true;
+ try {
+ connectionclosed = ((SocketChannel)mainKey.channel()).
+ read(ByteBuffer.allocate(1)) == -1;
+ } catch (IOException ex) {
+
+ }
+ finally{
+ if (connectionclosed){
+ cometContext.interrupt(this, true, false,true, true);
+ }else{
+ //cometContext.interrupt(this, false, false, true,false, true);
+ //System.err.println("**** ready key detected : "+mainKey.attachment() +" isactive:"+cometContext.isActive(cometHandler));
+ }
+ }
}
-
+
/**
* Notify the {@link CometHandler} that bytes are available for read.
* The notification will invoke all {@link CometContext}
*/
- public void doTask() throws IOException{
+ public void doTask() throws IOException{
// The CometHandler has been resumed.
if (!cometContext.isActive(cometHandler) ){
return;
}
-
/**
* The CometHandler in that case is **always** invoked using this
* thread so we can re-use the Thread attribute safely.
@@ -172,41 +224,31 @@
ByteBuffer byteBuffer = null;
boolean connectionClosed = false;
boolean clearBuffer = true;
+ final SelectionKey key = getSelectionKey();
try{
-
- if (cometInputStream == null){
- cometInputStream = new InputReader();
- }
-
- cometInputStream.setSelectionKey(cometKey);
byteBuffer = ((WorkerThread)Thread.currentThread()).getByteBuffer();
if (byteBuffer == null){
- byteBuffer = ByteBuffer.allocate(selectorThread.getBufferSize());
+ byteBuffer = ByteBuffer.allocate(asyncProcessorTask.getSelectorThread().getBufferSize());
((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer);
} else {
byteBuffer.clear();
}
- cometInputStream.setByteBuffer(byteBuffer);
- SocketChannel socketChannel = (SocketChannel)cometKey.channel();
- if (upcoming_op == OP_EVENT.READ){
+ SocketChannel socketChannel = (SocketChannel)key.channel();
+ if (upcoming_op_isread){
/*
* We must execute the first read to prevent client abort.
- */
- int nRead = socketChannel.read(byteBuffer);
+ */
+ int nRead = socketChannel.read(byteBuffer);
if (nRead == -1 ){
connectionClosed = true;
- } else {
- /*
+ } else {
+ /*
* This is an HTTP pipelined request. We need to resume
- * the continuation and invoke the http parsing
+ * the continuation and invoke the http parsing
* request code.
*/
- if (!asyncReadSupported){
- // Don't let the main Selector (SelectorThread) starts
- // handling the pipelined request.
- key.attach(Long.MIN_VALUE);
-
+ if (!cometHandlerIsAsyncRegistered){
/**
* Something when wrong, most probably the CometHandler
* has been resumed or removed by the Comet implementation.
@@ -214,57 +256,57 @@
if (!cometContext.isActive(cometHandler)){
return;
}
-
+
// Before executing, make sure the connection is still
- // alive. This situation happens with SSL and there
+ // alive. This situation happens with SSL and there
// is not a cleaner way fo handling the browser closing
// the connection.
- nRead = socketChannel.read(byteBuffer);
+ nRead = socketChannel.read(byteBuffer);
if (nRead == -1){
connectionClosed = true;
return;
}
-
- cometContext.resumeCometHandler(cometHandler, false);
+ //resume without remove:
+ try{
+ cometHandler.onInterrupt(cometContext.eventInterrupt);
+ }catch(IOException e) { }
+ CometEngine.cometEngine.flushPostExecute(this,true,false);
+
clearBuffer = false;
-
+
Controller controller = getSelectorThread().getController();
- ProtocolChain protocolChain =
+ ProtocolChain protocolChain =
controller.getProtocolChainInstanceHandler().poll();
- NIOContext ctx = (NIOContext)controller.pollContext(key);
+ NIOContext ctx = (NIOContext)controller.pollContext(key);
ctx.setController(controller);
ctx.setSelectionKey(key);
ctx.setProtocolChain(protocolChain);
ctx.setProtocol(Protocol.TCP);
- protocolChain.execute(ctx);
+ protocolChain.execute(ctx);
} else {
- byteBuffer.flip();
- reader = new CometReader();
+ byteBuffer.flip();
+ CometReader reader = new CometReader();
reader.setNRead(nRead);
reader.setByteBuffer(byteBuffer);
- if (event == null)
- event = new CometEvent();
- event.type = CometEvent.READ;
+ CometEvent event = new CometEvent(CometEvent.READ,cometContext);
event.attach(reader);
cometContext.invokeCometHandler(event,cometHandler);
reader.setByteBuffer(null);
-
+
// This Reader is now invalid. Any attempt to use
// it will results in an IllegalStateException.
reader.setReady(false);
}
}
- } else if (upcoming_op == OP_EVENT.WRITE){
- if (event == null)
- event = new CometEvent();
- event.type = CometEvent.WRITE;
- writer = new CometWriter();
+ } else {
+ CometEvent event = new CometEvent(CometEvent.WRITE,cometContext);
+ CometWriter writer = new CometWriter();
writer.setChannel(socketChannel);
event.attach(writer);
- cometContext.invokeCometHandler(event,cometHandler);
-
+ cometContext.invokeCometHandler(event,cometHandler);
+
// This Writer is now invalid. Any attempt to use
- // it will results in an IllegalStateException.
+ // it will results in an IllegalStateException.
writer.setReady(false);
}
} catch (IOException ex){
@@ -275,170 +317,84 @@
}
} catch (Throwable t){
connectionClosed = true;
- SelectorThread.logger().log(Level.SEVERE,"Comet exception",t);
- } finally {
+ SelectorThread.logger().log(Level.SEVERE,"Comet exception",t);
+ } finally {
+ cometHandlerIsAsyncRegistered = false;
+
// Bug 6403933
if (connectionClosed){
- cometSelector.cancelKey(cometKey,true,true, true);
+ asyncProcessorTask.getSelectorThread().cancelKey(key);
}
-
+
if (clearBuffer && byteBuffer != null){
byteBuffer.clear();
}
- asyncReadSupported = false;
}
}
- public void setComethandlerisAsyncregistered(boolean comethandlerisAsyncregistered) {
- this.comethandlerisAsyncregistered = comethandlerisAsyncregistered;
+ /**
+ * sets the comettask async interest flag in the comettask
+ * @param
+ */
+ public void setComethandlerIsAsyncRegistered(boolean cometHandlerIsAsyncRegistered) {
+ this.cometHandlerIsAsyncRegistered = cometHandlerIsAsyncRegistered;
}
- public boolean isComethandlerisAsyncregistered() {
- return comethandlerisAsyncregistered;
- }
-
/**
- * returns true if the CometHandler has not been resumed / removed.
- * allows cometSelector to do a fast check before leting threadpool execute the comettask
+ * returns true if the comethandler is registered for async io
* @return
*/
- public boolean cometHandlerNotResumed(){
- return cometContext.isActive(cometHandler);
+ public boolean isComethandlerAsyncRegistered() {
+ return cometHandlerIsAsyncRegistered;
}
-
+
/**
* Return the {@link CometContext} associated with this instance.
- * @return CometContext the {@link CometContext} associated with this
+ * @return CometContext the {@link CometContext} associated with this
* instance.
*/
public CometContext getCometContext() {
return cometContext;
}
-
-
- /**
- * Set the {@link CometContext} used to invokeCometHandler {@link CometHandler}.
- * @param cometContext the {@link CometContext} used to invokeCometHandler {@link CometHandler}
- */
- public void setCometContext(CometContext cometContext) {
- this.cometContext = cometContext;
- }
-
/**
- * Recycle this object.
+ * returns the {@link AsyncProcessorTask }
+ * @return {@lnk AsyncProcessorTask }
*/
- @Override
- public void recycle(){
- isSuspended = false;
- key = null;
- cometContext = null;
- asyncReadSupported = false;
- if(cometInputStream != null) {
- cometInputStream.recycle();
- }
+ public AsyncProcessorTask getAsyncProcessorTask() {
+ return asyncProcessorTask;
}
-
/**
- * Return the {@link CometSelector}
- * @return CometSelector the {@link CometSelector}
+ * sets the {@link AsyncProcessorTask }
+ * @param {@link AsyncProcessorTask }
*/
- public CometSelector getCometSelector() {
- return cometSelector;
+ public void setAsyncProcessorTask(AsyncProcessorTask asyncProcessorTask) {
+ this.asyncProcessorTask = asyncProcessorTask;
}
-
/**
- * Set the {@link CometSelector}
- * @param cometSelector the {@link CometSelector}
- */
- public void setCometSelector(CometSelector cometSelector) {
- this.cometSelector = cometSelector;
- }
-
-
- /**
- * Return the time in milliseconds before this object was registered the
- * {@link SelectionKey} on the {@link CometSelector}
- * @return long Return the time in milliseconds before this object was
- * registered the {@link SelectionKey} on the
- * {@link CometSelector}
+ * returns selectionkey
+ * @return
*/
- public long getExpireTime() {
- return expireTime;
+ public SelectionKey getSelectionKey() {
+ return asyncProcessorTask.getAsyncExecutor().getProcessorTask().getSelectionKey();
}
-
/**
- * Set the time in milliseconds before this object was registered the
- * {@link SelectionKey} on the {@link CometSelector}
- * @param expireTime Return the time in milliseconds before this object was
- * registered the {@link SelectionKey} on the
- * {@link CometSelector}
- */
- public void setExpireTime(long expireTime) {
- this.expireTime = expireTime;
- }
-
-
- /**
- * Return the {@link CometSelector}'s {@link SelectionKey}.
+ * returns the {@link AsyncProcessorTask }
+ * @return {@link AsyncProcessorTask }
*/
- public SelectionKey getCometKey() {
- return cometKey;
+ private SelectorThread getSelectorThread(){
+ return asyncProcessorTask.getSelectorThread();
}
-
/**
- * Set the {@link CometSelector}'s {@link SelectionKey}.
+ * returns the {@link CometHandler }
+ * @return {@link CometHandler }
*/
- public void setCometKey(SelectionKey cometKey) {
- this.cometKey = cometKey;
- }
-
-
- public boolean isAsyncReadSupported() {
- return asyncReadSupported;
- }
-
-
- public void setAsyncReadSupported(boolean asyncReadSupported) {
- this.asyncReadSupported = asyncReadSupported;
- }
-
- /**
- * Return true if cometContext.getExpirationDelay() != -1
- * && timestamp - expireTime >= cometContext.getExpirationDelay();
- * @param timestamp
- * @return
- */
- protected boolean hasExpired(long timestamp){
- long expdelay = cometContext.getExpirationDelay();
- return expdelay != -1 && timestamp - expireTime >= expdelay;
- }
-
- public AsyncProcessorTask getAsyncProcessorTask() {
- return asyncProcessorTask;
- }
-
- public void setAsyncProcessorTask(AsyncProcessorTask asyncProcessorTask) {
- this.asyncProcessorTask = asyncProcessorTask;
- }
-
public CometHandler getCometHandler() {
return cometHandler;
}
- public void setCometHandler(CometHandler cometHandler) {
- this.cometHandler = cometHandler;
- }
-
- public boolean isSuspended() {
- return isSuspended;
- }
-
- public void setSuspended(boolean isSuspended) {
- this.isSuspended = isSuspended;
- }
}
Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (working copy)
@@ -40,6 +40,7 @@
import com.sun.grizzly.comet.concurrent.DefaultConcurrentCometHandler;
import com.sun.grizzly.http.SelectorThread;
+import com.sun.grizzly.util.WorkerThreadImpl;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.util.Iterator;
@@ -100,7 +101,7 @@
* is doing. It is not recommended to use attributes if this
* {@link CometContext} is not shared amongs multiple
* context path (uses {@link HttpServletSession} instead).
- *
+ *
* @author Jeanfrancois Arcand
* @author Gustav Trede
*/
@@ -135,13 +136,6 @@
/**
- * The {@link CometSelector} used to register {@link SelectionKey}
- * for upcoming bytes.
- */
- protected CometSelector cometSelector;
-
-
- /**
* The {@link CometContext} continuationType. See {@link CometEngine}
*/
protected int continuationType = CometEngine.AFTER_SERVLET_PROCESSING;
@@ -151,14 +145,14 @@
* The default delay expiration before a {@link CometContext}'s
* {@link CometHandler} are interrupted.
*/
- private long expirationDelay = 30 * 1000;
+ private long expirationDelay ;
/**
* true if the caller of {@link #notify} should block when
* notifying other CometHandler.
*/
- protected boolean blockingNotification = false;
+ protected boolean blockingNotification;
/**
@@ -168,23 +162,20 @@
/**
- * timestamp for last performed resetSuspendidletimeout.
+ * timestamp for next idlecheck
* used to limit the frequency of actual performed resets.
*/
- private volatile long lastIdleReset;
+ private volatile long nextidleclear;
/**
- * Current associated list of {@link CometTask}
- */
- protected final ConcurrentHashMap activeTasks;
-
- /**
* The list of registered {@link CometHandler}
*/
- protected final ConcurrentHashMap handlers;
+ protected final ConcurrentHashMap handlers;
- private final CometEvent eventInterrupt;
+ protected final CometEvent eventInterrupt;
+ protected final CometEvent eventTerminate;
+
private final CometEvent eventInitialize;
private static final IllegalStateException ISE = new IllegalStateException(INVALID_COMET_HANDLER);
@@ -193,26 +184,34 @@
new IllegalStateException("Make sure you have enabled Comet " +
"or make sure the Thread invoking that method is the same " +
"as the Servlet.service() Thread.");
-
- // ---------------------------------------------------------------------- //
-
-
+
/**
* Create a new instance
* @param topic the context path
* @param type when the Comet processing will happen (see {@link CometEngine}).
*/
public CometContext(String topic, int continuationType) {
- this.topic = topic;
+ this.topic = topic;
this.continuationType = continuationType;
- this.attributes = new ConcurrentHashMap();
- this.handlers = new ConcurrentHashMap(16,0.75f,64);
- this.activeTasks = new ConcurrentHashMap(16,0.75f,64);
- this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this);
- this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this);
+ this.attributes = new ConcurrentHashMap();
+ this.handlers = new ConcurrentHashMap(8,0.75f,64);
+ this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this);
+ this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this);
+ this.eventTerminate = new CometEvent(CometEvent.TERMINATE,this);
+ initDefaultValues();
}
/**
+ * init of default values.
+ * used by constructor and the cache recycle mechanism
+ */
+ private void initDefaultValues() {
+ blockingNotification = false;
+ expirationDelay = 30*1000;
+ nextidleclear = 0;
+ }
+
+ /**
* Get the context path associated with this instance.
* @return topic the context path associated with this instance
* @deprecated - use getTopic.
@@ -289,14 +288,11 @@
if (!CometEngine.getEngine().isCometEnabled()){
throw cometNotEnabled;
}
- handlers.put(handler, CometEngine.dumykey);
- // is it ok that we only manage one adcomethandler call ?
- CometTask cometTask = new CometTask();
- cometTask.setCometContext(this);
- cometTask.setCometHandler(handler);
- cometTask.setSuspended(alreadySuspended);
+ // is it ok that we only manage one addcomethandler call per thread ?
+ // else we can use a list of handlers to add inside tlocal
+ CometTask cometTask = new CometTask(this,handler);
+ cometTask.upcoming_op_isread = alreadySuspended;
CometEngine.updatedContexts.set(cometTask);
-
return handler.hashCode();
}
@@ -316,6 +312,7 @@
/**
* Retrieve a {@link CometHandler} using its based on its {@link CometHandler#hashCode};
*/
+ @Deprecated
public CometHandler getCometHandler(int hashCode){
for (CometHandler handler:handlers.keySet()){
if (handler.hashCode() == hashCode )
@@ -328,20 +325,28 @@
* Recycle this object.
*/
public void recycle(){
+ try{
+ notify(this,CometEvent.TERMINATE);
+ } catch (IOException ex) {
+
+ }
handlers.clear();
attributes.clear();
- activeTasks.clear();
topic = null;
+ notificationHandler = null;
+ initDefaultValues();
+ // add check for datastructure size, if cometcontext had large
+ // datastructes its probably not optimal to waste RAM with caching it
+ CometEngine.cometEngine.cometContextCache.offer(this);
}
/**
- * adds a {@link CometHandler} to the active set
- * @param handler {@link CometHandler}
- * @param cometKey {@link SelectionKey}
+ * adds a {@link CometTask} to the active set
+ * @param cometTask {@link CometTask}
*/
- protected void addActiveHandler(CometHandler handler, SelectionKey cometKey){
- handlers.put(handler, cometKey);
+ protected void addActiveHandler(CometTask cometTask){
+ handlers.put(cometTask.cometHandler, cometTask);
}
/**
@@ -358,7 +363,9 @@
if (cometHandler instanceof DefaultConcurrentCometHandler){
((DefaultConcurrentCometHandler)cometHandler).EnQueueEvent(event);
}else{
- cometHandler.onEvent(event);
+ synchronized(cometHandler){
+ cometHandler.onEvent(event);
+ }
}
}
@@ -385,12 +392,10 @@
* @return true if the operation succeeded.
*/
public boolean removeCometHandler(CometHandler handler,boolean resume){
- CometEngine.updatedContexts.set(null);
- SelectionKey key = handlers.remove(handler);
- if (key != null){
+ CometTask task = handlers.remove(handler);
+ if (task != null){
if (resume){
- CometEngine.getEngine().flushPostExecute(
- ((CometTask)key.attachment()).getAsyncProcessorTask());
+ CometEngine.getEngine().flushPostExecute(task,true,false);
}
return true;
}
@@ -405,22 +410,18 @@
* @param hashCode The hashcode of the CometHandler to remove.
* @return true if the operation succeeded.
*/
+ @Deprecated
public boolean removeCometHandler(int hashCode){
- CometEngine.updatedContexts.set(null);
- Iterator iterator = handlers.keySet().iterator();
- CometHandler handler = null;
- while (iterator.hasNext()){
- handler = iterator.next();
+ CometHandler handler_ = null;
+ for (CometHandler handler:handlers.keySet()){
if (handler.hashCode() == hashCode){
- SelectionKey key = handlers.get(handler);
- if (key == null){
- logger.warning(ALREADY_REMOVED);
- return false;
- }
- iterator.remove();
- return true;
- }
+ handler_ = handler;
+ break;
+ }
}
+ if (handler_ != null){
+ return handlers.remove(handler_) != null;
+ }
return false;
}
@@ -435,70 +436,68 @@
* @return true if the operation succeeded.
*/
public boolean resumeCometHandler(CometHandler handler){
- return resumeCometHandler(handler,true);
- }
-
-
- /**
- * Resume the suspended response. A response can only be suspended when
- * {@link CometContext#addCometHandler} was called first.
- *
- * @param handler The CometHandler associated with the current continuation.
- * @param remove true if the CometHandler needs to be removed.
- * @return true if the operation succeeded.
- */
- protected boolean resumeCometHandler(CometHandler handler, boolean remove){
- CometEngine.updatedContexts.set(null);
- boolean b= cometSelector.cancelKey(handlers.get(handler), false, remove, false);
-
- // Try a second time to locate the associated CometTask
- if (!b){
- for (CometTask cometTask:activeTasks.keySet()){
- if (cometTask.getCometHandler() == handler){
- interrupt(cometTask, remove, false);
- CometEngine.getEngine().flushPostExecute(
- cometTask.getAsyncProcessorTask());
- activeTasks.remove(cometTask);
- return true;
- }
- }
+ boolean status = interrupt(handlers.get(handler),false,true,false,false);
+ if (status){
+ try {
+ handler.onTerminate(eventTerminate);
+ } catch (IOException ex) { }
}
- return b;
+ return status;
}
/**
* Interrupt a {@link CometHandler} by invoking {@link CometHandler#onInterrupt}
*/
- protected boolean interrupt(CometTask task,boolean removeCometHandler,
- boolean notifyInterrupt) {
-
- boolean status = true;
- try{
- if (removeCometHandler){
- status = (handlers.remove(task.getCometHandler()) != null);
- if (status && notifyInterrupt){
- task.getCometHandler().onInterrupt(eventInterrupt);
- }else{
- logger.fine(ALREADY_REMOVED);
+ protected boolean interrupt(final CometTask task,
+ final boolean notifyInterrupt, final boolean flushAPT,
+ final boolean cancelkey, boolean asyncExecution) {
+ if (task != null && handlers.remove(task.cometHandler) != null){
+ final SelectionKey key = task.getSelectionKey();
+ // setting attachment non asynced to ensure grizzly dont keep calling us
+ key.attach(System.currentTimeMillis());
+ if (asyncExecution){
+ if (cancelkey){
+ // dont want to do that in non selector thread:
+ // canceled key wont get canceled again due to isvalid check
+ key.cancel();
}
+ task.callInterrupt = true;
+ task.interruptFlushAPT = flushAPT;
+ //((WorkerThreadImpl)Thread.currentThread()).
+ // getPendingIOhandler().addPendingIO(task);
+ task.run();
+
+ }else{
+ interrupt0(task, notifyInterrupt, flushAPT, cancelkey);
}
- } catch (Throwable ex){
- status = false;
- logger.log(Level.FINE,"Unable to interrupt",ex);
- }finally{
- activeTasks.remove(task);
- return status;
+ return true;
}
+ return false;
}
-
+
/**
+ * interrupt logic in its own method, so it can be executed either async or sync.
+ * cometHandler.onInterrupt is performed async due to its functionality is unknown,
+ * hence not safe to run in the performance critical selector thread.
+ */
+ protected void interrupt0(CometTask task,
+ boolean notifyInterrupt, boolean flushAPT, boolean cancelkey){
+ if (notifyInterrupt){
+ try{
+ task.cometHandler.onInterrupt(eventInterrupt);
+ }catch(IOException e) { }
+ }
+ CometEngine.cometEngine.flushPostExecute(task,flushAPT,cancelkey);
+ }
+
+ /**
* Return true if this {@link CometHandler} is still active, e.g. there is
* still a suspended connection associated with it.
*
* @return true
*/
public boolean isActive(CometHandler handler){
- return handlers.containsKey(handler) || CometEngine.updatedContexts.get() != null;
+ return handlers.containsKey(handler);
}
/**
@@ -507,7 +506,7 @@
* of type NOTIFY.
* @param attachment An object shared amongst {@link CometHandler}.
*/
- public void notify(final E attachment) throws IOException{
+ public void notify(final Object attachment) throws IOException{
notify(attachment, CometEvent.NOTIFY);
}
@@ -527,7 +526,7 @@
* @param cometHandlerID Notify a single CometHandler.
* @deprecated - use notify(attachment,eventType,CometHandler;
*/
- public void notify(final E attachment,final int eventType,final int cometHandlerID)
+ public void notify(final Object attachment,final int eventType,final int cometHandlerID)
throws IOException{
notify(attachment,eventType,getCometHandler(cometHandlerID));
}
@@ -537,7 +536,7 @@
* @param attachment An object shared amongst {@link CometHandler}.
* @param {@link CometHandler} to notify.
*/
- public void notify(final E attachment,final CometHandler cometHandler)
+ public void notify(final Object attachment,final CometHandler cometHandler)
throws IOException{
notify(attachment,CometEvent.NOTIFY,cometHandler);
}
@@ -556,21 +555,20 @@
* @param type The type of notification.
* @param {@link CometHandler} to notify.
*/
- public void notify(final E attachment,final int eventType,final CometHandler cometHandler)
+ public void notify(final Object attachment,final int eventType,final CometHandler cometHandler)
throws IOException{
if (cometHandler == null){
throw ISE;
}
- CometEvent event = new CometEvent(eventType,this);
- event.attach(attachment);
+ CometEvent event = new CometEvent(eventType,this,attachment);
notificationHandler.setBlockingNotification(blockingNotification);
notificationHandler.notify(event,cometHandler);
- if (event.getType() == CometEvent.TERMINATE
+ if (event.getType() == CometEvent.TERMINATE
|| event.getType() == CometEvent.INTERRUPT) {
resumeCometHandler(cometHandler);
} else {
resetSuspendIdleTimeout();
- }
+ }
}
@@ -588,14 +586,12 @@
* @param attachment An object shared amongst {@link CometHandler}.
* @param type The type of notification.
*/
- public void notify(final E attachment,final int eventType)
- throws IOException{
- CometEvent event = new CometEvent(eventType,this);
- event.attach(attachment);
+ public void notify(Object attachment,int eventType)throws IOException {
+ CometEvent event = new CometEvent(eventType,this,attachment);
Iterator iterator = handlers.keySet().iterator();
notificationHandler.setBlockingNotification(blockingNotification);
notificationHandler.notify(event,iterator);
- if (event.getType() == CometEvent.TERMINATE
+ if (event.getType() == CometEvent.TERMINATE
|| event.getType() == CometEvent.INTERRUPT) {
while(iterator.hasNext()){
resumeCometHandler(iterator.next());
@@ -625,16 +621,18 @@
protected void resetSuspendIdleTimeout(){
if (expirationDelay != -1){
long timestamp = System.currentTimeMillis();
- // not threadsafe, but that will only lead to a few extra idle checks.
- // it will still be a major win.
- if (timestamp - lastIdleReset >= 1000){
- lastIdleReset = timestamp;
- for (CometTask cometTask:activeTasks.keySet()){
- cometTask.setExpireTime(timestamp);
+ if (timestamp > nextidleclear){
+ synchronized(handlers){
+ if (timestamp > nextidleclear){
+ nextidleclear = timestamp+1000;
+ for (CometTask cometTask:handlers.values()){
+ cometTask.setTimeout(timestamp);
+ }
+ }
}
}
}
- }
+ }
/**
@@ -675,38 +673,22 @@
* @return true if the operation worked.
*/
private boolean doAsyncRegister(CometHandler handler, int interest){
- SelectionKey cometKey = null;
if (handler != null) {
- cometKey = handlers.get(handler);
- }
- if (handler == null || cometKey == null) {
- throw ISE;
- }
-
- CometTask cometTask = (CometTask)cometKey.attachment();
- if (cometTask != null){
- cometKey.interestOps(cometKey.interestOps() | interest);
- if (interest == SelectionKey.OP_READ){
- cometTask.setAsyncReadSupported(true);
+ CometTask task = handlers.get(handler);
+ if (task != null) {
+ SelectionKey mainKey = task.getSelectionKey();
+ if (mainKey != null){
+ mainKey.interestOps(mainKey.interestOps() | interest);
+ task.setComethandlerIsAsyncRegistered(true);
+ return true;
+ }
}
- cometTask.setComethandlerisAsyncregistered(true);
- return true;
}
throw ISE;
}
/**
- * Set the {@link CometSelector} associated with this instance.
- * @param CometSelector the {@link CometSelector} associated with
- * this instance.
- */
- protected void setCometSelector(CometSelector cometSelector) {
- this.cometSelector = cometSelector;
- }
-
-
- /**
* Helper.
*/
@Override
@@ -741,19 +723,9 @@
public Set getCometHandlers(){
return handlers.keySet();
}
+
-
-
/**
- * Add a {@link CometTask} to the active list.
- * @param cometTask
- */
- protected void addActiveCometTask(CometTask cometTask){
- activeTasks.put(cometTask,Boolean.TRUE);
- }
-
-
- /**
* Return true if the invoker of {@link #notify} should block when
* notifying Comet Handlers.
*/
@@ -788,13 +760,4 @@
return notificationHandler;
}
- /**
- * Return the current set of active {@link CometTask}
- * @return
- */
- protected Set getActiveTasks() {
- return activeTasks.keySet();
- }
-
}
-
Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java
===================================================================
--- modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java (revision 2805)
+++ modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java (working copy)
@@ -41,12 +41,14 @@
import com.sun.grizzly.arp.AsyncTask;
import com.sun.grizzly.http.SelectorThread;
import com.sun.grizzly.arp.AsyncProcessorTask;
+import com.sun.grizzly.http.ProcessorTask;
import com.sun.grizzly.util.LinkedTransferQueue;
+import com.sun.grizzly.util.TestThreadPool;
+import com.sun.grizzly.util.WorkerThreadImpl;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -123,7 +125,7 @@
/**
* The single instance of this class.
*/
- private final static CometEngine cometEngine = new CometEngine();
+ protected final static CometEngine cometEngine = new CometEngine();
/**
@@ -133,11 +135,6 @@
/**
- * The {@link CometSelector} used to poll requests.
- */
- protected final CometSelector cometSelector;
-
- /**
* cached CometContexts
*/
protected final LinkedTransferQueue cometContextCache;
@@ -154,7 +151,7 @@
*/
protected final static ThreadLocal updatedContexts = new ThreadLocal();
- protected static final SelectionKey dumykey = new SelectionKey() {
+ private static final SelectionKey dumykey = new SelectionKey() {
public SelectableChannel channel() {throw ISE;}
public int interestOps() {throw ISE;}
public SelectionKey interestOps(int ops) {throw ISE;}
@@ -165,33 +162,31 @@
};
/**
- * Creat a singleton and initialize all lists required. Also create and
- * start the {@link CometSelector}
+ * Creat a singleton and initialize all lists required.
*/
protected CometEngine() {
- cometSelector = new CometSelector(this);
- try{
- cometSelector.start();
- } catch(InterruptedException ex){
- logger.log(Level.SEVERE,"Unable to start CometSelector",ex);
- }
-
cometContextCache = new LinkedTransferQueue();
activeContexts = new ConcurrentHashMap(16,0.75f,64);
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(
- 8,
+ /*ExecutorService tpe = new ThreadPoolExecutor(
64,
+ 64,
30L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue(),
+ //new LinkedTransferQueue(),
+ new LinkedBlockingQueue(),
new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger();
public Thread newThread(Runnable r) {
- return new Thread(r, "CometWorker-"+counter.incrementAndGet());
+ //return new Thread(r, "CometWorker-"+counter.incrementAndGet());
+ return new WorkerThreadImpl(null, "CometWorker-"+counter.incrementAndGet(), r, 0);
}
- });
- //tpe.allowCoreThreadTimeOut(true);
+ }); */
+
+ //ExecutorService tpe = threadPool = new DefaultExecutorService(4, 8, 30,
+ // TimeUnit.SECONDS, new LinkedBlockingQueue(), "CometWorker-");
+ ExecutorService tpe = new com.sun.grizzly.util.FixedThreadPool(8,"CometWorker");
+ //ExecutorService tpe = new NewDefaultThreadPool("CometWorker",4,64,15,TimeUnit.SECONDS);
threadPool = tpe;
}
@@ -215,6 +210,7 @@
/**
* sets the default threadpool that DefaultNotificationHandler use.
+ * shuttdownnow is called on the existing threadpool.
* does notupdate existing notificationhandlers
*/
public void setThreadPool(ExecutorService threadPool) {
@@ -224,7 +220,15 @@
}
}
+ /**
+ * returns the threadpool comet is using
+ * @return ExecutorService
+ */
+ public ExecutorService getThreadPool() {
+ return threadPool;
+ }
+
/**
* Unregister the {@link CometHandler} to the list of the
* {@link CometContext}. Invoking this method will invoke all
@@ -238,18 +242,7 @@
public CometContext unregister(String topic){
CometContext cometContext = activeContexts.remove(topic);
if (cometContext != null){
- try{
- cometContext.notify(cometContext,CometEvent.TERMINATE);
- } catch (IOException ex) {}
- Set tasks = cometContext.getActiveTasks();
- for (CometTask cometTask: tasks){
- // does this work ? the notify above might be async.
- flushResponse(cometTask.getAsyncProcessorTask());
- }
- //TODO: add check for datastructure size, if cometcontext had large
- // datastructes its probably not optimal to waste RAM with caching it
- cometContext.recycle();
- cometContextCache.offer(cometContext);
+ cometContext.recycle();
}
return cometContext;
}
@@ -279,7 +272,7 @@
* {@link AFTER_SERVLET_PROCESSING} or {@link AFTER_RESPONSE_PROCESSING}
* @return CometContext a configured {@link CometContext}.
*/
- public CometContext register(String topic, int type){
+ public CometContext register(String topic, int type){
return register(topic, type, DefaultNotificationHandler.class);
}
@@ -287,14 +280,13 @@
* Instanciate a new {@link CometContext}.
* @param topic the topic the new {@link CometContext} will represent.
* @param type when the request will be suspended, e.g. {@link BEFORE_REQUEST_PROCESSING},
- * {@link AFTER_SERVLET_PROCESSING} or {@link AFTER_RESPONSE_PROCESSING}
+ * {@link AFTER_SERVLET_PROCESSING} or {@link AFTER_RESPONSE_PROCESSING}
* @return a new {@link CometContext} if not already created, or the
* existing one.
*/
public CometContext register(String topic, int type,
Class extends NotificationHandler> notificationClass ) {
-
- // Double checked locking used used to prevent the otherwise static/global
+ // Double checked locking used used to prevent the otherwise static/global
// locking, cause example code does heavy usage of register calls
// for existing topics from http get calls etc.
CometContext cometContext = activeContexts.get(topic);
@@ -303,30 +295,31 @@
cometContext = activeContexts.get(topic);
if (cometContext == null){
cometContext = cometContextCache.poll();
- if (cometContext != null)
+ if (cometContext != null){
cometContext.topic = topic;
- if (cometContext == null){
+ }else{
cometContext = new CometContext(topic, type);
- NotificationHandler notificationHandler = null;
- try{
- notificationHandler = notificationClass.newInstance();
- } catch (Throwable t) {
- logger.log(Level.SEVERE,"Invalid NotificationHandler class : "
- + notificationClass.getName() + " Using default.",t);
- notificationHandler = new DefaultNotificationHandler();
- }
- cometContext.setCometSelector(cometSelector);
- cometContext.setNotificationHandler(notificationHandler);
- if (notificationHandler != null && (notificationHandler
- instanceof DefaultNotificationHandler)){
- ((DefaultNotificationHandler)notificationHandler)
- .setThreadPool(threadPool);
- }
}
+ NotificationHandler notificationHandler = null;
+ try{
+ notificationHandler = notificationClass.newInstance();
+ } catch (Throwable t) {
+ logger.log(Level.SEVERE,"Invalid NotificationHandler class : "
+ + notificationClass.getName() + " Using default.",t);
+ notificationHandler = new DefaultNotificationHandler();
+ }
+ cometContext.setNotificationHandler(notificationHandler);
+ if (notificationHandler != null && (notificationHandler
+ instanceof DefaultNotificationHandler)){
+ ((DefaultNotificationHandler)notificationHandler)
+ .setThreadPool(threadPool);
+ }
activeContexts.put(topic,cometContext);
}
+
}
}
+ cometContext.continuationType = type;
return cometContext;
}
@@ -356,7 +349,7 @@
* to the current thread so we can later retrieve the associated
* SelectionKey. The SelectionKey is required in order to park the request.
*/
- int continuationType = (cometContext == null)?
+ int continuationType = (cometContext == null)?
AFTER_SERVLET_PROCESSING:cometContext.continuationType;
/* Execute the Servlet.service method. CometEngine.register() or
@@ -370,22 +363,27 @@
*/
CometTask cometTask = updatedContexts.get();
if (cometTask != null) {
+ //need to impl tlocal that gets and sets null in one efficent operation
updatedContexts.set(null);
- if (cometTask.isSuspended()){ //alreadySuspended)
- cometTask.setSuspended(false);
- cometTask.getCometContext().addActiveHandler(cometTask.getCometHandler(), dumykey);
+ cometContext = cometTask.getCometContext();
+ if (cometTask.upcoming_op_isread){ //alreadySuspended
+ cometTask.upcoming_op_isread = false;
+ //need to set dumykey in cometTask ?
+ cometContext.addActiveHandler(cometTask);
return false;
+ }
+ cometTask.setAsyncProcessorTask(apt);
+ if (cometContext.getExpirationDelay() != -1){
+ cometTask.setTimeout(System.currentTimeMillis());
}
- SelectionKey key = apt.getAsyncExecutor().getProcessorTask().getSelectionKey();
- key.attach("comet"); // Disable keep-alive
- cometTask.getCometContext().initialize(cometTask.getCometHandler());
- cometTask.setAsyncProcessorTask(apt);
- cometTask.setSelectionKey(key);
- cometTask.setCometSelector(cometSelector);
- cometTask.setSelectorThread(apt.getSelectorThread());
- cometTask.setThreadPool(apt.getThreadPool());
- cometSelector.registerKey(cometTask);
- return true;
+ SelectionKey mainKey = apt.getAsyncExecutor().getProcessorTask().getSelectionKey();
+ if (mainKey.isValid()){
+ mainKey.interestOps(SelectionKey.OP_READ);
+ mainKey.attach(cometTask);
+ cometContext.initialize(cometTask.getCometHandler());
+ cometContext.addActiveHandler(cometTask);
+ return true;
+ }
}
return false;
}
@@ -400,28 +398,33 @@
}
/**
- * flush if AsyncTask.POST_EXECUTE
+ * flush if AsyncTask.POST_EXECUTE .
* {@link AsyncProcessorTask}
*/
- protected void flushPostExecute(AsyncProcessorTask apt) {
- if (apt != null && apt.getStage() == AsyncTask.POST_EXECUTE){
- flushResponse(apt);
- }
- }
-
- /**
- * Complete the asynchronous request.
- */
- protected void flushResponse(AsyncProcessorTask apt){
- apt.setStage(AsyncTask.POST_EXECUTE);
- try{
- apt.doTask();
- } catch (IllegalStateException ex){
- if (logger.isLoggable(Level.FINEST)){
- logger.log(Level.FINEST,"Resuming Response failed",ex);
+ protected void flushPostExecute(final CometTask task, boolean aptflush,boolean cancelkey) {
+ AsyncProcessorTask apt = task.getAsyncProcessorTask();
+ ProcessorTask p = task.getAsyncProcessorTask().getAsyncExecutor().getProcessorTask();
+ p.setReRegisterSelectionKey(false);
+ p.setAptCancelKey(cancelkey);
+ if (!aptflush){
+ p.terminateProcess();
+ }else{
+ if (apt.getStage() == AsyncTask.POST_EXECUTE){
+ try{
+ //All comet IO operations sync on handler except close
+ synchronized(task.cometHandler){
+ apt.doTask();
+ }
+ } catch (IllegalStateException ex){
+ if (logger.isLoggable(Level.FINEST)){
+ logger.log(Level.FINEST,"Resuming Response failed at aptflush",ex);
+ }
+ } catch (Throwable ex) {
+ logger.log(Level.SEVERE,"Resuming failed at aptflush",ex);
+ }
+ }else{
+ logger.warning("APTflush called at wrong stage");
}
- } catch (Throwable ex) {
- logger.log(Level.SEVERE,"Resuming failed",ex);
}
}
Index: modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java
===================================================================
--- modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java (revision 2805)
+++ modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java (working copy)
@@ -39,6 +39,7 @@
package com.sun.grizzly.utils;
import com.sun.grizzly.*;
+import com.sun.grizzly.util.WorkerThreadImpl;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -74,7 +75,7 @@
}
});
- new Thread(controller).start();
+ new WorkerThreadImpl("ControllerWorker", controller).start();
try {
latch.await();
Index: modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java
===================================================================
--- modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java (revision 2805)
+++ modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java (working copy)
@@ -41,6 +41,7 @@
import com.sun.grizzly.filter.EchoFilter;
import com.sun.grizzly.filter.LogFilter;
import com.sun.grizzly.filter.ReadFilter;
+import com.sun.grizzly.util.WorkerThreadImpl;
import com.sun.grizzly.utils.ControllerUtils;
import com.sun.grizzly.utils.TCPIOClient;
import java.io.IOException;
@@ -113,7 +114,7 @@
ControllerUtils.startController(controller);
- Thread restartThread = new Thread() {
+ Thread restartThread = new WorkerThreadImpl(new Runnable() {
@Override
public void run() {
try {
@@ -123,7 +124,7 @@
exceptionHolder[0] = ex;
}
}
- };
+ });
restartThread.start();
Index: modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java
===================================================================
--- modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (revision 2805)
+++ modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (working copy)
@@ -124,35 +124,38 @@
}
if (selector == null){
- try{
- isShutDown.set(false);
+ initSelector(ctx);
+ } else {
+ processPendingOperations(ctx);
+ }
+ }
- connectorInstanceHandler = new ConnectorInstanceHandler.
- ConcurrentQueueDelegateCIH(
- getConnectorInstanceHandlerDelegate());
-
- datagramChannel = DatagramChannel.open();
- selector = Selector.open();
- if (role != Role.CLIENT){
- datagramSocket = datagramChannel.socket();
- datagramSocket.setReuseAddress(reuseAddress);
- if (inet == null)
- datagramSocket.bind(new InetSocketAddress(port));
- else
- datagramSocket.bind(new InetSocketAddress(inet,port));
+ private void initSelector(Context ctx) throws IOException{
+ try{
+ isShutDown.set(false);
- datagramChannel.configureBlocking(false);
- datagramChannel.register( selector, SelectionKey.OP_READ );
-
- datagramSocket.setSoTimeout(serverTimeout);
- }
- ctx.getController().notifyReady();
- } catch (SocketException ex){
- throw new BindException(ex.getMessage() + ": " + port);
+ connectorInstanceHandler = new ConnectorInstanceHandler.
+ ConcurrentQueueDelegateCIH(
+ getConnectorInstanceHandlerDelegate());
+
+ datagramChannel = DatagramChannel.open();
+ selector = Selector.open();
+ if (role != Role.CLIENT){
+ datagramSocket = datagramChannel.socket();
+ datagramSocket.setReuseAddress(reuseAddress);
+ if (inet == null)
+ datagramSocket.bind(new InetSocketAddress(port));
+ else
+ datagramSocket.bind(new InetSocketAddress(inet,port));
+
+ datagramChannel.configureBlocking(false);
+ datagramChannel.register( selector, SelectionKey.OP_READ );
+
+ datagramSocket.setSoTimeout(serverTimeout);
}
-
- } else {
- processPendingOperations(ctx);
+ ctx.getController().notifyReady();
+ } catch (SocketException ex){
+ throw new BindException(ex.getMessage() + ": " + port);
}
}
Index: modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java
===================================================================
--- modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java (revision 2805)
+++ modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java (working copy)
@@ -39,9 +39,8 @@
package com.sun.grizzly;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
/**
@@ -56,13 +55,13 @@
/**
* The list of ProtocolFilter this chain will invoke.
*/
- protected List protocolFilters;
+ protected final List protocolFilters;
/**
* The list of {@link EventHandler}s, which will be notified about this
* {@link ProtocolChain} events
*/
- protected Collection eventHandlers;
+ protected final List eventHandlers;
/**
* true if a pipelined execution is required. A pipelined execution
@@ -75,8 +74,9 @@
public DefaultProtocolChain() {
- protocolFilters = new ArrayList();
- eventHandlers = new HashSet();
+ protocolFilters = new ArrayList(4);
+ //ArrayList is faster then HashSet for small datasets.
+ eventHandlers = new ArrayList(4);
}
@@ -296,9 +296,9 @@
*/
protected void notifyException(Phase phase, ProtocolFilter filter,
Throwable throwable) {
- for(EventHandler eventHandler : eventHandlers) {
+ for(int i=0;i {
+ AttributeHolder, SupportStateHolder {
+
+ /**
+ * enqueues runnable for later execution in postSelect
+ * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.
+ * @param runnable
+ public void addPendingIO(Runnable runnable);
+
+
+ * enqueues SlectionKey for later cancel and close .
+ * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.
+ * @param runnable
+
+ public void addPendingKeyCancel(SelectionKey key);*/
+
/**
* A token decribing the protocol supported by an implementation of this
* interface
Index: modules/grizzly/src/main/java/com/sun/grizzly/Controller.java
===================================================================
--- modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (revision 2805)
+++ modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (working copy)
@@ -45,9 +45,11 @@
import com.sun.grizzly.util.DefaultThreadPool;
import com.sun.grizzly.util.LinkedTransferQueue;
import com.sun.grizzly.util.LoggerUtils;
+import com.sun.grizzly.util.SelectedKeyAttachmentLogic;
import com.sun.grizzly.util.State;
import com.sun.grizzly.util.StateHolder;
import com.sun.grizzly.util.SupportStateHolder;
+import com.sun.grizzly.util.WorkerThreadImpl;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
@@ -58,7 +60,6 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@@ -371,6 +372,13 @@
private void handleSelectedKeys(Set readyKeys,SelectorHandler selectorHandler,NIOContext serverCtx){
for(SelectionKey key:readyKeys) {
try{
+
+ Object attachment = key.attachment();
+ if (attachment instanceof SelectedKeyAttachmentLogic){
+ ((SelectedKeyAttachmentLogic)attachment).handleSelectedKey(key);
+ continue;
+ }
+
if (!key.isValid()){
selectorHandler.getSelectionKeyHandler().close(key);
continue;