# RocketMQ 实战:模拟电商网站场景综合案例(十)

RocketMQ 实战:模拟电商网站场景综合案例(十)

一、RocketMQ 实战:模拟电商网站场景综合案例-- 创建支付订单流程

1、支付订单流程

1.png

2、在 shop-pay-service 工程模块中,创建 启动 类 PayServiceApplication.java


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\PayServiceApplication.java
 *
 *   2024-6-11 创建 启动 类 PayServiceApplication.java
 */
package com.itheima.shop;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableDubboConfiguration
public class PayServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(PayServiceApplication.class,args);
    }
}

3、在 shop-pay-service 工程模块中,创建 支付订单 实现类 PayServiceImpl.java


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
 *
 *   2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
 */
package com.itheima.shop.service.impl;

import com.alibaba.dubbo.config.annotation.Service;
import com.itheima.api.IPayService;
import com.itheima.shop.pojo.TradePay;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService{
	//1.判断订单支付状态

	//2.设置订单的状态为未支付
       
    //3.保存支付订单
}

二、RocketMQ 实战:模拟电商网站场景综合案例-- 创建支付订单实现

1、在 shop-api 工程模块中,创建 支付订单 接口类 IPayService.java


/**
 *   shop\shop-api\src\main\java\com\itheima\api\IPayService.java
 *
 *   2024-6-11 创建 支付订单 接口类 IPayService.java
 */
package com.itheima.api;

import com.itheima.entity.Result;
import com.itheima.shop.pojo.TradePay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.exception.RemotingException;

public interface IPayService {

    public Result createPayment(TradePay tradePay);

    public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;

}

2、在 shop-pay-service 工程模块中,启动类 PayServiceApplication.java 中,添加 IDWorker


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\PayServiceApplication.java
 *
 *   2024-6-11 创建 启动 类 PayServiceApplication.java
 */
package com.itheima.shop;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.itheima.utils.IDWorker;

@SpringBootApplication
@EnableDubboConfiguration
public class PayServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(PayServiceApplication.class,args);
    }
	
	@Bean
    public IDWorker getBean(){
        return new IDWorker(1,2);
    }
}

3、在 shop-pay-service 工程模块中,实现类 PayServiceImpl.java 中,完成 创建支付订单 方法 代码。


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
 *
 *   2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
 */
package com.itheima.shop.service.impl;

import com.alibaba.dubbo.config.annotation.Service;
import com.itheima.api.IPayService;
import com.itheima.shop.mapper.TradeMqProducerTempMapper;
import com.itheima.shop.mapper.TradePayMapper;
import com.itheima.shop.pojo.TradeMqProducerTemp;
import com.itheima.shop.pojo.TradePay;
import com.itheima.shop.pojo.TradePayExample;
import org.springframework.stereotype.Component;
import com.itheima.utils.IDWorker;
import com.itheima.entity.Result;
import com.itheima.constant.ShopCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService{

    @Autowired
    private TradePayMapper tradePayMapper;

    @Autowired
    private IDWorker idWorker;

    @Override  //创建支付订单
    public Result createPayment(TradePay tradePay) {

        if(tradePay==null || tradePay.getOrderId()==null){
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }

        //1.判断订单支付状态
        TradePayExample example = new TradePayExample();
        TradePayExample.Criteria criteria = example.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_PAYMENT_IS_PAID.getCode());
        int r = tradePayMapper.countByExample(example);
        if(r>0){
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
        //2.设置订单的状态为未支付
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        //3.保存支付订单
        tradePay.setPayId(idWorker.nextId());
        tradePayMapper.insert(tradePay);

        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
    }
}

三、RocketMQ 实战:模拟电商网站场景综合案例-- 支付回调处理流程分析

支付回调处理流程分析.png

四、RocketMQ 实战:模拟电商网站场景综合案例-- 支付回调代码实现

1、支付回调代码实现


public Result callbackPayment(TradePay tradePay) {

    if (tradePay.getIsPaid().equals(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode())) {
        tradePay = tradePayMapper.selectByPrimaryKey(tradePay.getPayId());
        if (tradePay == null) {
            CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
        }
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        int i = tradePayMapper.updateByPrimaryKeySelective(tradePay);
        //更新成功代表支付成功
        if (i == 1) {
            TradeMqProducerTemp mqProducerTemp = new TradeMqProducerTemp();
            mqProducerTemp.setId(String.valueOf(idWorker.nextId()));
            mqProducerTemp.setGroupName("payProducerGroup");
            mqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
            mqProducerTemp.setMsgTag(topic);
            mqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
            mqProducerTemp.setCreateTime(new Date());
            mqProducerTempMapper.insert(mqProducerTemp);
            TradePay finalTradePay = tradePay;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        SendResult sendResult = sendMessage(topic, 
                                                            tag, 
                                                            finalTradePay.getPayId(), 
                                                            JSON.toJSONString(finalTradePay));
                        log.info(JSON.toJSONString(sendResult));
                        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                            mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                            System.out.println("删除消息表成功");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        } else {
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
    }
    return new Result(ShopCode.SHOP_SUCCESS.getSuccess(), ShopCode.SHOP_SUCCESS.getMessage());
}

2、在 shop-pay-service 工程模块中,订单支付 实现类 PayServiceImpl.java 中,创建 支付回调 方法。


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
 *
 *   2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
 */
package com.itheima.shop.service.impl;

import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSON;
import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeMqProducerTempMapper;
import com.itheima.shop.mapper.TradePayMapper;
import com.itheima.shop.pojo.TradeMqProducerTemp;
import com.itheima.shop.pojo.TradePay;
import com.itheima.shop.pojo.TradePayExample;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Date;

@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService{

    @Autowired
    private TradePayMapper tradePayMapper;

    @Autowired
    private TradeMqProducerTempMapper mqProducerTempMapper;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private IDWorker idWorker;

    @Value("${rocketmq.producer.group}")
    private String groupName;

    @Value("${mq.topic}")
    private String topic;

    @Value("${mq.pay.tag}")
    private String tag;

    @Override  //创建支付订单
    public Result createPayment(TradePay tradePay) {

        if(tradePay==null || tradePay.getOrderId()==null){
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }

        //1.判断订单支付状态
        TradePayExample example = new TradePayExample();
        TradePayExample.Criteria criteria = example.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_PAYMENT_IS_PAID.getCode());
        int r = tradePayMapper.countByExample(example);
        if(r>0){
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
        //2.设置订单的状态为未支付
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        //3.保存支付订单
        tradePay.setPayId(idWorker.nextId());
        tradePayMapper.insert(tradePay);

        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
    }

    @Override  //支付回调方法
    public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        log.info("支付回调");
        //1. 判断用户支付状态
        if(tradePay.getIsPaid().intValue()==ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().intValue()){
            //2. 更新支付订单状态为已支付
            Long payId = tradePay.getPayId();
            TradePay pay = tradePayMapper.selectByPrimaryKey(payId);
            //判断支付订单是否存在
            if(pay==null){
                CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
            }
            pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
            int r = tradePayMapper.updateByPrimaryKeySelective(pay);
            log.info("支付订单状态改为已支付");
            if(r==1){
                //3. 创建支付成功的消息
                TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
                tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
                tradeMqProducerTemp.setGroupName(groupName);
                tradeMqProducerTemp.setMsgTopic(topic);
                tradeMqProducerTemp.setMsgTag(tag);
                tradeMqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
                tradeMqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
                tradeMqProducerTemp.setCreateTime(new Date());
                //4. 将消息持久化数据库
                mqProducerTempMapper.insert(tradeMqProducerTemp);
                log.info("将支付成功消息持久化到数据库");

				//5. 发送消息到MQ
				SendResult result = sendMessage(topic, tag, String.valueOf(tradePay.getPayId()), JSON.toJSONString(tradePay));
				
				if(result.getSendStatus().equals(SendStatus.SEND_OK)){
					log.info("消息发送成功");
					//6. 等待发送结果,如果MQ接受到消息,删除发送成功的消息
					mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
					log.info("持久化到数据库的消息删除");
				}
            }
            return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
        }else{
            CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
            return new Result(ShopCode.SHOP_FAIL.getSuccess(),ShopCode.SHOP_FAIL.getMessage());
        }
    }

    /**
     * 发送支付成功消息
     * @param topic
     * @param tag
     * @param key
     * @param body
     */
    private SendResult sendMessage(String topic, String tag, String key, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if(StringUtils.isEmpty(topic)){
            CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
        }
        if(StringUtils.isEmpty(body)){
            CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
        }
        Message message = new Message(topic,tag,key,body.getBytes());
        SendResult sendResult = rocketMQTemplate.getProducer().send(message);
        return sendResult;
    }
}

五、RocketMQ 实战:模拟电商网站场景综合案例-- 线程池优化消息发送

1、线程池优化消息发送逻辑:创建线程池对象


@Bean
public ThreadPoolTaskExecutor getThreadPool() {

    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

    executor.setCorePoolSize(4);

    executor.setMaxPoolSize(8);

    executor.setQueueCapacity(100);

    executor.setKeepAliveSeconds(60);

    executor.setThreadNamePrefix("Pool-A");

    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    executor.initialize();

    return executor;

}

2、线程池优化消息发送逻辑: 使用线程池


@Autowired
private ThreadPoolTaskExecutor executorService;

executorService.submit(new Runnable() {
    @Override
    public void run() {
        try {
            SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));
            log.info(JSON.toJSONString(sendResult));
            if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
                mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());
                System.out.println("删除消息表成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});

3、在 shop-pay-service 工程模块中,启动 类 PayServiceApplication.java 添加 线程池优化消息发送。


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\PayServiceApplication.java
 *
 *   2024-6-11 创建 启动 类 PayServiceApplication.java
 */
package com.itheima.shop;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import com.itheima.utils.IDWorker;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@SpringBootApplication
@EnableDubboConfiguration
public class PayServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(PayServiceApplication.class,args);
    }

    @Bean
    public IDWorker getBean(){
        return new IDWorker(1,2);
    }

    @Bean  //添加 线程池优化消息发送。
    public ThreadPoolTaskExecutor getThreadPool() {

        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(4);

        executor.setMaxPoolSize(8);

        executor.setQueueCapacity(100);

        executor.setKeepAliveSeconds(60);

        executor.setThreadNamePrefix("Pool-A");

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        executor.initialize();

        return executor;
    }
}

4、在 shop-pay-service 工程模块中,支付订单 实现类 PayServiceImpl.java 添加 线程池优化消息发送。


/**
 *   shop\shop-pay-service\src\main\java\com\itheima\shop\service\impl\PayServiceImpl.java
 *
 *   2024-6-11 创建 支付订单 实现类 PayServiceImpl.java
 */
package com.itheima.shop.service.impl;

import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.fastjson.JSON;
import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.entity.Result;
import com.itheima.exception.CastException;
import com.itheima.shop.mapper.TradeMqProducerTempMapper;
import com.itheima.shop.mapper.TradePayMapper;
import com.itheima.shop.pojo.TradeMqProducerTemp;
import com.itheima.shop.pojo.TradePay;
import com.itheima.shop.pojo.TradePayExample;
import com.itheima.utils.IDWorker;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Date;

@Slf4j
@Component
@Service(interfaceClass = IPayService.class)
public class PayServiceImpl implements IPayService{

    @Autowired
    private TradePayMapper tradePayMapper;

    @Autowired
    private TradeMqProducerTempMapper mqProducerTempMapper;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private IDWorker idWorker;

    @Value("${rocketmq.producer.group}")
    private String groupName;

    @Value("${mq.topic}")
    private String topic;

    @Value("${mq.pay.tag}")
    private String tag;

    @Override  //创建支付订单
    public Result createPayment(TradePay tradePay) {

        if(tradePay==null || tradePay.getOrderId()==null){
            CastException.cast(ShopCode.SHOP_REQUEST_PARAMETER_VALID);
        }

        //1.判断订单支付状态
        TradePayExample example = new TradePayExample();
        TradePayExample.Criteria criteria = example.createCriteria();
        criteria.andOrderIdEqualTo(tradePay.getOrderId());
        criteria.andIsPaidEqualTo(ShopCode.SHOP_PAYMENT_IS_PAID.getCode());
        int r = tradePayMapper.countByExample(example);
        if(r>0){
            CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);
        }
        //2.设置订单的状态为未支付
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_NO_PAY.getCode());
        //3.保存支付订单
        tradePay.setPayId(idWorker.nextId());
        tradePayMapper.insert(tradePay);

        return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
    }

    @Override  //支付回调方法
    public Result callbackPayment(TradePay tradePay) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        log.info("支付回调");
        //1. 判断用户支付状态
        if(tradePay.getIsPaid().intValue()==ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode().intValue()){
            //2. 更新支付订单状态为已支付
            Long payId = tradePay.getPayId();
            TradePay pay = tradePayMapper.selectByPrimaryKey(payId);
            //判断支付订单是否存在
            if(pay==null){
                CastException.cast(ShopCode.SHOP_PAYMENT_NOT_FOUND);
            }
            pay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
            int r = tradePayMapper.updateByPrimaryKeySelective(pay);
            log.info("支付订单状态改为已支付");
            if(r==1){
                //3. 创建支付成功的消息
                TradeMqProducerTemp tradeMqProducerTemp = new TradeMqProducerTemp();
                tradeMqProducerTemp.setId(String.valueOf(idWorker.nextId()));
                tradeMqProducerTemp.setGroupName(groupName);
                tradeMqProducerTemp.setMsgTopic(topic);
                tradeMqProducerTemp.setMsgTag(tag);
                tradeMqProducerTemp.setMsgKey(String.valueOf(tradePay.getPayId()));
                tradeMqProducerTemp.setMsgBody(JSON.toJSONString(tradePay));
                tradeMqProducerTemp.setCreateTime(new Date());
                //4. 将消息持久化数据库
                mqProducerTempMapper.insert(tradeMqProducerTemp);
                log.info("将支付成功消息持久化到数据库");

                //在线程池中进行处理
                threadPoolTaskExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        //5. 发送消息到MQ
                        SendResult result = null;
                        try {
                            result = sendMessage(topic, tag, String.valueOf(tradePay.getPayId()), JSON.toJSONString(tradePay));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        if(result.getSendStatus().equals(SendStatus.SEND_OK)){
                            log.info("消息发送成功");
                            //6. 等待发送结果,如果MQ接受到消息,删除发送成功的消息
                            mqProducerTempMapper.deleteByPrimaryKey(tradeMqProducerTemp.getId());
                            log.info("持久化到数据库的消息删除");
                        }
                    }
                });
            }
            return new Result(ShopCode.SHOP_SUCCESS.getSuccess(),ShopCode.SHOP_SUCCESS.getMessage());
        }else{
            CastException.cast(ShopCode.SHOP_PAYMENT_PAY_ERROR);
            return new Result(ShopCode.SHOP_FAIL.getSuccess(),ShopCode.SHOP_FAIL.getMessage());
        }
    }

    /**
     * 发送支付成功消息
     * @param topic
     * @param tag
     * @param key
     * @param body
     */
    private SendResult sendMessage(String topic, String tag, String key, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        if(StringUtils.isEmpty(topic)){
            CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);
        }
        if(StringUtils.isEmpty(body)){
            CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);
        }
        Message message = new Message(topic,tag,key,body.getBytes());
        SendResult sendResult = rocketMQTemplate.getProducer().send(message);
        return sendResult;
    }
}

六、RocketMQ 实战:模拟电商网站场景综合案例-- 订单微服务处理支付成功消息

1、订单微服务处理支付成功消息

支付成功后,支付服务 payService 发送 MQ 消息,订单服务、用户服务、日志服务需要订阅消息进行处理。

1) 订单服务修改订单状态为已支付
2) 日志服务记录支付日志
3) 用户服务负责给用户增加积分

2、以下用订单服务为例说明消息的处理情况

2.1、配置 RocketMQ 属性值

mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group
2.2、消费消息:在订单服务中,配置公共的消息处理类

public class BaseConsumer {

    public TradeOrder handleMessage(IOrderService 
                                    orderService, 
                                    MessageExt messageExt,Integer code) throws Exception {
        //解析消息内容
        String body = new String(messageExt.getBody(), "UTF-8");
        String msgId = messageExt.getMsgId();
        String tags = messageExt.getTags();
        String keys = messageExt.getKeys();
        OrderMQ orderMq = JSON.parseObject(body, OrderMQ.class);
        
        //查询
        TradeOrder order = orderService.findOne(orderMq.getOrderId());

        if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_CANCEL.getCode().equals(code)){
            order.setOrderStatus(ShopCode.SHOP_ORDER_CANCEL.getCode());
        }

        if(ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode().equals(code)){
            order.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
        }
        orderService.changeOrderStatus(order);
        return order;
    }

}
2.3、消费消息:接受订单支付成功消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}", 
                         consumerGroup = "${mq.pay.consumer.group.name}")
public class PayConsumer extends BaseConsumer implements RocketMQListener<MessageExt> {

    @Autowired
    private IOrderService orderService;

    @Override
    public void onMessage(MessageExt messageExt) {
        try {
            log.info("CancelOrderProcessor receive message:"+messageExt);
            TradeOrder order = handleMessage(orderService, 
                                             messageExt, 
                                             ShopCode.SHOP_ORDER_MESSAGE_STATUS_ISPAID.getCode());
            log.info("订单:["+order.getOrderId()+"]支付成功");
        } catch (Exception e) {
            e.printStackTrace();
            log.error("订单支付失败");
        }
    }
}

3、在 shop-pay-service 工程模块中,application.properties 配置文件中,配置 RocketMQ 属性值,保持 topic 属性值要一致。


#  shop\shop-pay-service\src\main\resources\application.properties

# dubbo
spring.application.name=dubbo-pay-provider
spring.dubbo.application.id=dubbo-pay-provider
spring.dubbo.application.name=dubbo-pay-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20885

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=payProducerGroup

# 配置 RocketMQ 属性值,保持 topic 属性值要一致。
mq.topic=payTopic
mq.pay.tag=paid

4、在 shop-order-service 工程模块中,application.properties 配置文件中,配置 RocketMQ 属性值,保持 topic 属性值要一致。


#  shop\shop-order-service\src\main\resources\application.properties

# dubbo
spring.application.name=dubbo-order-provider
spring.dubbo.application.id=dubbo-order-provider
spring.dubbo.application.name=dubbo-order-provider
spring.dubbo.registry.address=zookeeper://192.168.25.140:2181;zookeeper://192.168.25.140:2182;zookeeper://192.168.25.140:2183
spring.dubbo.server=true
spring.dubbo.protocol.name=dubbo
spring.dubbo.protocol.port=20884

# DB
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/trade?useUnicode=true&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=root

# Mybatis
#pojo别名扫描包
mybatis.type-aliases-package=com.itheima.shop.pojo
#加载Mybatis映射文件
mybatis.mapper-locations=classpath:com/itheima/shop/mapper/*Mapper.xml

# RocketMQ
# 下单失败消息发送组
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
rocketmq.producer.group=orderProducerGroup

mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
mq.order.tag.cancel=order_cancel

# 配置 RocketMQ 属性值,保持 topic 属性值要一致。
mq.pay.topic=payTopic
mq.pay.consumer.group.name=pay_payTopic_group

5、在 shop-order-service 工程模块中,创建 订单微服务处理支付成功消息 的类 PaymentListener.java


/**
 *   shop\shop-order-service\src\main\java\com\itheima\shop\mq\PaymentListener.java
 *
 *   2024-6-12  创建 订单微服务处理支付成功消息 的类 PaymentListener.java
 */
package com.itheima.shop.mq;

import com.alibaba.fastjson.JSON;
import com.itheima.constant.ShopCode;
import com.itheima.shop.mapper.TradeOrderMapper;
import com.itheima.shop.pojo.TradeOrder;
import com.itheima.shop.pojo.TradePay;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",consumerGroup = "${mq.pay.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class PaymentListener implements RocketMQListener<MessageExt>{

    @Autowired
    private TradeOrderMapper orderMapper;

    @Override
    public void onMessage(MessageExt messageExt) {

        log.info("接收到支付成功消息");

        try {
            //1.解析消息内容
            String body = new String(messageExt.getBody(),"UTF-8");
            TradePay tradePay = JSON.parseObject(body,TradePay.class);
            //2.根据订单ID查询订单对象
            TradeOrder tradeOrder = orderMapper.selectByPrimaryKey(tradePay.getOrderId());
            //3.更改订单支付状态为已支付
            tradeOrder.setPayStatus(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());
            //4.更新订单数据到数据库
            orderMapper.updateByPrimaryKey(tradeOrder);
            log.info("更改订单支付状态为已支付");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

七、RocketMQ 实战:模拟电商网站场景综合案例-- 支付业务服务端测试

1、在 shop-pay-service 工程模块中,创建 测试类 PayServiceTest.java


/**
 *   shop\shop-pay-service\src\test\java\com\itheima\test\PayServiceTest.java
 *
 *   2024-6-12  创建 测试类 PayServiceTest.java
 */
package com.itheima.test;

import com.itheima.api.IPayService;
import com.itheima.constant.ShopCode;
import com.itheima.shop.PayServiceApplication;
import com.itheima.shop.pojo.TradePay;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.IOException;
import java.math.BigDecimal;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = PayServiceApplication.class)
public class PayServiceTest {

    @Autowired
    private IPayService payService;

    @Test
    public void createPayment(){
        long orderId = 351526299216515072L;  //状态为1已确认支付的订单ID
        TradePay tradePay = new TradePay();
        tradePay.setOrderId(orderId);
        tradePay.setPayAmount(new BigDecimal(880));  //支付金额=总金额-优惠券-余额
        payService.createPayment(tradePay);
    }

    @Test
    public void callbackPayment() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, IOException {

        long payId = 352516176372441088L;  //支付定单的ID
        long orderId = 351526299216515072L;  //定单的ID

        TradePay tradePay = new TradePay();
        tradePay.setPayId(payId);
        tradePay.setOrderId(orderId);
        tradePay.setIsPaid(ShopCode.SHOP_ORDER_PAY_STATUS_IS_PAY.getCode());  //支付成功的状态
        payService.callbackPayment(tradePay);

        System.in.read();  //让主线程不要停止,以让子线程有时间发送消息。
    }

}

2、启动各个微服务,进行测试。

在这里插入图片描述

在这里插入图片描述

上一节关联链接请点击:
# RocketMQ 实战:模拟电商网站场景综合案例(九)

环境搭建:数据库表结构介绍–项目工程初始化 查看 请点击:
# RocketMQ 实战:模拟电商网站场景综合案例(三)

mybatis 逆向工程 生成 POJO 类 源文件 和 mapper 映射文件 查看 请点击:
RocketMQ 实战:模拟电商网站场景综合案例(四)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/705269.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MT7981B+MT7976C+MT7531A RF定频测试方法

1、从下面网址下载QA软件包&#xff0c;然后在WIN系统下安装QA环境。 https://download.csdn.net/download/zhouwu_linux/89428691?spm1001.2014.3001.5501 在WINDOWS 7系统下先安装WinPcap_4_1_3.exe。 2、搭建硬件环境&#xff0c;电脑先连接仪器&#xff0c;主板网络与电…

天地图开发实战:Vue结合OpenLayers实现动态点位地图

在Web开发中&#xff0c;地图功能是一个常见的需求&#xff0c;尤其是在需要展示地理位置信息的应用程序中。OpenLayers&#xff08;简称OL&#xff09;是一个强大的JavaScript库&#xff0c;用于创建交互式地图。本文将介绍如何利用OpenLayers和天地图API&#xff0c;实现一个…

Mybatis save、saveOrUpdate、update的区别

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 1. save方法 Mybatis的save方法用于插入一条新的记录。当数据库中不存在相同的记录时&#xff0c;会执行插入操作&#xff1b;如果已经存在相同的记录&#xff0c;则会抛出异常。 int result sqlSession.insert(&…

电脑桌面提醒做事的app 好用的桌面提醒app

在快节奏的现代生活中&#xff0c;我们每天都要通过电脑处理大量的工作事项。然而&#xff0c;繁忙的工作节奏有时会导致我们遗忘某些重要任务&#xff0c;从而带来不必要的损失。为了避免这种情况&#xff0c;选择一款好用的桌面提醒app显得尤为重要。 想象一下&#xff0c;你…

Java中Transactional在不同方法间的穿透性,rollbackFor参数含义

哈喽&#xff0c;大家好&#xff0c;我是木头左&#xff01; 在Java开发中&#xff0c;经常会遇到需要在一个事务中执行多个操作的场景。为了确保这些操作的原子性&#xff0c;可以使用Spring框架提供的Transactional注解来实现事务管理。然而&#xff0c;在实际开发过程中&…

CVE-2012-2122-mysql未授权访问漏洞复现-vulhub

1.原理 参考&#xff1a;CVE-2012-2122 Mysql身份认证漏洞及利用-CSDN博客 简单来说&#xff0c;除了配置上的问题以外&#xff0c;是密码的验证出现了漏洞&#xff0c;导致尝试次数多了之后直接可以登入 使用&#xff1a;kalivulhub 2.复现 开一下镜像&#xff0c;用的是v…

代码随想录算法训练营第五十八天 | 392.判断子序列

392.判断子序列 题目链接&#xff1a;代码随想录 视频讲解&#xff1a;动态规划&#xff0c;用相似思路解决复杂问题 | LeetCode&#xff1a;392.判断子序列_哔哩哔哩_bilibili 解题思路 本题和求最长公共子序列是一样的&#xff0c;值就是s字符串的长度&#xff0c;如果一致…

拥抱开源,构建未来:王嘉树与 TDengine 的开源之旅

在当代的技术浪潮中&#xff0c;开源文化不仅催生了无数创新技术&#xff0c;也为广大技术爱好者提供了一个展示才华、相互学习的平台。我们今天采访到的这位北京邮电大学电子工程学院的研究生&#xff0c;就是在这样的背景下&#xff0c;通过开源活动不断探索、学习并实现自我…

C++中extern “C“的用法

目的 extern "C"是经常用到的东西&#xff0c;面试题目也经常出现&#xff0c;然则&#xff0c;实际用时&#xff0c;还是经常遗忘&#xff0c;因此&#xff0c;深入的了解一下&#xff0c;以增强记忆。 extern "C"指令非常有用&#xff0c;因为C和C的近亲…

python -- 异步、asyncio

文章目录 协程实现协成的方法greenlet实现协程yield 关键字asyncio async & await&#xff08;**重点**&#xff09; 协程的意义异步编程事件循环快速上手awaitTask对象asyncio.Future对象concurrent.futures.Future 对象 协程 协成不是操作系统提供的&#xff0c;是程序员…

LLM-不要错过,教你如何快速且精准生成提示词?(总结Singapore首届GPT-4提示工程获奖者Sheila Teo博客)

文章目录 前置理论精炼介绍1. CO-STAR框架CO-STAR框架简单介绍CO-STAR简单示例 2. 创建系统提示【优化LLM问答丰富度】何为系统提示&#xff1f;系统提示示例 3. 使用分隔符分段提示【优化问答准度】分割符作特殊字符及CO-STAR示例分割符作XML标记 仅数据的CO-STAR实操前置分析…

如何用西语问候呢,柯桥零基础西班牙语培训

正式问候 在正式场合&#xff0c;如工作会议、正式介绍或第一次见面时&#xff0c;通常使用更为尊敬和礼貌的问候语。以下是一些例子&#xff1a; 1. Buenos das&#xff08;早上好&#xff09;:从早上到中午使用。这是一个非常常见和礼貌的问候。 2. Buenas tardes(下午好):…

HTML静态网页成品作业(HTML+CSS)—— 节日母亲节介绍网页(5个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有5个页面。 二、作品演示 三、代…

2024年3月电子学会青少年软件编程 中小学生Python编程等级考试二级真题解析(判断题)

2024年3月Python编程等级考试二级真题解析 判断题&#xff08;共10题&#xff0c;每题2分&#xff0c;共20分&#xff09; 26、元组中的元素可以是不同的数据类型 答案&#xff1a;对 考点分析&#xff1a;考查元元组相关知识&#xff0c;元组里面的元素是可以出现不同的数据…

流程设计的基本步骤

背景 公司为什么要流程&#xff0c;已经有专门章节进行阐述&#xff1b; 什么是流程&#xff0c;已经有专门章节进行专门阐述&#xff1b; 那么接下来这个章节讨论&#xff0c;流程设计的基本步骤&#xff0c;那么谁来设计流程呢&#xff0c;让一个部门的员工来设计一份流程…

30字以内免费翻译维吾尔语,汉维翻译工具推荐,维吾尔文字母OCR识别神器《维汉翻译通》App!

维吾尔文OCR文字识别 《维汉翻译通》App内置的OCR技术&#xff0c;能够快速识别图片中的文字和字母&#xff0c;无论是路标、菜单还是书籍&#xff0c;都能迅速转换为用户所需的语言&#xff0c;让语言障碍不再是问题。针对维吾尔语更是进行了专门的优化&#xff0c;即便是手写…

vue3+electron搭建桌面软件

vue3electron开发桌面软件 最近有个小项目, 客户希望像打开 网易云音乐 那么简单的运行起来系统. 前端用 Vue 会比较快一些, 因此决定使用 electron 结合 Vue3 的方式来完成该项目. 然而, 在实施过程中发现没有完整的博客能够记录从创建到打包的流程, 摸索一番之后, 随即梳理…

Android 14 系统启动流程 之 启动init进程、启动Zygote进程

Android 14 系统启动流程 之 启动init进程、启动Zygote进程 废话不多说&#xff0c;先上图&#xff0c;不清楚的可以在评论区留言。

C#下WinForm多语种切换

这是应一个网友要求写的&#xff0c;希望对你有所帮助。本文将介绍如何在一个WinForm应用程序中实现多语种切换。通过一个简单的示例&#xff0c;你将了解到如何使用资源文件管理不同语言的文本&#xff0c;并通过用户界面实现语言切换。 创建WinForm项目 打开Visual Studio&a…

ACL原理和基础配置

ACL&#xff08;Access Control List&#xff0c;访问控制列表&#xff09;是一种用于控制网络设备或操作系统上资源访问权限的方法。ACL能够基于规则和条件来允许或拒绝对资源的访问。 标准ACL&#xff08;Standard ACL&#xff09;&#xff1a;基于源IP地址来进行流量过滤&a…