You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
aggregation-platform/src/com/platform/service/thread/ThreadMoveData.java

420 lines
13 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.platform.service.thread;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Resource;
import org.apache.log4j.Logger;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import com.base.TaskOperateData;
import com.platform.dao.DataInfoDao;
import com.platform.dao.DataInfoMoveTmpDao;
import com.platform.entities.DataInfoEntity;
import com.platform.entities.DataInfoEntityMoveTmp;
import com.platform.glusterfs.CheckoutMD5;
import com.platform.http.gfs.CopyData;
import com.platform.glusterfs.ShowData;
import com.platform.utils.Bean2MapUtils;
import com.platform.utils.Constant;
import com.platform.utils.DateForm;
import com.platform.utils.FileOperateHelper;
/** Data migration --- (this class is no longer used; discarded)
* @author chen
*
*/
@Component
public class ThreadMoveData{
// log4j logger shared by all methods of this class.
public final static Logger log = Logger.getLogger(ThreadMoveData.class);
// Injected by bean name "dataInfoDao"; used to save DataInfoEntity rows.
@Resource(name = "dataInfoDao")
private DataInfoDao dataInfoDao;
/**
* Migration (copy) client.
*/
CopyData copy = new CopyData();
/**
* MD5 verification.
*/
CheckoutMD5 check = new CheckoutMD5();
// Injected by bean name "dataInfoMoveTmpDao"; used to read and update the migration records.
@Resource(name = "dataInfoMoveTmpDao")
private DataInfoMoveTmpDao dataInfoMoveTmpDao;
/**
* Data viewer.
*/
ShowData show = new ShowData();
/**
* Original note: updates the database in real time, based on the queried data
* that is currently being migrated. The constructor itself does nothing.
*/
public ThreadMoveData() {
}
// Migrate data -- in use since 2016-11-30.
/** A running record with no reported copy task is considered stalled after this long (20 minutes). */
private static final long STALL_TIMEOUT_MS = 20L * 60L * 1000L;

/**
 * Scheduled pass over the migration records returned by
 * {@code dataInfoMoveTmpDao.findAll()}; triggered with a fixed delay of 4000 ms.
 *
 * <p>Record {@code completeStatus} values: "0" waiting, "1" migrating,
 * "2" finished, "3" failed. Finished and failed records are left alone.
 * Task statuses reported by the copy service (per the original notes):
 * 1 = migrating, 3 = verified successfully, -1 = copied but verification
 * failed, -2 = migration failed, -3 = delete failed.
 *
 * <p>Each pass first refreshes the records that are migrating, then starts
 * waiting records while the number of migrations is below
 * {@code Constant.moveFileMaxNum}.
 */
@Scheduled(fixedDelay = 4000)
public void moveDataByWebGfs(){
    List<DataInfoEntityMoveTmp> result = null;
    try {
        result = dataInfoMoveTmpDao.findAll();
    } catch (Exception e) {
        log.error("dataInfoMoveTmpDao.findAll()", e);
    }
    if (null == result) {
        // Query failed or returned nothing usable: try again on the next pass.
        return;
    }
    List<DataInfoEntityMoveTmp> waiting = new ArrayList<DataInfoEntityMoveTmp>();
    List<DataInfoEntityMoveTmp> running = new ArrayList<DataInfoEntityMoveTmp>();
    for (DataInfoEntityMoveTmp moveE : result) {
        // Constant-first equals() so a record with a null status is skipped instead of crashing the pass.
        String status = moveE.getCompleteStatus();
        if ("0".equals(status)) {
            waiting.add(moveE);
        } else if ("1".equals(status)) {
            running.add(moveE);
        }
    }
    // Same basis as before: the number of records that were migrating when the pass started.
    int runningAtStart = running.size();
    if (!running.isEmpty()) {
        refreshRunningMoves(running);
    }
    if (!waiting.isEmpty()) {
        startWaitingMoves(waiting, runningAtStart);
    }
}

/** Syncs every migrating record with the task list reported by the copy service. */
private void refreshRunningMoves(List<DataInfoEntityMoveTmp> running) {
    List<TaskOperateData> tasks = null;
    try {
        tasks = copy.operationTask();
    } catch (Exception e) {
        log.error("copy.operationTask()", e);
    }
    if (null == tasks) {
        // Without a task list we cannot tell a stalled copy from a failed query,
        // so nothing is marked as failed on this pass.
        return;
    }
    Map<String, TaskOperateData> taskMap = new HashMap<String, TaskOperateData>();
    for (TaskOperateData task : tasks) {
        taskMap.put(FileOperateHelper.addLastLinuxSeparator(task.getSourcePath())
                + "-" + FileOperateHelper.addLastLinuxSeparator(task.getDestPath()), task);
    }
    for (DataInfoEntityMoveTmp moveE : running) {
        TaskOperateData taskOne = taskMap.get(FileOperateHelper.addLastLinuxSeparator(moveE.getDataPath())
                + "-" + FileOperateHelper.addLastLinuxSeparator(makeDstPath(moveE.getDstPath())));
        if (null == taskOne) {
            // No task reported for this record: after 20 minutes without progress
            // it is judged failed, which also frees its migration slot.
            if (isStalled(moveE)) {
                moveE.setCompleteStatus("3");
                persistMove(moveE);
            }
            continue;
        }
        moveE.setRate(taskOne.getProgress());
        moveE.setLastTime(DateForm.date2StringBysecond(new Date()));
        if (3 == taskOne.getStatus()) {
            // Success: makeDataInfo persists the record and creates the data-info row.
            makeDataInfo(moveE);
        } else {
            if (taskOne.getStatus() < 0) {
                // Failure reported by the copy service.
                moveE.setCompleteStatus("3");
            }
            // Persist in both cases so progress and lastTime of in-flight copies are not lost.
            persistMove(moveE);
        }
    }
}

/** Requests a copy for waiting records until {@code Constant.moveFileMaxNum} migrations are in flight. */
private void startWaitingMoves(List<DataInfoEntityMoveTmp> waiting, int runningAtStart) {
    int curMoveNum = runningAtStart;
    for (DataInfoEntityMoveTmp moveE : waiting) {
        // Strictly below the maximum, so at most moveFileMaxNum copies run at once.
        if (curMoveNum >= Constant.moveFileMaxNum) {
            break;
        }
        moveE.setLastTime(DateForm.date2StringBysecond(new Date()));
        curMoveNum++;
        try {
            if (1 == copy.copyFolder(moveE.getDataPath(), makeDstPath(moveE.getDstPath()))) {
                moveE.setCompleteStatus("1");
            } else {
                moveE.setCompleteStatus("3");
            }
        } catch (Exception e) {
            // The status stays "0", so the request is retried on a later pass.
            log.error("copy.copyFolder()", e);
        }
        persistMove(moveE);
    }
}

/** Returns true when the record's last update is more than 20 minutes old; false if the timestamp cannot be read. */
private boolean isStalled(DataInfoEntityMoveTmp moveE) {
    try {
        long idle = new Date().getTime() - DateForm.string2DateBysecond(moveE.getLastTime()).getTime();
        return idle > STALL_TIMEOUT_MS;
    } catch (Exception e) {
        // A bad timestamp on one record must not abort the whole pass.
        log.error("cannot read lastTime of migration record: " + moveE.getLastTime(), e);
        return false;
    }
}

/** Writes the record back through the DAO, logging (with stack trace) instead of propagating failures. */
private void persistMove(DataInfoEntityMoveTmp moveE) {
    try {
        dataInfoMoveTmpDao.update(moveE);
    } catch (Exception e) {
        log.error("dataInfoMoveTmpDao.update()", e);
    }
}
// Every 5 seconds -- NOT used since 2016-11-30 (the scheduling annotation below is disabled).
// @Scheduled(fixedDelay = 5000)
/**
 * Legacy migration pass based on comparing folder sizes.
 *
 * <p>For every record in status "1" (migrating) it measures source and
 * destination folder sizes, derives a progress percentage, and runs the MD5
 * verification once the copy looks complete or the record has not been
 * updated for 20 minutes. If fewer than {@code Constant.moveFileMaxNum}
 * records are migrating, the first record in status "0" (waiting) is started.
 * Finally the thread sleeps for {@code Constant.update_dataInfo_sleep_time}.
 *
 * <p>Record statuses: "0" waiting, "1" migrating, "2" succeeded, "3" failed.
 * Exceptions are logged and never propagated.
 */
public void doSomething() {
    try {
        // Query table move_data_tmp.
        List<DataInfoEntityMoveTmp> result = dataInfoMoveTmpDao.findAll();
        if (null != result) {
            boolean isNoMove = true;
            int moveFileCurrNum = 0;
            // Every record must be visited; do not break out of this loop.
            for (DataInfoEntityMoveTmp dataMove : result) {
                if (!"1".equals(dataMove.getCompleteStatus())) {
                    continue;
                }
                // No progress for more than 20 minutes: decide the outcome by MD5 verification.
                long idle = new Date().getTime()
                        - DateForm.string2DateBysecond(dataMove.getLastTime()).getTime();
                if (idle > 1000*60*20) {
                    doMd5(0, 0, dataMove);
                    if (!"1".equals(dataMove.getCompleteStatus())) {
                        // The verification settled the record (succeeded or failed): persist
                        // the outcome and stop treating it as a migration in progress.
                        dataInfoMoveTmpDao.update(dataMove);
                        continue;
                    }
                }
                // Count the migrations in progress.
                moveFileCurrNum++;
                if (moveFileCurrNum >= Constant.moveFileMaxNum) {
                    isNoMove = false;
                }
                long srcSizeTemp = show.getFolderSize(dataMove.getDataPath());
                long dstSize = show.getFolderSize(dataMove.getDstPath());
                // Test the raw sizes: scaling a small negative value by 0.998 truncates to 0
                // and would hide the error.
                boolean pathMissing = false;
                if (srcSizeTemp < 0) {
                    log.error(dataMove.getDataPath() + " : 路径不存在! ");
                    pathMissing = true;
                }
                if (dstSize < 0) {
                    log.error(dataMove.getDstPath() + " : 路径不存在! ");
                    pathMissing = true;
                }
                if (pathMissing) {
                    // 3: migration failed.
                    dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
                    dataMove.setCompleteStatus("3");
                    dataInfoMoveTmpDao.update(dataMove);
                    continue;
                }
                // The source size is scaled by 0.998 before the comparison (as in the original).
                long srcSize = (long) (srcSizeTemp * 0.998);
                // Computed per record so one record's percentage never leaks into the next.
                double realRate = 0.00;
                if (srcSize > 0 && dstSize > 0) {
                    realRate = (dstSize*100 / srcSize );
                }
                if (srcSize == dstSize || realRate > 100) {
                    realRate = 100;
                }
                if (realRate > dataMove.getRate()) {
                    dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
                    dataMove.setRate((int) realRate);
                }
                if (dataMove.getRate() > 0) {
                    // Transfer finished: verify it.
                    if (realRate >= 100) {
                        doMd5(srcSizeTemp, dstSize, dataMove);
                        dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
                    }
                    dataInfoMoveTmpDao.update(dataMove);
                }
            }
            // Capacity left after the loop: start the next waiting record.
            if (isNoMove) {
                result = dataInfoMoveTmpDao.findAll();
                if (null != result) {
                    for (DataInfoEntityMoveTmp next2move : result) {
                        if ("0".equals(next2move.getCompleteStatus())) {
                            // Destination with the trailing "<digits>/" removed.
                            // NOTE(review): the result of copyFolder is ignored here, unlike in moveDataByWebGfs.
                            copy.copyFolder(next2move.getDataPath(), makeDstPath(next2move.getDstPath()));
                            next2move.setCompleteStatus("1");
                            next2move.setLastTime(DateForm.date2StringBysecond(new Date()));
                            dataInfoMoveTmpDao.update(next2move);
                            // Only one record is started per pass.
                            break;
                        }
                    }
                }
            }
        }
        Thread.sleep(Constant.update_dataInfo_sleep_time);
    } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever runs this thread.
        Thread.currentThread().interrupt();
        log.error("doSomething() interrupted", e);
    } catch (Exception e) {
        log.error("doSomething()", e);
    }
}
/**
 * Verifies a copied folder by MD5 and records the outcome on the migration record.
 *
 * <p>Before verifying, waits 10 ms per unit of remaining size difference
 * (at least 10 ms). When {@code check.checkoutMD5Folder} returns 1 and the
 * record still exists, the record is stored with status "2"; if the stored
 * record was not already "2", a {@code DataInfoEntity} pointing at the
 * destination is saved as well. Any other verification result stores the
 * record with status "3" (failed).
 *
 * @param srcSizeTemp size of the source folder as measured by the caller
 * @param dstSize size of the destination folder as measured by the caller
 * @param dataMove migration record to verify; its status and lastTime are updated
 * @throws Exception if the wait is interrupted or the MD5 check itself throws
 */
private void doMd5(long srcSizeTemp, long dstSize, DataInfoEntityMoveTmp dataMove) throws Exception {
    // Long arithmetic, clamped so neither the difference nor the multiplication can wrap around.
    long difSize = srcSizeTemp - dstSize;
    if (difSize < 1) {
        difSize = 1;
    }
    if (difSize > Integer.MAX_VALUE / 10) {
        // NOTE(review): the upper bound still allows a very long wait -- confirm the unit of the sizes.
        difSize = Integer.MAX_VALUE / 10;
    }
    Thread.sleep(10L * difSize);
    //TODO check whether the copy process for this folder has actually finished?
    int resl = check.checkoutMD5Folder(dataMove.getDataPath(), dataMove.getDstPath());
    if (resl == 1) {
        try {
            // Look at the stored record to see whether it was already marked "2".
            DataInfoEntityMoveTmp movetmp = dataInfoMoveTmpDao.findById(dataMove.getId());
            if (null != movetmp) {
                boolean alreadyFinished = "2".equals(movetmp.getCompleteStatus());
                dataMove.setCompleteStatus("2");
                dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
                dataInfoMoveTmpDao.update(dataMove);
                if (!alreadyFinished) {
                    // First successful verification: add the matching row to dataInfo.
                    DataInfoEntity data = (DataInfoEntity) Bean2MapUtils.convertMap(
                            DataInfoEntity.class, Bean2MapUtils.convertBean(dataMove));
                    data.setDataPath(dataMove.getDstPath());
                    // The new row describes the DESTINATION volume; the source IP must not
                    // overwrite it. Fallbacks match makeDataInfo.
                    data.setVolumeIp(null == dataMove.getDstVolumeIp()
                            ? "localhost" : dataMove.getDstVolumeIp());
                    data.setVolumePath(null == dataMove.getDstVolumePath()
                            ? dataMove.getDstPath() : dataMove.getDstVolumePath());
                    data.setPayResult(dataMove.getPayResult());
                    data.setExecResult(dataMove.getExecResult());
                    // Id reset to 0 before saving, as in the original.
                    data.setId(0);
                    dataInfoDao.save(data);
                }
            }
        } catch (Exception e) {
            log.error("doMd5: recording successful verification failed", e);
        }
    } else {
        // 3: migration failed. Persist it here so the outcome is not lost when the
        // caller does not write the record back.
        dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
        dataMove.setCompleteStatus("3");
        try {
            dataInfoMoveTmpDao.update(dataMove);
        } catch (Exception e) {
            log.error("doMd5: recording failed verification failed", e);
        }
    }
}
/**
 * Marks a migration record as finished and creates the matching data-info row.
 *
 * <p>If the record can no longer be found by id, the passed record is simply
 * written back. If the stored record is already "2", nothing happens.
 * Otherwise the record is stored with status "2" and a {@code DataInfoEntity}
 * pointing at the destination is saved; when that save fails the record is
 * put back to status "1".
 *
 * @param dataMove migration record whose copy task reported success
 * @return always 1; failures are logged, not reported
 */
private int makeDataInfo(DataInfoEntityMoveTmp dataMove){
    try {
        DataInfoEntityMoveTmp stored = dataInfoMoveTmpDao.findById(dataMove.getId());
        if (null == stored) {
            dataInfoMoveTmpDao.update(dataMove);
            return 1;
        }
        // Already marked finished in the database: nothing left to do.
        if ("2".equals(stored.getCompleteStatus())) {
            return 1;
        }
        // Record the success on the migration record first.
        dataMove.setCompleteStatus("2");
        dataMove.setLastTime(DateForm.date2StringBysecond(new Date()));
        dataInfoMoveTmpDao.update(dataMove);
        // Then build the data-info row from the record's properties.
        DataInfoEntity created = (DataInfoEntity) Bean2MapUtils.convertMap(
                DataInfoEntity.class, Bean2MapUtils.convertBean(dataMove));
        created.setDataPath(dataMove.getDstPath());
        created.setVolumeIp(null == dataMove.getDstVolumeIp()
                ? "localhost" : dataMove.getDstVolumeIp());
        // No volume path supplied: fall back to the destination path.
        created.setVolumePath(null == dataMove.getDstVolumePath()
                ? dataMove.getDstPath() : dataMove.getDstVolumePath());
        created.setPayResult(dataMove.getPayResult());
        created.setExecResult(dataMove.getExecResult());
        created.setId(0);
        try {
            dataInfoDao.save(created);
        } catch (Exception saveError) {
            log.error(saveError);
            // Saving failed: put the record back to "migrating".
            dataMove.setCompleteStatus("1");
            dataInfoMoveTmpDao.update(dataMove);
        }
    } catch (Exception failure) {
        log.error(failure);
    }
    return 1;
}
/** A run of digits followed by '/' at the very end of the path, e.g. the "123/" in "/a/b/123/". */
private static final Pattern TRAILING_DIGITS_SLASH = Pattern.compile("\\d+\\/$");

/**
 * Removes a trailing "&lt;digits&gt;/" from a destination path.
 *
 * <p>Only the match at the end of the path is removed; the same text
 * appearing earlier in the path is left untouched.
 *
 * @param dstPath destination path, may be null
 * @return the path without its trailing "&lt;digits&gt;/", the unchanged path
 *         when it does not end that way, or null when {@code dstPath} is null
 */
private String makeDstPath(String dstPath) {
    if (null == dstPath) {
        return null;
    }
    // The pattern is end-anchored, so replaceFirst can only touch the tail of the path.
    return TRAILING_DIGITS_SLASH.matcher(dstPath).replaceFirst("");
}
}