监控服务程序调度算法实现(一)

2014-11-24 09:10:14 · 作者: · 浏览: 2

监控服务程序实现调度算法


完成nginx服务监控(从nginx配置解析出对应的服务作为监控对象之五,还有可以从数据库里读出待监控的服务)与更新服务后的监控算法:


处理休眠队列---------将所有的待监控服务记录放入一个优先级队列里(休眠队列,最小堆的数据结构,堆顶为绝对间隔时间最小的,优先执行),每次只需要检查堆顶就可以了,需要执行的放进执行队列里,删除的不加入执行队列


执行线程---------将执行列里的记录抛给异步执行的池里,每一个都是异步调用运行


回收线程----------运行完成的请求回收休眠队列,不回收已删除的。


更新线程---------定时加载新的数据,设置好绝对间隔时间,放入休眠队列


废话少说,主要实现代码如下。。


package com.wole.monitor;


import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;


import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;


import com.wole.monitor.dao.ServiceDao;
import com.wole.servicemonitor.util.ServiceUtils;


/**
* 管理并调度某一个服务数据源的监控池
* @author yzygenuine
*
*/
public class MonitorsManage {
private final static Logger logger = LoggerFactory.getLogger(MonitorsManage.class);


private ServiceDao dao;


/**
* 执行的一个并发池
*/
private Executor commExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue());


/**
*
*/
private CompletionService completionService = new ExecutorCompletionService(commExecutor);


/**
* 正在执行中的MonitorService集合
*/
private ConcurrentHashSet currentSet = new ConcurrentHashSet();


/**
* 等待优先级队列
*/
private Queue sleepQueue = new PriorityBlockingQueue();


/**
* 执行队列
*/
private Queue executeQueue = new LinkedBlockingQueue();


/**
* 是否关闭
*/
private AtomicBoolean isClose = new AtomicBoolean(false);


/**
* 生产者启动时间
*/
private AtomicLong startTime = new AtomicLong(0);
/**
* 相对于启动的间隔时间
*/
private AtomicLong intervalTime = new AtomicLong(0);


public void close() {
logger.info("closing................");
isClose.compareAndSet(false, true);
}



public void init() {
logger.info("初始化");


}


public void work() {
logger.info("开始工作");
// 生产者启动工作


Thread productThread = new Thread(new ProductMonitor(1000));
// 消费者启动工作
Thread consumerThread = new Thread(new ConsumerMonitor(1000));
// 回收者启动工作
Thread recoverThread = new Thread(new RecoverMonitor(1000));


// 启动定时加载数据工作
Thread refreshThread = new Thread(new RefreshMonitorService(60000, dao));
productThread.start();
consumerThread.start();
recoverThread.start();
refreshThread.start();


}


/**
* 生产者
*
* @author yzygenuine
*
*/
class ProductMonitor implements Runnable {
long sleepTime = 1000;


public ProductMonitor(long sleepTime) {
this.sleepTime = sleepTime;
}


@Override
public void run() {
logger.info("生产者开启工作");
// 开始进行定时监控
long now = System.currentTimeMillis();
long lastTime = now;
star