diff --git a/src/com/platform/service/OracleStatusService.java b/src/com/platform/service/OracleStatusService.java index 76fd8f10..269679ec 100644 --- a/src/com/platform/service/OracleStatusService.java +++ b/src/com/platform/service/OracleStatusService.java @@ -1,206 +1,206 @@ -package com.platform.service; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.ReplicationController; - -import java.util.Hashtable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.utils.Configs; -import com.platform.utils.Constant; - -/** - * 计时 尝试连接oracle的时间,(超过10分钟判断失败) - * - * @author chen - * - */ -public class OracleStatusService { - private static Map alliveTask = new Hashtable(); - public final static int EXEC_TIME = 9;// 连接多少次后不成功,取消链接 - public final static long INTERVAL_TIME = 60 * 1000;// 每隔多少毫秒执行一次连接任务 - public final static long DELAY_TIME = 30 * 1000; // 延迟多少秒后执行 - - public void connectToOracle(String replicasName) { - SimpleKubeClient sKubeClient = new SimpleKubeClient(); - if (alliveTask.containsKey(replicasName)) { - killAlliveTask(replicasName); - } - OracleConnectorParams orp = new OracleConnectorParams(); - Timer timer = new Timer(); - alliveTask.put(replicasName, timer); - timer.schedule(new connectTask(replicasName, orp, sKubeClient), - DELAY_TIME, INTERVAL_TIME); - - } - - public void cancelToOracle(String replicasName, String operate) { - if (operate.equals("stop")) { - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + replicasName + " status=0"; - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // SimpleKubeClient sKubeClient = new SimpleKubeClient(); - // sKubeClient.updateOrAddReplicasLabelById(replicasName, - // "status","0"); - } - killAlliveTask(replicasName); - } - - /** - * 取消并移除指定定时任务 - * - * - * @param taskName - */ - public void killAlliveTask(String taskName) { - if (alliveTask.containsKey(taskName)) { - alliveTask.get(taskName).cancel(); - alliveTask.remove(taskName); - } - } - - public void killAlliveTasks(String... tasksName) { - for (String taskName : tasksName) - killAlliveTask(taskName); - } - - /** - * 清空定时任务 - */ - public void cleanUpAlliveTask() { - Iterator> iterator = alliveTask.entrySet() - .iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - entry.getValue().cancel(); - } - alliveTask.clear(); - } - - /** - * 链接oracle任务类 - * - * @author wuming - * - */ - class connectTask extends TimerTask { - private String taskName; - private int count; - private OracleConnectorParams ocp; - private SimpleKubeClient client; - - public connectTask(String taskName, OracleConnectorParams ocp, - SimpleKubeClient client) { - this.taskName = taskName; - this.ocp = ocp; - this.count = 0; - this.client = client; - } - - public connectTask(OracleConnectorParams ocp, SimpleKubeClient client) { - this.taskName = ocp.getName(); - this.ocp = ocp; - this.count = 0; - this.client = client; - } - - @Override - public void run() { - if (count == EXEC_TIME && alliveTask.containsKey(taskName)) { // 如果任务已经执行10次,则任务oracle启动失败,并取消oracle连接 - killAlliveTask(taskName); - // client.updateOrAddReplicasLabelById(taskName, "status", "1"); - // //更新ReplicationController标签,将oracle状态标示未1(0:启动中,1:失败,2:成功) - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + taskName + " status=1"; - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - Configs.CONSOLE_LOGGER.info("更新replicationController标签: " - + taskName + "\t[标签更新为: 失败]"); - } else { // 否则,执行连接oracle任务,判断oracle是否启动成功 - Pod pod = filterPod(); - if (pod != null) { - String ip = client.getPodHostIp(pod); - int port = client.getPodContainerport(pod); - if (ip != null && port != 0) { - String url = "jdbc:oracle:thin:@" + ip + ":" + port - + ":" + ocp.getDatabaseName(); - boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 - ocp.getUser(), ocp.getPassword()); - Configs.CONSOLE_LOGGER.info("url:" + url + ",user:" - + ocp.getUser() + ",password:" - + ocp.getPassword()); - String message = "失败"; - if (flag && alliveTask.containsKey(taskName)) { - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + taskName - + " status=2"; - // client.updateOrAddReplicasLabelById(taskNSyame, - // "status", "2"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - message = "成功"; - killAlliveTask(taskName); // 连接成功,取消连接 - Configs.CONSOLE_LOGGER - .info("更新replicationController标签: " - + taskName + "\t[标签更新为: 成功]"); - } - Configs.CONSOLE_LOGGER.info("连接到数据库服务: " + taskName - + "\t[连接结果: " + message + "]"); - } - } - } - count++; - } - - /** - * 获取oracle的连接ip地址和端口号 - * - * @return - */ - private Pod filterPod() { - Pod pod = null; - ReplicationController replicationController = client - .getReplicationController(taskName); - if (null != replicationController) { - List filterPods = client - .getPodsForApplicaList(replicationController); - if (filterPods != null && filterPods.size() > 0) { - pod = filterPods.get(0); - } - } - return pod; - } - - public String getTaskName() { - return taskName; - } - - public void setTaskName(String taskName) { - this.taskName = taskName; - } - - public int getCount() { - return count; - } - } +package com.platform.service; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.ReplicationController; + +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.utils.Configs; +import com.platform.utils.Constant; + +/** + * 计时 尝试连接oracle的时间,(超过10分钟判断失败) + * + * @author chen + * + */ +public class OracleStatusService { + private static Map alliveTask = new Hashtable(); + public final static int EXEC_TIME = 9;// 连接多少次后不成功,取消链接 + public final static long INTERVAL_TIME = 60 * 1000;// 每隔多少毫秒执行一次连接任务 + public final static long DELAY_TIME = 30 * 1000; // 延迟多少秒后执行 + + public void connectToOracle(String replicasName) { + SimpleKubeClient sKubeClient = new SimpleKubeClient(); + if (alliveTask.containsKey(replicasName)) { + killAlliveTask(replicasName); + } + OracleConnectorParams orp = new OracleConnectorParams(); + Timer timer = new Timer(); + alliveTask.put(replicasName, timer); + timer.schedule(new connectTask(replicasName, orp, sKubeClient), + DELAY_TIME, INTERVAL_TIME); + + } + + public void cancelToOracle(String replicasName, String operate) { + if (operate.equals("stop")) { + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + replicasName + " status=0"; + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // SimpleKubeClient sKubeClient = new SimpleKubeClient(); + // sKubeClient.updateOrAddReplicasLabelById(replicasName, + // "status","0"); + } + killAlliveTask(replicasName); + } + + /** + * 取消并移除指定定时任务 + * + * + * @param taskName + */ + public void killAlliveTask(String taskName) { + if (alliveTask.containsKey(taskName)) { + alliveTask.get(taskName).cancel(); + alliveTask.remove(taskName); + } + } + + public void killAlliveTasks(String... tasksName) { + for (String taskName : tasksName) + killAlliveTask(taskName); + } + + /** + * 清空定时任务 + */ + public void cleanUpAlliveTask() { + Iterator> iterator = alliveTask.entrySet() + .iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + entry.getValue().cancel(); + } + alliveTask.clear(); + } + + /** + * 链接oracle任务类 + * + * @author wuming + * + */ + class connectTask extends TimerTask { + private String taskName; + private int count; + private OracleConnectorParams ocp; + private SimpleKubeClient client; + + public connectTask(String taskName, OracleConnectorParams ocp, + SimpleKubeClient client) { + this.taskName = taskName; + this.ocp = ocp; + this.count = 0; + this.client = client; + } + + public connectTask(OracleConnectorParams ocp, SimpleKubeClient client) { + this.taskName = ocp.getName(); + this.ocp = ocp; + this.count = 0; + this.client = client; + } + + @Override + public void run() { + if (count == EXEC_TIME && alliveTask.containsKey(taskName)) { // 如果任务已经执行10次,则任务oracle启动失败,并取消oracle连接 + killAlliveTask(taskName); + // client.updateOrAddReplicasLabelById(taskName, "status", "1"); + // //更新ReplicationController标签,将oracle状态标示未1(0:启动中,1:失败,2:成功) + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + taskName + " status=1"; + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + Configs.CONSOLE_LOGGER.info("更新replicationController标签: " + + taskName + "\t[标签更新为: 失败]"); + } else { // 否则,执行连接oracle任务,判断oracle是否启动成功 + Pod pod = filterPod(); + if (pod != null) { + String ip = client.getPodHostIp(pod); + int port = client.getPodContainerport(pod); + if (ip != null && port != 0) { + String url = "jdbc:oracle:thin:@" + ip + ":" + port + + ":" + ocp.getDatabaseName(); + boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 + ocp.getUser(), ocp.getPassword()); + Configs.CONSOLE_LOGGER.info("url:" + url + ",user:" + + ocp.getUser() + ",password:" + + ocp.getPassword()); + String message = "失败"; + if (flag && alliveTask.containsKey(taskName)) { + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + taskName + + " status=2"; + // client.updateOrAddReplicasLabelById(taskNSyame, + // "status", "2"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + message = "成功"; + killAlliveTask(taskName); // 连接成功,取消连接 + Configs.CONSOLE_LOGGER + .info("更新replicationController标签: " + + taskName + "\t[标签更新为: 成功]"); + } + Configs.CONSOLE_LOGGER.info("连接到数据库服务: " + taskName + + "\t[连接结果: " + message + "]"); + } + } + } + count++; + } + + /** + * 获取oracle的连接ip地址和端口号 + * + * @return + */ + private Pod filterPod() { + Pod pod = null; + ReplicationController replicationController = client + .getReplicationController(taskName); + if (null != replicationController) { + List filterPods = client + .getPodsForApplicaList(replicationController); + if (filterPods != null && filterPods.size() > 0) { + pod = filterPods.get(0); + } + } + return pod; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public int getCount() { + return count; + } + } } \ No newline at end of file diff --git a/src/com/platform/service/impl/CheckoutServiceImpl.java b/src/com/platform/service/impl/CheckoutServiceImpl.java index 8cf3e12b..cf9740b2 100644 --- a/src/com/platform/service/impl/CheckoutServiceImpl.java +++ b/src/com/platform/service/impl/CheckoutServiceImpl.java @@ -200,6 +200,7 @@ public class CheckoutServiceImpl implements ICheckoutService { int listSize = resultList.size(); for (int i = 0; i < listSize; i++) { CheckoutEntity obj = resultList.get(i); + obj.setAreaCode(obj.getAreaCode().toLowerCase()); if (null != obj && "y".equals(obj.getCollUpdate())) { obj.setCollUpdate("是"); } diff --git a/src/com/platform/service/impl/OracleExtractServiceImpl.java b/src/com/platform/service/impl/OracleExtractServiceImpl.java index 2082bd0c..fc136cac 100644 --- a/src/com/platform/service/impl/OracleExtractServiceImpl.java +++ b/src/com/platform/service/impl/OracleExtractServiceImpl.java @@ -1,419 +1,419 @@ -package com.platform.service.impl; - -import java.sql.Connection; -import java.util.Date; -import java.util.List; - -import javax.annotation.Resource; - -import org.apache.log4j.Logger; -import org.springframework.stereotype.Service; - -import com.base.Custom4exception; -import com.base.CustomException; -import com.platform.dao.DataInfoDao; -import com.platform.entities.DataInfoEntity; -import com.platform.entities.GatherOracleInfo; -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.service.IOracleExtractService; -import com.platform.service.OracleExtractHelper; -import com.platform.utils.CacheSetCantDelete; -import com.platform.utils.Configs; -import com.platform.utils.Constant; -import com.platform.utils.DateForm; -import com.platform.utils.FileOperateHelper; - -@Service(value = "OracleExtract") -public class OracleExtractServiceImpl implements IOracleExtractService { - - /** - * 日志 - */ - public final static Logger log = Logger - .getLogger(OracleExtractServiceImpl.class); - - @Resource(name = "dataInfoDao") - private DataInfoDao dataInfoDao; - - /** - * kubernetes client - */ - private SimpleKubeClient client = new SimpleKubeClient(); - /** - * 抽取 - */ - private OracleExtractHelper oracleExtract = new OracleExtractHelper(); - - /** - * 数据库连接实现类 - */ - private OracleConnector connect = new OracleConnector(); - - @Override - public boolean extractOracle(String name, - List datainfos, GatherOracleInfo oracleModel) - throws Exception { - boolean isSuccess = false; - try { - // map转 bean(汇总库信息-带tableName的) - // GatherOracleInfo oracleModel = oracleConnect; - // 采集库连接参数 - // List datainfos = dataInfolist; - if (datainfos.size() == 0) { - return false; - } - Connection conn = OracleConnector.connectionBuilder( - "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" - + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), - oracleModel.getUser(), oracleModel.getPassword(), - datainfos.get(0)); - if (null == conn) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + datainfos.get(0).getName() + ".log", - "创建oracle连接失败: [" + conn + "]\r\n"); - return false; - } - for (OracleConnectorParams collectOracle : datainfos) { - String replicasName = collectOracle.getName(); - DataInfoEntity data = new DataInfoEntity(); - try { - if (null != collectOracle.getDataId() - && !"".equals(collectOracle.getDataId())) { - data.setId(Integer.valueOf(collectOracle.getDataId())); - data.setExtractStatus(1); - dataInfoDao.updateExtract(data); - collectOracle.setName("J" - + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + replicasName - + " isExtract=1"; - // sql日志记录时间: - FileOperateHelper - .fileWrite( - Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName() - + ".log", - "\r\n 开始汇总 \r\n" - + DateForm - .date2StringBysecond(new Date()) - + "\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "1"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createDBLink(conn, collectOracle); // 创建dblink - oracleExtract.createTableSpace(conn, collectOracle, - oracleModel); // 创建表空间 - oracleExtract.createUser(conn, collectOracle, - oracleModel);// 创建用户并授权 - oracleExtract.extractColleDB(conn, collectOracle, - oracleModel);// 执行抽取 - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "2"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + replicasName - + " isExtract=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setExtractStatus(2); - dataInfoDao.updateExtract(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data - .getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.updateExtract(data); - } - } catch (Exception e) { - String cmd = "kubectl label --overwrite rc " + replicasName - + " isExtract=0"; - Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - data.setExtractStatus(0); - dataInfoDao.updateExtract(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.updateExtract(data); - log.error(Custom4exception.OracleSQL_Except, e); - } finally { - // 去掉保存的当前数据id, - CacheSetCantDelete.removeExtractId(collectOracle - .getDataId()); - String msg = "汇总结束"; - if (2 != data.getExtractStatus()) { - msg += " 汇总有异常!状态重置为待汇总 "; - data.setExtractStatus(0); - dataInfoDao.updateExtract(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data - .getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.updateExtract(data); - } - // sql日志记录时间: - FileOperateHelper.fileWrite( - Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName() + ".log", - "\r\n " + msg + " >>>>>>> " - + DateForm.date2StringBysecond(new Date()) - + "\r\n\r\n\n"); - } - } - isSuccess = true; - } catch (Exception e) { - new CustomException(Custom4exception.OracleSQL_Except, e); - } - return isSuccess; - } - - @Override - public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) - throws Exception { - boolean isConnect = false; - Connection conn = OracleConnector.connectionBuilder( - "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" - + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), - oracleModel.getPassword(), null); - if (null == conn) { - isConnect = false; - throw new CustomException(Custom4exception.connect_Oracle_Except, - null, oracleModel); - // FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - // + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + - // "]\r\n"); - } else { - isConnect = oracleExtract.testConnect(conn); - conn.close(); // 关闭连接 - } - return isConnect; - } - - @Override - public boolean extractStandardTable(String name, - List dataInfolist, - GatherOracleInfo oracleConnect) throws Exception { - boolean isSuccess = false; - // map转 bean(汇总库信息-带tableName的) - GatherOracleInfo oracleModel = oracleConnect; - // 采集库连接参数 - List datainfos = dataInfolist; - if (datainfos.size() == 0) { - return false; - } - Connection conn = OracleConnector.connectionBuilder( - "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" - + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), - oracleModel.getPassword(), dataInfolist.get(0)); - if (null == conn) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + dataInfolist.get(0).getName() + ".log", "创建oracle连接失败: [" - + conn + "]\r\n"); - return false; - } - for (OracleConnectorParams collectOracle : datainfos) { - String replicasName = collectOracle.getName(); - DataInfoEntity data = new DataInfoEntity(); - try { - if (null != collectOracle.getDataId() - && !"".equals(collectOracle.getDataId())) { - data.setId(Integer.valueOf(collectOracle.getDataId())); - // 设置为 标准表 抽取中 - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); - data.setStandardExtractStatus("1"); - dataInfoDao.update(data); - collectOracle.setName("CQ" - + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + replicasName - + " standardExtractStatus=1"; - // sql日志记录时间: - FileOperateHelper.fileWrite( - Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName() + ".log", - "\r\n 开始抽取标准表 \r\n " - + DateForm.date2StringBysecond(new Date()) - + "\n\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "1"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink - oracleExtract.createStardardTableSpace(conn, collectOracle, - oracleModel); // 创建表空间 - oracleExtract.createOnlyUser(conn, collectOracle, - oracleModel);// 创建 抽取标准表的 用户并授权 - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - if (null != tmpdata) { - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata - .getPayResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata - .getPayResultLast())) { - // 抽取中 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try { - oracleExtract.extractStandardPayTable(conn, - collectOracle, oracleModel);// 执行抽取 - } catch (Exception e) { - // 改回 校验存在的状态 - data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - // 抽取成功 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } - } - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata - .getExecResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata - .getExecResultLast())) { - // 抽取中 - data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try { - oracleExtract.extractStandardExecTable(conn, - collectOracle, oracleModel);// 执行抽取 - } catch (Exception e) { - // 改回 校验存在的状态 - data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } - } - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "2"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + replicasName - + " standardExtractStatus=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); - data.setStandardExtractStatus("2"); - dataInfoDao.update(data); - DataInfoEntity tmpSrcData = dataInfoDao.findById(data - .getId()); - data.setId(tmpSrcData.getSrcId()); - dataInfoDao.update(data); - } - } - } catch (Exception e) { - - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + replicasName - + " standardExtractStatus=0"; - Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - data.setStandardExtractStatus("0"); - dataInfoDao.update(data); - DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId()); - data.setId(tmpSrcData.getSrcId()); - dataInfoDao.update(data); - log.error(Custom4exception.OracleSQL_Except, e); - } finally { - // 去掉保存的当前数据id, - CacheSetCantDelete.removeStandardId(collectOracle.getDataId()); - String msg = "抽取标准表结束"; - if (!"2".equals(data.getStandardExtractStatus())) { - msg += " 抽取有异常!状态重置为待抽取 "; - data.setStandardExtractStatus("0"); - dataInfoDao.update(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.update(data); - } - // sql日志记录时间: - // sql日志记录时间: - FileOperateHelper.fileWrite( - Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName() + ".log", - " " + msg + " >>>>> " - + DateForm.date2StringBysecond(new Date()) - + "\r\n\r\n\n"); - } - } - isSuccess = true; - return isSuccess; - } - - @Override - public void updateDataExtractStatus(OracleConnectorParams ocp, int status) { - DataInfoEntity data = new DataInfoEntity(); - data.setId(Integer.valueOf(ocp.getDataId())); - // 设置为 标准表 抽取中 - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); - data.setExtractStatus(status); - try { - dataInfoDao.update(data); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - // @Override - // public boolean extractOracle(String name, List - // dataInfos, GatherOracleInfo oracleConnect) throws Exception { - // boolean isSuccess = false; - // try{ - // //map转 bean(汇总库信息-带tableName的) - // // GatherOracleInfo oracleModel = (GatherOracleInfo) - // Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); - // - // //采集库连接参数 - // // List datainfos = new - // ArrayList(); - // // for (Map map : dataInfoMap) { - // // OracleConnectorParams dataInfoEntity = (OracleConnectorParams) - // Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); - // // datainfos.add(dataInfoEntity); - // // } - // - // Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" - // + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" - // + oracleConnect.getDatabaseName(), oracleConnect.getUser(), - // oracleConnect.getPassword()); - // - // for (OracleConnectorParams collectOracle : dataInfos) { - // - // oracleExtract.createDBLink(conn, collectOracle); - // oracleExtract.createTableSpace(conn, oracleConnect); - // oracleExtract.createUser(conn, oracleConnect); - // oracleExtract.extractColleDB(conn, collectOracle); - // } - // isSuccess = true; - // }catch(Exception e){ - // - // } - // return false; - // } - -} +package com.platform.service.impl; + +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import javax.annotation.Resource; + +import org.apache.log4j.Logger; +import org.springframework.stereotype.Service; + +import com.base.Custom4exception; +import com.base.CustomException; +import com.platform.dao.DataInfoDao; +import com.platform.entities.DataInfoEntity; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.service.IOracleExtractService; +import com.platform.service.OracleExtractHelper; +import com.platform.utils.CacheSetCantDelete; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.DateForm; +import com.platform.utils.FileOperateHelper; + +@Service(value = "OracleExtract") +public class OracleExtractServiceImpl implements IOracleExtractService { + + /** + * 日志 + */ + public final static Logger log = Logger + .getLogger(OracleExtractServiceImpl.class); + + @Resource(name = "dataInfoDao") + private DataInfoDao dataInfoDao; + + /** + * kubernetes client + */ + private SimpleKubeClient client = new SimpleKubeClient(); + /** + * 抽取 + */ + private OracleExtractHelper oracleExtract = new OracleExtractHelper(); + + /** + * 数据库连接实现类 + */ + private OracleConnector connect = new OracleConnector(); + + @Override + public boolean extractOracle(String name, + List datainfos, GatherOracleInfo oracleModel) + throws Exception { + boolean isSuccess = false; + try { + // map转 bean(汇总库信息-带tableName的) + // GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + // List datainfos = dataInfolist; + if (datainfos.size() == 0) { + return false; + } + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), + oracleModel.getUser(), oracleModel.getPassword(), + datainfos.get(0)); + if (null == conn) { + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + datainfos.get(0).getName() + ".log", + "创建oracle连接失败: [" + conn + "]\r\n"); + return false; + } + for (OracleConnectorParams collectOracle : datainfos) { + String replicasName = collectOracle.getName(); + DataInfoEntity data = new DataInfoEntity(); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + data.setId(Integer.valueOf(collectOracle.getDataId())); + data.setExtractStatus(1); + dataInfoDao.updateExtract(data); + collectOracle.setName("J" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + replicasName + + " isExtract=1"; + // sql日志记录时间: + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + + ".log", + "\r\n 开始汇总 \r\n" + + DateForm + .date2StringBysecond(new Date()) + + "\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createUser(conn, collectOracle, + oracleModel);// 创建用户并授权 + oracleExtract.extractColleDB(conn, collectOracle, + oracleModel);// 执行抽取 + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + replicasName + + " isExtract=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setExtractStatus(2); + dataInfoDao.updateExtract(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data + .getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.updateExtract(data); + } + } catch (Exception e) { + String cmd = "kubectl label --overwrite rc " + replicasName + + " isExtract=0"; + Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + data.setExtractStatus(0); + dataInfoDao.updateExtract(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.updateExtract(data); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + // 去掉保存的当前数据id, + CacheSetCantDelete.removeExtractId(collectOracle + .getDataId()); + String msg = "汇总结束"; + if (2 != data.getExtractStatus()) { + msg += " 汇总有异常!状态重置为待汇总 "; + data.setExtractStatus(0); + dataInfoDao.updateExtract(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data + .getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.updateExtract(data); + } + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n " + msg + " >>>>>>> " + + DateForm.date2StringBysecond(new Date()) + + "\r\n\r\n\n"); + } + } + isSuccess = true; + } catch (Exception e) { + new CustomException(Custom4exception.OracleSQL_Except, e); + } + return isSuccess; + } + + @Override + public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) + throws Exception { + boolean isConnect = false; + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), oracleModel.getUser(), + oracleModel.getPassword(), null); + if (null == conn) { + isConnect = false; + throw new CustomException(Custom4exception.connect_Oracle_Except, + null, oracleModel); + // FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + // + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + + // "]\r\n"); + } else { + isConnect = oracleExtract.testConnect(conn); + conn.close(); // 关闭连接 + } + return isConnect; + } + + @Override + public boolean extractStandardTable(String name, + List dataInfolist, + GatherOracleInfo oracleConnect) throws Exception { + boolean isSuccess = false; + // map转 bean(汇总库信息-带tableName的) + GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + List datainfos = dataInfolist; + if (datainfos.size() == 0) { + return false; + } + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), oracleModel.getUser(), + oracleModel.getPassword(), dataInfolist.get(0)); + if (null == conn) { + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + dataInfolist.get(0).getName() + ".log", "创建oracle连接失败: [" + + conn + "]\r\n"); + return false; + } + for (OracleConnectorParams collectOracle : datainfos) { + String replicasName = collectOracle.getName(); + DataInfoEntity data = new DataInfoEntity(); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + data.setId(Integer.valueOf(collectOracle.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setStandardExtractStatus("1"); + dataInfoDao.update(data); + collectOracle.setName("CQ" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + replicasName + + " standardExtractStatus=1"; + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n 开始抽取标准表 \r\n " + + DateForm.date2StringBysecond(new Date()) + + "\n\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createStardardTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createOnlyUser(conn, collectOracle, + oracleModel);// 创建 抽取标准表的 用户并授权 + DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); + if (null != tmpdata) { + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getPayResultLast()) + || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata + .getPayResultLast())) { + // 抽取中 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); + dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardPayTable(conn, + collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + // 抽取成功 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } + } + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getExecResultLast()) + || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata + .getExecResultLast())) { + // 抽取中 + data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); + dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardExecTable(conn, + collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } + } + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + replicasName + + " standardExtractStatus=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); + data.setStandardExtractStatus("2"); + dataInfoDao.update(data); + DataInfoEntity tmpSrcData = dataInfoDao.findById(data + .getId()); + data.setId(tmpSrcData.getSrcId()); + dataInfoDao.update(data); + } + } + } catch (Exception e) { + + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + replicasName + + " standardExtractStatus=0"; + Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + data.setStandardExtractStatus("0"); + dataInfoDao.update(data); + DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId()); + data.setId(tmpSrcData.getSrcId()); + dataInfoDao.update(data); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + // 去掉保存的当前数据id, + CacheSetCantDelete.removeStandardId(collectOracle.getDataId()); + String msg = "抽取标准表结束"; + if (!"2".equals(data.getStandardExtractStatus())) { + msg += " 抽取有异常!状态重置为待抽取 "; + data.setStandardExtractStatus("0"); + dataInfoDao.update(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.update(data); + } + // sql日志记录时间: + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + ".log", + " " + msg + " >>>>> " + + DateForm.date2StringBysecond(new Date()) + + "\r\n\r\n\n"); + } + } + isSuccess = true; + return isSuccess; + } + + @Override + public void updateDataExtractStatus(OracleConnectorParams ocp, int status) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(ocp.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setExtractStatus(status); + try { + dataInfoDao.update(data); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + // @Override + // public boolean extractOracle(String name, List + // dataInfos, GatherOracleInfo oracleConnect) throws Exception { + // boolean isSuccess = false; + // try{ + // //map转 bean(汇总库信息-带tableName的) + // // GatherOracleInfo oracleModel = (GatherOracleInfo) + // Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); + // + // //采集库连接参数 + // // List datainfos = new + // ArrayList(); + // // for (Map map : dataInfoMap) { + // // OracleConnectorParams dataInfoEntity = (OracleConnectorParams) + // Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); + // // datainfos.add(dataInfoEntity); + // // } + // + // Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + // + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" + // + oracleConnect.getDatabaseName(), oracleConnect.getUser(), + // oracleConnect.getPassword()); + // + // for (OracleConnectorParams collectOracle : dataInfos) { + // + // oracleExtract.createDBLink(conn, collectOracle); + // oracleExtract.createTableSpace(conn, oracleConnect); + // oracleExtract.createUser(conn, oracleConnect); + // oracleExtract.extractColleDB(conn, collectOracle); + // } + // isSuccess = true; + // }catch(Exception e){ + // + // } + // return false; + // } + +} diff --git a/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java b/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java index 0e01dd24..cb8ae0b2 100644 --- a/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java +++ b/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java @@ -1,335 +1,335 @@ -package com.platform.service.thread; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.ReplicationController; - -import java.sql.Connection; -import java.util.List; -import java.util.Set; - -import org.apache.log4j.Logger; - -import com.platform.dao.DataInfoDao; -import com.platform.entities.CheckoutEntity; -import com.platform.entities.DataInfoEntity; -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.utils.CacheOracleCheckoutEntity; -import com.platform.utils.Configs; -import com.platform.utils.Constant; -import com.platform.utils.FileOperateHelper; - -/** - * 校验oracle标准表是否存在 - * - * @author chen - * - */ -public class ThreadCheckoutStandardOracle extends Thread { - - public final static Logger log = Configs.CONSOLE_LOGGER - .getLogger(ThreadCheckoutStandardOracle.class); - - /** - * kuber 客户端 - */ - private SimpleKubeClient client; - - /** - * dataInfo 的数据持久层 - */ - private DataInfoDao dataInfoDao; - - public ThreadCheckoutStandardOracle(DataInfoDao dataInfoDao) { - this.setDaemon(true); - this.client = new SimpleKubeClient(); - this.dataInfoDao = dataInfoDao; - } - - @Override - public void run() { - try { - Thread.sleep(1000 * 5); - } catch (InterruptedException e2) { - log.error(e2); - } - // 循环11次,每次休眠 - for (int i = 0; i < 11; i++) { - try { - // 数据的 keys (=kuber的应用名称 taskName) - Set taskNames = CacheOracleCheckoutEntity - .getCheckKeys(); - int lengs = taskNames.size(); - if (lengs == 0) { - break; - } - String[] taskNamekeys = taskNames.toArray(new String[lengs]); - StringBuffer sbtask = new StringBuffer(); - for (int j = 0; j < taskNamekeys.length; j++) { - sbtask.append(taskNamekeys[j]).append("\t"); - } - log.info("replicationController标签: " + sbtask.toString()); - for (String key : taskNamekeys) { - - // 获得 kuber的 pod - Pod tmpPod = filterPod(key); - if (null == tmpPod) { - log.info("replicationController标签: " + key - + " 的 pod 节点不存在!"); - if (i > 5) { - CacheOracleCheckoutEntity.checkRemove(key); - } - continue; - } - // 尝试 连接 oracle - connectOracle(tmpPod, key); - - if (i == 10) { - - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + key + " status=1"; - - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 失败]"); - log.info(sb.toString()); - - - String cmd2 = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key - + " checkoutFlag=0"; - List rList2 = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd2); - StringBuffer sb2 = new StringBuffer(); - for (String str : rList2) - sb2.append(str).append("\n"); - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 未校验]"); - log.info(sb2.toString()); - // 当前key标签对应的 数据服务的记录 - CheckoutEntity tmp = CacheOracleCheckoutEntity - .getCheck(key); - tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO); - tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO); - tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO); - try { - // 更新数据库 - this.updateDataInfo(tmp); - } catch (Exception e) { - log.error(e); - } - } - } - } catch (Exception e1) { - log.error(e1.getStackTrace()); - } finally { - try { - Thread.sleep(1000 * 60); - } catch (InterruptedException e) { - log.error(e); - } - } - } - } - - /** - * 查找 pod 节点 - * - * @param taskName - * @return - */ - private Pod filterPod(String taskName) { - Pod pod = null; - ReplicationController replicationController = client - .getReplicationController(taskName); - if (null != replicationController) { - List filterPods = client - .getPodsForApplicaList(replicationController); - if (filterPods != null && filterPods.size() > 0) { - pod = filterPods.get(0); - } - } - return pod; - } - - /** - * 尝试连接 oracle服务 - * - * @param tmpPod - * @param key - */ - private void connectOracle(Pod pod, String key) throws Exception { - if (pod != null) { - String ip = client.getPodHostIp(pod); - int port = client.getPodContainerport(pod); - if (ip != null && port != 0) { - String url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" - + Configs.ORACLE_ORCL; - boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 - Configs.ORACLE_USER, Configs.ORACLE_PSW); - log.info("url:" + url + ",user:" + Configs.ORACLE_USER - + ",password:" + Configs.ORACLE_PSW); - String message = "失败"; - - - String cmd3 = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key + " checkoutFlag=0"; - - List rList3 = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd3); - StringBuffer sb3 = new StringBuffer(); - for (String str : rList3) - sb3.append(str).append("\n"); - log.info(sb3.toString()); - log.info("更新replicationController标签: " + key + "\t[标签更新为:未校验]"); - - if (flag) { - - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + key + " status=2"; - - // 设置服务为 成功 - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - log.info(sb.toString()); - message = "成功"; - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 成功]"); - // 校验标签 - - String cmd2 = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key - + " checkoutFlag=2"; - List rList2 = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd2); - StringBuffer sb2 = new StringBuffer(); - for (String str : rList2) - sb2.append(str).append("\n"); - log.info(sb2.toString()); - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 校验中]"); - // 获得当前 服务对应的 数据 - CheckoutEntity tmp = CacheOracleCheckoutEntity - .getCheck(key); - if (null != tmp) { - CacheOracleCheckoutEntity.putExtract(key, tmp); - } - // 查询 对应的 2 个标准表 - OracleConnectorParams oc = new OracleConnectorParams(); - String logName = tmp.getAreaCode().toLowerCase() + "_" - + tmp.getSysCode() + "_" + tmp.getDataVersion(); - oc.setName(logName); - try { - Connection conn = OracleConnector.connectionBuilder( - url, Configs.ORACLE_USER, Configs.ORACLE_PSW, - oc); - // 支付--校验 - if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp - .getPayResultLast())) { - String paySql = "select * from dba_tables where owner = '" - + Configs.COLLECT_STANDARD_TABLE_USER - .toUpperCase() - + "' and table_name = '" - + Configs.COLLECT_PAY_TABLE.toUpperCase() - + "'"; - if (OracleConnector - .execUpdateOracleSQL( - conn, - paySql, - FileOperateHelper - .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) - + logName + "jy.log")) { - tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); - } else { - tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR); - } - } - // 可执行-- 校验 - if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp - .getExecResultLast())) { - String execSql = "select * from dba_tables where owner = '" - + Configs.COLLECT_STANDARD_TABLE_USER - .toUpperCase() - + "' and table_name = '" - + Configs.COLLECT_EXEC_TABLE.toUpperCase() - + "'"; - if (OracleConnector - .execUpdateOracleSQL( - conn, - execSql, - FileOperateHelper - .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) - + logName + "jy.log")) { - tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); - } else { - tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR); - } - } - tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE); - // 更新数据库data_info - updateDataInfo(tmp); - // 将 CacheOracleCheckoutEntity checkMap中的 数据 - // 放入extractMap中 - /* CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity - .checkRemove(key)/* ) */; - // 更新kuber状态 - - cmd2 = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key - + " checkoutFlag=1"; - // client.updateOrAddReplicasLabelById(taskNSyame, - // "status", "2"); - rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd2); - sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - log.info(sb.toString()); - message = "成功"; - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 已校验]"); - } catch (Exception e) { - log.error(e); - } - // 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录 - CacheOracleCheckoutEntity.checkRemove(key); - } - log.info("连接到数据库服务: " + key + "\t[连接结果: " + message + "]"); - } - - } - } - - /** - * 更新 dataInfo表 - * - * @param checkoutEntity - * @throws Exception - */ - private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception { - DataInfoEntity data = new DataInfoEntity(); - // 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO - data.setId(checkoutEntity.getDataId()); - data.setPayResultLast(checkoutEntity.getPayResultLast()); - data.setExecResultLast(checkoutEntity.getExecResultLast()); - data.setCheckoutFlag(checkoutEntity.getCheckoutFlag()); - dataInfoDao.update(data); - DataInfoEntity tmpdata = dataInfoDao.findById(checkoutEntity - .getDataId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.update(data); - } -} +package com.platform.service.thread; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.ReplicationController; + +import java.sql.Connection; +import java.util.List; +import java.util.Set; + +import org.apache.log4j.Logger; + +import com.platform.dao.DataInfoDao; +import com.platform.entities.CheckoutEntity; +import com.platform.entities.DataInfoEntity; +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.utils.CacheOracleCheckoutEntity; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.FileOperateHelper; + +/** + * 校验oracle标准表是否存在 + * + * @author chen + * + */ +public class ThreadCheckoutStandardOracle extends Thread { + + public final static Logger log = Configs.CONSOLE_LOGGER + .getLogger(ThreadCheckoutStandardOracle.class); + + /** + * kuber 客户端 + */ + private SimpleKubeClient client; + + /** + * dataInfo 的数据持久层 + */ + private DataInfoDao dataInfoDao; + + public ThreadCheckoutStandardOracle(DataInfoDao dataInfoDao) { + this.setDaemon(true); + this.client = new SimpleKubeClient(); + this.dataInfoDao = dataInfoDao; + } + + @Override + public void run() { + try { + Thread.sleep(1000 * 5); + } catch (InterruptedException e2) { + log.error(e2); + } + // 循环11次,每次休眠 + for (int i = 0; i < 11; i++) { + try { + // 数据的 keys (=kuber的应用名称 taskName) + Set taskNames = CacheOracleCheckoutEntity + .getCheckKeys(); + int lengs = taskNames.size(); + if (lengs == 0) { + break; + } + String[] taskNamekeys = taskNames.toArray(new String[lengs]); + StringBuffer sbtask = new StringBuffer(); + for (int j = 0; j < taskNamekeys.length; j++) { + sbtask.append(taskNamekeys[j]).append("\t"); + } + log.info("replicationController标签: " + sbtask.toString()); + for (String key : taskNamekeys) { + + // 获得 kuber的 pod + Pod tmpPod = filterPod(key); + if (null == tmpPod) { + log.info("replicationController标签: " + key + + " 的 pod 节点不存在!"); + if (i > 5) { + CacheOracleCheckoutEntity.checkRemove(key); + } + continue; + } + // 尝试 连接 oracle + connectOracle(tmpPod, key); + + if (i == 10) { + + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + key + " status=1"; + + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 失败]"); + log.info(sb.toString()); + + + String cmd2 = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + + " checkoutFlag=0"; + List rList2 = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd2); + StringBuffer sb2 = new StringBuffer(); + for (String str : rList2) + sb2.append(str).append("\n"); + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 未校验]"); + log.info(sb2.toString()); + // 当前key标签对应的 数据服务的记录 + CheckoutEntity tmp = CacheOracleCheckoutEntity + .getCheck(key); + tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO); + tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO); + tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO); + try { + // 更新数据库 + this.updateDataInfo(tmp); + } catch (Exception e) { + log.error(e); + } + } + } + } catch (Exception e1) { + log.error(e1.getStackTrace()); + } finally { + try { + Thread.sleep(1000 * 60); + } catch (InterruptedException e) { + log.error(e); + } + } + } + } + + /** + * 查找 pod 节点 + * + * @param taskName + * @return + */ + private Pod filterPod(String taskName) { + Pod pod = null; + ReplicationController replicationController = client + .getReplicationController(taskName); + if (null != replicationController) { + List filterPods = client + .getPodsForApplicaList(replicationController); + if (filterPods != null && filterPods.size() > 0) { + pod = filterPods.get(0); + } + } + return pod; + } + + /** + * 尝试连接 oracle服务 + * + * @param tmpPod + * @param key + */ + private void connectOracle(Pod pod, String key) throws Exception { + if (pod != null) { + String ip = client.getPodHostIp(pod); + int port = client.getPodContainerport(pod); + if (ip != null && port != 0) { + String url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" + + Configs.ORACLE_ORCL; + boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 + Configs.ORACLE_USER, Configs.ORACLE_PSW); + log.info("url:" + url + ",user:" + Configs.ORACLE_USER + + ",password:" + Configs.ORACLE_PSW); + String message = "失败"; + + + String cmd3 = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + " checkoutFlag=0"; + + List rList3 = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd3); + StringBuffer sb3 = new StringBuffer(); + for (String str : rList3) + sb3.append(str).append("\n"); + log.info(sb3.toString()); + log.info("更新replicationController标签: " + key + "\t[标签更新为:未校验]"); + + if (flag) { + + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + key + " status=2"; + + // 设置服务为 成功 + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + log.info(sb.toString()); + message = "成功"; + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 成功]"); + // 校验标签 + + String cmd2 = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + + " checkoutFlag=2"; + List rList2 = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd2); + StringBuffer sb2 = new StringBuffer(); + for (String str : rList2) + sb2.append(str).append("\n"); + log.info(sb2.toString()); + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 校验中]"); + // 获得当前 服务对应的 数据 + CheckoutEntity tmp = CacheOracleCheckoutEntity + .getCheck(key); + if (null != tmp) { + CacheOracleCheckoutEntity.putExtract(key, tmp); + } + // 查询 对应的 2 个标准表 + OracleConnectorParams oc = new OracleConnectorParams(); + String logName = tmp.getAreaCode().toLowerCase() + "_" + + tmp.getSysCode() + "_" + tmp.getDataVersion(); + oc.setName(logName); + try { + Connection conn = OracleConnector.connectionBuilder( + url, Configs.ORACLE_USER, Configs.ORACLE_PSW, + oc); + // 支付--校验 + if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp + .getPayResultLast())) { + String paySql = "select * from dba_tables where owner = '" + + Configs.COLLECT_STANDARD_TABLE_USER + .toUpperCase() + + "' and table_name = '" + + Configs.COLLECT_PAY_TABLE.toUpperCase() + + "'"; + if (OracleConnector + .execUpdateOracleSQL( + conn, + paySql, + FileOperateHelper + .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) + + logName + "jy.log")) { + tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + } else { + tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR); + } + } + // 可执行-- 校验 + if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp + .getExecResultLast())) { + String execSql = "select * from dba_tables where owner = '" + + Configs.COLLECT_STANDARD_TABLE_USER + .toUpperCase() + + "' and table_name = '" + + Configs.COLLECT_EXEC_TABLE.toUpperCase() + + "'"; + if (OracleConnector + .execUpdateOracleSQL( + conn, + execSql, + FileOperateHelper + .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) + + logName + "jy.log")) { + tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); + } else { + tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR); + } + } + tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE); + // 更新数据库data_info + updateDataInfo(tmp); + // 将 CacheOracleCheckoutEntity checkMap中的 数据 + // 放入extractMap中 + /* CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity + .checkRemove(key)/* ) */; + // 更新kuber状态 + + cmd2 = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + + " checkoutFlag=1"; + // client.updateOrAddReplicasLabelById(taskNSyame, + // "status", "2"); + rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd2); + sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + log.info(sb.toString()); + message = "成功"; + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 已校验]"); + } catch (Exception e) { + log.error(e); + } + // 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录 + CacheOracleCheckoutEntity.checkRemove(key); + } + log.info("连接到数据库服务: " + key + "\t[连接结果: " + message + "]"); + } + + } + } + + /** + * 更新 dataInfo表 + * + * @param checkoutEntity + * @throws Exception + */ + private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception { + DataInfoEntity data = new DataInfoEntity(); + // 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO + data.setId(checkoutEntity.getDataId()); + data.setPayResultLast(checkoutEntity.getPayResultLast()); + data.setExecResultLast(checkoutEntity.getExecResultLast()); + data.setCheckoutFlag(checkoutEntity.getCheckoutFlag()); + dataInfoDao.update(data); + DataInfoEntity tmpdata = dataInfoDao.findById(checkoutEntity + .getDataId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.update(data); + } +}