diff --git a/WebContent/WEB-INF/config/config.properties b/WebContent/WEB-INF/config/config.properties index 6bf45a62..440045d7 100644 --- a/WebContent/WEB-INF/config/config.properties +++ b/WebContent/WEB-INF/config/config.properties @@ -83,30 +83,30 @@ oracle-psw=oracle # windows path #============================================================================================================= -#extract-log-localtion=D:\\test\\log\\ -#extract-standard-log-localtion=D:\\test\\log2\\ -# -#file_upload_path=D:\\test\\ -#file_download_path=D:\\test\\export.xlsx -# -#package_download_path=D:\\test\\ -#package_name=sql_script_standard -# -#sql_script_path_last=D:\\test\\sql_script_last\\ -#sql_script_path_standard=D:\\test\\sql_script_standard\\ +extract-log-localtion=D:\\test\\log\\ +extract-standard-log-localtion=D:\\test\\log2\\ -#============================================================================================================= -# linux path -#============================================================================================================= +file_upload_path=D:\\test\\ +file_download_path=D:\\test\\export.xlsx -extract-log-localtion=/home/web_manage/log/ -extract-standard-log-localtion=/home/web_manage/log2/ +package_download_path=D:\\test\\ +package_name=sql_script_standard -file_upload_path=/excel_import_dir/ -file_download_path=/excel_export_dir/export.xlsx +sql_script_path_last=D:\\test\\sql_script_last\\ +sql_script_path_standard=D:\\test\\sql_script_standard\\ -package_download_path=/ -package_name=DefaultDescription +#============================================================================================================= +# linux path +#============================================================================================================= -sql_script_path_last=/DefaultDescription_last/ -sql_script_path_standard=/DefaultDescription/ +#extract-log-localtion=/home/web_manage/log/ +#extract-standard-log-localtion=/home/web_manage/log2/ +# +#file_upload_path=/excel_import_dir/ +#file_download_path=/excel_export_dir/export.xlsx +# +#package_download_path=/ +#package_name=DefaultDescription +# +#sql_script_path_last=/DefaultDescription_last/ +#sql_script_path_standard=/DefaultDescription/ diff --git a/src/com/platform/controller/DataModelController.java b/src/com/platform/controller/DataModelController.java index 29e34bf9..a1f4dca9 100644 --- a/src/com/platform/controller/DataModelController.java +++ b/src/com/platform/controller/DataModelController.java @@ -23,30 +23,32 @@ import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import com.base.BaseController; +import com.platform.entities.BasedTask; import com.platform.entities.DataInfoEntity; import com.platform.entities.DataInfoEntityMoveTmp; import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.entities.OracleExtractExecuter; import com.platform.form.PagerOptions; import com.platform.form.oracleForm; import com.platform.form.volumeMoveForm; -import com.platform.http.HttpUtils; import com.platform.service.DataInfoService; import com.platform.service.ICodeService; -import com.platform.service.IGfsService; +import com.platform.service.IGatherOracleService; import com.platform.service.ILogRead; import com.platform.service.IMoveDataService; -import com.platform.service.IGatherOracleService; import com.platform.service.IOracleExtractService; import com.platform.service.IVolumeService; +import com.platform.service.OracleExtractTask; import com.platform.service.OracleStatusService; -import com.platform.service.thread.ThreadExtractOracle; -import com.platform.service.thread.ThreadGainOracleConnect; import com.platform.utils.Constant; import com.platform.utils.UtilsHelper; -/** 数据管理 +/** + * 数据管理 + * * @author chen - * + * */ @Controller public class DataModelController extends BaseController { @@ -94,12 +96,14 @@ public class DataModelController extends BaseController { */ @Resource(name = "logReadService") private ILogRead logReadService; - + public void setDfsImp(DataInfoService dfs) { this.dfs = dfs; } - /** 数据管理--分页查询数据 + /** + * 数据管理--分页查询数据 + * * @param res * @param req * @return @@ -124,17 +128,19 @@ public class DataModelController extends BaseController { sb.append(str).append(":").append("null").append(","); } } - log.info(sb.deleteCharAt(sb.length() - 1) - .append("}").toString()); + log.info(sb.deleteCharAt(sb.length() - 1).append("}").toString()); PagerOptions pagerOptions = (PagerOptions) UtilsHelper .newObjAndSetAttrsByClass(PagerOptions.class, params); - pagerOptions.setCurrentPageNum(Integer.valueOf(params.get("currentPageNum"))); - //冷热区查询字段mark + pagerOptions.setCurrentPageNum(Integer.valueOf(params + .get("currentPageNum"))); + // 冷热区查询字段mark pagerOptions.setMark(pagerOptions.getVolumeType().trim()); return dfs.getPagerTableData(pagerOptions); } - /** 数据管理--删除数据 + /** + * 数据管理--删除数据 + * * @param res * @param req * @throws Exception @@ -149,7 +155,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** 连接oracle + /** + * 连接oracle + * * @param res * @param req * @throws UnsupportedEncodingException @@ -165,7 +173,8 @@ public class DataModelController extends BaseController { log.info("执行连接\t" + rcName); String cmd = "kubectl label --overwrite rc " + rcName + " status=0"; - List rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); StringBuffer sb = new StringBuffer(); for (String string : rList) sb.append(string).append("\n"); @@ -175,7 +184,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** 断开oracle连接 + /** + * 断开oracle连接 + * * @param res * @param req * @throws UnsupportedEncodingException @@ -196,7 +207,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** gfs的 volume节点的查询 + /** + * gfs的 volume节点的查询 + * * @return * @throws Exception */ @@ -208,7 +221,9 @@ public class DataModelController extends BaseController { return rest; } - /** 数据迁移功能 + /** + * 数据迁移功能 + * * @param res * @param req * @param form @@ -233,7 +248,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** oracle的 汇总功能 + /** + * oracle的 汇总功能 + * * @param res * @param req * @param form @@ -241,39 +258,32 @@ public class DataModelController extends BaseController { */ @RequestMapping(value = "/oracle/{name}/extract", method = RequestMethod.POST) public void oracleExtract(HttpServletRequest res, HttpServletResponse req, - @RequestBody oracleForm form) throws Exception { + @RequestBody oracleForm form) throws Exception { log.error("/oracle/{name}/extract"); - boolean isConnect = false; - //5秒内是否能获得oracle连接,否则认为超时。 - if (null != form.getTarget()) { - ThreadGainOracleConnect thOrcl = new ThreadGainOracleConnect(form, OracleExtract); - thOrcl.start(); - for (int i = 0; i < 10; i++) { - Thread.sleep(400); - isConnect = thOrcl.isConnect(); - if (isConnect) { - break; - } - else { - if (thOrcl.isExcept()) { - break; + if (null != form.getTarget()) { // 检查请求参数中是否包含汇总库信息 + boolean isConnect = OracleExtract.isConnectTotalOracle(form + .getTarget()); // 检查汇总库是否可以连接成功,连接成功返回200状态吗,连接失败返回500状态吗 + if (isConnect) { + req.setStatus(200); + if (null != form.getInneed() && form.getInneed().size() > 0) { + for (OracleConnectorParams oracleParams : form.getInneed()) { + BasedTask task = new OracleExtractTask( + oracleParams.getName(), form.getTarget(), + oracleParams, OracleExtract); + OracleExtractExecuter oee = new OracleExtractExecuter( + task); + new Thread(oee, oracleParams.getName()).start(); } - Thread.sleep(100); } - } - } - if (isConnect) - req.setStatus(200); - else - req.setStatus(500); - // 开始抽取数据到汇总库 - if (isConnect && null != form.getInneed() && form.getInneed().size() > 0) { - ThreadExtractOracle thExtra = new ThreadExtractOracle(form, OracleExtract); - thExtra.start(); + } else + req.setStatus(500); } + } - /** oracle汇总、抽取库的 查询 + /** + * oracle汇总、抽取库的 查询 + * * @return * @throws Exception */ @@ -285,7 +295,9 @@ public class DataModelController extends BaseController { return result; } - /** oracle汇总、抽取库的 删除 + /** + * oracle汇总、抽取库的 删除 + * * @param req * @param res * @param id @@ -301,7 +313,9 @@ public class DataModelController extends BaseController { res.setStatus(200); } - /** oracle汇总、抽取库的 新增 + /** + * oracle汇总、抽取库的 新增 + * * @param res * @param req * @param id @@ -319,7 +333,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** oracle汇总、抽取库的 更新 + /** + * oracle汇总、抽取库的 更新 + * * @param res * @param req * @param id @@ -337,7 +353,9 @@ public class DataModelController extends BaseController { req.setStatus(200); } - /** 迁移 数据 的查询 + /** + * 迁移 数据 的查询 + * * @return * @throws Exception */ @@ -349,7 +367,9 @@ public class DataModelController extends BaseController { return result; } - /** 迁移完成后的 删除记录功能 + /** + * 迁移完成后的 删除记录功能 + * * @param res * @param req * @param id @@ -368,7 +388,9 @@ public class DataModelController extends BaseController { return result; } - /** 迁移数据完成后新增一条数据(暂时去掉,新增功能不在此) + /** + * 迁移数据完成后新增一条数据(暂时去掉,新增功能不在此) + * * @param res * @param req * @param move @@ -381,12 +403,14 @@ public class DataModelController extends BaseController { HttpServletResponse req, @RequestBody DataInfoEntity move) throws Exception { log.debug("---------/task/transfer/save-----------------------"); -// int result = dfs.save(move); + // int result = dfs.save(move); req.setStatus(200); return 1; } - /** 地区和系统的 code 对应的名称 的 获取 + /** + * 地区和系统的 code 对应的名称 的 获取 + * * @return * @throws Exception */ @@ -398,7 +422,9 @@ public class DataModelController extends BaseController { return result; } - /** oracle 汇总的 日志 读取 + /** + * oracle 汇总的 日志 读取 + * * @param name * @param res * @param req @@ -410,8 +436,8 @@ public class DataModelController extends BaseController { public Object getExtractLog(@RequestParam("rcName") String name, HttpServletRequest res, HttpServletResponse req) throws Exception { log.info("---------/oracle/extract/log-------------------"); - String result = logReadService.readLog(name); - // "查看相应日志" + String result = logReadService.readLog(name); + // "查看相应日志" Map log = new HashMap(); log.put(name, result); return log; diff --git a/src/com/platform/entities/AbstractOracleExtractTask.java b/src/com/platform/entities/AbstractOracleExtractTask.java new file mode 100644 index 00000000..52ca16d8 --- /dev/null +++ b/src/com/platform/entities/AbstractOracleExtractTask.java @@ -0,0 +1,32 @@ +package com.platform.entities; + +public abstract class AbstractOracleExtractTask implements BasedTask { + private String name; + private int status; // 任务的执行状态,0未执行,1执行中,2完成,3失败 + + public AbstractOracleExtractTask() { + } + + public AbstractOracleExtractTask(String name) { + this.name = name; + } + + public abstract void handler(); + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + +} diff --git a/src/com/platform/entities/BasedTask.java b/src/com/platform/entities/BasedTask.java new file mode 100644 index 00000000..37179429 --- /dev/null +++ b/src/com/platform/entities/BasedTask.java @@ -0,0 +1,5 @@ +package com.platform.entities; + +public interface BasedTask { + public void handler(); +} diff --git a/src/com/platform/entities/OracleExtractExecuter.java b/src/com/platform/entities/OracleExtractExecuter.java new file mode 100644 index 00000000..7c07a8c8 --- /dev/null +++ b/src/com/platform/entities/OracleExtractExecuter.java @@ -0,0 +1,15 @@ +package com.platform.entities; + +public class OracleExtractExecuter implements Runnable { + private BasedTask task; + + public OracleExtractExecuter(BasedTask task) { + this.task = task; + } + + @Override + public void run() { + if (null != task) + task.handler(); + } +} diff --git a/src/com/platform/oracle/OracleConnector.java b/src/com/platform/oracle/OracleConnector.java index e3bc021f..9c91345a 100644 --- a/src/com/platform/oracle/OracleConnector.java +++ b/src/com/platform/oracle/OracleConnector.java @@ -14,13 +14,14 @@ import com.platform.entities.OracleConnectorParams; import com.platform.utils.Configs; import com.platform.utils.FileOperateHelper; -public class OracleConnector { - - public static Logger log = Configs.DAILY_ROLLING_LOGGER.getLogger(OracleConnector.class); - +public class OracleConnector { + + public static Logger log = Configs.DAILY_ROLLING_LOGGER + .getLogger(OracleConnector.class); + public OracleConnector() { } - + static { try { Class.forName("oracle.jdbc.driver.OracleDriver"); @@ -28,26 +29,29 @@ public class OracleConnector { } catch (ClassNotFoundException e) { log.error(Custom4exception.OracleSQL_Except, e); } - } - + } - public synchronized static Connection connectionBuilder(String url, String user, - String password, OracleConnectorParams oc) throws CustomException { - Connection conn=null; + public synchronized static Connection connectionBuilder(String url, + String user, String password, OracleConnectorParams oc) + throws CustomException { + Connection conn = null; try { conn = DriverManager.getConnection(url, user, password); - } catch (SQLException e) { - Configs.CONSOLE_LOGGER.info("创建oracle连接失败: [" + e.getMessage() + "]"); + } catch (SQLException e) { + Configs.CONSOLE_LOGGER.info("创建oracle连接失败: [" + e.getMessage() + + "]"); if (null != oc) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + oc.getName()+".log", "创建oracle连接失败: [" + e.getMessage() + "]\r\n"); + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + ".log", + "创建oracle连接失败: [" + e.getMessage() + "]\r\n"); } throw new CustomException(Custom4exception.OracleSQL_Except, e); } return conn; } - public synchronized static boolean canConnect(String url, String user, String password) { + public synchronized static boolean canConnect(String url, String user, + String password) { Connection result = null; try { result = connectionBuilder(url, user, password, null); @@ -57,25 +61,27 @@ public class OracleConnector { return (null != result); } - public synchronized static ResultSet getSQLExecResultSet(Connection conn, String sql, String filePath) { + public synchronized static ResultSet getSQLExecResultSet(Connection conn, + String sql, String filePath) { ResultSet resultSet = null; if (null != filePath) { filePath = filePath.replace(".log", ""); } Statement statement = null; try { - statement = conn - .createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, - ResultSet.CONCUR_UPDATABLE); + statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); resultSet = statement.executeQuery(sql); - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+"OK \r\n"); + /* + * if(resultSet.next()){ System.out.println(resultSet.getInt(1)); } + */ + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + + "OK \r\n"); } catch (SQLException e) { - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n"); + FileOperateHelper.fileWrite(filePath + ".log", + sql + "\r\n" + e.getMessage() + "\r\n"); log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ + } finally { if (null != statement) { try { statement.close(); @@ -88,11 +94,12 @@ public class OracleConnector { return resultSet; } - public synchronized static ResultSet getSQLExecResultSet(String url, String user, - String password, String sql, String filePath) { - ResultSet result = null; + public synchronized static ResultSet getSQLExecResultSet(String url, + String user, String password, String sql, String filePath) { + ResultSet result = null; try { - result = getSQLExecResultSet(connectionBuilder(url, user, password, null), sql, filePath); + result = getSQLExecResultSet( + connectionBuilder(url, user, password, null), sql, filePath); } catch (CustomException e) { log.error(Custom4exception.OracleSQL_Except, e); } @@ -101,11 +108,13 @@ public class OracleConnector { /** * 执行对oracle数据库的增、删 + * * @param conn * @param sql - * @return true:执行的不返回集合数据的sql成功, 是否执行成功 + * @return true:执行的不返回集合数据的sql成功, 是否执行成功 */ - public synchronized static boolean execOracleSQL(Connection conn, String sql, String filePath) { + public synchronized static boolean execOracleSQL(Connection conn, + String sql, String filePath) { if (null != filePath) { filePath = filePath.replace(".log", ""); } @@ -113,16 +122,16 @@ public class OracleConnector { Statement statement = null; try { statement = conn.createStatement(); - flag =statement.execute(sql); - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+ flag +" \r\n"); + statement.executeUpdate(sql); + flag = true; + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + flag + + " \r\n"); } catch (SQLException e) { flag = false; - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n"); + FileOperateHelper.fileWrite(filePath + ".log", + sql + "\r\n" + e.getMessage() + "\r\n"); log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ + } finally { if (null != statement) { try { statement.close(); @@ -132,15 +141,18 @@ public class OracleConnector { } } - return flag; + return flag ; } + /** * 执行对oracle数据库的返回集合数据的sql + * * @param conn * @param sql - * @return true:执行结果大于1,即有数据 是否执行成功 + * @return true:执行结果大于1,即有数据 是否执行成功 */ - public synchronized static boolean execUpdateOracleSQL(Connection conn, String sql, String filePath) { + public synchronized static boolean execUpdateOracleSQL(Connection conn, + String sql, String filePath) { if (null != filePath) { filePath = filePath.replace(".log", ""); } @@ -148,19 +160,17 @@ public class OracleConnector { Statement statement = null; try { statement = conn.createStatement(); - if(statement.executeUpdate(sql) > 0) - { - flag = true; - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+"OK \r\n"); - } + statement.executeUpdate(sql); + flag = true; + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + + "OK \r\n"); + } catch (SQLException e) { flag = false; - FileOperateHelper - .fileWrite(filePath+".log", sql+ "\r\n"+e.getMessage()+"\r\n"); + FileOperateHelper.fileWrite(filePath + ".log", + sql + "\r\n" + e.getMessage() + "\r\n"); log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ + } finally { if (null != statement) { try { statement.close(); diff --git a/src/com/platform/service/IOracleExtractService.java b/src/com/platform/service/IOracleExtractService.java index 73a47834..6d8347d5 100644 --- a/src/com/platform/service/IOracleExtractService.java +++ b/src/com/platform/service/IOracleExtractService.java @@ -32,5 +32,11 @@ public interface IOracleExtractService { */ boolean extractStandardTable(String name, List dataInfolist, GatherOracleInfo oracleConnect) throws Exception; + + /** + * + * @param ocp + */ + public void updateDataExtractStatus(OracleConnectorParams ocp, int status); } diff --git a/src/com/platform/service/OracleExtractHelper.java b/src/com/platform/service/OracleExtractHelper.java index bec4e858..2e66dad4 100644 --- a/src/com/platform/service/OracleExtractHelper.java +++ b/src/com/platform/service/OracleExtractHelper.java @@ -15,8 +15,8 @@ import com.platform.utils.Configs; import com.platform.utils.FileOperateHelper; public class OracleExtractHelper { - - public static Logger log = Configs.DAILY_ROLLING_LOGGER.getLogger(OracleExtractHelper.class); + + public static Logger log = Logger.getLogger(OracleExtractHelper.class); /** * 判断dblink是否已经存在 @@ -26,22 +26,39 @@ public class OracleExtractHelper { * dblink的名称 * @return */ - private boolean hasSameNameDBLink(Connection conn, String linkName, String filePath) { - + private boolean hasSameNameDBLink(Connection conn, String linkName, + String filePath) { + boolean flag = false; - String sql = "SELECT * FROM ALL_DB_LINKS WHERE DB_LINK='" + linkName+"'"; + String sql = "SELECT COUNT(*) c FROM ALL_DB_LINKS WHERE DB_LINK='" + + linkName + "'"; ResultSet rSet = null; + Statement statement = null; try { - rSet = OracleConnector.getSQLExecResultSet(conn, sql, null); - rSet.last(); - if (rSet.getRow() > 0) - flag = true; - FileOperateHelper - .fileWrite(filePath, sql+ "\r\n"+"OK \r\n"); + statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); + rSet = statement.executeQuery(sql); + if (rSet.next()) { + if (rSet.getInt("c") > 0) + flag = true; + else + flag = false; + } + FileOperateHelper.fileWrite(filePath + ".log", sql + "\r\n" + + "OK \r\n"); + } catch (Exception e) { - FileOperateHelper - .fileWrite(filePath, sql+ "\r\n"+ e.getMessage() + "\r\n,连接异常! \r\n"); + FileOperateHelper.fileWrite(filePath, sql + "\r\n" + e.getMessage() + + "\r\n,连接异常! \r\n"); log.error(Custom4exception.threadVolume_Oracle_Except, e); + } finally { + if (null != statement) { + try { + statement.close(); + } catch (SQLException e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } } return flag; } @@ -49,12 +66,13 @@ public class OracleExtractHelper { /** * 创建dblink * - * @param conn 汇总库的连接 - * @param oc 采集库 - * dblink连接参数实体 + * @param conn + * 汇总库的连接 + * @param oc + * 采集库 dblink连接参数实体 */ public void createDBLink(Connection conn, OracleConnectorParams oc) { - String linkName = "LINKTO" + oc.getName(); + String linkName = "LINKTO_J" + oc.getName().replaceAll("-", "_"); String sql = "CREATE PUBLIC DATABASE LINK " + linkName + " CONNECT TO " @@ -66,24 +84,30 @@ public class OracleExtractHelper { + ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName() + ")))\'"; if (null != oc) { - if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_LOG_LOCALTION + oc.getName()+".log")) { // 如果dblink已经存在,先删除dblink,在创建dblink - String deleteSql = "DROP PUBLIC DATABASE LINK " - + linkName; - //删除 dblink - if (OracleConnector.execOracleSQL(conn, deleteSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_LOG_LOCALTION + + oc.getName())) { // 如果dblink已经存在,先删除dblink,在创建dblink + String deleteSql = "DROP PUBLIC DATABASE LINK " + linkName; + // 删除 dblink + if (OracleConnector.execOracleSQL(conn, deleteSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { + if (!OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { + OracleConnector.execUpdateOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); } } else { Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!"); FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + oc.getName()+".log", "删除已有的DBLink失败,无法创建新的DBLink!"+"\r\n"); + + oc.getName() + ".log", + "删除已有的DBLink失败,无法创建新的DBLink!" + "\r\n"); } } else { // 否则,创建dblink - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + if (!OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName())) { + OracleConnector.execUpdateOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); } } } @@ -92,119 +116,158 @@ public class OracleExtractHelper { /** * 创建表空间 * - * @param conn 汇总库连接 - * @param collectOracle - * @param oc 汇总库信息 + * @param conn + * 汇总库连接 + * @param collectOracle + * @param oc + * 汇总库信息 * @return */ - public boolean createTableSpace(Connection conn, OracleConnectorParams collectOracle, GatherOracleInfo oc) { + public boolean createTableSpace(Connection conn, + OracleConnectorParams collectOracle, GatherOracleInfo oc) { String tmpSql = "select TABLESPACE_NAME from dba_tablespaces where TABLESPACE_NAME = '" + oc.getTableName() + "'"; // 存在 表空间 - if (OracleConnector.execUpdateOracleSQL(conn, tmpSql, Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName())) { + if (OracleConnector.execUpdateOracleSQL(conn, tmpSql, + Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName())) { return true; } else { String sql = "create tablespace " + oc.getTableName() + " datafile '" + Configs.GATHER_TABLESPACE_PATH + oc.getTableName() + ".dbf" + "' size 512M autoextend on next 512M maxsize unlimited"; - return OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName()); + return OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_LOG_LOCALTION + collectOracle.getName()); } } /** * 创建用户并授权 * - * @param conn 汇总库连接 - * @param oc 汇总库信息 + * @param conn + * 汇总库连接 + * @param oc + * 汇总库信息 */ - public void createUser(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = oc.getName() + totalOracle.getSuffix(); - + public void createUser(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { + String strTUser = "J" + oc.getName().replaceAll("-", "_") + + totalOracle.getSuffix(); + String sql = "Create user " + strTUser + " default tablespace " + totalOracle.getTableName() + " identified by " + Configs.GATHER_TABLE_PASSWORD; String grantSql = "grant connect, resource, dba to " + strTUser; - - OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - - OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + + OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + + oc.getName()); + + OracleConnector.execOracleSQL(conn, grantSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); } - + /** * 创建用户并授权 * - * @param conn 汇总库连接 - * @param oc 汇总库信息 + * @param conn + * 汇总库连接 + * @param oc + * 汇总库信息 */ - public void createOnlyUser(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { + public void createOnlyUser(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { String strTUser = Configs.GATHER_STANDARD_USER_NAME; - + String sql = "Create user " + strTUser + " default tablespace " + totalOracle.getTableName() + " identified by " + Configs.GATHER_TABLE_PASSWORD; String grantSql = "grant connect, resource, dba to " + strTUser; - - OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - - OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + + OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + + OracleConnector.execOracleSQL(conn, grantSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); } - + /** * 创建用户并授权 * - * @param conn 汇总库连接 - * @param oc 汇总库信息 + * @param conn + * 汇总库连接 + * @param oc + * 汇总库信息 */ - public void createTable(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { + public void createTable(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { String strTUser = oc.getName() + totalOracle.getSuffix(); - + String sql = "Create table " + strTUser + " default tablespace " + totalOracle.getTableName() + " identified by " + Configs.GATHER_TABLE_PASSWORD; String grantSql = "grant connect, resource, dba to " + strTUser; - - OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - - OracleConnector.execOracleSQL(conn, grantSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + + OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_LOG_LOCALTION + + oc.getName()); + + OracleConnector.execOracleSQL(conn, grantSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); } /** * 执行汇总操作 * - * @param conn 汇总库连接 - * @param oc 采集库 + * @param conn + * 汇总库连接 + * @param oc + * 采集库 */ - public void extractColleDB(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { - String strTUser = oc.getName() + totalOracle.getSuffix(); + public void extractColleDB(Connection conn, OracleConnectorParams oc, + GatherOracleInfo totalOracle) { + String strTUser = "J" + oc.getName().replaceAll("-", "_") + + totalOracle.getSuffix(); String sql = "SELECT 'create table " + strTUser + ".J'|| substr(t.OWNER||'_'||t.TABLE_NAME,0,29)||' as select * from '||t.OWNER||'.'||t.TABLE_NAME||" - + "'@LINKTO" - + oc.getName() - + ";' FROM dba_tables@LINKTO" - + oc.getName() + + "'@LINKTO_J" + + oc.getName().replaceAll("-", "_") + + ";' FROM dba_tables@LINKTO_J" + + oc.getName().replaceAll("-", "_") + " t WHERE t.TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX')" - + " and t.owner||t.table_name not in (select owner||table_name from dba_tables@LINKTO" - + oc.getName() + " where 'data_type'='CLOB')"; - ResultSet rsSet = OracleConnector.getSQLExecResultSet(conn, sql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - try { - while (rsSet.next()) { - try { - rsSet.getRow(); - String resultSql = rsSet.getString(1).replace(";", ""); - OracleConnector.execUpdateOracleSQL(conn, resultSql, Configs.EXTRACT_LOG_LOCALTION + oc.getName()); - } catch (SQLException e) { - log.error(Custom4exception.threadVolume_Oracle_Except, e); - } + + " and t.owner||t.table_name not in (select owner||table_name from dba_tables@LINKTO_J" + + oc.getName().replaceAll("-", "_") + + " where 'data_type'='CLOB')"; + /* + * ResultSet rsSet = OracleConnector.getSQLExecResultSet(conn, sql, + * Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + */ + Statement statement = null; + try { + statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, + ResultSet.CONCUR_UPDATABLE); + ResultSet resultSet = statement.executeQuery(sql); + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + ".log", sql + + "\r\n" + "OK \r\n"); + + while (resultSet.next()) { + try { + String resultSql = resultSet.getString(1).replace(";", ""); + OracleConnector.execUpdateOracleSQL(conn, resultSql, + Configs.EXTRACT_LOG_LOCALTION + oc.getName()); + } catch (SQLException e) { + log.error(Custom4exception.threadVolume_Oracle_Except, e); } - } catch (SQLException e) { - log.error(Custom4exception.threadVolume_Oracle_Except, e); } + } catch (SQLException e) { + log.error(Custom4exception.threadVolume_Oracle_Except, e); + } } - - /** 测试是否能否连接上? + + /** + * 测试是否能否连接上? + * * @return true:能连接上 */ public boolean testConnect(Connection conn) { @@ -212,32 +275,39 @@ public class OracleExtractHelper { boolean flag = false; try { Statement statement = conn.createStatement(); - if(statement.executeUpdate(testSql) > 0) - flag = true; + if (statement.executeUpdate(testSql) > 0) + flag = true; } catch (SQLException e) { flag = false; log.error(Custom4exception.threadVolume_Oracle_Except, e); } return flag; } - + /** * 执行抽取操作--支付表 * - * @param conn 汇总库连接 - * @param oc 采集库 + * @param conn + * 汇总库连接 + * @param oc + * 采集库 */ - public void extractStandardPayTable(Connection conn, OracleConnectorParams oc, GatherOracleInfo totalOracle) { + public void extractStandardPayTable(Connection conn, + OracleConnectorParams oc, GatherOracleInfo totalOracle) { String strTUser = Configs.GATHER_STANDARD_USER_NAME; createPay(conn, oc); - String sql = "insert into " + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME + " select * from " - + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME +"@LINKTOST" - + oc.getName(); + String sql = "insert into " + strTUser + "." + + Configs.GATHER_STANDARD_PAY_TABLE_NAME + " select * from " + + strTUser + "." + Configs.GATHER_STANDARD_PAY_TABLE_NAME + + "@LINKTOST" + oc.getName(); String resultSql = sql.replace(";", ""); - OracleConnector.execOracleSQL(conn, resultSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + OracleConnector.execOracleSQL(conn, resultSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); } - - /** 执行抽取操作--执行表 + + /** + * 执行抽取操作--执行表 + * * @param conn * @param collectOracle * @param oracleModel @@ -246,38 +316,43 @@ public class OracleExtractHelper { OracleConnectorParams collectOracle, GatherOracleInfo oracleModel) { createExec(conn, collectOracle); String strTUser = Configs.GATHER_STANDARD_USER_NAME; - - String sql = "insert into " + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + " select * from " - + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME +"@LINKTOST" - + collectOracle.getName(); + + String sql = "insert into " + strTUser + "." + + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + " select * from " + + strTUser + "." + Configs.GATHER_STANDARD_EXEC_TABLE_NAME + + "@LINKTOST" + collectOracle.getName(); String resultSql = sql.replace(";", ""); - OracleConnector.execOracleSQL(conn, resultSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + collectOracle.getName()); - + OracleConnector.execOracleSQL( + conn, + resultSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName()); + } - - private void createPay(Connection conn, OracleConnectorParams oc) { String payCmd = "CREATE TABLE u_bzbjy.zfxxb(XZQHDM Varchar(255),XZQHMC Varchar(255),PZBH Varchar(255),LYZBKZH Varchar(255)," + "ZFDATE Varchar(255),YSDWCODE Varchar(255),YSDWNAME Varchar(255),YWGKCS Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255)," - +"XMLBCODE Varchar(255),XMLBNAME Varchar(255),ZB_NO Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255)," - +"JJFLNAME Varchar(255),ZJXZCODE Varchar(255),ZJXZNAME Varchar(255),JSBFFSNAME Varchar(255),SKR Varchar(255),SKRYH Varchar(255)," + + "XMLBCODE Varchar(255),XMLBNAME Varchar(255),ZB_NO Varchar(255),GNFLCODE Varchar(255),GNFLNAME Varchar(255),JJFLCODE Varchar(255)," + + "JJFLNAME Varchar(255),ZJXZCODE Varchar(255),ZJXZNAME Varchar(255),JSBFFSNAME Varchar(255),SKR Varchar(255),SKRYH Varchar(255)," + "SKRZHZH Varchar(255),FKZHCODE Varchar(255),FKZHNAME Varchar(255),FKYHCODE Varchar(255),FKYHNAME Varchar(255),QSZHCODE Varchar(255)," + "QSZHNAME Varchar(255),QSYHCODE Varchar(255),QSYHNAME Varchar(255),JE Numeric(18,2), SFTK Varchar(255),NIAN Varchar(255),ZY Varchar(255))"; - + try { - OracleConnector.execOracleSQL(conn, payCmd, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + OracleConnector.execOracleSQL(conn, payCmd, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); } catch (Exception e) { log.error(Custom4exception.threadVolume_Oracle_Except, e); } } - + /** * 创建dblink * - * @param conn 汇总库的连接 - * @param oc 采集库 - * dblink连接参数实体 + * @param conn + * 汇总库的连接 + * @param oc + * 采集库 dblink连接参数实体 */ public void createStandardDBLink(Connection conn, OracleConnectorParams oc) { String linkName = "LINKTOST" + oc.getName(); @@ -292,31 +367,50 @@ public class OracleExtractHelper { + ")))(CONNECT_DATA =(SERVICE_NAME =" + oc.getDatabaseName() + ")))\'"; if (null != oc) { - if (hasSameNameDBLink(conn, linkName, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()+".log")) { // 如果dblink已经存在,先删除dblink,在创建dblink - String deleteSql = "DROP PUBLIC DATABASE LINK " - + linkName; - //删除 dblink - if (OracleConnector.execOracleSQL(conn, deleteSql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) { -// OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + if (hasSameNameDBLink(conn, linkName, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName() + + ".log")) { // 如果dblink已经存在,先删除dblink,在创建dblink + String deleteSql = "DROP PUBLIC DATABASE LINK " + linkName; + // 删除 dblink + if (OracleConnector.execOracleSQL(conn, deleteSql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) { + // OracleConnector.execOracleSQL(conn, sql, + // Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + if (!OracleConnector.execOracleSQL( + conn, + sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName())) { + OracleConnector.execUpdateOracleSQL( + conn, + sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName()); } } else { Configs.CONSOLE_LOGGER.error("删除已有的DBLink失败,无法创建新的DBLink!"); - FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION - + oc.getName()+".log", "删除已有的DBLink失败,无法创建新的DBLink!"+"\r\n"); + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName() + ".log", + "删除已有的DBLink失败,无法创建新的DBLink!" + "\r\n"); } } else { // 否则,创建dblink -// OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); - if(!OracleConnector.execOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())){ - OracleConnector.execUpdateOracleSQL(conn, sql, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + // OracleConnector.execOracleSQL(conn, sql, + // Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + if (!OracleConnector.execOracleSQL(conn, sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName())) { + OracleConnector.execUpdateOracleSQL( + conn, + sql, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + oc.getName()); } } } } - + private void createExec(Connection conn, OracleConnectorParams oc) { String execCmd = "CREATE TABLE u_bzbjy.kzxzb(XZQHDM Varchar(255),XZQHMC Varchar(255),YSND Varchar(255),ZBCODE Varchar(255),ZB_ID Varchar(255),ZB_NO Varchar(255)," + "ZBDJLXCODE Varchar(255),ZBDJLXNAME Varchar(255),ZBLXNAME Varchar(255),DOCNO Varchar(255),ZBSM Varchar(255),ZBFWDATE Varchar(255),ZBYSLXCODE Varchar(255)," @@ -324,9 +418,10 @@ public class OracleExtractHelper { + "JJFLNAME Varchar(255),ZBGLCSNAME Varchar(255),SZGLCODE Varchar(255),SZGLNAME Varchar(255),XMCODE Varchar(255),XMNAME Varchar(255),GZBZ Varchar(255)," + "JJBZ Varchar(255),CGBZ Varchar(255),ZFFSCODE Varchar(255),ZFFSNAME Varchar(255),JZZFBZ Varchar(255),ZBJE Numeric(18,2),ZBTJJE Numeric(18,2),ZBDJJE Numeric(18,2)," + "ZBKYJE Numeric(18,2),ZYZFBZ Varchar(255),BZ Varchar(255))"; - + try { - OracleConnector.execOracleSQL(conn, execCmd, Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); + OracleConnector.execOracleSQL(conn, execCmd, + Configs.EXTRACT_STANDARD_LOG_LOCALTION + oc.getName()); } catch (Exception e) { log.error(Custom4exception.threadVolume_Oracle_Except, e); } diff --git a/src/com/platform/service/OracleExtractTask.java b/src/com/platform/service/OracleExtractTask.java new file mode 100644 index 00000000..b48d4627 --- /dev/null +++ b/src/com/platform/service/OracleExtractTask.java @@ -0,0 +1,107 @@ +package com.platform.service; + +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import org.apache.log4j.Logger; + +import com.base.Custom4exception; +import com.platform.entities.AbstractOracleExtractTask; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.oracle.OracleConnector; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.DateForm; +import com.platform.utils.FileOperateHelper; + +public class OracleExtractTask extends AbstractOracleExtractTask { + + public static Logger log = Logger.getLogger(OracleExtractTask.class); + + private GatherOracleInfo gatherOracleInfo; + + private OracleConnectorParams oc; + + private IOracleExtractService OracleExtract; + + public OracleExtractTask(String name, GatherOracleInfo gatherOracleInfo, + OracleConnectorParams oc, IOracleExtractService OracleExtract) { + super(name); + this.gatherOracleInfo = gatherOracleInfo; + this.oc = oc; + this.OracleExtract = OracleExtract; + } + + public OracleExtractTask(GatherOracleInfo gatherOracleInfo, + OracleConnectorParams oc, IOracleExtractService OracleExtract) { + this.gatherOracleInfo = gatherOracleInfo; + this.oc = oc; + this.OracleExtract = OracleExtract; + } + + private OracleExtractHelper oracleExtract = new OracleExtractHelper(); + + @Override + public void handler() { + // TODO Auto-generated method stub + try { + Connection conn = OracleConnector.connectionBuilder( + createConnectUrl(), gatherOracleInfo.getUser(), + gatherOracleInfo.getPassword(), oc); + if (null != conn) { + String cmd = "kubectl label --overwrite rc " + oc.getName() + + " isExtract=1"; + OracleExtract.updateDataExtractStatus(oc, 1); // 更新数据库的状态 + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + + ".log", + "[" + + DateForm + .date2StringBysecond(new Date()) + + "]>>>>>>>>>>>>>>>>开始汇总 >>>>>>>>>>>>>>>>>>\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + oracleExtract.createDBLink(conn, oc); // 创建dblink + oracleExtract.createTableSpace(conn, oc, gatherOracleInfo); // 创建表空间 + oracleExtract.createUser(conn, oc, gatherOracleInfo);// 创建用户并授权 + oracleExtract.extractColleDB(conn, oc, gatherOracleInfo);// 执行抽取 + cmd = "kubectl label --overwrite rc " + oc.getName() + + " isExtract=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + oc.getName() + + ".log", + "[" + + DateForm + .date2StringBysecond(new Date()) + + "]>>>>>>>>>>>>>>>>汇总结束 >>>>>>>>>>>>>>>>>>\r\n\r\n"); + OracleExtract.updateDataExtractStatus(oc, 2); // 更新数据库的状态 + } + } catch (Exception e) { + log.error(Custom4exception.OracleSQL_Except, e); + } + } + + private String createConnectUrl() { + String answer = ""; + if (null != gatherOracleInfo) { + answer = "jdbc:oracle:thin:@" + gatherOracleInfo.getIp() + ":" + + gatherOracleInfo.getPort() + ":" + + gatherOracleInfo.getDatabaseName(); + } + return answer; + } + +} diff --git a/src/com/platform/service/impl/LogReadServiceImpl.java b/src/com/platform/service/impl/LogReadServiceImpl.java index aa56b05f..20376cbb 100644 --- a/src/com/platform/service/impl/LogReadServiceImpl.java +++ b/src/com/platform/service/impl/LogReadServiceImpl.java @@ -12,7 +12,7 @@ public class LogReadServiceImpl implements ILogRead { @Override public String readLog(String filename) throws Exception { - String result = FileOperateHelper.fileReader(Configs.EXTRACT_LOG_LOCALTION + "J" + filename.replace("-", "_")+".log"); + String result = FileOperateHelper.fileReader(Configs.EXTRACT_LOG_LOCALTION + filename+".log"); return result; } diff --git a/src/com/platform/service/impl/OracleExtractServiceImpl.java b/src/com/platform/service/impl/OracleExtractServiceImpl.java index 525be17f..5b751cce 100644 --- a/src/com/platform/service/impl/OracleExtractServiceImpl.java +++ b/src/com/platform/service/impl/OracleExtractServiceImpl.java @@ -27,15 +27,16 @@ import com.platform.utils.FileOperateHelper; @Service(value = "OracleExtract") public class OracleExtractServiceImpl implements IOracleExtractService { - + /** * 日志 */ - public final static Logger log = Logger.getLogger(OracleExtractServiceImpl.class); - + public final static Logger log = Logger + .getLogger(OracleExtractServiceImpl.class); + @Resource(name = "dataInfoDao") private DataInfoDao dataInfoDao; - + /** * kubernetes client */ @@ -43,252 +44,338 @@ public class OracleExtractServiceImpl implements IOracleExtractService { /** * 抽取 */ - private OracleExtractHelper oracleExtract = new OracleExtractHelper(); - + private OracleExtractHelper oracleExtract = new OracleExtractHelper(); + /** * 数据库连接实现类 */ private OracleConnector connect = new OracleConnector(); - + @Override - public boolean extractOracle(String name, List dataInfolist, - GatherOracleInfo oracleConnect) throws Exception { + public boolean extractOracle(String name, + List datainfos, GatherOracleInfo oracleModel) + throws Exception { boolean isSuccess = false; - try{ - //map转 bean(汇总库信息-带tableName的) - GatherOracleInfo oracleModel = oracleConnect; - //采集库连接参数 - List datainfos = dataInfolist; + try { + // map转 bean(汇总库信息-带tableName的) + // GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + // List datainfos = dataInfolist; if (datainfos.size() == 0) { return false; } - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0)); + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), + oracleModel.getUser(), oracleModel.getPassword(), + datainfos.get(0)); if (null == conn) { FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n"); + + datainfos.get(0).getName() + ".log", + "创建oracle连接失败: [" + conn + "]\r\n"); return false; } - for (OracleConnectorParams collectOracle : datainfos) { + for (OracleConnectorParams collectOracle : datainfos) { String replicasName = collectOracle.getName(); - try{ - if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){ - DataInfoEntity data = new DataInfoEntity(); - data.setId(Integer.valueOf(collectOracle.getDataId())); - data.setExtractStatus(1); - dataInfoDao.updateExtract(data); - collectOracle.setName("J" + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl label --overwrite rc " - + replicasName + " isExtract=1"; - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 开始汇总 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createDBLink(conn, collectOracle); //创建dblink - oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间 - oracleExtract.createUser(conn, collectOracle, oracleModel);//创建用户并授权 - oracleExtract.extractColleDB(conn, collectOracle, oracleModel);//执行抽取 - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl label --overwrite rc " - + replicasName + " isExtract=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setExtractStatus(2); - dataInfoDao.updateExtract(data); - } - }catch(Exception e){ + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(collectOracle.getDataId())); + data.setExtractStatus(1); + dataInfoDao.updateExtract(data); + collectOracle.setName("J" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl label --overwrite rc " + + replicasName + " isExtract=1"; + // sql日志记录时间: + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + + ".log", + "\r\n 开始汇总 \r\n" + + DateForm + .date2StringBysecond(new Date()) + + "\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createUser(conn, collectOracle, + oracleModel);// 创建用户并授权 + oracleExtract.extractColleDB(conn, collectOracle, + oracleModel);// 执行抽取 + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl label --overwrite rc " + replicasName + + " isExtract=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setExtractStatus(2); + dataInfoDao.updateExtract(data); + } + } catch (Exception e) { log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 汇总结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - String cmd = "kubectl label --overwrite rc " - + replicasName + " isExtract=2"; + } finally { + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n 汇总结束 \r\n" + + DateForm.date2StringBysecond(new Date()) + + "\r\n"); + String cmd = "kubectl label --overwrite rc " + replicasName + + " isExtract=2"; Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); } } isSuccess = true; - }catch(Exception e){ + } catch (Exception e) { new CustomException(Custom4exception.OracleSQL_Except, e); } return isSuccess; } @Override - public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) throws Exception { + public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) + throws Exception { boolean isConnect = false; - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(), null); + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), oracleModel.getUser(), + oracleModel.getPassword(), null); if (null == conn) { isConnect = false; - throw new CustomException(Custom4exception.connect_Oracle_Except, null, oracleModel); -// FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION -// + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + "]\r\n"); - }else { + throw new CustomException(Custom4exception.connect_Oracle_Except, + null, oracleModel); + // FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + // + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + + // "]\r\n"); + } else { isConnect = oracleExtract.testConnect(conn); } return isConnect; } - + @Override - public boolean extractStandardTable(String name, List dataInfolist, + public boolean extractStandardTable(String name, + List dataInfolist, GatherOracleInfo oracleConnect) throws Exception { boolean isSuccess = false; - try{ - //map转 bean(汇总库信息-带tableName的) - GatherOracleInfo oracleModel = oracleConnect; - //采集库连接参数 + try { + // map转 bean(汇总库信息-带tableName的) + GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 List datainfos = dataInfolist; if (datainfos.size() == 0) { return false; } - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), oracleModel.getPassword(),dataInfolist.get(0)); + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), + oracleModel.getUser(), oracleModel.getPassword(), + dataInfolist.get(0)); if (null == conn) { FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + dataInfolist.get(0).getName()+".log", "创建oracle连接失败: [" + conn + "]\r\n"); + + dataInfolist.get(0).getName() + ".log", + "创建oracle连接失败: [" + conn + "]\r\n"); return false; } - for (OracleConnectorParams collectOracle : datainfos) { + for (OracleConnectorParams collectOracle : datainfos) { String replicasName = collectOracle.getName(); - try{ - if(null != collectOracle.getDataId() && !"".equals(collectOracle.getDataId())){ - DataInfoEntity data = new DataInfoEntity(); - data.setId(Integer.valueOf(collectOracle.getDataId())); - //设置为 标准表 抽取中 - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); - data.setStandardExtractStatus("1"); - dataInfoDao.update(data); - collectOracle.setName("CQ" + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl annotate --overwrite rc " - + replicasName + " standardExtractStatus=1"; - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 开始抽取标准表 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "1"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createStandardDBLink(conn, collectOracle); //创建dblink - oracleExtract.createTableSpace(conn, collectOracle, oracleModel); //创建表空间 - oracleExtract.createOnlyUser(conn, collectOracle, oracleModel);//创建 抽取标准表的 用户并授权 - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - if (null != tmpdata) { - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getPayResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getPayResultLast()) - || Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getPayResultLast())) { - //抽取中 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try{ - oracleExtract.extractStandardPayTable(conn, collectOracle, oracleModel);//执行抽取 - }catch(Exception e){ - //改回 校验存在的状态 - data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(collectOracle.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setStandardExtractStatus("1"); + dataInfoDao.update(data); + collectOracle.setName("CQ" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl annotate --overwrite rc " + + replicasName + " standardExtractStatus=1"; + // sql日志记录时间: + FileOperateHelper + .fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + + ".log", + "\r\n 开始抽取标准表 \r\n" + + DateForm + .date2StringBysecond(new Date()) + + "\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createOnlyUser(conn, collectOracle, + oracleModel);// 创建 抽取标准表的 用户并授权 + DataInfoEntity tmpdata = dataInfoDao.findById(data + .getId()); + if (null != tmpdata) { + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getPayResultLast()) + || Constant.CHECKOUT_STATUS_FIVE + .equals(tmpdata.getPayResultLast()) + || Constant.CHECKOUT_STATUS_SIX + .equals(tmpdata.getPayResultLast())) { + // 抽取中 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); dataInfoDao.update(data); - isExtrac = false; + boolean isExtrac = true; + try { + oracleExtract.extractStandardPayTable(conn, + collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + // 抽取成功 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } } - if (isExtrac) { - //抽取成功 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getExecResultLast()) + || Constant.CHECKOUT_STATUS_FIVE + .equals(tmpdata.getExecResultLast()) + || Constant.CHECKOUT_STATUS_SIX + .equals(tmpdata.getExecResultLast())) { + // 抽取中 + data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardExecTable( + conn, collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } } - } - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata.getExecResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata.getExecResultLast()) - || Constant.CHECKOUT_STATUS_SIX.equals(tmpdata.getExecResultLast())) { - //抽取中 - data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl annotate --overwrite rc " + + replicasName + " standardExtractStatus=2"; + rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); + data.setStandardExtractStatus("2"); dataInfoDao.update(data); - boolean isExtrac = true; - try{ - oracleExtract.extractStandardExecTable(conn, collectOracle, oracleModel);//执行抽取 - }catch(Exception e){ - //改回 校验存在的状态 - data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } } - // client.updateOrAddReplicasLabelById(collectOracle.getName(), "isExtract", "2"); //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl annotate --overwrite rc " - + replicasName + " standardExtractStatus=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); - data.setStandardExtractStatus("2"); - dataInfoDao.update(data); } - } - }catch(Exception e){ + } catch (Exception e) { log.error(Custom4exception.OracleSQL_Except, e); - } - finally{ - //sql日志记录时间: - FileOperateHelper.fileWrite(Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName()+".log", "\r\n 抽取标准表结束 \r\n"+ DateForm.date2StringBysecond(new Date()) +"\r\n"); + } finally { + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n 抽取标准表结束 \r\n" + + DateForm.date2StringBysecond(new Date()) + + "\r\n"); String cmd = "kubectl annotate --overwrite rc " + replicasName + " standardExtractStatus=2"; Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); } } isSuccess = true; - }catch(Exception e){ + } catch (Exception e) { log.error(Custom4exception.OracleSQL_Except, e); } return isSuccess; } -// @Override -// public boolean extractOracle(String name, List dataInfos, GatherOracleInfo oracleConnect) throws Exception { -// boolean isSuccess = false; -// try{ -// //map转 bean(汇总库信息-带tableName的) -//// GatherOracleInfo oracleModel = (GatherOracleInfo) Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); -// -// //采集库连接参数 -//// List datainfos = new ArrayList(); -//// for (Map map : dataInfoMap) { -//// OracleConnectorParams dataInfoEntity = (OracleConnectorParams) Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); -//// datainfos.add(dataInfoEntity); -//// } -// -// Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" -// + oracleConnect.getDatabaseName(), oracleConnect.getUser(), oracleConnect.getPassword()); -// -// for (OracleConnectorParams collectOracle : dataInfos) { -// -// oracleExtract.createDBLink(conn, collectOracle); -// oracleExtract.createTableSpace(conn, oracleConnect); -// oracleExtract.createUser(conn, oracleConnect); -// oracleExtract.extractColleDB(conn, collectOracle); -// } -// isSuccess = true; -// }catch(Exception e){ -// -// } -// return false; -// } + @Override + public void updateDataExtractStatus(OracleConnectorParams ocp, int status) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(ocp.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setExtractStatus(status); + try { + dataInfoDao.update(data); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + // @Override + // public boolean extractOracle(String name, List + // dataInfos, GatherOracleInfo oracleConnect) throws Exception { + // boolean isSuccess = false; + // try{ + // //map转 bean(汇总库信息-带tableName的) + // // GatherOracleInfo oracleModel = (GatherOracleInfo) + // Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); + // + // //采集库连接参数 + // // List datainfos = new + // ArrayList(); + // // for (Map map : dataInfoMap) { + // // OracleConnectorParams dataInfoEntity = (OracleConnectorParams) + // Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); + // // datainfos.add(dataInfoEntity); + // // } + // + // Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + // + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" + // + oracleConnect.getDatabaseName(), oracleConnect.getUser(), + // oracleConnect.getPassword()); + // + // for (OracleConnectorParams collectOracle : dataInfos) { + // + // oracleExtract.createDBLink(conn, collectOracle); + // oracleExtract.createTableSpace(conn, oracleConnect); + // oracleExtract.createUser(conn, oracleConnect); + // oracleExtract.extractColleDB(conn, collectOracle); + // } + // isSuccess = true; + // }catch(Exception e){ + // + // } + // return false; + // } }