package test.udpbinding; import com.sun.grizzly.Controller; import com.sun.grizzly.ProtocolChainInstanceHandler; import com.sun.grizzly.DefaultProtocolChainInstanceHandler; import com.sun.grizzly.ProtocolChain; import com.sun.grizzly.DefaultProtocolChain; import com.sun.grizzly.UDPSelectorHandler; import com.sun.grizzly.ControllerStateListener; import com.sun.grizzly.ProtocolFilter; import com.sun.grizzly.Context; import com.sun.grizzly.util.DefaultThreadPool; import com.sun.grizzly.filter.ReadFilter; import java.net.InetAddress; import java.net.UnknownHostException; import java.net.SocketException; import java.net.DatagramSocket; import java.net.DatagramPacket; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.io.IOException; /** * @author Bongjae Chang * @date 2009. 6. 9 */ public class DatagramSocketDuplicatedBindingTest { private static final int CORE_POOL_SIZE = 25; private static final int MAX_POOL_SIZE = 25; private static final int KEEP_ALIVE_TIMEOUT = 60 * 1000; // ms private static final int PORT = 9090; private static final int WAIT_FOR_BINDING = 2 * 1000; // ms private static final int SLEEP_TIME = 300; // ms private final InetAddress localInetAddress; private final InetSocketAddress localInetSocketAddress; private final CountDownLatch latchForSending = new CountDownLatch( 1 ); private Controller server = null; private DatagramSocketDuplicatedBindingTest() throws UnknownHostException { localInetAddress = InetAddress.getLocalHost(); localInetSocketAddress = new InetSocketAddress( localInetAddress, PORT ); } private void testSimpleSendAndReceive() throws IOException { this.server = new Controller(); setupAndStartController(); new Thread( new DatagramSocketDuplicatedBindingTest.DatagramSocketClient() ).start(); System.out.println( "Wait for " + WAIT_FOR_BINDING + "(ms)" ); try { Thread.sleep( WAIT_FOR_BINDING ); } catch( InterruptedException e ) { } } private void setupAndStartController() { UDPSelectorHandler udpSelectorHandler; final DefaultThreadPool threadPool = new DefaultThreadPool( "server", CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIMEOUT, TimeUnit.MILLISECONDS ); this.server.setThreadPool(threadPool); udpSelectorHandler = new UDPSelectorHandler(); udpSelectorHandler.setPort( PORT ); udpSelectorHandler.setInet( localInetAddress ); this.server.addSelectorHandler( udpSelectorHandler ); ProtocolChainInstanceHandler pciHandler = new DefaultProtocolChainInstanceHandler() { @Override public ProtocolChain poll() { ProtocolChain protocolChain = protocolChains.poll(); if( protocolChain == null ) { protocolChain = new DefaultProtocolChain(); protocolChain.addFilter( new ReadFilter() ); protocolChain.addFilter( new ProtocolFilter() { public boolean execute( Context ctx ) throws IOException { System.out.println("Queue size is " + threadPool.getQueueSize()); return true; } public boolean postExecute( Context ctx ) throws IOException { return true; } } ); } return protocolChain; } }; this.server.setProtocolChainInstanceHandler( pciHandler ); final CountDownLatch controllerGate = new CountDownLatch( 1 ); ControllerStateListener controllerStateListener = new ControllerStateListener() { public void onStarted() { } public void onReady() { controllerGate.countDown(); } public void onStopped() { controllerGate.countDown(); } public void onException( Throwable throwable ) { controllerGate.countDown(); } }; this.server.addStateListener( controllerStateListener ); new Thread( this.server ).start(); try { controllerGate.await(); } catch( InterruptedException e ) { e.printStackTrace(); } latchForSending.countDown(); } public static void main( String[] args ) throws IOException { DatagramSocketDuplicatedBindingTest test = new DatagramSocketDuplicatedBindingTest(); test.testSimpleSendAndReceive(); } private class DatagramSocketClient implements Runnable { private DatagramSocket client; private int sendCount; private DatagramSocketClient() throws SocketException { client = new DatagramSocket( null ); client.setReuseAddress(true); client.bind( new InetSocketAddress(PORT)); } public void run() { if( client == null ) return; try { latchForSending.await(); for (int i = 0; i < 10; i++) { sendCount++; byte[] testPacket = new String( "hello world-" + sendCount ).getBytes(); DatagramPacket packet = new DatagramPacket( testPacket, testPacket.length, localInetSocketAddress ); client.send( packet ); try { Thread.sleep( SLEEP_TIME ); } catch( InterruptedException e ) { } } System.out.println("Client stopped"); try { Thread.sleep( SLEEP_TIME ); } catch( InterruptedException e ) { } while(true){ System.out.println("Queue size is " + ((DefaultThreadPool)server.getThreadPool()).getQueueSize()); try { Thread.sleep( SLEEP_TIME ); } catch( InterruptedException e ) { } } } catch( Throwable t ) { t.printStackTrace(); } finally { if( client != null ) { client.close(); client = null; } System.out.println( "Client socket is finished.." ); } } } }