Spring Boot project: a lightweight message queue built on RedisTemplate
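Background
One of our company projects had this requirement: the front end uploads an Excel file, and the back end reads the data, processes it, and returns the rows that failed. The simplest approach is to process synchronously, with the client blocked until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use message queue middleware such as ActiveMQ, and Redis's LPUSH and RPOP commands are a good fit for a lightweight message queue, so I used them to build this feature.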
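To see why LPUSH combined with RPOP behaves like a queue, here is a minimal in-memory sketch. It uses `java.util.ArrayDeque` as a stand-in for a Redis list (no Redis is involved), and the class name `ListQueueSketch` and its methods are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** In-memory stand-in for a Redis list; illustrative only, not a Redis client. */
final class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    /** Mirrors LPUSH: insert at the head (left end) of the list. */
    void leftPush(String value) {
        list.addFirst(value);
    }

    /** Mirrors RPOP: remove from the tail (right end); returns null when empty. */
    String rightPop() {
        return list.pollLast();
    }
}
```

Because values enter on the left and leave on the right, the element pushed first is also popped first, which is the FIFO order a message queue needs.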
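I. Topics covered
Reading and writing Excel files: Alibaba easyexcel SDK
File upload and download: Tencent Cloud Object Storage (COS)
Remote service calls: RestTemplate
Producer and consumer: RedisTemplate leftPush and rightPop operations
Asynchronous data processing: Executors thread pool
Reading a remote file as a stream: HttpClient
Custom annotation for user authentication: JWT token authentication, with an interceptor that catches request entry points annotated with @LoginRequired
And, of course, everything is implemented in Java
That is a lot of ground. Each topic could be studied as a subject in its own right; this article presents the complete implementation, and I plan to split the topics into separate posts later.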
II. Project directory structure
Note: the database DAO layer lives in a separate module and is not the focus of this article.
III. Main Maven dependencies
1. easyexcel
<!-- declared under <properties> in pom.xml -->
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>${easyexcel-latestVersion}</version>
        </dependency>
2. JWT
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.7.0</version>
        </dependency>
3. Redis
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
4. Tencent COS
        <dependency>
            <groupId>com.qcloud</groupId>
            <artifactId>cos_api</artifactId>
            <version>5.4.5</version>
        </dependency>
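IV. Flow
The user uploads a file
The file is stored in Tencent COS
The id of the uploaded file and an upload record are saved to the database
An import message is produced, i.e. the file id is pushed to Redis
The request ends and returns a "processing" status
The message is consumed from Redis
The file is read from COS and the data is processed asynchronously
The rows with errors are uploaded to COS as an Excel file for the user to download, and the status is updated to "done"
The client polls for the processing status and can download the error file
End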
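The flow above can be sketched in a single process to show how the status moves from "processing" to "done". This is only an illustration under stated assumptions: a deque stands in for the Redis list, a map stands in for the import-record table, and `ImportFlowSketch`, `Status`, `upload`, `consumeOne` and `status` are hypothetical names that do not appear in the project.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Single-process sketch of the import flow; every name here is hypothetical. */
final class ImportFlowSketch {
    enum Status { PROCESSING, DONE }

    // Stand-in for the Redis list that carries file ids.
    private final Deque<String> queue = new ArrayDeque<>();
    // Stand-in for the import-record table in the database.
    private final Map<String, Status> records = new ConcurrentHashMap<>();

    /** Request side: save the record, enqueue the file id, return at once. */
    synchronized Status upload(String fileId) {
        records.put(fileId, Status.PROCESSING);
        queue.addFirst(fileId); // like LPUSH
        return Status.PROCESSING;
    }

    /** Consumer side: take one message, process it, mark the record as done. */
    synchronized boolean consumeOne() {
        String fileId = queue.pollLast(); // like RPOP
        if (fileId == null) {
            return false; // nothing queued
        }
        // Real code would read the file, validate rows and upload the error file here.
        records.put(fileId, Status.DONE);
        return true;
    }

    /** Polling side: what a status query would return; null for an unknown id. */
    Status status(String fileId) {
        return records.get(fileId);
    }
}
```

The point of the split is that `upload` never waits for `consumeOne`; the client learns the outcome only through `status`.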
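V. Results
1. Uploading a file
2. Import record in the database
3. Imported data
4. Downloading the error file
5. Error messages in the data
6. Querying import records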
VI. Implementation
1. Excel import controller
    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }
2. Service layer
    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("File must not be empty");
        }
        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("Only xlsx Excel files are currently supported");
        }
        // Store the file
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("File upload failed, please try again later");
        }
        // Save a record to the database
        saveRecordToDB(userId, fileId, filename);
        // Produce an order-import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);
        return JsonResponse.ok("Upload succeeded, processing...");
    }
    /**
     * Check the file extension
     * @param fileName name of the uploaded file
     * @return true if the name ends with .xlsx (case-insensitive)
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }
        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex).toLowerCase();
        return ".xlsx".equals(suffix);
    }
    /**
     * Store the file in Tencent cloud object storage through the upload service
     * @param file the uploaded file
     * @return the id of the stored file, or null if the upload failed
     */
    private String saveToOss(MultipartFile file) {
        File f = null;
        try {
            // Copy the upload to a local temp file so it can be posted as a multipart resource
            f = new File(file.getOriginalFilename());
            inputStreamToFile(file.getInputStream(), f);
            FileSystemResource resource = new FileSystemResource(f);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);
            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            return (String) responseResult.getData();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            // Delete the temp file
            if (null != f && f.exists()) {
                f.delete();
            }
        }
    }
3. Redis producer
@Service
public class RedisProducerImpl implements RedisProducer {
    @Autowired
    private RedisTemplate redisTemplate;
    @Override
    public JsonResponse produce(String key, String msg) {
        // Wrap the file id in a map so the consumer can read it back by name
        Map<String, String> map = Maps.newHashMap();
        map.put("fileId", msg);
        // Push onto the left end of the list
        redisTemplate.opsForList().leftPush(key, map);
        return JsonResponse.ok();
    }
}
4. Redis consumer
@Service
public class RedisConsumer {
    @Autowired
    private RedisTemplate redisTemplate;
    @Value("${txOssFileUrl}")
    private String txOssFileUrl;
    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;
    @PostConstruct
    public void init() {
        processOrderImport();
    }
    /**
     * Process order imports
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            while (true) {
                // Pop from the right end of the list, waiting up to 1 second for a message
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                if (null == object) {
                    continue;
                }
                String msg = JSON.toJSONString(object);
                // Hand the message to a worker thread
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }
}
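The loop above runs forever, and an exception thrown by the pop would end it. A sketch of a variant with a stop flag, a bounded worker pool and a guarded loop follows. It uses a `LinkedBlockingDeque` in place of Redis so that it runs without a server; `PollingConsumerSketch` and its methods are names invented for this example, and the pool size of 4 is an arbitrary assumption.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Polling consumer sketch; the deque stands in for the Redis list. Names are hypothetical. */
final class PollingConsumerSketch {
    private final BlockingDeque<String> list = new LinkedBlockingDeque<>();
    private final ExecutorService poller = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4); // bounded pool
    private final Consumer<String> handler;
    private volatile boolean running = true;

    PollingConsumerSketch(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Producer side: push at the head, like leftPush. */
    void produce(String msg) {
        list.addFirst(msg);
    }

    /** Starts the polling loop on its own thread. */
    void start() {
        poller.execute(() -> {
            while (running) {
                try {
                    // Timed pop from the tail, like rightPop(key, 1, TimeUnit.SECONDS).
                    String msg = list.pollLast(1, TimeUnit.SECONDS);
                    if (msg != null) {
                        workers.execute(() -> handler.accept(msg));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and exit
                    return;
                } catch (RuntimeException e) {
                    // A failed poll or rejected task must not end the loop;
                    // real code would log the error and back off before retrying.
                }
            }
        });
    }

    /** Stops polling and waits up to 5 seconds for tasks that are already running. */
    void stop() {
        running = false;
        poller.shutdownNow();
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a Spring bean, `start()` would typically be called from a `@PostConstruct` method and `stop()` from a `@PreDestroy` method, so the threads do not outlive the application context.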
5. Task class that processes an import
public class OrderImportTask implements Runnable {
    private final String msg;
    private final String txOssFileUrl;
    private final String txOssUploadUrl;
    // Looked up in autowireBean(), because this class is not managed by Spring
    private RestTemplate restTemplate;
    private TransactionTemplate transactionTemplate;
    private OrderImportService orderImportService;

    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
        this.msg = msg;
        this.txOssFileUrl = txOssFileUrl;
        this.txOssUploadUrl = txOssUploadUrl;
    }
    /**
     * Look up the beans this task needs
     */
    private void autowireBean() {
        this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
        this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
        this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
    }
    @Override
    public void run() {
        // Look up the beans
        autowireBean();
        JSONObject jsonObject = JSON.parseObject(msg);
        String fileId = jsonObject.getString("fileId");
        // Ask the file service for the download URL of this file id
        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("id", fileId);
        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
        String fileUrl = (String) responseResult.getData();
        if (StringUtils.isBlank(fileUrl)) {
            return;
        }
        // Read the file and process its rows
        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId);
    }
    /**
     * Upload a file to object storage
     * @param file the file to upload
     * @return the id of the stored file, or null if the upload failed
     */
    private String saveToOss(File file) {
        String fileId;
        try {
            FileSystemResource resource = new FileSystemResource(file);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);
            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }
        return fileId;
    }
    // ... process(List<Object>, String) goes here ...
}
Note: the business logic that processes the data is not shown here.
6. Uploading a file to COS
    @RequestMapping("/txOssUpload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("File must not be empty");
        }
        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename);// fixes garbled Chinese file names
        String contentType = getContentType(originalFilename);
        String key;
        InputStream ins = null;
        File f = null;
        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (null != f && f.exists()) {// delete the temp file
                f.delete();
            }
        }
        return ResponseResult.ok(key);
    }
    public static void inputStreamToFile(InputStream ins, File file) {
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // Called through iFileStorageClient
    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        // Prefix the name with a UUID so object keys do not collide
        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);
        try {
            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }
    // In OSSUtil
    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try {
            // available() is used as the content length of the local file stream
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }
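Both upload paths above write the stream to `new File(originalFilename)`, that is, into the working directory under a client-supplied name. One alternative, sketched here with the JDK's `java.nio.file.Files` only, is to let the JDK pick a unique temp file name. `TempFileSketch` and `copyToTempFile` are hypothetical names, not part of the project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Temp-file helper sketch; class and method names are hypothetical. */
final class TempFileSketch {
    /**
     * Copies the stream into a fresh temp file and returns its path.
     * The stream is closed here; the caller deletes the file when done.
     */
    static Path copyToTempFile(InputStream in, String suffix) throws IOException {
        // A generated name avoids using the client-supplied file name as a path.
        Path temp = Files.createTempFile("upload-", suffix);
        try (InputStream source = in) {
            Files.copy(source, temp, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            Files.deleteIfExists(temp); // do not leave a partial file behind
            throw e;
        }
        return temp;
    }
}
```

A caller would wrap the use of the returned path in `try { ... } finally { Files.deleteIfExists(path); }`, which plays the role of the `f.delete()` call in the controller.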
7. Downloading a file
    /**
     * Download a file from Tencent Cloud
     * @param response the HTTP response the file is written to
     * @param id the file id
     * @return always null, because the body is written directly to the response
     */
    @RequestMapping("/txOssDownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }
    // In FileUtil
    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + "; charset=utf-8");
            // Images do not get the attachment header
            if (!contentType.equals(PlConstans.FileContentType.image)) {
                try {
                    response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }
            os = response.getOutputStream();
            byte[] b = new byte[1024 * 1024];
            int len;
            // read() returns -1 at end of stream
            while ((len = fileStream.read(b)) != -1) {
                os.write(b, 0, len);
            }
            os.flush();
        } catch (IOException e) {
            logger.error("writing download response failed", e);
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
        }
    }
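The header above re-encodes the UTF-8 bytes of the file name as ISO-8859-1, a common workaround for non-ASCII names. Another option, assuming the clients in question understand it, is the `filename*` parameter described in RFC 6266 and RFC 5987, which percent-encodes the UTF-8 bytes. The sketch below builds such a header value with the JDK's `URLEncoder`; `ContentDispositionSketch` and `attachment` are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Header-building sketch; class and method names are hypothetical. */
final class ContentDispositionSketch {
    /** Builds an attachment header value using the filename* form. */
    static String attachment(String fileName) {
        try {
            String encoded = URLEncoder.encode(fileName, "UTF-8")
                    .replace("+", "%20")   // URLEncoder writes spaces as '+' (form encoding)
                    .replace("*", "%2A");  // URLEncoder leaves '*' as-is; encode it as well
            return "attachment; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

It would be used as `response.setHeader("Content-Disposition", ContentDispositionSketch.attachment(fileName))`. Whether to also send a plain `filename=` fallback for older clients is a separate decision that this sketch leaves out.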
8. Reading a remote file as a stream
    /**
     * Read a remote file as a stream
     * @param url the file URL
     * @return the response body stream, or null if the URL is blank or the request fails
     */
    public static InputStream readFileFromURL(String url) {
        if (StringUtils.isBlank(url)) {
            return null;
        }
        // DefaultHttpClient is deprecated since HttpClient 4.3
        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);
        try {
            HttpResponse response = httpClient.execute(methodGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();
                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
9. ExcelUtil
    /**
     * Read an Excel file
     * @param inputStream the file input stream
     * @return the rows as a list
     */
    public static List<Object> read(InputStream inputStream) {
        // Sheet number 1, with 1 header row
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }
    /**
     * Write an Excel file
     * @param data the rows to write
     * @param clazz the row model class
     * @param saveFilePath path of the file to write
     * @throws IOException if the file cannot be written
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        // try-with-resources closes the file even if writing fails
        try (OutputStream out = new FileOutputStream(tempFile)) {
            ExcelWriter writer = EasyExcelFactory.getWriter(out);
            Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
            writer.write(data, sheet);
            writer.finish();
        }
    }
Note: at this point the whole flow is complete. The code for the remaining topics is listed below for reference.
VII. Miscellaneous
1. The @LoginRequired annotation
/**
 * Put this annotation on Controller methods that require a logged-in user
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}
2. MyControllerAdvice
@ControllerAdvice
public class MyControllerAdvice {
    // Invalid or missing token
    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }
    // Business errors raised by the service layer
    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {
        return JsonResponse.fail(se.getMsg());
    }
    // Everything else
    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();
        return JsonResponse.fail(e.getMessage());
    }
}
3. AuthenticationInterceptor
public class AuthenticationInterceptor implements HandlerInterceptor {
    private static final String CURRENT_USER = "user";
    @Autowired
    private UserService userService;
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // Let the request through if it is not mapped to a handler method
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        // If the endpoint carries @LoginRequired, the user must be logged in
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
        if (methodAnnotation != null) {
            // Verify the token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);
            if (null == plUser) {
                throw new RuntimeException("User does not exist, please log in again");
            }
            // Make the user available to the controller
            request.setAttribute(CURRENT_USER, plUser);
            return true;
        }
        return true;
    }
    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }
    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}
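The interceptor decides whether to check the token by looking up the annotation on the handler method through reflection. That lookup can be tried outside Spring with the JDK alone. In the sketch below, the nested `LoginRequired` annotation mirrors the one in this article, while `LoginRequiredSketch`, `DemoController` and `requiresLogin` are hypothetical names used only for the demonstration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Reflection sketch; DemoController and its methods are hypothetical. */
final class LoginRequiredSketch {
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME) // RUNTIME retention makes the annotation visible to reflection
    @interface LoginRequired {
    }

    static class DemoController {
        @LoginRequired
        public void doImport() {
        }

        public void health() {
        }
    }

    /** Returns true when the named public no-arg method carries @LoginRequired. */
    static boolean requiresLogin(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return method.getAnnotation(LoginRequired.class) != null;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }
}
```

With `RetentionPolicy.CLASS` or `SOURCE` instead of `RUNTIME`, `getAnnotation` would return null and every endpoint would look public, which is why the retention setting on the annotation matters.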
4. JwtUtil
    public static final long EXPIRATION_TIME = 2592_000_000L; // valid for 30 days
    public static final String SECRET = "pl_token_secret";
    public static final String HEADER = "token";
    public static final String USER_ID = "userId";
    /**
     * Generate a token for a userId
     * @param userId the user id to store in the token
     * @return the signed token
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }
    /**
     * Verify a token
     * @param request the request carrying the token header
     * @return the userId if verification succeeds
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();
                // Find the userId claim
                for (Map.Entry entry : body.entrySet()) {
                    Object key = entry.getKey();
                    Object value = entry.getValue();
                    if (key.toString().equals(USER_ID)) {
                        return Integer.valueOf(value.toString());// userId
                    }
                }
                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }
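To make the role of `SignatureAlgorithm.HS512` and the secret more concrete, here is a sketch of how an HS512 token in compact form is put together and checked, using only the JDK. It is not how jjwt is implemented and is not a replacement for it: it checks the signature only, not the expiry claim, and it uses the secret's raw UTF-8 bytes, whereas jjwt's `signWith(SignatureAlgorithm, String)` treats the string as Base64-encoded key material, so tokens from the two are not expected to be interchangeable. `Hs512Sketch`, `sign` and `verify` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** HS512 compact-token sketch; names are hypothetical and expiry is not checked. */
final class Hs512Sketch {
    private static final Base64.Encoder URL_ENCODER = Base64.getUrlEncoder().withoutPadding();

    /** Returns base64url(header) + "." + base64url(payload) + "." + base64url(signature). */
    static String sign(String payloadJson, byte[] secret) {
        String header = "{\"alg\":\"HS512\",\"typ\":\"JWT\"}";
        String signingInput = encode(header) + "." + encode(payloadJson);
        return signingInput + "." + URL_ENCODER.encodeToString(hmac(signingInput, secret));
    }

    /** Recomputes the signature over the first two parts and compares in constant time. */
    static boolean verify(String token, byte[] secret) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot <= 0) {
            return false;
        }
        byte[] expected = hmac(token.substring(0, lastDot), secret);
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(token.substring(lastDot + 1));
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid base64url
        }
        return MessageDigest.isEqual(expected, actual);
    }

    private static String encode(String text) {
        return URL_ENCODER.encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    private static byte[] hmac(String data, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA512");
            mac.init(new SecretKeySpec(secret, "HmacSHA512"));
            return mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA512 is not available", e);
        }
    }
}
```

The takeaway is that anyone who knows the secret can mint valid tokens, so a value like `SECRET` is usually better loaded from configuration than kept as a constant in source code.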