You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
aggregation-platform/src/com/platform/service/thread/ThreadCheckoutStandardOracl...

269 lines
9.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package com.platform.service.thread;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.ReplicationController;
import java.sql.Connection;
import java.util.List;
import java.util.Set;
import com.base.CustomException;
import com.platform.dao.DataInfoDao;
import com.platform.entities.CheckoutEntity;
import com.platform.entities.DataInfoEntity;
import com.platform.entities.OracleConnectorParams;
import com.platform.kubernetes.SimpleKubeClient;
import com.platform.oracle.OracleConnector;
import com.platform.utils.CacheOracleCheckoutEntity;
import com.platform.utils.Configs;
import com.platform.utils.Constant;
import com.platform.utils.FileOperateHelper;
/** 校验oracle标准表是否存在
* @author chen
*
*/
public class ThreadCheckoutStandardOracle extends Thread {
/**
* kuber 客户端
*/
private SimpleKubeClient client;
/**
* dataInfo 的数据持久层
*/
private DataInfoDao dataInfoDao;
public ThreadCheckoutStandardOracle(DataInfoDao dataInfoDao) {
this.client = new SimpleKubeClient();
this.dataInfoDao = dataInfoDao;
}
@Override
public void run() {
//循环11次每次休眠
for (int i = 0; i < 11; i++) {
// 数据的 keys =kuber的应用名称 taskName)
Set<String> taskNames = CacheOracleCheckoutEntity.getCheckKeys();
int lengs = taskNames.size();
if (lengs == 0) {
break;
}
String[] taskNamekeys = taskNames.toArray(new String[lengs]);
for (String key : taskNamekeys) {
String cmd3 = "kubectl annotate --overwrite rc " + key
+ " checkoutFlag=0";
List<String> rList3 = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd3);
StringBuffer sb3 = new StringBuffer();
for (String str : rList3)
sb3.append(str).append("\n");
Configs.CONSOLE_LOGGER.info("更新replicationController标签 "
+ key + "\t[标签更新为: 失败]");
Configs.CONSOLE_LOGGER.info(sb3.toString());
// 获得 kuber的 pod
Pod tmpPod = filterPod(key);
if (null == tmpPod) {
CacheOracleCheckoutEntity.checkRemove(key);
continue;
}
// 尝试 连接 oracle
connectOracle(tmpPod, key);
if (i == 10) {
String cmd = "kubectl label --overwrite rc " + key
+ " status=1";
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String str : rList)
sb.append(str).append("\n");
Configs.CONSOLE_LOGGER.info("更新replicationController标签 "
+ key + "\t[标签更新为: 失败]");
Configs.CONSOLE_LOGGER.info(sb.toString());
String cmd2 = "kubectl annotate --overwrite rc " + key
+ " checkoutFlag=0";
List<String> rList2 = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd2);
StringBuffer sb2 = new StringBuffer();
for (String str : rList2)
sb2.append(str).append("\n");
Configs.CONSOLE_LOGGER.info("更新replicationController标签 "
+ key + "\t[标签更新为: 失败]");
Configs.CONSOLE_LOGGER.info(sb2.toString());
//更新数据库CheckoutEntity tmp = CacheOracleCheckoutEntity.getCheck(key);
CheckoutEntity tmp = CacheOracleCheckoutEntity.getCheck(key);
tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO);
tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO);
tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO);
try {
this.updateDataInfo(tmp);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
try {
Thread.sleep(1000*60);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/** 查找 pod 节点
* @param taskName
* @return
*/
private Pod filterPod(String taskName) {
Pod pod = null;
ReplicationController replicationController = client
.getReplicationController(taskName);
if (null != replicationController) {
List<Pod> filterPods = client
.getPodsForApplicaList(replicationController);
if (filterPods != null && filterPods.size() > 0) {
pod = filterPods.get(0);
}
}
return pod;
}
/** 尝试连接 oracle服务
* @param tmpPod
* @param key
*/
private void connectOracle(Pod pod, String key) {
if (pod != null) {
String ip = client.getPodHostIp(pod);
int port = client.getPodContainerport(pod);
if (ip != null && port != 0) {
String url = "jdbc:oracle:thin:@" + ip + ":" + port
+ ":" + Configs.ORACLE_ORCL;
boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数true标示连接成功false标示连接失败
Configs.ORACLE_USER, Configs.ORACLE_PSW);
Configs.CONSOLE_LOGGER.info("url:" + url + ",user:"
+ Configs.ORACLE_USER + ",password:"
+ Configs.ORACLE_PSW);
String message = "失败";
String cmd3 = "kubectl annotate --overwrite rc "
+ key + " checkoutFlag=0";
List<String> rList3 = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd3);
StringBuffer sb3 = new StringBuffer();
for (String str : rList3)
sb3.append(str).append("\n");
Configs.CONSOLE_LOGGER.info(sb3.toString());
if (flag) {
String cmd = "kubectl label --overwrite rc "
+ key + " status=2";
// client.updateOrAddReplicasLabelById(taskNSyame,
// "status", "2");
List<String> rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd);
StringBuffer sb = new StringBuffer();
for (String str : rList)
sb.append(str).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
message = "成功";
Configs.CONSOLE_LOGGER
.info("更新replicationController标签 "
+ key + "\t[标签更新为: 成功]");
String cmd2 = "kubectl annotate --overwrite rc "
+ key + " checkoutFlag=2";
List<String> rList2 = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd2);
StringBuffer sb2 = new StringBuffer();
for (String str : rList2)
sb2.append(str).append("\n");
Configs.CONSOLE_LOGGER.info(sb2.toString());
//
CheckoutEntity tmp = CacheOracleCheckoutEntity.getCheck(key);
if (null != tmp) {
CacheOracleCheckoutEntity.putExtract(key, tmp);
}
// TODO 查询 对应的 2 个标准表
OracleConnectorParams oc = new OracleConnectorParams();
String logName = tmp.getAreaCode().toLowerCase()+"_"+tmp.getSysCode()+"_"+tmp.getDataVersion();
oc.setName(logName);
try {
Connection conn = OracleConnector.ConnectionBuilder(url, Configs.ORACLE_USER, Configs.ORACLE_PSW, oc);
//支付--校验
if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp.getPayResultLast())) {
String paySql = "select * from dba_tables where owner = '"+Configs.COLLECT_STANDARD_TABLE_USER.toUpperCase()
+ "' and table_name = '"+Configs.COLLECT_PAY_TABLE.toUpperCase() +"'";
if(OracleConnector.execUpdateOracleSQL(conn, paySql,
FileOperateHelper.addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION)+ logName +"jy.log")){
tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE);
}
else {
tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR);
}
}
//可执行-- 校验
if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp.getExecResultLast())) {
String execSql = "select * from dba_tables where owner = '"+Configs.COLLECT_STANDARD_TABLE_USER.toUpperCase()
+ "' and table_name = '"+Configs.COLLECT_EXEC_TABLE.toUpperCase() +"'";
if(OracleConnector.execUpdateOracleSQL(conn, execSql,
FileOperateHelper.addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION)+ logName +"jy.log")){
tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE);
}else {
tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR);
}
}
tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE);
// 更新数据库data_info
updateDataInfo(tmp);
// 将 CacheOracleCheckoutEntity checkMap中的 数据 放入extractMap中
/*CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity.checkRemove(key)/*)*/;
// 更新kuber状态
cmd2 = "kubectl annotate --overwrite rc "
+ key + " checkoutFlag=1";
// client.updateOrAddReplicasLabelById(taskNSyame,
// "status", "2");
rList = Constant.ganymedSSH
.execCmdWaitAcquiescent(cmd2);
sb = new StringBuffer();
for (String str : rList)
sb.append(str).append("\n");
Configs.CONSOLE_LOGGER.info(sb.toString());
message = "成功";
Configs.CONSOLE_LOGGER
.info("更新replicationController标签 "
+ key + "\t[标签更新为: 成功]");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录
CacheOracleCheckoutEntity.checkRemove(key);
}
Configs.CONSOLE_LOGGER.info("连接到数据库服务: " + key
+ "\t[连接结果: " + message + "]");
}
}
}
/** 更新 dataInfo表
* @param checkoutEntity
* @throws Exception
*/
private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception {
DataInfoEntity data = new DataInfoEntity();
// 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO
data.setId(checkoutEntity.getDataId());
data.setPayResultLast(checkoutEntity.getPayResultLast());
data.setExecResultLast(checkoutEntity.getExecResultLast());
data.setCheckoutFlag(checkoutEntity.getCheckoutFlag());
dataInfoDao.update(data);
}
}