diff --git a/src/main/java/org/nlpcn/jcoder/controller/GroupAction.java b/src/main/java/org/nlpcn/jcoder/controller/GroupAction.java index 4ea61f4..032a33c 100644 --- a/src/main/java/org/nlpcn/jcoder/controller/GroupAction.java +++ b/src/main/java/org/nlpcn/jcoder/controller/GroupAction.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import org.nlpcn.jcoder.constant.Api; import org.nlpcn.jcoder.constant.Constants; import org.nlpcn.jcoder.domain.FileInfo; @@ -36,301 +37,305 @@ @Ok("json") public class GroupAction { - private static final Logger LOG = LoggerFactory.getLogger(GroupAction.class); + private static final Logger LOG = LoggerFactory.getLogger(GroupAction.class); - @Inject - private GroupService groupService; + @Inject + private GroupService groupService; - @Inject - private ProxyService proxyService; + @Inject + private ProxyService proxyService; - @Inject - private TaskService taskService; + @Inject + private TaskService taskService; - private BasicDao basicDao = StaticValue.systemDao; + private BasicDao basicDao = StaticValue.systemDao; - @At - public Restful list() throws Exception { - return Restful.instance(groupService.list()); - } - - @At - public Restful hostList() throws Exception { - return Restful.instance(groupService.getAllHosts()); - } - - @At - public Restful groupHostList(@Param("name") String name) throws Exception { - return Restful.instance(groupService.getGroupHostList(name)); - } - - - @At - public Restful changeWeight(@Param("groupName") String groupName, @Param("hostPort") String hostPort, @Param("weight") Integer weight) { - if (weight == null) { - return Restful.instance().ok(false).msg("权重必须为正整数"); - } - - ZKMap hostGroupCache = StaticValue.space().getHostGroupCache(); - - HostGroup hostGroup = hostGroupCache.get(hostPort + "_" + groupName); - - if (hostGroup == null) { - return Restful.instance().ok(false).msg("没有找到此对象"); - } - - hostGroup.setWeight(weight); - - hostGroupCache.put(hostPort + "_" + groupName, hostGroup); - - return Restful.instance().ok(true).msg(hostPort + " 更改权重为:" + weight); + @At + public Restful list() throws Exception { + return Restful.instance(groupService.list()); + } + + @At + public Restful hostList() throws Exception { + return Restful.instance(groupService.getAllHosts()); + } + + @At + public Restful groupHostList(@Param("name") String name) throws Exception { + return Restful.instance(groupService.getGroupHostList(name)); + } + + + @At + public Restful changeWeight(@Param("groupName") String groupName, @Param("hostPort") String hostPort, @Param("weight") Integer weight) { + if (weight == null) { + return Restful.instance().ok(false).msg("权重必须为正整数"); + } + + ZKMap hostGroupCache = StaticValue.space().getHostGroupCache(); + + HostGroup hostGroup = hostGroupCache.get(hostPort + "_" + groupName); + + if (hostGroup == null) { + return Restful.instance().ok(false).msg("没有找到此对象"); + } + + hostGroup.setWeight(weight); + + hostGroupCache.put(hostPort + "_" + groupName, hostGroup); + + return Restful.instance().ok(true).msg(hostPort + " 更改权重为:" + weight); + + } + + @At + public Restful check(@Param("name") String name) { + Condition con = Cnd.where("name", "=", name); + int count = basicDao.searchCount(Group.class, con); + if (count > 0) { + return Restful.instance().ok(false).msg("组" + name + "已存在"); + } + return Restful.instance().ok(true).msg("组" + name + "不存在"); + } - } - - @At - public Restful check(@Param("name") String name) { - Condition con = Cnd.where("name", "=", name); - int count = basicDao.searchCount(Group.class, con); - if (count > 0) { - return Restful.instance().ok(false).msg("组" + name + "已存在"); - } - return Restful.instance().ok(true).msg("组" + name + "不存在"); - } - - @At - public Restful add(@Param("hostPorts") String[] hostPorts, @Param("name") String name, @Param(value = "first", df = "true") boolean first) throws Exception { - - if (!name.matches("[a-z0-9A-Z_\\.]+")) { - return Restful.fail().msg("Group名称只能是英文字母或数字和_"); - } - - if (!first) { - - Group groupByName = groupService.findGroupByName(name); - - if (groupByName != null) { - return Restful.fail().msg(name + " 已存在"); - } - - File file = new File(StaticValue.GROUP_FILE, name); - file.mkdirs(); - File resource = new File(StaticValue.GROUP_FILE, name + "/resources"); - resource.mkdir(); - File lib = new File(StaticValue.GROUP_FILE, name + "/lib"); - lib.mkdir(); - - - if (!new File(resource, "ioc.js").exists()) { - IOUtil.Writer(new File(resource, "ioc.js").getAbsolutePath(), "utf-8", "var ioc = {\n\t\n};"); - } - - if (!new File(file, "pom.xml").exists()) { - IOUtil.Writer(new File(file, "pom.xml").getAbsolutePath(), "utf-8", - "\n" + - "\t4.0.0\n" + - "\torg.nlpcn.jcoder\n" + - "\t" + name + "\n" + - "\t0.1\n" + - "\n" + - "\t\n" + - "\t\t\n" + - "\t\t\torg.nlpcn\n" + - "\t\t\tjcoder\n" + - "\t\t\t0.1\n" + - "\t\t\t${basedir}/../../../lib/" + StaticValue.getJcoderJarFile().getName() + "\n" + - "\t\t\tsystem\n" + - "\t\t" + - "\n" + - "\t\n" + - "\t\n" + - "\t\tsrc/main\n" + - "\t\tsrc/api\n" + - "\t\t\n" + - "\t\t\t\n" + - "\t\t\t\tmaven-compiler-plugin\n" + - "\t\t\t\t3.3\n" + - "\t\t\t\t\n" + - "\t\t\t\t\t1.8\n" + - "\t\t\t\t\t1.8\n" + - "\t\t\t\t\tUTF-8\n" + - "\t\t\t\t\t\n" + - "\t\t\t\t\t\tlib\n" + - "\t\t\t\t\t\n" + - "\t\t\t\t\n" + - "\t\t\t\n" + - "\t\t\n" + - "\t\n" + - "\n"); - } - - Group group = new Group(); - group.setName(name); - - basicDao.save(group); - - StaticValue.space().joinCluster(group); - - if (StaticValue.TESTRING) { //测试模式进行文件监听 - GroupFileListener.regediter(name); - } - - return Restful.instance(true, "添加成功"); - } else { - - Set hostPortsArr = new HashSet<>(); - - Arrays.stream(hostPorts).forEach(s -> hostPortsArr.add((String) s)); - - Restful restful = proxyService.post(hostPortsArr, "/admin/group/check", ImmutableMap.of("name", name, "first", false), 100000, ProxyService.MERGE_FALSE_MESSAGE_CALLBACK); - - if (!restful.isOk()) { - return restful.code(500); - } - - return proxyService.post(hostPortsArr, "/admin/group/add", ImmutableMap.of("name", name, "first", false), 100000, ProxyService.MERGE_MESSAGE_CALLBACK); - } - } - - /** - * 从集群中彻底删除一个group 要求group name必须没有任何一个机器使用中 - */ - @At - public Restful deleteByCluster(@Param("name") String name) { - try { - groupService.deleteByCluster(name); - return Restful.instance().ok(true).msg("删除 group: " + name + " 成功"); - } catch (Exception e) { - e.printStackTrace(); - return Restful.instance().ok(false).msg("删除 group: " + name + " 失败"); - } - } - - - /** - * 删除group - */ - @At - public Restful delete(@Param("hostPorts") String[] hostPorts, @Param("name") String name, @Param(value = "first", df = "true") boolean first) throws Exception { - - if (!first) { - boolean flag = groupService.deleteGroup(name); - if (StaticValue.TESTRING) { //测试模式进行文件监听 - GroupFileListener.unRegediter(name); - } - if (flag) { - return Restful.instance(flag, "删除成功"); - } else { - return Restful.instance(flag, "删除文件失败"); - } - } else { - Set hostPortsArr = new HashSet<>(); - Arrays.stream(hostPorts).forEach(s -> hostPortsArr.add((String) s)); - return proxyService.post(hostPortsArr, "/admin/group/delete", ImmutableMap.of("name", name, "first", false), 100000, ProxyService.MERGE_MESSAGE_CALLBACK); - } - } - - - @At - public Restful share(@Param("hostPorts") String[] hostPorts, @Param("formHostPort") String fromHostPort, @Param("groupName") String groupName, @Param("toGroupName") String toGroupName) throws Exception { - if (StringUtil.isBlank(toGroupName)) { - toGroupName = groupName; - } - - if (hostPorts == null || hostPorts.length == 0 || StringUtil.isBlank(hostPorts[0])) { - return Restful.fail().msg("必须选择一个目标主机"); - } - - return proxyService.post(hostPorts, "/admin/group/installGroup", ImmutableMap.of("fromHostPort", fromHostPort, "groupName", groupName, "toGroupName", toGroupName.trim()), 1200000, ProxyService.MERGE_MESSAGE_CALLBACK); - } - - - /** - * 克隆一个主机的group到当前主机上 - */ - @At - public synchronized Restful installGroup(@Param("fromHostPort") String fromHostPort, @Param("groupName") String groupName, @Param("toGroupName") String toGroupName) throws Exception { - //判断当前group是否存在 - Group group = basicDao.findByCondition(Group.class, Cnd.where("name", "=", toGroupName)); - if (group != null) { - return Restful.instance(false, toGroupName + " 已存在!"); - } - - //获取远程主机的所有files - Response response = proxyService.post(fromHostPort, "/admin/fileInfo/listFiles", ImmutableMap.of("hostPort", fromHostPort, "groupName", groupName), 120000); - - JSONArray jarry = JSONObject.parseObject(response.getContent()).getJSONArray("obj"); - - File groupFile = new File(StaticValue.GROUP_FILE, toGroupName); - - for (Object o : jarry) { - FileInfo fileInfo = JSONObject.toJavaObject((JSON) o, FileInfo.class); - - File file = new File(groupFile, fileInfo.getRelativePath()); - - if (fileInfo.isDirectory()) { - file.mkdirs(); - } else { - if (!file.getParentFile().exists()) { - file.getParentFile().mkdirs(); - } - long start = System.currentTimeMillis(); - LOG.info("to down " + fileInfo.getRelativePath()); - Response post = proxyService.post(fromHostPort, "/admin/fileInfo/downFile", ImmutableMap.of("hostPort", fromHostPort, "groupName", groupName, "relativePath", fileInfo.getRelativePath()), 1200000); - IOUtil.writeAndClose(post.getStream(), file); - LOG.info("down ok : {} use time : {} ", fileInfo.getRelativePath(), System.currentTimeMillis() - start); - } - } - - //从远程主机获取所有的task - response = proxyService.post(fromHostPort, "/admin/task/taskGroupList", ImmutableMap.of("groupName", groupName), 120000); - - System.out.println(response.getContent()); - - //获取远程主机的所有tasks,本地创建group - group = new Group(); - group.setName(toGroupName); - group.setDescription("create at " + DateUtils.formatDate(new Date(), DateUtils.SDF_FORMAT) + " from " + fromHostPort); - group.setCreateTime(new Date()); - basicDao.save(group); - - jarry = JSONObject.parseObject(response.getContent()).getJSONArray("obj"); - for (Object o : jarry) { - Task task = JSONObject.toJavaObject((JSON) o, Task.class); - task.setGroupName(group.getName()); - basicDao.save(task); - taskService.flush(task.getId()); - LOG.info("install task {}", task.getName()); - } - - //刷新本地group,加入到集群中 - groupService.flush(group); - - if (StaticValue.TESTRING) { //测试模式进行文件监听 - GroupFileListener.regediter(toGroupName); - } - - return Restful.instance().msg("克隆成功"); - } - - /** - * 刷新一个group到集群中 - * - * @param hostPort 需要刷新的主机 - * @return 不同 - */ - @At - public Restful flush(@Param("hostPort") String hostPort, @Param("groupName") String groupName, @Param(value = "first", df = "true") boolean first) throws Exception { - if (!first || StaticValue.getHostPort().equals(hostPort)) { - JarService.flush(groupName); - return Restful.instance(groupService.flush(groupName)); - } else { - Response post = proxyService.post(hostPort, "/admin/group/flush", ImmutableMap.of("hostPort", hostPort, "groupName", groupName, "first", false), 120000); - return Restful.instance(post); - } - - } + @At + public Restful add(@Param(value = "hostPorts", array_auto_split = true) String[] hostPorts, @Param("name") String name, @Param(value = "first", df = "true") boolean first) throws Exception { + + if (first && (hostPorts == null || hostPorts.length == 0)) { + return Restful.fail().msg("group:" + name + " host num is 0"); + } + + if (!name.matches("[a-z0-9A-Z_\\.]+")) { + return Restful.fail().msg("Group名称只能是英文字母或数字和_"); + } + + if (!first) { + + Group groupByName = groupService.findGroupByName(name); + + if (groupByName != null) { + return Restful.fail().msg(name + " 已存在"); + } + + File file = new File(StaticValue.GROUP_FILE, name); + file.mkdirs(); + File resource = new File(StaticValue.GROUP_FILE, name + "/resources"); + resource.mkdir(); + File lib = new File(StaticValue.GROUP_FILE, name + "/lib"); + lib.mkdir(); + + + if (!new File(resource, "ioc.js").exists()) { + IOUtil.Writer(new File(resource, "ioc.js").getAbsolutePath(), "utf-8", "var ioc = {\n\t\n};"); + } + + if (!new File(file, "pom.xml").exists()) { + IOUtil.Writer(new File(file, "pom.xml").getAbsolutePath(), "utf-8", + "\n" + + "\t4.0.0\n" + + "\torg.nlpcn.jcoder\n" + + "\t" + name + "\n" + + "\t0.1\n" + + "\n" + + "\t\n" + + "\t\t\n" + + "\t\t\torg.nlpcn\n" + + "\t\t\tjcoder\n" + + "\t\t\t0.1\n" + + "\t\t\t${basedir}/../../../lib/" + StaticValue.getJcoderJarFile().getName() + "\n" + + "\t\t\tsystem\n" + + "\t\t" + + "\n" + + "\t\n" + + "\t\n" + + "\t\tsrc/main\n" + + "\t\tsrc/api\n" + + "\t\t\n" + + "\t\t\t\n" + + "\t\t\t\tmaven-compiler-plugin\n" + + "\t\t\t\t3.3\n" + + "\t\t\t\t\n" + + "\t\t\t\t\t1.8\n" + + "\t\t\t\t\t1.8\n" + + "\t\t\t\t\tUTF-8\n" + + "\t\t\t\t\t\n" + + "\t\t\t\t\t\tlib\n" + + "\t\t\t\t\t\n" + + "\t\t\t\t\n" + + "\t\t\t\n" + + "\t\t\n" + + "\t\n" + + "\n"); + } + + Group group = new Group(); + group.setName(name); + + basicDao.save(group); + + StaticValue.space().joinCluster(group); + + if (StaticValue.TESTRING) { //测试模式进行文件监听 + GroupFileListener.regediter(name); + } + + return Restful.instance(true, "添加成功"); + } else { + + Set hostPortsArr = new HashSet<>(); + + Arrays.stream(hostPorts).filter(StringUtil::isNotBlank).forEach(s -> hostPortsArr.add((String) s)); + + Restful restful = proxyService.post(hostPortsArr, "/admin/group/check", ImmutableMap.of("name", name, "first", false), 100000, ProxyService.MERGE_FALSE_MESSAGE_CALLBACK); + + if (!restful.isOk()) { + return restful.code(500); + } + + return proxyService.post(hostPortsArr, "/admin/group/add", ImmutableMap.of("name", name, "first", false), 100000, ProxyService.MERGE_MESSAGE_CALLBACK); + } + } + + /** + * 从集群中彻底删除一个group 要求group name必须没有任何一个机器使用中 + */ + @At + public Restful deleteByCluster(@Param("name") String name) { + try { + groupService.deleteByCluster(name); + return Restful.instance().ok(true).msg("删除 group: " + name + " 成功"); + } catch (Exception e) { + e.printStackTrace(); + return Restful.instance().ok(false).msg("删除 group: " + name + " 失败"); + } + } + + + /** + * 删除group + */ + @At + public Restful delete(@Param("hostPorts") String[] hostPorts, @Param("name") String name, @Param(value = "first", df = "true") boolean first) throws Exception { + + if (!first) { + boolean flag = groupService.deleteGroup(name); + if (StaticValue.TESTRING) { //测试模式进行文件监听 + GroupFileListener.unRegediter(name); + } + if (flag) { + return Restful.instance(flag, "删除成功"); + } else { + return Restful.instance(flag, "删除文件失败"); + } + } else { + Set hostPortsArr = new HashSet<>(); + Arrays.stream(hostPorts).forEach(s -> hostPortsArr.add((String) s)); + return proxyService.post(hostPortsArr, "/admin/group/delete", ImmutableMap.of("name", name, "first", false), 100000, ProxyService.MERGE_MESSAGE_CALLBACK); + } + } + + + @At + public Restful share(@Param("hostPorts") String[] hostPorts, @Param("formHostPort") String fromHostPort, @Param("groupName") String groupName, @Param("toGroupName") String toGroupName) throws Exception { + if (StringUtil.isBlank(toGroupName)) { + toGroupName = groupName; + } + + if (hostPorts == null || hostPorts.length == 0 || StringUtil.isBlank(hostPorts[0])) { + return Restful.fail().msg("必须选择一个目标主机"); + } + + return proxyService.post(hostPorts, "/admin/group/installGroup", ImmutableMap.of("fromHostPort", fromHostPort, "groupName", groupName, "toGroupName", toGroupName.trim()), 1200000, ProxyService.MERGE_MESSAGE_CALLBACK); + } + + + /** + * 克隆一个主机的group到当前主机上 + */ + @At + public synchronized Restful installGroup(@Param("fromHostPort") String fromHostPort, @Param("groupName") String groupName, @Param("toGroupName") String toGroupName) throws Exception { + //判断当前group是否存在 + Group group = basicDao.findByCondition(Group.class, Cnd.where("name", "=", toGroupName)); + if (group != null) { + return Restful.instance(false, toGroupName + " 已存在!"); + } + + //获取远程主机的所有files + Response response = proxyService.post(fromHostPort, "/admin/fileInfo/listFiles", ImmutableMap.of("hostPort", fromHostPort, "groupName", groupName), 120000); + + JSONArray jarry = JSONObject.parseObject(response.getContent()).getJSONArray("obj"); + + File groupFile = new File(StaticValue.GROUP_FILE, toGroupName); + + for (Object o : jarry) { + FileInfo fileInfo = JSONObject.toJavaObject((JSON) o, FileInfo.class); + + File file = new File(groupFile, fileInfo.getRelativePath()); + + if (fileInfo.isDirectory()) { + file.mkdirs(); + } else { + if (!file.getParentFile().exists()) { + file.getParentFile().mkdirs(); + } + long start = System.currentTimeMillis(); + LOG.info("to down " + fileInfo.getRelativePath()); + Response post = proxyService.post(fromHostPort, "/admin/fileInfo/downFile", ImmutableMap.of("hostPort", fromHostPort, "groupName", groupName, "relativePath", fileInfo.getRelativePath()), 1200000); + IOUtil.writeAndClose(post.getStream(), file); + LOG.info("down ok : {} use time : {} ", fileInfo.getRelativePath(), System.currentTimeMillis() - start); + } + } + + //从远程主机获取所有的task + response = proxyService.post(fromHostPort, "/admin/task/taskGroupList", ImmutableMap.of("groupName", groupName), 120000); + + System.out.println(response.getContent()); + + //获取远程主机的所有tasks,本地创建group + group = new Group(); + group.setName(toGroupName); + group.setDescription("create at " + DateUtils.formatDate(new Date(), DateUtils.SDF_FORMAT) + " from " + fromHostPort); + group.setCreateTime(new Date()); + basicDao.save(group); + + jarry = JSONObject.parseObject(response.getContent()).getJSONArray("obj"); + for (Object o : jarry) { + Task task = JSONObject.toJavaObject((JSON) o, Task.class); + task.setGroupName(group.getName()); + basicDao.save(task); + taskService.flush(task.getId()); + LOG.info("install task {}", task.getName()); + } + + //刷新本地group,加入到集群中 + groupService.flush(group); + + if (StaticValue.TESTRING) { //测试模式进行文件监听 + GroupFileListener.regediter(toGroupName); + } + + return Restful.instance().msg("克隆成功"); + } + + /** + * 刷新一个group到集群中 + * + * @param hostPort 需要刷新的主机 + * @return 不同 + */ + @At + public Restful flush(@Param("hostPort") String hostPort, @Param("groupName") String groupName, @Param(value = "first", df = "true") boolean first) throws Exception { + if (!first || StaticValue.getHostPort().equals(hostPort)) { + JarService.flush(groupName); + return Restful.instance(groupService.flush(groupName)); + } else { + Response post = proxyService.post(hostPort, "/admin/group/flush", ImmutableMap.of("hostPort", hostPort, "groupName", groupName, "first", false), 120000); + return Restful.instance(post); + } + + } // /** @@ -350,89 +355,89 @@ public Restful flush(@Param("hostPort") String hostPort, @Param("groupName") Str // } // } - /** - * 修复不同 - */ - @At - public Restful fixDiff(String fromHostPort, String toHostPort, String groupName, @Param("relativePath[]") String[] relativePaths) throws Exception { - - if (StringUtil.isBlank(fromHostPort)) { - fromHostPort = Constants.HOST_MASTER; - } - - if (StringUtil.isBlank(toHostPort)) { - toHostPort = Constants.HOST_MASTER; - } - - boolean toMaster = Constants.HOST_MASTER.equals(toHostPort); - - Set toHostPorts = new HashSet<>(); - - if (Constants.HOST_MASTER.equals(fromHostPort)) { //说明是主机 - fromHostPort = groupService.getRandomCurrentHostPort(groupName); - if (fromHostPort == null) { - return Restful.fail().msg("主版本中不存在任何实例,所以无法同步"); - } - } - - if (toMaster) { - List currentHostPort = groupService.getCurrentHostPort(groupName); - toHostPorts.addAll(currentHostPort); - } else { - toHostPorts.add(toHostPort); - } - - toHostPorts.remove(fromHostPort); - - List message = new ArrayList<>(); - - boolean flag = true; - - for (String relativePath : relativePaths) { - if (relativePath.startsWith("/")) {//更新文件的 - if (toHostPorts.size() > 0) { - Restful restful = proxyService.post(toHostPorts, "/admin/fileInfo/copyFile", ImmutableMap.of("fromHostPort", fromHostPort, "groupName", groupName, "relativePaths", relativePath), 120000, ProxyService.MERGE_FALSE_MESSAGE_CALLBACK); - flag = flag && restful.isOk(); - if (!restful.isOk()) { - message.add(restful.getMessage()); - } - } - - if (toMaster) { - Response post1 = proxyService.post(fromHostPort, "/admin/fileInfo/upCluster", ImmutableMap.of("groupName", groupName, "relativePaths", relativePath), 120000); - message.add(post1.getContent()); - flag = flag && Restful.instance(post1).isOk(); - } - - } else {//更新task的 - - if (toHostPorts.size() > 0) { - Restful restful = proxyService.post(toHostPorts, Api.TASK_SYN.getPath(), ImmutableMap.of("fromHost", fromHostPort, "groupName", groupName, "taskName", relativePath), 10000, ProxyService.MERGE_FALSE_MESSAGE_CALLBACK); - if (!restful.isOk()) { - flag = false; - message.add(restful.getMessage()); - } - } - - if (toMaster) { - Response post = proxyService.post(StaticValue.getHostPort(), "/admin/task/task", ImmutableMap.of("groupName", groupName, "name", relativePath, "sourceHost", fromHostPort), 100000); - Restful restful = Restful.instance(post); - - if (restful.code() == ApiException.NotFound) { - taskService.deleteTaskFromCluster(groupName, relativePath); - } else if (restful.code() == ApiException.OK && restful.getObj() != null) { - StaticValue.space().addTask(JSONObject.toJavaObject(restful.getObj(), Task.class)); - } else { - message.add(restful.getMessage()); - flag = false; - } - - } - - - } - } - return Restful.instance(flag, Joiner.on(",").skipNulls().join(message)); - } + /** + * 修复不同 + */ + @At + public Restful fixDiff(String fromHostPort, String toHostPort, String groupName, @Param("relativePath[]") String[] relativePaths) throws Exception { + + if (StringUtil.isBlank(fromHostPort)) { + fromHostPort = Constants.HOST_MASTER; + } + + if (StringUtil.isBlank(toHostPort)) { + toHostPort = Constants.HOST_MASTER; + } + + boolean toMaster = Constants.HOST_MASTER.equals(toHostPort); + + Set toHostPorts = new HashSet<>(); + + if (Constants.HOST_MASTER.equals(fromHostPort)) { //说明是主机 + fromHostPort = groupService.getRandomCurrentHostPort(groupName); + if (fromHostPort == null) { + return Restful.fail().msg("主版本中不存在任何实例,所以无法同步"); + } + } + + if (toMaster) { + List currentHostPort = groupService.getCurrentHostPort(groupName); + toHostPorts.addAll(currentHostPort); + } else { + toHostPorts.add(toHostPort); + } + + toHostPorts.remove(fromHostPort); + + List message = new ArrayList<>(); + + boolean flag = true; + + for (String relativePath : relativePaths) { + if (relativePath.startsWith("/")) {//更新文件的 + if (toHostPorts.size() > 0) { + Restful restful = proxyService.post(toHostPorts, "/admin/fileInfo/copyFile", ImmutableMap.of("fromHostPort", fromHostPort, "groupName", groupName, "relativePaths", relativePath), 120000, ProxyService.MERGE_FALSE_MESSAGE_CALLBACK); + flag = flag && restful.isOk(); + if (!restful.isOk()) { + message.add(restful.getMessage()); + } + } + + if (toMaster) { + Response post1 = proxyService.post(fromHostPort, "/admin/fileInfo/upCluster", ImmutableMap.of("groupName", groupName, "relativePaths", relativePath), 120000); + message.add(post1.getContent()); + flag = flag && Restful.instance(post1).isOk(); + } + + } else {//更新task的 + + if (toHostPorts.size() > 0) { + Restful restful = proxyService.post(toHostPorts, Api.TASK_SYN.getPath(), ImmutableMap.of("fromHost", fromHostPort, "groupName", groupName, "taskName", relativePath), 10000, ProxyService.MERGE_FALSE_MESSAGE_CALLBACK); + if (!restful.isOk()) { + flag = false; + message.add(restful.getMessage()); + } + } + + if (toMaster) { + Response post = proxyService.post(StaticValue.getHostPort(), "/admin/task/task", ImmutableMap.of("groupName", groupName, "name", relativePath, "sourceHost", fromHostPort), 100000); + Restful restful = Restful.instance(post); + + if (restful.code() == ApiException.NotFound) { + taskService.deleteTaskFromCluster(groupName, relativePath); + } else if (restful.code() == ApiException.OK && restful.getObj() != null) { + StaticValue.space().addTask(JSONObject.toJavaObject(restful.getObj(), Task.class)); + } else { + message.add(restful.getMessage()); + flag = false; + } + + } + + + } + } + return Restful.instance(flag, Joiner.on(",").skipNulls().join(message)); + } } diff --git a/src/main/java/org/nlpcn/jcoder/service/ProxyService.java b/src/main/java/org/nlpcn/jcoder/service/ProxyService.java index b85ba8e..bf86ee5 100644 --- a/src/main/java/org/nlpcn/jcoder/service/ProxyService.java +++ b/src/main/java/org/nlpcn/jcoder/service/ProxyService.java @@ -215,6 +215,7 @@ public T post(String[] hostPorts, String path, Map params, i * 同时提交到多个主机上 */ public T post(Set hostPorts, String path, Map params, int timeout, Function, T> fun) throws Exception { + LOG.info("send post to : "+hostPorts); Map result = post(hostPorts, path, params, timeout); return fun.apply(result); } diff --git a/src/main/java/org/nlpcn/jcoder/util/StaticValue.java b/src/main/java/org/nlpcn/jcoder/util/StaticValue.java index 5c61bb5..d79a6dd 100644 --- a/src/main/java/org/nlpcn/jcoder/util/StaticValue.java +++ b/src/main/java/org/nlpcn/jcoder/util/StaticValue.java @@ -178,11 +178,16 @@ public static Ioc getUserIoc() { * 从配置文件查找 */ public static String getResource(String key) { - ResourceBundle bundle = ResourceBundle.getBundle("jcoder"); - if (bundle.containsKey(key)) { - return bundle.getString(key); - } else { - return null; + try { + ResourceBundle bundle = ResourceBundle.getBundle("jcoder"); + if (bundle.containsKey(key)) { + return bundle.getString(key); + } else { + return null; + } + }catch (Exception e){ + LOG.warn("init version err ",e); + return key ; } } diff --git a/src/main/webapp/modules/group/groupAddOrEdit.html b/src/main/webapp/modules/group/groupAddOrEdit.html index 5693173..195f41f 100644 --- a/src/main/webapp/modules/group/groupAddOrEdit.html +++ b/src/main/webapp/modules/group/groupAddOrEdit.html @@ -57,11 +57,11 @@ } }); }, - hostArray: function () { + getHostArray: function () { var $this = this; - var len = $(":checkbox").length ; + var len = $(".ace").length ; for(var i =0 ;i