You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
aggregation-platform/src/com/platform/service/thread/ThreadMoveData.java

190 lines
6.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.platform.service.thread;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import org.apache.log4j.Logger;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import com.platform.dao.DataInfoDao;
import com.platform.dao.DataInfoMoveTmpDao;
import com.platform.dao.GatherOracleDao;
import com.platform.entities.DataInfoEntity;
import com.platform.entities.DataInfoEntityMoveTmp;
import com.platform.entities.GatherOracleInfo;
import com.platform.glusterfs.CheckoutMD5;
import com.platform.glusterfs.CopyData;
import com.platform.glusterfs.ShowData;
import com.platform.service.DataInfoService;
import com.platform.service.DataInfoServiceImp;
import com.platform.service.IMoveDataService;
import com.platform.service.impl.MoveDataServiceImpl;
import com.platform.utils.Bean2MapUtils;
import com.platform.utils.Constant;
import com.platform.utils.DateForm;
@Component
public class ThreadMoveData{
public static Logger log = Logger.getLogger(ThreadMoveData.class);
@Resource(name = "dataInfoDao")
private DataInfoDao dataInfoDao;
/**
* 迁移
*/
CopyData copy = new CopyData();
/**
* MD5校验
*/
CheckoutMD5 check = new CheckoutMD5();
@Resource(name = "dataInfoMoveTmpDao")
private DataInfoMoveTmpDao dataInfoMoveTmpDao;
/**
* 查看数据
*/
ShowData show = new ShowData();
/**
* : 实时更新数据库--根据查询到的 正则迁移的数据
*/
public ThreadMoveData() {
}
//5秒
@Scheduled(fixedDelay = 5000)
public void doSomething() {
try {
List<DataInfoEntityMoveTmp> result = null;
//查询 表 move_data_tmp
result = dataInfoMoveTmpDao.findAll();
if (null != result) {
//gfs 获取size
int rsize = result.size();
boolean isNoMove = true;
//该循环必须 循环每个,不能有 break;
// rate:大小:假的,待换成真实比例
double realRate = 0.00;
int moveFileCurrNum = 0;
for (int i = 0; i < rsize; i++) {
DataInfoEntityMoveTmp dataMove = result.get(i);
//如果拷贝进度超过20分钟未进行-- 判断为 迁移失败。
// "1" :正在上传0等待 迁移, 2成功 3失败
if ("1".equals(dataMove.getCompleteStatus())) {
long nowTime = new Date().getTime();
long timelong = nowTime - DateForm.string2DateBysecond(dataMove.getLastTime()).getTime();
if (timelong > 1000*60*20) {
dataMove.setCompleteStatus("3");
dataInfoMoveTmpDao.update(dataMove);
}
//正在上传的个数。
moveFileCurrNum++;
if (moveFileCurrNum >= Constant.moveFileMaxNum) {
isNoMove = false;
}
// 查询大小:。//gfs 获取size
long srcSize = show.getFolderSize(dataMove.getDataPath());
long dstSize = show.getFolderSize(dataMove.getDstPath());
if (srcSize < 0) {
log.error(dataMove.getDataPath() + " : 路径不存在! ");
continue;
}
if (dstSize < 0) {
log.error(dataMove.getDstPath() + " : 路径不存在! ");
continue;
}
if (srcSize > 0 && dstSize > 0) {
realRate = (dstSize*100 / srcSize );
dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
}
if (srcSize == dstSize) {
realRate = 100;
}
dataMove.setRate((int) realRate);
}
if("1".equals(dataMove.getCompleteStatus()) && dataMove.getRate() > 0){
//传输完毕:进行校验
if (realRate == 100) {
//TODO 进行MD5校验
int resl = check.checkoutMD5Folder(dataMove.getDataPath(), dataMove.getDstPath());
//TODO 校验成功--则删除数据库记录
if(resl == 1){
//校验成功--修改 数据库记录--
dataMove.setCompleteStatus("2");
dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
dataInfoMoveTmpDao.update(dataMove);
//TODO 新增 一条数据-到-dataInfo
DataInfoEntity data = (DataInfoEntity) Bean2MapUtils.convertMap(
DataInfoEntity.class, Bean2MapUtils.convertBean(dataMove));
data.setDataPath(dataMove.getDstPath());
data.setVolumeIp(dataMove.getDstVolumeIp());
data.setVolumePath(dataMove.getDstVolumePath());
data.setYear(DateForm.date2StringByMin(new Date()));
dataInfoDao.save(data);
}
else {
// 3:表示 迁移失败
dataMove.setCompleteStatus("3");
}
}
dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
dataInfoMoveTmpDao.update(dataMove);
}
}
//循环 完了, 确定没有上传的 ,没有正在上传的
if (isNoMove) {
//查询 表 move_data_tmp
result = dataInfoMoveTmpDao.findAll();
if (null != result) {
int tmpsize = result.size();
// 上传下一个后
for (int i = 0; i < tmpsize; i++) {
DataInfoEntityMoveTmp next2move = result.get(i);
//如果是 待 迁移状态的
if ("0".equals(next2move.getCompleteStatus())) {
//待迁移的数据 -- 开始迁移
// 末尾 含有 /
Pattern pattern2 = Pattern.compile("\\/$");
String dstPath = next2move.getDataPath();
Matcher matcher2 = pattern2.matcher(dstPath);
//去掉 最后 的 / 符合
if (matcher2.find()) {
dstPath = dstPath.substring(0, dstPath.length()-1);
}
//数据迁移。
copy.copyFolder(dstPath+"/app", next2move.getDstPath());
// "1" :正在上传0等待 迁移, 2成功 3失败
next2move.setCompleteStatus("1");
next2move.setLastTime(DateForm.date2StringBysecond(new Date()));
//更新sql
dataInfoMoveTmpDao.update(next2move);
break;
}
}
}
}
}
Thread.sleep(Constant.update_dataInfo_sleep_time);
} catch (Exception e) {
System.err.println(e);
}
}
}