2017.2.7出差过程中伍铭+陈立文修改的代码。

web_backend_develope
chenlw 8 years ago
parent 234a57c005
commit 7388adfa9a

@ -42,6 +42,7 @@ import com.platform.service.IVolumeService;
import com.platform.service.OracleExtractTask; import com.platform.service.OracleExtractTask;
import com.platform.service.OracleStatusService; import com.platform.service.OracleStatusService;
import com.platform.utils.CacheSetCantDelete; import com.platform.utils.CacheSetCantDelete;
import com.platform.utils.Configs;
import com.platform.utils.Constant; import com.platform.utils.Constant;
import com.platform.utils.ThreadVolumeImm; import com.platform.utils.ThreadVolumeImm;
import com.platform.utils.UtilsHelper; import com.platform.utils.UtilsHelper;
@ -159,9 +160,9 @@ public class DataModelController extends BaseController {
if (null != data && data.length > 0) { if (null != data && data.length > 0) {
List<String> list = new ArrayList<String>(); List<String> list = new ArrayList<String>();
List<String> errList = new ArrayList<String>(); List<String> errList = new ArrayList<String>();
//判断是否有 不能删除的。 // 判断是否有 不能删除的。
for (String dataId : data) { for (String dataId : data) {
if(CacheSetCantDelete.containsId(dataId)) if (CacheSetCantDelete.containsId(dataId))
errList.add(dataId); errList.add(dataId);
else else
list.add(dataId); list.add(dataId);
@ -196,8 +197,7 @@ public class DataModelController extends BaseController {
if (oraclesName != null) if (oraclesName != null)
for (String rcName : oraclesName) { for (String rcName : oraclesName) {
log.info("执行连接\t" + rcName); log.info("执行连接\t" + rcName);
String cmd = "kubectl label --overwrite rc " + rcName String cmd = "kubectl label --overwrite rc " + rcName + " status=0";
+ " status=0";
List<String> rList = Constant.ganymedSSH List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd); .execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
@ -243,7 +243,8 @@ public class DataModelController extends BaseController {
public String volumeList() throws Exception { public String volumeList() throws Exception {
log.info("-----/volume/list------"); log.info("-----/volume/list------");
String rest = volumeService.getAllvolume(); String rest = volumeService.getAllvolume();
new ThreadVolumeImm("ThreadVolumeImm-in-VolumeController-volumeList").start(); new ThreadVolumeImm("ThreadVolumeImm-in-VolumeController-volumeList")
.start();
return rest; return rest;
} }
@ -465,7 +466,7 @@ public class DataModelController extends BaseController {
String result = logReadService.readLog(name); String result = logReadService.readLog(name);
// "查看相应日志" // "查看相应日志"
Map<String, String> log = new HashMap<String, String>(); Map<String, String> log = new HashMap<String, String>();
log.put(name, result+"\r\n"); log.put(name, result + "\r\n");
return log; return log;
} }
} }

@ -16,7 +16,9 @@ import com.platform.oracle.OracleConnector;
import com.platform.utils.Configs; import com.platform.utils.Configs;
import com.platform.utils.Constant; import com.platform.utils.Constant;
/** oracle10 /**
* oracle10
*
* @author chen * @author chen
* *
*/ */
@ -41,8 +43,7 @@ public class OracleStatusService {
public void cancelToOracle(String replicasName, String operate) { public void cancelToOracle(String replicasName, String operate) {
if (operate.equals("stop")) { if (operate.equals("stop")) {
String cmd = "kubectl label --overwrite rc " + replicasName String cmd = "kubectl label --overwrite rc " + replicasName + " status=0";
+ " status=0";
List<String> rList = Constant.ganymedSSH List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd); .execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
@ -120,8 +121,7 @@ public class OracleStatusService {
killAlliveTask(taskName); killAlliveTask(taskName);
// client.updateOrAddReplicasLabelById(taskName, "status", "1"); // client.updateOrAddReplicasLabelById(taskName, "status", "1");
// //更新ReplicationController标签将oracle状态标示未1(0:启动中1失败2成功) // //更新ReplicationController标签将oracle状态标示未1(0:启动中1失败2成功)
String cmd = "kubectl label --overwrite rc " + taskName String cmd = "kubectl label --overwrite rc " + taskName + " status=1";
+ " status=1";
List<String> rList = Constant.ganymedSSH List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd); .execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
@ -144,8 +144,8 @@ public class OracleStatusService {
+ ocp.getPassword()); + ocp.getPassword());
String message = "失败"; String message = "失败";
if (flag && alliveTask.containsKey(taskName)) { if (flag && alliveTask.containsKey(taskName)) {
String cmd = "kubectl label --overwrite rc " String cmd = "kubectl label --overwrite rc " + taskName
+ taskName + " status=2"; + " status=2";
// client.updateOrAddReplicasLabelById(taskNSyame, // client.updateOrAddReplicasLabelById(taskNSyame,
// "status", "2"); // "status", "2");
List<String> rList = Constant.ganymedSSH List<String> rList = Constant.ganymedSSH

@ -76,9 +76,9 @@ public class CheckoutServiceImpl implements ICheckoutService {
List<CheckoutEntity> checks = new ArrayList<CheckoutEntity>(); List<CheckoutEntity> checks = new ArrayList<CheckoutEntity>();
List<PreDataInfo> result = preDataInfoDao.findAllCollect(); List<PreDataInfo> result = preDataInfoDao.findAllCollect();
DataInfoEntity data = new DataInfoEntity(); DataInfoEntity data = new DataInfoEntity();
Calendar c2 = Calendar.getInstance(); Calendar c2 = Calendar.getInstance();
// 时间设置为 半年前的时间 // 时间设置为 半年前的时间
c2.set(Calendar.MONTH, getMonBeforeHalfYear(c2.get(Calendar.MONTH))); this.getMonBeforeHalfYear(c2);
String time = DateForm.date2StringByDay(c2.getTime()); String time = DateForm.date2StringByDay(c2.getTime());
data.setCollectingTime(time); data.setCollectingTime(time);
// data.setCollectingTime(collectingTime); // data.setCollectingTime(collectingTime);
@ -124,7 +124,7 @@ public class CheckoutServiceImpl implements ICheckoutService {
CheckoutEntity cksql = new CheckoutEntity(); CheckoutEntity cksql = new CheckoutEntity();
Calendar c2 = Calendar.getInstance(); Calendar c2 = Calendar.getInstance();
// 时间设置为 半年前的时间 // 时间设置为 半年前的时间
c2.set(Calendar.MONTH, getMonBeforeHalfYear(c2.get(Calendar.MONTH))); this.getMonBeforeHalfYear(c2);
String time = DateForm.date2StringByDay(c2.getTime()); String time = DateForm.date2StringByDay(c2.getTime());
cksql.setCollectingTime(time); cksql.setCollectingTime(time);
cksql.setCityName(city); cksql.setCityName(city);
@ -451,12 +451,9 @@ public class CheckoutServiceImpl implements ICheckoutService {
* @param num * @param num
* @return * @return
*/ */
private int getMonBeforeHalfYear(int num){ private int getMonBeforeHalfYear(Calendar c2){
num -= Configs.dataBefore; c2.set(Calendar.DAY_OF_YEAR, c2.get(Calendar.DAY_OF_YEAR) -(Configs.dataBefore*30));
if (num <= 0) { return 1;
num = num + 12;
}
return num;
} }
private String isY(String str1, String str2) { private String isY(String str1, String str2) {

@ -87,8 +87,8 @@ public class OracleExtractServiceImpl implements IOracleExtractService {
dataInfoDao.updateExtract(data); dataInfoDao.updateExtract(data);
collectOracle.setName("J" collectOracle.setName("J"
+ collectOracle.getName().replace("-", "_")); + collectOracle.getName().replace("-", "_"));
String cmd = "kubectl label --overwrite rc " String cmd = "kubectl label --overwrite rc " + replicasName
+ replicasName + " isExtract=1"; + " isExtract=1";
// sql日志记录时间 // sql日志记录时间
FileOperateHelper FileOperateHelper
.fileWrite( .fileWrite(
@ -127,7 +127,8 @@ public class OracleExtractServiceImpl implements IOracleExtractService {
Configs.CONSOLE_LOGGER.info(sb.toString()); Configs.CONSOLE_LOGGER.info(sb.toString());
data.setExtractStatus(2); data.setExtractStatus(2);
dataInfoDao.updateExtract(data); dataInfoDao.updateExtract(data);
DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); DataInfoEntity tmpdata = dataInfoDao.findById(data
.getId());
data.setId(tmpdata.getSrcId()); data.setId(tmpdata.getSrcId());
dataInfoDao.updateExtract(data); dataInfoDao.updateExtract(data);
} }
@ -141,29 +142,27 @@ public class OracleExtractServiceImpl implements IOracleExtractService {
data.setId(tmpdata.getSrcId()); data.setId(tmpdata.getSrcId());
dataInfoDao.updateExtract(data); dataInfoDao.updateExtract(data);
log.error(Custom4exception.OracleSQL_Except, e); log.error(Custom4exception.OracleSQL_Except, e);
} } finally {
finally{ // 去掉保存的当前数据id,
//去掉保存的当前数据id, CacheSetCantDelete.removeExtractId(collectOracle
CacheSetCantDelete.removeExtractId(collectOracle.getDataId()); .getDataId());
String msg = "汇总结束"; String msg = "汇总结束";
if (2 != data.getExtractStatus()) { if (2 != data.getExtractStatus()) {
msg +=" 汇总有异常!状态重置为待汇总 "; msg += " 汇总有异常!状态重置为待汇总 ";
data.setExtractStatus(0); data.setExtractStatus(0);
dataInfoDao.updateExtract(data); dataInfoDao.updateExtract(data);
DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); DataInfoEntity tmpdata = dataInfoDao.findById(data
.getId());
data.setId(tmpdata.getSrcId()); data.setId(tmpdata.getSrcId());
dataInfoDao.updateExtract(data); dataInfoDao.updateExtract(data);
} }
// sql日志记录时间 // sql日志记录时间
FileOperateHelper FileOperateHelper.fileWrite(
.fileWrite( Configs.EXTRACT_LOG_LOCALTION
Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName() + ".log",
+ collectOracle.getName() "\r\n " + msg + " >>>>>>> "
+ ".log", + DateForm.date2StringBysecond(new Date())
"\r\n "+msg+" >>>>>>> " + "\r\n\r\n\n");
+ DateForm
.date2StringBysecond(new Date())
+ "\r\n\r\n\n");
} }
} }
isSuccess = true; isSuccess = true;
@ -200,166 +199,160 @@ public class OracleExtractServiceImpl implements IOracleExtractService {
List<OracleConnectorParams> dataInfolist, List<OracleConnectorParams> dataInfolist,
GatherOracleInfo oracleConnect) throws Exception { GatherOracleInfo oracleConnect) throws Exception {
boolean isSuccess = false; boolean isSuccess = false;
// map转 bean(汇总库信息-带tableName的) // map转 bean(汇总库信息-带tableName的)
GatherOracleInfo oracleModel = oracleConnect; GatherOracleInfo oracleModel = oracleConnect;
// 采集库连接参数 // 采集库连接参数
List<OracleConnectorParams> datainfos = dataInfolist; List<OracleConnectorParams> datainfos = dataInfolist;
if (datainfos.size() == 0) { if (datainfos.size() == 0) {
return false; return false;
} }
Connection conn = OracleConnector.connectionBuilder( Connection conn = OracleConnector.connectionBuilder(
"jdbc:oracle:thin:@" + oracleModel.getIp() + ":" "jdbc:oracle:thin:@" + oracleModel.getIp() + ":"
+ oracleModel.getPort() + ":" + oracleModel.getPort() + ":"
+ oracleModel.getDatabaseName(), + oracleModel.getDatabaseName(), oracleModel.getUser(),
oracleModel.getUser(), oracleModel.getPassword(), oracleModel.getPassword(), dataInfolist.get(0));
dataInfolist.get(0)); if (null == conn) {
if (null == conn) { FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + dataInfolist.get(0).getName() + ".log", "创建oracle连接失败: ["
+ dataInfolist.get(0).getName() + ".log", + conn + "]\r\n");
"创建oracle连接失败: [" + conn + "]\r\n"); return false;
return false; }
} for (OracleConnectorParams collectOracle : datainfos) {
for (OracleConnectorParams collectOracle : datainfos) { String replicasName = collectOracle.getName();
String replicasName = collectOracle.getName(); DataInfoEntity data = new DataInfoEntity();
DataInfoEntity data = new DataInfoEntity(); try {
try { if (null != collectOracle.getDataId()
if (null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())) {
&& !"".equals(collectOracle.getDataId())) { data.setId(Integer.valueOf(collectOracle.getDataId()));
data.setId(Integer.valueOf(collectOracle.getDataId())); // 设置为 标准表 抽取中
// 设置为 标准表 抽取中 data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); data.setStandardExtractStatus("1");
data.setStandardExtractStatus("1"); dataInfoDao.update(data);
dataInfoDao.update(data); collectOracle.setName("CQ"
collectOracle.setName("CQ" + collectOracle.getName().replace("-", "_"));
+ collectOracle.getName().replace("-", "_")); String cmd = "kubectl annotate --overwrite rc " + replicasName
String cmd = "kubectl annotate --overwrite rc " + " standardExtractStatus=1";
+ replicasName + " standardExtractStatus=1"; // sql日志记录时间
// sql日志记录时间 FileOperateHelper.fileWrite(
FileOperateHelper Configs.EXTRACT_STANDARD_LOG_LOCALTION
.fileWrite( + collectOracle.getName() + ".log",
Configs.EXTRACT_STANDARD_LOG_LOCALTION "\r\n 开始抽取标准表 \r\n "
+ collectOracle.getName() + DateForm.date2StringBysecond(new Date())
+ ".log", + "\n\r\n");
"\r\n 开始抽取标准表 \r\n " List<String> rList = Constant.ganymedSSH
+ DateForm .execCmdWaitAcquiescent(cmd);
.date2StringBysecond(new Date()) StringBuffer sb = new StringBuffer();
+ "\n\r\n"); for (String string : rList)
List<String> rList = Constant.ganymedSSH sb.append(string).append("\n");
.execCmdWaitAcquiescent(cmd); Configs.CONSOLE_LOGGER.info(sb.toString());
StringBuffer sb = new StringBuffer(); // client.updateOrAddReplicasLabelById(collectOracle.getName(),
for (String string : rList) // "isExtract", "1");
sb.append(string).append("\n"); // //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
Configs.CONSOLE_LOGGER.info(sb.toString()); oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink
// client.updateOrAddReplicasLabelById(collectOracle.getName(), oracleExtract.createStardardTableSpace(conn, collectOracle,
// "isExtract", "1"); oracleModel); // 创建表空间
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成 oracleExtract.createOnlyUser(conn, collectOracle,
oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink oracleModel);// 创建 抽取标准表的 用户并授权
oracleExtract.createStardardTableSpace(conn, collectOracle, DataInfoEntity tmpdata = dataInfoDao.findById(data.getId());
oracleModel); // 创建表空间 if (null != tmpdata) {
oracleExtract.createOnlyUser(conn, collectOracle, if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata
oracleModel);// 创建 抽取标准表的 用户并授权 .getPayResultLast())
DataInfoEntity tmpdata = dataInfoDao.findById(data || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata
.getId()); .getPayResultLast())) {
if (null != tmpdata) { // 抽取中
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX);
.getPayResultLast()) dataInfoDao.update(data);
|| Constant.CHECKOUT_STATUS_FIVE boolean isExtrac = true;
.equals(tmpdata.getPayResultLast())) { try {
// 抽取中 oracleExtract.extractStandardPayTable(conn,
data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); collectOracle, oracleModel);// 执行抽取
} catch (Exception e) {
// 改回 校验存在的状态
data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data); dataInfoDao.update(data);
boolean isExtrac = true; isExtrac = false;
try {
oracleExtract.extractStandardPayTable(conn,
collectOracle, oracleModel);// 执行抽取
} catch (Exception e) {
// 改回 校验存在的状态
data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
// 抽取成功
data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
} }
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata if (isExtrac) {
.getExecResultLast()) // 抽取成功
|| Constant.CHECKOUT_STATUS_FIVE data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN);
.equals(tmpdata.getExecResultLast())) {
// 抽取中
data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX);
dataInfoDao.update(data); dataInfoDao.update(data);
boolean isExtrac = true;
try {
oracleExtract.extractStandardExecTable(
conn, collectOracle, oracleModel);// 执行抽取
} catch (Exception e) {
// 改回 校验存在的状态
data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
} }
// client.updateOrAddReplicasLabelById(collectOracle.getName(), }
// "isExtract", "2"); if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成 .getExecResultLast())
cmd = "kubectl annotate --overwrite rc " || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata
+ replicasName + " standardExtractStatus=2"; .getExecResultLast())) {
rList = Constant.ganymedSSH // 抽取中
.execCmdWaitAcquiescent(cmd); data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX);
sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN);
data.setStandardExtractStatus("2");
dataInfoDao.update(data);
DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId());
data.setId(tmpSrcData.getSrcId());
dataInfoDao.update(data); dataInfoDao.update(data);
boolean isExtrac = true;
try {
oracleExtract.extractStandardExecTable(conn,
collectOracle, oracleModel);// 执行抽取
} catch (Exception e) {
// 改回 校验存在的状态
data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
dataInfoDao.update(data);
isExtrac = false;
}
if (isExtrac) {
data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN);
dataInfoDao.update(data);
}
} }
// client.updateOrAddReplicasLabelById(collectOracle.getName(),
// "isExtract", "2");
// //更新oracle汇总状态0标示为未汇总1标示汇总中2标示汇总完成
cmd = "kubectl annotate --overwrite rc " + replicasName
+ " standardExtractStatus=2";
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
sb = new StringBuffer();
for (String string : rList)
sb.append(string).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN);
data.setStandardExtractStatus("2");
dataInfoDao.update(data);
DataInfoEntity tmpSrcData = dataInfoDao.findById(data
.getId());
data.setId(tmpSrcData.getSrcId());
dataInfoDao.update(data);
} }
} catch (Exception e) { }
} catch (Exception e) {
String cmd = "kubectl annotate --overwrite rc " String cmd = "kubectl annotate --overwrite rc " + replicasName
+ replicasName + " standardExtractStatus=0"; + " standardExtractStatus=0";
Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
data.setStandardExtractStatus("0");
dataInfoDao.update(data);
DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId());
data.setId(tmpSrcData.getSrcId());
dataInfoDao.update(data);
log.error(Custom4exception.OracleSQL_Except, e);
} finally {
// 去掉保存的当前数据id,
CacheSetCantDelete.removeStandardId(collectOracle.getDataId());
String msg = "抽取标准表结束";
if (!"2".equals(data.getStandardExtractStatus())) {
msg += " 抽取有异常!状态重置为待抽取 ";
data.setStandardExtractStatus("0"); data.setStandardExtractStatus("0");
dataInfoDao.update(data); dataInfoDao.update(data);
DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId()); DataInfoEntity tmpdata = dataInfoDao.findById(data.getId());
data.setId(tmpSrcData.getSrcId()); data.setId(tmpdata.getSrcId());
dataInfoDao.update(data); dataInfoDao.update(data);
log.error(Custom4exception.OracleSQL_Except, e);
}
finally{
//去掉保存的当前数据id,
CacheSetCantDelete.removeStandardId(collectOracle.getDataId());
String msg = "抽取标准表结束";
if (!"2".equals(data.getStandardExtractStatus())) {
msg +=" 抽取有异常!状态重置为待抽取 ";
data.setStandardExtractStatus("0");
dataInfoDao.update(data);
DataInfoEntity tmpdata = dataInfoDao.findById(data.getId());
data.setId(tmpdata.getSrcId());
dataInfoDao.update(data);
}
// sql日志记录时间
// sql日志记录时间
FileOperateHelper.fileWrite(
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName() + ".log",
" "+msg+" >>>>> "
+ DateForm.date2StringBysecond(new Date())
+ "\r\n\r\n\n");
} }
// sql日志记录时间
// sql日志记录时间
FileOperateHelper.fileWrite(
Configs.EXTRACT_STANDARD_LOG_LOCALTION
+ collectOracle.getName() + ".log",
" " + msg + " >>>>> "
+ DateForm.date2StringBysecond(new Date())
+ "\r\n\r\n\n");
} }
isSuccess = true; }
isSuccess = true;
return isSuccess; return isSuccess;
} }

@ -20,13 +20,16 @@ import com.platform.utils.Configs;
import com.platform.utils.Constant; import com.platform.utils.Constant;
import com.platform.utils.FileOperateHelper; import com.platform.utils.FileOperateHelper;
/** oracle /**
* oracle
*
* @author chen * @author chen
* *
*/ */
public class ThreadCheckoutStandardOracle extends Thread { public class ThreadCheckoutStandardOracle extends Thread {
public final static Logger log = Configs.CONSOLE_LOGGER.getLogger(ThreadCheckoutStandardOracle.class); public final static Logger log = Configs.CONSOLE_LOGGER
.getLogger(ThreadCheckoutStandardOracle.class);
/** /**
* kuber * kuber
@ -47,15 +50,16 @@ public class ThreadCheckoutStandardOracle extends Thread {
@Override @Override
public void run() { public void run() {
try { try {
Thread.sleep(1000*5); Thread.sleep(1000 * 5);
} catch (InterruptedException e2) { } catch (InterruptedException e2) {
log.error(e2); log.error(e2);
} }
//循环11次每次休眠 // 循环11次每次休眠
for (int i = 0; i < 11; i++) { for (int i = 0; i < 11; i++) {
try { try {
// 数据的 keys =kuber的应用名称 taskName) // 数据的 keys =kuber的应用名称 taskName)
Set<String> taskNames = CacheOracleCheckoutEntity.getCheckKeys(); Set<String> taskNames = CacheOracleCheckoutEntity
.getCheckKeys();
int lengs = taskNames.size(); int lengs = taskNames.size();
if (lengs == 0) { if (lengs == 0) {
break; break;
@ -68,28 +72,28 @@ public class ThreadCheckoutStandardOracle extends Thread {
log.info("replicationController标签 " + sbtask.toString()); log.info("replicationController标签 " + sbtask.toString());
for (String key : taskNamekeys) { for (String key : taskNamekeys) {
// 获得 kuber的 pod // 获得 kuber的 pod
Pod tmpPod = filterPod(key); Pod tmpPod = filterPod(key);
if (null == tmpPod) { if (null == tmpPod) {
log.info("replicationController标签 " + key + " 的 pod 节点不存在!"); log.info("replicationController标签 " + key
+ " 的 pod 节点不存在!");
if (i > 5) { if (i > 5) {
CacheOracleCheckoutEntity.checkRemove(key); CacheOracleCheckoutEntity.checkRemove(key);
} }
continue; continue;
} }
// 尝试 连接 oracle // 尝试 连接 oracle
connectOracle(tmpPod, key); connectOracle(tmpPod, key);
if (i == 10) { if (i == 10) {
String cmd = "kubectl label --overwrite rc " + key String cmd = "kubectl label --overwrite rc " + key + " status=1";
+ " status=1";
List<String> rList = Constant.ganymedSSH List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd); .execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
for (String str : rList) for (String str : rList)
sb.append(str).append("\n"); sb.append(str).append("\n");
log.info("更新replicationController标签 " log.info("更新replicationController标签 " + key
+ key + "\t[标签更新为: 失败]"); + "\t[标签更新为: 失败]");
log.info(sb.toString()); log.info(sb.toString());
String cmd2 = "kubectl annotate --overwrite rc " + key String cmd2 = "kubectl annotate --overwrite rc " + key
@ -99,16 +103,17 @@ public class ThreadCheckoutStandardOracle extends Thread {
StringBuffer sb2 = new StringBuffer(); StringBuffer sb2 = new StringBuffer();
for (String str : rList2) for (String str : rList2)
sb2.append(str).append("\n"); sb2.append(str).append("\n");
log.info("更新replicationController标签 " log.info("更新replicationController标签 " + key
+ key + "\t[标签更新为: 未校验]"); + "\t[标签更新为: 未校验]");
log.info(sb2.toString()); log.info(sb2.toString());
// 当前key标签对应的 数据服务的记录 // 当前key标签对应的 数据服务的记录
CheckoutEntity tmp = CacheOracleCheckoutEntity.getCheck(key); CheckoutEntity tmp = CacheOracleCheckoutEntity
.getCheck(key);
tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO); tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO);
tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO); tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO);
tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO); tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO);
try { try {
//更新数据库 // 更新数据库
this.updateDataInfo(tmp); this.updateDataInfo(tmp);
} catch (Exception e) { } catch (Exception e) {
log.error(e); log.error(e);
@ -117,10 +122,9 @@ public class ThreadCheckoutStandardOracle extends Thread {
} }
} catch (Exception e1) { } catch (Exception e1) {
log.error(e1.getStackTrace()); log.error(e1.getStackTrace());
} } finally {
finally{
try { try {
Thread.sleep(1000*60); Thread.sleep(1000 * 60);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error(e); log.error(e);
} }
@ -128,7 +132,9 @@ public class ThreadCheckoutStandardOracle extends Thread {
} }
} }
/** pod /**
* pod
*
* @param taskName * @param taskName
* @return * @return
*/ */
@ -146,38 +152,36 @@ public class ThreadCheckoutStandardOracle extends Thread {
return pod; return pod;
} }
/** oracle /**
* oracle
*
* @param tmpPod * @param tmpPod
* @param key * @param key
*/ */
private void connectOracle(Pod pod, String key) throws Exception{ private void connectOracle(Pod pod, String key) throws Exception {
if (pod != null) { if (pod != null) {
String ip = client.getPodHostIp(pod); String ip = client.getPodHostIp(pod);
int port = client.getPodContainerport(pod); int port = client.getPodContainerport(pod);
if (ip != null && port != 0) { if (ip != null && port != 0) {
String url = "jdbc:oracle:thin:@" + ip + ":" + port String url = "jdbc:oracle:thin:@" + ip + ":" + port + ":"
+ ":" + Configs.ORACLE_ORCL; + Configs.ORACLE_ORCL;
boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数true标示连接成功false标示连接失败 boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数true标示连接成功false标示连接失败
Configs.ORACLE_USER, Configs.ORACLE_PSW); Configs.ORACLE_USER, Configs.ORACLE_PSW);
log.info("url:" + url + ",user:" log.info("url:" + url + ",user:" + Configs.ORACLE_USER
+ Configs.ORACLE_USER + ",password:" + ",password:" + Configs.ORACLE_PSW);
+ Configs.ORACLE_PSW);
String message = "失败"; String message = "失败";
String cmd3 = "kubectl annotate --overwrite rc " String cmd3 = "kubectl annotate --overwrite rc " + key + " checkoutFlag=0";
+ key + " checkoutFlag=0";
List<String> rList3 = Constant.ganymedSSH List<String> rList3 = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd3); .execCmdWaitAcquiescent(cmd3);
StringBuffer sb3 = new StringBuffer(); StringBuffer sb3 = new StringBuffer();
for (String str : rList3) for (String str : rList3)
sb3.append(str).append("\n"); sb3.append(str).append("\n");
log.info(sb3.toString()); log.info(sb3.toString());
log.info("更新replicationController标签 " log.info("更新replicationController标签 " + key + "\t[标签更新为:未校验]");
+ key + "\t[标签更新为:未校验]");
if (flag) { if (flag) {
String cmd = "kubectl label --overwrite rc " String cmd = "kubectl label --overwrite rc " + key + " status=2";
+ key + " status=2";
// 设置服务为 成功 // 设置服务为 成功
List<String> rList = Constant.ganymedSSH List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd); .execCmdWaitAcquiescent(cmd);
@ -186,61 +190,86 @@ public class ThreadCheckoutStandardOracle extends Thread {
sb.append(str).append("\n"); sb.append(str).append("\n");
log.info(sb.toString()); log.info(sb.toString());
message = "成功"; message = "成功";
log.info("更新replicationController标签 " log.info("更新replicationController标签 " + key
+ key + "\t[标签更新为: 成功]"); + "\t[标签更新为: 成功]");
//校验标签 // 校验标签
String cmd2 = "kubectl annotate --overwrite rc " String cmd2 = "kubectl annotate --overwrite rc " + key
+ key + " checkoutFlag=2"; + " checkoutFlag=2";
List<String> rList2 = Constant.ganymedSSH List<String> rList2 = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd2); .execCmdWaitAcquiescent(cmd2);
StringBuffer sb2 = new StringBuffer(); StringBuffer sb2 = new StringBuffer();
for (String str : rList2) for (String str : rList2)
sb2.append(str).append("\n"); sb2.append(str).append("\n");
log.info(sb2.toString()); log.info(sb2.toString());
log.info("更新replicationController标签 " log.info("更新replicationController标签 " + key
+ key + "\t[标签更新为: 校验中]"); + "\t[标签更新为: 校验中]");
//获得当前 服务对应的 数据 // 获得当前 服务对应的 数据
CheckoutEntity tmp = CacheOracleCheckoutEntity.getCheck(key); CheckoutEntity tmp = CacheOracleCheckoutEntity
.getCheck(key);
if (null != tmp) { if (null != tmp) {
CacheOracleCheckoutEntity.putExtract(key, tmp); CacheOracleCheckoutEntity.putExtract(key, tmp);
} }
// 查询 对应的 2 个标准表 // 查询 对应的 2 个标准表
OracleConnectorParams oc = new OracleConnectorParams(); OracleConnectorParams oc = new OracleConnectorParams();
String logName = tmp.getAreaCode().toLowerCase()+"_"+tmp.getSysCode()+"_"+tmp.getDataVersion(); String logName = tmp.getAreaCode().toLowerCase() + "_"
+ tmp.getSysCode() + "_" + tmp.getDataVersion();
oc.setName(logName); oc.setName(logName);
try { try {
Connection conn = OracleConnector.connectionBuilder(url, Configs.ORACLE_USER, Configs.ORACLE_PSW, oc); Connection conn = OracleConnector.connectionBuilder(
//支付--校验 url, Configs.ORACLE_USER, Configs.ORACLE_PSW,
if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp.getPayResultLast())) { oc);
String paySql = "select * from dba_tables where owner = '"+Configs.COLLECT_STANDARD_TABLE_USER.toUpperCase() // 支付--校验
+ "' and table_name = '"+Configs.COLLECT_PAY_TABLE.toUpperCase() +"'"; if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp
if(OracleConnector.execUpdateOracleSQL(conn, paySql, .getPayResultLast())) {
FileOperateHelper.addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION)+ logName +"jy.log")){ String paySql = "select * from dba_tables where owner = '"
+ Configs.COLLECT_STANDARD_TABLE_USER
.toUpperCase()
+ "' and table_name = '"
+ Configs.COLLECT_PAY_TABLE.toUpperCase()
+ "'";
if (OracleConnector
.execUpdateOracleSQL(
conn,
paySql,
FileOperateHelper
.addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION)
+ logName + "jy.log")) {
tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
} } else {
else {
tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR); tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR);
} }
} }
//可执行-- 校验 // 可执行-- 校验
if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp.getExecResultLast())) { if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp
String execSql = "select * from dba_tables where owner = '"+Configs.COLLECT_STANDARD_TABLE_USER.toUpperCase() .getExecResultLast())) {
+ "' and table_name = '"+Configs.COLLECT_EXEC_TABLE.toUpperCase() +"'"; String execSql = "select * from dba_tables where owner = '"
if(OracleConnector.execUpdateOracleSQL(conn, execSql, + Configs.COLLECT_STANDARD_TABLE_USER
FileOperateHelper.addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION)+ logName +"jy.log")){ .toUpperCase()
+ "' and table_name = '"
+ Configs.COLLECT_EXEC_TABLE.toUpperCase()
+ "'";
if (OracleConnector
.execUpdateOracleSQL(
conn,
execSql,
FileOperateHelper
.addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION)
+ logName + "jy.log")) {
tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
}else { } else {
tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR); tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR);
} }
} }
tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE); tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE);
// 更新数据库data_info // 更新数据库data_info
updateDataInfo(tmp); updateDataInfo(tmp);
// 将 CacheOracleCheckoutEntity checkMap中的 数据 放入extractMap中 // 将 CacheOracleCheckoutEntity checkMap中的 数据
/*CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity.checkRemove(key)/*)*/; // 放入extractMap中
/* CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity
.checkRemove(key)/* ) */;
// 更新kuber状态 // 更新kuber状态
cmd2 = "kubectl annotate --overwrite rc " cmd2 = "kubectl annotate --overwrite rc " + key
+ key + " checkoutFlag=1"; + " checkoutFlag=1";
// client.updateOrAddReplicasLabelById(taskNSyame, // client.updateOrAddReplicasLabelById(taskNSyame,
// "status", "2"); // "status", "2");
rList = Constant.ganymedSSH rList = Constant.ganymedSSH
@ -250,34 +279,36 @@ public class ThreadCheckoutStandardOracle extends Thread {
sb.append(str).append("\n"); sb.append(str).append("\n");
log.info(sb.toString()); log.info(sb.toString());
message = "成功"; message = "成功";
log.info("更新replicationController标签 " log.info("更新replicationController标签 " + key
+ key + "\t[标签更新为: 已校验]"); + "\t[标签更新为: 已校验]");
} catch (Exception e) { } catch (Exception e) {
log.error(e); log.error(e);
} }
// 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录 // 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录
CacheOracleCheckoutEntity.checkRemove(key); CacheOracleCheckoutEntity.checkRemove(key);
} }
log.info("连接到数据库服务: " + key log.info("连接到数据库服务: " + key + "\t[连接结果: " + message + "]");
+ "\t[连接结果: " + message + "]");
} }
} }
} }
/** dataInfo /**
* dataInfo
*
* @param checkoutEntity * @param checkoutEntity
* @throws Exception * @throws Exception
*/ */
private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception { private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception {
DataInfoEntity data = new DataInfoEntity(); DataInfoEntity data = new DataInfoEntity();
// 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO // 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO
data.setId(checkoutEntity.getDataId()); data.setId(checkoutEntity.getDataId());
data.setPayResultLast(checkoutEntity.getPayResultLast()); data.setPayResultLast(checkoutEntity.getPayResultLast());
data.setExecResultLast(checkoutEntity.getExecResultLast()); data.setExecResultLast(checkoutEntity.getExecResultLast());
data.setCheckoutFlag(checkoutEntity.getCheckoutFlag()); data.setCheckoutFlag(checkoutEntity.getCheckoutFlag());
dataInfoDao.update(data); dataInfoDao.update(data);
DataInfoEntity tmpdata = dataInfoDao.findById(checkoutEntity.getDataId()); DataInfoEntity tmpdata = dataInfoDao.findById(checkoutEntity
.getDataId());
data.setId(tmpdata.getSrcId()); data.setId(tmpdata.getSrcId());
dataInfoDao.update(data); dataInfoDao.update(data);
} }

@ -27,6 +27,7 @@ public class Configs {
public static String KUBE_MASTER_URL = "http://192.168.0.110:8080/"; // kubernetes集群的maser public static String KUBE_MASTER_URL = "http://192.168.0.110:8080/"; // kubernetes集群的maser
// URl // URl
public static String KUBE_MASTER_IP="127.0.0.1:8080"; //
public static int ORACLE_DEFAULT_PORT = 1521; // oracle的默认端口号 public static int ORACLE_DEFAULT_PORT = 1521; // oracle的默认端口号

@ -64,6 +64,8 @@ public class ConfigsLoader implements ServletContextListener {
Configs.GATHER_SERVICE_NAME = properties Configs.GATHER_SERVICE_NAME = properties
.getProperty("gather-service-name"); .getProperty("gather-service-name");
Configs.KUBE_MASTER_IP= properties.getProperty("kuberMasterIp");
Configs.TABLE_SUFFIX = properties.getProperty("table-suffix"); Configs.TABLE_SUFFIX = properties.getProperty("table-suffix");
Configs.EXTRACT_LOG_LOCALTION = properties Configs.EXTRACT_LOG_LOCALTION = properties

@ -114,6 +114,9 @@ public class GanymedSSH {
Session sess = null; Session sess = null;
try { try {
// conn = getOpenedConnection(host, username, password, port); // conn = getOpenedConnection(host, username, password, port);
if (null != conn) {
// conn=getOpenedConnection(Configs., username, password, port)
}
sess = conn.openSession(); sess = conn.openSession();
// 执锟斤拷cmd // 执锟斤拷cmd
sess.execCommand(cmd); sess.execCommand(cmd);

Loading…
Cancel
Save