|
|
package com.platform.service.impl;
|
|
|
|
|
|
import java.sql.Connection;
|
|
|
import java.util.Date;
|
|
|
import java.util.List;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import com.base.Custom4exception;
|
|
|
import com.base.CustomException;
|
|
|
import com.platform.controller.OracleController;
|
|
|
import com.platform.dao.DataInfoDao;
|
|
|
import com.platform.entities.DataInfoEntity;
|
|
|
import com.platform.entities.GatherOracleInfo;
|
|
|
import com.platform.entities.OracleConnectorParams;
|
|
|
import com.platform.kubernetes.SimpleKubeClient;
|
|
|
import com.platform.oracle.OracleConnector;
|
|
|
import com.platform.service.IOracleExtractService;
|
|
|
import com.platform.service.OracleExtractHelper;
|
|
|
import com.platform.utils.Configs;
|
|
|
import com.platform.utils.Constant;
|
|
|
import com.platform.utils.DateForm;
|
|
|
import com.platform.utils.FileOperateHelper;
|
|
|
|
|
|
@Service(value = "OracleExtract")
|
|
|
public class OracleExtractServiceImpl implements IOracleExtractService {
|
|
|
|
|
|
/**
|
|
|
* 日志
|
|
|
*/
|
|
|
public final static Logger log = Logger.getLogger(OracleExtractServiceImpl.class);
|
|
|
|
|
|
@Resource(name = "dataInfoDao")
|
|
|
private DataInfoDao dataInfoDao;
|
|
|
|
|
|
/**
|
|
|
* kubernetes client
|
|
|
*/
|
|
|
private SimpleKubeClient client = new SimpleKubeClient();
|
|
|
/**
|
|
|
* 抽取
|
|
|
*/
|
|
|
private OracleExtractHelper oracleExtract = new OracleExtractHelper();
|
|
|
|
|
|
/**
|
|
|
* 数据库连接实现类
|
|
|
*/
|
|
|
private OracleConnector connect = new OracleConnector();
|
|
|
|
|
|
@Override
|
|
|
public boolean extractOracle(String name, List<OracleConnectorParams> dataInfolist,
|
|
|
GatherOracleInfo oracleConnect) throws Exception {
|
|
|
boolean isSuccess = false;
|
|
|
try{
|
|
|
//map转 bean(汇总库信息-带tableName的)
|
|
|
GatherOracleInfo oracleModel = oracleConnect;
|
|
|
//采集库连接参数
|
|
|
List<OracleConnectorParams> datainfos = dataInfolist;
|
|
|
if (datainfos.size() == 0) {
|
|
|
return false;
|
|
|
}
|
|
|
Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":"
|
|
|
+ oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0));
|
|
|
if (null == conn) {
|
|
|
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
|
|
|
+ dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n");
|
|
|
return false;
|
|
|
}
|
|
|
for (OracleConnectorParams collectOracle : datainfos) {
|
|
|
String replicasName = collectOracle.getName();
|
|
|
try{
|
|
|
if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){
|
|
|
DataInfoEntity data = new DataInfoEntity();
|
|
|
data.setId(Integer.valueOf(collectOracle.getDataId()));
|
|
|
data.setExtractStatus(1);
|
|
|
dataInfoDao.updateExtract(data);
|
|
|
collectOracle.setName("J" + collectOracle.getName().replace("-", "_"));
|
|
|
String cmd = "kubectl label --overwrite rc "
|
|
|
+ replicasName + " isExtract=1";
|
|
|
//sql日志记录时间:
|
|
|
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
|
|
|
+ collectOracle.getName()+".log", "\r\n 开始汇总 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
|
|
|
List<String> rList = Constant.ganymedSSH
|
|
|
.execCmdWaitAcquiescent(cmd);
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
for (String string : rList)
|
|
|
sb.append(string).append("\n");
|
|
|
Configs.CONSOLE_LOGGER.info(sb.toString());
|
|
|
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成
|
|
|
oracleExtract.createDBLink(conn, collectOracle); //创建dblink
|
|
|
oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间
|
|
|
oracleExtract.createUser(conn, collectOracle, oracleModel);//创建用户并授权
|
|
|
oracleExtract.extractColleDB(conn, collectOracle, oracleModel);//执行抽取
|
|
|
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成
|
|
|
cmd = "kubectl label --overwrite rc "
|
|
|
+ replicasName + " isExtract=2";
|
|
|
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
|
|
|
sb = new StringBuffer();
|
|
|
for (String string : rList)
|
|
|
sb.append(string).append("\n");
|
|
|
Configs.CONSOLE_LOGGER.info(sb.toString());
|
|
|
data.setExtractStatus(2);
|
|
|
dataInfoDao.updateExtract(data);
|
|
|
}
|
|
|
}catch(Exception e){
|
|
|
log.error(Custom4exception.OracleSQL_Except, e);
|
|
|
}
|
|
|
finally{
|
|
|
//sql日志记录时间:
|
|
|
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
|
|
|
+ collectOracle.getName()+".log", "\r\n 汇总结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
|
|
|
String cmd = "kubectl label --overwrite rc "
|
|
|
+ replicasName + " isExtract=2";
|
|
|
Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
|
|
|
}
|
|
|
}
|
|
|
isSuccess = true;
|
|
|
}catch(Exception e){
|
|
|
new CustomException(Custom4exception.OracleSQL_Except, e);
|
|
|
}
|
|
|
return isSuccess;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) throws Exception {
|
|
|
boolean isConnect = false;
|
|
|
Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":"
|
|
|
+ oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(), null);
|
|
|
if (null == conn) {
|
|
|
isConnect = false;
|
|
|
throw new CustomException(Custom4exception.connect_Oracle_Except, null, oracleModel);
|
|
|
// FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
|
|
|
// + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + "]\r\n");
|
|
|
}else {
|
|
|
isConnect = oracleExtract.testConnect(conn);
|
|
|
}
|
|
|
return isConnect;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public boolean extractStandardTable(String name, List<OracleConnectorParams> dataInfolist,
|
|
|
GatherOracleInfo oracleConnect) throws Exception {
|
|
|
boolean isSuccess = false;
|
|
|
try{
|
|
|
//map转 bean(汇总库信息-带tableName的)
|
|
|
GatherOracleInfo oracleModel = oracleConnect;
|
|
|
//采集库连接参数
|
|
|
List<OracleConnectorParams> datainfos = dataInfolist;
|
|
|
if (datainfos.size() == 0) {
|
|
|
return false;
|
|
|
}
|
|
|
Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":"
|
|
|
+ oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0));
|
|
|
if (null == conn) {
|
|
|
FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION
|
|
|
+ dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n");
|
|
|
return false;
|
|
|
}
|
|
|
for (OracleConnectorParams collectOracle : datainfos) {
|
|
|
String replicasName = collectOracle.getName();
|
|
|
try{
|
|
|
if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){
|
|
|
DataInfoEntity data = new DataInfoEntity();
|
|
|
data.setId(Integer.valueOf(collectOracle.getDataId()));
|
|
|
//设置为 标准表 抽取中
|
|
|
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX);
|
|
|
data.setStandardExtractStatus("1");
|
|
|
dataInfoDao.update(data);
|
|
|
collectOracle.setName("CQ" + collectOracle.getName().replace("-", "_"));
|
|
|
String cmd = "kubectl annotate --overwrite rc "
|
|
|
+ replicasName + " standardExtractStatus=1";
|
|
|
//sql日志记录时间:
|
|
|
FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION
|
|
|
+ collectOracle.getName()+".log", "\r\n 开始抽取标准表 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
|
|
|
List<String> rList = Constant.ganymedSSH
|
|
|
.execCmdWaitAcquiescent(cmd);
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
for (String string : rList)
|
|
|
sb.append(string).append("\n");
|
|
|
Configs.CONSOLE_LOGGER.info(sb.toString());
|
|
|
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成
|
|
|
oracleExtract.createStandardDBLink(conn, collectOracle); //创建dblink
|
|
|
oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间
|
|
|
oracleExtract.createOnlyUser(conn, collectOracle, oracleModel);//创建 抽取标准表的 用户并授权
|
|
|
DataInfoEntity tmpdata = dataInfoDao.findById(data.getId());
|
|
|
if (null != tmpdata) {
|
|
|
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getPayResultLast())
|
|
|
|| Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getPayResultLast())
|
|
|
|| Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getPayResultLast())) {
|
|
|
//抽取中
|
|
|
data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX);
|
|
|
dataInfoDao.update(data);
|
|
|
boolean isExtrac = true;
|
|
|
try{
|
|
|
oracleExtract.extractStandardPayTable(conn, collectOracle, oracleModel);//执行抽取
|
|
|
}catch(Exception e){
|
|
|
//改回 校验存在的状态
|
|
|
data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
|
|
|
dataInfoDao.update(data);
|
|
|
isExtrac = false;
|
|
|
}
|
|
|
if (isExtrac) {
|
|
|
//抽取成功
|
|
|
data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN);
|
|
|
dataInfoDao.update(data);
|
|
|
}
|
|
|
}
|
|
|
if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getExecResultLast())
|
|
|
|| Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getExecResultLast())
|
|
|
|| Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getExecResultLast())) {
|
|
|
//抽取中
|
|
|
data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX);
|
|
|
dataInfoDao.update(data);
|
|
|
boolean isExtrac = true;
|
|
|
try{
|
|
|
oracleExtract.extractStandardExecTable(conn, collectOracle, oracleModel);//执行抽取
|
|
|
}catch(Exception e){
|
|
|
//改回 校验存在的状态
|
|
|
data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
|
|
|
dataInfoDao.update(data);
|
|
|
isExtrac = false;
|
|
|
}
|
|
|
if (isExtrac) {
|
|
|
data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN);
|
|
|
dataInfoDao.update(data);
|
|
|
}
|
|
|
}
|
|
|
// client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成
|
|
|
cmd = "kubectl annotate --overwrite rc "
|
|
|
+ replicasName + " standardExtractStatus=2";
|
|
|
rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
|
|
|
sb = new StringBuffer();
|
|
|
for (String string : rList)
|
|
|
sb.append(string).append("\n");
|
|
|
Configs.CONSOLE_LOGGER.info(sb.toString());
|
|
|
data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN);
|
|
|
data.setStandardExtractStatus("2");
|
|
|
dataInfoDao.update(data);
|
|
|
}
|
|
|
}
|
|
|
}catch(Exception e){
|
|
|
log.error(Custom4exception.OracleSQL_Except, e);
|
|
|
}
|
|
|
finally{
|
|
|
//sql日志记录时间:
|
|
|
FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION
|
|
|
+ collectOracle.getName()+".log", "\r\n 抽取标准表结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n");
|
|
|
String cmd = "kubectl annotate --overwrite rc "
|
|
|
+ replicasName + " standardExtractStatus=2";
|
|
|
Constant.ganymedSSH.execCmdWaitAcquiescent(cmd);
|
|
|
}
|
|
|
}
|
|
|
isSuccess = true;
|
|
|
}catch(Exception e){
|
|
|
log.error(Custom4exception.OracleSQL_Except, e);
|
|
|
}
|
|
|
return isSuccess;
|
|
|
}
|
|
|
|
|
|
// @Override
|
|
|
// public boolean extractOracle(String name, List<OracleConnectorParams> dataInfos, GatherOracleInfo oracleConnect) throws Exception {
|
|
|
// boolean isSuccess = false;
|
|
|
// try{
|
|
|
// //map转 bean(汇总库信息-带tableName的)
|
|
|
//// GatherOracleInfo oracleModel = (GatherOracleInfo) Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect);
|
|
|
//
|
|
|
// //采集库连接参数
|
|
|
//// List<OracleConnectorParams> datainfos = new ArrayList<OracleConnectorParams>();
|
|
|
//// for (Map<String, String> map : dataInfoMap) {
|
|
|
//// OracleConnectorParams dataInfoEntity = (OracleConnectorParams) Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect);
|
|
|
//// datainfos.add(dataInfoEntity);
|
|
|
//// }
|
|
|
//
|
|
|
// Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/"
|
|
|
// + oracleConnect.getDatabaseName(), oracleConnect.getUser(), oracleConnect.getPassword());
|
|
|
//
|
|
|
// for (OracleConnectorParams collectOracle : dataInfos) {
|
|
|
//
|
|
|
// oracleExtract.createDBLink(conn, collectOracle);
|
|
|
// oracleExtract.createTableSpace(conn, oracleConnect);
|
|
|
// oracleExtract.createUser(conn, oracleConnect);
|
|
|
// oracleExtract.extractColleDB(conn, collectOracle);
|
|
|
// }
|
|
|
// isSuccess = true;
|
|
|
// }catch(Exception e){
|
|
|
//
|
|
|
// }
|
|
|
// return false;
|
|
|
// }
|
|
|
|
|
|
}
|