Skip to content

Commit

Permalink
optimize cachecloud quartz
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosfu committed Aug 24, 2016
1 parent 4890432 commit 620a925
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 444 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.sohu.cache.async;

import java.util.concurrent.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.sohu.cache.client.service.impl.ClientReportDataServiceImpl;
import com.sohu.cache.redis.impl.RedisCenterImpl;
Expand All @@ -11,7 +14,7 @@
public class AsyncThreadPoolFactory {

public static final ThreadPoolExecutor CLIENT_REPORT_THREAD_POOL =
new ThreadPoolExecutor(80, 80, 0L, TimeUnit.MILLISECONDS,
new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new NamedThreadFactory(
ClientReportDataServiceImpl.CLIENT_REPORT_POOL, true));

Expand All @@ -20,5 +23,10 @@ public class AsyncThreadPoolFactory {
new ThreadPoolExecutor(30, 30, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(), new NamedThreadFactory(
RedisCenterImpl.REDIS_SLOWLOG_POOL, true));


public static final String MACHINE_POOL ="machine-pool";
public static final ThreadPoolExecutor MACHINE_THREAD_POOL =
new ThreadPoolExecutor(100, 100, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1000), new NamedThreadFactory(
MACHINE_POOL, true));
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.sohu.cache.inspect;

import com.sohu.cache.inspect.impl.HostInspectHandler;
import com.sohu.cache.schedule.jobs.CacheBaseJob;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
Expand All @@ -18,12 +17,13 @@ public class InspectorJob extends CacheBaseJob {
@Override
public void action(JobExecutionContext context) {
try {
long start = System.currentTimeMillis();
SchedulerContext schedulerContext = context.getScheduler().getContext();
ApplicationContext applicationContext = (ApplicationContext) schedulerContext.get(APPLICATION_CONTEXT_KEY);
// 应用相关
InspectHandler inspectHandler;
JobDataMap jobDataMap = context.getMergedJobDataMap();
String inspectorType = MapUtils.getString(jobDataMap,"inspectorType");
String inspectorType = MapUtils.getString(jobDataMap, "inspectorType");
if (StringUtils.isBlank(inspectorType)) {
logger.error("=====================InspectorJob:inspectorType is null=====================");
return;
Expand All @@ -36,7 +36,9 @@ public void action(JobExecutionContext context) {
return;
}
inspectHandler.handle();
logger.warn("=====================InspectorJob handle Done!=====================");
long end = System.currentTimeMillis();
logger.info("=====================InspectorJob {} Done! cost={} ms=====================",
inspectHandler.getClass().getSimpleName(), (end - start));
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand All @@ -35,7 +36,7 @@ public abstract class AbstractInspectHandler implements InspectHandler {
public void init() {
asyncService.assemblePool(getThreadPoolKey(), new ThreadPoolExecutor(5, 100,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new LinkedBlockingQueue<Runnable>(1024),
new NamedThreadFactory(getThreadPoolKey(), true)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.sohu.cache.machine;

import java.util.List;
import java.util.Map;

import com.sohu.cache.constant.MachineInfoEnum.TypeEnum;
import com.sohu.cache.entity.InstanceInfo;
import com.sohu.cache.entity.InstanceStats;
import com.sohu.cache.entity.MachineInfo;
import com.sohu.cache.entity.MachineStats;

import java.util.List;
import java.util.Map;

/**
* 基于host的操作
*
Expand Down Expand Up @@ -45,6 +45,15 @@ public interface MachineCenter {
* @return 机器的信息
*/
public Map<String, Object> collectMachineInfo(final long hostId, final long collectTime, final String ip);

/**
* 异步收集host的状态信息
*
* @param hostId 机器id
* @param collectTime 收集时间
* @param ip ip
*/
public void asyncCollectMachineInfo(final long hostId, final long collectTime, final String ip);

/**
* 为当前机器的监控删除trigger
Expand Down Expand Up @@ -72,6 +81,15 @@ public interface MachineCenter {
* @return
*/
public void monitorMachineStats(final long hostId, final String ip);

/**
* 异步监控机器的状态信息,向上层汇报或者报警
*
* @param hostId 机器id
* @param ip ip
* @return
*/
public void asyncMonitorMachineStats(final long hostId, final String ip);

/**
* 在主机ip上的端口port上启动一个进程,并check是否启动成功;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,48 @@
package com.sohu.cache.machine.impl;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.quartz.JobKey;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import redis.clients.jedis.HostAndPort;

import com.google.common.base.Strings;
import com.sohu.cache.async.AsyncService;
import com.sohu.cache.async.AsyncThreadPoolFactory;
import com.sohu.cache.async.KeyCallable;
import com.sohu.cache.constant.InstanceStatusEnum;
import com.sohu.cache.constant.MachineConstant;
import com.sohu.cache.constant.MachineInfoEnum.TypeEnum;
import com.sohu.cache.dao.InstanceDao;
import com.sohu.cache.dao.InstanceStatsDao;
import com.sohu.cache.dao.MachineDao;
import com.sohu.cache.dao.MachineStatsDao;
import com.sohu.cache.entity.*;
import com.sohu.cache.entity.InstanceInfo;
import com.sohu.cache.entity.InstanceStats;
import com.sohu.cache.entity.MachineInfo;
import com.sohu.cache.entity.MachineMemInfo;
import com.sohu.cache.entity.MachineStats;
import com.sohu.cache.exception.SSHException;
import com.sohu.cache.machine.MachineCenter;
import com.sohu.cache.machine.PortGenerator;
Expand All @@ -25,33 +59,6 @@
import com.sohu.cache.web.component.EmailComponent;
import com.sohu.cache.web.component.MobileAlertComponent;

import org.apache.commons.lang.StringUtils;
import org.quartz.JobKey;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import redis.clients.jedis.HostAndPort;

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;

/**
* 机器接口的实现
* User: lingguo
Expand Down Expand Up @@ -85,6 +92,13 @@ public class MachineCenterImpl implements MachineCenter {
* 手机短信报警
*/
private MobileAlertComponent mobileAlertComponent;

private AsyncService asyncService;

public void init() {
asyncService.assemblePool(AsyncThreadPoolFactory.MACHINE_POOL,
AsyncThreadPoolFactory.MACHINE_THREAD_POOL);
}

/**
* 为当前机器收集信息创建trigger并部署
Expand Down Expand Up @@ -120,6 +134,21 @@ public boolean unDeployMachineCollection(long hostId, String ip) {
return schedulerCenter.unscheduleJob(collectionTriggerKey);
}

//异步执行任务
public void asyncCollectMachineInfo(final long hostId, final long collectTime, final String ip) {
String key = "collect-machine-"+hostId+"-"+ip+"-"+collectTime;
asyncService.submitFuture(AsyncThreadPoolFactory.MACHINE_POOL, new KeyCallable<Boolean>(key) {
public Boolean execute() {
try {
collectMachineInfo(hostId, collectTime, ip);
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
}
});
}

/**
* 收集当前host的状态信息,保存到mysql;
Expand Down Expand Up @@ -205,6 +234,21 @@ public boolean unDeployMachineMonitor(long hostId, String ip) {
return schedulerCenter.unscheduleJob(monitorTriggerKey);
}

//异步执行任务
public void asyncMonitorMachineStats(final long hostId, final String ip) {
String key = "monitor-machine-"+hostId+"-"+ip;
asyncService.submitFuture(AsyncThreadPoolFactory.MACHINE_POOL, new KeyCallable<Boolean>(key) {
public Boolean execute() {
try {
monitorMachineStats(hostId, ip);
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
return false;
}
}
});
}

/**
* 监控机器的状态
Expand Down Expand Up @@ -650,6 +694,8 @@ public boolean unDeployServerCollection(long hostId, String ip) {
return schedulerCenter.unscheduleJob(collectionTriggerKey);
}


public void setAsyncService(AsyncService asyncService) {
this.asyncService = asyncService;
}

}
Loading

0 comments on commit 620a925

Please sign in to comment.