diff --git a/WebContent/WEB-INF/config/config.properties b/WebContent/WEB-INF/config/config.properties index 6bf45a62..440045d7 100644 --- a/WebContent/WEB-INF/config/config.properties +++ b/WebContent/WEB-INF/config/config.properties @@ -83,30 +83,30 @@ oracle-psw=oracle # windows path #============================================================================================================= -#extract-log-localtion=D:\\test\\log\\ -#extract-standard-log-localtion=D:\\test\\log2\\ -# -#file_upload_path=D:\\test\\ -#file_download_path=D:\\test\\export.xlsx -# -#package_download_path=D:\\test\\ -#package_name=sql_script_standard -# -#sql_script_path_last=D:\\test\\sql_script_last\\ -#sql_script_path_standard=D:\\test\\sql_script_standard\\ +extract-log-localtion=D:\\test\\log\\ +extract-standard-log-localtion=D:\\test\\log2\\ -#============================================================================================================= -# linux path -#============================================================================================================= +file_upload_path=D:\\test\\ +file_download_path=D:\\test\\export.xlsx -extract-log-localtion=/home/web_manage/log/ -extract-standard-log-localtion=/home/web_manage/log2/ +package_download_path=D:\\test\\ +package_name=sql_script_standard -file_upload_path=/excel_import_dir/ -file_download_path=/excel_export_dir/export.xlsx +sql_script_path_last=D:\\test\\sql_script_last\\ +sql_script_path_standard=D:\\test\\sql_script_standard\\ -package_download_path=/ -package_name=DefaultDescription +#============================================================================================================= +# linux path +#============================================================================================================= -sql_script_path_last=/DefaultDescription_last/ -sql_script_path_standard=/DefaultDescription/ +#extract-log-localtion=/home/web_manage/log/ +#extract-standard-log-localtion=/home/web_manage/log2/ +# +#file_upload_path=/excel_import_dir/ +#file_download_path=/excel_export_dir/export.xlsx +# +#package_download_path=/ +#package_name=DefaultDescription +# +#sql_script_path_last=/DefaultDescription_last/ +#sql_script_path_standard=/DefaultDescription/ diff --git a/src/com/platform/controller/DataModelController.java b/src/com/platform/controller/DataModelController.java index 29e34bf9..a1f4dca9 100644 --- a/src/com/platform/controller/DataModelController.java +++ b/src/com/platform/controller/DataModelController.java @@ -23,30 +23,32 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import com.base.BaseController; +import com.platform.entities.BasedTask; import com.platform.entities.DataInfoEntity; import com.platform.entities.DataInfoEntityMoveTmp; import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.entities.OracleExtractExecuter; import com.platform.form.PagerOptions; import com.platform.form.oracleForm; import com.platform.form.volumeMoveForm; -import com.platform.http.HttpUtils; import com.platform.service.DataInfoService; import com.platform.service.ICodeService; -import com.platform.service.IGfsService; +import com.platform.service.IGatherOracleService; import com.platform.service.ILogRead; import com.platform.service.IMoveDataService; -import com.platform.service.IGatherOracleService; import com.platform.service.IOracleExtractService; import com.platform.service.IVolumeService; +import com.platform.service.OracleExtractTask; import com.platform.service.OracleStatusService; -import com.platform.service.thread.ThreadExtractOracle; -import com.platform.service.thread.ThreadGainOracleConnect; import com.platform.utils.Constant; import com.platform.utils.UtilsHelper; -/** 数据管理 +/** + * 数据管理 + * * @author chen - * + * */ @Controller public class DataModelController extends BaseController { @@ -94,12 +96,14 @@ public class DataModelController extends BaseController { */ @Resource(name = "logReadService") private ILogRead logReadService; - + public void setDfsImp(DataInfoService dfs) { this.dfs = dfs; } - /** 数据管理--分页查询数据 + /** + * 数据管理--分页查询数据 + * * @param res * @param req * @return @@ -124,17 +128,19 @@ public class DataModelController extends BaseController { sb.append(str).append(":").append("null").append(","); } } - log.info(sb.deleteCharAt(sb.length() - 1) - .append("}").toString()); + log.info(sb.deleteCharAt(sb.length() - 1).append("}").toString()); PagerOptions pagerOptions = (PagerOptions) UtilsHelper .newObjAndSetAttrsByClass(PagerOptions.class, params); - pagerOptions.setCurrentPageNum(Integer.valueOf(params.get("currentPageNum"))); - //冷热区查询字段mark + pagerOptions.setCurrentPageNum(Integer.valueOf(params + .get("currentPageNum"))); + // 冷热区查询字段mark pagerOptions.setMark(pagerOptions.getVolumeType().trim()); return dfs.getPagerTableData(pagerOptions); } - /** 数据管理--删除数据 + /** + * 数据管理--删除数据 + * * @param res * @param req * @throws Exception @@ -149,7 +155,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** 连接oracle + /** + * 连接oracle + * * @param res * @param req * @throws UnsupportedEncodingException @@ -165,7 +173,8 @@ public class DataModelController extends BaseController { log.info("执行连接\t" + rcName); String cmd = "kubectl label --overwrite rc " + rcName + " status=0"; - List rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); StringBuffer sb = new StringBuffer(); for (String string : rList) sb.append(string).append("\n"); @@ -175,7 +184,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** 断开oracle连接 + /** + * 断开oracle连接 + * * @param res * @param req * @throws UnsupportedEncodingException @@ -196,7 +207,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** gfs的 volume节点的查询 + /** + * gfs的 volume节点的查询 + * * @return * @throws Exception */ @@ -208,7 +221,9 @@ public class DataModelController extends BaseController { return rest; } - /** 数据迁移功能 + /** + * 数据迁移功能 + * * @param res * @param req * @param form @@ -233,7 +248,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** oracle的 汇总功能 + /** + * oracle的 汇总功能 + * * @param res * @param req * @param form @@ -241,39 +258,32 @@ public class DataModelController extends BaseController { */ @RequestMapping(value = "/oracle/{name}/extract", method = RequestMethod.POST) public void oracleExtract(HttpServletRequest res, HttpServletResponse req, - @RequestBody oracleForm form) throws Exception { + @RequestBody oracleForm form) throws Exception { log.error("/oracle/{name}/extract"); - boolean isConnect = false; - //5秒内是否能获得oracle连接,否则认为超时。 - if (null != form.getTarget()) { - ThreadGainOracleConnect thOrcl = new ThreadGainOracleConnect(form, OracleExtract); - thOrcl.start(); - for (int i = 0; i < 10; i++) { - Thread.sleep(400); - isConnect = thOrcl.isConnect(); - if (isConnect) { - break; - } - else { - if (thOrcl.isExcept()) { - break; + if (null != form.getTarget()) { // 检查请求参数中是否包含汇总库信息 + boolean isConnect = OracleExtract.isConnectTotalOracle(form + .getTarget()); // 检查汇总库是否可以连接成功,连接成功返回200状态吗,连接失败返回500状态吗 + if (isConnect) { + req.setStatus(200); + if (null != form.getInneed() && form.getInneed().size() > 0) { + for (OracleConnectorParams oracleParams : form.getInneed()) { + BasedTask task = new OracleExtractTask( + oracleParams.getName(), form.getTarget(), + oracleParams, OracleExtract); + OracleExtractExecuter oee = new OracleExtractExecuter( + task); + new Thread(oee, oracleParams.getName()).start(); } - Thread.sleep(100); } - } - } - if (isConnect) - req.setStatus(200); - else - req.setStatus(500); - // 开始抽取数据到汇总库 - if (isConnect && null != form.getInneed() && form.getInneed().size() > 0) { - ThreadExtractOracle thExtra = new ThreadExtractOracle(form, OracleExtract); - thExtra.start(); + } else + req.setStatus(500); } + } - /** oracle汇总、抽取库的 查询 + /** + * oracle汇总、抽取库的 查询 + * * @return * @throws Exception */ @@ -285,7 +295,9 @@ public class DataModelController extends BaseController { return result; } - /** oracle汇总、抽取库的 删除 + /** + * oracle汇总、抽取库的 删除 + * * @param req * @param res * @param id @@ -301,7 +313,9 @@ public class DataModelController extends BaseController { res.setStatus(200); } - /** oracle汇总、抽取库的 新增 + /** + * oracle汇总、抽取库的 新增 + * * @param res * @param req * @param id @@ -319,7 +333,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** oracle汇总、抽取库的 更新 + /** + * oracle汇总、抽取库的 更新 + * * @param res * @param req * @param id @@ -337,7 +353,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** 迁移 数据 的查询 + /** + * 迁移 数据 的查询 + * * @return * @throws Exception */ @@ -349,7 +367,9 @@ public class DataModelController extends BaseController { return result; } - /** 迁移完成后的 删除记录功能 + /** + * 迁移完成后的 删除记录功能 + * * @param res * @param req * @param id @@ -368,7 +388,9 @@ public class DataModelController extends BaseController { return result; } - /** 迁移数据完成后新增一条数据(暂时去掉,新增功能不在此) + /** + * 迁移数据完成后新增一条数据(暂时去掉,新增功能不在此) + * * @param res * @param req * @param move @@ -381,12 +403,14 @@ public class DataModelController extends BaseController { HttpServletResponse req, @RequestBody DataInfoEntity move) throws Exception { log.debug("---------/task/transfer/save-----------------------"); -// int result = dfs.save(move); + // int result = dfs.save(move); req.setStatus(200); return 1; } - /** 地区和系统的 code 对应的名称 的 获取 + /** + * 地区和系统的 code 对应的名称 的 获取 + * * @return * @throws Exception */ @@ -398,7 +422,9 @@ public class DataModelController extends BaseController { return result; } - /** oracle 汇总的 日志 读取 + /** + * oracle 汇总的 日志 读取 + * * @param name * @param res * @param req @@ -410,8 +436,8 @@ public class DataModelController extends BaseController { public Object getExtractLog(@RequestParam("rcName") String name, HttpServletRequest res, HttpServletResponse req) throws Exception { log.info("---------/oracle/extract/log-------------------"); - String result = logReadService.readLog(name); - // "查看相应日志" + String result = logReadService.readLog(name); + // "查看相应日志" Map log = new HashMap(); log.put(name, result); return log; diff --git a/src/com/platform/entities/AbstractOracleExtractTask.java b/src/com/platform/entities/AbstractOracleExtractTask.java new file mode 100644 index 00000000..bc530b07 --- /dev/null +++ b/src/com/platform/entities/AbstractOracleExtractTask.java @@ -0,0 +1,32 @@ +package com.platform.entities; + +public abstract class AbstractOracleExtractTask implements BasedTask { + private String name; + private int status; // 任务的执行状态,0未执行,1执行中,2完成,3失败 + + public AbstractOracleExtractTask() { + } + + public AbstractOracleExtractTask(String name) { + this.name = name; + } + + public abstract void handler(); + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + +} diff --git a/src/com/platform/entities/BasedTask.java b/src/com/platform/entities/BasedTask.java new file mode 100644 index 00000000..d7fc88dc --- /dev/null +++ b/src/com/platform/entities/BasedTask.java @@ -0,0 +1,5 @@ +package com.platform.entities; + +public interface BasedTask { + public void handler(); +} diff --git a/src/com/platform/entities/OracleExtractExecuter.java b/src/com/platform/entities/OracleExtractExecuter.java new file mode 100644 index 00000000..23cc20c7 --- /dev/null +++ b/src/com/platform/entities/OracleExtractExecuter.java @@ -0,0 +1,15 @@ +package com.platform.entities; + +public class OracleExtractExecuter implements Runnable { + private BasedTask task; + + public OracleExtractExecuter(BasedTask task) { + this.task = task; + } + + @Override + public void run() { + if (null != task) + task.handler(); + } +} diff --git a/src/com/platform/kubernetes/BasedPathEntity.java b/src/com/platform/kubernetes/BasedPathEntity.java new file mode 100644 index 00000000..86e1b072 --- /dev/null +++ b/src/com/platform/kubernetes/BasedPathEntity.java @@ -0,0 +1,5 @@ +package com.platform.kubernetes; + +public class BasedPathEntity { + +} diff --git a/src/com/platform/kubernetes/SimpleKubeClient.java b/src/com/platform/kubernetes/SimpleKubeClient.java index 28c0da14..815205c9 100644 --- a/src/com/platform/kubernetes/SimpleKubeClient.java +++ b/src/com/platform/kubernetes/SimpleKubeClient.java @@ -4,6 +4,8 @@ import io.fabric8.kubernetes.api.model.Container; import io.fabric8.kubernetes.api.model.ContainerPort; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.ReplicationController; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; import io.fabric8.kubernetes.client.Client; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; @@ -156,4 +158,27 @@ public class SimpleKubeClient { client.close(); } + /** + * + * @param rcId --> 资源Id + * @param ns --> 资源名字空间 + * @return + */ + public ReplicationController createReplicas(String rcId, String ns) { + if (checkClientNull()) + client = new DefaultKubernetesClient(Configs.KUBE_MASTER_URL); + ReplicationController rc = client.replicationControllers() + .inNamespace(ns).createNew().editMetadata().withName(rcId) + .endMetadata().done(); + return rc; + } + + public ReplicationController createReplicas(String rcId, String ns, Map selector,Volume volume, VolumeMount volumeMounts){ + if (checkClientNull()) + client = new DefaultKubernetesClient(Configs.KUBE_MASTER_URL); + //Container container = new Container("", "", "", "", "", null, null, "oracle", ports, null, null, null, false, false, null, false, volumeMounts, null); + //container.setName("oracle"); + //ReplicationController rController = client.replicationControllers().inNamespace(ns).createNew().editMetadata().withName(rcId).withLabels(selector).endMetadata().editSpec().withReplicas(1).withSelector(selector).editTemplate().editMetadata().withLabels(selector).endMetadata().editSpec().addNewContainer().a. + return null; + } } diff --git a/src/com/platform/oracle/OracleConnector.java b/src/com/platform/oracle/OracleConnector.java index 37f2465c..a870b9ee 100644 --- a/src/com/platform/oracle/OracleConnector.java +++ b/src/com/platform/oracle/OracleConnector.java @@ -1,175 +1,184 @@ -package com.platform.oracle; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.log4j.Logger; - -import com.base.Custom4exception; -import com.base.CustomException; -import com.platform.entities.OracleConnectorParams; -import com.platform.utils.Configs; -import com.platform.utils.FileOperateHelper; - -public class OracleConnector { - - public static Logger log = Configs.DAILY_ROLLING_LOGGER.getLogger(OracleConnector.class); - - public OracleConnector() { - } - - static { - try { - Class.forName("oracle.jdbc.driver.OracleDriver"); - Configs.CONSOLE_LOGGER.info("Oracle驱动加载成功"); - } catch (ClassNotFoundException e) { - log.error(Custom4exception.OracleSQL_Except, e); - } - } - - - public synchronized static Connection connectionBuilder(String url, String user, - String password, OracleConnectorParams oc) throws CustomException { - Connection conn=null; - try { - conn = DriverManager.getConnection(url, user, password); - } catch (SQLException e) { - Configs.CONSOLE_LOGGER.info("创建oracle连接失败: [" + e.getMessage() + "]"); - if (null != oc) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + oc.getName()+".log", "创建oracle连接失败: [" + e.getMessage() + "]\r\n"); - } - throw new CustomException(Custom4exception.OracleSQL_Except, e); - } - return conn; - } - - public synchronized static boolean canConnect(String url, String user, String password) { - Connection result = null; - try { - result = connectionBuilder(url, user, password, null); - } catch (CustomException e) { - log.error(Custom4exception.OracleSQL_Except, e); - } - return (null != result); - } - - public synchronized static ResultSet getSQLExecResultSet(Connection conn, String sql, String filePath) { - ResultSet resultSet = null; - if (null != filePath) { - filePath = filePath.replace(".log", ""); - } - Statement statement = null; - try { - statement = conn - .createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, - ResultSet.CONCUR_UPDATABLE); - resultSet = statement.executeQuery(sql); - - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+ "OK \r\n"); - } catch (SQLException e) { - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n"); - log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - log.error(Custom4exception.OracleSQL_Except, e); - } - } - } - - return resultSet; - } - - public synchronized static ResultSet getSQLExecResultSet(String url, String user, - String password, String sql, String filePath) { - ResultSet result = null; - try { - result = getSQLExecResultSet(connectionBuilder(url, user, password, null), sql, filePath); - } catch (CustomException e) { - log.error(Custom4exception.OracleSQL_Except, e); - } - return result; - } - - /** - * 执行对oracle数据库的增、删 - * @param conn - * @param sql - * @return true:执行的不返回集合数据的sql成功, 是否执行成功 - */ - public synchronized static boolean execOracleSQL(Connection conn, String sql, String filePath) { - if (null != filePath) { - filePath = filePath.replace(".log", ""); - } - boolean flag = false; - Statement statement = null; - try { - statement = conn.createStatement(); - flag =statement.execute(sql); - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+ flag +" \r\n"); - } catch (SQLException e) { - flag = false; - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n"); - log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - log.error(Custom4exception.OracleSQL_Except, e); - } - } - } - - return flag; - } - /** - * 执行对oracle数据库的返回集合数据的sql - * @param conn - * @param sql - * @return true:执行结果大于1,即有数据 是否执行成功 - */ - public synchronized static boolean execUpdateOracleSQL(Connection conn, String sql, String filePath) { - if (null != filePath) { - filePath = filePath.replace(".log", ""); - } - boolean flag = false; - Statement statement = null; - try { - statement = conn.createStatement(); - if(statement.executeUpdate(sql) > 0) - { - flag = true; - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+"OK \r\n"); - } - } catch (SQLException e) { - flag = false; - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n"); - log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - if (null != statement) { - try { - statement.close(); - } catch (SQLException e) { - log.error(Custom4exception.OracleSQL_Except, e); - } - } - } - return flag; - } +package com.platform.oracle; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.log4j.Logger; + +import com.base.Custom4exception; +import com.base.CustomException; +import com.platform.entities.OracleConnectorParams; +import com.platform.utils.Configs; +import com.platform.utils.FileOperateHelper; + +public class OracleConnector { + + public static Logger log = Configs.DAILY_ROLLING_LOGGER + .getLogger(OracleConnector.class); + + public OracleConnector() { + } + + static { + try { + Class.forName("oracle.jdbc.driver.OracleDriver"); + Configs.CONSOLE_LOGGER.info("Oracle驱动加载成功"); + } catch (ClassNotFoundException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + + public synchronized static Connection connectionBuilder(String url, + String user, String password, OracleConnectorParams oc) + throws CustomException { + Connection conn = null; + try { + conn = DriverManager.getConnection(url, user, password); + } catch (SQLException e) { + Configs.CONSOLE_LOGGER.info("创建oracle连接失败: [" + e.getMessage() + + "]"); + if (null != oc) { + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + ".log", + "创建oracle连接失败: [" + e.getMessage() + "]\r\n"); + } + throw new CustomException(Custom4exception.OracleSQL_Except, e); + } + return conn; + } + + public synchronized static boolean canConnect(String url, String user, + String password) { + Connection result = null; + try { + result = connectionBuilder(url, user, password, null); + } catch (CustomException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + return (null != result); + } + + public synchronized static ResultSet getSQLExecResultSet(Connection conn, + String sql, String filePath) { + ResultSet resultSet = null; + if (null != filePath) { + filePath = filePath.replace(".log", ""); + } + Statement statement = null; + try { + statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); + resultSet = statement.executeQuery(sql); + /* + * if(resultSet.next()){ System.out.println(resultSet.getInt(1)); } + */ + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + + "OK \r\n"); + } catch (SQLException e) { + FileOperateHelper.fileWrite(filePath + ".log", + sql + "\r\n" + e.getMessage() + "\r\n"); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + } + + return resultSet; + } + + public synchronized static ResultSet getSQLExecResultSet(String url, + String user, String password, String sql, String filePath) { + ResultSet result = null; + try { + result = getSQLExecResultSet( + connectionBuilder(url, user, password, null), sql, filePath); + } catch (CustomException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + return result; + } + + /** + * 执行对oracle数据库的增、删 + * + * @param conn + * @param sql + * @return true:执行的不返回集合数据的sql成功, 是否执行成功 + */ + public synchronized static boolean execOracleSQL(Connection conn, + String sql, String filePath) { + if (null != filePath) { + filePath = filePath.replace(".log", ""); + } + boolean flag = false; + Statement statement = null; + try { + statement = conn.createStatement(); + statement.executeUpdate(sql); + flag = true; + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + flag + + " \r\n"); + } catch (SQLException e) { + flag = false; + FileOperateHelper.fileWrite(filePath + ".log", + sql + "\r\n" + e.getMessage() + "\r\n"); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + } + + return flag ; + } + + /** + * 执行对oracle数据库的返回集合数据的sql + * + * @param conn + * @param sql + * @return true:执行结果大于1,即有数据 是否执行成功 + */ + public synchronized static boolean execUpdateOracleSQL(Connection conn, + String sql, String filePath) { + if (null != filePath) { + filePath = filePath.replace(".log", ""); + } + boolean flag = false; + Statement statement = null; + try { + statement = conn.createStatement(); + statement.executeUpdate(sql); + flag = true; + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + + "OK \r\n"); + + } catch (SQLException e) { + flag = false; + FileOperateHelper.fileWrite(filePath + ".log", + sql + "\r\n" + e.getMessage() + "\r\n"); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + } + return flag; + } } \ No newline at end of file diff --git a/src/com/platform/service/IOracleExtractService.java b/src/com/platform/service/IOracleExtractService.java index 73a47834..6d8347d5 100644 --- a/src/com/platform/service/IOracleExtractService.java +++ b/src/com/platform/service/IOracleExtractService.java @@ -32,5 +32,11 @@ public interface IOracleExtractService { */ boolean extractStandardTable(String name, List dataInfolist, GatherOracleInfo oracleConnect) throws Exception; + + /** + * + * @param ocp + */ + public void updateDataExtractStatus(OracleConnectorParams ocp, int status); } diff --git a/src/com/platform/service/OracleExtractHelper.java b/src/com/platform/service/OracleExtractHelper.java index bec4e858..4b7c0873 100644 --- a/src/com/platform/service/OracleExtractHelper.java +++ b/src/com/platform/service/OracleExtractHelper.java @@ -1,334 +1,429 @@ -package com.platform.service; - -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; - -import org.apache.log4j.Logger; - -import com.base.Custom4exception; -import com.platform.entities.GatherOracleInfo; -import com.platform.entities.OracleConnectorParams; -import com.platform.oracle.OracleConnector; -import com.platform.utils.Configs; -import com.platform.utils.FileOperateHelper; - -public class OracleExtractHelper { - - public static Logger log = Configs.DAILY_ROLLING_LOGGER.getLogger(OracleExtractHelper.class); - - /** - * 判断dblink是否已经存在 - * - * @param conn - * @param linkName - * dblink的名称 - * @return - */ - private boolean hasSameNameDBLink(Connection conn, String linkName, String filePath) { - - boolean flag = false; - String sql = "SELECT * FROM ALL_DB_LINKS WHERE DB_LINK='" + linkName+"'"; - ResultSet rSet = null; - try { - rSet = OracleConnector.getSQLExecResultSet(conn, sql, null); - rSet.last(); - if (rSet.getRow() > 0) - flag = true; - FileOperateHelper - .fileWrite(filePath, sql+ "\r\n"+"OK \r\n"); - } catch (Exception e) { - FileOperateHelper - .fileWrite(filePath, sql+ "\r\n"+ e.getMessage() + "\r\n,连接异常! \r\n"); - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } - return flag; - } - - /** - * 创建dblink - * - * @param conn 汇总库的连接 - * @param oc 采集库 - * dblink连接参数实体 - */ - public void createDBLink(Connection conn, OracleConnectorParams oc) { - String linkName = "LINKTO" + oc.getName(); - String sql = "CREATE PUBLIC DATABASE LINK " - + linkName - + " CONNECT TO " - + oc.getUser() - + " IDENTIFIED BY " - + oc.getPassword() - + " USING \'(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST = " - + oc.getIp() + ")(PORT = " + oc.getPort() - + ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName() - + ")))\'"; - if (null != oc) { - if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_LOG_LOCALTION + oc.getName()+".log")) { // 如果dblink已经存在,先删除dblink,在创建dblink - String deleteSql = "DROP PUBLIC DATABASE LINK " - + linkName; - //删除 dblink - if (OracleConnector.execOracleSQL(conn, deleteSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - } - } else { - Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!"); - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + oc.getName()+".log", "删除已有的DBLink失败,无法创建新的DBLink!"+"\r\n"); - } - - } else { - // 否则,创建dblink - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - } - } - } - } - - /** - * 创建表空间 - * - * @param conn 汇总库连接 - * @param collectOracle - * @param oc 汇总库信息 - * @return - */ - public boolean createTableSpace(Connection conn, OracleConnectorParams collectOracle, GatherOracleInfo oc) { - String tmpSql = "select TABLESPACE_NAME from dba_tablespaces where TABLESPACE_NAME = '" - + oc.getTableName() + "'"; - // 存在 表空间 - if (OracleConnector.execUpdateOracleSQL(conn, tmpSql, Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName())) { - return true; - } else { - String sql = "create tablespace " + oc.getTableName() - + " datafile '" + Configs.GATHER_TABLESPACE_PATH - + oc.getTableName() + ".dbf" - + "' size 512M autoextend on next 512M maxsize unlimited"; - return OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName()); - } - } - - /** - * 创建用户并授权 - * - * @param conn 汇总库连接 - * @param oc 汇总库信息 - */ - public void createUser(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = oc.getName() + totalOracle.getSuffix(); - - String sql = "Create user " + strTUser + " default tablespace " - + totalOracle.getTableName() + " identified by " - + Configs.GATHER_TABLE_PASSWORD; - String grantSql = "grant connect, resource, dba to " + strTUser; - - OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - - OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - } - - /** - * 创建用户并授权 - * - * @param conn 汇总库连接 - * @param oc 汇总库信息 - */ - public void createOnlyUser(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = Configs.GATHER_STANDARD_USER_NAME; - - String sql = "Create user " + strTUser + " default tablespace " - + totalOracle.getTableName() + " identified by " - + Configs.GATHER_TABLE_PASSWORD; - String grantSql = "grant connect, resource, dba to " + strTUser; - - OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - - OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - } - - /** - * 创建用户并授权 - * - * @param conn 汇总库连接 - * @param oc 汇总库信息 - */ - public void createTable(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = oc.getName() + totalOracle.getSuffix(); - - String sql = "Create table " + strTUser + " default tablespace " - + totalOracle.getTableName() + " identified by " - + Configs.GATHER_TABLE_PASSWORD; - String grantSql = "grant connect, resource, dba to " + strTUser; - - OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - - OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - } - - /** - * 执行汇总操作 - * - * @param conn 汇总库连接 - * @param oc 采集库 - */ - public void extractColleDB(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = oc.getName() + totalOracle.getSuffix(); - String sql = "SELECT 'create table " - + strTUser - + ".J'|| substr(t.OWNER||'_'||t.TABLE_NAME,0,29)||' as select * from '||t.OWNER||'.'||t.TABLE_NAME||" - + "'@LINKTO" - + oc.getName() - + ";' FROM dba_tables@LINKTO" - + oc.getName() - + " t WHERE t.TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')" - + " and t.owner||t.table_name not in (select owner||table_name from dba_tables@LINKTO" - + oc.getName() + " where 'data_type'='CLOB')"; - ResultSet rsSet = OracleConnector.getSQLExecResultSet(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - try { - while (rsSet.next()) { - try { - rsSet.getRow(); - String resultSql = rsSet.getString(1).replace(";", ""); - OracleConnector.execUpdateOracleSQL(conn, resultSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - } catch (SQLException e) { - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } - } - } catch (SQLException e) { - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } - - } - - /** 测试是否能否连接上? - * @return true:能连接上 - */ - public boolean testConnect(Connection conn) { - String testSql = "select count(*) from user_tables"; - boolean flag = false; - try { - Statement statement = conn.createStatement(); - if(statement.executeUpdate(testSql) > 0) - flag = true; - } catch (SQLException e) { - flag = false; - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } - return flag; - } - - /** - * 执行抽取操作--支付表 - * - * @param conn 汇总库连接 - * @param oc 采集库 - */ - public void extractStandardPayTable(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = Configs.GATHER_STANDARD_USER_NAME; - createPay(conn, oc); - String sql = "insert into " + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME + " select * from " - + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME +"@LINKTOST" - + oc.getName(); - String resultSql = sql.replace(";", ""); - OracleConnector.execOracleSQL(conn, resultSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - } - - /** 执行抽取操作--执行表 - * @param conn - * @param collectOracle - * @param oracleModel - */ - public void extractStandardExecTable(Connection conn, - OracleConnectorParams collectOracle, GatherOracleInfo oracleModel) { - createExec(conn, collectOracle); - String strTUser = Configs.GATHER_STANDARD_USER_NAME; - - String sql = "insert into " + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + " select * from " - + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME +"@LINKTOST" - + collectOracle.getName(); - String resultSql = sql.replace(";", ""); - OracleConnector.execOracleSQL(conn, resultSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + collectOracle.getName()); - - } - - - - private void createPay(Connection conn, OracleConnectorParams oc) { - String payCmd = "CREATE TABLE u_bzbjy.zfxxb(XZQHDM Varchar(255),XZQHMC Varchar(255),PZBH Varchar(255),LYZBKZH Varchar(255)," - + "ZFDATE Varchar(255),YSDWCODE Varchar(255),YSDWNAME Varchar(255),YWGKCS Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255)," - +"XMLBCODE Varchar(255),XMLBNAME Varchar(255),ZB_NO Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255)," - +"JJFLNAME Varchar(255),ZJXZCODE Varchar(255),ZJXZNAME Varchar(255),JSBFFSNAME Varchar(255),SKR Varchar(255),SKRYH Varchar(255)," - + "SKRZHZH Varchar(255),FKZHCODE Varchar(255),FKZHNAME Varchar(255),FKYHCODE Varchar(255),FKYHNAME Varchar(255),QSZHCODE Varchar(255)," - + "QSZHNAME Varchar(255),QSYHCODE Varchar(255),QSYHNAME Varchar(255),JE Numeric(18,2), SFTK Varchar(255),NIAN Varchar(255),ZY Varchar(255))"; - - try { - OracleConnector.execOracleSQL(conn, payCmd, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - } catch (Exception e) { - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } - } - - /** - * 创建dblink - * - * @param conn 汇总库的连接 - * @param oc 采集库 - * dblink连接参数实体 - */ - public void createStandardDBLink(Connection conn, OracleConnectorParams oc) { - String linkName = "LINKTOST" + oc.getName(); - String sql = "CREATE PUBLIC DATABASE LINK " - + linkName - + " CONNECT TO " - + oc.getUser() - + " IDENTIFIED BY " - + oc.getPassword() - + " USING \'(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST = " - + oc.getIp() + ")(PORT = " + oc.getPort() - + ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName() - + ")))\'"; - if (null != oc) { - if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()+".log")) { // 如果dblink已经存在,先删除dblink,在创建dblink - String deleteSql = "DROP PUBLIC DATABASE LINK " - + linkName; - //删除 dblink - if (OracleConnector.execOracleSQL(conn, deleteSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) { -// OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - } - } else { - Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!"); - FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION - + oc.getName()+".log", "删除已有的DBLink失败,无法创建新的DBLink!"+"\r\n"); - } - - } else { - // 否则,创建dblink -// OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - } - } - } - } - - private void createExec(Connection conn, OracleConnectorParams oc) { - String execCmd = "CREATE TABLE u_bzbjy.kzxzb(XZQHDM Varchar(255),XZQHMC Varchar(255),YSND Varchar(255),ZBCODE Varchar(255),ZB_ID Varchar(255),ZB_NO Varchar(255)," - + "ZBDJLXCODE Varchar(255),ZBDJLXNAME Varchar(255),ZBLXNAME Varchar(255),DOCNO Varchar(255),ZBSM Varchar(255),ZBFWDATE Varchar(255),ZBYSLXCODE Varchar(255)," - + "ZBYSLXNAME Varchar(255),ZBYSLYNAME Varchar(255),YSDWCODE Varchar(255),YSDWNAME Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255)," - + "JJFLNAME Varchar(255),ZBGLCSNAME Varchar(255),SZGLCODE Varchar(255),SZGLNAME Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255),GZBZ Varchar(255)," - + "JJBZ Varchar(255),CGBZ Varchar(255),ZFFSCODE Varchar(255),ZFFSNAME Varchar(255),JZZFBZ Varchar(255),ZBJE Numeric(18,2),ZBTJJE Numeric(18,2),ZBDJJE Numeric(18,2)," - + "ZBKYJE Numeric(18,2),ZYZFBZ Varchar(255),BZ Varchar(255))"; - - try { - OracleConnector.execOracleSQL(conn, execCmd, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - } catch (Exception e) { - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } - } -} +package com.platform.service; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.apache.log4j.Logger; + +import com.base.Custom4exception; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.oracle.OracleConnector; +import com.platform.utils.Configs; +import com.platform.utils.FileOperateHelper; + +public class OracleExtractHelper { + + public static Logger log = Logger.getLogger(OracleExtractHelper.class); + + /** + * 判断dblink是否已经存在 + * + * @param conn + * @param linkName + * dblink的名称 + * @return + */ + private boolean hasSameNameDBLink(Connection conn, String linkName, + String filePath) { + + boolean flag = false; + String sql = "SELECT COUNT(*) c FROM ALL_DB_LINKS WHERE DB_LINK='" + + linkName + "'"; + ResultSet rSet = null; + Statement statement = null; + try { + statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); + rSet = statement.executeQuery(sql); + if (rSet.next()) { + if (rSet.getInt("c") > 0) + flag = true; + else + flag = false; + } + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + + "OK \r\n"); + + } catch (Exception e) { + FileOperateHelper.fileWrite(filePath, sql + "\r\n" + e.getMessage() + + "\r\n,连接异常! \r\n"); + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + } + return flag; + } + + /** + * 创建dblink + * + * @param conn + * 汇总库的连接 + * @param oc + * 采集库 dblink连接参数实体 + */ + public void createDBLink(Connection conn, OracleConnectorParams oc) { + String linkName = "LINKTO_J" + oc.getName().replaceAll("-", "_"); + String sql = "CREATE PUBLIC DATABASE LINK " + + linkName + + " CONNECT TO " + + oc.getUser() + + " IDENTIFIED BY " + + oc.getPassword() + + " USING \'(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST = " + + oc.getIp() + ")(PORT = " + oc.getPort() + + ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName() + + ")))\'"; + if (null != oc) { + if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_LOG_LOCALTION + + oc.getName())) { // 如果dblink已经存在,先删除dblink,在创建dblink + String deleteSql = "DROP PUBLIC DATABASE LINK " + linkName; + // 删除 dblink + if (OracleConnector.execOracleSQL(conn, deleteSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { + if (!OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { + OracleConnector.execUpdateOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + } + } else { + Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!"); + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + oc.getName() + ".log", + "删除已有的DBLink失败,无法创建新的DBLink!" + "\r\n"); + } + + } else { + // 否则,创建dblink + if (!OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { + OracleConnector.execUpdateOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + } + } + } + } + + /** + * 创建表空间 + * + * @param conn + * 汇总库连接 + * @param collectOracle + * @param oc + * 汇总库信息 + * @return + */ + public boolean createTableSpace(Connection conn, + OracleConnectorParams collectOracle, GatherOracleInfo oc) { + String tmpSql = "select TABLESPACE_NAME from dba_tablespaces where TABLESPACE_NAME = '" + + oc.getTableName() + "'"; + // 存在 表空间 + if (OracleConnector.execUpdateOracleSQL(conn, tmpSql, + Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName())) { + return true; + } else { + String sql = "create tablespace " + oc.getTableName() + + " datafile '" + Configs.GATHER_TABLESPACE_PATH + + oc.getTableName() + ".dbf" + + "' size 512M autoextend on next 512M maxsize unlimited"; + return OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName()); + } + } + + /** + * 创建用户并授权 + * + * @param conn + * 汇总库连接 + * @param oc + * 汇总库信息 + */ + public void createUser(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { + String strTUser = "J" + oc.getName().replaceAll("-", "_") + + totalOracle.getSuffix(); + + String sql = "Create user " + strTUser + " default tablespace " + + totalOracle.getTableName() + " identified by " + + Configs.GATHER_TABLE_PASSWORD; + String grantSql = "grant connect, resource, dba to " + strTUser; + + OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + + oc.getName()); + + OracleConnector.execOracleSQL(conn, grantSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + } + + /** + * 创建用户并授权 + * + * @param conn + * 汇总库连接 + * @param oc + * 汇总库信息 + */ + public void createOnlyUser(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { + String strTUser = Configs.GATHER_STANDARD_USER_NAME; + + String sql = "Create user " + strTUser + " default tablespace " + + totalOracle.getTableName() + " identified by " + + Configs.GATHER_TABLE_PASSWORD; + String grantSql = "grant connect, resource, dba to " + strTUser; + + OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + + OracleConnector.execOracleSQL(conn, grantSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + } + + /** + * 创建用户并授权 + * + * @param conn + * 汇总库连接 + * @param oc + * 汇总库信息 + */ + public void createTable(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { + String strTUser = oc.getName() + totalOracle.getSuffix(); + + String sql = "Create table " + strTUser + " default tablespace " + + totalOracle.getTableName() + " identified by " + + Configs.GATHER_TABLE_PASSWORD; + String grantSql = "grant connect, resource, dba to " + strTUser; + + OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + + oc.getName()); + + OracleConnector.execOracleSQL(conn, grantSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + } + + /** + * 执行汇总操作 + * + * @param conn + * 汇总库连接 + * @param oc + * 采集库 + */ + public void extractColleDB(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { + String strTUser = "J" + oc.getName().replaceAll("-", "_") + + totalOracle.getSuffix(); + String sql = "SELECT 'create table " + + strTUser + + ".J'|| substr(t.OWNER||'_'||t.TABLE_NAME,0,29)||' as select * from '||t.OWNER||'.'||t.TABLE_NAME||" + + "'@LINKTO_J" + + oc.getName().replaceAll("-", "_") + + ";' FROM dba_tables@LINKTO_J" + + oc.getName().replaceAll("-", "_") + + " t WHERE t.TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')" + + " and t.owner||t.table_name not in (select owner||table_name from dba_tables@LINKTO_J" + + oc.getName().replaceAll("-", "_") + + " where 'data_type'='CLOB')"; + /* + * ResultSet rsSet = OracleConnector.getSQLExecResultSet(conn, sql, + * Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + */ + Statement statement = null; + try { + statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); + ResultSet resultSet = statement.executeQuery(sql); + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + ".log", sql + + "\r\n" + "OK \r\n"); + + while (resultSet.next()) { + try { + String resultSql = resultSet.getString(1).replace(";", ""); + OracleConnector.execUpdateOracleSQL(conn, resultSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + } catch (SQLException e) { + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } + } + } catch (SQLException e) { + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } + + } + + /** + * 测试是否能否连接上? + * + * @return true:能连接上 + */ + public boolean testConnect(Connection conn) { + String testSql = "select count(*) from user_tables"; + boolean flag = false; + try { + Statement statement = conn.createStatement(); + if (statement.executeUpdate(testSql) > 0) + flag = true; + } catch (SQLException e) { + flag = false; + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } + return flag; + } + + /** + * 执行抽取操作--支付表 + * + * @param conn + * 汇总库连接 + * @param oc + * 采集库 + */ + public void extractStandardPayTable(Connection conn, + OracleConnectorParams oc, GatherOracleInfo totalOracle) { + String strTUser = Configs.GATHER_STANDARD_USER_NAME; + createPay(conn, oc); + String sql = "insert into " + strTUser + "." + + Configs.GATHER_STANDARD_PAY_TABLE_NAME + " select * from " + + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME + + "@LINKTOST" + oc.getName(); + String resultSql = sql.replace(";", ""); + OracleConnector.execOracleSQL(conn, resultSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + } + + /** + * 执行抽取操作--执行表 + * + * @param conn + * @param collectOracle + * @param oracleModel + */ + public void extractStandardExecTable(Connection conn, + OracleConnectorParams collectOracle, GatherOracleInfo oracleModel) { + createExec(conn, collectOracle); + String strTUser = Configs.GATHER_STANDARD_USER_NAME; + + String sql = "insert into " + strTUser + "." + + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + " select * from " + + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + + "@LINKTOST" + collectOracle.getName(); + String resultSql = sql.replace(";", ""); + OracleConnector.execOracleSQL( + conn, + resultSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName()); + + } + + private void createPay(Connection conn, OracleConnectorParams oc) { + String payCmd = "CREATE TABLE u_bzbjy.zfxxb(XZQHDM Varchar(255),XZQHMC Varchar(255),PZBH Varchar(255),LYZBKZH Varchar(255)," + + "ZFDATE Varchar(255),YSDWCODE Varchar(255),YSDWNAME Varchar(255),YWGKCS Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255)," + + "XMLBCODE Varchar(255),XMLBNAME Varchar(255),ZB_NO Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255)," + + "JJFLNAME Varchar(255),ZJXZCODE Varchar(255),ZJXZNAME Varchar(255),JSBFFSNAME Varchar(255),SKR Varchar(255),SKRYH Varchar(255)," + + "SKRZHZH Varchar(255),FKZHCODE Varchar(255),FKZHNAME Varchar(255),FKYHCODE Varchar(255),FKYHNAME Varchar(255),QSZHCODE Varchar(255)," + + "QSZHNAME Varchar(255),QSYHCODE Varchar(255),QSYHNAME Varchar(255),JE Numeric(18,2), SFTK Varchar(255),NIAN Varchar(255),ZY Varchar(255))"; + + try { + OracleConnector.execOracleSQL(conn, payCmd, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + } catch (Exception e) { + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } + } + + /** + * 创建dblink + * + * @param conn + * 汇总库的连接 + * @param oc + * 采集库 dblink连接参数实体 + */ + public void createStandardDBLink(Connection conn, OracleConnectorParams oc) { + String linkName = "LINKTOST" + oc.getName(); + String sql = "CREATE PUBLIC DATABASE LINK " + + linkName + + " CONNECT TO " + + oc.getUser() + + " IDENTIFIED BY " + + oc.getPassword() + + " USING \'(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST = " + + oc.getIp() + ")(PORT = " + oc.getPort() + + ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName() + + ")))\'"; + if (null != oc) { + if (hasSameNameDBLink(conn, linkName, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName() + + ".log")) { // 如果dblink已经存在,先删除dblink,在创建dblink + String deleteSql = "DROP PUBLIC DATABASE LINK " + linkName; + // 删除 dblink + if (OracleConnector.execOracleSQL(conn, deleteSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) { + // OracleConnector.execOracleSQL(conn, sql, + // Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + if (!OracleConnector.execOracleSQL( + conn, + sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName())) { + OracleConnector.execUpdateOracleSQL( + conn, + sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName()); + } + } else { + Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!"); + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName() + ".log", + "删除已有的DBLink失败,无法创建新的DBLink!" + "\r\n"); + } + + } else { + // 否则,创建dblink + // OracleConnector.execOracleSQL(conn, sql, + // Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + if (!OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) { + OracleConnector.execUpdateOracleSQL( + conn, + sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName()); + } + } + } + } + + private void createExec(Connection conn, OracleConnectorParams oc) { + String execCmd = "CREATE TABLE u_bzbjy.kzxzb(XZQHDM Varchar(255),XZQHMC Varchar(255),YSND Varchar(255),ZBCODE Varchar(255),ZB_ID Varchar(255),ZB_NO Varchar(255)," + + "ZBDJLXCODE Varchar(255),ZBDJLXNAME Varchar(255),ZBLXNAME Varchar(255),DOCNO Varchar(255),ZBSM Varchar(255),ZBFWDATE Varchar(255),ZBYSLXCODE Varchar(255)," + + "ZBYSLXNAME Varchar(255),ZBYSLYNAME Varchar(255),YSDWCODE Varchar(255),YSDWNAME Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255)," + + "JJFLNAME Varchar(255),ZBGLCSNAME Varchar(255),SZGLCODE Varchar(255),SZGLNAME Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255),GZBZ Varchar(255)," + + "JJBZ Varchar(255),CGBZ Varchar(255),ZFFSCODE Varchar(255),ZFFSNAME Varchar(255),JZZFBZ Varchar(255),ZBJE Numeric(18,2),ZBTJJE Numeric(18,2),ZBDJJE Numeric(18,2)," + + "ZBKYJE Numeric(18,2),ZYZFBZ Varchar(255),BZ Varchar(255))"; + + try { + OracleConnector.execOracleSQL(conn, execCmd, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + } catch (Exception e) { + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } + } +} diff --git a/src/com/platform/service/OracleExtractTask.java b/src/com/platform/service/OracleExtractTask.java new file mode 100644 index 00000000..90a68b6c --- /dev/null +++ b/src/com/platform/service/OracleExtractTask.java @@ -0,0 +1,107 @@ +package com.platform.service; + +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import org.apache.log4j.Logger; + +import com.base.Custom4exception; +import com.platform.entities.AbstractOracleExtractTask; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.oracle.OracleConnector; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.DateForm; +import com.platform.utils.FileOperateHelper; + +public class OracleExtractTask extends AbstractOracleExtractTask { + + public static Logger log = Logger.getLogger(OracleExtractTask.class); + + private GatherOracleInfo gatherOracleInfo; + + private OracleConnectorParams oc; + + private IOracleExtractService OracleExtract; + + public OracleExtractTask(String name, GatherOracleInfo gatherOracleInfo, + OracleConnectorParams oc, IOracleExtractService OracleExtract) { + super(name); + this.gatherOracleInfo = gatherOracleInfo; + this.oc = oc; + this.OracleExtract = OracleExtract; + } + + public OracleExtractTask(GatherOracleInfo gatherOracleInfo, + OracleConnectorParams oc, IOracleExtractService OracleExtract) { + this.gatherOracleInfo = gatherOracleInfo; + this.oc = oc; + this.OracleExtract = OracleExtract; + } + + private OracleExtractHelper oracleExtract = new OracleExtractHelper(); + + @Override + public void handler() { + // TODO Auto-generated method stub + try { + Connection conn = OracleConnector.connectionBuilder( + createConnectUrl(), gatherOracleInfo.getUser(), + gatherOracleInfo.getPassword(), oc); + if (null != conn) { + String cmd = "kubectl label --overwrite rc " + oc.getName() + + " isExtract=1"; + OracleExtract.updateDataExtractStatus(oc, 1); // 更新数据库的状态 + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + + ".log", + "[" + + DateForm + .date2StringBysecond(new Date()) + + "]>>>>>>>>>>>>>>>>开始汇总 >>>>>>>>>>>>>>>>>>\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + oracleExtract.createDBLink(conn, oc); // 创建dblink + oracleExtract.createTableSpace(conn, oc, gatherOracleInfo); // 创建表空间 + oracleExtract.createUser(conn, oc, gatherOracleInfo);// 创建用户并授权 + oracleExtract.extractColleDB(conn, oc, gatherOracleInfo);// 执行抽取 + cmd = "kubectl label --overwrite rc " + oc.getName() + + " isExtract=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + + ".log", + "[" + + DateForm + .date2StringBysecond(new Date()) + + "]>>>>>>>>>>>>>>>>汇总结束 >>>>>>>>>>>>>>>>>>\r\n\r\n"); + OracleExtract.updateDataExtractStatus(oc, 2); // 更新数据库的状态 + } + } catch (Exception e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + + private String createConnectUrl() { + String answer = ""; + if (null != gatherOracleInfo) { + answer = "jdbc:oracle:thin:@" + gatherOracleInfo.getIp() + ":" + + gatherOracleInfo.getPort() + ":" + + gatherOracleInfo.getDatabaseName(); + } + return answer; + } + +} diff --git a/src/com/platform/service/impl/LogReadServiceImpl.java b/src/com/platform/service/impl/LogReadServiceImpl.java index aa56b05f..20376cbb 100644 --- a/src/com/platform/service/impl/LogReadServiceImpl.java +++ b/src/com/platform/service/impl/LogReadServiceImpl.java @@ -12,7 +12,7 @@ public class LogReadServiceImpl implements ILogRead { @Override public String readLog(String filename) throws Exception { - String result = FileOperateHelper.fileReader(Configs.EXTRACT_LOG_LOCALTION + "J" + filename.replace("-", "_")+".log"); + String result = FileOperateHelper.fileReader(Configs.EXTRACT_LOG_LOCALTION + filename+".log"); return result; } diff --git a/src/com/platform/service/impl/OracleExtractServiceImpl.java b/src/com/platform/service/impl/OracleExtractServiceImpl.java index f0e5625f..1ac8a3a8 100644 --- a/src/com/platform/service/impl/OracleExtractServiceImpl.java +++ b/src/com/platform/service/impl/OracleExtractServiceImpl.java @@ -1,299 +1,380 @@ -package com.platform.service.impl; - -import java.sql.Connection; -import java.util.Date; -import java.util.List; - -import javax.annotation.Resource; - -import org.apache.log4j.Logger; -import org.springframework.stereotype.Service; - -import com.base.Custom4exception; -import com.base.CustomException; -import com.platform.controller.OracleController; -import com.platform.dao.DataInfoDao; -import com.platform.entities.DataInfoEntity; -import com.platform.entities.GatherOracleInfo; -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.service.IOracleExtractService; -import com.platform.service.OracleExtractHelper; -import com.platform.utils.Configs; -import com.platform.utils.Constant; -import com.platform.utils.DateForm; -import com.platform.utils.FileOperateHelper; - -@Service(value = "OracleExtract") -public class OracleExtractServiceImpl implements IOracleExtractService { - - /** - * 日志 - */ - public final static Logger log = Logger.getLogger(OracleExtractServiceImpl.class); - - @Resource(name = "dataInfoDao") - private DataInfoDao dataInfoDao; - - /** - * kubernetes client - */ - private SimpleKubeClient client = new SimpleKubeClient(); - /** - * 抽取 - */ - private OracleExtractHelper oracleExtract = new OracleExtractHelper(); - - /** - * 数据库连接实现类 - */ - private OracleConnector connect = new OracleConnector(); - - @Override - public boolean extractOracle(String name, List dataInfolist, - GatherOracleInfo oracleConnect) throws Exception { - boolean isSuccess = false; - try{ - //map转 bean(汇总库信息-带tableName的) - GatherOracleInfo oracleModel = oracleConnect; - //采集库连接参数 - List datainfos = dataInfolist; - if (datainfos.size() == 0) { - return false; - } - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0)); - if (null == conn) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n"); - return false; - } - for (OracleConnectorParams collectOracle : datainfos) { - String replicasName = collectOracle.getName(); - DataInfoEntity data = new DataInfoEntity(); - try{ - if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){ - data.setId(Integer.valueOf(collectOracle.getDataId())); - data.setExtractStatus(1); - dataInfoDao.updateExtract(data); - collectOracle.setName("J" + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl label --overwrite rc " - + replicasName + " isExtract=1"; - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 开始汇总 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createDBLink(conn, collectOracle); //创建dblink - oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间 - oracleExtract.createUser(conn, collectOracle, oracleModel);//创建用户并授权 - oracleExtract.extractColleDB(conn, collectOracle, oracleModel);//执行抽取 - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl label --overwrite rc " - + replicasName + " isExtract=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setExtractStatus(2); - dataInfoDao.updateExtract(data); - } - }catch(Exception e){ - log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - data.setExtractStatus(2); - dataInfoDao.updateExtract(data); - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 汇总结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - String cmd = "kubectl label --overwrite rc " - + replicasName + " isExtract=2"; - Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - } - } - isSuccess = true; - }catch(Exception e){ - new CustomException(Custom4exception.OracleSQL_Except, e); - } - return isSuccess; - } - - @Override - public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) throws Exception { - boolean isConnect = false; - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(), null); - if (null == conn) { - isConnect = false; - throw new CustomException(Custom4exception.connect_Oracle_Except, null, oracleModel); -// FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION -// + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + "]\r\n"); - }else { - isConnect = oracleExtract.testConnect(conn); - } - return isConnect; - } - - @Override - public boolean extractStandardTable(String name, List dataInfolist, - GatherOracleInfo oracleConnect) throws Exception { - boolean isSuccess = false; - try{ - //map转 bean(汇总库信息-带tableName的) - GatherOracleInfo oracleModel = oracleConnect; - //采集库连接参数 - List datainfos = dataInfolist; - if (datainfos.size() == 0) { - return false; - } - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0)); - if (null == conn) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n"); - return false; - } - for (OracleConnectorParams collectOracle : datainfos) { - String replicasName = collectOracle.getName(); - DataInfoEntity data = new DataInfoEntity(); - try{ - if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){ - data.setId(Integer.valueOf(collectOracle.getDataId())); - //设置为 标准表 抽取中 - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); - data.setStandardExtractStatus("1"); - dataInfoDao.update(data); - collectOracle.setName("CQ" + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl annotate --overwrite rc " - + replicasName + " standardExtractStatus=1"; - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 开始抽取标准表 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createStandardDBLink(conn, collectOracle); //创建dblink - oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间 - oracleExtract.createOnlyUser(conn, collectOracle, oracleModel);//创建 抽取标准表的 用户并授权 - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - if (null != tmpdata) { - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getPayResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getPayResultLast()) - || Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getPayResultLast())) { - //抽取中 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try{ - oracleExtract.extractStandardPayTable(conn, collectOracle, oracleModel);//执行抽取 - }catch(Exception e){ - //改回 校验存在的状态 - data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - //抽取成功 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } - } - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getExecResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getExecResultLast()) - || Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getExecResultLast())) { - //抽取中 - data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try{ - oracleExtract.extractStandardExecTable(conn, collectOracle, oracleModel);//执行抽取 - }catch(Exception e){ - //改回 校验存在的状态 - data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } - } - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl annotate --overwrite rc " - + replicasName + " standardExtractStatus=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); - data.setStandardExtractStatus("2"); - dataInfoDao.update(data); - } - } - }catch(Exception e){ - log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); - data.setStandardExtractStatus("2"); - dataInfoDao.update(data); - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 抽取标准表结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - String cmd = "kubectl annotate --overwrite rc " - + replicasName + " standardExtractStatus=2"; - Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - } - } - isSuccess = true; - }catch(Exception e){ - log.error(Custom4exception.OracleSQL_Except, e); - } - return isSuccess; - } - -// @Override -// public boolean extractOracle(String name, List dataInfos, GatherOracleInfo oracleConnect) throws Exception { -// boolean isSuccess = false; -// try{ -// //map转 bean(汇总库信息-带tableName的) -//// GatherOracleInfo oracleModel = (GatherOracleInfo) Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); -// -// //采集库连接参数 -//// List datainfos = new ArrayList(); -//// for (Map map : dataInfoMap) { -//// OracleConnectorParams dataInfoEntity = (OracleConnectorParams) Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); -//// datainfos.add(dataInfoEntity); -//// } -// -// Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" -// + oracleConnect.getDatabaseName(), oracleConnect.getUser(), oracleConnect.getPassword()); -// -// for (OracleConnectorParams collectOracle : dataInfos) { -// -// oracleExtract.createDBLink(conn, collectOracle); -// oracleExtract.createTableSpace(conn, oracleConnect); -// oracleExtract.createUser(conn, oracleConnect); -// oracleExtract.extractColleDB(conn, collectOracle); -// } -// isSuccess = true; -// }catch(Exception e){ -// -// } -// return false; -// } - -} +package com.platform.service.impl; + +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import javax.annotation.Resource; + +import org.apache.log4j.Logger; +import org.springframework.stereotype.Service; + +import com.base.Custom4exception; +import com.base.CustomException; +import com.platform.dao.DataInfoDao; +import com.platform.entities.DataInfoEntity; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.service.IOracleExtractService; +import com.platform.service.OracleExtractHelper; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.DateForm; +import com.platform.utils.FileOperateHelper; + +@Service(value = "OracleExtract") +public class OracleExtractServiceImpl implements IOracleExtractService { + + /** + * 日志 + */ + public final static Logger log = Logger + .getLogger(OracleExtractServiceImpl.class); + + @Resource(name = "dataInfoDao") + private DataInfoDao dataInfoDao; + + /** + * kubernetes client + */ + private SimpleKubeClient client = new SimpleKubeClient(); + /** + * 抽取 + */ + private OracleExtractHelper oracleExtract = new OracleExtractHelper(); + + /** + * 数据库连接实现类 + */ + private OracleConnector connect = new OracleConnector(); + + @Override + public boolean extractOracle(String name, + List datainfos, GatherOracleInfo oracleModel) + throws Exception { + boolean isSuccess = false; + try { + // map转 bean(汇总库信息-带tableName的) + // GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + // List datainfos = dataInfolist; + if (datainfos.size() == 0) { + return false; + } + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), + oracleModel.getUser(), oracleModel.getPassword(), + datainfos.get(0)); + if (null == conn) { + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + datainfos.get(0).getName() + ".log", + "创建oracle连接失败: [" + conn + "]\r\n"); + return false; + } + for (OracleConnectorParams collectOracle : datainfos) { + String replicasName = collectOracle.getName(); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(collectOracle.getDataId())); + data.setExtractStatus(1); + dataInfoDao.updateExtract(data); + collectOracle.setName("J" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl label --overwrite rc " + + replicasName + " isExtract=1"; + // sql日志记录时间: + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + + ".log", + "\r\n 开始汇总 \r\n" + + DateForm + .date2StringBysecond(new Date()) + + "\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createUser(conn, collectOracle, + oracleModel);// 创建用户并授权 + oracleExtract.extractColleDB(conn, collectOracle, + oracleModel);// 执行抽取 + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl label --overwrite rc " + replicasName + + " isExtract=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setExtractStatus(2); + dataInfoDao.updateExtract(data); + } + } catch (Exception e) { + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n 汇总结束 \r\n" + + DateForm.date2StringBysecond(new Date()) + + "\r\n"); + String cmd = "kubectl label --overwrite rc " + replicasName + + " isExtract=2"; + Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + } + } + isSuccess = true; + } catch (Exception e) { + new CustomException(Custom4exception.OracleSQL_Except, e); + } + return isSuccess; + } + + @Override + public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) + throws Exception { + boolean isConnect = false; + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), oracleModel.getUser(), + oracleModel.getPassword(), null); + if (null == conn) { + isConnect = false; + throw new CustomException(Custom4exception.connect_Oracle_Except, + null, oracleModel); + // FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + // + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + + // "]\r\n"); + } else { + isConnect = oracleExtract.testConnect(conn); + } + return isConnect; + } + + @Override + public boolean extractStandardTable(String name, + List dataInfolist, + GatherOracleInfo oracleConnect) throws Exception { + boolean isSuccess = false; + try { + // map转 bean(汇总库信息-带tableName的) + GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + List datainfos = dataInfolist; + if (datainfos.size() == 0) { + return false; + } + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), + oracleModel.getUser(), oracleModel.getPassword(), + dataInfolist.get(0)); + if (null == conn) { + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + dataInfolist.get(0).getName() + ".log", + "创建oracle连接失败: [" + conn + "]\r\n"); + return false; + } + for (OracleConnectorParams collectOracle : datainfos) { + String replicasName = collectOracle.getName(); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(collectOracle.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setStandardExtractStatus("1"); + dataInfoDao.update(data); + collectOracle.setName("CQ" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl annotate --overwrite rc " + + replicasName + " standardExtractStatus=1"; + // sql日志记录时间: + FileOperateHelper + .fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + + ".log", + "\r\n 开始抽取标准表 \r\n" + + DateForm + .date2StringBysecond(new Date()) + + "\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createOnlyUser(conn, collectOracle, + oracleModel);// 创建 抽取标准表的 用户并授权 + DataInfoEntity tmpdata = dataInfoDao.findById(data + .getId()); + if (null != tmpdata) { + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getPayResultLast()) + || Constant.CHECKOUT_STATUS_FIVE + .equals(tmpdata.getPayResultLast()) + || Constant.CHECKOUT_STATUS_SIX + .equals(tmpdata.getPayResultLast())) { + // 抽取中 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); + dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardPayTable(conn, + collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + // 抽取成功 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } + } + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getExecResultLast()) + || Constant.CHECKOUT_STATUS_FIVE + .equals(tmpdata.getExecResultLast()) + || Constant.CHECKOUT_STATUS_SIX + .equals(tmpdata.getExecResultLast())) { + // 抽取中 + data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); + dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardExecTable( + conn, collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } + } + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl annotate --overwrite rc " + + replicasName + " standardExtractStatus=2"; + rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); + data.setStandardExtractStatus("2"); + dataInfoDao.update(data); + } + } + } catch (Exception e) { + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n 抽取标准表结束 \r\n" + + DateForm.date2StringBysecond(new Date()) + + "\r\n"); + String cmd = "kubectl annotate --overwrite rc " + + replicasName + " standardExtractStatus=2"; + Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + } + } + isSuccess = true; + } catch (Exception e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + return isSuccess; + } + + @Override + public void updateDataExtractStatus(OracleConnectorParams ocp, int status) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(ocp.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setExtractStatus(status); + try { + dataInfoDao.update(data); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + // @Override + // public boolean extractOracle(String name, List + // dataInfos, GatherOracleInfo oracleConnect) throws Exception { + // boolean isSuccess = false; + // try{ + // //map转 bean(汇总库信息-带tableName的) + // // GatherOracleInfo oracleModel = (GatherOracleInfo) + // Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); + // + // //采集库连接参数 + // // List datainfos = new + // ArrayList(); + // // for (Map map : dataInfoMap) { + // // OracleConnectorParams dataInfoEntity = (OracleConnectorParams) + // Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); + // // datainfos.add(dataInfoEntity); + // // } + // + // Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + // + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" + // + oracleConnect.getDatabaseName(), oracleConnect.getUser(), + // oracleConnect.getPassword()); + // + // for (OracleConnectorParams collectOracle : dataInfos) { + // + // oracleExtract.createDBLink(conn, collectOracle); + // oracleExtract.createTableSpace(conn, oracleConnect); + // oracleExtract.createUser(conn, oracleConnect); + // oracleExtract.extractColleDB(conn, collectOracle); + // } + // isSuccess = true; + // }catch(Exception e){ + // + // } + // return false; + // } + +}