diff --git a/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java b/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java index b637a033..7f010eaf 100644 --- a/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java +++ b/mpush-client/src/main/java/com/mpush/client/push/PushRequest.java @@ -134,11 +134,14 @@ private void submit(Status status) { if (isTimeoutEnd) {//超时结束时,当前线程已经是线程池里的线程,直接调用callback callback.onResult(getResult()); } else {//非超时结束时,当前线程为Netty线程池,要异步执行callback - PushRequestBus.I.asyncCall(this);//会执行run方法 + //PushRequestBus.I.asyncCall(this);//会执行run方法 + + //延时,等待接收到server通知。by 鹏 + PushRequestBus.I.put(sessionId, PushRequest.this); } } } - LOGGER.info("push request {} end, {}, {}, {}", status, userId, location, timeLine); + LOGGER.info("push request {} end, {}, {}, {}, timeout={}", status, userId, location, timeLine, this.getTimeout()); } /** diff --git a/mpush-common/src/main/java/com/mpush/common/message/gateway/GatewayPushMessage.java b/mpush-common/src/main/java/com/mpush/common/message/gateway/GatewayPushMessage.java index eaa5b21b..418dd319 100644 --- a/mpush-common/src/main/java/com/mpush/common/message/gateway/GatewayPushMessage.java +++ b/mpush-common/src/main/java/com/mpush/common/message/gateway/GatewayPushMessage.java @@ -207,6 +207,7 @@ public void send(ChannelFutureListener listener) { public String toString() { return "GatewayPushMessage{" + "userId='" + userId + '\'' + + ", taskId=" + taskId + '\'' + ", clientType='" + clientType + '\'' + ", timeout='" + timeout + '\'' + ", content='" + (content == null ? 0 : content.length) + '\'' + diff --git a/mpush-common/src/main/java/com/mpush/common/push/RedisBroadcastController.java b/mpush-common/src/main/java/com/mpush/common/push/RedisBroadcastController.java index ee7dfa1c..4af89c9a 100644 --- a/mpush-common/src/main/java/com/mpush/common/push/RedisBroadcastController.java +++ b/mpush-common/src/main/java/com/mpush/common/push/RedisBroadcastController.java @@ -23,6 +23,7 @@ import com.mpush.api.spi.common.CacheManager; import com.mpush.api.spi.common.CacheManagerFactory; import com.mpush.common.CacheKeys; +import com.mpush.tools.Jsons; import java.util.List; @@ -94,10 +95,12 @@ public int incSendCount(int count) { } public void success(String userId) { - cacheManager.lpush(taskSuccessUIDKey, userId); + cacheManager.lpush(taskSuccessUIDKey, Jsons.toJson(userId)); } public List successUserIds() { - return cacheManager.lrange(taskSuccessUIDKey, 0, -1, String.class); + List list = cacheManager.lrange(taskSuccessUIDKey, 0, -1, String.class); + cacheManager.del(taskSuccessUIDKey); + return list; } } diff --git a/mpush-core/src/main/java/com/mpush/core/push/BroadcastPushTask.java b/mpush-core/src/main/java/com/mpush/core/push/BroadcastPushTask.java index bcfdecb2..04187ab8 100644 --- a/mpush-core/src/main/java/com/mpush/core/push/BroadcastPushTask.java +++ b/mpush-core/src/main/java/com/mpush/core/push/BroadcastPushTask.java @@ -26,6 +26,8 @@ import com.mpush.common.condition.AwaysPassCondition; import com.mpush.api.common.Condition; import com.mpush.common.message.PushMessage; +import com.mpush.common.message.gateway.GatewayPushMessage; +import com.mpush.common.push.RedisBroadcastController; import com.mpush.common.qps.FlowControl; import com.mpush.common.qps.OverFlowException; import com.mpush.core.router.LocalRouter; @@ -98,6 +100,10 @@ private boolean broadcast() { if (checkCondition(condition, connection)) {//1.条件检测 if (connection.isConnected()) { if (connection.getChannel().isWritable()) { //检测TCP缓冲区是否已满且写队列超过最高阀值 + //设置userid,by 鹏 2017-5-23 + GatewayPushMessage gpm = (GatewayPushMessage)message; + gpm.userId = userId; + PushMessage .build(connection) .setContent(message.getContent()) @@ -145,6 +151,11 @@ private boolean checkCondition(Condition condition, Connection connection) { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) {//推送成功 Logs.PUSH.info("[Broadcast] push message to client success, userId={}, message={}", message.getUserId(), message); + + //写入redis,将成功接收的用户id保存。 by 鹏 2017-5-23 + RedisBroadcastController rbc = new RedisBroadcastController(message.getTaskId()); + rbc.success(message.getUserId()); + } else {//推送失败 Logs.PUSH.warn("[Broadcast] push message to client failure, userId={}, message={}, conn={}", message.getUserId(), message, future.channel()); }