1.我们以支付以后7天自动收货为例来说明下:
(1)用户支付完成以后,把订单ID插入到内存的一个DelayQueue中,同时插入到Redis中。
(2)7天之内,用户点击了确认收货,则从DelayQueue中删除,从Redis中删除。
(3)超过7天,DelayQueue中的订单ID出队,查询数据库,将订单状态改为自动收货,并从Redis中删除该订单。
(4)如果7天之内Web服务器重启过,则Web服务器启动以后,从Redis中读取待收货的订单,重新插入到DelayQueue。
看下具体的代码:
@Controller @RequestMapping(value = "") public class OrderController { @Autowired DelayService delayService; @Autowired RedisService redisServie; @Autowired ConfigService configService; //模拟数据库 private List<Long> ordeIds = new ArrayList<Long>(); private static final Logger log = Logger.getLogger(OrderController.class); @RequestMapping(value = "/order", method = RequestMethod.GET) public String order(final HttpServletRequest request, final Model model) { return "order"; } @RequestMapping(value = "/pay", method = RequestMethod.GET) @ResponseBody public Response<Void> pay(final HttpServletRequest request, final Model model) { final long orderId = Long.parseLong(request.getParameter("orderId")); ordeIds.add(orderId); log.error("订单已支付:"+orderId); //把订单插入到待收货的队列和redis ThreadPoolUtil.execute(new Runnable(){ @Override public void run() { //1 插入到待收货队列 DSHOrder dshOrder = new DSHOrder(orderId, configService.getDshTimeOut()); delayService.add(dshOrder); log.error("订单入队:"+orderId); //2插入到redis redisServie.set(Constants.RedisKey.DSH_PREFIX+orderId, dshOrder, RedisService.DB.DSH); log.error("订单入redis:"+orderId); } }); return new Response<Void>(0,"成功"); } @RequestMapping(value = "/confirm_delivery", method = RequestMethod.GET) @ResponseBody public Response<Void> confirm_delivery(final HttpServletRequest request, final Model model) { final long orderId = Long.parseLong(request.getParameter("orderId")); ordeIds.remove(orderId); log.error("订单已确认收货:"+orderId); //从delay队列删除,从redis删除 ThreadPoolUtil.execute(new Runnable(){ public void run(){ //从delay队列删除 delayService.remove(orderId); log.error("订单手动出队:"+orderId); //从redis删除 redisServie.delete(Constants.RedisKey.DSH_PREFIX+orderId, RedisService.DB.DSH); log.error("订单手动出redis:"+orderId); } }); return new Response<Void>(0,"成功"); } }
/**
 * Holds pending orders in an in-memory {@link DelayQueue} and hands each one
 * to a listener once its delay has expired.
 */
@Service
public class DelayService {

    private static final Logger log = Logger.getLogger(DelayService.class);

    @Autowired
    ConfigService configService;

    /** True once the consumer thread has been started; guarded by {@code this}. */
    private boolean start;
    private OnDelayedListener listener;
    private DelayQueue<DSHOrder> delayQueue = new DelayQueue<DSHOrder>();

    /** Callback invoked on the consumer thread for every expired order. */
    public static interface OnDelayedListener {
        public void onDelayedArrived(DSHOrder order);
    }

    /**
     * Starts the consumer thread. Only the first call has any effect; later
     * calls return immediately and their listener is ignored.
     *
     * @param listener callback for expired orders; if {@code null}, expired
     *                 orders are taken from the queue and dropped
     */
    public synchronized void start(OnDelayedListener listener) {
        if (start) {
            return;
        }
        log.error("DelayService 启动");
        start = true;
        this.listener = listener;
        new Thread(new Runnable() {
            public void run() {
                consume();
            }
        }).start();
    }

    /**
     * Consumer loop: blocks until an order expires, then notifies the listener.
     * Exceptions are handled per iteration so that one failing callback does
     * not terminate the thread and strand every order still in the queue.
     */
    private void consume() {
        while (true) {
            try {
                DSHOrder order = delayQueue.take();
                OnDelayedListener current = listener;
                if (current != null) {
                    current.onDelayedArrived(order);
                }
            } catch (InterruptedException e) {
                // Interruption is the only way to stop this loop: restore the flag and exit.
                Thread.currentThread().interrupt();
                log.error("DelayService consumer thread interrupted, stopping", e);
                return;
            } catch (Exception e) {
                log.error("DelayService listener failed, continuing with next order", e);
            }
        }
    }

    /** Adds an order to the delay queue. */
    public void add(DSHOrder order) {
        delayQueue.put(order);
    }

    /**
     * Removes an order that is equal to the given one.
     *
     * @return {@code true} if an order was removed
     */
    public boolean remove(DSHOrder order) {
        return delayQueue.remove(order);
    }

    /** Adds a new order for {@code orderId} using the configured timeout. */
    public void add(long orderId) {
        delayQueue.put(new DSHOrder(orderId, configService.getDshTimeOut()));
    }

    /**
     * Removes the first queued order found with the given id; does nothing if
     * there is none. Works on a snapshot of the queue because the caller only
     * knows the id, not the start time that {@code DSHOrder.equals} also compares.
     */
    public void remove(long orderId) {
        for (DSHOrder order : delayQueue.toArray(new DSHOrder[0])) {
            if (order.getOrderId() == orderId) {
                delayQueue.remove(order);
                return;
            }
        }
    }
}
/**
 * An order waiting for receipt confirmation, usable as a {@link Delayed}
 * element: it becomes available once the current time reaches {@code startTime}.
 */
public class DSHOrder implements Delayed {

    private long orderId;
    /** Epoch milliseconds at which the delay expires. */
    private long startTime;

    /** Creates an empty order; populate it through the setters. */
    public DSHOrder() {
    }

    /**
     * @param orderId the order id
     * @param timeout auto-confirmation timeout, in seconds from now
     */
    public DSHOrder(long orderId, int timeout) {
        this.orderId = orderId;
        this.startTime = System.currentTimeMillis() + timeout * 1000L;
    }

    /**
     * Orders by expiry time, earliest first. Anything that is not a
     * {@code DSHOrder} compares as equal.
     */
    @Override
    public int compareTo(Delayed other) {
        if (other == this) {
            return 0;
        }
        if (other instanceof DSHOrder) {
            long otherStartTime = ((DSHOrder) other).getStartTime();
            // Compare the longs directly: casting their difference to int
            // flips the sign once the gap exceeds Integer.MAX_VALUE ms (~24.8 days).
            if (this.startTime < otherStartTime) {
                return -1;
            }
            if (this.startTime > otherStartTime) {
                return 1;
            }
            return 0;
        }
        return 0;
    }

    /**
     * @return the time left until {@code startTime}, in {@code unit};
     *         zero or negative once expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + (int) (orderId ^ (orderId >>> 32));
        result = prime * result + (int) (startTime ^ (startTime >>> 32));
        return result;
    }

    /** Two orders are equal when both the order id and the start time match. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        DSHOrder other = (DSHOrder) obj;
        if (orderId != other.orderId) {
            return false;
        }
        if (startTime != other.startTime) {
            return false;
        }
        return true;
    }

    public long getStartTime() {
        return startTime;
    }

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }

    @Override
    public String toString() {
        return "DSHOrder [orderId=" + orderId + ", startTime=" + startTime + "]";
    }
}
@Service public class StartupListener implements ApplicationListener<ContextRefreshedEvent> { private static final Logger log = Logger.getLogger(StartupListener.class); @Autowired DelayService delayService; @Autowired RedisService redisService; @Override public void onApplicationEvent(ContextRefreshedEvent evt) { log.error(">>>>>>>>>>>>系统启动完成,onApplicationEvent()"); if (evt.getApplicationContext().getParent() == null) { return; } //自动收货 delayService.start(new OnDelayedListener(){ @Override public void onDelayedArrived(final DSHOrder order) { //异步来做 ThreadPoolUtil.execute(new Runnable(){ public void run(){ long orderId = order.getOrderId(); //查库判断是否需要自动收货 log.error("自动确认收货,onDelayedArrived():"+orderId); //从redis删除 redisService.delete(Constants.RedisKey.DSH_PREFIX+orderId, RedisService.DB.DSH); log.error("自动确认收货,删除redis:"+orderId); } }); } }); //查找需要入队的订单 ThreadPoolUtil.execute(new Runnable(){ @Override public void run() { log.error("查找需要入队的订单"); //扫描redis,找到所有可能的orderId List<String> keys = redisService.scan(RedisService.DB.DSH); if(keys == null || keys.size() <= 0){ return; } log.error("需要入队的订单keys:"+keys); //写到DelayQueue for(String key : keys){ DSHOrder order = redisService.get(key, DSHOrder.class, RedisService.DB.DSH); log.error("读redis,key:"+key); if(order != null){ delayService.add(order); log.error("订单自动入队:"+order.getOrderId()); } } } }); } }
最新的代码:https://github.com/xjs1919/util/tree/master/src/main/java/com/github/xjs/util/delay