Property changes on: . ___________________________________________________________________ Added: svn:ignore + target ..merge.diff Property changes on: contribs/bundles ___________________________________________________________________ Added: svn:ignore + target Property changes on: contribs/bundles/grizzly-httpservice-bundle ___________________________________________________________________ Added: svn:ignore + target Index: modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java =================================================================== --- modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java (revision 2805) +++ modules/http/src/test/java/com/sun/grizzly/http/utils/SelectorThreadUtils.java (working copy) @@ -44,6 +44,7 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; import com.sun.grizzly.http.SelectorThread; +import com.sun.grizzly.util.WorkerThreadImpl; import java.io.IOException; import java.util.logging.Level; @@ -81,7 +82,7 @@ } }); - new Thread() { + new WorkerThreadImpl(new Runnable() { @Override public void run() { try { @@ -89,7 +90,7 @@ } catch (Exception ex) { } } - }.start(); + }).start(); try { latch.await(); Index: modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java =================================================================== --- modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java (revision 2805) +++ modules/http/src/test/java/com/sun/grizzly/http/SuspendTest.java (working copy) @@ -46,6 +46,7 @@ import com.sun.grizzly.tcp.http11.GrizzlyAdapter; import com.sun.grizzly.tcp.http11.GrizzlyRequest; import com.sun.grizzly.tcp.http11.GrizzlyResponse; +import com.sun.grizzly.util.WorkerThreadImpl; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -1006,7 +1007,7 @@ t.printStackTrace(); } - new Thread(){ + new WorkerThreadImpl(new Runnable(){ @Override public void run(){ try { @@ -1022,8 +1023,8 @@ res.resume(); } } - }.start();; - } + }).start(); + } }); try { Index: modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java =================================================================== --- modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java (revision 2805) +++ modules/http/src/main/java/com/sun/grizzly/http/ProcessorTask.java (working copy) @@ -440,7 +440,22 @@ // being time. protected boolean handleKeepAliveBlockingThread = false; + /** + * false prevents the selectionkey from being re registered after async is done. + * if false then the socket lifecycle ending needs to be handled by you + */ + protected boolean reRegisterSelectionKey = true; + /** + * true if asyncprotocolfilter should cancel the selectionkey + */ + protected boolean aptCancelKey; + + /** + * used by asyncprotocolfilter + */ + private final TaskEvent event = new TaskEvent(this); + // ----------------------------------------------------- Constructor ---- // public ProcessorTask(){ @@ -523,8 +538,10 @@ process(inputStream, outputStream); } catch(Throwable ex){ - logger.log(Level.FINE, + if (logger.isLoggable(Level.FINE)){ + logger.log(Level.FINE, sm.getString("processorTask.errorProcessingRequest"), ex); + } } finally { terminateProcess(); } @@ -658,11 +675,7 @@ if (response.isSuspended()){ WorkerThread wt = (WorkerThread)Thread.currentThread(); wt.getAttachment().setAttribute("suspend",Boolean.TRUE); - - ((SelectorThreadKeyHandler) selectorHandler. - getSelectionKeyHandler()).resetExpiration(); - key.attach(response.getResponseAttachment()); - + key.attach(response.getResponseAttachment()); return; } @@ -674,8 +687,10 @@ adapter.afterService(request,response); } catch (Exception ex) { error = true; - logger.log(Level.FINEST, - sm.getString("processorTask.errorFinishingRequest"), ex); + if (logger.isLoggable(Level.FINEST)){ + logger.log(Level.FINEST, + sm.getString("processorTask.errorFinishingRequest"), ex); + } } // Finish the handling of the request @@ -773,7 +788,7 @@ WorkerThread workerThread = (WorkerThread)Thread.currentThread(); KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) workerThread.getAttachment(); - k.setActiveThreadTimeout(transactionTimeout); + k.setIdleTimeoutDelay(transactionTimeout); inputBuffer.parseHeaders(); @@ -850,20 +865,10 @@ // control how Grizzly ARP extension handle their asynchronous // behavior, we must make sure we are never called twice. if (asyncSemaphore.tryAcquire(0, TimeUnit.SECONDS)) { - // Nobody is listening, avoid extra operation. - if (getTaskListener() == null){ - return; - } - - TaskEvent event = new TaskEvent(); - if (error) { - event.setStatus(TaskEvent.ERROR); - } else { - event.setStatus(TaskEvent.COMPLETED); - } - event.attach(this); - getTaskListener().taskEvent(event); - event.attach(null); + if (getTaskListener() != null){ + event.setStatus(error?TaskEvent.ERROR:TaskEvent.COMPLETED); + getTaskListener().taskEvent(event); + } } } catch (InterruptedException ex) { if (logger.isLoggable(Level.WARNING)){ @@ -929,10 +934,11 @@ try { outputBuffer.commit(); } catch (IOException ex) { - logger.log(Level.FINEST, + if (logger.isLoggable(Level.FINEST)){ + logger.log(Level.FINEST, sm.getString("processorTask.nonBlockingError"), ex); - // Set error flag error = true; + } } } else if (actionCode == ActionCode.ACTION_ACK) { @@ -951,7 +957,6 @@ try { outputBuffer.sendAck(); } catch (IOException e) { - // Set error flag error = true; } } @@ -965,10 +970,11 @@ try { outputBuffer.endRequest(); } catch (IOException e) { - logger.log(Level.FINEST, + if (logger.isLoggable(Level.FINEST)){ + logger.log(Level.FINEST, sm.getString("processorTask.nonBlockingError"), e); - // Set error flag - error = true; + error = true; + } } } else if (actionCode == ActionCode.ACTION_RESET) { @@ -1094,7 +1100,7 @@ } } catch (Exception e) { logger.log(Level.WARNING, - sm.getString("processorTask.exceptionSSLcert"),e); + sm.getString("processorTask.exceptionSSLcert"),e); } } } else if ( actionCode == ActionCode.ACTION_POST_REQUEST ) { @@ -1103,18 +1109,15 @@ try{ handler.handle(request,Interceptor.RESPONSE_PROCEEDED); } catch(IOException ex){ - logger.log(Level.FINEST, - "Handler exception",ex); + logger.log(Level.FINEST,"Handler exception",ex); } } } else if ( actionCode == ActionCode.CANCEL_SUSPENDED_RESPONSE ) { key.attach(null); } else if ( actionCode == ActionCode.RESET_SUSPEND_TIMEOUT ) { - if (key.attachment() instanceof Response.ResponseAttachment){ - Response.ResponseAttachment ra = ((Response.ResponseAttachment)key.attachment()); - if (ra != null){ - ra.resetTimeout(); - } + Object attachment = key.attachment(); + if (attachment instanceof Response.ResponseAttachment){ + ((Response.ResponseAttachment)attachment).resetTimeout(); } } else if (actionCode == ActionCode.ACTION_CLIENT_FLUSH ) { if (key != null) { @@ -1913,6 +1916,8 @@ setTaskListener(null); socket = null; dropConnection = false; + reRegisterSelectionKey = true; + aptCancelKey = false; key = null; } @@ -2301,5 +2306,40 @@ public void setUseChunking(boolean useChunking) { this.useChunking = useChunking; } + + /** + * true if SelectionKey should be reregistered with Selector after async is done + * default is true. + * if false then the socket lifecycle ending needs to be handled by you + * @param reRegisterSelectionKey + */ + public void setReRegisterSelectionKey(boolean reRegisterSelectionKey) { + this.reRegisterSelectionKey = reRegisterSelectionKey; + } + + /** + * true if SelectionKey should be reregistered with Selector after async is done. + * default is true. + * if false then the socket lifecycle ending needs to be handled by you + * @return + */ + public boolean getReRegisterSelectionKey() { + return reRegisterSelectionKey; + } + + + /** + * true if asyncprotocolfilter should cancel the selectionkey + */ + public void setAptCancelKey(boolean aptCancelKey) { + this.aptCancelKey = aptCancelKey; + } + + /** + * true if asyncprotocolfilter should cancel the selectionkey + */ + public boolean getAptCancelKey() { + return aptCancelKey; + } } Index: modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java =================================================================== --- modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java (revision 2805) +++ modules/http/src/main/java/com/sun/grizzly/http/KeepAliveThreadAttachment.java (working copy) @@ -39,6 +39,9 @@ package com.sun.grizzly.http; import com.sun.grizzly.util.ThreadAttachment; +import java.nio.channels.SelectionKey; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Add keep alive counting mechanism to the {@link ThreadAttachement}. @@ -46,8 +49,9 @@ * @author Jeanfrancois Arcand */ public class KeepAliveThreadAttachment extends ThreadAttachment{ - private int keepAliveCount = 0; - + protected final static Logger logger = SelectorThread.logger(); + + private int keepAliveCount; /** * The stats object used to gather statistics. */ @@ -88,4 +92,22 @@ keepAliveCount = 0; } + @Override + public void release(SelectionKey selectionKey) { + super.release(selectionKey); + resetKeepAliveCount(); + } + + + @Override + public boolean timedOut(SelectionKey selectionKey) { + Thread t = activeThread(); + if (t != null) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, "Interrupting idle Thread: " + t.getName()); + } + t.interrupt(); + } + return true; + } } Index: modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java =================================================================== --- modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (revision 2805) +++ modules/http/src/main/java/com/sun/grizzly/http/SelectorThreadKeyHandler.java (working copy) @@ -38,13 +38,11 @@ package com.sun.grizzly.http; import com.sun.grizzly.DefaultSelectionKeyHandler; -import com.sun.grizzly.tcp.Response; -import com.sun.grizzly.tcp.Response.ResponseAttachment; +import com.sun.grizzly.SelectionKeyHandler; import com.sun.grizzly.util.Copyable; import com.sun.grizzly.util.SelectionKeyAttachment; import java.nio.channels.SelectionKey; import java.util.Iterator; -import java.util.logging.Level; /** * Default HTTP {@link SelectionKeyHandler} implementation @@ -71,6 +69,19 @@ } @Override + public void cancel(SelectionKey key) { + if (key != null) { + if (selectorThread.getThreadPool() instanceof StatsThreadPool) { + if (selectorThread.isMonitoringEnabled() && + ((StatsThreadPool) selectorThread.getThreadPool()).getStatistic().decrementOpenConnectionsCount(key.channel())) { + selectorThread.getRequestGroupInfo().decreaseCountOpenConnections(); + } + } + super.cancel(key); + } + } + + @Override public void doRegisterKey(SelectionKey key, int ops, long currentTime) { Object attachment = key.attachment(); if (attachment instanceof KeepAliveThreadAttachment) { @@ -85,40 +96,12 @@ } key.interestOps(key.interestOps() | ops); } - - @Override - public void cancel(SelectionKey key) { - if (key == null) { - return; - } - if (selectorThread.getThreadPool() instanceof StatsThreadPool) { - if (selectorThread.isMonitoringEnabled() && - ((StatsThreadPool) selectorThread.getThreadPool()).getStatistic().decrementOpenConnectionsCount(key.channel())) { - selectorThread.getRequestGroupInfo().decreaseCountOpenConnections(); - } - } - - Object attachment = key.attachment(); - if (attachment instanceof KeepAliveThreadAttachment) { - KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment; - k.resetKeepAliveCount(); - } - super.cancel(key); - } - + /** - * Reset the expiration time - */ - public void resetExpiration() { - nextKeysExpiration = 0; - } - - /** * {@inheritDoc} */ @Override public void expire(Iterator iterator) { - //must check for timeout, attachments can have such interest final long currentTime = System.currentTimeMillis(); if (currentTime < nextKeysExpiration) { return; @@ -129,51 +112,21 @@ if (!key.isValid()) { continue; } + Object attachment = key.attachment(); if (attachment != null) { long expire = getExpirationStamp(attachment); - if (expire == SelectionKeyAttachment.UNLIMITED_TIMEOUT) { - continue; - } - long idleLimit, activeThreadTimeout; - if (attachment instanceof KeepAliveThreadAttachment) { - activeThreadTimeout = ((KeepAliveThreadAttachment) attachment).getActiveThreadTimeout(); + if (expire != SelectionKeyAttachment.UNLIMITED_TIMEOUT) { + long idleLimit = getIdleLimit(attachment); - if (activeThreadTimeout != SelectionKeyAttachment.UNLIMITED_TIMEOUT) { - idleLimit = activeThreadTimeout; - } else { - idleLimit = ((SelectionKeyAttachment) attachment).getIdleTimeoutDelay(); - if (idleLimit == SelectionKeyAttachment.UNLIMITED_TIMEOUT) { - //this is true when attachment class dont have idletimeoutdelay configured. - idleLimit = timeout; - } - } - } else { - idleLimit = timeout; + if (idleLimit != -1 && currentTime - expire >= idleLimit && + (!(attachment instanceof SelectionKeyAttachment) || + ((SelectionKeyAttachment)attachment).timedOut(key))){ + // selectorHandler.addPendingKeyCancel(key); + cancel(key); + } } - if (idleLimit == -1) { - continue; - } - - if (currentTime - expire >= idleLimit) { - if (attachment instanceof Response.ResponseAttachment) { - ((ResponseAttachment) attachment).timeout(); - key.attach(null); - continue; - } - - if (attachment instanceof KeepAliveThreadAttachment) { - KeepAliveThreadAttachment k = (KeepAliveThreadAttachment) attachment; - if (k.activeThread() != null) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, "Interrupting idle Thread: " + k.activeThread().getName()); - } - k.activeThread().interrupt(); - } - } - cancel(key); - } } } } @@ -184,14 +137,26 @@ * * @param {@link SelectionKey} */ - private long getExpirationStamp(Object attachment) { + protected long getExpirationStamp(Object attachment) { if (attachment instanceof Long) { return (Long) attachment; - } else if (attachment instanceof SelectionKeyAttachment) { + } + if (attachment instanceof SelectionKeyAttachment) { return ((SelectionKeyAttachment) attachment).getTimeout(); - } else if (attachment instanceof Response.ResponseAttachment) { - return ((Response.ResponseAttachment) attachment).getExpirationTime() - timeout; } return SelectionKeyAttachment.UNLIMITED_TIMEOUT; } + + /** + * returns idle limit + */ + private long getIdleLimit(Object attachment){ + if (attachment instanceof SelectionKeyAttachment){ + long idleLimit = ((SelectionKeyAttachment) attachment).getIdleTimeoutDelay(); + if (idleLimit != SelectionKeyAttachment.UNLIMITED_TIMEOUT) { + return idleLimit; + } + } + return timeout; + } } Index: modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java =================================================================== --- modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java (revision 2805) +++ modules/http/src/main/java/com/sun/grizzly/arp/AsyncProtocolFilter.java (working copy) @@ -110,11 +110,12 @@ */ public boolean execute(Context ctx) throws IOException{ HttpWorkerThread workerThread = ((HttpWorkerThread)Thread.currentThread()); + + SelectionKey key = ctx.getSelectionKey(); + + setSelectionKeyTimeout(key, Long.MAX_VALUE); - setSelectionKeyTimeout(ctx.getSelectionKey(), Long.MAX_VALUE); - - StreamAlgorithm streamAlgorithm = - workerThread.getStreamAlgorithm(); + StreamAlgorithm streamAlgorithm = workerThread.getStreamAlgorithm(); if (streamAlgorithm == null){ try{ streamAlgorithm = (StreamAlgorithm)algorithmClass @@ -142,8 +143,7 @@ inputStream = createByteBufferInputStream(); } configureByteBufferInputStream(inputStream, ctx, workerThread); - - SelectionKey key = ctx.getSelectionKey(); + SocketChannel socketChannel = (SocketChannel) key.channel(); streamAlgorithm.setChannel(socketChannel); @@ -160,17 +160,14 @@ ctx.setKeyRegistrationState(Context.KeyRegistrationState.NONE); if (streamAlgorithm.parse(byteBuffer)){ - ProcessorTask processor = - selectorThread.getProcessorTask(); + ProcessorTask processor = selectorThread.getProcessorTask(); configureProcessorTask(processor, ctx, workerThread, - streamAlgorithm.getHandler(), inputStream); - + streamAlgorithm.getHandler(), inputStream); try{ selectorThread.getAsyncHandler().handle(processor); } catch (Throwable ex){ logger.log(Level.INFO,"Processor exception",ex); - ctx.setKeyRegistrationState( - Context.KeyRegistrationState.CANCEL); + ctx.setKeyRegistrationState(Context.KeyRegistrationState.CANCEL); return false; } } @@ -196,18 +193,24 @@ InputReader is = (InputReader) processor.getInputStream(); is.getByteBuffer().clear(); - byteBufferStreams.offer(is); - + byteBufferStreams.offer(is); + SelectorThread selectorThread = processor.getSelectorThread(); - if (processor.isKeepAlive() && !processor.isError()){ - setSelectionKeyTimeout(processor.getSelectionKey(), Long.MIN_VALUE); - - selectorThread.registerKey(processor.getSelectionKey()); - } else { - selectorThread.cancelKey(processor.getSelectionKey()); + boolean cancelkey = processor.getAptCancelKey() || processor.isError() + || !processor.isKeepAlive(); + try{ + if (!cancelkey){ + if (processor.getReRegisterSelectionKey()){ + setSelectionKeyTimeout(processor.getSelectionKey(), Long.MIN_VALUE); + selectorThread.registerKey(processor.getSelectionKey()); + } + }else{ + selectorThread.cancelKey(processor.getSelectionKey()); + } + }finally{ + processor.recycle(); + selectorThread.returnTask(processor); } - processor.recycle(); - selectorThread.returnTask(processor); } } Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java =================================================================== --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java (revision 2805) +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/DefaultNotificationHandler.java (working copy) @@ -74,13 +74,11 @@ ((CometHandler)cometHandler).onInterrupt((CometEvent)cometEvent); break; case CometEvent.NOTIFY: - ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent); - break; case CometEvent.READ: - ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent); - break; case CometEvent.WRITE: + synchronized(cometHandler){ ((CometHandler)cometHandler).onEvent((CometEvent)cometEvent); + } break; case CometEvent.INITIALIZE: ((CometHandler)cometHandler).onInitialize((CometEvent)cometEvent); @@ -92,7 +90,7 @@ throw new IllegalStateException(); } } catch (IOException ex){ - Controller.logger().log(Level.WARNING,"",ex); + Controller.logger().log(Level.FINE,"",ex); } } } Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java =================================================================== --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (revision 2805) +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometContext.java (working copy) @@ -38,10 +38,7 @@ package com.sun.enterprise.web.connector.grizzly.comet; -import com.sun.grizzly.comet.CometTask; import java.io.IOException; -import java.util.Iterator; -import java.util.logging.Level; /** * The main object used by {@link CometHandler}. @@ -56,7 +53,7 @@ * @author Jeanfrancois Arcand * @deprecated use {@link CometContext} */ -public class CometContext extends com.sun.grizzly.comet.CometContext{ +public class CometContext extends com.sun.grizzly.comet.CometContext{ private final CometEvent eventInitialize; @@ -67,8 +64,8 @@ */ public CometContext(String contextPath, int continuationType) { super(contextPath, continuationType); - this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this); - this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this); + this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this); + this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this); } @@ -124,13 +121,8 @@ * {@inheritDoc} */ @Override - public void notify(final E attachment) throws IOException { - CometEvent event = new CometEvent(CometEvent.NOTIFY,this); - event.attach(attachment); - Iterator iterator = handlers.keySet().iterator(); - notificationHandler.setBlockingNotification(blockingNotification); - notificationHandler.notify((com.sun.grizzly.comet.CometEvent)event,iterator); - resetSuspendIdleTimeout(); + public void notify(final Object attachment) throws IOException { + super.notify(attachment); } @@ -146,14 +138,14 @@ * {@inheritDoc} */ @Override - public void notify(final E attachment,final int eventType,final int cometHandlerID) + public void notify(final Object attachment,final int eventType,final int cometHandlerID) throws IOException{ CometHandler cometHandler = getCometHandler(cometHandlerID); if (cometHandler == null){ throw new IllegalStateException(INVALID_COMET_HANDLER); } - CometEvent event = new CometEvent(eventType,this); + CometEvent event = new CometEvent(eventType,this); event.attach(attachment); notificationHandler.setBlockingNotification(blockingNotification); @@ -173,30 +165,5 @@ protected void initialize(com.sun.grizzly.comet.CometHandler handler) throws IOException { ((com.sun.enterprise.web.connector.grizzly.comet.CometHandler)handler).onInitialize(eventInitialize); } - - /** - * Interrupt a {@link CometHandler} by invoking {@link CometHandler#onInterrupt} - */ - @Override - protected boolean interrupt(CometTask task,boolean removecomethandler, boolean resume) { - boolean status = true; - try{ - if (removecomethandler){ - status = (handlers.remove(task.getCometHandler()) != null); - if (status && resume){ - ((com.sun.enterprise.web.connector.grizzly.comet.CometHandler) - task.getCometHandler()).onInterrupt(eventInterrupt); - }else{ - logger.finer(ALREADY_REMOVED); - } - } - } catch (Throwable ex){ - status = false; - logger.log(Level.FINE,"Unable to interrupt",ex); - }finally{ - activeTasks.remove(task); - return status; - } - } } - + Index: modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java =================================================================== --- modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java (revision 2805) +++ modules/compat/src/main/java/com/sun/enterprise/web/connector/grizzly/comet/CometEngine.java (working copy) @@ -87,36 +87,10 @@ */ @Override public CometContext register(String topic, int type){ - // Double checked locking used used to prevent the otherwise static/global - // locking, cause example code does heavy usage of register calls - // for existing topics from http get calls etc. - CometContext cometContext = (CometContext)activeContexts.get(topic); - if (cometContext == null){ - synchronized(activeContexts){ - cometContext = (CometContext)activeContexts.get(topic); - if (cometContext == null){ - cometContext = (CometContext)cometContextCache.poll(); - if (cometContext != null) - cometContext.setTopic(topic); - if (cometContext == null){ - cometContext = new CometContext(topic, type); - NotificationHandler notificationHandler - = new DefaultNotificationHandler(); - cometContext.setNotificationHandler(notificationHandler); - if (notificationHandler != null && (notificationHandler - instanceof DefaultNotificationHandler)){ - ((DefaultNotificationHandler)notificationHandler) - .setThreadPool(threadPool); - } - } - activeContexts.put(topic,cometContext); - } - } - } - return cometContext; + return (CometContext) super.register(topic, type); } + - /** * {@inheritDoc} */ Index: modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java =================================================================== --- modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (revision 2805) +++ modules/utils/src/main/java/com/sun/grizzly/tcp/Response.java (working copy) @@ -57,11 +57,13 @@ import com.sun.grizzly.tcp.http11.InternalOutputBuffer; import com.sun.grizzly.tcp.http11.filters.VoidOutputFilter; import com.sun.grizzly.util.LoggerUtils; +import com.sun.grizzly.util.SelectionKeyAttachment; +import com.sun.grizzly.util.WorkerThreadImpl; import com.sun.grizzly.util.buf.ByteChunk; import com.sun.grizzly.util.http.MimeHeaders; import java.io.IOException; +import java.nio.channels.SelectionKey; import java.util.Locale; - import java.nio.channels.SocketChannel; import java.util.concurrent.Semaphore; import java.util.logging.Level; @@ -892,21 +894,20 @@ } - public static class ResponseAttachment{ - - private A attachment; + public static class ResponseAttachment extends SelectionKeyAttachment + implements Runnable { + private CompletionHandler completionHandler; - private Long timeout; - private Long expiration; - private Response response; + private final A attachment; + private final long idletimeoutdelay; + private final Response response; - public ResponseAttachment(Long timeout,A attachment, + public ResponseAttachment(long idletimeoutdelay,A attachment, CompletionHandler completionHandler, Response response){ - this.timeout = timeout; + this.idletimeoutdelay = idletimeoutdelay; this.attachment = attachment; this.completionHandler = completionHandler; - this.response = response; - + this.response = response; resetTimeout(); } @@ -914,21 +915,20 @@ return attachment; } - public CompletionHandler getCompletionHandler() { return completionHandler; } public void resetTimeout(){ - expiration = System.currentTimeMillis() + timeout; + timeout = System.currentTimeMillis(); } - - - public Long getExpirationTime() { - return expiration; + + @Override + public long getIdleTimeoutDelay() { + return idletimeoutdelay; } + - public void resume(){ completionHandler.resumed(attachment); try{ @@ -939,11 +939,21 @@ LoggerUtils.getLogger().log(Level.FINEST,"resume",ex); } } - - - public void timeout(){ + + @Override + public boolean timedOut(SelectionKey Key) { + Key.attach(null); + run(); + //((WorkerThreadImpl)Thread.currentThread()). + // getPendingIOhandler().addPendingIO(this); + return false; + } + + @Override + public void run() { timeout(true); } + public void timeout(boolean forceClose){ // If the buffers are empty, commit the response header Index: modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java =================================================================== --- modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java (revision 2805) +++ modules/utils/src/main/java/com/sun/grizzly/util/SelectionKeyAttachment.java (working copy) @@ -49,7 +49,7 @@ public abstract class SelectionKeyAttachment { public static final long UNLIMITED_TIMEOUT = Long.MIN_VALUE; - private long timeout = UNLIMITED_TIMEOUT; + protected long timeout = UNLIMITED_TIMEOUT; public static Object getAttachment(SelectionKey key) { Object attachment = key.attachment(); @@ -60,25 +60,45 @@ return attachment; } - public long getTimeout() { - return timeout; - } - /** * returns the idle timeout delay. * default it returns Long.MIN_VALUE , meaning null. * -1 means no timeout. - * subclass need to implement it. + * subclass need to override it. * @return */ public long getIdleTimeoutDelay(){ return UNLIMITED_TIMEOUT; } + /** + * subclass need to override this method for it to work. + * Long.MIN_VALUE means null , and default value will be used. + * -1 means no timeout. + * @param idletimeoutdelay + */ + public void setIdleTimeoutDelay(long idletimeoutdelay){ + throw new IllegalStateException("setIdleTimeoutDelay not implemented in subclass"); + } + + + public long getTimeout() { + return timeout; + } + public void setTimeout(long timeout) { this.timeout = timeout; } + /** + * called when idle timeout detected. + * return true if key should be canceled. + */ + public boolean timedOut(SelectionKey Key){ + return true; + } + + public void release(SelectionKey selectionKey) { timeout = UNLIMITED_TIMEOUT; } Index: modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java =================================================================== --- modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (revision 2805) +++ modules/utils/src/main/java/com/sun/grizzly/util/ThreadAttachment.java (working copy) @@ -53,7 +53,7 @@ */ public class ThreadAttachment extends SelectionKeyActionAttachment implements AttributeHolder { - + /** * The maximum time this object can be associated with an active {@link Thread} */ @@ -310,18 +310,19 @@ * Set the time, in milliseconds, this object can be attached to a {@link Thread} * @param the time, in milliseconds, this object can be attached to a {@link Thread} */ - public void setActiveThreadTimeout(long activeThreadTimeout){ + @Override + public void setIdleTimeoutDelay(long activeThreadTimeout) { this.activeThreadTimeout = activeThreadTimeout; - - // As soon as we get invoked we grab the Thread activeThread= Thread.currentThread(); } - + + /** * Return the time, in milliseconds, this object can be attached to a {@link Thread} * @return the time, in milliseconds, this object can be attached to a {@link Thread} - */ - public long getActiveThreadTimeout(){ + */ + @Override + public long getIdleTimeoutDelay() { return activeThreadTimeout; } } Index: modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java =================================================================== --- modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java (revision 2805) +++ modules/utils/src/main/java/com/sun/grizzly/util/WorkerThreadImpl.java (working copy) @@ -37,6 +37,7 @@ */ package com.sun.grizzly.util; +import com.sun.grizzly.tcp.PendingIOhandler; import java.util.concurrent.Callable; import com.sun.grizzly.util.ByteBufferFactory.ByteBufferType; import com.sun.grizzly.util.ThreadAttachment.Mode; @@ -79,8 +80,14 @@ * The size of the ByteBuffer attached to this object. */ private int initialByteBufferSize; + + + /** + * used by selectionkey attachments to enqueue io events that will be executed in + * selectorhandler.postselect by worker threads instead of the selector thread. + */ + private PendingIOhandler pendingIOhandler; - /** * Create a Thread that will synchronizes/block on * {@link DefaultThreadPool} instance. @@ -90,7 +97,14 @@ public WorkerThreadImpl(ThreadGroup threadGroup, Runnable runnable){ this(threadGroup, runnable, DEFAULT_BYTE_BUFFER_SIZE); } - + + public WorkerThreadImpl(Runnable runnable){ + this(null, "workerthread", runnable, 0); + } + + public WorkerThreadImpl(String name, Runnable runnable){ + this(null, name, runnable, 0); + } /** * Create a Thread that will synchronizes/block on * {@link DefaultThreadPool} instance. @@ -278,6 +292,22 @@ } + /** + * used by selectionkey attachments to enqueue io events that will be executed in + * selectorhandler.postselect by worker threads instead of the selector thread. + */ + public PendingIOhandler getPendingIOhandler() { + return pendingIOhandler; + } + + /** + * used by selectionkey attachments to enqueue io events that will be executed in + * selectorhandler.postselect by worker threads instead of the selector thread. + */ + public void setPendingIOhandler(PendingIOhandler pendingIOhandler) { + this.pendingIOhandler = pendingIOhandler; + } + @Override protected void reset() { if (threadAttachment != null) { Index: modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java =================================================================== --- modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java (revision 2805) +++ modules/comet/src/test/java/com/sun/grizzly/comet/CometUnitTest.java (working copy) @@ -40,9 +40,7 @@ import com.sun.grizzly.arp.AsyncHandler; import com.sun.grizzly.arp.DefaultAsyncHandler; import com.sun.grizzly.http.SelectorThread; -import com.sun.grizzly.http.StatsThreadPool; import java.io.BufferedInputStream; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.management.ManagementFactory; @@ -51,7 +49,6 @@ import java.net.Socket; import java.net.SocketAddress; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; @@ -62,13 +59,13 @@ * @author Gustav Trede */ public class CometUnitTest extends TestCase { - private final int port = 19000; + private final int port = 19100; private SocketAddress connectadr; - private final int socketreusedelayMilliSec = 0; + private final int socketreusedelayMilliSec = 40; private static volatile boolean status; private static volatile boolean testisdone; private SelectorThread st; - private final String context = "/cometText"; + private final String context = "/cometTextn"; private final byte joinmessage = 126; private final byte[] connectstr= ("POST /index.html/comet HTTP/1.1\r\n"+ @@ -81,9 +78,16 @@ } @Override + protected void tearDown() throws Exception { + super.tearDown(); + if (st != null) + st.stopEndpoint(); + } + + @Override protected void setUp() throws Exception { - super.setUp(); - init(false); + super.setUp(); + init(false); } protected void init(boolean useconcurrentcomethandler) throws Exception{ @@ -92,7 +96,7 @@ System.err.println("JVM: "+rmx.getVmVendor()+" "+rmx.getVmName()+" "+rmx.getVmVersion()+" params: "+rmx.getInputArguments()); st = new SelectorThread(); st.setPort(port); - st.setDisplayConfiguration(true); + st.setDisplayConfiguration(false); st.setAdapter(new CometTestAdapter(context,useconcurrentcomethandler,-1)); st.setEnableAsyncExecution(true); AsyncHandler asyncHandler = new DefaultAsyncHandler(); @@ -100,10 +104,7 @@ st.setAsyncHandler(asyncHandler); st.setTcpNoDelay(true); st.setLinger(-1); - /*st.setThreadPool( new StatsThreadPool(16, - 32, 50, - StatsThreadPool.DEFAULT_IDLE_THREAD_KEEPALIVE_TIMEOUT, - TimeUnit.MILLISECONDS));*/ + try { st.listen(); } catch (Exception ex) { @@ -113,44 +114,51 @@ } - @Override - protected void tearDown() throws Exception { - super.tearDown(); - if (st != null) - st.stopEndpoint(); - } - /* public void testLongPollingSocketReuse() throws Exception{ doActualLogic(true,false,40,20); }*/ - /* public void testLongPollingNewSocket() throws Exception{ - doActualLogic(false,false,64,5); - } -*/ + /* public void testLongPollingNewSocket() throws Exception{ + doActualLogic(false,false,6500,64); + }*/ - public void testStreaming2() throws Exception{ - doActualLogic(false,true,10,4); + public void testStreaming1() throws Throwable{ + //doActualLogic(false,true,15,1,false); } + /* public void testStreaming2() throws Throwable{ + doActualLogic(false,true,21,4, false); + } + + public void testStreaming3() throws Throwable{ + doActualLogic(false,true,21,64, false); + }*/ + + /* public void testStreaming5() throws Throwable{ + doActualLogic(false,true, 15, 256); + }*/ + protected void doActualLogic(final boolean socketreuse,final boolean streaming, - final int secondspertest,final int threadcount) throws Exception{ - System.err.println(streaming?"STREAMING-":"LONGPOLLING-"+(socketreuse?"SOCKETREUSE":"NEWSOCKET")+" client threads: "+threadcount); + final int secondspertest,final int threadcount, boolean spreadnotify) throws Throwable{ + System.err.println((streaming?"STREAMING-":"LONGPOLLING-")+(socketreuse?"SOCKETREUSE":"NEWSOCKET")+" client threads: "+threadcount+" spreadNotifyToManyThreads: "+spreadnotify); //int cpus = Runtime.getRuntime().availableProcessors(); + ((DefaultNotificationHandler)CometTestAdapter.cometContext.notificationHandler). + setSpreadNotifyToManyToThreads(spreadnotify); testisdone = false; msgc.set(0); - CometTestAdapter.usetreaming = streaming; - status = true; + CometTestAdapter.usetreaming = streaming; final CountDownLatch threadsaredone = new CountDownLatch(threadcount); try{ + status = true; for (int i=0;i2300){ + if (deltatime>4500){ t1 = t2; int currenttotalmsg = msgc.get(); System.err.println( - " events/sec : "+((currenttotalmsg-oldtotal)*1000/deltatime)+ + " K events/sec : "+((currenttotalmsg-oldtotal+500)/deltatime)+ " comethandlers: "+CometTestAdapter.cometContext.handlers.size()+ - " cometWorkqueue: "+cometexecutor.getQueue().size() + " workqueue: "+queuesize+ + " broadcastsper: "+eventbroadcasts ); oldtotal = currenttotalmsg; } - int queuesize = cometexecutor.getQueue().size(); - if (queuesize < 10000){ - eventbroadcasts = (eventbroadcasts*5)/4; + + if (streaming){ + + /*if (queuesize < (spreadnotify?threadcount:1)*300 ){ + eventbroadcasts = (eventbroadcasts*5)/4; + }*/ + if (queuesize < (spreadnotify?threadcount:1)*100){ + for (int i=0;i 0){ Thread.sleep(socketreusedelayMilliSec); } @@ -287,11 +311,12 @@ private Socket newSocket(int timeout) throws Exception{ Socket socket = new Socket(); - socket.setReuseAddress(false); - socket.setReceiveBufferSize(8192); - socket.setSendBufferSize(1024); + socket.setReuseAddress(true); + //socket.setReceiveBufferSize(2048); + //socket.setSendBufferSize(512); socket.setSoLinger(false, 0); socket.setSoTimeout(timeout); + socket.setTcpNoDelay(true); socket.connect(connectadr); return socket; } Index: modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java =================================================================== --- modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java (revision 2805) +++ modules/comet/src/test/java/com/sun/grizzly/comet/BasicCometTest.java (working copy) @@ -71,31 +71,34 @@ private int PORT = 18890; final CometContext test = CometEngine.getEngine().register("GrizzlyAdapter"); + @Override + protected void setUp() throws Exception { + super.setUp(); + test.setBlockingNotification(false); + } @Override protected void tearDown() throws Exception { super.tearDown(); - test.activeTasks.clear(); test.handlers.clear(); - stopGrizzlyWebServer(); - + stopGrizzlyWebServer(); } public void testOnInterruptExpirationDelay() throws Exception { - System.out.println("testOnInterruptExpirationDelay - will wait 5 seconds"); - final int delay = 5000; + System.out.println("testOnInterruptExpirationDelay - will wait 2 seconds"); + final int delay = 2000; test.setExpirationDelay(delay); newGWS(PORT+=1); String alias = "/OnInterrupt"; addAdapter(alias, false); gws.start(); - HttpURLConnection conn = getConnection(alias); + HttpURLConnection conn = getConnection(alias,delay+4000); long t1 = System.currentTimeMillis(); assertEquals(conn.getHeaderField(onInitialize), onInitialize); assertEquals(conn.getHeaderField(onInterrupt), onInterrupt); long delta = System.currentTimeMillis() - t1; - assertTrue("comet idletimeout was too fast,"+delta+"ms",delta > delay-200); + assertTrue("comet idletimeout was too fast,"+delta+"ms",delta > delay-250); assertTrue("comet idletimeout was too late,"+delta+"ms",delta < delay+3000); } @@ -109,7 +112,7 @@ Socket s = new Socket("localhost", PORT); s.setSoLinger(false, 0); - s.setSoTimeout(5 * 1000); + s.setSoTimeout(1 * 1000); OutputStream os = s.getOutputStream(); String a = "GET " + alias + " HTTP/1.1\n"+"Host: localhost:" + PORT + "\n\n"; System.out.println(" "+a); @@ -120,66 +123,75 @@ fail("client socket read did not read timeout"); } catch (SocketTimeoutException ex) { s.close(); - Thread.sleep(3 * 1000); + Thread.sleep(500); assertEquals(onInterrupt, ga.c.wasInterrupt); } + } + + public void testOnTerminate() throws IOException { + System.out.println("testOnTerminate "); + test.setExpirationDelay(-1); + newGWS(PORT+=3); + String alias = "/OnTerminate"; + addAdapter(alias,true); + gws.start(); + for (int i=0;i<10;i++){ + new Thread() { + @Override + public void run() { + try { + Thread.sleep(200); + test.notify(onTerminate, CometEvent.TERMINATE); + } catch (Throwable ex) { + ex.printStackTrace(); + fail("exception:"+ex.getMessage()); + } + } + }.start(); + HttpURLConnection conn = getConnection(alias,1000); + assertEquals(conn.getHeaderField(onInitialize) , onInitialize); + assertEquals(conn.getHeaderField(onTerminate), onTerminate); + conn.disconnect(); + } } - - public void testOnEvent() throws IOException { - System.out.println("testOnEvent - will wait 5 seconds"); - newGWS(PORT+=3); + + public void testOnEvent() throws Exception { + System.out.println("testOnEvent "); + newGWS(PORT+=4); String alias = "/OnEvent"; addAdapter(alias, true); test.setExpirationDelay(-1); gws.start(); - new Thread() { - - @Override - public void run() { - try { - Thread.sleep(5 * 1000); - test.notify(onEvent); - } catch (Throwable ex) { - Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex); - fail("sleep/notify exception:"+ex.getMessage()); + int iter = 10; + while(iter-->0){ + new Thread() { + @Override + public void run() { + try { + Thread.sleep(150); + test.notify(onEvent); + } catch (Throwable ex) { + Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex); + fail("sleep/notify exception:"+ex.getMessage()); + } } - } - }.start(); - - HttpURLConnection conn = getConnection(alias); - assertEquals(conn.getHeaderField(onInitialize), onInitialize); - assertEquals(conn.getHeaderField(onEvent), onEvent); + }.start(); + HttpURLConnection conn = getConnection(alias,1000); + assertEquals(conn.getHeaderField(onInitialize), onInitialize); + assertEquals(conn.getHeaderField(onEvent), onEvent); + conn.disconnect(); + } } - public void testOnTerminate() throws IOException { - System.out.println("testOnTerminate - will wait 5 seconds"); - test.setExpirationDelay(-1); - newGWS(PORT+=4); - String alias = "/OnTerminate"; - addAdapter(alias, false); - gws.start(); - new Thread() { - @Override - public void run() { - try { - Thread.sleep(5 * 1000); - test.notify(onTerminate, CometEvent.TERMINATE); - } catch (Exception ex) { - fail("exception:"+ex.getMessage()); - Logger.getLogger(BasicCometTest.class.getName()).log(Level.SEVERE, null, ex); - } - } - }.start(); - HttpURLConnection conn = getConnection(alias); - assertEquals(conn.getHeaderField(onInitialize) , onInitialize); - assertEquals(conn.getHeaderField(onTerminate), onTerminate); - } + private HttpURLConnection getConnection(String alias) throws IOException { + return getConnection(alias, 40*1000); + } - private HttpURLConnection getConnection(String alias) throws IOException { + private HttpURLConnection getConnection(String alias, int readtimeout) throws IOException { URL url = new URL("http", "localhost", PORT, alias); HttpURLConnection urlConn = (HttpURLConnection) url.openConnection(); - urlConn.setConnectTimeout(10*1000); - urlConn.setReadTimeout(40*1000); + urlConn.setConnectTimeout(5*1000); + urlConn.setReadTimeout(readtimeout); urlConn.connect(); return urlConn; } @@ -260,7 +272,6 @@ response.addHeader(onTerminate, event.attachment().toString()); response.getWriter().print(onTerminate); - event.getCometContext().resumeCometHandler(this); } public void onInterrupt(CometEvent event) throws IOException { Index: modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/concurrent/DefaultConcurrentCometHandler.java (working copy) @@ -40,34 +40,36 @@ import com.sun.grizzly.Controller; import com.sun.grizzly.comet.CometEvent; import com.sun.grizzly.comet.CometHandler; -import java.io.Closeable; +import com.sun.grizzly.comet.DefaultNotificationHandler; import java.io.IOException; import java.util.LinkedList; import java.util.logging.Logger; /** - * * we queue events in each comethandler to lower the probability * that slow or massive IO for one comethandler severly delays events to others.
*
+ * only streaming mode can benefit from buffering messages like this.
* only 1 thread at a time is allowed to do IO, * other threads put events in the queue and return to the thread pool.
*
* a thread initially calls enqueueEvent and stay there until there are no more - * events in the queue, calling the onEVent method in unsynchronized context for each Event.
+ * events in the queue, calling the onEVent method in synchronized context for each Event.
*
* on IOE in onEvent we terminate.
* we have a limit, to keep memory usage under control.
*
* if queue limit is reached onQueueFull is called, and then we terminate.
*
- * default implementation of onInterrupt and onTerminate performs a .close() if attachment instanceof Closeable
< *
* whats not optimal is that a worker thread is sticky to the client depending * uppon available events in the handlers local queue, * that can in theory allow a few clients to block all threads for extended time.
- * The improvement is that only 1 thread is tied up to a client instead of several - * being blocked by synchronized.
+ * that effect can make this implementation unusable depending on the scenario, + * its not a perfect design be any means. + *
+ * The potential improvement is that only 1 worker thread is tied up to a client instead of several + * being blocked by synchronized io wait for one comethandler .
* * @author Gustav Trede */ @@ -78,12 +80,8 @@ /** * used for preventing othe worker threads from the executor event queue from adding events * to the comethandlers local queue or starting IO logic after shuttdown.
- *
- * {@link DefaultNotificationHandler} sets shuttingdown = true when needed.
- * this way we dont need subclasses to remember to do super calls in the onXX methods.
- * todo: CometEvent.INTERRUPT should do cometHandler.shuttingdown = true; ? */ - protected volatile boolean shuttingdown; + private boolean shuttingdown; /** * max number of events to locally queue for this comethandler.
@@ -128,11 +126,8 @@ * further events in the internal queue. */ public void EnQueueEvent(CometEvent event){ - if (shuttingdown) - return; synchronized(messageQueue){ if (!isreadyforwork){ - // to prevent add of event when we are shutdown if (!shuttingdown && queuesize < messageQueueLimit){ messageQueue.add(event); queuesize++; @@ -149,12 +144,19 @@ return; } try{ - onEvent(event); - } catch (Throwable ex) { + //move synchronized outside the while loop ? + synchronized(this){ + onEvent(event); + } + } catch (IOException ex) { shuttingdown = true; - event.getCometContext().resumeCometHandler(this); - return; + }finally{ + if (shuttingdown){ + event.getCometContext().resumeCometHandler(this); + return; + } } + synchronized(messageQueue){ if (queuesize == messageQueueLimit){ queuefull = true; @@ -175,7 +177,7 @@ } /** - * called in unsynchronized context, not blocking other threads + * called in synchronized context. * when the comethandler's local event queue is full.
* default impl resumes the comethandler * @param event {@link CometEvent} @@ -183,14 +185,6 @@ public void onQueueFull(CometEvent event){ event.getCometContext().resumeCometHandler(this); } - - /** - * prevents further event handling in the enQueue method.
- * existing queued events will be discarded. - */ - public void shutdownQueue() { - shuttingdown = true; - } /** * returns the attachment @@ -229,14 +223,14 @@ } /** - * closes the connection if attachment instanceof Closable. + * */ protected void terminate(){ - if (attachment() instanceof Closeable){ + /* if (attachment() instanceof Closeable){ try { ((Closeable) attachment()).close(); } catch (IOException ex) { } - } + }*/ } } \ No newline at end of file Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometHandler.java (working copy) @@ -76,7 +76,11 @@ * everytime a {@link CometContext#notify} is invoked. The {@link CometEvent} * will contains the message that can be pushed back to the remote client, * cached or ignored. This method can also be used to resume a connection - * once a notified by invoking {@link CometContext#resumeCometHandler}. + * once a notified by invoking {@link CometContext#resumeCometHandler}.
+ * its not optimal to flush outputstream in this method for long polling, + * flush is performed in each CometContext.resume call.
+ * flushing multiple times can fragment the data into several tcp packets, + * that leads to extra IO and overhead in general due to client ack for each packet etc. */ public void onEvent(CometEvent event) throws IOException; Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometSelector.java (working copy) @@ -1,376 +0,0 @@ -/* - * - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * - * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. - * - * The contents of this file are subject to the terms of either the GNU - * General Public License Version 2 only ("GPL") or the Common Development - * and Distribution License("CDDL") (collectively, the "License"). You - * may not use this file except in compliance with the License. You can obtain - * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html - * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific - * language governing permissions and limitations under the License. - * - * When distributing the software, include this License Header Notice in each - * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. - * Sun designates this particular file as subject to the "Classpath" exception - * as provided by Sun in the GPL Version 2 section of the License file that - * accompanied this code. If applicable, add the following below the License - * Header, with the fields enclosed by brackets [] replaced by your own - * identifying information: "Portions Copyrighted [year] - * [name of copyright owner]" - * - * Contributor(s): - * - * If you wish your version of this file to be governed by only the CDDL or - * only the GPL Version 2, indicate your decision by adding "[Contributor] - * elects to include this software in this distribution under the [CDDL or GPL - * Version 2] license." If you don't indicate a single choice of license, a - * recipient has the option to distribute your version of this file under - * either the CDDL, the GPL Version 2 or to extend the choice of license to - * its licensees as provided above. However, if you add GPL Version 2 code - * and therefore, elected the GPL Version 2 license, then the option applies - * only if the new code is made subject to such option by the copyright - * holder. - * - */ - -package com.sun.grizzly.comet; - -import com.sun.grizzly.http.SelectorThread; -import com.sun.grizzly.util.LinkedTransferQueue; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.concurrent.CountDownLatch; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * NIO {@link Selector} allowing {@link CometHandler} to receive - * non-blocking requests bytes during request polling. - * - * TODO: investigate if its possible to move this functionality to grizzly main - * selector inorder to lower the extra overhead this 2nd selector is. - * - * @author Jeanfrancois Arcand - * @author Gustav Trede - */ -public class CometSelector { - - /** - * The {@link CometEngine} singleton - */ - protected final CometEngine cometEngine; - - - /** - * The {@link Selector} - */ - private Selector selector; - - /** - * timestamp when last expireIdleKeys() performed its check - */ - private long lastIdleCheck; - - /** - * Logger. - */ - private final Logger logger = SelectorThread.logger(); - - /** - * - */ - private final ByteBuffer dumybuffer = ByteBuffer.allocate(1); - - /** - * The list of {@link SelectionKey} to register with the - * {@link Selector} - * TODO: replace with LinkedTransferQueue - */ - private final LinkedTransferQueue keysToRegister - = new LinkedTransferQueue(); - - - /** - * New {@link CometSelector} - * @param cometEngine The {@link CometEngine} singleton - */ - public CometSelector(CometEngine cometEngine) { - this.cometEngine = cometEngine; - } - - - /** - * Start the {@link Selector} running on its - * Thread. - */ - public void start() throws InterruptedException{ - final CountDownLatch isStartedLatch = new CountDownLatch(1); - new Thread("CometSelector"){{ - setDaemon(true); - } - - @Override - public void run(){ - try{ - selector = Selector.open(); - } catch(IOException ex){ - // Most probably a fd leak. - logger.log(Level.SEVERE,"CometSelector.open()",ex); - return; - } - isStartedLatch.countDown(); - - doSelection(); - } - }.start(); - isStartedLatch.await(); - } - - /** - * the selection logic - */ - private void doSelection(){ - while (true){ - int selectorState = 0; - try{ - try{ - selectorState = selector.select(1000); - } catch (CancelledKeyException ex){ - if (logger.isLoggable(Level.FINEST)){ - logger.log(Level.FINEST,"CometSelector.open()",ex); - } - } - - handleSelectedKeys(); - expireIdleKeys(); - registerNewKeys(); - - } catch (Throwable t){ - handleException(t,null); - }finally{ - if (selectorState <= 0){ //todo why is this needed ? - selector.selectedKeys().clear(); - } - } - } - } - - /** - * handle the selected keys - */ - private void handleSelectedKeys(){ - for (SelectionKey cometKey:selector.selectedKeys()) { - try{ - if (cometKey.isValid()) { - CometTask cometTask = (CometTask)cometKey.attachment(); - boolean asyncExec = cometTask.isComethandlerisAsyncregistered(); - if (asyncExec){ - cometTask.setComethandlerisAsyncregistered(false); - if (cometKey.isReadable()){ - cometKey.interestOps(cometKey.interestOps() & (~SelectionKey.OP_READ)); - cometTask.upcoming_op = CometTask.OP_EVENT.READ; - } - - if (cometKey.isWritable()){ - cometKey.interestOps(cometKey.interestOps() & (~SelectionKey.OP_WRITE)); - cometTask.upcoming_op = CometTask.OP_EVENT.WRITE; - } - } - if (cometTask.getSelectionKey().attachment() == null){ - if (cometTask.cometHandlerNotResumed()){ - if (asyncExec){ - cometTask.execute(); - }else{ - checkIfclientClosedConnection(cometKey); - } - } - } else { - // logger.warning("cometselector comettask.mainkey has an attachment. "); - cancelKey(cometKey,false,true, true); - } - } else { - //logger.warning("cometselector select detected invalid cometKey."); - cancelKey(cometKey,false,true,true); - } - }catch(Throwable t){ - handleException(t, cometKey); - } - } - // one shot clear is alot faster then removing each element one by one. - selector.selectedKeys().clear(); - } - - /** - * - * @param cometKey - */ - private void checkIfclientClosedConnection(SelectionKey cometKey) { - boolean connectionclosed = true; - try { - SocketChannel socketChannel = (SocketChannel)cometKey.channel(); - dumybuffer.clear(); - connectionclosed = socketChannel.read(dumybuffer) == -1; - } catch (Throwable ex) { - // null we dont want cancelkey to happen here, cause it does not cancel mainKey - handleException(ex, null); - } - finally{ - if (connectionclosed) - cancelKey(cometKey, true, true, true); - } - } - - /** - * perform the registration of new keys. - * The mainKey is the SelectionKey returned by the - * Selector used in the SelectorThread class. - */ - private void registerNewKeys(){ - SelectionKey cometKey = null; - CometTask cometTask; - while ((cometTask = keysToRegister.poll()) != null ){ - try{ - SelectionKey mainKey = cometTask.getSelectionKey(); - SocketChannel channel = (SocketChannel)mainKey.channel(); - if (mainKey.isValid() && channel.isOpen()) { - cometKey = channel.register(selector,SelectionKey.OP_READ); - cometTask.setCometKey(cometKey); - cometKey.attach(cometTask); - cometTask.getCometContext().addActiveCometTask(cometTask); - cometTask.getCometContext(). - addActiveHandler(cometTask.getCometHandler(), cometKey); - cometKey = null; - } - }catch(Throwable t){ - handleException(t, cometKey); - } - } - } - - /** - * Expires registered {@link SelectionKey}. If a - * {@link SelectionKey} is expired, the request will be resumed and the - * HTTP request will complete, - */ - private void expireIdleKeys(){ - if (selector.keys().isEmpty()){ - return; - } - - final long current = System.currentTimeMillis(); - if (current - lastIdleCheck < 1000){ - return; - } - - lastIdleCheck = current; - for (SelectionKey cometKey:selector.keys()){ - try{ - CometTask cometTask = (CometTask)cometKey.attachment(); - if (cometTask == null) - continue; - if (cometTask.hasExpired(current)){ - cancelKey(cometKey,false,true, true); - continue; - } - /** - * The connection has been resumed since the timeout is - * re-attached to the SelectionKey so cancel the Comet key. - */ - if (cometTask.getSelectionKey().attachment() instanceof Long){ - cometKey.attach(null); - cometKey.cancel(); - } - }catch(Throwable t){ - handleException(t, cometKey); - } - } - } - - /** - * handle exceptions for selection logic - * @param t - * @param key - */ - private void handleException(Throwable t, SelectionKey key){ - try{ - cancelKey(key,false,true, true); - } catch (Throwable t2){ - logger.log(Level.SEVERE,"CometSelector",t2); - } - if (logger.isLoggable(Level.FINEST)){ - logger.log(Level.FINEST,"CometSelector",t); - } - } - - - /** - * Cancel the {@link SelectionKey} associated with a suspended connection. - */ - protected boolean cancelKey(SelectionKey cometKey, boolean cancelMainKey, - boolean removeCometHandler, boolean notifyInterrupt){ - if (cometKey == null){ //cometcontext.resume can give a null cometkey - return false; - } - boolean status = true; - CometTask cometTask = null; - // attach is only atomic since dolphin b06 , hence we must synchronize - // until we can require dolphin - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6436220 - synchronized(cometKey){ - cometTask = (CometTask) cometKey.attach(null); - if (cometTask != null){ - //synchronizes internally on itself and canceledkeyset, - //we want hotspot to use lock coarsening. - cometKey.cancel(); - } - } - status = cometTask != null; - if (status){ - status = cometTask.getCometContext().interrupt(cometTask, - removeCometHandler, notifyInterrupt); - cometEngine.flushPostExecute(cometTask.getAsyncProcessorTask()); - - if (cancelMainKey){ - cometTask.getSelectorThread().cancelKey(cometTask.getSelectionKey()); - } - } - return status; - } - - /** - * Register the {@link SelectionKey} to the {@link Selector}. We - * cannot register the {@link SelectionKey} directy on the - * {@link Selector} because there is a deadlock in the VM (see bug XXX). - */ - public void registerKey(CometTask cometTask){ - if (cometTask.getSelectionKey().isValid() && selector != null){ - cometTask.setExpireTime(System.currentTimeMillis()); - keysToRegister.offer(cometTask); - selector.wakeup(); - } - } - - - /** - * Wakes up the {@link Selector} - */ - public void wakeup(){ - selector.wakeup(); - } - - /** - * Return the SelectionKey associated with this channel. - */ - public SelectionKey cometKeyFor(SelectableChannel channel){ - return channel.keyFor(selector); - } - -} Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometEvent.java (working copy) @@ -38,13 +38,15 @@ package com.sun.grizzly.comet; +import java.io.Serializable; + /** * Simple event class used to pass information between {@link CometHandler} * and the Comet implementation. * * @author Jeanfrancois Arcand */ -public class CometEvent { +public class CometEvent implements Serializable{ /** @@ -98,7 +100,7 @@ /** * The CometContext from where this instance was fired. */ - private CometContext cometContext; + private transient CometContext cometContext; /** @@ -116,6 +118,12 @@ this.type = type; this.cometContext = context; } + + public CometEvent(int type, CometContext cometContext, E attachment) { + this.type = type; + this.attachment = attachment; + this.cometContext = cometContext; + } /** * Return the type of this object. Index: modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/DefaultNotificationHandler.java (working copy) @@ -48,7 +48,7 @@ /** * Default Notificationhandler that uses a thread pool dedicated to the CometEngine - * to execute the notification process. + * to execute the notification process.
* * @author Jeanfrancois Arcand * @author Gustav Trede @@ -75,7 +75,7 @@ /** * only used if blockingnotification == false and threadpool != null */ - private boolean spreadNotifyToManyToThreads = true; + private boolean spreadNotifyToManyToThreads = false; public DefaultNotificationHandler() { } @@ -104,8 +104,21 @@ public void setBlockingNotification(boolean blockingNotification) { this.blockingNotification = blockingNotification; } + + /** + * if true a notify to Iterator will be spread into one runnable task for + * each comethandler. + * if false , all comethandlers notify will be executed in 1 Runnable, after each other, + * + * @param spreadNotifyToManyToThreads + */ + public void setSpreadNotifyToManyToThreads(boolean spreadNotifyToManyToThreads) { + this.spreadNotifyToManyToThreads = spreadNotifyToManyToThreads; + } - + + + /** * Notify all {@link CometHandler}. * @param cometEvent the CometEvent used to notify CometHandler @@ -162,49 +175,36 @@ protected void notify0(CometEvent cometEvent,CometHandler cometHandler) { try{ switch (cometEvent.getType()) { - case CometEvent.INTERRUPT: - if (cometHandler instanceof DefaultConcurrentCometHandler){ - ((DefaultConcurrentCometHandler)cometHandler).shutdownQueue(); - //todo how do we synchronize ?, the defaultConcurrentcomethandler can do that, but we dont know if other implementations do - cometHandler.onInterrupt(cometEvent); - }else - synchronized(cometHandler){ - cometHandler.onInterrupt(cometEvent); - } - break; + case CometEvent.INTERRUPT: + cometHandler.onInterrupt(cometEvent); break; case CometEvent.NOTIFY: case CometEvent.READ: case CometEvent.WRITE: - if (cometHandler instanceof DefaultConcurrentCometHandler) + if (cometHandler instanceof DefaultConcurrentCometHandler){ ((DefaultConcurrentCometHandler)cometHandler).EnQueueEvent(cometEvent); - else - if (cometEvent.getCometContext().isActive(cometHandler)) + break; + } + if (cometEvent.getCometContext().isActive(cometHandler)){ synchronized(cometHandler){ cometHandler.onEvent(cometEvent); } + } break; case CometEvent.INITIALIZE: - cometHandler.onInitialize(cometEvent); - break; + cometHandler.onInitialize(cometEvent); break; case CometEvent.TERMINATE: - if (cometHandler instanceof DefaultConcurrentCometHandler){ - ((DefaultConcurrentCometHandler)cometHandler).shutdownQueue(); - cometHandler.onTerminate(cometEvent); //todo how do we synchronize ?, the defaultConcurrentcomethandler can do that, but we dont know if other implementations do - }else - synchronized(cometHandler){ - cometHandler.onTerminate(cometEvent); - } - break; + synchronized(cometHandler){ + cometHandler.onTerminate(cometEvent); break; + } default: throw ISEempty; } } catch (Throwable ex) { try { - cometEvent.getCometContext().resumeCometHandler(cometHandler, true); + cometEvent.getCometContext().resumeCometHandler(cometHandler); } catch (Throwable t) { logger.log(Level.FINE, "Resume phase failed: ", t); } - //todo cant log this at WARNING level.. its normal to have alot of failed notifications. imagine several K users in the real world.. logger.log(Level.FINE, "Notification failed: ", ex); } } Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometTask.java (working copy) @@ -1,9 +1,9 @@ /* - * + * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. - * + * * Copyright 2007-2008 Sun Microsystems, Inc. All rights reserved. - * + * * The contents of this file are subject to the terms of either the GNU * General Public License Version 2 only ("GPL") or the Common Development * and Distribution License("CDDL") (collectively, the "License"). You @@ -11,7 +11,7 @@ * a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html * or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific * language governing permissions and limitations under the License. - * + * * When distributing the software, include this License Header Notice in each * file and include the License file at glassfish/bootstrap/legal/LICENSE.txt. * Sun designates this particular file as subject to the "Classpath" exception @@ -20,9 +20,9 @@ * Header, with the fields enclosed by brackets [] replaced by your own * identifying information: "Portions Copyrighted [year] * [name of copyright owner]" - * + * * Contributor(s): - * + * * If you wish your version of this file to be governed by only the CDDL or * only the GPL Version 2, indicate your decision by adding "[Contributor] * elects to include this software in this distribution under the [CDDL or GPL @@ -44,127 +44,179 @@ import com.sun.grizzly.ProtocolChain; import com.sun.grizzly.arp.AsyncProcessorTask; import com.sun.grizzly.http.SelectorThread; -import com.sun.grizzly.util.InputReader; -import com.sun.grizzly.http.TaskBase; +import com.sun.grizzly.http.Task; +import com.sun.grizzly.util.SelectedKeyAttachmentLogic; import com.sun.grizzly.util.WorkerThread; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.logging.Level; +import java.util.logging.Logger; /** * A {@link Task} implementation that allow Grizzly ARP to invokeCometHandler - * {@link CometHandler} when new data (bytes) are available from the + * {@link CometHandler} when new data (bytes) are available from the * {@link CometSelector}. * * @author Jeanfrancois Arcand + * @author Gustav Trede */ -public class CometTask extends TaskBase{ +public class CometTask extends SelectedKeyAttachmentLogic implements Runnable{ - - public enum OP_EVENT { READ, WRITE } - - + private static final Logger logger = SelectorThread.logger(); + /** - * The current non blocking operation. + * The {@link CometContext} associated with this instance. */ - protected OP_EVENT upcoming_op = OP_EVENT.READ; - - + protected final CometContext cometContext; + /** - * The {@link CometContext} associated with this instance. + * The {@link CometHandler} associated with this task. */ - private CometContext cometContext; - - + protected final CometHandler cometHandler; + /** - * The {@link CometSelector} . + * The {@link AsyncProcessorTask} */ - private CometSelector cometSelector; + private AsyncProcessorTask asyncProcessorTask; - /** - * The time in milliseconds before this object was registered the - * {@link SelectionKey} on the {@link CometSelector} + * true if comethandler is registered for async IO in cometcontext. + * used to optmize: + * dont give simple read == -1 operations to thread pool */ - private long expireTime ; + protected volatile boolean cometHandlerIsAsyncRegistered; - /** - * used by cometselector to optmize: - * dont give simple read == -1 operations to thread pool + * The current non blocking operation. */ - private volatile boolean comethandlerisAsyncregistered; - + protected boolean upcoming_op_isread; + /** - * The InputStream used to read bytes from the {@link CometSelector} + * true if run() should call cometcontext.interrupt0 */ - private InputReader cometInputStream; - - + protected boolean callInterrupt; + /** - * The CometSelector registered key. + * true if interrupt should flushAPT */ - private SelectionKey cometKey; + protected boolean interruptFlushAPT; + /** - * The {@link AsyncProcessorTask} + * New {@link CometTask}. */ - private AsyncProcessorTask asyncProcessorTask; + public CometTask(CometContext cometContext, CometHandler cometHandler) { + this.cometContext = cometContext; + this.cometHandler = cometHandler; + } /** - * The {@link CometEvent} associated with this task. + * performs doTask() or cometContext.interrupt0 */ - private CometEvent event; - + public void run(){ + if (callInterrupt){ + cometContext.interrupt0(this, true, interruptFlushAPT, true); + }else{ + try{ + doTask(); + } catch (IOException ex){ + throw new RuntimeException(ex); + } + } + } + /** - * The {@link CometHandler} associated with this task. + * {@inheritDoc} */ - private CometHandler cometHandler; + @Override + public long getIdleTimeoutDelay() { + return cometContext.getExpirationDelay(); + } /** - * The CometWriter associated with this task. + * this should never be called for for comet, due to we are nulling the attachment + * and completely overriding the selector.select logic.
+ * called by grizzly when the selectionkey is canceled and its socket closed.
+ * + * @param selectionKey */ - private CometWriter writer; - - + @Override + public void release(SelectionKey selectionKey) { + //logger.warning("cometTask.release() : isactive: "+cometContext.isActive(cometHandler)+" attachment:"+selectionKey.attachment()); + //cometContext.interrupt(this, true, false,false, true); + } + /** - * The CometReader associated with this task. + * {@inheritDoc} */ - private CometReader reader; - - + @Override + public boolean timedOut(SelectionKey key){ + //System.err.println("cometTask.timedout() : isactive: "+cometContext.isActive(cometHandler)+" attachment:"+key.attachment()); + cometContext.interrupt(this, true, true, true, true); + return false; + } + /** - * true if the CometHandler has been registered for OP_READ - * events. - * false by default. java lang specification states that. + * {@inheritDoc} */ - private boolean asyncReadSupported ; - - + @Override + public void handleSelectedKey(SelectionKey selectionKey) { + if (!selectionKey.isValid()){ + cometContext.interrupt(this, true, false,true, true); + return; + } + if (cometHandlerIsAsyncRegistered){ + if (selectionKey.isReadable()){ + selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ)); + upcoming_op_isread = true; + } + if (selectionKey.isWritable()){ + selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_WRITE)); + upcoming_op_isread = false; + } + asyncProcessorTask.getThreadPool().execute(this); + } + else{ + checkIfClientClosedConnection(selectionKey); + } + } + /** - * Is this Task suspended. + * checks if client has closed the connection. + * the check is done by trying to read 1 byte that is trown away. + * only used for non async registered comethandler. + * @param mainKey */ - private boolean isSuspended = false; - - /** - * New {@link CometTask}. - */ - public CometTask() { + private void checkIfClientClosedConnection(SelectionKey mainKey) { + boolean connectionclosed = true; + try { + connectionclosed = ((SocketChannel)mainKey.channel()). + read(ByteBuffer.allocate(1)) == -1; + } catch (IOException ex) { + + } + finally{ + if (connectionclosed){ + cometContext.interrupt(this, true, false,true, true); + }else{ + //cometContext.interrupt(this, false, false, true,false, true); + //System.err.println("**** ready key detected : "+mainKey.attachment() +" isactive:"+cometContext.isActive(cometHandler)); + } + } } - + /** * Notify the {@link CometHandler} that bytes are available for read. * The notification will invoke all {@link CometContext} */ - public void doTask() throws IOException{ + public void doTask() throws IOException{ // The CometHandler has been resumed. if (!cometContext.isActive(cometHandler) ){ return; } - /** * The CometHandler in that case is **always** invoked using this * thread so we can re-use the Thread attribute safely. @@ -172,41 +224,31 @@ ByteBuffer byteBuffer = null; boolean connectionClosed = false; boolean clearBuffer = true; + final SelectionKey key = getSelectionKey(); try{ - - if (cometInputStream == null){ - cometInputStream = new InputReader(); - } - - cometInputStream.setSelectionKey(cometKey); byteBuffer = ((WorkerThread)Thread.currentThread()).getByteBuffer(); if (byteBuffer == null){ - byteBuffer = ByteBuffer.allocate(selectorThread.getBufferSize()); + byteBuffer = ByteBuffer.allocate(asyncProcessorTask.getSelectorThread().getBufferSize()); ((WorkerThread)Thread.currentThread()).setByteBuffer(byteBuffer); } else { byteBuffer.clear(); } - cometInputStream.setByteBuffer(byteBuffer); - SocketChannel socketChannel = (SocketChannel)cometKey.channel(); - if (upcoming_op == OP_EVENT.READ){ + SocketChannel socketChannel = (SocketChannel)key.channel(); + if (upcoming_op_isread){ /* * We must execute the first read to prevent client abort. - */ - int nRead = socketChannel.read(byteBuffer); + */ + int nRead = socketChannel.read(byteBuffer); if (nRead == -1 ){ connectionClosed = true; - } else { - /* + } else { + /* * This is an HTTP pipelined request. We need to resume - * the continuation and invoke the http parsing + * the continuation and invoke the http parsing * request code. */ - if (!asyncReadSupported){ - // Don't let the main Selector (SelectorThread) starts - // handling the pipelined request. - key.attach(Long.MIN_VALUE); - + if (!cometHandlerIsAsyncRegistered){ /** * Something when wrong, most probably the CometHandler * has been resumed or removed by the Comet implementation. @@ -214,57 +256,57 @@ if (!cometContext.isActive(cometHandler)){ return; } - + // Before executing, make sure the connection is still - // alive. This situation happens with SSL and there + // alive. This situation happens with SSL and there // is not a cleaner way fo handling the browser closing // the connection. - nRead = socketChannel.read(byteBuffer); + nRead = socketChannel.read(byteBuffer); if (nRead == -1){ connectionClosed = true; return; } - - cometContext.resumeCometHandler(cometHandler, false); + //resume without remove: + try{ + cometHandler.onInterrupt(cometContext.eventInterrupt); + }catch(IOException e) { } + CometEngine.cometEngine.flushPostExecute(this,true,false); + clearBuffer = false; - + Controller controller = getSelectorThread().getController(); - ProtocolChain protocolChain = + ProtocolChain protocolChain = controller.getProtocolChainInstanceHandler().poll(); - NIOContext ctx = (NIOContext)controller.pollContext(key); + NIOContext ctx = (NIOContext)controller.pollContext(key); ctx.setController(controller); ctx.setSelectionKey(key); ctx.setProtocolChain(protocolChain); ctx.setProtocol(Protocol.TCP); - protocolChain.execute(ctx); + protocolChain.execute(ctx); } else { - byteBuffer.flip(); - reader = new CometReader(); + byteBuffer.flip(); + CometReader reader = new CometReader(); reader.setNRead(nRead); reader.setByteBuffer(byteBuffer); - if (event == null) - event = new CometEvent(); - event.type = CometEvent.READ; + CometEvent event = new CometEvent(CometEvent.READ,cometContext); event.attach(reader); cometContext.invokeCometHandler(event,cometHandler); reader.setByteBuffer(null); - + // This Reader is now invalid. Any attempt to use // it will results in an IllegalStateException. reader.setReady(false); } } - } else if (upcoming_op == OP_EVENT.WRITE){ - if (event == null) - event = new CometEvent(); - event.type = CometEvent.WRITE; - writer = new CometWriter(); + } else { + CometEvent event = new CometEvent(CometEvent.WRITE,cometContext); + CometWriter writer = new CometWriter(); writer.setChannel(socketChannel); event.attach(writer); - cometContext.invokeCometHandler(event,cometHandler); - + cometContext.invokeCometHandler(event,cometHandler); + // This Writer is now invalid. Any attempt to use - // it will results in an IllegalStateException. + // it will results in an IllegalStateException. writer.setReady(false); } } catch (IOException ex){ @@ -275,170 +317,84 @@ } } catch (Throwable t){ connectionClosed = true; - SelectorThread.logger().log(Level.SEVERE,"Comet exception",t); - } finally { + SelectorThread.logger().log(Level.SEVERE,"Comet exception",t); + } finally { + cometHandlerIsAsyncRegistered = false; + // Bug 6403933 if (connectionClosed){ - cometSelector.cancelKey(cometKey,true,true, true); + asyncProcessorTask.getSelectorThread().cancelKey(key); } - + if (clearBuffer && byteBuffer != null){ byteBuffer.clear(); } - asyncReadSupported = false; } } - public void setComethandlerisAsyncregistered(boolean comethandlerisAsyncregistered) { - this.comethandlerisAsyncregistered = comethandlerisAsyncregistered; + /** + * sets the comettask async interest flag in the comettask + * @param + */ + public void setComethandlerIsAsyncRegistered(boolean cometHandlerIsAsyncRegistered) { + this.cometHandlerIsAsyncRegistered = cometHandlerIsAsyncRegistered; } - public boolean isComethandlerisAsyncregistered() { - return comethandlerisAsyncregistered; - } - /** - * returns true if the CometHandler has not been resumed / removed. - * allows cometSelector to do a fast check before leting threadpool execute the comettask + * returns true if the comethandler is registered for async io * @return */ - public boolean cometHandlerNotResumed(){ - return cometContext.isActive(cometHandler); + public boolean isComethandlerAsyncRegistered() { + return cometHandlerIsAsyncRegistered; } - + /** * Return the {@link CometContext} associated with this instance. - * @return CometContext the {@link CometContext} associated with this + * @return CometContext the {@link CometContext} associated with this * instance. */ public CometContext getCometContext() { return cometContext; } - - - /** - * Set the {@link CometContext} used to invokeCometHandler {@link CometHandler}. - * @param cometContext the {@link CometContext} used to invokeCometHandler {@link CometHandler} - */ - public void setCometContext(CometContext cometContext) { - this.cometContext = cometContext; - } - /** - * Recycle this object. + * returns the {@link AsyncProcessorTask } + * @return {@lnk AsyncProcessorTask } */ - @Override - public void recycle(){ - isSuspended = false; - key = null; - cometContext = null; - asyncReadSupported = false; - if(cometInputStream != null) { - cometInputStream.recycle(); - } + public AsyncProcessorTask getAsyncProcessorTask() { + return asyncProcessorTask; } - /** - * Return the {@link CometSelector} - * @return CometSelector the {@link CometSelector} + * sets the {@link AsyncProcessorTask } + * @param {@link AsyncProcessorTask } */ - public CometSelector getCometSelector() { - return cometSelector; + public void setAsyncProcessorTask(AsyncProcessorTask asyncProcessorTask) { + this.asyncProcessorTask = asyncProcessorTask; } - /** - * Set the {@link CometSelector} - * @param cometSelector the {@link CometSelector} - */ - public void setCometSelector(CometSelector cometSelector) { - this.cometSelector = cometSelector; - } - - - /** - * Return the time in milliseconds before this object was registered the - * {@link SelectionKey} on the {@link CometSelector} - * @return long Return the time in milliseconds before this object was - * registered the {@link SelectionKey} on the - * {@link CometSelector} + * returns selectionkey + * @return */ - public long getExpireTime() { - return expireTime; + public SelectionKey getSelectionKey() { + return asyncProcessorTask.getAsyncExecutor().getProcessorTask().getSelectionKey(); } - /** - * Set the time in milliseconds before this object was registered the - * {@link SelectionKey} on the {@link CometSelector} - * @param expireTime Return the time in milliseconds before this object was - * registered the {@link SelectionKey} on the - * {@link CometSelector} - */ - public void setExpireTime(long expireTime) { - this.expireTime = expireTime; - } - - - /** - * Return the {@link CometSelector}'s {@link SelectionKey}. + * returns the {@link AsyncProcessorTask } + * @return {@link AsyncProcessorTask } */ - public SelectionKey getCometKey() { - return cometKey; + private SelectorThread getSelectorThread(){ + return asyncProcessorTask.getSelectorThread(); } - /** - * Set the {@link CometSelector}'s {@link SelectionKey}. + * returns the {@link CometHandler } + * @return {@link CometHandler } */ - public void setCometKey(SelectionKey cometKey) { - this.cometKey = cometKey; - } - - - public boolean isAsyncReadSupported() { - return asyncReadSupported; - } - - - public void setAsyncReadSupported(boolean asyncReadSupported) { - this.asyncReadSupported = asyncReadSupported; - } - - /** - * Return true if cometContext.getExpirationDelay() != -1 - * && timestamp - expireTime >= cometContext.getExpirationDelay(); - * @param timestamp - * @return - */ - protected boolean hasExpired(long timestamp){ - long expdelay = cometContext.getExpirationDelay(); - return expdelay != -1 && timestamp - expireTime >= expdelay; - } - - public AsyncProcessorTask getAsyncProcessorTask() { - return asyncProcessorTask; - } - - public void setAsyncProcessorTask(AsyncProcessorTask asyncProcessorTask) { - this.asyncProcessorTask = asyncProcessorTask; - } - public CometHandler getCometHandler() { return cometHandler; } - public void setCometHandler(CometHandler cometHandler) { - this.cometHandler = cometHandler; - } - - public boolean isSuspended() { - return isSuspended; - } - - public void setSuspended(boolean isSuspended) { - this.isSuspended = isSuspended; - } } Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometContext.java (working copy) @@ -40,6 +40,7 @@ import com.sun.grizzly.comet.concurrent.DefaultConcurrentCometHandler; import com.sun.grizzly.http.SelectorThread; +import com.sun.grizzly.util.WorkerThreadImpl; import java.io.IOException; import java.nio.channels.SelectionKey; import java.util.Iterator; @@ -100,7 +101,7 @@ * is doing. It is not recommended to use attributes if this * {@link CometContext} is not shared amongs multiple * context path (uses {@link HttpServletSession} instead). - *

+ *

* @author Jeanfrancois Arcand * @author Gustav Trede */ @@ -135,13 +136,6 @@ /** - * The {@link CometSelector} used to register {@link SelectionKey} - * for upcoming bytes. - */ - protected CometSelector cometSelector; - - - /** * The {@link CometContext} continuationType. See {@link CometEngine} */ protected int continuationType = CometEngine.AFTER_SERVLET_PROCESSING; @@ -151,14 +145,14 @@ * The default delay expiration before a {@link CometContext}'s * {@link CometHandler} are interrupted. */ - private long expirationDelay = 30 * 1000; + private long expirationDelay ; /** * true if the caller of {@link #notify} should block when * notifying other CometHandler. */ - protected boolean blockingNotification = false; + protected boolean blockingNotification; /** @@ -168,23 +162,20 @@ /** - * timestamp for last performed resetSuspendidletimeout. + * timestamp for next idlecheck * used to limit the frequency of actual performed resets. */ - private volatile long lastIdleReset; + private volatile long nextidleclear; /** - * Current associated list of {@link CometTask} - */ - protected final ConcurrentHashMap activeTasks; - - /** * The list of registered {@link CometHandler} */ - protected final ConcurrentHashMap handlers; + protected final ConcurrentHashMap handlers; - private final CometEvent eventInterrupt; + protected final CometEvent eventInterrupt; + protected final CometEvent eventTerminate; + private final CometEvent eventInitialize; private static final IllegalStateException ISE = new IllegalStateException(INVALID_COMET_HANDLER); @@ -193,26 +184,34 @@ new IllegalStateException("Make sure you have enabled Comet " + "or make sure the Thread invoking that method is the same " + "as the Servlet.service() Thread."); - - // ---------------------------------------------------------------------- // - - + /** * Create a new instance * @param topic the context path * @param type when the Comet processing will happen (see {@link CometEngine}). */ public CometContext(String topic, int continuationType) { - this.topic = topic; + this.topic = topic; this.continuationType = continuationType; - this.attributes = new ConcurrentHashMap(); - this.handlers = new ConcurrentHashMap(16,0.75f,64); - this.activeTasks = new ConcurrentHashMap(16,0.75f,64); - this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this); - this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this); + this.attributes = new ConcurrentHashMap(); + this.handlers = new ConcurrentHashMap(8,0.75f,64); + this.eventInterrupt = new CometEvent(CometEvent.INTERRUPT,this); + this.eventInitialize = new CometEvent(CometEvent.INITIALIZE,this); + this.eventTerminate = new CometEvent(CometEvent.TERMINATE,this); + initDefaultValues(); } /** + * init of default values. + * used by constructor and the cache recycle mechanism + */ + private void initDefaultValues() { + blockingNotification = false; + expirationDelay = 30*1000; + nextidleclear = 0; + } + + /** * Get the context path associated with this instance. * @return topic the context path associated with this instance * @deprecated - use getTopic. @@ -289,14 +288,11 @@ if (!CometEngine.getEngine().isCometEnabled()){ throw cometNotEnabled; } - handlers.put(handler, CometEngine.dumykey); - // is it ok that we only manage one adcomethandler call ? - CometTask cometTask = new CometTask(); - cometTask.setCometContext(this); - cometTask.setCometHandler(handler); - cometTask.setSuspended(alreadySuspended); + // is it ok that we only manage one addcomethandler call per thread ? + // else we can use a list of handlers to add inside tlocal + CometTask cometTask = new CometTask(this,handler); + cometTask.upcoming_op_isread = alreadySuspended; CometEngine.updatedContexts.set(cometTask); - return handler.hashCode(); } @@ -316,6 +312,7 @@ /** * Retrieve a {@link CometHandler} using its based on its {@link CometHandler#hashCode}; */ + @Deprecated public CometHandler getCometHandler(int hashCode){ for (CometHandler handler:handlers.keySet()){ if (handler.hashCode() == hashCode ) @@ -328,20 +325,28 @@ * Recycle this object. */ public void recycle(){ + try{ + notify(this,CometEvent.TERMINATE); + } catch (IOException ex) { + + } handlers.clear(); attributes.clear(); - activeTasks.clear(); topic = null; + notificationHandler = null; + initDefaultValues(); + // add check for datastructure size, if cometcontext had large + // datastructes its probably not optimal to waste RAM with caching it + CometEngine.cometEngine.cometContextCache.offer(this); } /** - * adds a {@link CometHandler} to the active set - * @param handler {@link CometHandler} - * @param cometKey {@link SelectionKey} + * adds a {@link CometTask} to the active set + * @param cometTask {@link CometTask} */ - protected void addActiveHandler(CometHandler handler, SelectionKey cometKey){ - handlers.put(handler, cometKey); + protected void addActiveHandler(CometTask cometTask){ + handlers.put(cometTask.cometHandler, cometTask); } /** @@ -358,7 +363,9 @@ if (cometHandler instanceof DefaultConcurrentCometHandler){ ((DefaultConcurrentCometHandler)cometHandler).EnQueueEvent(event); }else{ - cometHandler.onEvent(event); + synchronized(cometHandler){ + cometHandler.onEvent(event); + } } } @@ -385,12 +392,10 @@ * @return true if the operation succeeded. */ public boolean removeCometHandler(CometHandler handler,boolean resume){ - CometEngine.updatedContexts.set(null); - SelectionKey key = handlers.remove(handler); - if (key != null){ + CometTask task = handlers.remove(handler); + if (task != null){ if (resume){ - CometEngine.getEngine().flushPostExecute( - ((CometTask)key.attachment()).getAsyncProcessorTask()); + CometEngine.getEngine().flushPostExecute(task,true,false); } return true; } @@ -405,22 +410,18 @@ * @param hashCode The hashcode of the CometHandler to remove. * @return true if the operation succeeded. */ + @Deprecated public boolean removeCometHandler(int hashCode){ - CometEngine.updatedContexts.set(null); - Iterator iterator = handlers.keySet().iterator(); - CometHandler handler = null; - while (iterator.hasNext()){ - handler = iterator.next(); + CometHandler handler_ = null; + for (CometHandler handler:handlers.keySet()){ if (handler.hashCode() == hashCode){ - SelectionKey key = handlers.get(handler); - if (key == null){ - logger.warning(ALREADY_REMOVED); - return false; - } - iterator.remove(); - return true; - } + handler_ = handler; + break; + } } + if (handler_ != null){ + return handlers.remove(handler_) != null; + } return false; } @@ -435,70 +436,68 @@ * @return true if the operation succeeded. */ public boolean resumeCometHandler(CometHandler handler){ - return resumeCometHandler(handler,true); - } - - - /** - * Resume the suspended response. A response can only be suspended when - * {@link CometContext#addCometHandler} was called first. - * - * @param handler The CometHandler associated with the current continuation. - * @param remove true if the CometHandler needs to be removed. - * @return true if the operation succeeded. - */ - protected boolean resumeCometHandler(CometHandler handler, boolean remove){ - CometEngine.updatedContexts.set(null); - boolean b= cometSelector.cancelKey(handlers.get(handler), false, remove, false); - - // Try a second time to locate the associated CometTask - if (!b){ - for (CometTask cometTask:activeTasks.keySet()){ - if (cometTask.getCometHandler() == handler){ - interrupt(cometTask, remove, false); - CometEngine.getEngine().flushPostExecute( - cometTask.getAsyncProcessorTask()); - activeTasks.remove(cometTask); - return true; - } - } + boolean status = interrupt(handlers.get(handler),false,true,false,false); + if (status){ + try { + handler.onTerminate(eventTerminate); + } catch (IOException ex) { } } - return b; + return status; } /** * Interrupt a {@link CometHandler} by invoking {@link CometHandler#onInterrupt} */ - protected boolean interrupt(CometTask task,boolean removeCometHandler, - boolean notifyInterrupt) { - - boolean status = true; - try{ - if (removeCometHandler){ - status = (handlers.remove(task.getCometHandler()) != null); - if (status && notifyInterrupt){ - task.getCometHandler().onInterrupt(eventInterrupt); - }else{ - logger.fine(ALREADY_REMOVED); + protected boolean interrupt(final CometTask task, + final boolean notifyInterrupt, final boolean flushAPT, + final boolean cancelkey, boolean asyncExecution) { + if (task != null && handlers.remove(task.cometHandler) != null){ + final SelectionKey key = task.getSelectionKey(); + // setting attachment non asynced to ensure grizzly dont keep calling us + key.attach(System.currentTimeMillis()); + if (asyncExecution){ + if (cancelkey){ + // dont want to do that in non selector thread: + // canceled key wont get canceled again due to isvalid check + key.cancel(); } + task.callInterrupt = true; + task.interruptFlushAPT = flushAPT; + //((WorkerThreadImpl)Thread.currentThread()). + // getPendingIOhandler().addPendingIO(task); + task.run(); + + }else{ + interrupt0(task, notifyInterrupt, flushAPT, cancelkey); } - } catch (Throwable ex){ - status = false; - logger.log(Level.FINE,"Unable to interrupt",ex); - }finally{ - activeTasks.remove(task); - return status; + return true; } + return false; } - + /** + * interrupt logic in its own method, so it can be executed either async or sync.
+ * cometHandler.onInterrupt is performed async due to its functionality is unknown, + * hence not safe to run in the performance critical selector thread. + */ + protected void interrupt0(CometTask task, + boolean notifyInterrupt, boolean flushAPT, boolean cancelkey){ + if (notifyInterrupt){ + try{ + task.cometHandler.onInterrupt(eventInterrupt); + }catch(IOException e) { } + } + CometEngine.cometEngine.flushPostExecute(task,flushAPT,cancelkey); + } + + /** * Return true if this {@link CometHandler} is still active, e.g. there is * still a suspended connection associated with it. * * @return true */ public boolean isActive(CometHandler handler){ - return handlers.containsKey(handler) || CometEngine.updatedContexts.get() != null; + return handlers.containsKey(handler); } /** @@ -507,7 +506,7 @@ * of type NOTIFY. * @param attachment An object shared amongst {@link CometHandler}. */ - public void notify(final E attachment) throws IOException{ + public void notify(final Object attachment) throws IOException{ notify(attachment, CometEvent.NOTIFY); } @@ -527,7 +526,7 @@ * @param cometHandlerID Notify a single CometHandler. * @deprecated - use notify(attachment,eventType,CometHandler; */ - public void notify(final E attachment,final int eventType,final int cometHandlerID) + public void notify(final Object attachment,final int eventType,final int cometHandlerID) throws IOException{ notify(attachment,eventType,getCometHandler(cometHandlerID)); } @@ -537,7 +536,7 @@ * @param attachment An object shared amongst {@link CometHandler}. * @param {@link CometHandler} to notify. */ - public void notify(final E attachment,final CometHandler cometHandler) + public void notify(final Object attachment,final CometHandler cometHandler) throws IOException{ notify(attachment,CometEvent.NOTIFY,cometHandler); } @@ -556,21 +555,20 @@ * @param type The type of notification. * @param {@link CometHandler} to notify. */ - public void notify(final E attachment,final int eventType,final CometHandler cometHandler) + public void notify(final Object attachment,final int eventType,final CometHandler cometHandler) throws IOException{ if (cometHandler == null){ throw ISE; } - CometEvent event = new CometEvent(eventType,this); - event.attach(attachment); + CometEvent event = new CometEvent(eventType,this,attachment); notificationHandler.setBlockingNotification(blockingNotification); notificationHandler.notify(event,cometHandler); - if (event.getType() == CometEvent.TERMINATE + if (event.getType() == CometEvent.TERMINATE || event.getType() == CometEvent.INTERRUPT) { resumeCometHandler(cometHandler); } else { resetSuspendIdleTimeout(); - } + } } @@ -588,14 +586,12 @@ * @param attachment An object shared amongst {@link CometHandler}. * @param type The type of notification. */ - public void notify(final E attachment,final int eventType) - throws IOException{ - CometEvent event = new CometEvent(eventType,this); - event.attach(attachment); + public void notify(Object attachment,int eventType)throws IOException { + CometEvent event = new CometEvent(eventType,this,attachment); Iterator iterator = handlers.keySet().iterator(); notificationHandler.setBlockingNotification(blockingNotification); notificationHandler.notify(event,iterator); - if (event.getType() == CometEvent.TERMINATE + if (event.getType() == CometEvent.TERMINATE || event.getType() == CometEvent.INTERRUPT) { while(iterator.hasNext()){ resumeCometHandler(iterator.next()); @@ -625,16 +621,18 @@ protected void resetSuspendIdleTimeout(){ if (expirationDelay != -1){ long timestamp = System.currentTimeMillis(); - // not threadsafe, but that will only lead to a few extra idle checks. - // it will still be a major win. - if (timestamp - lastIdleReset >= 1000){ - lastIdleReset = timestamp; - for (CometTask cometTask:activeTasks.keySet()){ - cometTask.setExpireTime(timestamp); + if (timestamp > nextidleclear){ + synchronized(handlers){ + if (timestamp > nextidleclear){ + nextidleclear = timestamp+1000; + for (CometTask cometTask:handlers.values()){ + cometTask.setTimeout(timestamp); + } + } } } } - } + } /** @@ -675,38 +673,22 @@ * @return true if the operation worked. */ private boolean doAsyncRegister(CometHandler handler, int interest){ - SelectionKey cometKey = null; if (handler != null) { - cometKey = handlers.get(handler); - } - if (handler == null || cometKey == null) { - throw ISE; - } - - CometTask cometTask = (CometTask)cometKey.attachment(); - if (cometTask != null){ - cometKey.interestOps(cometKey.interestOps() | interest); - if (interest == SelectionKey.OP_READ){ - cometTask.setAsyncReadSupported(true); + CometTask task = handlers.get(handler); + if (task != null) { + SelectionKey mainKey = task.getSelectionKey(); + if (mainKey != null){ + mainKey.interestOps(mainKey.interestOps() | interest); + task.setComethandlerIsAsyncRegistered(true); + return true; + } } - cometTask.setComethandlerisAsyncregistered(true); - return true; } throw ISE; } /** - * Set the {@link CometSelector} associated with this instance. - * @param CometSelector the {@link CometSelector} associated with - * this instance. - */ - protected void setCometSelector(CometSelector cometSelector) { - this.cometSelector = cometSelector; - } - - - /** * Helper. */ @Override @@ -741,19 +723,9 @@ public Set getCometHandlers(){ return handlers.keySet(); } + - - /** - * Add a {@link CometTask} to the active list. - * @param cometTask - */ - protected void addActiveCometTask(CometTask cometTask){ - activeTasks.put(cometTask,Boolean.TRUE); - } - - - /** * Return true if the invoker of {@link #notify} should block when * notifying Comet Handlers. */ @@ -788,13 +760,4 @@ return notificationHandler; } - /** - * Return the current set of active {@link CometTask} - * @return - */ - protected Set getActiveTasks() { - return activeTasks.keySet(); - } - } - Index: modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java =================================================================== --- modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java (revision 2805) +++ modules/comet/src/main/java/com/sun/grizzly/comet/CometEngine.java (working copy) @@ -41,12 +41,14 @@ import com.sun.grizzly.arp.AsyncTask; import com.sun.grizzly.http.SelectorThread; import com.sun.grizzly.arp.AsyncProcessorTask; +import com.sun.grizzly.http.ProcessorTask; import com.sun.grizzly.util.LinkedTransferQueue; +import com.sun.grizzly.util.TestThreadPool; +import com.sun.grizzly.util.WorkerThreadImpl; import java.io.IOException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -123,7 +125,7 @@ /** * The single instance of this class. */ - private final static CometEngine cometEngine = new CometEngine(); + protected final static CometEngine cometEngine = new CometEngine(); /** @@ -133,11 +135,6 @@ /** - * The {@link CometSelector} used to poll requests. - */ - protected final CometSelector cometSelector; - - /** * cached CometContexts */ protected final LinkedTransferQueue cometContextCache; @@ -154,7 +151,7 @@ */ protected final static ThreadLocal updatedContexts = new ThreadLocal(); - protected static final SelectionKey dumykey = new SelectionKey() { + private static final SelectionKey dumykey = new SelectionKey() { public SelectableChannel channel() {throw ISE;} public int interestOps() {throw ISE;} public SelectionKey interestOps(int ops) {throw ISE;} @@ -165,33 +162,31 @@ }; /** - * Creat a singleton and initialize all lists required. Also create and - * start the {@link CometSelector} + * Creat a singleton and initialize all lists required. */ protected CometEngine() { - cometSelector = new CometSelector(this); - try{ - cometSelector.start(); - } catch(InterruptedException ex){ - logger.log(Level.SEVERE,"Unable to start CometSelector",ex); - } - cometContextCache = new LinkedTransferQueue(); activeContexts = new ConcurrentHashMap(16,0.75f,64); - ThreadPoolExecutor tpe = new ThreadPoolExecutor( - 8, + /*ExecutorService tpe = new ThreadPoolExecutor( 64, + 64, 30L, TimeUnit.SECONDS, - new LinkedBlockingQueue(), + //new LinkedTransferQueue(), + new LinkedBlockingQueue(), new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); public Thread newThread(Runnable r) { - return new Thread(r, "CometWorker-"+counter.incrementAndGet()); + //return new Thread(r, "CometWorker-"+counter.incrementAndGet()); + return new WorkerThreadImpl(null, "CometWorker-"+counter.incrementAndGet(), r, 0); } - }); - //tpe.allowCoreThreadTimeOut(true); + }); */ + + //ExecutorService tpe = threadPool = new DefaultExecutorService(4, 8, 30, + // TimeUnit.SECONDS, new LinkedBlockingQueue(), "CometWorker-"); + ExecutorService tpe = new com.sun.grizzly.util.FixedThreadPool(8,"CometWorker"); + //ExecutorService tpe = new NewDefaultThreadPool("CometWorker",4,64,15,TimeUnit.SECONDS); threadPool = tpe; } @@ -215,6 +210,7 @@ /** * sets the default threadpool that DefaultNotificationHandler use. + * shuttdownnow is called on the existing threadpool. * does notupdate existing notificationhandlers */ public void setThreadPool(ExecutorService threadPool) { @@ -224,7 +220,15 @@ } } + /** + * returns the threadpool comet is using + * @return ExecutorService + */ + public ExecutorService getThreadPool() { + return threadPool; + } + /** * Unregister the {@link CometHandler} to the list of the * {@link CometContext}. Invoking this method will invoke all @@ -238,18 +242,7 @@ public CometContext unregister(String topic){ CometContext cometContext = activeContexts.remove(topic); if (cometContext != null){ - try{ - cometContext.notify(cometContext,CometEvent.TERMINATE); - } catch (IOException ex) {} - Set tasks = cometContext.getActiveTasks(); - for (CometTask cometTask: tasks){ - // does this work ? the notify above might be async. - flushResponse(cometTask.getAsyncProcessorTask()); - } - //TODO: add check for datastructure size, if cometcontext had large - // datastructes its probably not optimal to waste RAM with caching it - cometContext.recycle(); - cometContextCache.offer(cometContext); + cometContext.recycle(); } return cometContext; } @@ -279,7 +272,7 @@ * {@link AFTER_SERVLET_PROCESSING} or {@link AFTER_RESPONSE_PROCESSING} * @return CometContext a configured {@link CometContext}. */ - public CometContext register(String topic, int type){ + public CometContext register(String topic, int type){ return register(topic, type, DefaultNotificationHandler.class); } @@ -287,14 +280,13 @@ * Instanciate a new {@link CometContext}. * @param topic the topic the new {@link CometContext} will represent. * @param type when the request will be suspended, e.g. {@link BEFORE_REQUEST_PROCESSING}, - * {@link AFTER_SERVLET_PROCESSING} or {@link AFTER_RESPONSE_PROCESSING} + * {@link AFTER_SERVLET_PROCESSING} or {@link AFTER_RESPONSE_PROCESSING} * @return a new {@link CometContext} if not already created, or the * existing one. */ public CometContext register(String topic, int type, Class notificationClass ) { - - // Double checked locking used used to prevent the otherwise static/global + // Double checked locking used used to prevent the otherwise static/global // locking, cause example code does heavy usage of register calls // for existing topics from http get calls etc. CometContext cometContext = activeContexts.get(topic); @@ -303,30 +295,31 @@ cometContext = activeContexts.get(topic); if (cometContext == null){ cometContext = cometContextCache.poll(); - if (cometContext != null) + if (cometContext != null){ cometContext.topic = topic; - if (cometContext == null){ + }else{ cometContext = new CometContext(topic, type); - NotificationHandler notificationHandler = null; - try{ - notificationHandler = notificationClass.newInstance(); - } catch (Throwable t) { - logger.log(Level.SEVERE,"Invalid NotificationHandler class : " - + notificationClass.getName() + " Using default.",t); - notificationHandler = new DefaultNotificationHandler(); - } - cometContext.setCometSelector(cometSelector); - cometContext.setNotificationHandler(notificationHandler); - if (notificationHandler != null && (notificationHandler - instanceof DefaultNotificationHandler)){ - ((DefaultNotificationHandler)notificationHandler) - .setThreadPool(threadPool); - } } + NotificationHandler notificationHandler = null; + try{ + notificationHandler = notificationClass.newInstance(); + } catch (Throwable t) { + logger.log(Level.SEVERE,"Invalid NotificationHandler class : " + + notificationClass.getName() + " Using default.",t); + notificationHandler = new DefaultNotificationHandler(); + } + cometContext.setNotificationHandler(notificationHandler); + if (notificationHandler != null && (notificationHandler + instanceof DefaultNotificationHandler)){ + ((DefaultNotificationHandler)notificationHandler) + .setThreadPool(threadPool); + } activeContexts.put(topic,cometContext); } + } } + cometContext.continuationType = type; return cometContext; } @@ -356,7 +349,7 @@ * to the current thread so we can later retrieve the associated * SelectionKey. The SelectionKey is required in order to park the request. */ - int continuationType = (cometContext == null)? + int continuationType = (cometContext == null)? AFTER_SERVLET_PROCESSING:cometContext.continuationType; /* Execute the Servlet.service method. CometEngine.register() or @@ -370,22 +363,27 @@ */ CometTask cometTask = updatedContexts.get(); if (cometTask != null) { + //need to impl tlocal that gets and sets null in one efficent operation updatedContexts.set(null); - if (cometTask.isSuspended()){ //alreadySuspended) - cometTask.setSuspended(false); - cometTask.getCometContext().addActiveHandler(cometTask.getCometHandler(), dumykey); + cometContext = cometTask.getCometContext(); + if (cometTask.upcoming_op_isread){ //alreadySuspended + cometTask.upcoming_op_isread = false; + //need to set dumykey in cometTask ? + cometContext.addActiveHandler(cometTask); return false; + } + cometTask.setAsyncProcessorTask(apt); + if (cometContext.getExpirationDelay() != -1){ + cometTask.setTimeout(System.currentTimeMillis()); } - SelectionKey key = apt.getAsyncExecutor().getProcessorTask().getSelectionKey(); - key.attach("comet"); // Disable keep-alive - cometTask.getCometContext().initialize(cometTask.getCometHandler()); - cometTask.setAsyncProcessorTask(apt); - cometTask.setSelectionKey(key); - cometTask.setCometSelector(cometSelector); - cometTask.setSelectorThread(apt.getSelectorThread()); - cometTask.setThreadPool(apt.getThreadPool()); - cometSelector.registerKey(cometTask); - return true; + SelectionKey mainKey = apt.getAsyncExecutor().getProcessorTask().getSelectionKey(); + if (mainKey.isValid()){ + mainKey.interestOps(SelectionKey.OP_READ); + mainKey.attach(cometTask); + cometContext.initialize(cometTask.getCometHandler()); + cometContext.addActiveHandler(cometTask); + return true; + } } return false; } @@ -400,28 +398,33 @@ } /** - * flush if AsyncTask.POST_EXECUTE + * flush if AsyncTask.POST_EXECUTE .
* {@link AsyncProcessorTask} */ - protected void flushPostExecute(AsyncProcessorTask apt) { - if (apt != null && apt.getStage() == AsyncTask.POST_EXECUTE){ - flushResponse(apt); - } - } - - /** - * Complete the asynchronous request. - */ - protected void flushResponse(AsyncProcessorTask apt){ - apt.setStage(AsyncTask.POST_EXECUTE); - try{ - apt.doTask(); - } catch (IllegalStateException ex){ - if (logger.isLoggable(Level.FINEST)){ - logger.log(Level.FINEST,"Resuming Response failed",ex); + protected void flushPostExecute(final CometTask task, boolean aptflush,boolean cancelkey) { + AsyncProcessorTask apt = task.getAsyncProcessorTask(); + ProcessorTask p = task.getAsyncProcessorTask().getAsyncExecutor().getProcessorTask(); + p.setReRegisterSelectionKey(false); + p.setAptCancelKey(cancelkey); + if (!aptflush){ + p.terminateProcess(); + }else{ + if (apt.getStage() == AsyncTask.POST_EXECUTE){ + try{ + //All comet IO operations sync on handler except close + synchronized(task.cometHandler){ + apt.doTask(); + } + } catch (IllegalStateException ex){ + if (logger.isLoggable(Level.FINEST)){ + logger.log(Level.FINEST,"Resuming Response failed at aptflush",ex); + } + } catch (Throwable ex) { + logger.log(Level.SEVERE,"Resuming failed at aptflush",ex); + } + }else{ + logger.warning("APTflush called at wrong stage"); } - } catch (Throwable ex) { - logger.log(Level.SEVERE,"Resuming failed",ex); } } Index: modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java =================================================================== --- modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java (revision 2805) +++ modules/grizzly/src/test/java/com/sun/grizzly/utils/ControllerUtils.java (working copy) @@ -39,6 +39,7 @@ package com.sun.grizzly.utils; import com.sun.grizzly.*; +import com.sun.grizzly.util.WorkerThreadImpl; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -74,7 +75,7 @@ } }); - new Thread(controller).start(); + new WorkerThreadImpl("ControllerWorker", controller).start(); try { latch.await(); Index: modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java =================================================================== --- modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java (revision 2805) +++ modules/grizzly/src/test/java/com/sun/grizzly/ControllerStateTest.java (working copy) @@ -41,6 +41,7 @@ import com.sun.grizzly.filter.EchoFilter; import com.sun.grizzly.filter.LogFilter; import com.sun.grizzly.filter.ReadFilter; +import com.sun.grizzly.util.WorkerThreadImpl; import com.sun.grizzly.utils.ControllerUtils; import com.sun.grizzly.utils.TCPIOClient; import java.io.IOException; @@ -113,7 +114,7 @@ ControllerUtils.startController(controller); - Thread restartThread = new Thread() { + Thread restartThread = new WorkerThreadImpl(new Runnable() { @Override public void run() { try { @@ -123,7 +124,7 @@ exceptionHolder[0] = ex; } } - }; + }); restartThread.start(); Index: modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java =================================================================== --- modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (revision 2805) +++ modules/grizzly/src/main/java/com/sun/grizzly/UDPSelectorHandler.java (working copy) @@ -124,35 +124,38 @@ } if (selector == null){ - try{ - isShutDown.set(false); + initSelector(ctx); + } else { + processPendingOperations(ctx); + } + } - connectorInstanceHandler = new ConnectorInstanceHandler. - ConcurrentQueueDelegateCIH( - getConnectorInstanceHandlerDelegate()); - - datagramChannel = DatagramChannel.open(); - selector = Selector.open(); - if (role != Role.CLIENT){ - datagramSocket = datagramChannel.socket(); - datagramSocket.setReuseAddress(reuseAddress); - if (inet == null) - datagramSocket.bind(new InetSocketAddress(port)); - else - datagramSocket.bind(new InetSocketAddress(inet,port)); + private void initSelector(Context ctx) throws IOException{ + try{ + isShutDown.set(false); - datagramChannel.configureBlocking(false); - datagramChannel.register( selector, SelectionKey.OP_READ ); - - datagramSocket.setSoTimeout(serverTimeout); - } - ctx.getController().notifyReady(); - } catch (SocketException ex){ - throw new BindException(ex.getMessage() + ": " + port); + connectorInstanceHandler = new ConnectorInstanceHandler. + ConcurrentQueueDelegateCIH( + getConnectorInstanceHandlerDelegate()); + + datagramChannel = DatagramChannel.open(); + selector = Selector.open(); + if (role != Role.CLIENT){ + datagramSocket = datagramChannel.socket(); + datagramSocket.setReuseAddress(reuseAddress); + if (inet == null) + datagramSocket.bind(new InetSocketAddress(port)); + else + datagramSocket.bind(new InetSocketAddress(inet,port)); + + datagramChannel.configureBlocking(false); + datagramChannel.register( selector, SelectionKey.OP_READ ); + + datagramSocket.setSoTimeout(serverTimeout); } - - } else { - processPendingOperations(ctx); + ctx.getController().notifyReady(); + } catch (SocketException ex){ + throw new BindException(ex.getMessage() + ": " + port); } } Index: modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java =================================================================== --- modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java (revision 2805) +++ modules/grizzly/src/main/java/com/sun/grizzly/DefaultProtocolChain.java (working copy) @@ -39,9 +39,8 @@ package com.sun.grizzly; import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.logging.Level; /** @@ -56,13 +55,13 @@ /** * The list of ProtocolFilter this chain will invoke. */ - protected List protocolFilters; + protected final List protocolFilters; /** * The list of {@link EventHandler}s, which will be notified about this * {@link ProtocolChain} events */ - protected Collection eventHandlers; + protected final List eventHandlers; /** * true if a pipelined execution is required. A pipelined execution @@ -75,8 +74,9 @@ public DefaultProtocolChain() { - protocolFilters = new ArrayList(); - eventHandlers = new HashSet(); + protocolFilters = new ArrayList(4); + //ArrayList is faster then HashSet for small datasets. + eventHandlers = new ArrayList(4); } @@ -296,9 +296,9 @@ */ protected void notifyException(Phase phase, ProtocolFilter filter, Throwable throwable) { - for(EventHandler eventHandler : eventHandlers) { + for(int i=0;i { + AttributeHolder, SupportStateHolder { + + /** + * enqueues runnable for later execution in postSelect
+ * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.
+ * @param runnable + public void addPendingIO(Runnable runnable); + + + * enqueues SlectionKey for later cancel and close .
+ * this is not to be a threadsafe method, must be called from within the same SelectorHandler thread.
+ * @param runnable + + public void addPendingKeyCancel(SelectionKey key);*/ + /** * A token decribing the protocol supported by an implementation of this * interface Index: modules/grizzly/src/main/java/com/sun/grizzly/Controller.java =================================================================== --- modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (revision 2805) +++ modules/grizzly/src/main/java/com/sun/grizzly/Controller.java (working copy) @@ -45,9 +45,11 @@ import com.sun.grizzly.util.DefaultThreadPool; import com.sun.grizzly.util.LinkedTransferQueue; import com.sun.grizzly.util.LoggerUtils; +import com.sun.grizzly.util.SelectedKeyAttachmentLogic; import com.sun.grizzly.util.State; import com.sun.grizzly.util.StateHolder; import com.sun.grizzly.util.SupportStateHolder; +import com.sun.grizzly.util.WorkerThreadImpl; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; @@ -58,7 +60,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; @@ -371,6 +372,13 @@ private void handleSelectedKeys(Set readyKeys,SelectorHandler selectorHandler,NIOContext serverCtx){ for(SelectionKey key:readyKeys) { try{ + + Object attachment = key.attachment(); + if (attachment instanceof SelectedKeyAttachmentLogic){ + ((SelectedKeyAttachmentLogic)attachment).handleSelectedKey(key); + continue; + } + if (!key.isValid()){ selectorHandler.getSelectionKeyHandler().close(key); continue;