diff --git a/WebContent/WEB-INF/config/config.properties b/WebContent/WEB-INF/config/config.properties index a6b21eb0..6540c373 100644 --- a/WebContent/WEB-INF/config/config.properties +++ b/WebContent/WEB-INF/config/config.properties @@ -1,119 +1,120 @@ -#============================================================================================================= -# MySQL -#============================================================================================================= -# 驱动程序 -jdbc.mysql.driver=com.mysql.jdbc.Driver -# 连接url - -jdbc.mysql.url=jdbc:mysql://192.168.0.110:3306/ftpdata?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&autoReconnect=true&failOverReadOnly=false - -# 用户名 -jdbc.mysql.username=root -# 密码 -jdbc.mysql.password=root - -#============================================================================================================= -# Oracle 连接配置 -#============================================================================================================= -# hui用户名 -gather-user-name=system -gather-standard-user-name=u_bzbjy -gather-standard-exec-table-name=kzxzb -gather-standard-pay-table-name=zfxxb -# 密码 -gather-user-password=oracle - -# -gather-port=1521 -# -gather-service-name=orcl -#============================================================================================================= -# 数据库公共配置 -#============================================================================================================= -jdbc.initialSize=5 -jdbc.minIdle=5 -jdbc.maxIdle=20 -jdbc.maxActive=100 -jdbc.maxWait=100000 -jdbc.defaultAutoCommit=false -jdbc.removeAbandoned=true -jdbc.removeAbandonedTimeout=600 -jdbc.testWhileIdle=true -jdbc.timeBetweenEvictionRunsMillis=60000 -jdbc.numTestsPerEvictionRun=20 -jdbc.minEvictableIdleTimeMillis=300000 -#============================================================================================================== -# -#============================================================================================================== -table-suffix=_20152016 - -gather-tablespace-name=TS_TTSSS - -#dbf file's path must exist -gather-tablespace-path=/opt/oracle/app/oradata/orcl/ -# user's password must exist -gather-table-user-password=user - -#kuber's url must exist -kubeMasterUrl=http://192.168.0.110:8080/ -kubeMasterAddress=192.168.0.110:8080 - -collect-user-name=system -collect-password=oracle -collect-service-name=orcl -# standard table : -# eg:sql -> select count(*) from u_bzbjy.kzxzb; select count(*) from u_bzbjy.zfxxb; -collect-user-table=U_BZBJY -collect-exec-table=KZXZB -collect-pay-table=ZFXXB -#gfs web's IP -gfs_control_ip=192.168.0.110 -#gfs web password of root -gfs_control_rootPassWd=root -#url of gfs web -HttpClientConstant_URL_IP_PORT=http://192.168.0.110:9001/ -#data between 6 month ago and now -dataBefore=6 -#move a data once -moveFileMaxNum=1 -#import 5 rows of Excel once -numOneImportExcel=5 -oracle-orcl=orcl -oracle-user=system -oracle-psw=oracle - -#============================================================================================================= -# windows path -#============================================================================================================= - -extract-log-localtion=D:\\test\\log\\ -extract-standard-log-localtion=D:\\test\\log2\\ - -file_upload_path=D:\\test\\ -file_download_path=D:\\test\\export.xlsx - -package_download_path=D:\\test\\ -package_name=sql_script_standard - -sql_script_path_last=D:\\test\\sql_script_last\\ -sql_script_path_standard=D:\\test\\sql_script_standard\\ - -#============================================================================================================= -# linux path -#============================================================================================================= -## extract's log file location -#extract-log-localtion=/home/web_manage/log/ -## standard extract's log file location -#extract-standard-log-localtion=/home/web_manage/log2/ -##import excel file's location -#file_upload_path=/excel_import_dir/ -##export excel file's location -#file_download_path=/excel_export_dir/export.xlsx -##file package download's location -#package_download_path=/ -##file package's name -#package_name=DefaultDescription -##location of sql script lastest -#sql_script_path_last=/DefaultDescription_last/ -##location of sql script standard -#sql_script_path_standard=/DefaultDescription/ +#============================================================================================================= +# MySQL +#============================================================================================================= +# 驱动程序 +jdbc.mysql.driver=com.mysql.jdbc.Driver +# 连接url + +jdbc.mysql.url=jdbc:mysql://192.168.0.110:3306/ftpdata?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&autoReconnect=true&failOverReadOnly=false + +# 用户名 +jdbc.mysql.username=root +# 密码 +jdbc.mysql.password=root + +#============================================================================================================= +# Oracle 连接配置 +#============================================================================================================= +# hui用户名 +gather-user-name=system +gather-standard-user-name=u_bzbjy +gather-standard-exec-table-name=kzxzb +gather-standard-pay-table-name=zfxxb +# 密码 +gather-user-password=oracle + +# +gather-port=1521 +# +gather-service-name=orcl +#============================================================================================================= +# 数据库公共配置 +#============================================================================================================= +jdbc.initialSize=5 +jdbc.minIdle=5 +jdbc.maxIdle=20 +jdbc.maxActive=100 +jdbc.maxWait=100000 +jdbc.defaultAutoCommit=false +jdbc.removeAbandoned=true +jdbc.removeAbandonedTimeout=600 +jdbc.testWhileIdle=true +jdbc.timeBetweenEvictionRunsMillis=60000 +jdbc.numTestsPerEvictionRun=20 +jdbc.minEvictableIdleTimeMillis=300000 +#============================================================================================================== +# +#============================================================================================================== +table-suffix=_20152016 + +gather-tablespace-name=TS_TTSSS + +#dbf file's path must exist +gather-tablespace-path=/opt/oracle/app/oradata/orcl/ +# user's password must exist +gather-table-user-password=user + +#kuber's url must exist +kubeMasterUrl=http://192.168.0.110:8080/ + +kubeMasterAddress=192.168.0.110:8080 + +collect-user-name=system +collect-password=oracle +collect-service-name=orcl +# standard table : +# eg:sql -> select count(*) from u_bzbjy.kzxzb; select count(*) from u_bzbjy.zfxxb; +collect-user-table=U_BZBJY +collect-exec-table=KZXZB +collect-pay-table=ZFXXB +#gfs web's IP +gfs_control_ip=192.168.0.110 +#gfs web password of root +gfs_control_rootPassWd=root +#url of gfs web +HttpClientConstant_URL_IP_PORT=http://192.168.0.110:9001/ +#data between 6 month ago and now +dataBefore=6 +#move a data once +moveFileMaxNum=1 +#import 5 rows of Excel once +numOneImportExcel=5 +oracle-orcl=orcl +oracle-user=system +oracle-psw=oracle + +#============================================================================================================= +# windows path +#============================================================================================================= + +extract-log-localtion=D:\\test\\log\\ +extract-standard-log-localtion=D:\\test\\log2\\ + +file_upload_path=D:\\test\\ +file_download_path=D:\\test\\export.xlsx + +package_download_path=D:\\test\\ +package_name=sql_script_standard + +sql_script_path_last=D:\\test\\sql_script_last\\ +sql_script_path_standard=D:\\test\\sql_script_standard\\ + +#============================================================================================================= +# linux path +#============================================================================================================= +## extract's log file location +#extract-log-localtion=/home/web_manage/log/ +## standard extract's log file location +#extract-standard-log-localtion=/home/web_manage/log2/ +##import excel file's location +#file_upload_path=/excel_import_dir/ +##export excel file's location +#file_download_path=/excel_export_dir/export.xlsx +##file package download's location +#package_download_path=/ +##file package's name +#package_name=DefaultDescription +##location of sql script lastest +#sql_script_path_last=/DefaultDescription_last/ +##location of sql script standard +#sql_script_path_standard=/DefaultDescription/ diff --git a/src/com/platform/controller/CheckoutController.java b/src/com/platform/controller/CheckoutController.java index 00ead0f8..5986d893 100644 --- a/src/com/platform/controller/CheckoutController.java +++ b/src/com/platform/controller/CheckoutController.java @@ -58,7 +58,7 @@ public class CheckoutController extends BaseController { } /** - * 信息系统--校验--查看所有系统的 标准表情况 + * 信息系统--校验--查看系统的 标准表情况 * @return * @throws Exception */ @@ -95,7 +95,7 @@ public class CheckoutController extends BaseController { } /** - * 信息系统--校验--查看单条数据的修改详情 + * 信息系统--校验--查看单条数据的详情 * @return * @throws Exception */ @@ -116,7 +116,7 @@ public class CheckoutController extends BaseController { } /** - * 信息系统--校验--查看后 -> 单条数据的修改详情 + * 信息系统--校验--(查看后 )单条数据的修改详情 * @return * @throws Exception */ diff --git a/src/com/platform/controller/DataModelController.java b/src/com/platform/controller/DataModelController.java index 6e54c5a7..229401e3 100644 --- a/src/com/platform/controller/DataModelController.java +++ b/src/com/platform/controller/DataModelController.java @@ -1,473 +1,501 @@ -package com.platform.controller; - -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Resource; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.lang.StringUtils; -import org.apache.log4j.Logger; -import org.springframework.stereotype.Controller; -import org.springframework.ui.ModelMap; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseBody; - -import com.base.BaseController; -import com.platform.entities.BasedTask; -import com.platform.entities.DataInfoEntity; -import com.platform.entities.DataInfoEntityMoveTmp; -import com.platform.entities.GatherOracleInfo; -import com.platform.entities.OracleConnectorParams; -import com.platform.entities.OracleExtractExecuter; -import com.platform.form.PagerOptions; -import com.platform.form.oracleForm; -import com.platform.form.volumeMoveForm; -import com.platform.service.DataInfoService; -import com.platform.service.ICodeService; -import com.platform.service.IGatherOracleService; -import com.platform.service.ILogRead; -import com.platform.service.IMoveDataService; -import com.platform.service.IOracleExtractService; -import com.platform.service.IVolumeService; -import com.platform.service.OracleExtractTask; -import com.platform.service.OracleStatusService; -import com.platform.utils.CacheSetCantDelete; -import com.platform.utils.Configs; -import com.platform.utils.Constant; -import com.platform.utils.ThreadVolumeImm; -import com.platform.utils.UtilsHelper; - -/** - * 数据管理 - * - * @author chen - * - */ -@Controller -public class DataModelController extends BaseController { - - public static Logger log = Logger.getLogger(DataModelController.class); - - /** - * 数据管理--业务类 - */ - @Resource(name = "dataInfoService") - private DataInfoService dfs; - - /** - * gfs业务类 - */ - @Resource(name = "volumeService") - private IVolumeService volumeService; - - /** - * mysql相关业务 - */ - @Resource(name = "mySqlService") - private IGatherOracleService mySqlService; - - /** - * oracle汇总功能业务 - */ - @Resource(name = "OracleExtract") - private IOracleExtractService OracleExtract; - - /** - * 迁移数据业务类 - */ - @Resource(name = "moveDataService") - private IMoveDataService moveDataService; - - /** - * 地区、系统code业务类 - */ - @Resource(name = "codeService") - private ICodeService codeService; - - /** - * 日志业务管理类 - */ - @Resource(name = "logReadService") - private ILogRead logReadService; - - public void setDfsImp(DataInfoService dfs) { - this.dfs = dfs; - } - - /** - * 数据管理--分页查询数据 - * - * @param res - * @param req - * @return - * @throws Exception - */ - @RequestMapping("/data.json") - @ResponseBody - public ModelMap getAllDataToJson(HttpServletRequest res, - HttpServletResponse req) throws Exception { - log.info("-----------/data.json-----------"); - res.setCharacterEncoding("UTF-8"); - Map paramMap = res.getParameterMap(); - Set keySet = paramMap.keySet(); - Map params = new HashMap(); - StringBuffer sb = new StringBuffer().append("当前的请求参数:{"); - for (String str : keySet) { - String value = paramMap.get(str)[0]; - if (StringUtils.isNotEmpty(value)) { - params.put(str, value); - sb.append(str).append(":").append(value).append(","); - } else { - sb.append(str).append(":").append("null").append(","); - } - } - log.info(sb.deleteCharAt(sb.length() - 1).append("}").toString()); - PagerOptions pagerOptions = (PagerOptions) UtilsHelper - .newObjAndSetAttrsByClass(PagerOptions.class, params); - pagerOptions.setCurrentPageNum(Integer.valueOf(params - .get("currentPageNum"))); - // 冷热区查询字段mark - pagerOptions.setMark(pagerOptions.getVolumeType().trim()); - return dfs.getPagerTableData(pagerOptions); - } - - /** - * 数据管理--删除数据 - * - * @param res - * @param req - * @throws Exception - */ - @RequestMapping(value = "/delete/data") - @ResponseBody - public ModelMap deleteData(HttpServletRequest res, HttpServletResponse req) - throws Exception { - log.info("-----------/delete/data-----------"); - ModelMap mode = new ModelMap(); - res.setCharacterEncoding("UTF-8"); - Map paramMap = res.getParameterMap(); - String[] data = paramMap.get("data"); - if (null != data && data.length > 0) { - List list = new ArrayList(); - List errList = new ArrayList(); - // 判断是否有 不能删除的。 - for (String dataId : data) { - if (CacheSetCantDelete.containsId(dataId)) - errList.add(dataId); - else - list.add(dataId); - } - if (list.size() > 0) { - String[] ids = list.toArray(new String[list.size()]); - dfs.deleteData(ids); - req.setStatus(200); - } - if (errList.size() > 0) { - mode.put("err", errList); - req.setStatus(500); - } - - } - return mode; - } - - /** - * 连接oracle - * - * @param res - * @param req - * @throws UnsupportedEncodingException - */ - @RequestMapping("/connectOracle") - public void connectOracle(HttpServletRequest res, HttpServletResponse req) - throws UnsupportedEncodingException { - res.setCharacterEncoding("UTF-8"); - Map paramMap = res.getParameterMap(); - String[] oraclesName = paramMap.get("oracleName"); - if (oraclesName != null) - for (String rcName : oraclesName) { - log.info("执行连接\t" + rcName); - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + rcName + " status=0"; - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - log.info(sb.toString()); - new OracleStatusService().connectToOracle(rcName); - } - req.setStatus(200); - } - - /** - * 断开oracle连接 - * - * @param res - * @param req - * @throws UnsupportedEncodingException - */ - @RequestMapping("/cancelOracleConection") - public void cancelOracleConnection(HttpServletRequest res, - HttpServletResponse req) throws UnsupportedEncodingException { - res.setCharacterEncoding("UTF-8"); - Map paramMap = res.getParameterMap(); - String[] oraclesName = paramMap.get("oracleName"); - String operate = paramMap.get("operation")[0]; - if (null != oraclesName) { - for (String rcName : oraclesName) { - log.info("取消连接:\t" + rcName); - new OracleStatusService().cancelToOracle(rcName, operate); - } - } - req.setStatus(200); - } - - /** - * gfs的 volume节点的查询 - * - * @return - * @throws Exception - */ - @RequestMapping(value = "/volume/list", method = RequestMethod.POST) - @ResponseBody - public String volumeList() throws Exception { - log.info("-----/volume/list------"); - String rest = volumeService.getAllvolume(); - new ThreadVolumeImm("ThreadVolumeImm-in-VolumeController-volumeList") - .start(); - return rest; - } - - /** - * 数据迁移功能 - * - * @param res - * @param req - * @param form - * @throws Exception - */ - @RequestMapping(value = "/volume/{name}/move", method = RequestMethod.POST) - @ResponseBody - public void volumeMove(HttpServletRequest res, HttpServletResponse req, - @RequestBody volumeMoveForm form) throws Exception { - List datas = new ArrayList(); - List selectItems = form.getSelectItems(); - if (null == selectItems) { - req.setStatus(200); - return; - } - datas.addAll(selectItems); - - log.info("------/volume/{name}/move--------"); - if (datas.size() > 0) { - moveDataService.moveData(datas, form.getSelectNode()); - } - req.setStatus(200); - } - - /** - * oracle的 汇总功能 - * - * @param res - * @param req - * @param form - * @throws Exception - */ - @RequestMapping(value = "/oracle/{name}/extract", method = RequestMethod.POST) - public void oracleExtract(HttpServletRequest res, HttpServletResponse req, - @RequestBody oracleForm form) throws Exception { - log.info("/oracle/{name}/extract"); - if (null != form.getTarget()) { // 检查请求参数中是否包含汇总库信息 - boolean isConnect = OracleExtract.isConnectTotalOracle(form - .getTarget()); // 检查汇总库是否可以连接成功,连接成功返回200状态吗,连接失败返回500状态吗 - if (isConnect) { - req.setStatus(200); - if (null != form.getInneed() && form.getInneed().size() > 0) { - for (OracleConnectorParams oracleParams : form.getInneed()) { - BasedTask task = new OracleExtractTask( - oracleParams.getName(), form.getTarget(), - oracleParams, OracleExtract); - OracleExtractExecuter oee = new OracleExtractExecuter( - task); - new Thread(oee, oracleParams.getName()).start(); - } - } - } else - req.setStatus(500); - } - - } - - /** - * oracle汇总、抽取库的 查询 - * - * @return - * @throws Exception - */ - @RequestMapping(value = "/oracle/list", method = RequestMethod.POST) - @ResponseBody - public List oracleList() throws Exception { - log.info("----------getOracleInfo-----------------------"); - List result = mySqlService.findAllOracle(); - return result; - } - - /** - * oracle汇总、抽取库的 删除 - * - * @param req - * @param res - * @param id - * @throws Exception - */ - @RequestMapping(value = "/oracle/{id}/delete", method = RequestMethod.POST) - public void oracleDelete(HttpServletRequest req, HttpServletResponse res, - @PathVariable String id) throws Exception { - log.info("----------deleteOracleInfo----------------"); - Integer num = Integer.valueOf(id); - Integer result = mySqlService.deleteOracle(num); - log.info("oracleDelete : " + result); - res.setStatus(200); - } - - /** - * oracle汇总、抽取库的 新增 - * - * @param res - * @param req - * @param id - * @param oracle - * @throws Exception - */ - @RequestMapping(value = "/oracle/{id}/insert", method = RequestMethod.POST) - @ResponseBody - public void oracleInsert(HttpServletRequest res, HttpServletResponse req, - @PathVariable String id, @RequestBody GatherOracleInfo oracle) - throws Exception { - log.info("----------insertOracleInfo----------------"); - oracle.setId(Integer.valueOf(id)); - mySqlService.insertOracle(oracle); - req.setStatus(200); - } - - /** - * oracle汇总、抽取库的 更新 - * - * @param res - * @param req - * @param id - * @param oracle - * @throws Exception - */ - @RequestMapping(value = "/oracle/{id}/update", method = RequestMethod.POST) - @ResponseBody - public void oracleUpdate(HttpServletRequest res, HttpServletResponse req, - @PathVariable("id") String id, @RequestBody GatherOracleInfo oracle) - throws Exception { - log.info("----------updateOracleInfo-----------------------"); - log.info(oracle); - mySqlService.updateOracle(oracle); - req.setStatus(200); - } - - /** - * 迁移 数据 的查询 - * - * @return - * @throws Exception - */ - @RequestMapping(value = "/task/transfer/list", method = RequestMethod.POST) - @ResponseBody - public Object taskTransferList() throws Exception { - log.debug("----------/task/transfer/list-----------------------"); - List result = moveDataService.findAll(); - return result; - } - - /** - * 迁移完成后的 删除记录功能 - * - * @param res - * @param req - * @param id - * @return - * @throws Exception - */ - @RequestMapping(value = "/task/transfer/{id}/delete", method = RequestMethod.POST) - @ResponseBody - public Object taskTransferDelete(HttpServletRequest res, - HttpServletResponse req, @PathVariable String id) throws Exception { - log.info("----------/task/transfer/{id}/delete-----------------------"); - DataInfoEntityMoveTmp move = new DataInfoEntityMoveTmp(); - move.setId(Integer.valueOf(id)); - int result = moveDataService.delete(move); - req.setStatus(200); - return result; - } - - /** - * 迁移数据完成后新增一条数据(暂时去掉,新增功能不在此) - * - * @param res - * @param req - * @param move - * @return - * @throws Exception - */ - @RequestMapping(value = "/task/transfer/save", method = RequestMethod.POST) - @ResponseBody - public Object taskTransferSave(HttpServletRequest res, - HttpServletResponse req, @RequestBody DataInfoEntity move) - throws Exception { - log.info("---------/task/transfer/save-----------------------"); - // int result = dfs.save(move); - req.setStatus(200); - return 1; - } - - /** - * 地区和系统的 code 对应的名称 的 获取 - * - * @return - * @throws Exception - */ - @RequestMapping(value = "/code/list", method = RequestMethod.POST) - @ResponseBody - public Object findCodeList() throws Exception { - log.info("---------/findSystemCode-----------------------"); - Map result = codeService.findAll(); - return result; - } - - /** - * oracle 汇总的 日志 读取 - * - * @param name - * @param res - * @param req - * @return - * @throws Exception - */ - @RequestMapping(value = "/oracle/extract/log", method = RequestMethod.POST) - @ResponseBody - public Object getExtractLog(@RequestParam("rcName") String name, - HttpServletRequest res, HttpServletResponse req) throws Exception { - log.info("---------/oracle/extract/log-------------------"); - String result = logReadService.readLog(name); - // "查看相应日志" - Map log = new HashMap(); - log.put(name, result + "\r\n"); - return log; - } -} +package com.platform.controller; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Resource; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.lang.StringUtils; +import org.apache.log4j.Logger; +import org.springframework.stereotype.Controller; +import org.springframework.ui.ModelMap; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; + +import com.base.BaseController; +import com.platform.entities.BasedTask; +import com.platform.entities.DataInfoEntity; +import com.platform.entities.DataInfoEntityMoveTmp; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.entities.OracleExtractExecuter; +import com.platform.form.PagerOptions; +import com.platform.form.oracleForm; +import com.platform.form.volumeMoveForm; +import com.platform.service.DataInfoService; +import com.platform.service.ICodeService; +import com.platform.service.IGatherOracleService; +import com.platform.service.ILogRead; +import com.platform.service.IMoveDataService; +import com.platform.service.IOracleExtractService; +import com.platform.service.IVolumeService; +import com.platform.service.OracleExtractTask; +import com.platform.service.OracleStatusService; +import com.platform.utils.CacheSetCantDelete; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.ThreadVolumeImm; +import com.platform.utils.UtilsHelper; + +/** + * 数据管理 + * + * @author chen + * + */ +@Controller +public class DataModelController extends BaseController { + + public static Logger log = Logger.getLogger(DataModelController.class); + + /** + * 数据管理--业务类 + */ + @Resource(name = "dataInfoService") + private DataInfoService dfs; + + /** + * gfs业务类 + */ + @Resource(name = "volumeService") + private IVolumeService volumeService; + + /** + * mysql相关业务 + */ + @Resource(name = "mySqlService") + private IGatherOracleService mySqlService; + + /** + * oracle汇总功能业务 + */ + @Resource(name = "OracleExtract") + private IOracleExtractService OracleExtract; + + /** + * 迁移数据业务类 + */ + @Resource(name = "moveDataService") + private IMoveDataService moveDataService; + + /** + * 地区、系统code业务类 + */ + @Resource(name = "codeService") + private ICodeService codeService; + + /** + * 日志业务管理类 + */ + @Resource(name = "logReadService") + private ILogRead logReadService; + + public void setDfsImp(DataInfoService dfs) { + this.dfs = dfs; + } + + /** + * 数据管理--分页查询数据 + * + * @param res + * @param req + * @return + * @throws Exception + */ + @RequestMapping("/data.json") + @ResponseBody + public ModelMap getAllDataToJson(HttpServletRequest res, + HttpServletResponse req) throws Exception { + log.info("-----------/data.json-----------"); + res.setCharacterEncoding("UTF-8"); + Map paramMap = res.getParameterMap(); + Set keySet = paramMap.keySet(); + Map params = new HashMap(); + StringBuffer sb = new StringBuffer().append("当前的请求参数:{"); + for (String str : keySet) { + String value = paramMap.get(str)[0]; + if (StringUtils.isNotEmpty(value)) { + params.put(str, value); + sb.append(str).append(":").append(value).append(","); + } else { + sb.append(str).append(":").append("null").append(","); + } + } + log.info(sb.deleteCharAt(sb.length() - 1).append("}").toString()); + PagerOptions pagerOptions = (PagerOptions) UtilsHelper + .newObjAndSetAttrsByClass(PagerOptions.class, params); + pagerOptions.setCurrentPageNum(Integer.valueOf(params + .get("currentPageNum"))); + // 冷热区查询字段mark + pagerOptions.setMark(pagerOptions.getVolumeType().trim()); + return dfs.getPagerTableData(pagerOptions); + } + + /** + * 数据管理--删除数据 + * + * @param res + * @param req + * @throws Exception + */ + @RequestMapping(value = "/delete/data") + @ResponseBody + public ModelMap deleteData(HttpServletRequest res, HttpServletResponse req) + throws Exception { + log.info("-----------/delete/data-----------"); + ModelMap mode = new ModelMap(); + res.setCharacterEncoding("UTF-8"); + Map paramMap = res.getParameterMap(); + String[] data = paramMap.get("data"); + if (null != data && data.length > 0) { + List list = new ArrayList(); + List errList = new ArrayList(); + // 判断是否有 不能删除的。 + for (String dataId : data) { + if (CacheSetCantDelete.containsId(dataId)) + errList.add(dataId); + else + list.add(dataId); + } + if (list.size() > 0) { + String[] ids = list.toArray(new String[list.size()]); + dfs.deleteData(ids); + req.setStatus(200); + } + if (errList.size() > 0) { + mode.put("err", errList); + req.setStatus(500); + } + + } + return mode; + } + + /** + * 连接oracle + * + * @param res + * @param req + * @throws UnsupportedEncodingException + */ + @RequestMapping("/connectOracle") + public void connectOracle(HttpServletRequest res, HttpServletResponse req) + throws UnsupportedEncodingException { + res.setCharacterEncoding("UTF-8"); + Map paramMap = res.getParameterMap(); + String[] oraclesName = paramMap.get("oracleName"); + if (oraclesName != null) + for (String rcName : oraclesName) { + log.info("执行连接\t" + rcName); + + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + rcName + " status=0"; + + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + log.info(sb.toString()); + new OracleStatusService().connectToOracle(rcName); + } + req.setStatus(200); + } + + /** + * 断开oracle连接 + * + * @param res + * @param req + * @throws UnsupportedEncodingException + */ + @RequestMapping("/cancelOracleConection") + public void cancelOracleConnection(HttpServletRequest res, + HttpServletResponse req) throws UnsupportedEncodingException { + res.setCharacterEncoding("UTF-8"); + Map paramMap = res.getParameterMap(); + String[] oraclesName = paramMap.get("oracleName"); + String operate = paramMap.get("operation")[0]; + if (null != oraclesName) { + for (String rcName : oraclesName) { + log.info("取消连接:\t" + rcName); + new OracleStatusService().cancelToOracle(rcName, operate); + } + } + req.setStatus(200); + } + + /** + * gfs的 volume节点的查询 + * + * @return + * @throws Exception + */ + @RequestMapping(value = "/volume/list", method = RequestMethod.POST) + @ResponseBody + public String volumeList() throws Exception { + log.info("-----/volume/list------"); + String rest = volumeService.getAllvolume(); + new ThreadVolumeImm("ThreadVolumeImm-in-VolumeController-volumeList") + .start(); + return rest; + } + + /** + * 数据迁移功能 + * + * @param res + * @param req + * @param form + * @throws Exception + */ + @RequestMapping(value = "/volume/{name}/move", method = RequestMethod.POST) + @ResponseBody + public void volumeMove(HttpServletRequest res, HttpServletResponse req, + @RequestBody volumeMoveForm form) throws Exception { + List datas = new ArrayList(); + List selectItems = form.getSelectItems(); + if (null == selectItems) { + req.setStatus(200); + return; + } + datas.addAll(selectItems); + + log.info("------/volume/{name}/move--------"); + if (datas.size() > 0) { + moveDataService.moveData(datas, form.getSelectNode()); + } + req.setStatus(200); + } + + /** + * oracle的 汇总功能 + * + * @param res + * @param req + * @param form + * @throws Exception + */ + @RequestMapping(value = "/oracle/{name}/extract", method = RequestMethod.POST) + public void oracleExtract(HttpServletRequest res, HttpServletResponse req, + @RequestBody oracleForm form) throws Exception { + log.info("/oracle/{name}/extract"); + if (null != form.getTarget()) { // 检查请求参数中是否包含汇总库信息 + boolean isConnect = OracleExtract.isConnectTotalOracle(form + .getTarget()); // 检查汇总库是否可以连接成功,连接成功返回200状态吗,连接失败返回500状态吗 + if (isConnect) { + req.setStatus(200); + if (null != form.getInneed() && form.getInneed().size() > 0) { + for (OracleConnectorParams oracleParams : form.getInneed()) { + BasedTask task = new OracleExtractTask( + oracleParams.getName(), form.getTarget(), + oracleParams, OracleExtract); + OracleExtractExecuter oee = new OracleExtractExecuter( + task); + new Thread(oee, oracleParams.getName()).start(); + } + } + } else + req.setStatus(500); + } + + } + + /** + * oracle汇总、抽取库的 查询 + * + * @return + * @throws Exception + */ + @RequestMapping(value = "/oracle/list", method = RequestMethod.POST) + @ResponseBody + public List oracleList() throws Exception { + log.info("----------getOracleInfo-----------------------"); + List result = mySqlService.findAllOracle(); + return result; + } + + /** + * oracle汇总、抽取库的 删除 + * + * @param req + * @param res + * @param id + * @throws Exception + */ + @RequestMapping(value = "/oracle/{id}/delete", method = RequestMethod.POST) + public void oracleDelete(HttpServletRequest req, HttpServletResponse res, + @PathVariable String id) throws Exception { + log.info("----------deleteOracleInfo----------------"); + Integer num = Integer.valueOf(id); + Integer result = mySqlService.deleteOracle(num); + log.info("oracleDelete : " + result); + res.setStatus(200); + } + + /** + * oracle汇总、抽取库的 新增 + * + * @param res + * @param req + * @param id + * @param oracle + * @throws Exception + */ + @RequestMapping(value = "/oracle/{id}/insert", method = RequestMethod.POST) + @ResponseBody + public void oracleInsert(HttpServletRequest res, HttpServletResponse req, + @PathVariable String id, @RequestBody GatherOracleInfo oracle) + throws Exception { + log.info("----------insertOracleInfo----------------"); + oracle.setId(Integer.valueOf(id)); + mySqlService.insertOracle(oracle); + req.setStatus(200); + } + + /** + * oracle汇总、抽取库的 更新 + * + * @param res + * @param req + * @param id + * @param oracle + * @throws Exception + */ + @RequestMapping(value = "/oracle/{id}/update", method = RequestMethod.POST) + @ResponseBody + public void oracleUpdate(HttpServletRequest res, HttpServletResponse req, + @PathVariable("id") String id, @RequestBody GatherOracleInfo oracle) + throws Exception { + log.info("----------updateOracleInfo-----------------------"); + log.info(oracle); + mySqlService.updateOracle(oracle); + req.setStatus(200); + } + + /** + * 迁移任务 的查询 + * + * @return + * @throws Exception + */ + @RequestMapping(value = "/task/transfer/list", method = RequestMethod.POST) + @ResponseBody + public Object taskTransferList() throws Exception { + log.debug("----------/task/transfer/list-----------------------"); + List result = moveDataService.findAll(); + return result; + } + + /** + * 迁移完成后的 删除记录功能 + * + * @param res + * @param req + * @param id + * @return + * @throws Exception + */ + @RequestMapping(value = "/task/transfer/{id}/delete", method = RequestMethod.POST) + @ResponseBody + public Object taskTransferDelete(HttpServletRequest res, + HttpServletResponse req, @PathVariable String id) throws Exception { + log.info("----------/task/transfer/{id}/delete-----------------------"); + DataInfoEntityMoveTmp move = new DataInfoEntityMoveTmp(); + move.setId(Integer.valueOf(id)); + int result = moveDataService.delete(move); + req.setStatus(200); + return result; + } + + /** + * 迁移完成后的 删除记录功能 + * + * @param res + * @param req + * @param id + * @return + * @throws Exception + */ + @RequestMapping(value = "/task/transfer/deletes", method = RequestMethod.POST) + @ResponseBody + public Object taskTransferDeletes(HttpServletRequest res, + HttpServletResponse req, @RequestBody String[] ids) throws Exception { + log.info("---------/task/transfer/deletes----------------------"); + req.setStatus(200); + int result = 0; + if (ids != null ) { +// String[] ids = idsString.split(","); + result = moveDataService.delete(ids); + } + else { + req.setStatus(500); + } + return result; + } + + /** + * 迁移数据完成后新增一条数据(暂时去掉,新增功能不在此) + * + * @param res + * @param req + * @param move + * @return + * @throws Exception + */ + @RequestMapping(value = "/task/transfer/save", method = RequestMethod.POST) + @ResponseBody + public Object taskTransferSave(HttpServletRequest res, + HttpServletResponse req, @RequestBody DataInfoEntity move) + throws Exception { + log.info("---------/task/transfer/save-----------------------"); + // int result = dfs.save(move); + req.setStatus(200); + return 1; + } + + /** + * 地区和系统的 code 对应的名称 的 获取 + * + * @return + * @throws Exception + */ + @RequestMapping(value = "/code/list", method = RequestMethod.POST) + @ResponseBody + public Object findCodeList() throws Exception { + log.info("---------/findSystemCode-----------------------"); + Map result = codeService.findAll(); + return result; + } + + /** + * oracle 汇总的 日志 读取 + * + * @param name + * @param res + * @param req + * @return + * @throws Exception + */ + @RequestMapping(value = "/oracle/extract/log", method = RequestMethod.POST) + @ResponseBody + public Object getExtractLog(@RequestParam("rcName") String name, + HttpServletRequest res, HttpServletResponse req) throws Exception { + log.info("---------/oracle/extract/log-------------------"); + String result = logReadService.readLog(name); + // "查看相应日志" + Map log = new HashMap(); + log.put(name, result + "\r\n"); + return log; + } +} diff --git a/src/com/platform/form/PagerOptions.java b/src/com/platform/form/PagerOptions.java index 6cf01887..ce4212ec 100644 --- a/src/com/platform/form/PagerOptions.java +++ b/src/com/platform/form/PagerOptions.java @@ -39,13 +39,13 @@ public class PagerOptions extends Page{ private Integer offset; // 查询偏移量:起始id - private String keyQuery; - //模糊查询字段 - private List array; + private String keyQuery;//模糊查询字段 - private String volumeType; - //冷热区字段 - private String mark; + private List array;//模糊查询字段数组 + + private String volumeType;//冷热区字段 + + private String mark;//冷热区字段 public String getDataType() { return dataType; diff --git a/src/com/platform/form/ScriptForm.java b/src/com/platform/form/ScriptForm.java index ad50882a..dfbfe8be 100644 --- a/src/com/platform/form/ScriptForm.java +++ b/src/com/platform/form/ScriptForm.java @@ -5,10 +5,20 @@ import java.util.Properties; import com.platform.entities.SqlFileInfoEntity; +/** + * @author chen + * 脚本 表单接收 + */ public class ScriptForm { + /** + * 脚本内容 + */ private String content; + /** + * 脚本实体内容 + */ private SqlFileInfoEntity item; /** diff --git a/src/com/platform/form/VolumeForm.java b/src/com/platform/form/VolumeForm.java index 17abc412..cb333f01 100644 --- a/src/com/platform/form/VolumeForm.java +++ b/src/com/platform/form/VolumeForm.java @@ -29,9 +29,9 @@ public class VolumeForm { /** 挂载点 */ private String path; - /** * exist,正常返回状态Started,Stopped,Created */ private boolean status; + /** * exist,正常返回状态Started,Stopped,Created */ private String type; /** volume树形目录 */ diff --git a/src/com/platform/form/volumeMoveForm.java b/src/com/platform/form/volumeMoveForm.java index 7e1ea5b5..69114cda 100644 --- a/src/com/platform/form/volumeMoveForm.java +++ b/src/com/platform/form/volumeMoveForm.java @@ -5,16 +5,25 @@ import java.util.List; import com.platform.entities.DataInfoEntity; import com.platform.entities.FolderNode; -/**迁移数据--接收前端传值:volume名,目录,数据对象 +/**迁移数据--接收前端传值 * @author chen * */ public class volumeMoveForm { + /** + * volume名 + */ private String name; + /** + * 目录 + */ private FolderNode selectNode; + /** + * 数据对象 + */ private List selectItems; /** diff --git a/src/com/platform/service/IMoveDataService.java b/src/com/platform/service/IMoveDataService.java index 8fc64426..4f6eb91d 100644 --- a/src/com/platform/service/IMoveDataService.java +++ b/src/com/platform/service/IMoveDataService.java @@ -32,6 +32,12 @@ public interface IMoveDataService { */ public int delete(DataInfoEntityMoveTmp dataMove) throws Exception; + /** 删除多个 + * @return + * @throws Exception + */ + public int delete(String... ids) throws Exception; + /** 迁移数据--更新 * @param data * @return diff --git a/src/com/platform/service/OracleStatusService.java b/src/com/platform/service/OracleStatusService.java index 269679ec..76fd8f10 100644 --- a/src/com/platform/service/OracleStatusService.java +++ b/src/com/platform/service/OracleStatusService.java @@ -1,206 +1,206 @@ -package com.platform.service; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.ReplicationController; - -import java.util.Hashtable; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; - -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.utils.Configs; -import com.platform.utils.Constant; - -/** - * 计时 尝试连接oracle的时间,(超过10分钟判断失败) - * - * @author chen - * - */ -public class OracleStatusService { - private static Map alliveTask = new Hashtable(); - public final static int EXEC_TIME = 9;// 连接多少次后不成功,取消链接 - public final static long INTERVAL_TIME = 60 * 1000;// 每隔多少毫秒执行一次连接任务 - public final static long DELAY_TIME = 30 * 1000; // 延迟多少秒后执行 - - public void connectToOracle(String replicasName) { - SimpleKubeClient sKubeClient = new SimpleKubeClient(); - if (alliveTask.containsKey(replicasName)) { - killAlliveTask(replicasName); - } - OracleConnectorParams orp = new OracleConnectorParams(); - Timer timer = new Timer(); - alliveTask.put(replicasName, timer); - timer.schedule(new connectTask(replicasName, orp, sKubeClient), - DELAY_TIME, INTERVAL_TIME); - - } - - public void cancelToOracle(String replicasName, String operate) { - if (operate.equals("stop")) { - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + replicasName + " status=0"; - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // SimpleKubeClient sKubeClient = new SimpleKubeClient(); - // sKubeClient.updateOrAddReplicasLabelById(replicasName, - // "status","0"); - } - killAlliveTask(replicasName); - } - - /** - * 取消并移除指定定时任务 - * - * - * @param taskName - */ - public void killAlliveTask(String taskName) { - if (alliveTask.containsKey(taskName)) { - alliveTask.get(taskName).cancel(); - alliveTask.remove(taskName); - } - } - - public void killAlliveTasks(String... tasksName) { - for (String taskName : tasksName) - killAlliveTask(taskName); - } - - /** - * 清空定时任务 - */ - public void cleanUpAlliveTask() { - Iterator> iterator = alliveTask.entrySet() - .iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - entry.getValue().cancel(); - } - alliveTask.clear(); - } - - /** - * 链接oracle任务类 - * - * @author wuming - * - */ - class connectTask extends TimerTask { - private String taskName; - private int count; - private OracleConnectorParams ocp; - private SimpleKubeClient client; - - public connectTask(String taskName, OracleConnectorParams ocp, - SimpleKubeClient client) { - this.taskName = taskName; - this.ocp = ocp; - this.count = 0; - this.client = client; - } - - public connectTask(OracleConnectorParams ocp, SimpleKubeClient client) { - this.taskName = ocp.getName(); - this.ocp = ocp; - this.count = 0; - this.client = client; - } - - @Override - public void run() { - if (count == EXEC_TIME && alliveTask.containsKey(taskName)) { // 如果任务已经执行10次,则任务oracle启动失败,并取消oracle连接 - killAlliveTask(taskName); - // client.updateOrAddReplicasLabelById(taskName, "status", "1"); - // //更新ReplicationController标签,将oracle状态标示未1(0:启动中,1:失败,2:成功) - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + taskName + " status=1"; - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - Configs.CONSOLE_LOGGER.info("更新replicationController标签: " - + taskName + "\t[标签更新为: 失败]"); - } else { // 否则,执行连接oracle任务,判断oracle是否启动成功 - Pod pod = filterPod(); - if (pod != null) { - String ip = client.getPodHostIp(pod); - int port = client.getPodContainerport(pod); - if (ip != null && port != 0) { - String url = "jdbc:oracle:thin:@" + ip + ":" + port - + ":" + ocp.getDatabaseName(); - boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 - ocp.getUser(), ocp.getPassword()); - Configs.CONSOLE_LOGGER.info("url:" + url + ",user:" - + ocp.getUser() + ",password:" - + ocp.getPassword()); - String message = "失败"; - if (flag && alliveTask.containsKey(taskName)) { - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + taskName - + " status=2"; - // client.updateOrAddReplicasLabelById(taskNSyame, - // "status", "2"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - message = "成功"; - killAlliveTask(taskName); // 连接成功,取消连接 - Configs.CONSOLE_LOGGER - .info("更新replicationController标签: " - + taskName + "\t[标签更新为: 成功]"); - } - Configs.CONSOLE_LOGGER.info("连接到数据库服务: " + taskName - + "\t[连接结果: " + message + "]"); - } - } - } - count++; - } - - /** - * 获取oracle的连接ip地址和端口号 - * - * @return - */ - private Pod filterPod() { - Pod pod = null; - ReplicationController replicationController = client - .getReplicationController(taskName); - if (null != replicationController) { - List filterPods = client - .getPodsForApplicaList(replicationController); - if (filterPods != null && filterPods.size() > 0) { - pod = filterPods.get(0); - } - } - return pod; - } - - public String getTaskName() { - return taskName; - } - - public void setTaskName(String taskName) { - this.taskName = taskName; - } - - public int getCount() { - return count; - } - } +package com.platform.service; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.ReplicationController; + +import java.util.Hashtable; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.utils.Configs; +import com.platform.utils.Constant; + +/** + * 计时 尝试连接oracle的时间,(超过10分钟判断失败) + * + * @author chen + * + */ +public class OracleStatusService { + private static Map alliveTask = new Hashtable(); + public final static int EXEC_TIME = 9;// 连接多少次后不成功,取消链接 + public final static long INTERVAL_TIME = 60 * 1000;// 每隔多少毫秒执行一次连接任务 + public final static long DELAY_TIME = 30 * 1000; // 延迟多少秒后执行 + + public void connectToOracle(String replicasName) { + SimpleKubeClient sKubeClient = new SimpleKubeClient(); + if (alliveTask.containsKey(replicasName)) { + killAlliveTask(replicasName); + } + OracleConnectorParams orp = new OracleConnectorParams(); + Timer timer = new Timer(); + alliveTask.put(replicasName, timer); + timer.schedule(new connectTask(replicasName, orp, sKubeClient), + DELAY_TIME, INTERVAL_TIME); + + } + + public void cancelToOracle(String replicasName, String operate) { + if (operate.equals("stop")) { + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + replicasName + " status=0"; + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // SimpleKubeClient sKubeClient = new SimpleKubeClient(); + // sKubeClient.updateOrAddReplicasLabelById(replicasName, + // "status","0"); + } + killAlliveTask(replicasName); + } + + /** + * 取消并移除指定定时任务 + * + * + * @param taskName + */ + public void killAlliveTask(String taskName) { + if (alliveTask.containsKey(taskName)) { + alliveTask.get(taskName).cancel(); + alliveTask.remove(taskName); + } + } + + public void killAlliveTasks(String... tasksName) { + for (String taskName : tasksName) + killAlliveTask(taskName); + } + + /** + * 清空定时任务 + */ + public void cleanUpAlliveTask() { + Iterator> iterator = alliveTask.entrySet() + .iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + entry.getValue().cancel(); + } + alliveTask.clear(); + } + + /** + * 链接oracle任务类 + * + * @author wuming + * + */ + class connectTask extends TimerTask { + private String taskName; + private int count; + private OracleConnectorParams ocp; + private SimpleKubeClient client; + + public connectTask(String taskName, OracleConnectorParams ocp, + SimpleKubeClient client) { + this.taskName = taskName; + this.ocp = ocp; + this.count = 0; + this.client = client; + } + + public connectTask(OracleConnectorParams ocp, SimpleKubeClient client) { + this.taskName = ocp.getName(); + this.ocp = ocp; + this.count = 0; + this.client = client; + } + + @Override + public void run() { + if (count == EXEC_TIME && alliveTask.containsKey(taskName)) { // 如果任务已经执行10次,则任务oracle启动失败,并取消oracle连接 + killAlliveTask(taskName); + // client.updateOrAddReplicasLabelById(taskName, "status", "1"); + // //更新ReplicationController标签,将oracle状态标示未1(0:启动中,1:失败,2:成功) + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + taskName + " status=1"; + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + Configs.CONSOLE_LOGGER.info("更新replicationController标签: " + + taskName + "\t[标签更新为: 失败]"); + } else { // 否则,执行连接oracle任务,判断oracle是否启动成功 + Pod pod = filterPod(); + if (pod != null) { + String ip = client.getPodHostIp(pod); + int port = client.getPodContainerport(pod); + if (ip != null && port != 0) { + String url = "jdbc:oracle:thin:@" + ip + ":" + port + + ":" + ocp.getDatabaseName(); + boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 + ocp.getUser(), ocp.getPassword()); + Configs.CONSOLE_LOGGER.info("url:" + url + ",user:" + + ocp.getUser() + ",password:" + + ocp.getPassword()); + String message = "失败"; + if (flag && alliveTask.containsKey(taskName)) { + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + taskName + + " status=2"; + // client.updateOrAddReplicasLabelById(taskNSyame, + // "status", "2"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + message = "成功"; + killAlliveTask(taskName); // 连接成功,取消连接 + Configs.CONSOLE_LOGGER + .info("更新replicationController标签: " + + taskName + "\t[标签更新为: 成功]"); + } + Configs.CONSOLE_LOGGER.info("连接到数据库服务: " + taskName + + "\t[连接结果: " + message + "]"); + } + } + } + count++; + } + + /** + * 获取oracle的连接ip地址和端口号 + * + * @return + */ + private Pod filterPod() { + Pod pod = null; + ReplicationController replicationController = client + .getReplicationController(taskName); + if (null != replicationController) { + List filterPods = client + .getPodsForApplicaList(replicationController); + if (filterPods != null && filterPods.size() > 0) { + pod = filterPods.get(0); + } + } + return pod; + } + + public String getTaskName() { + return taskName; + } + + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + public int getCount() { + return count; + } + } } \ No newline at end of file diff --git a/src/com/platform/service/impl/CheckoutServiceImpl.java b/src/com/platform/service/impl/CheckoutServiceImpl.java index a783abf1..8cf3e12b 100644 --- a/src/com/platform/service/impl/CheckoutServiceImpl.java +++ b/src/com/platform/service/impl/CheckoutServiceImpl.java @@ -76,9 +76,9 @@ public class CheckoutServiceImpl implements ICheckoutService { List checks = new ArrayList(); List result = preDataInfoDao.findAllCollect(); DataInfoEntity data = new DataInfoEntity(); - Calendar c2 = Calendar.getInstance(); + Calendar c2 = Calendar.getInstance(); // 时间设置为 半年前的时间 - c2.set(Calendar.MONTH, getMonBeforeHalfYear(c2.get(Calendar.MONTH))); + this.getMonBeforeHalfYear(c2); String time = DateForm.date2StringByDay(c2.getTime()); data.setCollectingTime(time); // data.setCollectingTime(collectingTime); @@ -124,7 +124,7 @@ public class CheckoutServiceImpl implements ICheckoutService { CheckoutEntity cksql = new CheckoutEntity(); Calendar c2 = Calendar.getInstance(); // 时间设置为 半年前的时间 - c2.set(Calendar.MONTH, getMonBeforeHalfYear(c2.get(Calendar.MONTH))); + this.getMonBeforeHalfYear(c2); String time = DateForm.date2StringByDay(c2.getTime()); cksql.setCollectingTime(time); cksql.setCityName(city); @@ -451,12 +451,9 @@ public class CheckoutServiceImpl implements ICheckoutService { * @param num * @return */ - private int getMonBeforeHalfYear(int num){ - num -= Configs.dataBefore; - if (num <= 0) { - num = num + 12; - } - return num; + private int getMonBeforeHalfYear(Calendar c2){ + c2.set(Calendar.DAY_OF_YEAR, c2.get(Calendar.DAY_OF_YEAR) -(Configs.dataBefore*30)); + return 1; } private String isY(String str1, String str2) { diff --git a/src/com/platform/service/impl/MoveDataServiceImpl.java b/src/com/platform/service/impl/MoveDataServiceImpl.java index c9c31a6e..5ba8ac1e 100644 --- a/src/com/platform/service/impl/MoveDataServiceImpl.java +++ b/src/com/platform/service/impl/MoveDataServiceImpl.java @@ -210,11 +210,14 @@ public class MoveDataServiceImpl implements IMoveDataService { } } + int result = 0; //迁移失败 if ("3".equals(dataMove.getCompleteStatus())) { - removeservice.deleteFolder(dataMove.getDstPath()); + int code = removeservice.deleteFolder(dataMove.getDstPath()); + if (code == -100) { + return result; + } } - int result = 0; //是正则迁移时: if ("1".equals(dataMove.getCompleteStatus())) { if(1 != removeservice.abortcopyFolder(dataMove.getDataPath(), makeDstPath(dataMove.getDstPath()))){ @@ -263,4 +266,42 @@ public class MoveDataServiceImpl implements IMoveDataService { } return dstPath; } + + @Override + public int delete(String... ids) throws Exception { + List list = dataInfoMoveTmpDao.findAll(); + List dellist = new ArrayList(); + if (null != list) { + for (DataInfoEntityMoveTmp dataInfoEntityMoveTmp : list) { + for (String id : ids) { + if (Integer.valueOf(id) == dataInfoEntityMoveTmp.getId()) { + dellist.add(dataInfoEntityMoveTmp); + } + } + } + + } + int result = 0; + for (DataInfoEntityMoveTmp dataMove : dellist) { + //迁移失败 + if ("3".equals(dataMove.getCompleteStatus())) { + int code = removeservice.deleteFolder(dataMove.getDstPath()); + if (code == -100) { + continue; + } + } + //是正则迁移时: + if ("1".equals(dataMove.getCompleteStatus())) { + if(1 != removeservice.abortcopyFolder(dataMove.getDataPath(), makeDstPath(dataMove.getDstPath()))){ + try{ + removeservice.deleteFolder(dataMove.getDstPath()); + }catch(Exception e){ + log.error(e); + } + } + } + result = dataInfoMoveTmpDao.remove(dataMove.getId()); + } + return result; + } } diff --git a/src/com/platform/service/impl/OracleExtractServiceImpl.java b/src/com/platform/service/impl/OracleExtractServiceImpl.java index fc136cac..2082bd0c 100644 --- a/src/com/platform/service/impl/OracleExtractServiceImpl.java +++ b/src/com/platform/service/impl/OracleExtractServiceImpl.java @@ -1,419 +1,419 @@ -package com.platform.service.impl; - -import java.sql.Connection; -import java.util.Date; -import java.util.List; - -import javax.annotation.Resource; - -import org.apache.log4j.Logger; -import org.springframework.stereotype.Service; - -import com.base.Custom4exception; -import com.base.CustomException; -import com.platform.dao.DataInfoDao; -import com.platform.entities.DataInfoEntity; -import com.platform.entities.GatherOracleInfo; -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.service.IOracleExtractService; -import com.platform.service.OracleExtractHelper; -import com.platform.utils.CacheSetCantDelete; -import com.platform.utils.Configs; -import com.platform.utils.Constant; -import com.platform.utils.DateForm; -import com.platform.utils.FileOperateHelper; - -@Service(value = "OracleExtract") -public class OracleExtractServiceImpl implements IOracleExtractService { - - /** - * 日志 - */ - public final static Logger log = Logger - .getLogger(OracleExtractServiceImpl.class); - - @Resource(name = "dataInfoDao") - private DataInfoDao dataInfoDao; - - /** - * kubernetes client - */ - private SimpleKubeClient client = new SimpleKubeClient(); - /** - * 抽取 - */ - private OracleExtractHelper oracleExtract = new OracleExtractHelper(); - - /** - * 数据库连接实现类 - */ - private OracleConnector connect = new OracleConnector(); - - @Override - public boolean extractOracle(String name, - List datainfos, GatherOracleInfo oracleModel) - throws Exception { - boolean isSuccess = false; - try { - // map转 bean(汇总库信息-带tableName的) - // GatherOracleInfo oracleModel = oracleConnect; - // 采集库连接参数 - // List datainfos = dataInfolist; - if (datainfos.size() == 0) { - return false; - } - Connection conn = OracleConnector.connectionBuilder( - "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" - + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), - oracleModel.getUser(), oracleModel.getPassword(), - datainfos.get(0)); - if (null == conn) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + datainfos.get(0).getName() + ".log", - "创建oracle连接失败: [" + conn + "]\r\n"); - return false; - } - for (OracleConnectorParams collectOracle : datainfos) { - String replicasName = collectOracle.getName(); - DataInfoEntity data = new DataInfoEntity(); - try { - if (null != collectOracle.getDataId() - && !"".equals(collectOracle.getDataId())) { - data.setId(Integer.valueOf(collectOracle.getDataId())); - data.setExtractStatus(1); - dataInfoDao.updateExtract(data); - collectOracle.setName("J" - + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + replicasName - + " isExtract=1"; - // sql日志记录时间: - FileOperateHelper - .fileWrite( - Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName() - + ".log", - "\r\n 开始汇总 \r\n" - + DateForm - .date2StringBysecond(new Date()) - + "\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "1"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createDBLink(conn, collectOracle); // 创建dblink - oracleExtract.createTableSpace(conn, collectOracle, - oracleModel); // 创建表空间 - oracleExtract.createUser(conn, collectOracle, - oracleModel);// 创建用户并授权 - oracleExtract.extractColleDB(conn, collectOracle, - oracleModel);// 执行抽取 - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "2"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + replicasName - + " isExtract=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setExtractStatus(2); - dataInfoDao.updateExtract(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data - .getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.updateExtract(data); - } - } catch (Exception e) { - String cmd = "kubectl label --overwrite rc " + replicasName - + " isExtract=0"; - Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - data.setExtractStatus(0); - dataInfoDao.updateExtract(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.updateExtract(data); - log.error(Custom4exception.OracleSQL_Except, e); - } finally { - // 去掉保存的当前数据id, - CacheSetCantDelete.removeExtractId(collectOracle - .getDataId()); - String msg = "汇总结束"; - if (2 != data.getExtractStatus()) { - msg += " 汇总有异常!状态重置为待汇总 "; - data.setExtractStatus(0); - dataInfoDao.updateExtract(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data - .getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.updateExtract(data); - } - // sql日志记录时间: - FileOperateHelper.fileWrite( - Configs.EXTRACT_LOG_LOCALTION - + collectOracle.getName() + ".log", - "\r\n " + msg + " >>>>>>> " - + DateForm.date2StringBysecond(new Date()) - + "\r\n\r\n\n"); - } - } - isSuccess = true; - } catch (Exception e) { - new CustomException(Custom4exception.OracleSQL_Except, e); - } - return isSuccess; - } - - @Override - public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) - throws Exception { - boolean isConnect = false; - Connection conn = OracleConnector.connectionBuilder( - "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" - + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), - oracleModel.getPassword(), null); - if (null == conn) { - isConnect = false; - throw new CustomException(Custom4exception.connect_Oracle_Except, - null, oracleModel); - // FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - // + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + - // "]\r\n"); - } else { - isConnect = oracleExtract.testConnect(conn); - conn.close(); // 关闭连接 - } - return isConnect; - } - - @Override - public boolean extractStandardTable(String name, - List dataInfolist, - GatherOracleInfo oracleConnect) throws Exception { - boolean isSuccess = false; - // map转 bean(汇总库信息-带tableName的) - GatherOracleInfo oracleModel = oracleConnect; - // 采集库连接参数 - List datainfos = dataInfolist; - if (datainfos.size() == 0) { - return false; - } - Connection conn = OracleConnector.connectionBuilder( - "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" - + oracleModel.getPort() + ":" - + oracleModel.getDatabaseName(), oracleModel.getUser(), - oracleModel.getPassword(), dataInfolist.get(0)); - if (null == conn) { - FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION - + dataInfolist.get(0).getName() + ".log", "创建oracle连接失败: [" - + conn + "]\r\n"); - return false; - } - for (OracleConnectorParams collectOracle : datainfos) { - String replicasName = collectOracle.getName(); - DataInfoEntity data = new DataInfoEntity(); - try { - if (null != collectOracle.getDataId() - && !"".equals(collectOracle.getDataId())) { - data.setId(Integer.valueOf(collectOracle.getDataId())); - // 设置为 标准表 抽取中 - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); - data.setStandardExtractStatus("1"); - dataInfoDao.update(data); - collectOracle.setName("CQ" - + collectOracle.getName().replace("-", "_")); - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + replicasName - + " standardExtractStatus=1"; - // sql日志记录时间: - FileOperateHelper.fileWrite( - Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName() + ".log", - "\r\n 开始抽取标准表 \r\n " - + DateForm.date2StringBysecond(new Date()) - + "\n\r\n"); - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "1"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink - oracleExtract.createStardardTableSpace(conn, collectOracle, - oracleModel); // 创建表空间 - oracleExtract.createOnlyUser(conn, collectOracle, - oracleModel);// 创建 抽取标准表的 用户并授权 - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - if (null != tmpdata) { - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata - .getPayResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata - .getPayResultLast())) { - // 抽取中 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try { - oracleExtract.extractStandardPayTable(conn, - collectOracle, oracleModel);// 执行抽取 - } catch (Exception e) { - // 改回 校验存在的状态 - data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - // 抽取成功 - data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } - } - if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata - .getExecResultLast()) - || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata - .getExecResultLast())) { - // 抽取中 - data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); - dataInfoDao.update(data); - boolean isExtrac = true; - try { - oracleExtract.extractStandardExecTable(conn, - collectOracle, oracleModel);// 执行抽取 - } catch (Exception e) { - // 改回 校验存在的状态 - data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); - dataInfoDao.update(data); - isExtrac = false; - } - if (isExtrac) { - data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); - dataInfoDao.update(data); - } - } - // client.updateOrAddReplicasLabelById(collectOracle.getName(), - // "isExtract", "2"); - // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 - cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + replicasName - + " standardExtractStatus=2"; - rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - sb = new StringBuffer(); - for (String string : rList) - sb.append(string).append("\n"); - Configs.CONSOLE_LOGGER.info(sb.toString()); - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); - data.setStandardExtractStatus("2"); - dataInfoDao.update(data); - DataInfoEntity tmpSrcData = dataInfoDao.findById(data - .getId()); - data.setId(tmpSrcData.getSrcId()); - dataInfoDao.update(data); - } - } - } catch (Exception e) { - - String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + replicasName - + " standardExtractStatus=0"; - Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); - data.setStandardExtractStatus("0"); - dataInfoDao.update(data); - DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId()); - data.setId(tmpSrcData.getSrcId()); - dataInfoDao.update(data); - log.error(Custom4exception.OracleSQL_Except, e); - } finally { - // 去掉保存的当前数据id, - CacheSetCantDelete.removeStandardId(collectOracle.getDataId()); - String msg = "抽取标准表结束"; - if (!"2".equals(data.getStandardExtractStatus())) { - msg += " 抽取有异常!状态重置为待抽取 "; - data.setStandardExtractStatus("0"); - dataInfoDao.update(data); - DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.update(data); - } - // sql日志记录时间: - // sql日志记录时间: - FileOperateHelper.fileWrite( - Configs.EXTRACT_STANDARD_LOG_LOCALTION - + collectOracle.getName() + ".log", - " " + msg + " >>>>> " - + DateForm.date2StringBysecond(new Date()) - + "\r\n\r\n\n"); - } - } - isSuccess = true; - return isSuccess; - } - - @Override - public void updateDataExtractStatus(OracleConnectorParams ocp, int status) { - DataInfoEntity data = new DataInfoEntity(); - data.setId(Integer.valueOf(ocp.getDataId())); - // 设置为 标准表 抽取中 - data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); - data.setExtractStatus(status); - try { - dataInfoDao.update(data); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - // @Override - // public boolean extractOracle(String name, List - // dataInfos, GatherOracleInfo oracleConnect) throws Exception { - // boolean isSuccess = false; - // try{ - // //map转 bean(汇总库信息-带tableName的) - // // GatherOracleInfo oracleModel = (GatherOracleInfo) - // Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); - // - // //采集库连接参数 - // // List datainfos = new - // ArrayList(); - // // for (Map map : dataInfoMap) { - // // OracleConnectorParams dataInfoEntity = (OracleConnectorParams) - // Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); - // // datainfos.add(dataInfoEntity); - // // } - // - // Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" - // + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" - // + oracleConnect.getDatabaseName(), oracleConnect.getUser(), - // oracleConnect.getPassword()); - // - // for (OracleConnectorParams collectOracle : dataInfos) { - // - // oracleExtract.createDBLink(conn, collectOracle); - // oracleExtract.createTableSpace(conn, oracleConnect); - // oracleExtract.createUser(conn, oracleConnect); - // oracleExtract.extractColleDB(conn, collectOracle); - // } - // isSuccess = true; - // }catch(Exception e){ - // - // } - // return false; - // } - -} +package com.platform.service.impl; + +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import javax.annotation.Resource; + +import org.apache.log4j.Logger; +import org.springframework.stereotype.Service; + +import com.base.Custom4exception; +import com.base.CustomException; +import com.platform.dao.DataInfoDao; +import com.platform.entities.DataInfoEntity; +import com.platform.entities.GatherOracleInfo; +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.service.IOracleExtractService; +import com.platform.service.OracleExtractHelper; +import com.platform.utils.CacheSetCantDelete; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.DateForm; +import com.platform.utils.FileOperateHelper; + +@Service(value = "OracleExtract") +public class OracleExtractServiceImpl implements IOracleExtractService { + + /** + * 日志 + */ + public final static Logger log = Logger + .getLogger(OracleExtractServiceImpl.class); + + @Resource(name = "dataInfoDao") + private DataInfoDao dataInfoDao; + + /** + * kubernetes client + */ + private SimpleKubeClient client = new SimpleKubeClient(); + /** + * 抽取 + */ + private OracleExtractHelper oracleExtract = new OracleExtractHelper(); + + /** + * 数据库连接实现类 + */ + private OracleConnector connect = new OracleConnector(); + + @Override + public boolean extractOracle(String name, + List datainfos, GatherOracleInfo oracleModel) + throws Exception { + boolean isSuccess = false; + try { + // map转 bean(汇总库信息-带tableName的) + // GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + // List datainfos = dataInfolist; + if (datainfos.size() == 0) { + return false; + } + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), + oracleModel.getUser(), oracleModel.getPassword(), + datainfos.get(0)); + if (null == conn) { + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + datainfos.get(0).getName() + ".log", + "创建oracle连接失败: [" + conn + "]\r\n"); + return false; + } + for (OracleConnectorParams collectOracle : datainfos) { + String replicasName = collectOracle.getName(); + DataInfoEntity data = new DataInfoEntity(); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + data.setId(Integer.valueOf(collectOracle.getDataId())); + data.setExtractStatus(1); + dataInfoDao.updateExtract(data); + collectOracle.setName("J" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + replicasName + + " isExtract=1"; + // sql日志记录时间: + FileOperateHelper + .fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + + ".log", + "\r\n 开始汇总 \r\n" + + DateForm + .date2StringBysecond(new Date()) + + "\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createUser(conn, collectOracle, + oracleModel);// 创建用户并授权 + oracleExtract.extractColleDB(conn, collectOracle, + oracleModel);// 执行抽取 + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + replicasName + + " isExtract=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setExtractStatus(2); + dataInfoDao.updateExtract(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data + .getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.updateExtract(data); + } + } catch (Exception e) { + String cmd = "kubectl label --overwrite rc " + replicasName + + " isExtract=0"; + Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + data.setExtractStatus(0); + dataInfoDao.updateExtract(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.updateExtract(data); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + // 去掉保存的当前数据id, + CacheSetCantDelete.removeExtractId(collectOracle + .getDataId()); + String msg = "汇总结束"; + if (2 != data.getExtractStatus()) { + msg += " 汇总有异常!状态重置为待汇总 "; + data.setExtractStatus(0); + dataInfoDao.updateExtract(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data + .getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.updateExtract(data); + } + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n " + msg + " >>>>>>> " + + DateForm.date2StringBysecond(new Date()) + + "\r\n\r\n\n"); + } + } + isSuccess = true; + } catch (Exception e) { + new CustomException(Custom4exception.OracleSQL_Except, e); + } + return isSuccess; + } + + @Override + public boolean isConnectTotalOracle(GatherOracleInfo oracleModel) + throws Exception { + boolean isConnect = false; + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), oracleModel.getUser(), + oracleModel.getPassword(), null); + if (null == conn) { + isConnect = false; + throw new CustomException(Custom4exception.connect_Oracle_Except, + null, oracleModel); + // FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + // + dataInfolist.get(0).getName(), "创建oracle连接失败: [" + conn + + // "]\r\n"); + } else { + isConnect = oracleExtract.testConnect(conn); + conn.close(); // 关闭连接 + } + return isConnect; + } + + @Override + public boolean extractStandardTable(String name, + List dataInfolist, + GatherOracleInfo oracleConnect) throws Exception { + boolean isSuccess = false; + // map转 bean(汇总库信息-带tableName的) + GatherOracleInfo oracleModel = oracleConnect; + // 采集库连接参数 + List datainfos = dataInfolist; + if (datainfos.size() == 0) { + return false; + } + Connection conn = OracleConnector.connectionBuilder( + "jdbc:oracle:thin:@" + oracleModel.getIp() + ":" + + oracleModel.getPort() + ":" + + oracleModel.getDatabaseName(), oracleModel.getUser(), + oracleModel.getPassword(), dataInfolist.get(0)); + if (null == conn) { + FileOperateHelper.fileWrite(Configs.EXTRACT_LOG_LOCALTION + + dataInfolist.get(0).getName() + ".log", "创建oracle连接失败: [" + + conn + "]\r\n"); + return false; + } + for (OracleConnectorParams collectOracle : datainfos) { + String replicasName = collectOracle.getName(); + DataInfoEntity data = new DataInfoEntity(); + try { + if (null != collectOracle.getDataId() + && !"".equals(collectOracle.getDataId())) { + data.setId(Integer.valueOf(collectOracle.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setStandardExtractStatus("1"); + dataInfoDao.update(data); + collectOracle.setName("CQ" + + collectOracle.getName().replace("-", "_")); + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + replicasName + + " standardExtractStatus=1"; + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + ".log", + "\r\n 开始抽取标准表 \r\n " + + DateForm.date2StringBysecond(new Date()) + + "\n\r\n"); + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "1"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + oracleExtract.createStandardDBLink(conn, collectOracle); // 创建dblink + oracleExtract.createStardardTableSpace(conn, collectOracle, + oracleModel); // 创建表空间 + oracleExtract.createOnlyUser(conn, collectOracle, + oracleModel);// 创建 抽取标准表的 用户并授权 + DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); + if (null != tmpdata) { + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getPayResultLast()) + || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata + .getPayResultLast())) { + // 抽取中 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SIX); + dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardPayTable(conn, + collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + // 抽取成功 + data.setPayResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } + } + if (Constant.CHECKOUT_STATUS_THREE.equals(tmpdata + .getExecResultLast()) + || Constant.CHECKOUT_STATUS_FIVE.equals(tmpdata + .getExecResultLast())) { + // 抽取中 + data.setExecResultLast(Constant.CHECKOUT_STATUS_SIX); + dataInfoDao.update(data); + boolean isExtrac = true; + try { + oracleExtract.extractStandardExecTable(conn, + collectOracle, oracleModel);// 执行抽取 + } catch (Exception e) { + // 改回 校验存在的状态 + data.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); + dataInfoDao.update(data); + isExtrac = false; + } + if (isExtrac) { + data.setExecResultLast(Constant.CHECKOUT_STATUS_SEVEN); + dataInfoDao.update(data); + } + } + // client.updateOrAddReplicasLabelById(collectOracle.getName(), + // "isExtract", "2"); + // //更新oracle汇总状态,0标示为未汇总,1标示汇总中,2标示汇总完成 + cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + replicasName + + " standardExtractStatus=2"; + rList = Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + sb = new StringBuffer(); + for (String string : rList) + sb.append(string).append("\n"); + Configs.CONSOLE_LOGGER.info(sb.toString()); + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SEVEN); + data.setStandardExtractStatus("2"); + dataInfoDao.update(data); + DataInfoEntity tmpSrcData = dataInfoDao.findById(data + .getId()); + data.setId(tmpSrcData.getSrcId()); + dataInfoDao.update(data); + } + } + } catch (Exception e) { + + String cmd = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + replicasName + + " standardExtractStatus=0"; + Constant.ganymedSSH.execCmdWaitAcquiescent(cmd); + data.setStandardExtractStatus("0"); + dataInfoDao.update(data); + DataInfoEntity tmpSrcData = dataInfoDao.findById(data.getId()); + data.setId(tmpSrcData.getSrcId()); + dataInfoDao.update(data); + log.error(Custom4exception.OracleSQL_Except, e); + } finally { + // 去掉保存的当前数据id, + CacheSetCantDelete.removeStandardId(collectOracle.getDataId()); + String msg = "抽取标准表结束"; + if (!"2".equals(data.getStandardExtractStatus())) { + msg += " 抽取有异常!状态重置为待抽取 "; + data.setStandardExtractStatus("0"); + dataInfoDao.update(data); + DataInfoEntity tmpdata = dataInfoDao.findById(data.getId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.update(data); + } + // sql日志记录时间: + // sql日志记录时间: + FileOperateHelper.fileWrite( + Configs.EXTRACT_STANDARD_LOG_LOCALTION + + collectOracle.getName() + ".log", + " " + msg + " >>>>> " + + DateForm.date2StringBysecond(new Date()) + + "\r\n\r\n\n"); + } + } + isSuccess = true; + return isSuccess; + } + + @Override + public void updateDataExtractStatus(OracleConnectorParams ocp, int status) { + DataInfoEntity data = new DataInfoEntity(); + data.setId(Integer.valueOf(ocp.getDataId())); + // 设置为 标准表 抽取中 + data.setCheckoutFlag(Constant.CHECKOUTFLAG_SIX); + data.setExtractStatus(status); + try { + dataInfoDao.update(data); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + // @Override + // public boolean extractOracle(String name, List + // dataInfos, GatherOracleInfo oracleConnect) throws Exception { + // boolean isSuccess = false; + // try{ + // //map转 bean(汇总库信息-带tableName的) + // // GatherOracleInfo oracleModel = (GatherOracleInfo) + // Bean2MapUtils.convertMap(GatherOracleInfo.class, oracleConnect); + // + // //采集库连接参数 + // // List datainfos = new + // ArrayList(); + // // for (Map map : dataInfoMap) { + // // OracleConnectorParams dataInfoEntity = (OracleConnectorParams) + // Bean2MapUtils.convertMap(OracleConnectorParams.class, oracleConnect); + // // datainfos.add(dataInfoEntity); + // // } + // + // Connection conn = OracleConnector.ConnectionBuilder("jdbc:oracle:thin:@" + // + oracleConnect.getIp() + ":" + oracleConnect.getPort() + "/" + // + oracleConnect.getDatabaseName(), oracleConnect.getUser(), + // oracleConnect.getPassword()); + // + // for (OracleConnectorParams collectOracle : dataInfos) { + // + // oracleExtract.createDBLink(conn, collectOracle); + // oracleExtract.createTableSpace(conn, oracleConnect); + // oracleExtract.createUser(conn, oracleConnect); + // oracleExtract.extractColleDB(conn, collectOracle); + // } + // isSuccess = true; + // }catch(Exception e){ + // + // } + // return false; + // } + +} diff --git a/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java b/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java index b5d591fa..0e01dd24 100644 --- a/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java +++ b/src/com/platform/service/thread/ThreadCheckoutStandardOracle.java @@ -1,326 +1,335 @@ -package com.platform.service.thread; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.ReplicationController; - -import java.sql.Connection; -import java.util.List; -import java.util.Set; - -import org.apache.log4j.Logger; - -import com.platform.dao.DataInfoDao; -import com.platform.entities.CheckoutEntity; -import com.platform.entities.DataInfoEntity; -import com.platform.entities.OracleConnectorParams; -import com.platform.kubernetes.SimpleKubeClient; -import com.platform.oracle.OracleConnector; -import com.platform.utils.CacheOracleCheckoutEntity; -import com.platform.utils.Configs; -import com.platform.utils.Constant; -import com.platform.utils.FileOperateHelper; - -/** - * 校验oracle标准表是否存在 - * - * @author chen - * - */ -public class ThreadCheckoutStandardOracle extends Thread { - - public final static Logger log = Configs.CONSOLE_LOGGER - .getLogger(ThreadCheckoutStandardOracle.class); - - /** - * kuber 客户端 - */ - private SimpleKubeClient client; - - /** - * dataInfo 的数据持久层 - */ - private DataInfoDao dataInfoDao; - - public ThreadCheckoutStandardOracle(DataInfoDao dataInfoDao) { - this.setDaemon(true); - this.client = new SimpleKubeClient(); - this.dataInfoDao = dataInfoDao; - } - - @Override - public void run() { - try { - Thread.sleep(1000 * 5); - } catch (InterruptedException e2) { - log.error(e2); - } - // 循环11次,每次休眠 - for (int i = 0; i < 11; i++) { - try { - // 数据的 keys (=kuber的应用名称 taskName) - Set taskNames = CacheOracleCheckoutEntity - .getCheckKeys(); - int lengs = taskNames.size(); - if (lengs == 0) { - break; - } - String[] taskNamekeys = taskNames.toArray(new String[lengs]); - StringBuffer sbtask = new StringBuffer(); - for (int j = 0; j < taskNamekeys.length; j++) { - sbtask.append(taskNamekeys[j]).append("\t"); - } - log.info("replicationController标签: " + sbtask.toString()); - for (String key : taskNamekeys) { - - // 获得 kuber的 pod - Pod tmpPod = filterPod(key); - if (null == tmpPod) { - log.info("replicationController标签: " + key - + " 的 pod 节点不存在!"); - if (i > 5) { - CacheOracleCheckoutEntity.checkRemove(key); - } - continue; - } - // 尝试 连接 oracle - connectOracle(tmpPod, key); - - if (i == 10) { - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + key + " status=1"; - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 失败]"); - log.info(sb.toString()); - - String cmd2 = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key - + " checkoutFlag=0"; - List rList2 = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd2); - StringBuffer sb2 = new StringBuffer(); - for (String str : rList2) - sb2.append(str).append("\n"); - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 未校验]"); - log.info(sb2.toString()); - // 当前key标签对应的 数据服务的记录 - CheckoutEntity tmp = CacheOracleCheckoutEntity - .getCheck(key); - tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO); - tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO); - tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO); - try { - // 更新数据库 - this.updateDataInfo(tmp); - } catch (Exception e) { - log.error(e); - } - } - } - } catch (Exception e1) { - log.error(e1.getStackTrace()); - } finally { - try { - Thread.sleep(1000 * 60); - } catch (InterruptedException e) { - log.error(e); - } - } - } - } - - /** - * 查找 pod 节点 - * - * @param taskName - * @return - */ - private Pod filterPod(String taskName) { - Pod pod = null; - ReplicationController replicationController = client - .getReplicationController(taskName); - if (null != replicationController) { - List filterPods = client - .getPodsForApplicaList(replicationController); - if (filterPods != null && filterPods.size() > 0) { - pod = filterPods.get(0); - } - } - return pod; - } - - /** - * 尝试连接 oracle服务 - * - * @param tmpPod - * @param key - */ - private void connectOracle(Pod pod, String key) throws Exception { - if (pod != null) { - String ip = client.getPodHostIp(pod); - int port = client.getPodContainerport(pod); - if (ip != null && port != 0) { - String url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" - + Configs.ORACLE_ORCL; - boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 - Configs.ORACLE_USER, Configs.ORACLE_PSW); - log.info("url:" + url + ",user:" + Configs.ORACLE_USER - + ",password:" + Configs.ORACLE_PSW); - String message = "失败"; - - String cmd3 = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key + " checkoutFlag=0"; - List rList3 = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd3); - StringBuffer sb3 = new StringBuffer(); - for (String str : rList3) - sb3.append(str).append("\n"); - log.info(sb3.toString()); - log.info("更新replicationController标签: " + key + "\t[标签更新为:未校验]"); - - if (flag) { - String cmd = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " label --overwrite rc " + key + " status=2"; - // 设置服务为 成功 - List rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd); - StringBuffer sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - log.info(sb.toString()); - message = "成功"; - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 成功]"); - // 校验标签 - String cmd2 = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key - + " checkoutFlag=2"; - List rList2 = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd2); - StringBuffer sb2 = new StringBuffer(); - for (String str : rList2) - sb2.append(str).append("\n"); - log.info(sb2.toString()); - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 校验中]"); - // 获得当前 服务对应的 数据 - CheckoutEntity tmp = CacheOracleCheckoutEntity - .getCheck(key); - if (null != tmp) { - CacheOracleCheckoutEntity.putExtract(key, tmp); - } - // 查询 对应的 2 个标准表 - OracleConnectorParams oc = new OracleConnectorParams(); - String logName = tmp.getAreaCode().toLowerCase() + "_" - + tmp.getSysCode() + "_" + tmp.getDataVersion(); - oc.setName(logName); - try { - Connection conn = OracleConnector.connectionBuilder( - url, Configs.ORACLE_USER, Configs.ORACLE_PSW, - oc); - // 支付--校验 - if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp - .getPayResultLast())) { - String paySql = "select * from dba_tables where owner = '" - + Configs.COLLECT_STANDARD_TABLE_USER - .toUpperCase() - + "' and table_name = '" - + Configs.COLLECT_PAY_TABLE.toUpperCase() - + "'"; - if (OracleConnector - .execUpdateOracleSQL( - conn, - paySql, - FileOperateHelper - .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) - + logName + "jy.log")) { - tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); - } else { - tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR); - } - } - // 可执行-- 校验 - if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp - .getExecResultLast())) { - String execSql = "select * from dba_tables where owner = '" - + Configs.COLLECT_STANDARD_TABLE_USER - .toUpperCase() - + "' and table_name = '" - + Configs.COLLECT_EXEC_TABLE.toUpperCase() - + "'"; - if (OracleConnector - .execUpdateOracleSQL( - conn, - execSql, - FileOperateHelper - .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) - + logName + "jy.log")) { - tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); - } else { - tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR); - } - } - tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE); - // 更新数据库data_info - updateDataInfo(tmp); - // 将 CacheOracleCheckoutEntity checkMap中的 数据 - // 放入extractMap中 - /* CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity - .checkRemove(key)/* ) */; - // 更新kuber状态 - cmd2 = "kubectl --server " - + Configs.KUBE_MASTER_ADDRESS - + " annotate --overwrite rc " + key - + " checkoutFlag=1"; - // client.updateOrAddReplicasLabelById(taskNSyame, - // "status", "2"); - rList = Constant.ganymedSSH - .execCmdWaitAcquiescent(cmd2); - sb = new StringBuffer(); - for (String str : rList) - sb.append(str).append("\n"); - log.info(sb.toString()); - message = "成功"; - log.info("更新replicationController标签: " + key - + "\t[标签更新为: 已校验]"); - } catch (Exception e) { - log.error(e); - } - // 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录 - CacheOracleCheckoutEntity.checkRemove(key); - } - log.info("连接到数据库服务: " + key + "\t[连接结果: " + message + "]"); - } - - } - } - - /** - * 更新 dataInfo表 - * - * @param checkoutEntity - * @throws Exception - */ - private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception { - DataInfoEntity data = new DataInfoEntity(); - // 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO - data.setId(checkoutEntity.getDataId()); - data.setPayResultLast(checkoutEntity.getPayResultLast()); - data.setExecResultLast(checkoutEntity.getExecResultLast()); - data.setCheckoutFlag(checkoutEntity.getCheckoutFlag()); - dataInfoDao.update(data); - DataInfoEntity tmpdata = dataInfoDao.findById(checkoutEntity - .getDataId()); - data.setId(tmpdata.getSrcId()); - dataInfoDao.update(data); - } -} +package com.platform.service.thread; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.ReplicationController; + +import java.sql.Connection; +import java.util.List; +import java.util.Set; + +import org.apache.log4j.Logger; + +import com.platform.dao.DataInfoDao; +import com.platform.entities.CheckoutEntity; +import com.platform.entities.DataInfoEntity; +import com.platform.entities.OracleConnectorParams; +import com.platform.kubernetes.SimpleKubeClient; +import com.platform.oracle.OracleConnector; +import com.platform.utils.CacheOracleCheckoutEntity; +import com.platform.utils.Configs; +import com.platform.utils.Constant; +import com.platform.utils.FileOperateHelper; + +/** + * 校验oracle标准表是否存在 + * + * @author chen + * + */ +public class ThreadCheckoutStandardOracle extends Thread { + + public final static Logger log = Configs.CONSOLE_LOGGER + .getLogger(ThreadCheckoutStandardOracle.class); + + /** + * kuber 客户端 + */ + private SimpleKubeClient client; + + /** + * dataInfo 的数据持久层 + */ + private DataInfoDao dataInfoDao; + + public ThreadCheckoutStandardOracle(DataInfoDao dataInfoDao) { + this.setDaemon(true); + this.client = new SimpleKubeClient(); + this.dataInfoDao = dataInfoDao; + } + + @Override + public void run() { + try { + Thread.sleep(1000 * 5); + } catch (InterruptedException e2) { + log.error(e2); + } + // 循环11次,每次休眠 + for (int i = 0; i < 11; i++) { + try { + // 数据的 keys (=kuber的应用名称 taskName) + Set taskNames = CacheOracleCheckoutEntity + .getCheckKeys(); + int lengs = taskNames.size(); + if (lengs == 0) { + break; + } + String[] taskNamekeys = taskNames.toArray(new String[lengs]); + StringBuffer sbtask = new StringBuffer(); + for (int j = 0; j < taskNamekeys.length; j++) { + sbtask.append(taskNamekeys[j]).append("\t"); + } + log.info("replicationController标签: " + sbtask.toString()); + for (String key : taskNamekeys) { + + // 获得 kuber的 pod + Pod tmpPod = filterPod(key); + if (null == tmpPod) { + log.info("replicationController标签: " + key + + " 的 pod 节点不存在!"); + if (i > 5) { + CacheOracleCheckoutEntity.checkRemove(key); + } + continue; + } + // 尝试 连接 oracle + connectOracle(tmpPod, key); + + if (i == 10) { + + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + key + " status=1"; + + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 失败]"); + log.info(sb.toString()); + + + String cmd2 = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + + " checkoutFlag=0"; + List rList2 = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd2); + StringBuffer sb2 = new StringBuffer(); + for (String str : rList2) + sb2.append(str).append("\n"); + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 未校验]"); + log.info(sb2.toString()); + // 当前key标签对应的 数据服务的记录 + CheckoutEntity tmp = CacheOracleCheckoutEntity + .getCheck(key); + tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ZERO); + tmp.setPayResultLast(Constant.CHECKOUT_STATUS_ZERO); + tmp.setExecResultLast(Constant.CHECKOUT_STATUS_ZERO); + try { + // 更新数据库 + this.updateDataInfo(tmp); + } catch (Exception e) { + log.error(e); + } + } + } + } catch (Exception e1) { + log.error(e1.getStackTrace()); + } finally { + try { + Thread.sleep(1000 * 60); + } catch (InterruptedException e) { + log.error(e); + } + } + } + } + + /** + * 查找 pod 节点 + * + * @param taskName + * @return + */ + private Pod filterPod(String taskName) { + Pod pod = null; + ReplicationController replicationController = client + .getReplicationController(taskName); + if (null != replicationController) { + List filterPods = client + .getPodsForApplicaList(replicationController); + if (filterPods != null && filterPods.size() > 0) { + pod = filterPods.get(0); + } + } + return pod; + } + + /** + * 尝试连接 oracle服务 + * + * @param tmpPod + * @param key + */ + private void connectOracle(Pod pod, String key) throws Exception { + if (pod != null) { + String ip = client.getPodHostIp(pod); + int port = client.getPodContainerport(pod); + if (ip != null && port != 0) { + String url = "jdbc:oracle:thin:@" + ip + ":" + port + ":" + + Configs.ORACLE_ORCL; + boolean flag = OracleConnector.canConnect(url, // 连接结果返回参数,true标示连接成功,false标示连接失败 + Configs.ORACLE_USER, Configs.ORACLE_PSW); + log.info("url:" + url + ",user:" + Configs.ORACLE_USER + + ",password:" + Configs.ORACLE_PSW); + String message = "失败"; + + + String cmd3 = "kubectl --server " + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + " checkoutFlag=0"; + + List rList3 = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd3); + StringBuffer sb3 = new StringBuffer(); + for (String str : rList3) + sb3.append(str).append("\n"); + log.info(sb3.toString()); + log.info("更新replicationController标签: " + key + "\t[标签更新为:未校验]"); + + if (flag) { + + String cmd = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " label --overwrite rc " + key + " status=2"; + + // 设置服务为 成功 + List rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd); + StringBuffer sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + log.info(sb.toString()); + message = "成功"; + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 成功]"); + // 校验标签 + + String cmd2 = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + + " checkoutFlag=2"; + List rList2 = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd2); + StringBuffer sb2 = new StringBuffer(); + for (String str : rList2) + sb2.append(str).append("\n"); + log.info(sb2.toString()); + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 校验中]"); + // 获得当前 服务对应的 数据 + CheckoutEntity tmp = CacheOracleCheckoutEntity + .getCheck(key); + if (null != tmp) { + CacheOracleCheckoutEntity.putExtract(key, tmp); + } + // 查询 对应的 2 个标准表 + OracleConnectorParams oc = new OracleConnectorParams(); + String logName = tmp.getAreaCode().toLowerCase() + "_" + + tmp.getSysCode() + "_" + tmp.getDataVersion(); + oc.setName(logName); + try { + Connection conn = OracleConnector.connectionBuilder( + url, Configs.ORACLE_USER, Configs.ORACLE_PSW, + oc); + // 支付--校验 + if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp + .getPayResultLast())) { + String paySql = "select * from dba_tables where owner = '" + + Configs.COLLECT_STANDARD_TABLE_USER + .toUpperCase() + + "' and table_name = '" + + Configs.COLLECT_PAY_TABLE.toUpperCase() + + "'"; + if (OracleConnector + .execUpdateOracleSQL( + conn, + paySql, + FileOperateHelper + .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) + + logName + "jy.log")) { + tmp.setPayResultLast(Constant.CHECKOUT_STATUS_THREE); + } else { + tmp.setPayResultLast(Constant.CHECKOUT_STATUS_FOUR); + } + } + // 可执行-- 校验 + if (!Constant.CHECKOUT_STATUS_ONE.equals(tmp + .getExecResultLast())) { + String execSql = "select * from dba_tables where owner = '" + + Configs.COLLECT_STANDARD_TABLE_USER + .toUpperCase() + + "' and table_name = '" + + Configs.COLLECT_EXEC_TABLE.toUpperCase() + + "'"; + if (OracleConnector + .execUpdateOracleSQL( + conn, + execSql, + FileOperateHelper + .addLastSeparator(Configs.EXTRACT_STANDARD_LOG_LOCALTION) + + logName + "jy.log")) { + tmp.setExecResultLast(Constant.CHECKOUT_STATUS_THREE); + } else { + tmp.setExecResultLast(Constant.CHECKOUT_STATUS_FOUR); + } + } + tmp.setCheckoutFlag(Constant.CHECKOUTFLAG_ONE); + // 更新数据库data_info + updateDataInfo(tmp); + // 将 CacheOracleCheckoutEntity checkMap中的 数据 + // 放入extractMap中 + /* CacheOracleCheckoutEntity.putExtract(key, */CacheOracleCheckoutEntity + .checkRemove(key)/* ) */; + // 更新kuber状态 + + cmd2 = "kubectl --server " + + Configs.KUBE_MASTER_ADDRESS + + " annotate --overwrite rc " + key + + " checkoutFlag=1"; + // client.updateOrAddReplicasLabelById(taskNSyame, + // "status", "2"); + rList = Constant.ganymedSSH + .execCmdWaitAcquiescent(cmd2); + sb = new StringBuffer(); + for (String str : rList) + sb.append(str).append("\n"); + log.info(sb.toString()); + message = "成功"; + log.info("更新replicationController标签: " + key + + "\t[标签更新为: 已校验]"); + } catch (Exception e) { + log.error(e); + } + // 成功 就 清除 CacheOracleCheckoutEntity 中 的该条记录 + CacheOracleCheckoutEntity.checkRemove(key); + } + log.info("连接到数据库服务: " + key + "\t[连接结果: " + message + "]"); + } + + } + } + + /** + * 更新 dataInfo表 + * + * @param checkoutEntity + * @throws Exception + */ + private void updateDataInfo(CheckoutEntity checkoutEntity) throws Exception { + DataInfoEntity data = new DataInfoEntity(); + // 状态改为 正则校验 Constant.CHECKOUTFLAG_TWO + data.setId(checkoutEntity.getDataId()); + data.setPayResultLast(checkoutEntity.getPayResultLast()); + data.setExecResultLast(checkoutEntity.getExecResultLast()); + data.setCheckoutFlag(checkoutEntity.getCheckoutFlag()); + dataInfoDao.update(data); + DataInfoEntity tmpdata = dataInfoDao.findById(checkoutEntity + .getDataId()); + data.setId(tmpdata.getSrcId()); + dataInfoDao.update(data); + } +} diff --git a/src/com/platform/service/thread/ThreadExtractStandardSqlServer.java b/src/com/platform/service/thread/ThreadExtractStandardSqlServer.java index e33fbcde..7c88d3cf 100644 --- a/src/com/platform/service/thread/ThreadExtractStandardSqlServer.java +++ b/src/com/platform/service/thread/ThreadExtractStandardSqlServer.java @@ -8,6 +8,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; @@ -154,10 +155,11 @@ public class ThreadExtractStandardSqlServer extends Thread{ // TODO Auto-generated catch block e.printStackTrace(); } + Connection conn = null; try { - File execSql = new File(execFilePath); - Connection conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleConnect.getIp() + ":" + oracleConnect.getPort() + ":" + conn = OracleConnector.connectionBuilder("jdbc:oracle:thin:@" + oracleConnect.getIp() + ":" + oracleConnect.getPort() + ":" + oracleConnect.getDatabaseName(), oracleConnect.getUser(), oracleConnect.getPassword(), null); + File execSql = new File(execFilePath); // 创建表空间 创建 抽取标准表的 用户并授权 oracleExtract.createTableSpace(conn, collectOracle , oracleConnect); // oracleExtract.createOnlyUser(conn, collectOracle, oracleConnect);// @@ -199,8 +201,17 @@ public class ThreadExtractStandardSqlServer extends Thread{ // TODO Auto-generated catch block e.printStackTrace(); } + finally{ + try { + conn.close(); + } catch (SQLException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } } + //删除保存过的id CacheSetCantDelete.removeStandardId(String.valueOf(element.getDataId())); // sql日志记录时间: diff --git a/src/com/platform/utils/Configs.java b/src/com/platform/utils/Configs.java index c151071e..9f789c00 100644 --- a/src/com/platform/utils/Configs.java +++ b/src/com/platform/utils/Configs.java @@ -1,129 +1,130 @@ -package com.platform.utils; - -import org.apache.log4j.Logger; - -/** 配置启动 变量 - * @author chen - * - */ -public class Configs { - - /** 全局自定义异常--编码 */ - public static final String GLOBAL_EXP_CUSTOM = "3001001001"; - - /** 全局非自定义异常--编码 */ - public static final String GLOBAL_EXP_NOT_CUSTOM = "3001001002"; - - public static final String CONFIG_LOCALTION = "WebContent/WEB-INF/config/config.properties"; - - public static final Logger CONSOLE_LOGGER = Logger.getLogger("console"); - - public static final Logger DAILY_ROLLING_LOGGER = Logger - .getLogger("dailyRollingFile"); - - public static final Logger DAILY_LOGGER = Logger.getLogger("railyFile"); - - public static final Logger LOGGER = Logger.getLogger(Configs.class); - - public static String KUBE_MASTER_URL = "http://192.168.0.110:8080/"; // kubernetes集群的maser - // URl - - public static String KUBE_MASTER_ADDRESS="127.0.0.1:8080"; - - public static int ORACLE_DEFAULT_PORT = 1521; // oracle的默认端口号 - - public static String COLLECT_USER_NAME = "system"; //采集统一的登入用户名 - - public static String COLLECT_PASSWORD = "oracle"; //采集统一的登入密码 - - public static String COLLECT_SERVICE_NAME = "orcl"; //采集库统一的服务名 - - public static String GATHER_PORT ="1521"; //汇总库的端口号 - - public static String GATHER_USER_NAME = "system"; //汇总库的登入用户名 - - public static String GATHER_USER_PASSWORD = "1"; //汇总库的登入密码 - - /** 抽取标准表 的 标准表所在 用户 */ - public static String GATHER_STANDARD_USER_NAME = "u_bzbjy"; - - /** 可执行表名 */ - public static String GATHER_STANDARD_EXEC_TABLE_NAME = "kzxzb"; - - /** 可支付表名 */ - public static String GATHER_STANDARD_PAY_TABLE_NAME = "zfxxb"; - - public static String GATHER_SERVICE_NAME = "orcl"; //汇总库的服务名 - - public static String TABLE_SUFFIX = "_20152016"; //汇总库汇总表的后缀名 - - public static String EXTRACT_LOG_LOCALTION = "D:\\log"; //数据汇总日志保存位置 - - public static String EXTRACT_STANDARD_LOG_LOCALTION = "D:\\log2"; //数据抽取日志保存位置 - - public static String GATHER_TABLESPACE_NAME=""; //表空间名 - - public static String GATHER_TABLESPACE_PATH=""; //表空间路径 - - public static String GATHER_TABLE_PASSWORD="1"; //登入密码 - - /** excel上传路径 */ - public static String FILE_UPLOAD_PATH=""; - - /** excel下载路经 */ - public static String FILE_DOWNLOAD_PATH=""; - - /** 包下载路径 */ - public static String PACKAGE_DOWNLOAD_PATH=""; - - /** 下载包名 */ - public static String PACKAGE_NAME=""; - - /** - * 最新脚本的位置 - */ - public static String SQL_SCRIPT_PATH_LAST="D:\\test\\sql_script_last\\"; - /** 归档脚本的位置 */ - public static String SQL_SCRIPT_PATH_STANDARD="D:\\test\\sql_script_standard\\"; - - /** - * 多少 个月前的数据 - */ - public static int dataBefore = 6; - - /** - * excel导入一次批量导入5个 - */ - public static int NUM_ONE_IMPORT_EXCEL = 5; - - /** - * docker启动的 oracle服务 的 实例名称 - */ - public static String ORACLE_ORCL = "orcl"; - - /** - * docker启动的 oracle服务 的 实例登陆用户名 - */ - public static String ORACLE_USER = "system"; - - /** - * docker启动的 oracle服务 的 实例登陆密码 - */ - public static String ORACLE_PSW = "oracle"; - - /** - * 采集库中的 标准表所在 的 用户 - */ - public static String COLLECT_STANDARD_TABLE_USER = "u_bzbjy"; - - /** - * 可执行的 标准表的 名 - */ - public static String COLLECT_EXEC_TABLE = "kzxzb"; - - /** - * 支付的 标准表的 名 - */ - public static String COLLECT_PAY_TABLE = "zfxxb"; - -} +package com.platform.utils; + +import org.apache.log4j.Logger; + +/** 配置启动 变量 + * @author chen + * + */ +public class Configs { + + /** 全局自定义异常--编码 */ + public static final String GLOBAL_EXP_CUSTOM = "3001001001"; + + /** 全局非自定义异常--编码 */ + public static final String GLOBAL_EXP_NOT_CUSTOM = "3001001002"; + + public static final String CONFIG_LOCALTION = "WebContent/WEB-INF/config/config.properties"; + + public static final Logger CONSOLE_LOGGER = Logger.getLogger("console"); + + public static final Logger DAILY_ROLLING_LOGGER = Logger + .getLogger("dailyRollingFile"); + + public static final Logger DAILY_LOGGER = Logger.getLogger("railyFile"); + + public static final Logger LOGGER = Logger.getLogger(Configs.class); + + public static String KUBE_MASTER_URL = "http://192.168.0.110:8080/"; // kubernetes集群的maser + // URl + + public static String KUBE_MASTER_ADDRESS="127.0.0.1:8080"; + + + public static int ORACLE_DEFAULT_PORT = 1521; // oracle的默认端口号 + + public static String COLLECT_USER_NAME = "system"; //采集统一的登入用户名 + + public static String COLLECT_PASSWORD = "oracle"; //采集统一的登入密码 + + public static String COLLECT_SERVICE_NAME = "orcl"; //采集库统一的服务名 + + public static String GATHER_PORT ="1521"; //汇总库的端口号 + + public static String GATHER_USER_NAME = "system"; //汇总库的登入用户名 + + public static String GATHER_USER_PASSWORD = "1"; //汇总库的登入密码 + + /** 抽取标准表 的 标准表所在 用户 */ + public static String GATHER_STANDARD_USER_NAME = "u_bzbjy"; + + /** 可执行表名 */ + public static String GATHER_STANDARD_EXEC_TABLE_NAME = "kzxzb"; + + /** 可支付表名 */ + public static String GATHER_STANDARD_PAY_TABLE_NAME = "zfxxb"; + + public static String GATHER_SERVICE_NAME = "orcl"; //汇总库的服务名 + + public static String TABLE_SUFFIX = "_20152016"; //汇总库汇总表的后缀名 + + public static String EXTRACT_LOG_LOCALTION = "D:\\log"; //数据汇总日志保存位置 + + public static String EXTRACT_STANDARD_LOG_LOCALTION = "D:\\log2"; //数据抽取日志保存位置 + + public static String GATHER_TABLESPACE_NAME=""; //表空间名 + + public static String GATHER_TABLESPACE_PATH=""; //表空间路径 + + public static String GATHER_TABLE_PASSWORD="1"; //登入密码 + + /** excel上传路径 */ + public static String FILE_UPLOAD_PATH=""; + + /** excel下载路经 */ + public static String FILE_DOWNLOAD_PATH=""; + + /** 包下载路径 */ + public static String PACKAGE_DOWNLOAD_PATH=""; + + /** 下载包名 */ + public static String PACKAGE_NAME=""; + + /** + * 最新脚本的位置 + */ + public static String SQL_SCRIPT_PATH_LAST="D:\\test\\sql_script_last\\"; + /** 归档脚本的位置 */ + public static String SQL_SCRIPT_PATH_STANDARD="D:\\test\\sql_script_standard\\"; + + /** + * 多少 个月前的数据 + */ + public static int dataBefore = 6; + + /** + * excel导入一次批量导入5个 + */ + public static int NUM_ONE_IMPORT_EXCEL = 5; + + /** + * docker启动的 oracle服务 的 实例名称 + */ + public static String ORACLE_ORCL = "orcl"; + + /** + * docker启动的 oracle服务 的 实例登陆用户名 + */ + public static String ORACLE_USER = "system"; + + /** + * docker启动的 oracle服务 的 实例登陆密码 + */ + public static String ORACLE_PSW = "oracle"; + + /** + * 采集库中的 标准表所在 的 用户 + */ + public static String COLLECT_STANDARD_TABLE_USER = "u_bzbjy"; + + /** + * 可执行的 标准表的 名 + */ + public static String COLLECT_EXEC_TABLE = "kzxzb"; + + /** + * 支付的 标准表的 名 + */ + public static String COLLECT_PAY_TABLE = "zfxxb"; + +} diff --git a/src/com/platform/utils/GanymedSSH.java b/src/com/platform/utils/GanymedSSH.java index 611f4e5e..0f34140c 100644 --- a/src/com/platform/utils/GanymedSSH.java +++ b/src/com/platform/utils/GanymedSSH.java @@ -114,6 +114,9 @@ public class GanymedSSH { Session sess = null; try { // conn = getOpenedConnection(host, username, password, port); + if (null != conn) { +// conn=getOpenedConnection(Configs., username, password, port) + } sess = conn.openSession(); // 执锟斤拷cmd sess.execCommand(cmd);