案发现场
- 从RabbitMQ Server端获取消息的线程运行正常,读到数据后放到本地的BlockQueue中,等待消费者线程消费。
- 消费者线程堵塞,不再消费本地BlockQueue中的消息。
- 使用jstack进行Thread Dump分析。由于自定义消费者线程名称,所以直接筛选前缀为MessageConsumer线程,结果如下:
|
|
从上可以发现,消费者线程大都处于WAITING状态,线程阻塞在NettyHttpClient.sendHttpRequest的CountDownLatch.await()方法。12at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)at cn.jiguang.common.connection.NettyHttpClient.sendHttpRequest(NettyHttpClient.java:193)
分析推理
通过对应用中各线程运行的状态情况,推测是极光推送SDK代码BUG导致。深入源码进行问题排查:
1> cn.jpush.api.JPushClient.java
123public PushResult sendPush(PushPayload pushPayload) throws APIConnectionException, APIRequestException {return _pushClient.sendPush(pushPayload);}2> cn.jpush.api.push.PushClient.java
1234567891011121314public PushResult sendPush(PushPayload pushPayload) throws APIConnectionException, APIRequestException {Preconditions.checkArgument(! (null == pushPayload), "pushPayload should not be null");if (_apnsProduction > 0) {pushPayload.resetOptionsApnsProduction(true);} else if(_apnsProduction == 0) {pushPayload.resetOptionsApnsProduction(false);}if (_timeToLive >= 0) {pushPayload.resetOptionsTimeToLive(_timeToLive);}//该版本默认使用NettyHttpClient,进入源码ResponseWrapper response = _httpClient.sendPost(_baseUrl + _pushPath, pushPayload.toString());return BaseResult.fromResponse(response, PushResult.class);}3> cn.jiguang.common.connection.NettyHttpClient.java
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455@Overridepublic ResponseWrapper sendPost(String url, String content) throws APIConnectionException, APIRequestException {ResponseWrapper wrapper = new ResponseWrapper();try {return sendHttpRequest(HttpMethod.POST, url, content);} catch (Exception e) {e.printStackTrace();}return wrapper;}private ResponseWrapper sendHttpRequest(HttpMethod method, String url, String body) throws Exception {// 此处实例化CountDownLatch并传递给NettyClientInitializer, 继续跟踪CountDownLatch latch = new CountDownLatch(1);NettyClientInitializer initializer = new NettyClientInitializer(_sslCtx, null, latch);b.handler(initializer);ResponseWrapper wrapper = new ResponseWrapper();URI uri = new URI(url);String scheme = uri.getScheme() == null ? "http" : uri.getScheme();String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();int port = uri.getPort();if (port == -1) {if ("http".equalsIgnoreCase(scheme)) {port = 80;} else if ("https".equalsIgnoreCase(scheme)) {port = 443;}}try {ChannelFuture connect = b.connect(host, port);_channel = connect.sync().channel();FullHttpRequest request;if (null != body) {ByteBuf byteBuf = Unpooled.copiedBuffer(body.getBytes(CharsetUtil.UTF_8));request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri.getRawPath(), byteBuf);request.headers().set(HttpHeaderNames.CONTENT_LENGTH, (long) byteBuf.readableBytes());} else {request = new DefaultFullHttpRequest(HTTP_1_1, method, uri.getRawPath());}request.headers().set(HttpHeaderNames.HOST, uri.getHost());request.headers().set(HttpHeaderNames.AUTHORIZATION, _authCode);request.headers().set("Content-Type","application/json;charset=utf-8");connect.awaitUninterruptibly();LOG.info("Sending request. " + request);LOG.info("Send body: " + body);_channel.writeAndFlush(request);//调用latch.await()后主线程阻塞,等待netty异步网络调用。如果异步线程执行后不调用countDown(),主线程将永远阻塞,此为问题主要原因。latch.await();wrapper = initializer.getResponse();} catch (InterruptedException e) {e.printStackTrace();}return wrapper;}4> cn.jiguang.common.connection.NettyClientInitializer.java
|
|
- 5> cn.jiguang.common.connection.HttpResponseHandler.java1234567891011121314151617181920212223242526272829303132333435363738394041protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {if (msg instanceof HttpResponse) {HttpResponse response = (HttpResponse) msg;status = response.status().code();}if (msg instanceof HttpContent) {HttpContent content = (HttpContent) msg;LOG.info(content.content().toString());if (content instanceof LastHttpContent) {//断点跟踪发现,程序运行到该分支后,直接close,并未调用_latch.countDown(); 此为问题的主要爆发点LOG.info("closing connection");ctx.close();} else {String responseContent = content.content().toString(CharsetUtil.UTF_8);_wrapper.responseCode = status;_wrapper.responseContent = responseContent;LOG.info("Got Response code: " + status + " content: " + responseContent);System.err.println("Got Response code: " + status + " content: " + responseContent);System.err.flush();if (null != _callback) {_callback.onSucceed(_wrapper);}if (null != _latch) {//正常情况下_latch.countDown();}}}}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {LOG.error("error:", cause);try {//模拟网络异常,发现此处也存在bug,未调用_latch.countDown(); 隐藏爆发点ctx.close();}catch (Exception ex) {LOG.error("close error:", ex);}}
翻阅源码发现HttpResponseHandler.java类内部存在两个隐患:
- 程序运行到if (content instanceof LastHttpContent) {……}分支后,未调用_latch.countDown();
- 当出现网络异常时,exceptionCaught方法内部也未调用_latch.countDown();
只要其中任意一个隐患爆发,主线程就会阻塞,进入WAITING状态。消费者线程就此堵塞,不再消费BlockQueue中的消息。
恢复现场
- Maven pom.xml
|
|
- 测试代码
|
|
- Thread Dump
Debug跟踪,发现程序进入if分支,调用close()关闭连接后,主线程阻塞进入wait状态。使用IDEA生成thread dump:
- 日志输出
|
|
主线程处于WAIT状态,Netty异步线程已运行完毕,现场恢复成功。
解决方案
- 升级极光SDK版本,升级后发现最新版已解决该问题。
- 增加熔断处理,调用外部服务时,使用Guava SimpleTimeLimiter设置超时熔断。