摘要:執行該方法前,先進入切面編程注冊用戶等待確認直接發送有序無序默認延時級別不延時秒小時切面中,因為郵件激活發送消息類型為默認的等待確認。
用戶注冊
github 開源項目--paascloud-master:https://github.com/paascloud/...
分布式解決方案--基于可靠消息的最終一致性:https://github.com/paascloud/...
本篇文章目的是理解該項目可靠消息服務中心(TCP)發送消息、消費消息的流程,用戶注冊發送激活郵箱和激活后發送注冊成功郵箱都是利用可靠消息服務來解決分布式事務,理解了該流程也就弄懂了該項目中其他業務流程。發送激活郵箱過程
消息生產端:UAC
可靠消息服務:TPC
消息服務端:OPC
用戶注冊后,向注冊郵箱發送一封激活郵箱。消息生產端(UAC)
大致流程為:
本地服務 UAC 先持久化 預發送消息(等待確認消息),表 pc_mq_message_data;
調用遠端可靠消息服務TPC持久化預發送消息,可靠消息表pc_tpc_mq_message;
執行本地事務即 保存用戶信息 ;
調用遠端可靠消息服務TPC更新第2步中的等待確認狀態為發送中sending;
同時創建消費待確認列表,即持久化該Topic類型的消息被哪些消費者訂閱監聽的所有消費待確認列表,狀態為未確認,表pc_tpc_mq_confirm;
完成上面操作后,發送消息到 RocketMQ。
controller層AuthRestController.java
@PostMapping(value = "/register") @ApiOperation(httpMethod = "POST", value = "注冊用戶") public Wrapper registerUser(UserRegisterDto user) { uacUserService.register(user); return WrapMapper.ok(); }service層
用戶ID生成:雪花算法生成分布式唯一 ID
用戶密碼加密:SpringSecurity BCryptPasswordEncoder 強哈希方法加密,每次加密的結果都不一樣。
bcrypt 可以有效抵御彩虹表暴力破解,其原理就是在加鹽的基礎上多次 hash,關于密碼參考:https://mp.weixin.qq.com/s/Dk...
Redis存儲激活郵箱token:key(active_token):email:過期時間1天,即激活接口參數:activeUserToken;
生成郵件發送模板(freeMarker):activeUserTemplate.ftl
根據上面模板和發送郵件參數生成實體:MqMessageData(pc_mq_message_data)
各個子系統消息落地的消息表,比如用戶服務系統主要就是郵件消息、短信消息等。
@Override public void register(UserRegisterDto registerDto) { // 校驗注冊信息 validateRegisterInfo(registerDto); String mobileNo = registerDto.getMobileNo(); String email = registerDto.getEmail(); Date row = new Date(); String salt = String.valueOf(generateId()); // 封裝注冊信息 long id = generateId(); // id 雪花算法生成 UacUser uacUser = new UacUser(); uacUser.setLoginName(registerDto.getLoginName()); uacUser.setSalt(salt); uacUser.setLoginPwd(Md5Util.encrypt(registerDto.getLoginPwd())); uacUser.setMobileNo(mobileNo); uacUser.setStatus(UacUserStatusEnum.DISABLE.getKey()); uacUser.setUserSource(UacUserSourceEnum.REGISTER.getKey()); uacUser.setCreatedTime(row); uacUser.setUpdateTime(row); uacUser.setEmail(email); uacUser.setId(id); uacUser.setCreatorId(id); uacUser.setCreator(registerDto.getLoginName()); uacUser.setLastOperatorId(id); uacUser.setUserName(registerDto.getLoginName()); uacUser.setLastOperator(registerDto.getLoginName()); // 發送激活郵件 String activeToken = PubUtils.uuid() + super.generateId(); redisService.setKey(RedisKeyUtil.getActiveUserKey(activeToken), email, 1, TimeUnit.DAYS); Mapparam = Maps.newHashMap(); param.put("loginName", registerDto.getLoginName()); param.put("email", registerDto.getEmail()); param.put("activeUserUrl", activeUserUrl + activeToken); param.put("dateTime", DateUtil.formatDateTime(new Date())); Set to = Sets.newHashSet(); to.add(registerDto.getEmail()); MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER, param); // 即下面的第6步 userManager.register(mqMessageData, uacUser); }
userManager.register() 通過注解 @MqProducerStore 發送消息服務。
執行該方法前,先進入切面編程
@MqProducerStore public void register(final MqMessageData mqMessageData, final UacUser uacUser) { log.info("注冊用戶. mqMessageData={}, user={}", mqMessageData, uacUser); uacUserMapper.insertSelective(uacUser); }
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface MqProducerStore { // WAIT_CONFIRM:等待確認;SAVE_AND_SEND:直接發送; MqSendTypeEnum sendType() default MqSendTypeEnum.WAIT_CONFIRM; // ORDER(1):有序;DIS_ORDER(0):無序 MqOrderTypeEnum orderType() default MqOrderTypeEnum.ORDER; // Rocketmq 默認延時級別 // ZERO(0, 不延時);ONE(1, 1秒)....EIGHTEEN(18, 2小時) DelayLevelEnum delayLevel() default DelayLevelEnum.ZERO; }
切面中,因為郵件激活發送消息類型為默認的:等待確認。
此處本地服務 UAC 消息落地:保存待確認消息 MqMessageData 到 mysql,表pc_mq_message_data;
發送待確認消息到可靠消息系統(TPC):發送預發送狀態的消息給消息中心
// 切面 MqMessageData domain = null; for (Object object : args) { if (object instanceof MqMessageData) { domain = (MqMessageData) object; break; } } domain.setOrderType(orderType); domain.setProducerGroup(producerGroup); // 1. 等待確認 if (type == MqSendTypeEnum.WAIT_CONFIRM) { if (delayLevelEnum != DelayLevelEnum.ZERO) { domain.setDelayLevel(delayLevelEnum.delayLevel()); } // 1.1 發送待確認消息到可靠消息系統 // 本地服務消息落地,可靠消息服務中心也持久化預發送消息,但是不發送 mqMessageService.saveWaitConfirmMessage(domain); } result = joinPoint.proceed(); // 返回注解方法,執行業務
@Override @Transactional(rollbackFor = Exception.class) public void saveWaitConfirmMessage(final MqMessageData mqMessageData) { // 1. 持久化到本地mysql this.saveMqProducerMessage(mqMessageData); // 2. 發送預發送狀態的消息給消息中心 TpcMqMessageDto tpcMqMessageDto = mqMessageData.getTpcMqMessageDto(); // 3. 調用遠端可靠消息服務(tpc),持久化等待確認消息 tpcMqMessageFeignApi.saveMessageWaitingConfirm(tpcMqMessageDto); // 4. mqMessageData 此時為調用遠端服務返回來的數據 log.info("<== saveWaitConfirmMessage - 存儲預發送消息成功. messageKey={}", mqMessageData.getMessageKey()); }
緊接著第7步,調用遠端可靠消息服務(TCP),此時只是持久化預發送消息,但是沒有發送(等執行完本地事務即保存用戶后在發送,即第9步)
持久化 TpcMqMessage,即 pc_tpc_mq_message(可靠消息表)
@Override public void saveMessageWaitingConfirm(TpcMqMessageDto messageDto) { if (StringUtils.isEmpty(messageDto.getMessageTopic())) { throw new TpcBizException(ErrorCodeEnum.TPC10050001); } Date now = new Date(); TpcMqMessage message = new ModelMapper().map(messageDto, TpcMqMessage.class); // 消息狀態:WAIT_SEND(10, "未發送");SENDING(20, "已發送");FINISH(30, "已完成"); message.setMessageStatus(MqSendStatusEnum.WAIT_SEND.sendStatus()); message.setUpdateTime(now); message.setCreatedTime(now); tpcMqMessageMapper.insertSelective(message); }
上面執行完后,返回注解 @MqProducerStore所在方法,執行本地事務:保存用戶到 mysql。
result = joinPoint.proceed(); // 返回注解方法,執行業務
第9步執行完后,再次進入切面,發送確認消息給可靠消息服務中心
result = joinPoint.proceed(); // 返回注解方法,執行業務 // 2. 直接發送 if (type == MqSendTypeEnum.SAVE_AND_SEND) { mqMessageService.saveAndSendMessage(domain); // 3. XXX } else if (type == MqSendTypeEnum.DIRECT_SEND) { mqMessageService.directSendMessage(domain); } else { // type = WAIT_CONFIRM final MqMessageData finalDomain = domain; taskExecutor.execute(() -> mqMessageService.confirmAndSendMessage(finalDomain.getMessageKey())); } return result;
緊接著上面,可靠消息服務中心(TCP):根據傳過來的 messageKey 確認并發送之前已經持久化的預發送消息。
// TpcMqMessageFeignClient.java @Override @ApiOperation(httpMethod = "POST", value = "確認并發送消息") public Wrapper confirmAndSendMessage(@RequestParam("messageKey") String messageKey) { logger.info("確認并發送消息. messageKey={}", messageKey); tpcMqMessageService.confirmAndSendMessage(messageKey); return WrapMapper.ok(); } // TpcMqMessageServiceImpl.java @Override public void confirmAndSendMessage(String messageKey) { final TpcMqMessage message = tpcMqMessageMapper.getByMessageKey(messageKey); if (message == null) { throw new TpcBizException(ErrorCodeEnum.TPC10050002); } TpcMqMessage update = new TpcMqMessage(); update.setMessageStatus(MqSendStatusEnum.SENDING.sendStatus()); update.setId(message.getId()); update.setUpdateTime(new Date()); // 1. 更新消息狀態為:SENDING tpcMqMessageMapper.updateByPrimaryKeySelective(update); // 2. 創建消費待確認列表(此處topic:SEND_EMAIL_TOPIC) this.createMqConfirmListByTopic(message.getMessageTopic(), message.getId(), message.getMessageKey()); // 3. 直接發送消息 this.directSendMessage(message.getMessageBody(), message.getMessageTopic(), message.getMessageTag(), message.getMessageKey(), message.getProducerGroup(), message.getDelayLevel()); }
第11步中的第2點:TCP 服務中,創建消費待確認列表,根據表 pc_tpc_mq_subscribe,查詢出不同 topic 下相對應的所有 consumer_code(消費監聽者),即設置該消息被哪些服務(CID)監聽消費;
- ``SEND_EMAIL_TOPIC`` --> ``CID_OPC``:該消息會被 `consumerGroup` 為 `CID_OPC` 的服務監聽并消費。 - 同時,保存**確認消息**:``TpcMqConfirm`` --> 表 ``pc_tpc_mq_confirm``
@Override public void createMqConfirmListByTopic(final String topic, final Long messageId, final String messageKey) { Listlist = Lists.newArrayList(); TpcMqConfirm tpcMqConfirm; List consumerGroupList = tpcMqConsumerService.listConsumerGroupByTopic(topic); if (PublicUtil.isEmpty(consumerGroupList)) { throw new TpcBizException(ErrorCodeEnum.TPC100500010, topic); } for (final String cid : consumerGroupList) { tpcMqConfirm = new TpcMqConfirm(UniqueIdGenerator.generateId(), messageId, messageKey, cid); list.add(tpcMqConfirm); } tpcMqConfirmMapper.batchCreateMqConfirm(list); }
第11步中的第3點:完成上面操作后,直接發送消息到中間件 RocketMQ 隊列中。
@Override public void directSendMessage(String body, String topic, String tag, String key, String pid, Integer delayLevel) { RocketMqProducer.sendSimpleMessage(body, topic, tag, key, pid, delayLevel); } // 核心方法:重試發送消息(重試次數3次) // pid:producerGroup --> 發送郵件服務是 PID_UAC // cid: consumerGroup --> 監聽郵件消息服務是 CID_OPC private static SendResult retrySendMessage(String pid, Message msg) { int iniCount = 1; SendResult result; while (true) { try { // Message中屬性 result = MqProducerBeanFactory.getBean(pid).send(msg); break; } catch (Exception e) { log.error("發送消息失敗:", e); if (iniCount++ >= PRODUCER_RETRY_TIMES) { throw new TpcBizException(ErrorCodeEnum.TPC100500014, msg.getTopic(), msg.getKeys()); } } } log.info("<== 發送MQ SendResult={}", result); return result; }消息消費端(OPC)
大致流程:
OPC服務通過配置類AliyunMqConfiguration.java啟動DefaultMQPushConsumer RocketMQ 消費端,并設置消息邏輯處理監聽器OptPushMessageListener;
本地服務OPC持久化消費者確認消息,表 pc_mq_message_data;
調用遠端可靠消息服務TPC,更新之前生產端持久化的消費確認列表狀態,未確認 --> 已確認,表pc_tpc_mq_confirm;
接著就可以發送激活郵箱;
如果發送成功,調用遠端可靠消息服務TPC,繼續更新第3步表中消費確認消息的狀態為已消費;
消費端 RocketMQ 啟動配置類DefaultMQPushConsumer 根據配置信息啟動;
并 subscribe 訂閱 該服務 OPC 所有的 Topic 和 tags 消息。
包括短信、郵箱激活、附件更新刪除等所有消息。
@Slf4j @Configuration public class AliyunMqConfiguration { @Resource private PaascloudProperties paascloudProperties; @Resource private OptPushMessageListener optPushConsumer; @Resource private TaskExecutor taskExecutor; /** * Default mq push consumer default mq push consumer. * * @return the default mq push consumer * * @throws MQClientException the mq client exception */ @Bean public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException { // 1. 新建消費者組 // RocketMQ實際上都是拉模式,這里的DefaultMQPushConsumer實現了推模式, // 也只是對拉消息服務做了一層封裝,即拉到消息的時候觸發業務消費者注冊到這里的callback DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(paascloudProperties.getAliyun().getRocketMq().getConsumerGroup()); // 2. 指定NameServer地址,多個地址以 ; 隔開 consumer.setNamesrvAddr(paascloudProperties.getAliyun().getRocketMq().getNamesrvAddr()); // 3. 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 // 如果非第一次啟動,那么按照上次消費的位置繼續消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); String[] strArray = AliyunMqTopicConstants.ConsumerTopics.OPT.split(GlobalConstant.Symbol.COMMA); for (String aStrArray : strArray) { String[] topicArray = aStrArray.split(GlobalConstant.Symbol.AT); String topic = topicArray[0]; String tags = topicArray[1]; if (PublicUtil.isEmpty(tags)) { tags = "*"; } // 4. 進行Topic訂閱,訂閱PushTopic下Tag為push的消息 consumer.subscribe(topic, tags); log.info("RocketMq OpcPushConsumer topic = {}, tags={}", topic, tags); } // 5. 設置消息處理器 consumer.registerMessageListener(optPushConsumer); consumer.setConsumeThreadMax(2); consumer.setConsumeThreadMin(2); taskExecutor.execute(() -> { try { Thread.sleep(5000); consumer.start(); log.info("RocketMq OpcPushConsumer OK."); } catch (InterruptedException | MQClientException e) { log.error("RocketMq OpcPushConsumer, 出現異常={}", e.getMessage(), e); } }); return consumer; } }消息邏輯處理監聽器
consumeMessage() 上有注解 @MqConsumerStore,執行前先進入切面編程;
@Slf4j @Component public class OptPushMessageListener implements MessageListenerConcurrently { @Resource private OptSendSmsTopicConsumer optSendSmsTopicService; @Resource private OptSendEmailTopicConsumer optSendEmailTopicService; @Resource private MdcTopicConsumer mdcTopicConsumer; @Resource private MqMessageService mqMessageService; @Resource private StringRedisTemplate srt; /** * Consume message consume concurrently status. * * @param messageExtList the message ext list * @param consumeConcurrentlyContext the consume concurrently context * * @return the consume concurrently status */ @Override @MqConsumerStore public ConsumeConcurrentlyStatus consumeMessage(ListmessageExtList, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt msg = messageExtList.get(0); String body = new String(msg.getBody()); String topicName = msg.getTopic(); String tags = msg.getTags(); String keys = msg.getKeys(); log.info("MQ消費Topic={},tag={},key={}", topicName, tags, keys); ValueOperations ops = srt.opsForValue(); // 控制冪等性使用的key try { MqMessage.checkMessage(body, topicName, tags, keys); String mqKV = null; if (srt.hasKey(keys)) { mqKV = ops.get(keys); } if (PublicUtil.isNotEmpty(mqKV)) { log.error("MQ消費Topic={},tag={},key={}, 重復消費", topicName, tags, keys); // 消費成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } if (AliyunMqTopicConstants.MqTopicEnum.SEND_SMS_TOPIC.getTopic().equals(topicName)) { optSendSmsTopicService.handlerSendSmsTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.SEND_EMAIL_TOPIC.getTopic().equals(topicName)) { optSendEmailTopicService.handlerSendEmailTopic(body, topicName, tags, keys); } if (AliyunMqTopicConstants.MqTopicEnum.TPC_TOPIC.getTopic().equals(topicName)) { mqMessageService.deleteMessageTopic(body, tags); } if (AliyunMqTopicConstants.MqTopicEnum.MDC_TOPIC.getTopic().equals(topicName)) { mdcTopicConsumer.handlerSendSmsTopic(body, topicName, tags, keys); } else { log.info("OPC訂單信息消 topicName={} 不存在", topicName); } } catch (IllegalArgumentException ex) { log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); // 如果消息消費失敗,例如數據庫異常等,扣款失敗,發送失敗需要重試的場景, // 返回下面代碼,RocketMQ就認為消費失敗。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } ops.set(keys, keys, 10, TimeUnit.DAYS); // 業務實現消費回調的時候,當且僅當返回下面代碼時,RocketMQ才會認為這批消息是消費完成的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
執行方法之前先進入切面編程執行,獲取注解方法的參數和消息;
@Around(value = "mqConsumerStoreAnnotationPointcut()") public Object processMqConsumerStoreJoinPoint(ProceedingJoinPoint joinPoint) throws Throwable { // ... MqMessageData dto = this.getTpcMqMessageDto(messageExtList.get(0)); final String messageKey = dto.getMessageKey(); if (isStorePreStatus) { // 執行下面3、4步 mqMessageService.confirmReceiveMessage(consumerGroup, dto); } String methodName = joinPoint.getSignature().getName(); try { // 返回注解方法; result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime); } return result; }
confirmReceiveMessage:消費者確認收到消息;在上面目錄【發送激活郵箱的消息/service層】的第 12 步
持久化消費者確認消息 MqMessageData 到本地服務 OPC mysql 中,表 pc_mq_message_data;
@Override @Transactional(rollbackFor = Exception.class) public void confirmReceiveMessage(String cid, MqMessageData messageData) { final String messageKey = messageData.getMessageKey(); log.info("confirmReceiveMessage - 消費者={}, 確認收到key={}的消息", cid, messageKey); // 持久化消費者確認消息 MqMessageData 到本地服務 mysql 中,表 pc_mq_message_data messageData.setMessageType(MqMessageTypeEnum.CONSUMER_MESSAGE.messageType()); messageData.setId(UniqueIdGenerator.generateId()); mqMessageDataMapper.insertSelective(messageData); // 調用遠端服務 TPC,更新確認收到消息表狀態為已確認,TpcMqConfirm,表 pc_tpc_mq_confirm; Wrapper wrapper = tpcMqMessageFeignApi.confirmReceiveMessage(cid, messageKey); log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper); if (wrapper == null) { throw new TpcBizException(ErrorCodeEnum.GL99990002); } if (wrapper.error()) { throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey); } }
緊接著第3步,調用遠端服務 TPC,更新確認收到消息狀態為已確認,TpcMqConfirm,表 pc_tpc_mq_confirm;
status:狀態, 10 - 未確認 ; 20 - 已確認; 30 已消費;
consumeCount:消費的次數,加 1;
@Override public void confirmReceiveMessage(final String cid, final String messageKey) { // 1. 校驗cid // 2. 校驗messageKey // 3. 校驗cid 和 messageKey Long confirmId = tpcMqConfirmMapper.getIdMqConfirm(cid, messageKey); // 3. 更新消費信息的狀態 tpcMqConfirmMapper.confirmReceiveMessage(confirmId); }
第3、4步執行后,返回切面,執行下面代碼,再返回注解修飾的方法;
result = joinPoint.proceed();
注解修飾方法,通過參數 MessageExt 獲取該消息的 topic(主題)、tag(標簽)、keys(唯一鍵)、body(消息體);
冪等性(避免重復消費):redis 中存儲消費過的該消息的 keys;
根據消息的 topic 執行相應的操作處理該消息
比如此流程的發送激活郵箱,使用 spring 框架的 TaskExecutor執行郵箱發送任務。
@Override public int sendSimpleMail(String subject, String text, Setto) { log.info("sendSimpleMail - 發送簡單郵件. subject={}, text={}, to={}", subject, text, to); int result = 1; try { SimpleMailMessage message = MailEntity.createSimpleMailMessage(subject, text, to); message.setFrom(from); taskExecutor.execute(() -> mailSender.send(message)); } catch (Exception e) { log.info("sendSimpleMail [FAIL] ex={}", e.getMessage(), e); result = 0; } return result; }
第8步 如果消息消費成功,郵件發送成功,redis 中存儲該消息(冪等,過期時間 10 天),同時返回消費成功代碼;
ops.set(keys, keys, 10, TimeUnit.DAYS); // 業務實現消費回調的時候,當且僅當返回下面代碼時,RocketMQ 才會認為這批消息是消費完成的 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
第8步 如果消息消費失敗,比如數據庫異常,扣款失敗,郵件發送失敗等需要重試的場景,返回重試消費代碼。
} catch (IllegalArgumentException ex) { log.error("校驗MQ message 失敗 ex={}", ex.getMessage(), ex); } catch (Exception e) { log.error("處理MQ message 失敗 topicName={}, keys={}, ex={}", topicName, keys, e.getMessage(), e); // 如果消息消費失敗,例如數據庫異常等,扣款失敗,發送失敗需要重試的場景, // 返回下面代碼,RocketMQ就認為消費失敗。 return ConsumeConcurrentlyStatus.RECONSUME_LATER; }
執行完注解修飾方法,再次返回切面中,繼續執行,判斷返回結果;
String methodName = joinPoint.getSignature().getName(); try { result = joinPoint.proceed(); log.info("result={}", result); if (CONSUME_SUCCESS.equals(result.toString())) { mqMessageService.saveAndConfirmFinishMessage(consumerGroup, messageKey); } } catch (Exception e) { log.error("發送可靠消息, 目標方法[{}], 出現異常={}", methodName, e.getMessage(), e); throw e; } finally { log.info("發送可靠消息 目標方法[{}], 總耗時={}", methodName, System.currentTimeMillis() - startTime); } return result;
第11步中,如果返回 CONSUME_SUCCESS,保存并確認消息完成;
調用遠端服務 TCP,更新消費確認消息列表 pc_tpc_mq_confirm,狀態為已消費;
@Override public void saveAndConfirmFinishMessage(String cid, String messageKey) { // 1. 調用遠端服務tcp,確認完成消費消息 Wrapper wrapper = tpcMqMessageFeignApi.confirmConsumedMessage(cid, messageKey); log.info("tpcMqMessageFeignApi.confirmReceiveMessage result={}", wrapper); if (wrapper == null) { throw new TpcBizException(ErrorCodeEnum.GL99990002); } if (wrapper.error()) { throw new TpcBizException(ErrorCodeEnum.TPC10050004, wrapper.getMessage(), messageKey); } }發送激活成功郵箱過程
發送激活成功郵箱同上面發送激活郵箱一樣利用可靠消息服務完成分布式事務操作。controller層
@GetMapping(value = "/activeUser/{activeUserToken}") @ApiOperation(httpMethod = "POST", value = "激活用戶") public Wrapper activeUser(@PathVariable String activeUserToken) { uacUserService.activeUser(activeUserToken); return WrapMapper.ok("激活成功"); }service 層
activeuser():
@Override public void activeUser(String activeUserToken) { Preconditions.checkArgument(!StringUtils.isEmpty(activeUserToken), "激活用戶失敗"); String activeUserKey = RedisKeyUtil.getActiveUserKey(activeUserToken); String email = redisService.getKey(activeUserKey); if (StringUtils.isEmpty(email)) { throw new UacBizException(ErrorCodeEnum.UAC10011030); } // 修改用戶狀態, 綁定訪客角色 UacUser uacUser = new UacUser(); uacUser.setEmail(email); uacUser = uacUserMapper.selectOne(uacUser); if (uacUser == null) { logger.error("找不到用戶信息. email={}", email); throw new UacBizException(ErrorCodeEnum.UAC10011004, email); } UacUser update = new UacUser(); update.setId(uacUser.getId()); update.setStatus(UacUserStatusEnum.ENABLE.getKey()); LoginAuthDto loginAuthDto = new LoginAuthDto(); loginAuthDto.setUserId(uacUser.getId()); loginAuthDto.setUserName(uacUser.getLoginName()); loginAuthDto.setLoginName(uacUser.getLoginName()); update.setUpdateInfo(loginAuthDto); UacUser user = this.queryByUserId(uacUser.getId()); Mapparam = Maps.newHashMap(); param.put("loginName", user.getLoginName()); param.put("dateTime", DateUtil.formatDateTime(new Date())); Set to = Sets.newHashSet(); to.add(user.getEmail()); // 構建激活成功消息體 MqMessageData mqMessageData = emailProducer.sendEmailMq(to, UacEmailTemplateEnum.ACTIVE_USER_SUCCESS, AliyunMqTopicConstants.MqTagEnum.ACTIVE_USER_SUCCESS, param); // 1. 可靠消息服務發送郵件 userManager.activeUser(mqMessageData, update, activeUserKey); }
調用userManager.activeUser():可以看到該方法也是注解@MqProducerStore修飾;
@MqProducerStore public void activeUser(final MqMessageData mqMessageData, final UacUser uacUser, final String activeUserKey) { log.info("激活用戶. mqMessageData={}, user={}", mqMessageData, uacUser); // 更新用戶信息 int result = uacUserMapper.updateByPrimaryKeySelective(uacUser); if (result < 1) { throw new UacBizException(ErrorCodeEnum.UAC10011038, uacUser.getId()); } // 綁定一個訪客角色默認值roleId=10000 final Long userId = uacUser.getId(); Preconditions.checkArgument(userId != null, "用戶Id不能爲空"); final Long roleId = 10000L; UacRoleUser roleUser = new UacRoleUser(); roleUser.setUserId(userId); roleUser.setRoleId(roleId); uacRoleUserMapper.insertSelective(roleUser); // 綁定一個組織 UacGroupUser groupUser = new UacGroupUser(); groupUser.setUserId(userId); groupUser.setGroupId(GlobalConstant.Sys.SUPER_MANAGER_GROUP_ID); uacGroupUserMapper.insertSelective(groupUser); // 刪除 activeUserToken redisService.deleteKey(activeUserKey); }
本地事務執行用戶信息更新和 redis 郵箱激活token刪除,切面編程發送激活成功郵箱分析過程和上面發送激活郵箱流程是一樣的,這里不再贅述。
這兩個過程根據發送消息的 tag 不同,從而處理邏輯不同。Topic 都是 SEND_EMAIL_TOPIC;
此處具體為郵箱內容模板不同,其余消息生產端和消費端流程一樣。
public enum MqTagEnum { /** * 激活用戶. */ ACTIVE_USER("ACTIVE_USER", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶"), /** * 激活用戶成功. */ ACTIVE_USER_SUCCESS("ACTIVE_USER_SUCCESS", MqTopicEnum.SEND_EMAIL_TOPIC.getTopic(), "激活用戶成功"), // ...省略其他tag String tag; String topic; String tagName; MqTagEnum(String tag, String topic, String tagName) { this.tag = tag; this.topic = topic; this.tagName = tagName; } public String getTag() { return tag; } public String getTopic() { return topic; } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73093.html
摘要:依次執行下面命令本地安裝從官方安裝包下載。管理界面提供多種管理方式命令行和界面等提供一個開源的擴展項目里面包含一個子項目配置下打個包就可以用了。 前言 github 開源項目--paascloud-master:https://github.com/paascloud/... paascloud-master 官方環境搭建:http://blog.paascloud.net/20...
摘要:它采用了請求響應模型。通信請求只能由客戶端發起,服務端對請求做出應答處理弊端協議無法實現服務器主動向客戶端發起消息。如何使用客戶端創建對象屬性表示連接狀態可選值表示連接尚未建立。表示連接正在進行關閉。 一言不合就上效果圖演示showImg(https://segmentfault.com/img/bVbkUDl?w=1920&h=638); 項目:http://112.74.164.1...
摘要:昨天研究了網站的注冊流程,感興趣的可以看下從前后端分別學習注冊登錄流程今天接著研究注冊登錄流程之登錄。為解決這個問題,引入,它是由一組隨機數組合的哈希表,當用戶登錄成功,本來發放給用戶,現在變成發放給用戶。 昨天研究了網站的注冊流程,感興趣的可以看下:從前后端分別學習——注冊/登錄流程1 今天接著研究注冊/登錄流程之登錄。 登錄 首先來看一下登陸過程:showImg(https://s...
摘要:本章講如何幫助健忘癥患者,重置用戶密碼。實際上不僅內置了密碼重置,還包括登錄登出密碼修改等功能。總結本章學習了使用第三方庫,高效完成了重置密碼的功能。有疑問請在杜賽的個人網站留言,我會盡快回復。 隨著技術的發展,驗證用戶身份的手段越來越多,指紋、面容、聲紋應有盡有,但密碼依然是最重要的手段。 互聯網處處都有密碼的身影,甚至變成了現代人的一種負擔。像筆者這樣的,動輒幾十個賬號密碼,忘記其...
摘要:中為何新增來作為主要的方式運行機制是怎樣的機制有什么優勢運行機制是怎樣的基于通信模式,除了端和端,還有兩角色一起合作完成進程間通信功能。 目錄介紹 2.0.0.1 什么是Binder?為什么要使用Binder?Binder中是如何進行線程管理的?總結binder講的是什么? 2.0.0.2 Android中進程和線程的關系?什么是IPC?為何需要進行IPC?多進程通信可能會出現什么問...
閱讀 2428·2021-11-23 10:04
閱讀 1497·2021-09-02 15:21
閱讀 894·2019-08-30 15:44
閱讀 1066·2019-08-30 10:48
閱讀 710·2019-08-29 17:21
閱讀 3559·2019-08-29 13:13
閱讀 1987·2019-08-23 17:17
閱讀 1790·2019-08-23 17:04