You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
aggregation-platform/src/com/platform/service/impl/OracleExtractServiceImpl.java

420 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.platform.service.impl;
import java.sql.Connection;
import java.util.Date;
import java.util.List;
import javax.annotation.Resource;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import com.base.Custom4exception;
import com.base.CustomException;
import com.platform.dao.DataInfoDao;
import com.platform.entities.DataInfoEntity;
import com.platform.entities.GatherOracleInfo;
import com.platform.entities.OracleConnectorParams;
import com.platform.kubernetes.SimpleKubeClient;
import com.platform.oracle.OracleConnector;
import com.platform.service.IOracleExtractService;
import com.platform.service.OracleExtractHelper;
import com.platform.utils.CacheSetCantDelete;
import com.platform.utils.Configs;
import com.platform.utils.Constant;
import com.platform.utils.DateForm;
import com.platform.utils.FileOperateHelper;
@Service(value = "OracleExtract")
public class OracleExtractServiceImpl implements IOracleExtractService {
/**
 * Logger shared by this service.
 */
public final static Logger log = Logger
.getLogger(OracleExtractServiceImpl.class);
/**
 * DAO used to read and update the extraction status of data records;
 * injected by name ("dataInfoDao").
 */
@Resource(name = "dataInfoDao")
private DataInfoDao dataInfoDao;
/**
 * Kubernetes client.
 * NOTE(review): not referenced by the active code in this class (only by
 * commented-out calls) - confirm whether it can be removed.
 */
private SimpleKubeClient client = new SimpleKubeClient();
/**
 * Helper that performs the extraction steps (DB link, table space, user, extraction).
 */
private OracleExtractHelper oracleExtract = new OracleExtractHelper();
/**
 * Database connection implementation class.
 * NOTE(review): this instance is not referenced by the active code in this class;
 * connections are obtained through the static OracleConnector.connectionBuilder(...).
 */
private OracleConnector connect = new OracleConnector();
/**
 * Aggregates every collection database listed in {@code datainfos} into the
 * gathering Oracle instance described by {@code oracleModel}.
 * <p>
 * One connection to the gathering database is opened, shared by all entries and
 * closed before the method returns. Entries are processed one after another; a
 * failure inside one entry is logged, its status is reset to 0, and the loop
 * continues with the next entry.
 *
 * @param name        not used by this method
 * @param datainfos   connection parameters of the collection databases; the name of
 *                    every processed entry is rewritten to {@code "J" + name} with
 *                    '-' replaced by '_'
 * @param oracleModel connection information of the gathering database
 * @return {@code false} when the list is null/empty, the connection cannot be
 *         created, or an error escapes the per-entry handling (for example while
 *         resetting a status); {@code true} otherwise, even if individual entries
 *         failed
 * @throws Exception declared by the interface; errors raised here are logged
 *                   instead of being propagated
 */
@Override
public boolean extractOracle(String name,
        List<OracleConnectorParams> datainfos, GatherOracleInfo oracleModel)
        throws Exception {
    boolean isSuccess = false;
    Connection conn = null;
    try {
        if (null == datainfos || datainfos.isEmpty()) {
            return false;
        }
        conn = OracleConnector.connectionBuilder(
                "jdbc:oracle:thin:@" + oracleModel.getIp() + ":"
                        + oracleModel.getPort() + ":"
                        + oracleModel.getDatabaseName(),
                oracleModel.getUser(), oracleModel.getPassword(),
                datainfos.get(0));
        if (null == conn) {
            FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
                    + datainfos.get(0).getName() + ".log",
                    "创建oracle连接失败: [" + conn + "]\r\n");
            return false;
        }
        for (OracleConnectorParams collectOracle : datainfos) {
            extractOneCollection(conn, collectOracle, oracleModel);
        }
        isSuccess = true;
    } catch (Exception e) {
        // Make the failure visible in this class's log; the caller only sees "false".
        log.error(Custom4exception.OracleSQL_Except, e);
        // NOTE(review): kept from the original, which constructed this exception
        // without throwing it - presumably for a side effect of its constructor; confirm.
        new CustomException(Custom4exception.OracleSQL_Except, e);
    } finally {
        closeExtractConnection(conn);
    }
    return isSuccess;
}

/**
 * Runs the aggregation of a single collection database: marks it as "in progress"
 * (1), creates DB link / table space / user, extracts, then marks it as "done" (2).
 * On failure the status is reset to 0 exactly once.
 */
private void extractOneCollection(Connection conn,
        OracleConnectorParams collectOracle, GatherOracleInfo oracleModel)
        throws Exception {
    // Remember the replication-controller name before the entry is renamed below.
    String replicasName = collectOracle.getName();
    DataInfoEntity data = new DataInfoEntity();
    Integer dataId = null;
    boolean extracted = false;
    try {
        if (null != collectOracle.getDataId()
                && !"".equals(collectOracle.getDataId())) {
            dataId = Integer.valueOf(collectOracle.getDataId());
            data.setId(dataId);
            data.setExtractStatus(1);
            dataInfoDao.updateExtract(data);
            collectOracle.setName("J"
                    + collectOracle.getName().replace("-", "_"));
            // Record the start time in the per-entry SQL log.
            FileOperateHelper.fileWrite(
                    Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName() + ".log",
                    "\r\n 开始汇总 \r\n"
                            + DateForm.date2StringBysecond(new Date())
                            + "\r\n");
            // Label values: 0 = not aggregated, 1 = aggregating, 2 = aggregated.
            labelExtractState(replicasName, 1);
            oracleExtract.createDBLink(conn, collectOracle); // create the DB link
            oracleExtract.createTableSpace(conn, collectOracle, oracleModel); // create the table space
            oracleExtract.createUser(conn, collectOracle, oracleModel); // create the user and grant privileges
            oracleExtract.extractColleDB(conn, collectOracle, oracleModel); // run the extraction
            labelExtractState(replicasName, 2);
            saveExtractStatus(data, dataId, 2);
            extracted = true;
        }
    } catch (Exception e) {
        // Log first so the root cause is kept even if the label reset below fails.
        log.error(Custom4exception.OracleSQL_Except, e);
        labelExtractState(replicasName, 0);
    } finally {
        // Drop the id that was cached to protect this data set while it was processed.
        CacheSetCantDelete.removeExtractId(collectOracle.getDataId());
        String msg = "汇总结束";
        if (!extracted) {
            msg += " 汇总有异常!状态重置为待汇总 ";
            // Only touch the database when a valid id was parsed for this entry.
            if (null != dataId) {
                saveExtractStatus(data, dataId, 0);
            }
        }
        // Record the end time in the per-entry SQL log.
        FileOperateHelper.fileWrite(
                Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName() + ".log",
                "\r\n " + msg + " >>>>>>> "
                        + DateForm.date2StringBysecond(new Date())
                        + "\r\n\r\n\n");
    }
}

/**
 * Sets the {@code isExtract} label of the replication controller through kubectl
 * and writes the command output to the console logger.
 */
private void labelExtractState(String replicasName, int state) throws Exception {
    // NOTE(review): replicasName is concatenated into a shell command unescaped;
    // verify that callers only pass trusted replication-controller names.
    String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS
            + " label --overwrite rc " + replicasName
            + " isExtract=" + state;
    List<String> output = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
    StringBuilder sb = new StringBuilder();
    if (null != output) {
        for (String line : output) {
            sb.append(line).append("\n");
        }
    }
    Configs.CONSOLE_LOGGER.info(sb.toString());
}

/**
 * Stores {@code status} on the record {@code dataId} and, when that record can be
 * loaded, on the record referenced by its source id as well.
 */
private void saveExtractStatus(DataInfoEntity data, Integer dataId, int status)
        throws Exception {
    // Always start from the entry's own id; a previous call may have moved it to the source id.
    data.setId(dataId);
    data.setExtractStatus(status);
    dataInfoDao.updateExtract(data);
    DataInfoEntity stored = dataInfoDao.findById(data.getId());
    if (null != stored) {
        data.setId(stored.getSrcId());
        dataInfoDao.updateExtract(data);
    }
}

/**
 * Closes the gathering-database connection, logging instead of propagating a close failure.
 */
private void closeExtractConnection(Connection conn) {
    if (null == conn) {
        return;
    }
    try {
        conn.close();
    } catch (Exception closeError) {
        log.warn("failed to close oracle connection", closeError);
    }
}
/**
 * Checks whether the gathering Oracle database described by {@code oracleModel}
 * can be reached.
 *
 * @param oracleModel connection information (ip, port, database name, user, password)
 * @return the result of {@code oracleExtract.testConnect} on a freshly opened connection
 * @throws Exception a {@link CustomException} when no connection could be created,
 *                   or whatever the connection test itself throws
 */
@Override
public boolean isConnectTotalOracle(GatherOracleInfo oracleModel)
        throws Exception {
    Connection conn = OracleConnector.connectionBuilder(
            "jdbc:oracle:thin:@" + oracleModel.getIp() + ":"
                    + oracleModel.getPort() + ":"
                    + oracleModel.getDatabaseName(), oracleModel.getUser(),
            oracleModel.getPassword(), null);
    if (null == conn) {
        throw new CustomException(Custom4exception.connect_Oracle_Except,
                null, oracleModel);
    }
    try {
        return oracleExtract.testConnect(conn);
    } finally {
        // Close the connection even when the test throws; a close failure is
        // logged so it cannot mask the test outcome.
        try {
            conn.close();
        } catch (Exception closeError) {
            log.warn("failed to close oracle connection", closeError);
        }
    }
}
/**
 * Extracts the standard tables of every collection database listed in
 * {@code dataInfolist} into the gathering Oracle instance described by
 * {@code oracleConnect}.
 * <p>
 * One connection to the gathering database is opened, shared by all entries and
 * closed before the method returns. A failure inside one entry is logged, its
 * standard-extract status is reset to "0", and the loop continues.
 *
 * @param name          not used by this method
 * @param dataInfolist  connection parameters of the collection databases; the name
 *                      of every processed entry is rewritten to {@code "CQ" + name}
 *                      with '-' replaced by '_'
 * @param oracleConnect connection information of the gathering database
 * @return {@code false} when the list is null/empty or the connection cannot be
 *         created; {@code true} once all entries were visited, even if individual
 *         entries failed
 * @throws Exception when an error escapes the per-entry handling (for example
 *                   while resetting a status or writing the log file)
 */
@Override
public boolean extractStandardTable(String name,
        List<OracleConnectorParams> dataInfolist,
        GatherOracleInfo oracleConnect) throws Exception {
    if (null == dataInfolist || dataInfolist.isEmpty()) {
        return false;
    }
    Connection conn = OracleConnector.connectionBuilder(
            "jdbc:oracle:thin:@" + oracleConnect.getIp() + ":"
                    + oracleConnect.getPort() + ":"
                    + oracleConnect.getDatabaseName(), oracleConnect.getUser(),
            oracleConnect.getPassword(), dataInfolist.get(0));
    if (null == conn) {
        FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
                + dataInfolist.get(0).getName() + ".log", "创建oracle连接失败: ["
                + conn + "]\r\n");
        return false;
    }
    try {
        for (OracleConnectorParams collectOracle : dataInfolist) {
            extractStandardEntry(conn, collectOracle, oracleConnect);
        }
    } finally {
        closeStandardConnection(conn);
    }
    return true;
}

/**
 * Extracts the standard tables of a single collection database: marks it as
 * "extracting" ("1"), prepares DB link / table space / user, extracts the pay and
 * exec tables when their last check result allows it, then marks it as done ("2").
 * On failure the status is reset to "0" exactly once.
 */
private void extractStandardEntry(Connection conn,
        OracleConnectorParams collectOracle, GatherOracleInfo oracleModel)
        throws Exception {
    // Remember the replication-controller name before the entry is renamed below.
    String replicasName = collectOracle.getName();
    DataInfoEntity data = new DataInfoEntity();
    Integer dataId = null;
    boolean finished = false;
    try {
        if (null != collectOracle.getDataId()
                && !"".equals(collectOracle.getDataId())) {
            dataId = Integer.valueOf(collectOracle.getDataId());
            data.setId(dataId);
            // Flag the record as "standard table extraction in progress".
            data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
            data.setStandardExtractStatus("1");
            dataInfoDao.update(data);
            collectOracle.setName("CQ"
                    + collectOracle.getName().replace("-", "_"));
            // Record the start time in the per-entry SQL log.
            FileOperateHelper.fileWrite(
                    Configs.EXTRACT_STANDARD_LOG_LOCALTION
                            + collectOracle.getName() + ".log",
                    "\r\n 开始抽取标准表 \r\n "
                            + DateForm.date2StringBysecond(new Date())
                            + "\n\r\n");
            annotateStandardState(replicasName, 1);
            oracleExtract.createStandardDBLink(conn, collectOracle); // create the DB link
            oracleExtract.createStardardTableSpace(conn, collectOracle,
                    oracleModel); // create the table space
            oracleExtract.createOnlyUser(conn, collectOracle,
                    oracleModel); // create the standard-table user and grant privileges
            DataInfoEntity tmpdata = dataInfoDao.findById(data.getId());
            if (null != tmpdata) {
                if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getPayResultLast())
                        || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getPayResultLast())) {
                    // Pay table: extraction in progress.
                    data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX);
                    dataInfoDao.update(data);
                    boolean payExtracted = true;
                    try {
                        oracleExtract.extractStandardPayTable(conn,
                                collectOracle, oracleModel); // run the extraction
                    } catch (Exception e) {
                        log.error(Custom4exception.OracleSQL_Except, e);
                        // Fall back to the "check passed" state.
                        data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
                        dataInfoDao.update(data);
                        payExtracted = false;
                    }
                    if (payExtracted) {
                        // Pay table extracted successfully.
                        data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN);
                        dataInfoDao.update(data);
                    }
                }
                if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getExecResultLast())
                        || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getExecResultLast())) {
                    // Exec table: extraction in progress.
                    data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX);
                    dataInfoDao.update(data);
                    boolean execExtracted = true;
                    try {
                        oracleExtract.extractStandardExecTable(conn,
                                collectOracle, oracleModel); // run the extraction
                    } catch (Exception e) {
                        log.error(Custom4exception.OracleSQL_Except, e);
                        // Fall back to the "check passed" state.
                        data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
                        dataInfoDao.update(data);
                        execExtracted = false;
                    }
                    if (execExtracted) {
                        data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN);
                        dataInfoDao.update(data);
                    }
                }
                annotateStandardState(replicasName, 2);
                data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN);
                saveStandardStatus(data, dataId, "2");
                finished = true;
            }
        }
    } catch (Exception e) {
        // Log first so the root cause is kept even if the annotation reset below fails.
        log.error(Custom4exception.OracleSQL_Except, e);
        annotateStandardState(replicasName, 0);
    } finally {
        // Drop the id that was cached to protect this data set while it was processed.
        CacheSetCantDelete.removeStandardId(collectOracle.getDataId());
        String msg = "抽取标准表结束";
        if (!finished) {
            msg += " 抽取有异常!状态重置为待抽取 ";
            // Only touch the database when a valid id was parsed for this entry.
            if (null != dataId) {
                saveStandardStatus(data, dataId, "0");
            }
        }
        // Record the end time in the per-entry SQL log.
        FileOperateHelper.fileWrite(
                Configs.EXTRACT_STANDARD_LOG_LOCALTION
                        + collectOracle.getName() + ".log",
                " " + msg + " >>>>> "
                        + DateForm.date2StringBysecond(new Date())
                        + "\r\n\r\n\n");
    }
}

/**
 * Sets the {@code standardExtractStatus} annotation of the replication controller
 * through kubectl and writes the command output to the console logger.
 */
private void annotateStandardState(String replicasName, int state) throws Exception {
    // NOTE(review): replicasName is concatenated into a shell command unescaped;
    // verify that callers only pass trusted replication-controller names.
    String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS
            + " annotate --overwrite rc " + replicasName
            + " standardExtractStatus=" + state;
    List<String> output = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
    StringBuilder sb = new StringBuilder();
    if (null != output) {
        for (String line : output) {
            sb.append(line).append("\n");
        }
    }
    Configs.CONSOLE_LOGGER.info(sb.toString());
}

/**
 * Stores {@code status} (together with the other fields already set on
 * {@code data}) on the record {@code dataId} and, when that record can be loaded,
 * on the record referenced by its source id as well.
 */
private void saveStandardStatus(DataInfoEntity data, Integer dataId, String status)
        throws Exception {
    // Always start from the entry's own id; a previous call may have moved it to the source id.
    data.setId(dataId);
    data.setStandardExtractStatus(status);
    dataInfoDao.update(data);
    DataInfoEntity stored = dataInfoDao.findById(data.getId());
    if (null != stored) {
        data.setId(stored.getSrcId());
        dataInfoDao.update(data);
    }
}

/**
 * Closes the gathering-database connection, logging instead of propagating a close failure.
 */
private void closeStandardConnection(Connection conn) {
    if (null == conn) {
        return;
    }
    try {
        conn.close();
    } catch (Exception closeError) {
        log.warn("failed to close oracle connection", closeError);
    }
}
/**
 * Writes the given extract status to the data record identified by
 * {@code ocp.getDataId()}, and sets its checkout flag to
 * {@code Constant.CHECKOUTFLAG_SIX} (standard table extraction in progress).
 * <p>
 * A failure of the database update is logged and not propagated. A data id that
 * is not a valid integer still raises {@link NumberFormatException}, as before.
 *
 * @param ocp    connection parameters carrying the data id
 * @param status extract status value to store
 */
@Override
public void updateDataExtractStatus(OracleConnectorParams ocp, int status) {
    DataInfoEntity data = new DataInfoEntity();
    data.setId(Integer.valueOf(ocp.getDataId()));
    // Flag the record as "standard table extraction in progress".
    data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
    data.setExtractStatus(status);
    try {
        dataInfoDao.update(data);
    } catch (Exception e) {
        // Best-effort update: report through the class logger instead of stderr.
        log.error("failed to update extract status, dataId=" + ocp.getDataId(), e);
    }
}
// @Override
// public boolean extractOracle(String name, List<OracleConnectorParams>
// dataInfos, GatherOracleInfo oracleConnect) throws Exception {
// boolean isSuccess = false;
// try{
// //map转 bean(汇总库信息-带tableName的)
// // GatherOracleInfo oracleModel = (GatherOracleInfo)
// Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect);
//
// //采集库连接参数
// // List<OracleConnectorParams> datainfos = new
// ArrayList<OracleConnectorParams>();
// // for (Map<String, String> map : dataInfoMap) {
// // OracleConnectorParams dataInfoEntity = (OracleConnectorParams)
// Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect);
// // datainfos.add(dataInfoEntity);
// // }
//
// Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@"
// + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/"
// + oracleConnect.getDatabaseName(), oracleConnect.getUser(),
// oracleConnect.getPassword());
//
// for (OracleConnectorParams collectOracle : dataInfos) {
//
// oracleExtract.createDBLink(conn, collectOracle);
// oracleExtract.createTableSpace(conn, oracleConnect);
// oracleExtract.createUser(conn, oracleConnect);
// oracleExtract.extractColleDB(conn, collectOracle);
// }
// isSuccess = true;
// }catch(Exception e){
//
// }
// return false;
// }
}