首页 > 文章列表 > Springboot微服务项目整合Kafka如何实现文章上下架功能

Springboot微服务项目整合Kafka如何实现文章上下架功能

Kafka springboot
359 2023-05-12

Springboot微服务项目整合Kafka如何实现文章上下架功能

一:Kafka消息发送快速入门

1.传递字符串消息

(1)发送消息

创建一个Controller包并编写一个测试类用于发送消息

package com.my.kafka.controller;

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

 

@RestController

public class HelloController {

    @Autowired

    private KafkaTemplate<String,String> kafkaTemplate;

 

    @GetMapping("hello")

    public String helloProducer(){

        kafkaTemplate.send("my-topic","Hello~");

        return "ok";

    }

}
(2)监听消息

编写测试类用于接收消息:

package com.my.kafka.listener;

 

import org.junit.platform.commons.util.StringUtils;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

 

@Component

public class HelloListener {

    @KafkaListener(topics = "my-topic")

    public void helloListener(String message) {

        if(StringUtils.isNotBlank(message)) {

            System.out.println(message);

        }

    }

}
(3)测试结果

打开浏览器输入localhost:9991/hello,然后到控制台查看消息,可以看到成功消息监听到并且进行了消费。

2.传递对象消息

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式:

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,这里不做介绍。

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式。

(1)修改生产者代码
@GetMapping("hello")

public String helloProducer(){

    User user = new User();

    user.setName("赵四");

    user.setAge(20);

    kafkaTemplate.send("my-topic", JSON.toJSONString(user));

    return "ok";

}
(2)结果测试

可以看到成功接收都对象参数,后期要使用该对象只需要将其转换成User对象即可。

二:功能引入

1.需求分析

发布文章之后,可能会由于文章出现某些错误或者其他原因,我们会在文章管理端实现文章的上下架功能(见下图),也即当管理端实现对文章下架之后移动端将不会再展示该文章,只有该文章重新被上架之后才能在移动端看到该文章信息。

2.逻辑分析

后端接收到前端传过来的参数之后要先做一个校验,参数不为空才能继续往下执行,首先应该根据前端传过来的文章id(自媒体端文章id)查询自媒体数据库的文章信息并判断该文章是否已是发布状态,因为只有审核成功并成功发布了的文章才能进行上下架操作。自媒体端微服务对文章上下架状态进行修改之后便可以向Kafka发送一条消息,该消息为Map对象,里面存储的数据为移动端的文章id以及前端传过来的上下架参数enable,当然要将该Map对象转换成JSON字符串才能进行发送。

文章微服务监听到Kafka发送过来的消息之后将JSON字符串转换成Map对象之后再获取相关参数对移动端文章的上下架状态进行修改。

三:前期准备

1.引入依赖

<!-- kafkfa -->

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka-clients</artifactId>

</dependency>

2.定义常量

package com.my.common.constans;

public class WmNewsMessageConstants {

    public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic";

}

3.Kafka配置信息

由于我是用Nacos来作为注册中心,所以配置信息放置在Nacos上面即可。

(1)自媒体端配置

spring:

  kafka:

    bootstrap-servers: 4.234.52.122:9092

    producer:

      retries: 10

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

(2)移动端配置

spring:

  kafka:

    bootstrap-servers: 4.234.52.122:9092

    consumer:

      group-id: ${spring.application.name}-test

      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

四:代码实现

1.自媒体端

@Autowired

private KafkaTemplate<String,String> kafkaTemplate;

/**

 * 文章下架或上架

 * @param id

 * @param enable

 * @return

 */

@Override

public ResponseResult downOrUp(Integer id,Integer enable) {

    log.info("执行文章上下架操作...");

    if(id == null || enable == null) {

        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);

    }

    //根据id获取文章

    WmNews news = getById(id);

    if(news == null) {

        return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章信息不存在");

    }

    //获取当前文章状态

    Short status = news.getStatus();

    if(!status.equals(WmNews.Status.PUBLISHED.getCode())) {

        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"文章非发布状态,不能上下架");

    }

 

    //更改文章状态

    news.setEnable(enable.shortValue());

    updateById(news);

    log.info("更改文章上架状态{}-->{}",status,news.getEnable());

 

    //发送消息到Kafka

    Map<String, Object> map = new HashMap<>();

    map.put("articleId",news.getArticleId());

    map.put("enable",enable.shortValue());

    kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map));

    log.info("发送消息到Kafka...");

 

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}

2.移动端

(1)设置监听器

package com.my.article.listener;

 

import com.baomidou.mybatisplus.core.toolkit.StringUtils;

import com.my.article.service.ApArticleService;

import com.my.common.constans.WmNewsMessageConstants;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import org.springframework.kafka.annotation.KafkaListener;

 

 

@Slf4j

@Component

public class EnableListener {

    @Autowired

    private ApArticleService apArticleService;

 

    @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)

    public void downOrUp(String message) {

        if(StringUtils.isNotBlank(message)) {

            log.info("监听到消息{}",message);

            apArticleService.downOrUp(message);

        }

    }

}

(2)获取消息并修改文章状态

/**

* 文章上下架

* @param message

* @return

*/

@Override

public ResponseResult downOrUp(String message) {

    Map map = JSON.parseObject(message, Map.class);

    //获取文章id

    Long articleId = (Long) map.get("articleId");

    //获取文章待修改状态

    Integer enable = (Integer) map.get("enable");

    //查询文章配置

    ApArticleConfig apArticleConfig = apArticleConfigMapper.selectOne

            (Wrappers.<ApArticleConfig>lambdaQuery().eq(ApArticleConfig::getArticleId, articleId));

    if(apArticleConfig != null) {

        //上架

        if(enable == 1) {

            log.info("文章重新上架");

            apArticleConfig.setIsDown(false);

            apArticleConfigMapper.updateById(apArticleConfig);

        }

        //下架

        if(enable == 0) {

            log.info("文章下架");

            apArticleConfig.setIsDown(true);

            apArticleConfigMapper.updateById(apArticleConfig);

        }

    }

    else {

        throw new RuntimeException("文章信息不存在");

    }

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

}