[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
[jetty-dev] HttpClient version 7.0.1.v20091125: few cases that async exchanges won't come back

Hi,

Under the following cases, some asynchronous exchanges won't come back (actually some are never sent). That means neither onResponseComplete(), onConnectionFailed(), onException() or onExpire() is invoked.

1. connection failed and HttpDestination::_idle queue has more than allowed MaxConnectionsPerAddress pending exchanges.
Under this case, the current implementation just calls onConnectionFailed() of the first exchange from _idle queue. The rest become orphan and never get send.
Possible fix could be; calling onConnectionFailed() for all exchanges in _idle queue, or just calls onConnectionFailed() of the first exchange from _idle queue then try to start a new connection if the _idle is still not empty

2. HttpConnection::send failed
When this happened, connection was neither in the idle timer or timeout timer but still in HttpDestination::_connection queue, so this connection never got a chance to be removed and it occupied one slot from MaxConnectionsPerAddress. These connections will gradually eat up all the maximum allowed connections per address and no more exchanges can be send.


It might be easier to see it in action. The attached files contain a quickly whipped up test program (to reproduce the problem) and patch against version 7.0.1.v20091125.

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;

public class JettyTest {
	protected Server _server;
	protected int _port;
	protected HttpClient _httpClient;
	protected Connector _connector;
	    
	public JettyTest() {
	}
	
	public void testConnectionFailed() throws Exception {
		setup();
		TestExchange[] exchanges = new TestExchange[] { new TestExchange(), new TestExchange() };
		int i = 1;
		for(ContentExchange exchange:exchanges) {
			exchange.setURL("http://dev.eclipse.org:1234/"+(i++));
		}
		for(ContentExchange exchange:exchanges) {
			try {
				_httpClient.send(exchange);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		for(ContentExchange exchange:exchanges) {
			try {
				exchange.waitForDone();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		teardown();
	}
	
	public void testServerClose() throws Exception {
		setup();
		TestExchange exchange = new TestExchange();
		exchange.setURL("http://localhost:"+_port+"/";);
		try {
			_httpClient.send(exchange);
		} catch (IOException e) {
			e.printStackTrace();
		}
		Thread.sleep(2000);
		exchange.waitForDone();
		stopServer();
		
		exchange.reset();
		exchange.setURL("http://localhost:"+_port+"/";);
		try {
			_httpClient.send(exchange);
		} catch (IOException e) {
			e.printStackTrace();
		}
		Thread.sleep(2000);
		exchange.waitForDone();
		
		teardown();
	}
	
	class TestExchange extends ContentExchange {

		@Override
		protected void onResponseComplete() throws IOException {
			System.out.println(super.getResponseContent());
		}

		@Override
		protected void onConnectionFailed(Throwable x) {
			x.printStackTrace();
		}

		@Override
		protected void onException(Throwable x) {
			x.printStackTrace();
		}

		@Override
		protected void onExpire() {
			System.out.println("Expired!!!");
		}
		
	}
	
	protected void setup() throws Exception {
		_httpClient = new HttpClient();
		_httpClient.setMaxConnectionsPerAddress(1);
		_httpClient.setTimeout(20000);
		_httpClient.setConnectTimeout(2000);
		_httpClient.start();
		startServer();
	}
	
	protected void teardown() throws Exception {
		stopServer();
		_httpClient.stop();
	}
	
    protected void startServer() throws Exception
    {
    	_server=new Server();
        _connector=new SelectChannelConnector();
        _connector.setPort(0);
        _server.setConnectors(new Connector[] { _connector });
        
        _server.setHandler(new AbstractHandler() {
			@Override
			public void handle(String target, Request baseRequest,
					HttpServletRequest request, HttpServletResponse response)
					throws IOException, ServletException {
				baseRequest.setHandled(true);
        		response.setStatus(200);
        		response.getOutputStream().println("Request URL: "+request.getRequestURL());
			}
        });
        _server.start();
        _port=_connector.getLocalPort();
    }

    private void stopServer() throws Exception {
        _server.stop();
    }
	
	public static void main(String[] args) throws Exception {
		JettyTest test = new JettyTest();
		// both test cases hang forever, so un-comment the one you wanna test
		
		// testConnectionFailed() test the case that connection failed but _idle queue still has a couple of exchanges
		//test.testConnectionFailed();
		
		// testServerClose() test the case that connection was closed by server which then causing
		// the following send to failed
		//test.testServerClose();
	}
}
Index: HttpDestination.java
===================================================================
--- HttpDestination.java	(revision 1337)
+++ HttpDestination.java	(working copy)
@@ -221,7 +221,8 @@
                 if (connection!=null)
                 {
                     _connections.remove(connection);
-                    connection.getEndPoint().close();
+                    //connection.getEndPoint().close();
+                    connection.close();
                     connection=null;
                 }
                 if (_idle.size() > 0)
@@ -271,6 +272,10 @@
                 HttpExchange ex = _queue.removeFirst();
                 ex.setStatus(HttpExchange.STATUS_EXCEPTED);
                 ex.getEventListener().onConnectionFailed(throwable);
+                // Either call onConnectionFailed() on all the idle exchanges or retry it for each idle exchange
+                // Let's try it again here.
+                if (!_queue.isEmpty() && _client.isStarted())
+                    startNewConnection();
             }
         }
 
@@ -322,7 +327,12 @@
             else
             {
                 HttpExchange ex = _queue.removeFirst();
-                connection.send(ex);
+                // send could fail due to server closes the connection.
+                // put the exchange back to the idle queue and recycle the connection
+            	if(!connection.send(ex)) {
+                	_queue.addFirst(ex);
+                	recycleConnection(connection);
+                }
             }
         }
 
@@ -371,7 +381,12 @@
                 else
                 {
                     HttpExchange ex = _queue.removeFirst();
-                    connection.send(ex);
+                    // send could fail due to server closes the connection.
+                    // put the exchange back to the idle queue and recycle the connection
+                    if(!connection.send(ex)) {
+                    	_queue.addFirst(ex);
+                    	recycleConnection(connection);
+                    }
                 }
                 this.notifyAll();
             }
@@ -408,6 +423,28 @@
         
     }
 
+    /*
+     * remove the connection from _conections and _idle queue and start a new connection
+     */
+    private void recycleConnection(HttpConnection connection) {
+    	connection.cancelIdleTimeout();
+    	try
+        {
+            connection.close();
+        }
+        catch (IOException e)
+        {
+            Log.ignore(e);
+        }
+        synchronized (this)
+        {
+            _idle.remove(connection);
+            _connections.remove(connection);
+            if (!_queue.isEmpty() && _client.isStarted())
+                startNewConnection();
+        }
+    }
+    
     public void send(HttpExchange ex) throws IOException
     {
         LinkedList<String> listeners = _client.getRegisteredListeners();
@@ -482,6 +519,27 @@
         HttpConnection connection = getIdleConnection();
         if (connection != null)
         {
+        	synchronized (this) {
+                // send could fail due to server closes the connection.
+                // put the exchange back to the idle queue and recycle the connection 
+        		if(!connection.send(ex)) {
+        			_queue.add(ex);
+        			recycleConnection(connection);
+        		}
+        	};
+        } else {
+        	synchronized (this)
+            {
+                _queue.add(ex);
+                if (_connections.size() + _pendingConnections < _maxConnections)
+                {
+                    startNewConnection();
+                }
+            }
+        }
+        /*
+        if (connection != null)
+        {
             boolean sent = connection.send(ex);
             if (!sent) connection = null;
         }
@@ -497,6 +555,7 @@
                 }
             }
         }
+        */
     }
 
     @Override