`
Missing1984
  • 浏览: 12620 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

ThreadPool,ObjectPool和任务控制

阅读更多

New a threadPool

ThreadPoolExecutor executorService = new ThreadPoolExecutor(
				poolCoreSize, poolMaxSize, timeout,
				java.util.concurrent.TimeUnit.MILLISECONDS, buff, handler);

 

poolcoreSize:  Pool至少运行的线程数量

poolMaxSize:   Pool最多运行的线程数量

timeout:空闲线程发呆时间,如果某线程发呆时间>timeout 而当前线程数量>poolcoreSize,就会终止该线程

buff:线程池所用的缓冲队列 实现BlockingQueue的那几个都能用,能设置定长和无限长度

handler: 当线程池线程数到达poolMaxSize buff 又满了,有新的任务丢到线程池的时候就会call这个hander进行处理.默认提供了几种处理方式,也可以自己实现,例如

public class PoolRejectHandler implements RejectedExecutionHandler {

    /* (non-Javadoc)
     * @see java.util.concurrent.RejectedExecutionHandler#rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)
     */
    public void rejectedExecution(Runnable arg0, ThreadPoolExecutor arg1) {
       //log process is slower then produce
       
       //put back to inqueue
        BaseTask task = (BaseTask)arg0;
       
        //task.pushToInQueue(task.getInObject());
        System.out.println("!!!!!!overload!!!");
        try {
            task.getInQueue().put(task.getInObject());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }    
}

 这里我把这个task丢回一开始的任务队列了,这里可以实现自己的处理逻辑

 

New and excute  task

任何实现runnable 或者callable接口的类 都可以作为一个任务丢到线程池里面运行

executorService.execute(processTask);

 

ThreadPool 调度策略

条件: 线程数<poolcoreSize, buff未满,有新的任务提交   结果:新建线程处理这个任务

条件: 线程数>=poolcoreSize, buff未满,有新的任务提交  结果:把任务丢到buff对列中

条件: poolMaxSize>线程数>=poolcoreSize, buff满,有新的任务提交  结果:新建线程

条件: poolMaxSize=线程数>poolcoreSize, buff满,有新的任务提交  结果:调用handler处理这个任务

 

 

这里需要讨论的就是最后那个情况,虽然threadpool提供了handler来处理这种超出负荷的情况,但是如果能从源头就直接控制提交到线程池的任务数量,感觉上会好很多

 

 

这里我用到一个Manager类和ObjectPool来处理任务的提交

 

Manager类实现Runnable 和 DispatchListener 接口 ,在Runnable里面处理任务生成和分发,DispatchListener接口中实现任务对象的回收和结果收集(示意代码)

public class DispathManager implements Runnable, DispatchListener {
	public void run() {
		while (true) {
			try {
				
				// get task from taskobjectpool. will wait here if the active taskobjects reach the tasklimit
				// so we can  control the limit of running task in the threadpool  
				BaseTask processTask = (BaseTask) taskPool.borrowObject();


				//set task params
				processTask.setListener(this);


				// put in threadpool  
				executorService.execute(processTask);
			} catch (InterruptedException e) {
				break;
			} catch (Exception e) {
				// log dispatch queue exception
				e.printStackTrace();
			}
		}
	}

	public void recycleTask(BaseTask baseTask) {
		// TODO Auto-generated method stub
		try {
			taskPool.returnObject(baseTask);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

 

然后在Task执行完毕的时候进行回收

    public void run() {
        try {
	//task logic here

            // return task
            listener.recycleTask(this);
        } catch (Exception e) {
            listener.onTaskError(e, inObject);
        }
    }

 

这样就不必每次都重新new 这个task 对一些重型的task,会提高性能

 

这里用到了ObjectPool,是Apache Common下的管理ObjectPool的类,使用如下:

 

 

New a ObjectPool

ObjectPool taskPool = new GenericObjectPool(new TaskPoolFactory(taskClass), taskLimit,
				GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);

 

其中TaskPoolFactory如下 taskClass是重配置文件读入的

public class TaskPoolFactory extends BasePoolableObjectFactory {

    /* (non-Javadoc)
     * @see org.apache.commons.pool.BasePoolableObjectFactory#makeObject()
     */
    private Class c;
    
    public TaskPoolFactory(Class c) {
        super();
        this.c = c;
    }
    @Override
    public Object makeObject() throws Exception {
        // TODO Auto-generated method stub
        return c.newInstance();
    }
}

 

taskLimit是允许借出的最大的数目(借出但是没有归还)

当借出的数目已经达到最大,这个时候还有借出请求 GenericObjectPool.WHEN_EXHAUSTED_BLOCK 就提供了策略(还有几种的) 这里设置为无限等待,也就是线程会wait在这里 直到有借出的task被归还

 

Objectpool使用参照Manager和task类

 

最后, 控制taskLimit的大小 就可以控制ThreadPool中处理任务的多少,也就是正在处理的task+buff里的task 必定<= taskLimit,这样就在源头控制了提交任务的数量,保证threadpool不会超负荷。

 

 

最后推荐大家的设置就是 ThreadPool buff设置为无限大 然后用poolcoresize和taskLimit来控制负荷能力(视不同机器设置不同的值)

 

 

 

欢迎大家拍砖...

0
3
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics