Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[jetty-dev] SDPY client gets overwhelmed when subscribing to high volume incoming messages

Hello,
Please find below a sample subscriber and a publisher. The subscriber becomes unresponsive and stops subscribing to messages after the count crosses two hundred thousand. Could you please let me know what I am doing wrong,

OS: MAC

VMArgs: -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.KQueueSelectorProvider -Xbootclasspath/p:/Users/abhinavsunderrajan/.m2/repository/org/mortbay/jetty/npn/npn-boot/7.6.2.v20120308/npn-boot-7.6.2.v20120308.jar



************************ PUBLISHER *********************************************************************

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;

public class TestServer {

   /**
    * Main method.
    * @param args
    * @throws Exception
    */
   public static void main(final String[] args) throws Exception {
      System.setProperty("Djavax.net.debug", "all");

      startTLSServer();
   }

   private static void startTLSServer() throws Exception {
      // Frame listener that handles the communication over speedy
      ServerSessionFrameListener frameListener = new ServerSessionFrameListener.Adapter() {

         /**
          * As soon as we receive a syninfo we return the handler for the stream
          * on this session
          */
         @Override
         public StreamFrameListener onSyn(final Stream stream,
               final SynInfo synInfo) {
            System.out.println("onSyn");

            // Send a reply to this message
            stream.reply(new ReplyInfo(false));

            // and start a timer that sends a request to this stream every 5
            // seconds
            ScheduledExecutorService executor = Executors
                  .newSingleThreadScheduledExecutor();
            Runnable periodicTask = new Runnable() {
               private int i = 0;

               public void run() {
                  stream.data(new StringDataInfo(System.currentTimeMillis()
                        + ", some data from the server, " + i++, false));
               }
            };
            executor.scheduleAtFixedRate(periodicTask, 0, 1,
                  TimeUnit.MICROSECONDS);

            // Next create an adapter to further handle the client input from
            // specific stream.
            return new StreamFrameListener.Adapter() {

               /**
                * We're only interested in the data, not the headers in this
                * example
                */
               public void onData(final Stream stream, final DataInfo dataInfo) {
                  String clientData = dataInfo.asString("UTF-8", true);
                  System.out.println("Received the following client data: "
                        + clientData);
               }
            };
         }
      };



      // Wire up and start the connector
      Server server = new Server();
      SPDYServerConnector connector = new SPDYServerConnector(frameListener);
      connector.setPort(5082);
      server.addConnector(connector);

      server.start();
      server.join();
   }
}




***********************  SPDY SUBSCRIBER***********************   



import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;

/**
 * Put <code>-Xbootclasspath/p:lib/spdy/npn-boot-7.6.2.v20120308.jar</code> as
 * vmarg.
 */
public class TestClient {

   /**
    * Main method.
    * @param args
    * @throws Exception
    */
   public static void main(final String[] args) throws Exception {

      // create client
      SPDYClient.Factory clientFactory = new SPDYClient.Factory();
      clientFactory.start();
      SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);

      // create a session to the server running on localhost port 8181
      Future<Session> future = client.connect(new InetSocketAddress(
            "localhost", 5082), null);

      Session session = future.get(5, TimeUnit.SECONDS);

      // this listener receives data from the server and prints it
      StreamFrameListener streamListener = new StreamFrameListener.Adapter() {
         public void onData(final Stream stream, final DataInfo dataInfo) {
            // Data received from server
            String content = dataInfo.asString("UTF-8", true);
            System.out.println("SPDY content: " + content);
         }
      };

      // Start a new session, and configure the stream listener
      final Stream stream = session.syn(new SynInfo(false), streamListener)
            .get(5, TimeUnit.SECONDS);
      stream.data(new StringDataInfo("hello publisher!!", false));
   }
}


Many thanks,
Abhinav





Back to the top