前提说明:涉及隐私问题,部分引入依赖包已删除。

功能说明: 该Java类名为RyTask,是一个定时任务调度器。它的主要功能是从指定路径的压缩文件中解压出CSV数据,并将解析后的数据使用多线程的方式进行批量插入到数据库中。整个过程包含以下功能:

  1. 获取当前日期和前三天的日期,并构建文件夹路径列表用于查找数据文件。
  2. 遍历文件夹路径列表,查找是否存在名为”test.zip”的压缩文件。
  3. 解压”test.zip”文件中的”test.csv”文件,并读取CSV数据保存在名为”lines”的列表中。
  4. 创建线程池,根据CPU核心数创建对应线程数,用于处理CSV数据的并发插入操作。
  5. 遍历CSV数据列表”lines”,从第二行开始(跳过标题行),使用线程池处理每一行的插入操作。
  6. 解析CSV数据并构造对象”NbAdsQgIncomeIQ”,将对象添加到并发队列”imQueue”中。
  7. 等待所有线程完成任务后,执行批量插入操作,将并发队列中的数据存储到数据库中。
  8. 定时任务实现数据的定期同步与导入。
import java.io.*;import java.nio.charset.StandardCharsets;import java.nio.file.Files;import java.nio.file.Path;import java.nio.file.Paths;import java.text.SimpleDateFormat;import java.time.LocalDate;import java.time.format.DateTimeFormatter;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.List;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.zip.ZipEntry;import java.util.zip.ZipFile;import java.util.zip.ZipInputStream;/*** 定时任务调度测试* 用于解压、读取、导入test.csv数据到数据库* 通过多线程处理CSV数据并进行批量插入操作* 使用定时任务实现数据的定期同步与导入*/@Component("ryTask")public class RyTask {@Autowiredprivate INbAdsQgIncomeIQService nbAdsQgIncomeIQService;@Autowiredprivate ServerCommonMapper serverCommonMapper;/*** 定时解压、读取、导入test.csv* @return 返回导入操作结果*/public AjaxResult exportIq() {// 获取当前日期,格式为yyyyMMddString currentDate = new SimpleDateFormat("yyyyMMdd").format(new Date());// 构建三天前、两天前和一天前的日期String threeDaysAgo = LocalDate.now().minusDays(3).format(DateTimeFormatter.BASIC_ISO_DATE);String twoDaysAgo = LocalDate.now().minusDays(2).format(DateTimeFormatter.BASIC_ISO_DATE);String oneDayAgo = LocalDate.now().minusDays(1).format(DateTimeFormatter.BASIC_ISO_DATE);// 定义文件夹路径列表List folderPaths = Arrays.asList(currentDate, oneDayAgo, twoDaysAgo);// 标志是否找到zip文件boolean zipFileFound = false;String zipFilePath = "";// 遍历文件夹路径列表,查找zip文件for (String folderPath : folderPaths) {String zipFilePathSftp = "/home/mysftp/upload/all/push_service_data/" + folderPath + "/test.zip";File zipFile = new File(zipFilePathSftp);// 如果zip文件存在,设定标志为true并跳出循环if (zipFile.exists()) {zipFileFound = true;zipFilePath = zipFilePathSftp;break;}}// 如果没有找到zip文件,返回错误信息if (!zipFileFound) {return AjaxResult.error("未找到test.zip文件");}// 解压文件并读取CSV数据List lines = readCSVFromZip(zipFilePath, "test.csv");if (lines.size() <= 1) {return AjaxResult.error();}// 创建线程池,用于多线程处理int numThreads = Runtime.getRuntime().availableProcessors(); // 获取CPU核心数ExecutorService executor = Executors.newFixedThreadPool(numThreads);// 创建存储数据的并发队列ConcurrentLinkedQueue imQueue = new ConcurrentLinkedQueue();// 循环处理CSV数据for (int i = 1; i  {try {String[] data = line.split("\u0001");for (int j = 0; j < data.length; j++) {data[j] = data[j].replace("\\N", ""); // 去除换行符}NbAdsQgIncomeIQ iq = new NbAdsQgIncomeIQ();iq.setCompanyId(data[0]);iq.setCompanyName(data[1]);iq.setCreditCode(data[2]);iq.setReportYear(data[3]);iq.setReportTermType(data[4]);if (!data[5].isEmpty()) {iq.setBusinessRevenue(Double.valueOf(data[5]));}if (!data[6].isEmpty()) {iq.setTotalProfitAmount(Double.valueOf(data[6]));}String pattern = "yyyy-MM-dd HH:mm:ss";SimpleDateFormat dateFormat = new SimpleDateFormat(pattern);Date date = dateFormat.parse(data[7]);iq.setUpdateTime(date);iq.setBatchNumber(new Date());imQueue.add(iq);} catch (Exception e) {e.printStackTrace();}});}// 等待所有线程完成任务executor.shutdown();try {executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {e.printStackTrace();}// 执行批量插入操作nbAdsQgIncomeIQService.saveBatch(imQueue);return AjaxResult.success("同步成功");}private List readCSVFromZip(String zipFilePath, String csvFileName) {List lines = new ArrayList();try (ZipFile zipFile = new ZipFile(zipFilePath);InputStream inputStream = zipFile.getInputStream(zipFile.getEntry(csvFileName));InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {String line;while ((line = bufferedReader.readLine()) != null) {lines.add(line);}} catch (IOException e) {e.printStackTrace();}return lines;}}