Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[jetty-users] Fwd: IllegalStateException when using asynchronous servlet

I try to use the asynchronous servlet with Jetty 9, but I get IllegalStateException from HttpOutput class. The same code works well with Tomcat.

This is my test code (in Scala).

TestServers.scala ------------------------------------------------------------------------------------------------------------------------
package playground.webserver

import java.io.File
import javax.servlet.Servlet

import org.apache.catalina.startup.Tomcat
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
;

trait TestServer {
  val port: Int
  def start: Unit
  def stop: Unit
  def addServlet(servlet: Servlet, contextPath: String)
}

class TomcatTestServer(val port: Int) extends TestServer {
  private val tomcat = new Tomcat()
  tomcat.setPort(port)

  override def start: Unit = tomcat.start()

  override def stop: Unit = tomcat.stop()

  override def addServlet(servlet: Servlet, contextPath: String): Unit = {
  val base = new File(System.getProperty("java.io.tmpdir"));
  val context = tomcat.addContext(contextPath, base.getAbsolutePath());

  tomcat.addServlet(contextPath, contextPath, servlet)
  context.addServletMapping("/", contextPath)
  }
}

class JettyTestServer(val port: Int) extends TestServer {
  private val jetty = new Server(port)
  private val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
  context.setContextPath("/")
  jetty.setHandler(context)

  override def start: Unit = jetty.start()

  override def stop: Unit = jetty.stop()

  override def addServlet(servlet: Servlet, contextPath: String): Unit = {
  context.addServlet(new ServletHolder(servlet), contextPath)
  }
}


AsyncIoTest.scala------------------------------------------------------------------------------------------------------------------------------------
package playground.webserver

import java.net.ServerSocket
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import javax.servlet.{AsyncContext, WriteListener}

import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

abstract class AsyncIoTest extends FunSpec with Matchers with BeforeAndAfterAll {
  private val data = "" Array[Byte](1024 * 1024 * 128)

  class DataWriteListener(context: AsyncContext) extends WriteListener {
    private[this] var pos = 0

    override def onError(t: Throwable): Unit = {
      context.getRequest.getServletContext.log("Async Error", t)
      context.complete()
    }

    override def onWritePossible(): Unit = {
      val out = context.getResponse.getOutputStream
      while (out.isReady && pos < data.length) {
        val toWrite = math.min(1024, data.length - pos)
        out.write(data, pos, toWrite)
        pos += toWrite
      }

      if (pos >= data.length) {
        context.complete()
      }
    }
  }

  class AsyncServlet extends HttpServlet {
    override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
      req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);

      resp.setStatus(200)
      resp.setContentLength(data.length)

      val async = req.startAsync()
      val out = resp.getOutputStream

      out.setWriteListener(new DataWriteListener(async))
    }
  }

  class SyncServlet extends HttpServlet {
    override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
      resp.setStatus(200)
      resp.setContentLength(data.length)
      val out = resp.getOutputStream
      var pos = 0
      while(pos < data.length) {
        out.write(data, pos, 1024)
        pos += 1024
      }
    }
  }

  protected val server: TestServer
  protected lazy val port: Int = findAvailablePort()
  private val requestNum = 100

  private def findAvailablePort(): Int = {
    val serverSocket = new ServerSocket(0)
    val port = serverSocket.getLocalPort()
    serverSocket.close()

    port
  }

  describe("Sync IO") {
    it("trigger some traffic") {
      testWithUrl(s"http://localhost:${port}/sync")
    }
  }

  describe("Async IO") {
    it("trigger some traffic") {
      testWithUrl(s"http://localhost:${port}/async")
    }
  }

  override protected def beforeAll(): Unit = {
    server.addServlet(new AsyncServlet, "/async")
    server.addServlet(new SyncServlet, "/sync")
    server.start
  }


  override protected def afterAll(): Unit = {
    server.stop
  }

  private def testWithUrl(url: String): Unit = {
    val start = System.currentTimeMillis

    val futures = (1 to requestNum).map { i =>
      Future {
        val total = readData(url)
        total should be(1024 * 1024 * 128)
        total
      }
    }

    Await.result(Future.sequence(futures), Duration.Inf)

    println(s"Total millis used: ${System.currentTimeMillis - start}")
  }

  private def readData(url: String): Long = {
    val httpclient = HttpClients.createDefault()
    val httpGet = new HttpGet(url)
    val resp = httpclient.execute(httpGet)
    val is = resp.getEntity.getContent
    val buf = new Array[Byte](1024 * 1024)
    var read = 0
    var total = 0L
    while(read != -1) {
      total += read
      read = is.read(buf)
    }

    resp.close()
    total
  }
}

class AsyncIoWithJettyTest extends AsyncIoTest {
  override protected val server: TestServer = new JettyTestServer(port)
}

class AsyncIoWithTomcatTest extends AsyncIoTest {
  override protected val server: TestServer = new TomcatTestServer(port)
}


This is the error I got.
[qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool - 
[qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel - //localhost:51809/async
java.lang.IllegalStateException
at org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
at org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
at org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
at org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
at org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
at java.lang.Thread.run(Thread.java:745)
[qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool - Unexpected thread death: org.eclipse.jetty.util.thread.QueuedThreadPool$3@26b0a9dd in qtp990355670{STARTED,8<=20<=200,i=9,q=0}
java.lang.NullPointerException
at playground.webserver.AsyncIoTest$DataWriteListener.onError(AsyncIoTest.scala:22)
at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:201)
at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
at java.lang.Thread.run(Thread.java:745)
......









Back to the top