极光推送java-sdk-V3.2.15堵塞BUG

案发现场

  • 从RabbitMQ Server端获取消息的线程运行正常,读到数据后放到本地的BlockQueue中,等待消费者线程消费。
  • 消费者线程堵塞,不再消费本地BlockQueue中的消息。
  • 使用jstack进行Thread Dump分析。由于自定义消费者线程名称,所以直接筛选前缀为MessageConsumer线程,结果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
"MessageConsumer-1" prio=10 tid=0x00007f39309f6000 nid=0x1047 waiting on condition [0x00007f39052d3000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000a4b994d0> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at cn.jiguang.common.connection.NettyHttpClient.sendHttpRequest(NettyHttpClient.java:193)
at cn.jiguang.common.connection.NettyHttpClient.sendPost(NettyHttpClient.java:134)
at cn.jpush.api.push.PushClient.sendPush(PushClient.java:194)
at cn.jpush.api.JPushClient.sendPush(JPushClient.java:205)
at hbec.app.platform.push.service.impl.JPushService.push(JPushService.java:66)
at hbec.app.platform.push.service.impl.PushFacadeService.jpush(PushFacadeService.java:113)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at hbec.app.mc.RpcService.send(RpcService.java:38)
at hbec.app.mc.NotifySendService.send(NotifySendService.java:40)
at hbec.app.mc.NotifyService.notifyPerson(NotifyService.java:151)
at hbec.app.mc.NotifyService.notify(NotifyService.java:83)
at hbec.app.mc.rabbitmq.RealTimeConsumer.onMessage(RealTimeConsumer.java:29)
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:273)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:848)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:771)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:198)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1311)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:752)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1254)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1224)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:102)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
……

从上可以发现,消费者线程大都处于WAITING状态,线程阻塞在NettyHttpClient.sendHttpRequest的CountDownLatch.await()方法。

1
2
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at cn.jiguang.common.connection.NettyHttpClient.sendHttpRequest(NettyHttpClient.java:193)

分析推理

通过对应用中各线程运行的状态情况,推测是极光推送SDK代码BUG导致。深入源码进行问题排查:

  • 1> cn.jpush.api.JPushClient.java

    1
    2
    3
    public PushResult sendPush(PushPayload pushPayload) throws APIConnectionException, APIRequestException {
    return _pushClient.sendPush(pushPayload);
    }
  • 2> cn.jpush.api.push.PushClient.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public PushResult sendPush(PushPayload pushPayload) throws APIConnectionException, APIRequestException {
    Preconditions.checkArgument(! (null == pushPayload), "pushPayload should not be null");
    if (_apnsProduction > 0) {
    pushPayload.resetOptionsApnsProduction(true);
    } else if(_apnsProduction == 0) {
    pushPayload.resetOptionsApnsProduction(false);
    }
    if (_timeToLive >= 0) {
    pushPayload.resetOptionsTimeToLive(_timeToLive);
    }
    //该版本默认使用NettyHttpClient,进入源码
    ResponseWrapper response = _httpClient.sendPost(_baseUrl + _pushPath, pushPayload.toString());
    return BaseResult.fromResponse(response, PushResult.class);
    }
  • 3> cn.jiguang.common.connection.NettyHttpClient.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    @Override
    public ResponseWrapper sendPost(String url, String content) throws APIConnectionException, APIRequestException {
    ResponseWrapper wrapper = new ResponseWrapper();
    try {
    return sendHttpRequest(HttpMethod.POST, url, content);
    } catch (Exception e) {
    e.printStackTrace();
    }
    return wrapper;
    }
    private ResponseWrapper sendHttpRequest(HttpMethod method, String url, String body) throws Exception {
    // 此处实例化CountDownLatch并传递给NettyClientInitializer, 继续跟踪
    CountDownLatch latch = new CountDownLatch(1);
    NettyClientInitializer initializer = new NettyClientInitializer(_sslCtx, null, latch);
    b.handler(initializer);
    ResponseWrapper wrapper = new ResponseWrapper();
    URI uri = new URI(url);
    String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
    String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
    int port = uri.getPort();
    if (port == -1) {
    if ("http".equalsIgnoreCase(scheme)) {
    port = 80;
    } else if ("https".equalsIgnoreCase(scheme)) {
    port = 443;
    }
    }
    try {
    ChannelFuture connect = b.connect(host, port);
    _channel = connect.sync().channel();
    FullHttpRequest request;
    if (null != body) {
    ByteBuf byteBuf = Unpooled.copiedBuffer(body.getBytes(CharsetUtil.UTF_8));
    request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri.getRawPath(), byteBuf);
    request.headers().set(HttpHeaderNames.CONTENT_LENGTH, (long) byteBuf.readableBytes());
    } else {
    request = new DefaultFullHttpRequest(HTTP_1_1, method, uri.getRawPath());
    }
    request.headers().set(HttpHeaderNames.HOST, uri.getHost());
    request.headers().set(HttpHeaderNames.AUTHORIZATION, _authCode);
    request.headers().set("Content-Type","application/json;charset=utf-8");
    connect.awaitUninterruptibly();
    LOG.info("Sending request. " + request);
    LOG.info("Send body: " + body);
    _channel.writeAndFlush(request);
    //调用latch.await()后主线程阻塞,等待netty异步网络调用。如果异步线程执行后不调用countDown(),主线程将永远阻塞,此为问题主要原因。
    latch.await();
    wrapper = initializer.getResponse();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return wrapper;
    }
  • 4> cn.jiguang.common.connection.NettyClientInitializer.java

1
2
3
4
5
6
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//使用_latch创建HttpResponseHandler
this._handler = new HttpResponseHandler(_callback, _latch);
socketChannel.pipeline().addLast(_sslCtx.newHandler(socketChannel.alloc()), new HttpClientCodec(), _handler);
}
  • 5> cn.jiguang.common.connection.HttpResponseHandler.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    if (msg instanceof HttpResponse) {
    HttpResponse response = (HttpResponse) msg;
    status = response.status().code();
    }
    if (msg instanceof HttpContent) {
    HttpContent content = (HttpContent) msg;
    LOG.info(content.content().toString());
    if (content instanceof LastHttpContent) {
    //断点跟踪发现,程序运行到该分支后,直接close,并未调用_latch.countDown(); 此为问题的主要爆发点
    LOG.info("closing connection");
    ctx.close();
    } else {
    String responseContent = content.content().toString(CharsetUtil.UTF_8);
    _wrapper.responseCode = status;
    _wrapper.responseContent = responseContent;
    LOG.info("Got Response code: " + status + " content: " + responseContent);
    System.err.println("Got Response code: " + status + " content: " + responseContent);
    System.err.flush();
    if (null != _callback) {
    _callback.onSucceed(_wrapper);
    }
    if (null != _latch) {
    //正常情况下
    _latch.countDown();
    }
    }
    }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    LOG.error("error:", cause);
    try {
    //模拟网络异常,发现此处也存在bug,未调用_latch.countDown(); 隐藏爆发点
    ctx.close();
    }catch (Exception ex) {
    LOG.error("close error:", ex);
    }
    }

翻阅源码发现HttpResponseHandler.java类内部存在两个隐患:

  • 程序运行到if (content instanceof LastHttpContent) {……}分支后,未调用_latch.countDown();
  • 当出现网络异常时,exceptionCaught方法内部也未调用_latch.countDown();

只要其中任意一个隐患爆发,主线程就会阻塞,进入WAITING状态。消费者线程就此堵塞,不再消费BlockQueue中的消息。

恢复现场

  • Maven pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>sky-framework</artifactId>
<groupId>org.sky</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jpush-test</artifactId>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- jpush -->
<dependency>
<groupId>cn.jpush.api</groupId>
<artifactId>jpush-client</artifactId>
<version>3.2.15</version>
</dependency>
<!-- utils begin -->
<dependency>
<groupId>cn.jpush.api</groupId>
<artifactId>jiguang-common</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.3</version>
</dependency>
<!-- LOGGING begin -->
<!-- slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<!-- logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<!-- 代码直接调用log4j会被桥接到slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<!-- 代码直接调用commons-logging会被桥接到slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<!-- 代码直接调用java.util.logging会被桥接到slf4j -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</dependency>
<!-- log4jdbc -->
<dependency>
<groupId>com.googlecode.log4jdbc</groupId>
<artifactId>log4jdbc</artifactId>
<scope>runtime</scope>
</dependency>
<!-- LOGGING end -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
</dependencies>
</project>
  • 测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package org.sky.jpush;
import cn.jiguang.common.ClientConfig;
import cn.jiguang.common.resp.APIConnectionException;
import cn.jiguang.common.resp.APIRequestException;
import cn.jpush.api.JPushClient;
import cn.jpush.api.push.PushResult;
import cn.jpush.api.push.model.Options;
import cn.jpush.api.push.model.Platform;
import cn.jpush.api.push.model.PushPayload;
import cn.jpush.api.push.model.audience.Audience;
import cn.jpush.api.push.model.audience.AudienceTarget;
import cn.jpush.api.push.model.notification.IosAlert;
import cn.jpush.api.push.model.notification.IosNotification;
import cn.jpush.api.push.model.notification.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
/**
* @author roc
* @date 2018/01/09
*/
public class JPushServiceTest {
protected static final Logger LOG = LoggerFactory.getLogger(JPushServiceTest.class);
private static final String APP_KEY = "b11314807507e2bcfdeebe2e";
private static final String MASTER_SECRET = "2c88a01e073a0fe4fc7b167c";
protected static final String GROUP_MASTER_SECRET = "b11314807507e2bcfdeebe2e";
protected static final String GROUP_PUSH_KEY = "2c88a01e073a0fe4fc7b167c";
public static final String ALERT = "JPush Test - alert";
public static final String MSG_CONTENT = "JPush Test - msgContent";
public static final String REGISTRATION_ID1 = "0900e8d85ef";
public static final String REGISTRATION_ID2 = "0a04ad7d8b4";
public static final String REGISTRATION_ID3 = "18071adc030dcba91c0";
@Test
public void testNettyClient() {
ClientConfig clientConfig = ClientConfig.getInstance();
JPushClient jpushClient = new JPushClient(MASTER_SECRET, APP_KEY, null, clientConfig);
/*String authCode = ServiceHelper.getBasicAuthorization(APP_KEY, MASTER_SECRET);
ApacheHttpClient apacheHttpClient = new (authCode, null, clientConfig);
jpushClient.getPushClient().setHttpClient(apacheHttpClienApacheHttpClientt);
*/
PushPayload payload = buildTestPayload();
try {
PushResult result = jpushClient.sendPush(payload);
int status = result.getResponseCode();
LOG.info("Got result - " + result);
} catch (APIConnectionException e) {
LOG.error("Connection error. Should retry later. ", e);
LOG.error("Sendno: " + payload.getSendno());
} catch (APIRequestException e) {
LOG.error("Error response from JPush server. Should review and fix it. ", e);
LOG.info("HTTP Status: " + e.getStatus());
LOG.info("Error Code: " + e.getErrorCode());
LOG.info("Error Message: " + e.getErrorMessage());
LOG.info("Msg ID: " + e.getMsgId());
LOG.error("Sendno: " + payload.getSendno());
}
}
public PushPayload buildTestPayload() {
String title = "hello world, roc";
String subTitle = "低头前行了这么久,我只是在找一个抬头的机会";
String body = "人这一生,浪费了太多的时间在毫无意义的事情上,担忧、抱怨、埋怨、比较……";
IosAlert iosAlert = IosAlert.newBuilder()
.setTitleAndBody(title, subTitle, body).build();
IosNotification iosNotification = IosNotification.newBuilder()
.setAlert(iosAlert).incrBadge(1)
.setSound("happy").build();
Notification notification = Notification.newBuilder().addPlatformNotification(iosNotification).build();
PushPayload pushPayload = PushPayload
.newBuilder()
.setPlatform(Platform.android_ios())
.setAudience(
Audience.newBuilder()
.addAudienceTarget(
AudienceTarget.alias("TEST303644"))
.build())
.setNotification(notification)
.setOptions(Options.newBuilder().setApnsProduction(true).build())
.build();
return pushPayload;
}
}
  • Thread Dump

Debug跟踪,发现程序进入if分支,调用close()关闭连接后,主线程阻塞进入wait状态。使用IDEA生成thread dump:
此处输入图片的描述
此处输入图片的描述

  • 日志输出
1
2
3
4
2018-01-09 21:12:19.064 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.ssl.SslHandler - [id: 0x6f019129, L:/10.0.31.70:55848 - R:api.jpush.cn/103.40.232.116:443] HANDSHAKEN: TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA
2018-01-09 21:12:29.324 [nioEventLoopGroup-2-1] INFO c.j.c.connection.HttpResponseHandler - PooledSlicedByteBuf(ridx: 0, widx: 44, cap: 44/44, unwrapped: PooledUnsafeDirectByteBuf(ridx: 324, widx: 324, cap: 373))
//此为HttpResponseHandler.java类log输出,与源代码吻合
2018-01-09 21:12:49.010 [nioEventLoopGroup-2-1] INFO c.j.c.connection.HttpResponseHandler - closing connection

主线程处于WAIT状态,Netty异步线程已运行完毕,现场恢复成功。

解决方案

  1. 升级极光SDK版本,升级后发现最新版已解决该问题。
  2. 增加熔断处理,调用外部服务时,使用Guava SimpleTimeLimiter设置超时熔断。