Java定时任务
Timer
Timer的使用
TimerTask是单线程的,与系统时间有关,报异常后无法自动恢复的。
1
2
3
4
5
6
7
8
9
10TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println("TimerTask run: " + LocalDateTime.now());
}
};
//延迟0ms,每隔1000ms执行一次
Timer timer = new Timer();
timer.schedule(timerTask, 0, 1000);
## Timer的源码
Timer维护着两个对象,一个是任务队列,一个是线程。Timer添加任务的方法最终都会调用其中的一个私有方法,这个方法有三个入参。任务:task
,延迟多久执行第一次:time
,以及执行的间隔:period
。这里有两点要注意,第一点是Timer使用System.currentTimeMillis()
来判断任务执行时间,所以如果电脑时间被修改了,任务的执行就会乱套了。第二点是period
参数为正数时表示任务的以任务开始时间为间隔周期执行,为负数则是以任务结束时间为间隔周期执行,而为零这是非周期执行。Timer是线程安全的,因为它会在私有方法里获取锁。经过一轮设定,任务最后会放到任务队列里,然后调用任务队列的notify方法通知线程去任务队列里检查新任务,这也是为什么TimerThread
的构造方法需要任务队列做入参的原因。
1 |
|
TaskQueue
TaskQueue就不贴代码了。TaskQueue是个队列,内部有个数组,数据结构是二叉堆,以任务的执行时间来排序。由于是数组,所以会像ArrayList一样自动扩容。为TimerThread提供获取删除最临进要执行的任务等方法。
TimerThread的源码
1 |
|
ScheduledExecutorService
ScheduledExecutorService的使用
1 |
|
ScheduledExecutorService的源码
ScheduledExecutorService通过execute或者submit提交任务,但最后都会调用schedule方法。在schedule方法里会将任务封装为RunnableScheduledFuture,然后通过delayedExecute方法加入任务队列。在delayedExecute方法里,在加入任务队列之前还会检查线程池状态等,以执行饱和策略。而任务队列使用的是内部类DelayedWorkQueue。DelayedWorkQueue是按照任务执行时间进行排序,这跟Timer的一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
当任务执行时:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
//无法执行,取消任务
cancel(false);
else if (!periodic)
//不是周期性执行的
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//周期性执行的,在if判断里执行
//然后设置下次执行时间
setNextRunTime();
//放到队列里等待再次执行
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
spring boot的任务调度
首先是EnableScheduling注解,EnableScheduling注解只是导入了SchedulingConfiguration配置类。而SchedulingConfiguration配置类只是注册了一个ScheduledAnnotationBeanPostProcessor的bean。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
@Configuration
public class SchedulingConfiguration {
@Bean(name = AnnotationConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
而在ScheduledAnnotationBeanPostProcessor中,不知为何大家说会调用其中的postProcessAfterInitialization方法。postProcessAfterInitialization方法会遍历全部需要任务调度的方法,塞到processScheduled方法里。processScheduled方法会根据任务的类型,即周期还是cron,添加到registrar对象里。而registrar对象是ScheduledTaskRegistrar类。ScheduledTaskRegistrar里面有个成员变量TaskScheduler。最终任务会被添加到TaskScheduler里。TaskScheduler只是个接口,实现类是ConcurrentTaskScheduler。ConcurrentTaskScheduler又有个成员变量ScheduledExecutorService。而ConcurrentTaskScheduler的这个ScheduledExecutorService是通过Executors.newSingleThreadScheduledExecutor()
方法来创建的。而newSingleThreadScheduledExecutor方法默认是单线程。然后相信之后大家都懂的了。(真TM绕)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52@Override
public Object postProcessAfterInitialization(final Object bean, String beanName) {
//。。。
for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (Scheduled scheduled : entry.getValue()) {
processScheduled(scheduled, method, bean);
}
}
//。。。
return bean;
}
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
//获取corn类型
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
String zone = scheduled.zone();
//放入cron任务列表中(不执行)
this.registrar.addCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)));
}
//执行频率类型(long类型)
long fixedRate = scheduled.fixedRate();
String fixedDelayString = scheduled.fixedDelayString();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
//放入FixedRate任务列表中(不执行)(registrar为ScheduledTaskRegistrar)
this.registrar.addFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay));
}
//执行频率类型(字符串类型,不接收参数计算如:600*20)
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
fixedRate = Long.parseLong(fixedRateString);
//放入FixedRate任务列表中(不执行)
this.registrar.addFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay));
}
return bean;
}
public class Executors {
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
}
在spring
boot里,为了并发执行任务,一般有两种方法。一是使用@Async
注解跟@Scheduled
注解一起用。二是实现SchedulingConfigurer接口,重写configureTasks方法
1
2
3
4
5
6
7@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}
}
定时任务执行时间过长的策略
参考文章: